diff --git a/algo.ipynb b/algo.ipynb index e69de29..118f65b 100644 --- a/algo.ipynb +++ b/algo.ipynb @@ -0,0 +1,196 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 28, + "id": "ac27075d", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "from dataclasses import dataclass, asdict\n", + "\n", + "@dataclass(kw_only=True)\n", + "class Algo_Config:\n", + " Allow_Ordering_Aster: bool\n", + " Allow_Ordering_Extend: bool\n", + " Loop_Sleep_Sec: int\n", + " Min_Time_To_Funding_Minutes: int\n", + " Price_Worsener_Aster: float\n", + " Price_Worsener_Extend: float\n", + " Target_Open_Cash_Position: int" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "d1eed397", + "metadata": {}, + "outputs": [], + "source": [ + "with open('algo_config.json', 'r', encoding='utf-8') as file:\n", + " ALGO_CONFIG = json.load(file, object_hook=lambda d: Algo_Config(**d))" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "286bf2d2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Algo_Config(Allow_Ordering_Aster=True, Allow_Ordering_Extend=True, Loop_Sleep_Sec=1, Min_Time_To_Funding_Minutes=7, Price_Worsener_Aster=0.0, Price_Worsener_Extend=0.0, Target_Open_Cash_Position=10)" + ] + }, + "execution_count": 30, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ALGO_CONFIG" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "0cbed6d4", + "metadata": {}, + "outputs": [], + "source": [ + "d = asdict(ALGO_CONFIG)" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "daa47980", + "metadata": {}, + "outputs": [], + "source": [ + "d_update = {'Allow_Ordering_Aster': False}" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "aba7797b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'Allow_Ordering_Aster': True,\n", + " 'Allow_Ordering_Extend': True,\n", + " 'Loop_Sleep_Sec': 1,\n", + " 'Min_Time_To_Funding_Minutes': 7,\n", + " 'Price_Worsener_Aster': 0.0,\n", + " 'Price_Worsener_Extend': 0.0,\n", + " 'Target_Open_Cash_Position': 10}" + ] + }, + "execution_count": 42, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "d" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "id": "3250cb84", + "metadata": {}, + "outputs": [ + { + "ename": "SyntaxError", + "evalue": "expected ':' (3324129842.py, line 1)", + "output_type": "error", + "traceback": [ + " \u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[43]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[31m \u001b[39m\u001b[31mif d.get(d_update)\u001b[39m\n ^\n\u001b[31mSyntaxError\u001b[39m\u001b[31m:\u001b[39m expected ':'\n" + ] + } + ], + "source": [ + "if d.get(d_update)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e283b819", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['Allow_Ordering_Aster'])" + ] + }, + "execution_count": 46, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "list(d_update.keys())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6f067c02", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d2e26271", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a0df43de", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/algo_config.json b/algo_config.json new file mode 100644 index 0000000..e0d945f --- /dev/null +++ b/algo_config.json @@ -0,0 +1,11 @@ +{ + "Config_Updated_Timestamp": 1777098091913, + "Allow_Ordering_Aster": true, + "Allow_Ordering_Extend": true, + "Loop_Sleep_Sec": 1, + "Max_Target_Notional": 0.00, + "Min_Time_To_Funding_Minutes": 60, + "Price_Worsener_Aster": 0.0, + "Price_Worsener_Extend": 0.0, + "Target_Open_Cash_Position": 10 +} \ No newline at end of file diff --git a/algo_orchestrator.py b/algo_orchestrator.py index b8c7316..ccd76e4 100644 --- a/algo_orchestrator.py +++ b/algo_orchestrator.py @@ -2,19 +2,20 @@ import asyncio import json import logging import os -import time import traceback from datetime import datetime from typing import AsyncContextManager import valkey from dotenv import load_dotenv -from sqlalchemy import text -from sqlalchemy.ext.asyncio import create_async_engine +# from sqlalchemy.ext.asyncio import create_async_engine +''' +TO DO: +- Insert config changes into database for analysis later / general tracking +''' ### Database ### -EXTEND_CLIENT = None CON: AsyncContextManager | None = None VAL_KEY = None VK_IN = 'fr_orchestrator_input' @@ -24,12 +25,12 @@ VK_OUT = 'fr_orchestrator_output' load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo_Orchestrator.log' -### ALGO GLOBALS ### -ASTER_ALLOW_ORDERING: bool = True -EXTEND_ALLOW_ORDERING: bool = True -LOOP_SLEEP_SEC = 1 +ALGO_CONFIG: None | dict +# ALGO_CONFIG: None | Algo_Config = None async def orchestrator() -> None: + global ALGO_CONFIG + try: VK_PUBSUB = VAL_KEY.pubsub() VK_PUBSUB.subscribe(VK_IN) @@ -37,11 +38,20 @@ async def orchestrator() -> None: print(f"Subscribed to '{VK_IN}'. Waiting for messages...") for message in VK_PUBSUB.listen(): - # Valkey sends a 'subscribe' message first; we usually want to skip it if message['type'] == 'message': - data = message['data'] - channel = message['channel'] - print(f"[{channel}] Received: {data}") + timestamp = round(datetime.now().timestamp()*1000) + data = json.loads(message['data']) + # channel = message['channel'] + + for k, v in data.items(): + if ALGO_CONFIG.get(k, None) is not None: + ALGO_CONFIG[k] = v + + ALGO_CONFIG['Config_Updated_Timestamp'] = timestamp + VAL_KEY.set(VK_OUT, json.dumps(ALGO_CONFIG)) + with open('algo_config.json', 'w', encoding='utf-8') as f: + json.dump(ALGO_CONFIG, f, indent=4) + print(f"Algo Config Updated @ {timestamp}; {data}") except valkey.exceptions.ConnectionError as e: print(f"Could not connect to Valkey. Please check the publish server is up; {e}") @@ -53,15 +63,20 @@ async def orchestrator() -> None: ### MAIN STARTUP ### async def main() -> None: - global EXTEND_CLIENT global VAL_KEY global CON + global ALGO_CONFIG VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) - engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + # engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') - async with engine.connect() as CON: - await orchestrator() + with open('algo_config.json', 'r', encoding='utf-8') as f: + # ALGO_CONFIG = json.load(f, object_hook=lambda d: Algo_Config(**d)) + ALGO_CONFIG = json.load(f) + ALGO_CONFIG['Config_Updated_Timestamp'] = round(datetime.now().timestamp()*1000) + + # async with engine.connect() as CON: + await orchestrator() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) diff --git a/algo_orchestrator/Dockerfile b/algo_orchestrator/Dockerfile new file mode 100644 index 0000000..bd579d7 --- /dev/null +++ b/algo_orchestrator/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.13-slim + +RUN apt-get update && \ + apt-get install -y build-essential + +RUN gcc --version +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Finally, run gunicorn. +CMD [ "python", "-u" ,"algo_orchestrator.py"] +# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 3615543..b6c42fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,7 @@ services: context: ./ dockerfile: ./algo/Dockerfile depends_on: + - algo_orchestrator - ws_aster - ws_aster_user - ws_extended_fund_rate @@ -17,6 +18,16 @@ services: - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data network_mode: "host" + algo_orchestrator: + container_name: algo_orchestrator + restart: "unless-stopped" + build: + context: ./ + dockerfile: ./algo_orchestrator/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + network_mode: "host" ws_aster: container_name: ws_aster restart: "unless-stopped" @@ -51,10 +62,10 @@ services: container_name: ws_extended_orderbook restart: "unless-stopped" build: - context: ./ + context: ./ dockerfile: ./ws_extended_orderbook/Dockerfile volumes: - - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to dataw - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data network_mode: "host" ws_extended_user: diff --git a/main.py b/main.py index 432d5ea..9b55f9f 100644 --- a/main.py +++ b/main.py @@ -25,6 +25,7 @@ from x10.models.order import OrderSide import modules.utils as utils import modules.aster_auth as aster_auth import modules.extended_auth as extend_auth +import modules.structs as structs ### Database ### EXTEND_CLIENT = None @@ -35,28 +36,24 @@ VAL_KEY = None load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log' +### Algo Config ### +ALGO_CONFIG: structs.Algo_Config = None + ### CONSTANTS ### -ASTER_ALLOW_ORDERING: bool = False -EXTEND_ALLOW_ORDERING: bool = False -LOOP_SLEEP_SEC = 1 -PRICE_WORSENER_ASTER = 0.00 -PRICE_WORSENER_EXTEND = 0.0 +ASTER = structs.Perpetual_Exchange( + mult = 150, + lh_asset = 'ETH', + rh_asset = 'USDT', + symbol_asset_separator = '', +) -MIN_TIME_TO_FUNDING: int = 1000 * 60 * 7 # 5 minutes. - -ASTER_LH_ASSET: str = 'ETH' -ASTER_RH_ASSET: str = 'USDT' -ASTER_TICKER: str = ASTER_LH_ASSET + ASTER_RH_ASSET EXTEND_LH_ASSET: str = 'ETH' EXTEND_RH_ASSET: str = 'USD' EXTEND_TICKER: str = EXTEND_LH_ASSET + '-' + EXTEND_RH_ASSET -TARGET_OPEN_CASH_POSITION: float = 10 # Each side (alpha and hedge) - ### GLOBALS ### ASTER_MULT = 150 EXTEND_MULT = 50 -MAX_TARGET_NOTIONAL = min([ASTER_MULT, EXTEND_MULT]) * TARGET_OPEN_CASH_POSITION ASTER_MIN_ORDER_QTY = 0.001 EXTEND_MIN_ORDER_QTY = 0.01 @@ -73,164 +70,10 @@ EXTEND_OPEN_ORDERS = [] # ASTER_OPEN_POSITIONS = [] # EXTEND_OPEN_POSITIONS = [] -@dataclass(kw_only=True) -class Valkey_Stream: - channel: str - data: Any = None - none_fill: Any = None - - async def update(self): - r = VAL_KEY.get(self.channel) - self.data = json.loads(r) if r is not None else self.none_fill - -@dataclass(kw_only=True) -class Position: - market: str - notional: float - qty: float - -@dataclass(kw_only=True) -class Open_Positions: - Valkey: Valkey_Stream - Positions: list[Position] = field(default_factory = list) - - async def update(self) -> None: - self.Valkey = await self.Valkey.update() - -### Collateral ### -@dataclass(kw_only=True) -class Asset: - symbol: str - balance: float - # min_order_qty: float - -@dataclass(kw_only=True) -class Collateral: - Valkey: Valkey_Stream - # Last_Updated_Ts_Ms: int - # Last_Pulled_Ts_Ms: int - Assets: list[Asset] = field(default_factory = list) - - async def update(self) -> None: - self.Valkey = await self.Valkey.update() - -### Orders ### -@dataclass(kw_only=True) -class Order: - symbol: str - order_id: str - client_order_id: str - side: str - order_type: str - original_qty: float - original_price: float - order_status: str - last_filled_qty: float - last_filled_price: float - commission: float - trade_is_maker: bool - -@dataclass(kw_only=True) -class Order_Updates: - # Last_Updated_Ts_Ms: int - # Last_Pulled_Ts_Ms: int - Valkey: Valkey_Stream - Orders: list[Order] = field(default_factory = list) - - async def update(self) -> None: - self.Valkey = await self.Valkey.update() - -### Funding Rate ### -@dataclass(kw_only=True) -class Funding_Rate: - # Last_Updated_Ts_Ms: int - # Last_Pulled_Ts_Ms: int - Valkey: Valkey_Stream - timestamp_arrival: int - timestamp_msg: int - symbol: str - funding_rate: float - next_funding_time_ts_ms: int - mark_price: float - index_price: float - estimated_settle_price: float - - async def update(self) -> None: - self.Valkey = await self.Valkey.update() - -### Markets Info ### -@dataclass(kw_only=True) -class Market: - symbol: str - min_order_qty: float - -@dataclass(kw_only=True) -class Markets_Details: - Markets: list[Market] = field(default_factory=list) - - -### Exchanges ### -@dataclass(kw_only=True) -class Perpetual_Exchange: - Order_Updates: Order_Updates - Position_Updates: Open_Positions - Collateral_Updates: Collateral - Funding_Rate: Funding_Rate - Markets: Markets_Details - mult: int - lh_asset: str - rh_asset: str - symbol_asset_separator: str = '' - symbol: str - - async def update(self): - await self.Collateral_Updates.update() - await self.Order_Updates.update() - await self.Position_Updates.update() - await self.Funding_Rate.update() - - def __post_init__(self) -> None: - self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' - - -@dataclass(kw_only=True) -class Aster(Perpetual_Exchange): - name: str = 'Aster' - lh_asset: str = 'ETH' - rh_asset: str = 'USDT' - - def __post_init__(self): - super().__post_init__() - self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) - self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) - self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) - self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) - - -@dataclass(kw_only=True) -class Extend(Perpetual_Exchange): - name: str = 'Extended' - lh_asset: str = 'ETH' - rh_asset: str = 'USD' - symbol_asset_separator: str = '-' - - def __post_init__(self): - super().__post_init__() - self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) - self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) - self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) - self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) - - # EXCHANGES: list = [ Aster(), Extend() ] ### FLAGS ### -@dataclass(kw_only=True) -class Flags: - LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False - NET_FUNDING_IS_ZERO: bool = False -Flags = Flags() - +Flags = structs.Flags() ### UTILS ### def round_decimal_down(value, decimal_places): @@ -265,7 +108,7 @@ async def get_aster_collateral(): "params": {} } r = await aster_auth.post_authenticated_url(fut_acct_balances) - ASTER_AVAIL_COLLATERAL = float([d for d in r if d.get('asset')==ASTER_RH_ASSET][0].get('availableBalance')) + ASTER_AVAIL_COLLATERAL = float([d for d in r if d.get('asset')==ASTER.rh_asset][0].get('availableBalance')) async def get_aster_notional_position(resp: dict | None = None): global ASTER_NOTIONAL_POSITION @@ -276,15 +119,16 @@ async def get_aster_notional_position(resp: dict | None = None): "url": "/fapi/v3/positionRisk", "method": "GET", "params": { - 'symbol': ASTER_TICKER, + 'symbol': ASTER.symbol, } } resp = await aster_auth.post_authenticated_url(fut_acct_positionRisk) - d = [x for x in resp if x.get('symbol', None) == ASTER_TICKER][0] + d = [x for x in resp if x.get('symbol', None) == ASTER.symbol][0] + if len(d) < 1: logging.info(f'BAD NOTIONAL - ASTER CHANGE: Empty d: {d}; resp: {resp}') - kill_algo() + await kill_algo() aster_unrealized_pnl = float(d['unrealized_pnl']) if d.get('unrealized_pnl') is not None else float(d['unRealizedProfit']) @@ -297,11 +141,11 @@ async def get_aster_notional_position(resp: dict | None = None): ASTER_NOTIONAL_POSITION = notional - aster_unrealized_pnl if not resp: ASTER_MULT = float(d['leverage']) - if abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: + if abs(ASTER_NOTIONAL_POSITION) > ALGO_CONFIG.Max_Target_Notional*1.01: logging.info(f'BAD NOTIONAL - ASTER CHANGE: {ASTER_NOTIONAL_POSITION}; UR PNL: {aster_unrealized_pnl}; MULT: {ASTER_MULT}; d: {d}; resp: {resp}') - kill_algo() + await kill_algo() if ASTER_NOTIONAL_POSITION != previous_notional_position: - logging.info(f'ASTER NOTIONAL CHANGE: {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {aster_unrealized_pnl:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') + logging.info(f'ASTER NOTIONAL CHANGE: {previous_notional_position} -> {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {aster_unrealized_pnl:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') async def get_extend_collateral(): global EXTEND_AVAIL_COLLATERAL @@ -317,14 +161,26 @@ async def get_extend_notional(resp: dict | None = None): resp = dict(await EXTEND_CLIENT.account.get_positions()).get('data', {}) pos_dict = [dict(d) for d in resp if dict(d).get('market') == EXTEND_TICKER] - pos_dict = pos_dict[0] - unrealized_pnl = pos_dict.get('unrealised_pnl', 0) - previous_notional_position = EXTEND_NOTIONAL_POSITION - EXTEND_NOTIONAL_POSITION = float(pos_dict.get('value', 0)) - float(unrealized_pnl) - EXTEND_MULT = pos_dict.get('leverage', EXTEND_MULT) - if EXTEND_NOTIONAL_POSITION != previous_notional_position: - logging.info(f'EXTEND NOTIONAL CHANGE: {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {unrealized_pnl:.2f}; MULT: {EXTEND_MULT:.0f}; resp: {bool(resp)}') - + + if not pos_dict: + logging.info('get_extend_notional - No Positions') + else: + pos_dict = pos_dict[0] + unrealized_pnl = pos_dict.get('unrealised_pnl', 0) + previous_notional_position = EXTEND_NOTIONAL_POSITION + position_side = pos_dict['side'] # LONG or SHORT + notional_pos_abs = abs(float(pos_dict['value'])) + if position_side == 'LONG': + notional_pos_sided = notional_pos_abs + elif position_side == 'SHORT': + notional_pos_sided = notional_pos_abs * -1 + else: + logging.info(f'EXTEND BAD SIDE ON POSITION UPDATE: {pos_dict}') + + EXTEND_NOTIONAL_POSITION = notional_pos_sided - float(unrealized_pnl) + EXTEND_MULT = pos_dict.get('leverage', EXTEND_MULT) + if EXTEND_NOTIONAL_POSITION != previous_notional_position: + logging.info(f'EXTEND NOTIONAL CHANGE: {previous_notional_position} -> {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {unrealized_pnl:.2f}; MULT: {EXTEND_MULT:.0f}; resp: {bool(resp)}') ### EXCHANGE INFO ### async def get_aster_exch_info(): @@ -370,29 +226,19 @@ async def kill_algo(): logging.info('ALGO KILL FLAG ACTIVATED; CANCELLING OPEN ORDERS AND SHUTTING DOWN') raise ValueError('KILL FLAG ACTIVATED') -### ROUTES ### -# async def aster_remainder_route(): -# # Check open orders...cancel replace or new order? -# # Check collateral to confirm you have enough money to trade -# # if CR, what should be the new price? has it changed? maybe no action needed? how long has it been working? -# # if not enough collateral then need to liquidate and kill algo - flip flag - -# # if good to order, then create and post order. ADD to LOCAL OPEN ORDERS LIST - - -# pass - -# async def extend_remainder_route(): -# pass - - ### ALGO LOOP ### async def run_algo(): + global ALGO_CONFIG try: while True: loop_start = time.time() - print('__________Start___________') + # print('__________Start___________') + ### ALGO CONIFG ### + ALGO_CONFIG = json.loads(VAL_KEY.get('fr_orchestrator_output'), object_hook=lambda d: structs.Algo_Config(**d)) + ALGO_CONFIG.Max_Target_Notional = float(min([ASTER_MULT, EXTEND_MULT]) * ALGO_CONFIG.Target_Open_Cash_Position) + + MIN_TIME_TO_FUNDING = ALGO_CONFIG.Min_Time_To_Funding_Minutes * 60 * 1000 ### Load Data from Feedhandlers ### ASTER_FUND_RATE_DICT = json.loads(VAL_KEY.get('fund_rate_aster')) @@ -521,11 +367,10 @@ async def run_algo(): if ALPHA_FUND_RATE < 0: ALPHA_CARRY_SIDE = 'BUY' - ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL + ALPHA_TGT_NOTIONAL = ALGO_CONFIG.Max_Target_Notional else: ALPHA_CARRY_SIDE = 'SELL' - ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL*-1 - + ALPHA_TGT_NOTIONAL = ALGO_CONFIG.Max_Target_Notional*-1 def calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR: bool) -> float: if FUNDINGS_AT_SAME_TIME_NEXT_HR: @@ -566,46 +411,52 @@ async def run_algo(): ASTER_TGT_TAIL_BASE_QTY = Decimal(str(float(ASTER_TGT_TAIL) / float(ASTER_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) EXTEND_TGT_TAIL_BASE_QTY = Decimal(str(float(EXTEND_TGT_TAIL) / float(EXTEND_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) - ASTER_TGT_TAIL_ORDERABLE = abs(ASTER_TGT_TAIL_BASE_QTY) >= ASTER_MIN_ORDER_QTY - EXTEND_TGT_TAIL_ORDERABLE = abs(EXTEND_TGT_TAIL_BASE_QTY) >= EXTEND_MIN_ORDER_QTY + MAX_MIN_ORDER_QTY = max([ASTER_MIN_ORDER_QTY, EXTEND_MIN_ORDER_QTY]) - print(f''' - {pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')-datetime.now()):}) | {pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')-datetime.now()):}) - ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips] - ASTER: {ASTER_PAYOUT_DIRECTION_STR} | EXTEND: {EXTEND_PAYOUT_DIRECTION_STR} - ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ] - ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ] - - SAME TIME? : {FUNDINGS_AT_SAME_TIME_NEXT_HR} [ Minutes Between Fundings: {min_between_fundings} ] - NET FUNDING : {NEXT_NET_FUNDING_RATE:.6%} [{NEXT_NET_FUNDING_RATE*10_000:.2f}bps] [{NEXT_NET_FUNDING_RATE*1_000_000:.0f}pips]; Is Zero?: {Flags.NET_FUNDING_IS_ZERO} - ALPHA SIDE : {ALPHA_EXCH} [{ALPHA_CARRY_SIDE}] + ASTER_TGT_TAIL_ORDERABLE = abs(ASTER_TGT_TAIL_BASE_QTY) >= MAX_MIN_ORDER_QTY + EXTEND_TGT_TAIL_ORDERABLE = abs(EXTEND_TGT_TAIL_BASE_QTY) >= MAX_MIN_ORDER_QTY - TGT NOTIONAL: $ {MAX_TARGET_NOTIONAL if not Flags.NET_FUNDING_IS_ZERO else 0.00} + def print_summary(use_logging: bool = False): + OUT: print | logging.info = logging.info if use_logging else print + + OUT(f''' + {pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')-datetime.now()):}) | {pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')-datetime.now()):}) + ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips] + ASTER: {ASTER_PAYOUT_DIRECTION_STR} | EXTEND: {EXTEND_PAYOUT_DIRECTION_STR} + ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ] + ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ] + + SAME TIME? : {FUNDINGS_AT_SAME_TIME_NEXT_HR} [ Minutes Between Fundings: {min_between_fundings} ] + NET FUNDING : {NEXT_NET_FUNDING_RATE:.6%} [{NEXT_NET_FUNDING_RATE*10_000:.2f}bps] [{NEXT_NET_FUNDING_RATE*1_000_000:.0f}pips]; Is Zero?: {Flags.NET_FUNDING_IS_ZERO} + ALPHA SIDE : {ALPHA_EXCH} [{ALPHA_CARRY_SIDE}] - ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ] - ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:4f} - ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {ASTER_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {EXTEND_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ] + TGT NOTIONAL: $ {ALGO_CONFIG.Max_Target_Notional if not Flags.NET_FUNDING_IS_ZERO else 0.00} - --- ASTER OPEN ORDERS --- - {ASTER_OPEN_ORDERS} - - --- EXTEND OPEN ORDERS --- - {EXTEND_OPEN_ORDERS} - ''') - + ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ] + ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:4f} + ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {MAX_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {MAX_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ] + + --- ASTER OPEN ORDERS --- + {ASTER_OPEN_ORDERS} + + --- EXTEND OPEN ORDERS --- + {EXTEND_OPEN_ORDERS} + ''') + if ALGO_CONFIG.print_summary_each_loop: + print_summary() + # print_summary() ### ROUTES ### # ASTER - if ASTER_TGT_TAIL_ORDERABLE and ASTER_ALLOW_ORDERING: - symbol = ASTER_TICKER + if ASTER_TGT_TAIL_ORDERABLE and ALGO_CONFIG.Allow_Ordering_Aster: + symbol = ASTER.symbol side = 'BUY' if ASTER_TGT_TAIL_BASE_QTY > 0.00 else 'SELL' qty = str(abs(ASTER_TGT_TAIL_BASE_QTY)) - price = ASTER_TOB_PX - PRICE_WORSENER_ASTER if side == 'BUY' else ASTER_TOB_PX + PRICE_WORSENER_ASTER + price = ASTER_TOB_PX - ALGO_CONFIG.Price_Worsener_Aster if side == 'BUY' else ASTER_TOB_PX + ALGO_CONFIG.Price_Worsener_Aster - if abs(abs(float(ASTER_TGT_TAIL_BASE_QTY))*float(price)) + abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: - pass + if abs( ( float(ASTER_TGT_TAIL_BASE_QTY)*float(price) ) + ASTER_NOTIONAL_POSITION ) > ALGO_CONFIG.Max_Target_Notional*1.01: logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - ASTER: {ASTER_NOTIONAL_POSITION} + {float(ASTER_TGT_TAIL_BASE_QTY)*float(price)} (qty: {float(ASTER_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') - # await aster_remainder_route() + await kill_algo() if ASTER_OPEN_ORDERS: open_order_id = ASTER_OPEN_ORDERS[0].get('order_id') if ASTER_OPEN_ORDERS[0].get('order_id') is not None else ASTER_OPEN_ORDERS[0]['orderId'] open_order_px = float(ASTER_OPEN_ORDERS[0].get('price')) if ASTER_OPEN_ORDERS[0].get('price') is not None else float(ASTER_OPEN_ORDERS[0]['original_price']) @@ -617,7 +468,7 @@ async def run_algo(): "url": "/fapi/v3/order", "method": "DELETE", "params": { - 'symbol': ASTER_TICKER, + 'symbol': ASTER.symbol, 'orderId': open_order_id, } } @@ -655,6 +506,7 @@ async def run_algo(): ASTER_OPEN_ORDERS.append(order_resp) utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {float(ASTER_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}') + print_summary(use_logging=True) else: logging.warning('ASTER PLACE ORDER CHECKS FAILED, SKIPPING') @@ -663,16 +515,15 @@ async def run_algo(): await aster_cancel_all_orders() # EXTEND - if EXTEND_TGT_TAIL_ORDERABLE and EXTEND_ALLOW_ORDERING: + if EXTEND_TGT_TAIL_ORDERABLE and ALGO_CONFIG.Allow_Ordering_Extend: symbol = EXTEND_TICKER side = OrderSide.BUY if EXTEND_TGT_TAIL_BASE_QTY > 0.00 else OrderSide.SELL qty = Decimal(str(abs(EXTEND_TGT_TAIL_BASE_QTY))) - price = EXTEND_TOB_PX - PRICE_WORSENER_EXTEND if side == 'BUY' else EXTEND_TOB_PX + PRICE_WORSENER_EXTEND + price = EXTEND_TOB_PX - ALGO_CONFIG.Price_Worsener_Extend if side == 'BUY' else EXTEND_TOB_PX + ALGO_CONFIG.Price_Worsener_Extend - if abs(float(EXTEND_TGT_TAIL_BASE_QTY)*float(price)) + abs(float(EXTEND_NOTIONAL_POSITION)) > MAX_TARGET_NOTIONAL*1.01: + if abs( ( float(EXTEND_TGT_TAIL_BASE_QTY)*float(price) ) + EXTEND_NOTIONAL_POSITION ) > ALGO_CONFIG.Max_Target_Notional*1.01: logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - EXTEND: {EXTEND_NOTIONAL_POSITION:.2f} + {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f} (qty: {float(EXTEND_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') - pass - # await extend_remainder_route() + await kill_algo() if EXTEND_OPEN_ORDERS: open_order_dict = dict(EXTEND_OPEN_ORDERS[0]) open_order_id = open_order_dict['external_id'] @@ -707,6 +558,7 @@ async def run_algo(): EXTEND_OPEN_ORDERS.append(order_dict) utils.send_tg_alert(f'FR_ALGO - EXTEND Order. Start_$: {EXTEND_NOTIONAL_POSITION:.2f}; Value: {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'EXTEND ORDER PLACED SUCCESS: {order_dict}') + print_summary(use_logging=True) else: logging.warning('EXTEND PLACE ORDER CHECKS FAILED, SKIPPING') @@ -716,7 +568,7 @@ async def run_algo(): print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000})') - time.sleep(LOOP_SLEEP_SEC) + time.sleep(ALGO_CONFIG.Loop_Sleep_Sec) except KeyboardInterrupt: logging.info('CANCELLING OPEN ORDERS') @@ -734,11 +586,16 @@ async def main(): global EXTEND_CLIENT global VAL_KEY global CON + global ALGO_CONFIG _, EXTEND_CLIENT = await extend_auth.create_auth_account_and_trading_client() VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + with open('algo_config.json', 'r', encoding='utf-8') as file: + ALGO_CONFIG = json.load(file, object_hook=lambda d: structs.Algo_Config(**d)) + ALGO_CONFIG.Max_Target_Notional = float(min([ASTER_MULT, EXTEND_MULT]) * ALGO_CONFIG.Target_Open_Cash_Position) + async with engine.connect() as CON: ### ASTER SETUP ### await get_aster_collateral() diff --git a/main_update.py b/main_v0.py similarity index 100% rename from main_update.py rename to main_v0.py diff --git a/modules/__pycache__/db.cpython-313.pyc b/modules/__pycache__/db.cpython-313.pyc index c7af759..eea2b4e 100644 Binary files a/modules/__pycache__/db.cpython-313.pyc and b/modules/__pycache__/db.cpython-313.pyc differ diff --git a/modules/__pycache__/structs.cpython-313.pyc b/modules/__pycache__/structs.cpython-313.pyc new file mode 100644 index 0000000..5b7fd84 Binary files /dev/null and b/modules/__pycache__/structs.cpython-313.pyc differ diff --git a/modules/db.py b/modules/db.py index 19ca86d..83080d6 100644 --- a/modules/db.py +++ b/modules/db.py @@ -18,7 +18,9 @@ async def insert_df_to_mysql( if isinstance(params, dict): params = [params] df = pd.DataFrame(params) - + else: + df = params + print(f'DB INSERT: table: {table_name}; CON: {CON}; params: {params}') await CON.run_sync( lambda sync_conn: df.to_sql(name=table_name, con=sync_conn, if_exists='append', index=False) ) diff --git a/modules/structs.py b/modules/structs.py new file mode 100644 index 0000000..a286ec0 --- /dev/null +++ b/modules/structs.py @@ -0,0 +1,181 @@ +import json +from dataclasses import dataclass, field +from typing import Any + +import valkey + + +@dataclass(kw_only=True) +class Algo_Config: + Config_Updated_Timestamp: int + Allow_Ordering_Aster: bool + Allow_Ordering_Extend: bool + Loop_Sleep_Sec: int + Max_Target_Notional: float + Min_Time_To_Funding_Minutes: int + Price_Worsener_Aster: float + Price_Worsener_Extend: float + Target_Open_Cash_Position: int + +@dataclass(kw_only=True) +class Flags: + LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False + NET_FUNDING_IS_ZERO: bool = False + + +@dataclass(kw_only=True) +class Valkey_Stream: + client: valkey.Valkey + channel: str + data: Any = None + none_fill: Any = None + + async def update(self): + r = self.client.get(self.channel) + self.data = json.loads(r) if r is not None else self.none_fill + + +@dataclass(kw_only=True) +class Position: + market: str + notional: float + qty: float + + +@dataclass(kw_only=True) +class Open_Positions: + Valkey: Valkey_Stream + Positions: list[Position] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + + +### Collateral ### +@dataclass(kw_only=True) +class Asset: + symbol: str + balance: float + # min_order_qty: float + + +@dataclass(kw_only=True) +class Collateral: + Valkey: Valkey_Stream + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Assets: list[Asset] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + + +### Orders ### +@dataclass(kw_only=True) +class Order: + symbol: str + order_id: str + client_order_id: str + side: str + order_type: str + original_qty: float + original_price: float + order_status: str + last_filled_qty: float + last_filled_price: float + commission: float + trade_is_maker: bool + + +@dataclass(kw_only=True) +class Order_Updates: + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Valkey: Valkey_Stream + Orders: list[Order] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + + +### Funding Rate ### +@dataclass(kw_only=True) +class Funding_Rate: + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Valkey: Valkey_Stream + timestamp_arrival: int + timestamp_msg: int + symbol: str + funding_rate: float + next_funding_time_ts_ms: int + mark_price: float + index_price: float + estimated_settle_price: float + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + + +### Markets Info ### +@dataclass(kw_only=True) +class Market: + symbol: str + min_order_qty: float + + +@dataclass(kw_only=True) +class Markets_Details: + Markets: list[Market] = field(default_factory=list) + + +### Exchanges ### +@dataclass(kw_only=True) +class Perpetual_Exchange: + # Order_Updates: Order_Updates + # Position_Updates: Open_Positions + # Collateral_Updates: Collateral + # Funding_Rate: Funding_Rate + # Markets: Markets_Details + mult: int + lh_asset: str + rh_asset: str + symbol_asset_separator: str = '' + + async def update(self): + await self.Collateral_Updates.update() + await self.Order_Updates.update() + await self.Position_Updates.update() + await self.Funding_Rate.update() + + def __post_init__(self) -> None: + self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' + + +@dataclass(kw_only=True) +class Aster(Perpetual_Exchange): + name: str = 'Aster' + lh_asset: str = 'ETH' + rh_asset: str = 'USDT' + + def __post_init__(self): + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) + + +@dataclass(kw_only=True) +class Extend(Perpetual_Exchange): + name: str = 'Extended' + lh_asset: str = 'ETH' + rh_asset: str = 'USD' + symbol_asset_separator: str = '-' + + def __post_init__(self): + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) diff --git a/ng.py b/ng.py new file mode 100644 index 0000000..3ecfad0 --- /dev/null +++ b/ng.py @@ -0,0 +1,36 @@ +import os +from nicegui import ui, app +from sqlalchemy import create_engine +import json +import valkey + + +VALKEY_R = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) + + + +def root(): + app.add_static_files(max_cache_age=0, url_path='/static', local_directory=os.path.join(os.path.dirname(__file__), 'nicegui_modules/static')) + ui.add_head_html(''' + + + + + ''' + ) + # ui.add_head_html('') + # update_body_scroll(bool_override=ALLOW_BODY_SCROLL) + + ui.sub_pages({ + '/': controls_grid, + }).classes('w-full') + + +async def controls_grid(): + with ui.grid(columns=16).classes('w-full gap-0 auto-fit'): + with ui.card().tight().classes('w-full col-span-full no-shadow border border-black-200').style('overflow: auto;'): + ui.html('
', sanitize=False).classes('w-full') + ui.run_javascript('await create_tv();') + + +ui.run(root, storage_secret="123ABC", reload=True, dark=True, title='Atwater_Trading') \ No newline at end of file diff --git a/ws_extended_user.py b/ws_extended_user.py index 2e39fe9..69154ac 100644 --- a/ws_extended_user.py +++ b/ws_extended_user.py @@ -130,6 +130,7 @@ async def ws_stream(): 'created_time_ts': t['createdTime'], 'is_taker': t['isTaker'], } + list_for_df.append(trade_update) LOCAL_RECENT_TRADES = utils.upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, trade_update, id='trade_id', seq_check_field='sequence_id') LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] @@ -187,15 +188,17 @@ async def ws_stream(): 'tp_limit_price': float(p.get('tpLimitPrice', 0)), 'sl_trigger_price': float(p.get('slTriggerPrice', 0)), 'sl_limit_price': float(p.get('slLimitPrice', 0)), - 'adl_percentile': p['adl'], # closer to 100 means higher chance of auto-deleveraging + 'adl_percentile': p.get('adl', 0), # closer to 100 means higher chance of auto-deleveraging 'created_at_ts': p['createdAt'], 'updated_at_ts': p['updatedAt'], } + list_for_df.append(position_update) LOCAL_RECENT_POSITIONS = utils.upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='position_id', seq_check_field='sequence_id') LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS) VAL_KEY.set(VK_POSITIONS, VAL_KEY_OBJ) + await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON) continue case _: @@ -231,8 +234,8 @@ async def main(): engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: await extended_db.create_fr_extended_user_balance(CON=CON) - await extended_db.create_fr_extended_user_order(CON=CON) await extended_db.create_fr_extended_user_position(CON=CON) + await extended_db.create_fr_extended_user_order(CON=CON) await extended_db.create_fr_extended_user_trade(CON=CON) await ws_stream() else: