Files
Funding_Rate/modules/structs.py

515 lines
14 KiB
Python

import json
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

import valkey
from pydantic import BaseModel, Field
from sqlalchemy.util.typing import Self

import modules.utils as utils
def ret_true():
return True
class Locked_Value(Sequence):
def __init__(self, initial_value: Any = None, unlock_func: Callable=ret_true):
self._value: Any = initial_value
self._unlock_func: Callable = unlock_func
self._is_locked: bool = True
def __repr__(self):
return str((self._value, self._is_locked, self._unlock_func))
def __len__(self):
return len((self._value, self._is_locked, self._unlock_func))
def __getitem__(self, index):
return (self._value, self._is_locked, self._unlock_func)[index]
def __str__(self):
return str((self._value))
def unlock(self) -> Self:
if self._unlock_func():
self._is_locked = False
return self
def lock(self) -> Self:
self._is_locked = True
return self
@property
def is_locked(self):
return self._is_locked
@property
def is_unlocked(self):
return not(self._is_locked)
@property
def value(self):
return self._value
@value.setter
def value(self, v):
if not(self._is_locked):
self._value = v
else:
raise ValueError(f'Failed to set value, item is locked: {str(self.__repr__)}')
class Current_Previous_Value:
    """Track a value together with the value it held before the last assignment.

    Indexing and ``len`` treat the object as the pair
    ``(value, previous_value)``; ``str`` shows only the current value.
    """

    def __init__(self, value: Any = None, previous_value: Any = None):
        self._value: Any = value
        self._previous_value: Any = previous_value

    def _pair(self) -> tuple:
        """Return ``(value, previous_value)``."""
        return (self._value, self._previous_value)

    def __repr__(self):
        return str(self._pair())

    def __str__(self):
        return str(self._value)

    def __len__(self):
        return len(self._pair())

    def __getitem__(self, index):
        return self._pair()[index]

    @property
    def previous_value(self):
        """The value held before the most recent assignment to ``value``."""
        return self._previous_value

    @property
    def value(self):
        """The current value; assigning shifts the old one into ``previous_value``."""
        return self._value

    @value.setter
    def value(self, v):
        # Right-hand side is evaluated first, so the old current value is
        # captured before it is overwritten.
        self._previous_value, self._value = self._value, v
### Valkey Objects ###
class VK_Check(BaseModel):
    """One health check: its current status and the method that refreshes it."""

    status: str = 'HEALTHY'  # HEALTHY | UNHEALTHY | DEAD
    # Name of the VK_Checks coroutine to run for this check; required, but may
    # be None, in which case VK_Checks.run_checks skips the check.
    method: str | None
