import json from dataclasses import dataclass, field from typing import Any import valkey from pydantic import BaseModel from datetime import datetime from sqlalchemy.util.typing import Self from collections.abc import Sequence, Callable import modules.utils as utils def ret_true(): return True class Locked_Value(Sequence): def __init__(self, initial_value: Any = None, unlock_func: Callable=ret_true): self._value: Any = initial_value self._unlock_func: Callable = unlock_func self._is_locked: bool = True def __repr__(self): return str((self._value, self._is_locked, self._unlock_func)) def __len__(self): return len((self._value, self._is_locked, self._unlock_func)) def __getitem__(self, index): return (self._value, self._is_locked, self._unlock_func)[index] def __str__(self): return str((self._value)) def unlock(self) -> Self: if self._unlock_func(): self._is_locked = False return self def lock(self) -> Self: self._is_locked = True return self @property def is_locked(self): return self._is_locked @property def is_unlocked(self): return not(self._is_locked) @property def value(self): return self._value @value.setter def value(self, v): if not(self._is_locked): self._value = v else: raise ValueError(f'Failed to set value, item is locked: {str(self.__repr__)}') class Current_Previous_Value: def __init__(self, value: Any = None, previous_value: Any = None): self._value: Any = value self._previous_value: Any = previous_value def __repr__(self): return str((self._value, self._previous_value)) def __len__(self): return len((self._value, self._previous_value)) def __getitem__(self, index): return (self._value, self._previous_value)[index] def __str__(self): return str(self._value) @property def value(self): return self._value @property def previous_value(self): return self._previous_value @value.setter def value(self, v): self._previous_value = self._value self._value = v ### Valkey Objects ### class VK_Check(BaseModel): status: str = 'HEALTHY' # HEALTHY | UNHEALTHY | DEAD method: str | None class VK_Checks(BaseModel): status: VK_Check = VK_Check(method='check_status') timestamp: VK_Check = VK_Check(method='check_timestamp') symbol: VK_Check = VK_Check(method='check_symbol') async def run_checks(self, args: dict | None = None): for f in self.model_dump(): method = getattr(getattr(self, f), 'method') if method is not None: await getattr(self, method)(args = args) async def check_status(self, args: dict | None = None) -> None: # print('checking status') if self.status.status == '': self.status.status = 'UNHEALTHY' else: self.status.status = self.status.status async def check_status_nested(self, args: dict | None = None) -> None: # print('checking status') if self.status.status == '': self.status.status = 'UNHEALTHY' else: self.status.status = self.status.status async def check_timestamp(self, args: dict | None = None) -> None: # print('checking timestamp') if args is not None: ts = int(args.get('timestamp', 0)) now = round(datetime.now().timestamp()*1000) if (now - ts) > 1: self.timestamp.status = 'UNHEALTHY' else: self.timestamp.status = 'HEALTHY' else: raise ValueError("Must pass in 'timestamp' arg") async def check_symbol(self, args: dict | None = None) -> None: # print('checking symbol') if args is not None: symbol = utils.symbol_to_extend_fmt(args.get('algo_symbol', '')) vk_symbol = utils.symbol_to_extend_fmt(args.get('vk_symbol', '')) if symbol == vk_symbol: self.symbol.status = 'HEALTHY' else: self.symbol.status = 'UNHEALTHY' else: raise ValueError("Must pass in 'algo_symbol' and 'vk_symbol' args") class VK_Obj(BaseModel): vk_name: str timestamp: int = round(datetime.now().timestamp()*1000) status: str = 'HEALTHY' # HEALTHY | UNHEALTHY | DEAD checks: VK_Checks = VK_Checks() data: Any = None async def get(self, VK_CON: valkey.Valkey) -> None: print('getting') vk_get: str = VK_CON.get(self.vk_name) # ty:ignore[invalid-assignment] vk_dict: dict = json.loads(vk_get) self.__init__(**vk_dict) async def set(self, VK_CON: valkey.Valkey, data_override: Any = None) -> None: print('setting') if data_override is not None: self.data = data_override self.timestamp: int = round(datetime.now().timestamp()*1000) j: str = self.model_dump_json() VK_CON.set(self.vk_name, j) ### Valkey Archetypes ### # Engine - Health # Engine - Orchestrator class VK_Orchestrator_Output(VK_Obj): vk_name: str = 'fr_orchestrator_output' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # Algo class VK_Working_Symbol(VK_Obj): vk_name: str = 'fr_algo_working_symbol' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_Algo_Status(VK_Obj): vk_name: str = 'algo_status' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # User - Orders class VK_User_Orders_Aster(VK_Obj): vk_name: str = 'fr_aster_user_orders' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_User_Orders_Extend(VK_Obj): vk_name: str = 'fr_extended_user_orders' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # User - Trades class VK_User_Trades_Extend(VK_Obj): vk_name: str = 'fr_extended_user_trades' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # User - Balances class VK_User_Balances_Aster(VK_Obj): vk_name: str = 'fr_aster_user_balances' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_User_Balances_Extend(VK_Obj): vk_name: str = 'fr_extended_user_balances' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # User - Positions class VK_User_Positions_Aster(VK_Obj): vk_name: str = 'fr_aster_user_positions' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_User_Positions_Extend(VK_Obj): vk_name: str = 'fr_extended_user_positions' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # Fund Rates class VK_FR_Aster(VK_Obj): vk_name: str = 'fund_rate_aster' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_FR_All_Aster(VK_Obj): vk_name: str = 'fund_rate_aster_all' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_FR_Extend(VK_Obj): vk_name: str = 'fund_rate_extended' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_FR_All_Extend(VK_Obj): vk_name: str = 'fund_rate_extended_all' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # Tickers class VK_Ticker_Aster(VK_Obj): vk_name: str = 'fut_ticker_aster' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_Ticker_Extend(VK_Obj): vk_name: str = 'fut_ticker_extended' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) # Trades class VK_Trade_Aster(VK_Obj): vk_name: str = 'fut_last_trade_aster' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) class VK_Trade_Extend(VK_Obj): vk_name: str = 'fut_last_trade_extended' checks: VK_Checks = VK_Checks( symbol = VK_Check(method = None) ) ### Algo Objects ### class Algo_Status(BaseModel): last_update_ts_ms: int status: str # 'WORKING' | 'STOPPED' ///// # ENUM: 'HEALTHY' | 'UNHEALTHY' | 'DEAD' expected_alpha: float model_ratio: float current_ratio: float class Algo_Config_Overrides(BaseModel): Allow_Ordering_Aster: bool Allow_Ordering_Extend: bool Allow_Symbol_Change: bool Flatten_Open_Positions: bool Flatten_Open_Positions_Opportunistic: bool Flip_Side_For_Testing: bool class Algo_Config_Config(BaseModel): Loop_Sleep_Sec: int Max_Order_Over_Notional_Ratio: float Max_Target_Notional: float Min_Time_To_Funding_Minutes: int Min_Fund_Rate_Pct_To_Trade: float Price_Worsener_Aster: int Price_Worsener_Extend: int Switch_To_Taker_Seconds: int Target_Open_Cash_Position: int class Algo_Config_Logging(BaseModel): Log_Summary_Each_Loop: bool Print_Summary_Each_Loop: bool class Algo_Config(BaseModel): Updated_Timestamp: int Config: Algo_Config_Config Logging: Algo_Config_Logging Overrides: Algo_Config_Overrides @dataclass(kw_only=True) class Flags: LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False NET_FUNDING_IS_ZERO: bool = False @dataclass(kw_only=True) class Valkey_Stream: client: valkey.Valkey channel: str data: Any = None none_fill: Any = None async def update(self): r: str = self.client.get(name=self.channel) # ty:ignore[invalid-assignment] self.data = json.loads(s=r) if r is not None else self.none_fill @dataclass(kw_only=True) class Position: market: str notional: float qty: float @dataclass(kw_only=True) class Open_Positions: Valkey: Valkey_Stream Positions: list[Position] = field(default_factory = list) async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Collateral ### @dataclass(kw_only=True) class Asset: symbol: str balance: float # min_order_qty: float @dataclass(kw_only=True) class Collateral: Valkey: Valkey_Stream # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Assets: list[Asset] = field(default_factory = list) async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Orders ### @dataclass(kw_only=True) class Order: symbol: str order_id: str client_order_id: str side: str order_type: str original_qty: float original_price: float order_status: str last_filled_qty: float last_filled_price: float commission: float trade_is_maker: bool @dataclass(kw_only=True) class Order_Updates: # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Valkey: Valkey_Stream Orders: list[Order] = field(default_factory = list) async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Funding Rate ### @dataclass(kw_only=True) class Funding_Rate: # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Valkey: Valkey_Stream timestamp_arrival: int timestamp_msg: int symbol: str funding_rate: float next_funding_time_ts_ms: int mark_price: float index_price: float estimated_settle_price: float async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Markets Info ### @dataclass(kw_only=True) class Market: symbol: str min_order_qty: float @dataclass(kw_only=True) class Markets_Details: Markets: list[Market] = field(default_factory=list) ### Exchanges ### @dataclass(kw_only=True) class Perpetual_Exchange: # Order_Updates: Order_Updates # Position_Updates: Open_Positions # Collateral_Updates: Collateral # Funding_Rate: Funding_Rate # Markets: Markets_Details mult: int lh_asset: str rh_asset: str symbol: str = '' symbol_asset_separator: str = '' initial_funding_rate: float = 0 fund_rate_at_same_time: bool = False min_price: float = 0 min_order_size: float = 0 min_lot_size: float = 0 min_notional: float = 0 buy_ratio: float = 0 buy_ratio_std: float = 0 notional_obj: dict = field(default_factory=dict) notional_position: float = 0 unrealized_pnl: float = 0 just_rejected_count: int = 0 cancel_request_pending: bool = False # async def update(self): # await self.Collateral_Updates.update() # await self.Order_Updates.update() # await self.Position_Updates.update() # await self.Funding_Rate.update() def __post_init__(self) -> None: self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' # @dataclass(kw_only=True) # class Aster(Perpetual_Exchange): # name: str = 'Aster' # lh_asset: str = 'ETH' # rh_asset: str = 'USDT' # def __post_init__(self): # super().__post_init__() # self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) # self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) # self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) # self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) # @dataclass(kw_only=True) # class Extend(Perpetual_Exchange): # name: str = 'Extended' # lh_asset: str = 'ETH' # rh_asset: str = 'USD' # symbol_asset_separator: str = '-' # def __post_init__(self): # super().__post_init__() # self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) # self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) # self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) # self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None))