import asyncio import json import logging import os import socket import traceback from datetime import datetime from typing import AsyncContextManager import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore import valkey import websockets from dotenv import load_dotenv from sqlalchemy.ext.asyncio import create_async_engine import modules.aster_auth as aster_auth import modules.aster_db as aster_db import modules.db as db import modules.utils as utils ### Allow only ipv4 ### def allowed_gai_family(): return socket.AF_INET urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = True USE_VK: bool = True CON: AsyncContextManager VAL_KEY: valkey.Valkey VK_ORDERS_TRADES: str = 'fr_aster_user_orders' VK_MARGIN_CALLS: str = 'fr_aster_user_margin_calls' VK_BALANCES: str = 'fr_aster_user_balances' VK_POSITIONS: str = 'fr_aster_user_positions' ### Logging ### load_dotenv() LOG_FILEPATH: str = f'{os.getenv(key="LOGS_PATH")}/Fund_Rate_Aster_User.log' ### CONSTANTS ### WSS_URL: str = "wss://fstream.asterdex.com/ws/" LOCAL_RECENT_UPDATES_LOOKBACK_SEC: int = 30 ### Globals ### Listen_Key: str Listen_Key_Last_Update_TS_S: int = 0 Listen_Key_Put_Interval_Sec: int = 1800 Local_Recent_Orders: list[dict] = [] Local_Recent_Margin_Calls: list[dict] = [] Local_Recent_Balances: list[dict] = [] Local_Recent_Positions: list[dict] = [] async def get_new_listen_key() -> str: global Listen_Key_Last_Update_TS_S listen_key_request: dict = { "url": "/fapi/v3/listenKey", "method": "POST", "params": {} } r: dict = await aster_auth.post_authenticated_url(listen_key_request) # ty:ignore[invalid-assignment] listen_key: str = r.get('listenKey', '') print(f'LISTEN KEY: {listen_key}') if listen_key: Listen_Key_Last_Update_TS_S = round(number=datetime.now().timestamp()) return listen_key else: raise ValueError(f'Listen Key is empty; Failed to Update. response: {r}') async def listen_key_interval(): global Listen_Key while True: await asyncio.sleep(delay=Listen_Key_Put_Interval_Sec) Listen_Key = await get_new_listen_key() ### Websocket ### async def ws_stream(): global Listen_Key global Local_Recent_Orders global Local_Recent_Margin_Calls global Local_Recent_Balances global Local_Recent_Positions Listen_Key = await get_new_listen_key() async for websocket in websockets.connect(uri=WSS_URL+Listen_Key, ping_interval=5): logging.info(msg=f"Connected to {WSS_URL}") asyncio.create_task(coro=listen_key_interval()) try: async for message in websocket: ts_arrival: int = round(number=datetime.now().timestamp()*1000) if isinstance(message, str): try: data: dict = json.loads(s=message) channel: str = data.get('e', '') if channel: lookback_min_ts_ms: int = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000) match channel: case 'ORDER_TRADE_UPDATE': # logging.info(f'ORDER_TRADE_UPDATE: {data}') new_order_update: dict = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'symbol': data['o']["s"], # "BTCUSDT", // Symbol 'client_order_id': data['o']["c"], # "TEST", // Client Order Id 'side': data['o']["S"], # "SELL", // Side 'order_type': data['o']["o"], # "TRAILING_STOP_MARKET", // Order Type 'time_in_force': data['o']["f"], # "GTC", // Time in Force 'original_qty': float(data['o']["q"]), # "0.001", // Original Quantity 'original_price': float(data['o']["p"]), # "0", // Original Price 'avg_price': float(data['o']["ap"]), # :"0", // Average Price 'stop_price': float(data['o'].get("sp", 0)), # :"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order 'execution_type': data['o']["x"], # "NEW", // Execution Type 'order_status': data['o']["X"], # "NEW", // Order Status 'order_id': data['o']["i"], # 8886774, // Order Id 'last_filled_qty': float(data['o']["l"]), # "0", // Order Last Filled Quantity 'filled_accumulated_qty': float(data['o']["z"]), # "0", // Order Filled Accumulated Quantity 'last_filled_price': float(data['o']["L"]), # "0", // Last Filled Price 'commission_asset': data['o'].get("N", None), # "USDT", // Commission Asset, will not push if no commission 'commission': float(data['o'].get("n",0)), # "0", // Commission, will not push if no commission 'order_trade_time_ts': data['o']["T"], # 1568879465651, // Order Trade Time 'trade_id': data['o']["t"], # 0, // Trade Id 'bid_notional': float(data['o']["b"]), # "0", // Bids Notional 'ask_notional': float(data['o']["a"]), # "9.91", // Ask Notional 'trade_is_maker': data['o']["m"], # false, // Is this trade the maker side? 'trade_is_reduce_only': data['o']["R"], # false, // Is this reduce only 'stop_px_working_type': data['o']["wt"], # :"CONTRACT_PRICE", // Stop Price Working Type 'original_order_type': data['o']["ot"], # :"TRAILING_STOP_MARKET", // Original Order Type 'position_side': data['o']["ps"], # :"LONG", // Position Side 'pushed_w_conditional_order': bool(data['o'].get("cp", False)), # :false, // If Close-All, pushed with conditional order 'activation_price': float(data['o'].get("AP", 0)), # :"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order 'callback_rate': float(data['o'].get("cr", 0)), # :"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order 'realized_profit': float(data['o']["rp"]), # :"0" // Realized Profit of the trade } Local_Recent_Orders = utils.upsert_list_of_dicts_by_id(Local_Recent_Orders, new_order_update, id='order_id', seq_check_field='timestamp_msg') Local_Recent_Orders = [t for t in Local_Recent_Orders if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms] VAL_KEY_OBJ: str = json.dumps(obj=Local_Recent_Orders) VAL_KEY.set(name=VK_ORDERS_TRADES, value=VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON) continue case 'MARGIN_CALL': # logging.info(f'MARGIN_CALL: {data}') list_for_df = [] for p in list(data['p']): margin_call_update: dict = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'cross_wallet_balance': float(data.get('cw', 0)), 'symbol': p["s"], # "ETHUSDT", // Symbol 'position_side': p["ps"], # :"LONG", // Position Side 'position_amount': float(p["pa"]), # :"1.327", // Position Amount 'margin_type': p["mt"], # :"CROSSED", // Margin Type 'isolated_wallet': float(p.get("iw", 0)), # :"0", // Isolated Wallet (if isolated position) 'mark_price': float(p["mp"]), # :"187.17127", // Mark Price 'unrealized_pnl': float(p["up"]), # :"-1.166074", // Unrealized PnL 'maint_margin_required': float(p["mm"]), # :"1.614445" // Maintenance Margin Required } list_for_df.append(margin_call_update) Local_Recent_Margin_Calls = utils.upsert_list_of_dicts_by_id(Local_Recent_Margin_Calls, margin_call_update, id='symbol', seq_check_field='timestamp_msg') Local_Recent_Margin_Calls = [t for t in Local_Recent_Margin_Calls if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms] VAL_KEY_OBJ: str = json.dumps(obj=Local_Recent_Margin_Calls) VAL_KEY.set(name=VK_MARGIN_CALLS, value=VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_aster_user_margin', params=list_for_df, CON=CON) continue case 'ACCOUNT_UPDATE': # logging.info(f'ACCOUNT_UPDATE: {data}') list_for_df_bal = [] list_for_df_pos = [] ### Balance Updates ### if len(list(data['a']['B'])) > 0: for b in list(data['a']['B']): balance_update: dict = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'event_reason_type': data['a']["m"], 'asset': b['a'], 'wallet_balance': float(b['wb']), 'cross_wallet_balance': float(b.get('cw', 0)), 'balance_change_excl_pnl_comms': float(b['bc']), } list_for_df_bal.append(balance_update) Local_Recent_Balances = utils.upsert_list_of_dicts_by_id(Local_Recent_Balances, balance_update, id='asset', seq_check_field='timestamp_msg') Local_Recent_Balances = [t for t in Local_Recent_Balances if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms] VAL_KEY.set(name=VK_BALANCES, value=json.dumps(obj=Local_Recent_Balances)) ### Position Updates ### if len(list(data['a']['P'])) > 0: for p in list(data['a']['P']): position_update: dict = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'event_reason_type': data['a']["m"], 'symbol': p['s'], 'position_amount': float(p['pa']), 'entry_price': float(p['ep']), 'accumulated_realized_pre_fees': float(p['cr']), 'unrealized_pnl': float(p['up']), 'margin_type': p['mt'], 'isolated_wallet': float(p.get('iw', 0)), 'position_side': p['ps'], } list_for_df_pos.append(position_update) Local_Recent_Positions = utils.upsert_list_of_dicts_by_id(Local_Recent_Positions, position_update, id='symbol', seq_check_field='timestamp_msg') Local_Recent_Positions = [t for t in Local_Recent_Positions if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms] VAL_KEY.set(name=VK_POSITIONS, value=json.dumps(obj=Local_Recent_Positions)) if list_for_df_bal: await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON) if list_for_df_pos: await db.insert_df_to_mysql(table_name='fr_aster_user_account_pos', params=list_for_df_pos, CON=CON) continue case 'listenKeyExpired': raise ValueError('Listen Key Has Expired; Failed to Update Properly. Restarting.') case _: logging.warning(msg=f'UNMATCHED OTHER MSG: {data}') else: logging.info(msg=f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): logging.warning(msg=f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(msg=f'Connection closed: {e}') logging.error(msg=traceback.format_exc()) utils.send_tg_alert(msg=f'WS_Aster_User - Failure: {e}') except Exception as e: logging.error(msg=f'Connection closed: {e}') logging.error(msg=traceback.format_exc()) utils.send_tg_alert(msg=f'WS_Aster_User - Failure: {e}') async def main(): global VAL_KEY global CON if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) else: logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") raise NotImplementedError('Cannot run without Valkey') if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: await aster_db.create_fr_aster_user_order_trade_table(CON=CON) await aster_db.create_fr_aster_user_margin_table(CON=CON) await aster_db.create_fr_aster_user_account_bal(CON=CON) await aster_db.create_fr_aster_user_account_pos(CON=CON) await ws_stream() else: logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") raise NotImplementedError('Cannot run without DB') if __name__ == '__main__': START_TIME: int = round(number=datetime.now().timestamp()*1000) logging.info(msg=f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(msg=f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt: logging.info(msg="Stream stopped")