class VK_Checks(BaseModel):
    """Group of health checks (status, timestamp, symbol) for one Valkey object.

    Each field is a ``VK_Check`` whose ``method`` names the coroutine on this
    class that refreshes it; ``run_checks`` awaits every check whose
    ``method`` is not ``None``.
    """

    status: VK_Check = VK_Check(method='check_status')
    timestamp: VK_Check = VK_Check(method='check_timestamp')
    symbol: VK_Check = VK_Check(method='check_symbol')

    async def run_checks(self, args: dict | None = None):
        """Await each configured check in field-declaration order.

        Args:
            args: Passed unchanged to every check as the ``args`` keyword.
                ``check_timestamp`` and ``check_symbol`` raise ``ValueError``
                when it is ``None``.
        """
        # The class-level field map yields the field names directly, which
        # avoids serialising the whole model (model_dump) on every run just to
        # iterate its keys.
        for field_name in type(self).model_fields:
            method = getattr(self, field_name).method
            if method is not None:
                await getattr(self, method)(args=args)

    async def check_status(self, args: dict | None = None) -> None:
        """Mark the status check ``'UNHEALTHY'`` when its status is the empty string.

        Any other status is left as it is; ``args`` is not used.
        """
        if self.status.status == '':
            self.status.status = 'UNHEALTHY'

    async def check_status_nested(self, args: dict | None = None) -> None:
        """Same rule as ``check_status``; kept as a separate entry point."""
        await self.check_status(args=args)

    async def check_timestamp(self, args: dict | None = None) -> None:
        """Set the timestamp check from the age of ``args['timestamp']``.

        The timestamp is compared against the current time in epoch
        milliseconds.

        Raises:
            ValueError: If ``args`` is ``None``.
        """
        if args is None:
            raise ValueError("Must pass in 'timestamp' arg")
        # A missing 'timestamp' key falls back to 0, which reads as stale.
        ts = int(args.get('timestamp', 0))
        now = round(datetime.now().timestamp() * 1000)
        # NOTE(review): the tolerance is 1, i.e. one millisecond given that
        # `now` is in ms — confirm such a tight freshness window is intended.
        self.timestamp.status = 'UNHEALTHY' if (now - ts) > 1 else 'HEALTHY'

    async def check_symbol(self, args: dict | None = None) -> None:
        """Set the symbol check by comparing ``algo_symbol`` with ``vk_symbol``.

        Both values are read from ``args`` (default ``''``) and passed through
        ``utils.symbol_to_extend_fmt`` before the equality test.

        Raises:
            ValueError: If ``args`` is ``None``.
        """
        if args is None:
            raise ValueError("Must pass in 'algo_symbol' and 'vk_symbol' args")
        symbol = utils.symbol_to_extend_fmt(args.get('algo_symbol', ''))
        vk_symbol = utils.symbol_to_extend_fmt(args.get('vk_symbol', ''))
        self.symbol.status = 'HEALTHY' if symbol == vk_symbol else 'UNHEALTHY'
class VK_Obj(BaseModel):
    """Base model for a JSON document stored in Valkey under ``vk_name``.

    ``get`` reloads the instance from the stored JSON and ``set`` writes the
    instance back with a refreshed ``timestamp``.
    """

    vk_name: str
    # default_factory stamps each instance when it is created; a plain default
    # expression would be evaluated once, when the class body runs, and then be
    # shared by every instance.
    timestamp: int = Field(
        default_factory=lambda: round(datetime.now().timestamp() * 1000)
    )
    status: str = 'HEALTHY'  # HEALTHY | UNHEALTHY | DEAD
    checks: VK_Checks = VK_Checks()
    data: Any = None

    async def get(self, VK_CON: valkey.Valkey) -> None:
        """Replace this instance's fields with the JSON stored at ``vk_name``.

        Args:
            VK_CON: Client whose ``get`` is called with ``vk_name``.

        NOTE(review): if the client returns ``None`` (presumably when the key
        does not exist — verify), ``json.loads`` raises ``TypeError``.
        """
        print('getting')
        vk_get: str = VK_CON.get(self.vk_name)  # ty:ignore[invalid-assignment]
        vk_dict: dict = json.loads(vk_get)
        # Re-run model initialisation in place so existing references to this
        # object see the loaded values.
        self.__init__(**vk_dict)

    async def set(self, VK_CON: valkey.Valkey, data_override: Any = None) -> None:
        """Stamp the object with the current time and write it to Valkey as JSON.

        Args:
            VK_CON: Client whose ``set`` is called with ``vk_name`` and the
                JSON dump of this model.
            data_override: When not ``None``, replaces ``data`` before writing.
        """
        print('setting')
        if data_override is not None:
            self.data = data_override
        self.timestamp = round(datetime.now().timestamp() * 1000)
        VK_CON.set(self.vk_name, self.model_dump_json())
