import asyncio import json import logging import socket import traceback from datetime import datetime from typing import AsyncContextManager import numpy as np import pandas as pd import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore from sqlalchemy import text import websockets from sqlalchemy.ext.asyncio import create_async_engine import valkey import os from dotenv import load_dotenv import modules.aster_auth as aster_auth import modules.aster_db as aster_db import modules.db as db ### Allow only ipv4 ### def allowed_gai_family(): return socket.AF_INET urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = True USE_VK: bool = True VK_ORDERS_TRADES = 'fr_aster_user_orders' VK_MARGIN_CALLS = 'fr_aster_user_margin_calls' VK_BALANCES = 'fr_aster_user_balances' VK_POSITIONS = 'fr_aster_user_positions' CON: AsyncContextManager | None = None VAL_KEY = None ### Logging ### load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster_User.log' ### CONSTANTS ### WSS_URL = "wss://fstream.asterdex.com/ws/" LOCAL_RECENT_UPDATES_LOOKBACK_SEC = 30 ### Globals ### LISTEN_KEY: str | None = None LISTEN_KEY_LAST_UPDATE_TS_S: int = 0 LISTEN_KEY_PUT_INTERVAL_SEC = 1800 LOCAL_RECENT_ORDERS: list = [] LOCAL_RECENT_MARGIN_CALLS: list = [] LOCAL_RECENT_BALANCES: list = [] LOCAL_RECENT_POSITIONS: list = [] def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id'): for index, item in enumerate(list_of_dicts): if item.get(id) == new_dict.get(id): list_of_dicts[index] = new_dict return list_of_dicts list_of_dicts.append(new_dict) return list_of_dicts def get_new_listen_key() -> str: global LISTEN_KEY_LAST_UPDATE_TS_S listen_key_request = { "url": "/fapi/v3/listenKey", "method": "POST", "params": {} } r = aster_auth.post_authenticated_url(listen_key_request) listen_key = r.get('listenKey', None) print(f'LISTEN KEY: {listen_key}') if listen_key is not None: LISTEN_KEY_LAST_UPDATE_TS_S = round(datetime.now().timestamp()) return listen_key else: raise ValueError(f'Listen Key is None; Failed to Update. response: {r}') async def listen_key_interval(): global LISTEN_KEY while True: await asyncio.sleep(LISTEN_KEY_PUT_INTERVAL_SEC) LISTEN_KEY = get_new_listen_key() ### Websocket ### async def ws_stream(): global LISTEN_KEY global LOCAL_RECENT_ORDERS global LOCAL_RECENT_MARGIN_CALLS global LOCAL_RECENT_BALANCES global LOCAL_RECENT_POSITIONS LISTEN_KEY = get_new_listen_key() async for websocket in websockets.connect(WSS_URL+LISTEN_KEY): logging.info(f"Connected to {WSS_URL}") asyncio.create_task(listen_key_interval()) try: async for message in websocket: ts_arrival = round(datetime.now().timestamp()*1000) if isinstance(message, str): try: data = json.loads(message) channel = data.get('e', None) if channel is not None: LOOKBACK_MIN_TS_MS = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000) match channel: case 'ORDER_TRADE_UPDATE': logging.info(f'ORDER_TRADE_UPDATE: {data}') new_order_update = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'symbol': data['o']["s"], # "BTCUSDT", // Symbol 'client_order_id': data['o']["c"], # "TEST", // Client Order Id 'side': data['o']["S"], # "SELL", // Side 'order_type': data['o']["o"], # "TRAILING_STOP_MARKET", // Order Type 'time_in_force': data['o']["f"], # "GTC", // Time in Force 'original_qty': float(data['o']["q"]), # "0.001", // Original Quantity 'original_price': float(data['o']["p"]), # "0", // Original Price 'avg_price': float(data['o']["ap"]), # :"0", // Average Price 'stop_price': float(data['o'].get("sp", 0)), # :"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order 'execution_type': data['o']["x"], # "NEW", // Execution Type 'order_status': data['o']["X"], # "NEW", // Order Status 'order_id': data['o']["i"], # 8886774, // Order Id 'last_filled_qty': float(data['o']["l"]), # "0", // Order Last Filled Quantity 'filled_accumulated_qty': float(data['o']["z"]), # "0", // Order Filled Accumulated Quantity 'last_filled_price': float(data['o']["L"]), # "0", // Last Filled Price 'commission_asset': data['o'].get("N", None), # "USDT", // Commission Asset, will not push if no commission 'commission': float(data['o'].get("n",0)), # "0", // Commission, will not push if no commission 'order_trade_time_ts': data['o']["T"], # 1568879465651, // Order Trade Time 'trade_id': data['o']["t"], # 0, // Trade Id 'bid_notional': float(data['o']["b"]), # "0", // Bids Notional 'ask_notional': float(data['o']["a"]), # "9.91", // Ask Notional 'trade_is_maker': data['o']["m"], # false, // Is this trade the maker side? 'trade_is_reduce_only': data['o']["R"], # false, // Is this reduce only 'stop_px_working_type': data['o']["wt"], # :"CONTRACT_PRICE", // Stop Price Working Type 'original_order_type': data['o']["ot"], # :"TRAILING_STOP_MARKET", // Original Order Type 'position_side': data['o']["ps"], # :"LONG", // Position Side 'pushed_w_conditional_order': bool(data['o'].get("cp", False)), # :false, // If Close-All, pushed with conditional order 'activation_price': float(data['o'].get("AP", 0)), # :"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order 'callback_rate': float(data['o'].get("cr", 0)), # :"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order 'realized_profit': float(data['o']["rp"]), # :"0" // Realized Profit of the trade } LOCAL_RECENT_ORDERS = upsert_list_of_dicts_by_id(LOCAL_RECENT_ORDERS, new_order_update, id='client_order_id') LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS) VAL_KEY.set(VK_ORDERS_TRADES, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON) continue case 'MARGIN_CALL': logging.info(f'MARGIN_CALL: {data}') list_for_df = [] for p in list(data['p']): margin_call_update = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'cross_wallet_balance': float(data.get('cw', 0)), 'symbol': p["s"], # "ETHUSDT", // Symbol 'position_side': p["ps"], # :"LONG", // Position Side 'position_amount': float(p["pa"]), # :"1.327", // Position Amount 'margin_type': p["mt"], # :"CROSSED", // Margin Type 'isolated_wallet': float(p.get("iw", 0)), # :"0", // Isolated Wallet (if isolated position) 'mark_price': float(p["mp"]), # :"187.17127", // Mark Price 'unrealized_pnl': float(p["up"]), # :"-1.166074", // Unrealized PnL 'maint_margin_required': float(p["mm"]), # :"1.614445" // Maintenance Margin Required } list_for_df.append(margin_call_update) LOCAL_RECENT_MARGIN_CALLS = upsert_list_of_dicts_by_id(LOCAL_RECENT_MARGIN_CALLS, margin_call_update, id='symbol') LOCAL_RECENT_MARGIN_CALLS = [t for t in LOCAL_RECENT_MARGIN_CALLS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_MARGIN_CALLS) VAL_KEY.set(VK_MARGIN_CALLS, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_aster_user_margin', params=list_for_df, CON=CON) continue case 'ACCOUNT_UPDATE': logging.info(f'ACCOUNT_UPDATE: {data}') list_for_df_bal = [] list_for_df_pos = [] ### Balance Updates ### if len(list(data['a']['B'])) > 0: for b in list(data['a']['B']): balance_update = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'event_reason_type': data['a']["m"], 'asset': b['a'], 'wallet_balance': float(b['wb']), 'cross_wallet_balance': float(b.get('cw', 0)), 'balance_change_excl_pnl_comms': float(b['bc']), } list_for_df_bal.append(balance_update) LOCAL_RECENT_BALANCES = upsert_list_of_dicts_by_id(LOCAL_RECENT_BALANCES, balance_update, id='asset') LOCAL_RECENT_BALANCES = [t for t in LOCAL_RECENT_BALANCES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY.set(VK_BALANCES, json.dumps(LOCAL_RECENT_BALANCES)) ### Position Updates ### if len(list(data['a']['P'])) > 0: for p in list(data['a']['P']): position_update = { 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['E'], 'timestamp_transaction': data['T'], 'event_reason_type': data['a']["m"], 'symbol': p['s'], 'position_amount': float(p['pa']), 'entry_price': float(p['ep']), 'accumulated_realized_pre_fees': float(p['cr']), 'unrealized_pnl': float(p['up']), 'margin_type': p['mt'], 'isolated_wallet': float(p.get('iw', 0)), 'position_side': p['ps'], } list_for_df_pos.append(position_update) LOCAL_RECENT_POSITIONS = upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='symbol') LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY.set(VK_POSITIONS, json.dumps(LOCAL_RECENT_POSITIONS)) if balance_update: await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON) if position_update: await db.insert_df_to_mysql(table_name='fr_aster_user_account_pos', params=list_for_df_bal, CON=CON) continue case 'listenKeyExpired': raise('Listen Key Has Expired; Failed to Update Properly. Restarting.') case _: logging.warning(f'UNMATCHED OTHER MSG: {data}') else: logging.info(f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): logging.warning(f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) continue except Exception as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) async def main(): global VAL_KEY global CON if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) else: VAL_KEY = None logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: await aster_db.create_fr_aster_user_order_trade_table(CON=CON) await aster_db.create_fr_aster_user_margin_table(CON=CON) await aster_db.create_fr_aster_user_account_bal(CON=CON) await aster_db.create_fr_aster_user_account_pos(CON=CON) await ws_stream() else: CON = None logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") await ws_stream() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Stream stopped")