### Valkey Archetypes ###
# Engine - Health
# Engine - Orchestrator
class VK_Orchestrator_Output(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_orchestrator_output'`` key."""

    vk_name: str = 'fr_orchestrator_output'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# Algo
class VK_Working_Symbol(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_algo_working_symbol'`` key."""

    vk_name: str = 'fr_algo_working_symbol'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_Algo_Status(VK_Obj):
    """``VK_Obj`` defaulting to the ``'algo_status'`` key."""

    vk_name: str = 'algo_status'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# User - Orders
class VK_User_Orders_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_aster_user_orders'`` key."""

    vk_name: str = 'fr_aster_user_orders'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_User_Orders_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_extended_user_orders'`` key."""

    vk_name: str = 'fr_extended_user_orders'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# User - Trades
class VK_User_Trades_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_extended_user_trades'`` key."""

    vk_name: str = 'fr_extended_user_trades'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# User - Balances
class VK_User_Balances_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_aster_user_balances'`` key."""

    vk_name: str = 'fr_aster_user_balances'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_User_Balances_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_extended_user_balances'`` key."""

    vk_name: str = 'fr_extended_user_balances'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# User - Positions
class VK_User_Positions_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_aster_user_positions'`` key."""

    vk_name: str = 'fr_aster_user_positions'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_User_Positions_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fr_extended_user_positions'`` key."""

    vk_name: str = 'fr_extended_user_positions'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# Fund Rates
class VK_FR_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fund_rate_aster'`` key."""

    vk_name: str = 'fund_rate_aster'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_FR_All_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fund_rate_aster_all'`` key."""

    vk_name: str = 'fund_rate_aster_all'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_FR_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fund_rate_extended'`` key."""

    vk_name: str = 'fund_rate_extended'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_FR_All_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fund_rate_extended_all'`` key."""

    vk_name: str = 'fund_rate_extended_all'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# Tickers
class VK_Ticker_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fut_ticker_aster'`` key."""

    vk_name: str = 'fut_ticker_aster'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_Ticker_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fut_ticker_extended'`` key."""

    vk_name: str = 'fut_ticker_extended'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
# Trades
class VK_Trade_Aster(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fut_last_trade_aster'`` key."""

    vk_name: str = 'fut_last_trade_aster'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
class VK_Trade_Extend(VK_Obj):
    """``VK_Obj`` defaulting to the ``'fut_last_trade_extended'`` key."""

    vk_name: str = 'fut_last_trade_extended'
    # method=None: no check method is named for the symbol check.
    checks: VK_Checks = VK_Checks(symbol=VK_Check(method=None))
### Algo Objects ###
class Algo_Status(BaseModel):
    """Pydantic model of the algo's status record; every field is required."""

    last_update_ts_ms: int
    # 'WORKING' | 'STOPPED' ///// ENUM: 'HEALTHY' | 'UNHEALTHY' | 'DEAD'
    status: str
    expected_alpha: float
    model_ratio: float
    current_ratio: float
class Algo_Config_Overrides(BaseModel):
    """Boolean switches held in the ``Overrides`` section of ``Algo_Config``."""

    Allow_Ordering_Aster: bool
    Allow_Ordering_Extend: bool
    Allow_Symbol_Change: bool
    Flatten_Open_Positions: bool
    Flatten_Open_Positions_Opportunistic: bool
    Flip_Side_For_Testing: bool
class Algo_Config_Config(BaseModel):
    """Numeric settings held in the ``Config`` section of ``Algo_Config``."""

    Loop_Sleep_Sec: int
    Max_Order_Over_Notional_Ratio: float
    Max_Target_Notional: float
    Min_Time_To_Funding_Minutes: int
    Min_Fund_Rate_Pct_To_Trade: float
    Price_Worsener_Aster: int
    Price_Worsener_Extend: int
    Switch_To_Taker_Seconds: int
    Target_Open_Cash_Position: int
class Algo_Config_Logging(BaseModel):
    """Boolean flags held in the ``Logging`` section of ``Algo_Config``."""

    Log_Summary_Each_Loop: bool
    Print_Summary_Each_Loop: bool
class Algo_Config(BaseModel):
    """Top-level algo configuration: an update timestamp plus three sections."""

    Updated_Timestamp: int
    Config: Algo_Config_Config
    Logging: Algo_Config_Logging
    Overrides: Algo_Config_Overrides
@dataclass(kw_only=True)
class Flags:
LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False
NET_FUNDING_IS_ZERO: bool = False
@dataclass(kw_only=True)
class Valkey_Stream:
client: valkey.Valkey
channel: str
data: Any = None
none_fill: Any = None
async def update(self):
r: str = self.client.get(name=self.channel) # ty:ignore[invalid-assignment]
self.data = json.loads(s=r) if r is not None else self.none_fill
@dataclass(kw_only=True)
class Position:
market: str
notional: float
qty: float
@dataclass(kw_only=True)
class Open_Positions:
Valkey: Valkey_Stream
Positions: list[Position] = field(default_factory = list)
async def update(self) -> None:
self.Valkey = await self.Valkey.update()
### Collateral ###
@dataclass(kw_only=True)
class Asset:
symbol: str
balance: float
# min_order_qty: float
@dataclass(kw_only=True)
class Collateral:
Valkey: Valkey_Stream
# Last_Updated_Ts_Ms: int
# Last_Pulled_Ts_Ms: int
Assets: list[Asset] = field(default_factory = list)
async def update(self) -> None:
self.Valkey = await self.Valkey.update()
### Orders ###
@dataclass(kw_only=True)
class Order:
symbol: str
order_id: str
client_order_id: str
side: str
order_type: str
original_qty: float
original_price: float
order_status: str
last_filled_qty: float
last_filled_price: float
commission: float
trade_is_maker: bool
@dataclass(kw_only=True)
class Order_Updates:
# Last_Updated_Ts_Ms: int
# Last_Pulled_Ts_Ms: int
Valkey: Valkey_Stream
Orders: list[Order] = field(default_factory = list)
async def update(self) -> None:
self.Valkey = await self.Valkey.update()
### Funding Rate ###
@dataclass(kw_only=True)
class Funding_Rate:
# Last_Updated_Ts_Ms: int
# Last_Pulled_Ts_Ms: int
Valkey: Valkey_Stream
timestamp_arrival: int
timestamp_msg: int
symbol: str
funding_rate: float
next_funding_time_ts_ms: int
mark_price: float
index_price: float
estimated_settle_price: float
async def update(self) -> None:
self.Valkey = await self.Valkey.update()
### Markets Info ###
@dataclass(kw_only=True)
class Market:
symbol: str
min_order_qty: float
@dataclass(kw_only=True)
class Markets_Details:
Markets: list[Market] = field(default_factory=list)
### Exchanges ###
@dataclass(kw_only=True)
class Perpetual_Exchange:
# Order_Updates: Order_Updates
# Position_Updates: Open_Positions
# Collateral_Updates: Collateral
# Funding_Rate: Funding_Rate
# Markets: Markets_Details
mult: int
lh_asset: str
rh_asset: str
symbol: str = ''
symbol_asset_separator: str = ''
initial_funding_rate: float = 0
fund_rate_at_same_time: bool = False
min_price: float = 0
min_order_size: float = 0
min_lot_size: float = 0
min_notional: float = 0
buy_ratio: float = 0
buy_ratio_std: float = 0
notional_obj: dict = field(default_factory=dict)
notional_position: float = 0
unrealized_pnl: float = 0
just_rejected_count: int = 0
cancel_request_pending: bool = False
# async def update(self):
# await self.Collateral_Updates.update()
# await self.Order_Updates.update()
# await self.Position_Updates.update()
# await self.Funding_Rate.update()
def __post_init__(self) -> None:
self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}'
# @dataclass(kw_only=True)
# class Aster(Perpetual_Exchange):
# name: str = 'Aster'
# lh_asset: str = 'ETH'
# rh_asset: str = 'USDT'
# def __post_init__(self):
# super().__post_init__()
# self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = []))
# self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = []))
# self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = []))
# self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None))
# @dataclass(kw_only=True)
# class Extend(Perpetual_Exchange):
# name: str = 'Extended'
# lh_asset: str = 'ETH'
# rh_asset: str = 'USD'
# symbol_asset_separator: str = '-'
# def __post_init__(self):
# super().__post_init__()
# self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = []))
# self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = []))
# self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = []))
# self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None))