import asyncio import json import logging import socket import traceback from datetime import datetime, timezone from typing import AsyncContextManager import math import numpy as np import pandas as pd import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore from sqlalchemy import text import websockets from sqlalchemy.ext.asyncio import create_async_engine import valkey import os from dotenv import load_dotenv import modules.extended_db as extended_db import modules.db as db ### Allow only ipv4 ### def allowed_gai_family(): return socket.AF_INET urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = True USE_VK: bool = True VK_ORDERS = 'fr_extended_user_orders' VK_TRADES = 'fr_extended_user_trades' VK_BALANCES = 'fr_extended_user_balances' VK_POSITIONS = 'fr_extended_user_positions' CON: AsyncContextManager | None = None VAL_KEY = None ### Logging ### load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_User.log' ### CONSTANTS ### WSS_URL = "wss://api.starknet.extended.exchange/stream.extended.exchange/v1/account" API_KEY = os.getenv('EXTENDED_API_KEY') LOCAL_RECENT_UPDATES_LOOKBACK_SEC = 30 ### Globals ### LOCAL_RECENT_ORDERS: list = [] LOCAL_RECENT_TRADES: list = [] LOCAL_RECENT_BALANCES: list = [] LOCAL_RECENT_POSITIONS: list = [] def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id', seq_check_field: str | None = None): for index, item in enumerate(list_of_dicts): if item.get(id) == new_dict.get(id): if seq_check_field is not None: if item.get(seq_check_field) < new_dict.get(seq_check_field): logging.info('Skipping out of sequence msg') return list_of_dicts list_of_dicts[index] = new_dict return list_of_dicts list_of_dicts.append(new_dict) return list_of_dicts ### Websocket ### async def ws_stream(): global LOCAL_RECENT_ORDERS global LOCAL_RECENT_TRADES global LOCAL_RECENT_BALANCES global LOCAL_RECENT_POSITIONS async for websocket in websockets.connect(WSS_URL, extra_headers={'X-Api-Key': API_KEY}): logging.info(f"Connected to {WSS_URL}") try: async for message in websocket: ts_arrival = round(datetime.now().timestamp()*1000) if isinstance(message, str): try: data = json.loads(message) channel = data.get('type', None) if channel is not None: LOOKBACK_MIN_TS_MS = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000) match channel: case 'ORDER': list_for_df = [] for o in data['data']['orders']: order_update = { 'sequence_id': data['seq'], 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['ts'], 'order_id': o['id'], 'account_id': o['accountId'], 'external_id': o.get('externalId', None), 'market': o['market'], 'type': o['type'], 'side': o['side'], 'status': o['status'], 'status_reason': o.get('statusReason', None), 'price': float(o.get('price', 0)), 'averagePrice': float(o.get('averagePrice', 0)), 'qty': float(o['qty']), 'filled_qty': float(o.get('filledQty', 0)), 'payed_fee': float(o.get('payedFee', 0)), # 'trigger_dict': o.get('trigger', None), 'tp_sl_type': o.get('tpSlType', None), # 'take_profit_dict': o.get('takeProfit', None), # 'stop_loss_dict': o.get('stopLoss', None), 'reduce_only': o.get('reduceOnly', False), 'post_only': o.get('postOnly', False), 'created_time_ts': o['createdTime'], 'updated_time_ts': o['updatedTime'], 'expire_time_ts': o['expireTime'], } list_for_df.append(order_update) LOCAL_RECENT_ORDERS = upsert_list_of_dicts_by_id(LOCAL_RECENT_ORDERS, order_update, id='order_id', seq_check_field='sequence_id') LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS) VAL_KEY.set(VK_ORDERS, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_extended_user_order', params=list_for_df, CON=CON) continue case 'TRADE': list_for_df = [] for t in data['data']['trades']: trade_update = { 'sequence_id': data['seq'], 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['ts'], 'trade_id': t['id'], 'account_id': t['accountId'], 'market': t['market'], 'order_id': t['orderId'], 'external_order_id': t.get('externalOrderId', None), 'side': t['side'], 'price': float(t['price']), 'qty': float(t['qty']), 'value': float(t['value']), 'fee': float(t['fee']), 'trade_type': t['tradeType'], 'created_time_ts': t['createdTime'], 'is_taker': t['isTaker'], } LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, trade_update, id='trade_id', seq_check_field='sequence_id') LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES) VAL_KEY.set(VK_TRADES, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_extended_user_trade', params=list_for_df, CON=CON) continue case 'BALANCE': balance_update = { 'sequence_id': data['seq'], 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['ts'], 'collateral_name': data['data']['balance']['collateralName'], 'balance': float(data['data']['balance']['balance']), 'equity': float(data['data']['balance']['equity']), 'available_for_trade': float(data['data']['balance']['availableForTrade']), 'available_for_withdrawal': float(data['data']['balance']['availableForWithdrawal']), 'unrealised_pnl': float(data['data']['balance']['unrealisedPnl']), 'initial_margin': float(data['data']['balance']['initialMargin']), 'margin_ratio': float(data['data']['balance']['marginRatio']), 'updated_time_ts': data['data']['balance']['updatedTime'], 'exposure': float(data['data']['balance']['exposure']), 'leverage': float(data['data']['balance']['leverage']), } LOCAL_RECENT_BALANCES = upsert_list_of_dicts_by_id(LOCAL_RECENT_BALANCES, balance_update, id='collateral_name', seq_check_field='sequence_id') LOCAL_RECENT_BALANCES = [t for t in LOCAL_RECENT_BALANCES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_BALANCES) VAL_KEY.set(VK_BALANCES, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_extended_user_balance', params=balance_update, CON=CON) continue case 'POSITION': list_for_df = [] for p in data['data']['positions']: position_update = { 'sequence_id': data['seq'], 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['ts'], 'position_id': p['id'], 'account_id': p['accountId'], 'market': p['market'], 'side': p['side'], 'leverage': float(p['leverage']), 'size': float(p['size']), 'value': float(p['value']), 'open_price': float(p['openPrice']), 'mark_price': float(p['markPrice']), 'liquidation_price': float(p['liquidationPrice']), 'margin': float(p['margin']), 'unrealised_pnl': float(p['unrealisedPnl']), 'realised_pnl': float(p['realisedPnl']), 'tp_trigger_price': float(p.get('tpTriggerPrice', 0)), 'tp_limit_price': float(p.get('tpLimitPrice', 0)), 'sl_trigger_price': float(p.get('slTriggerPrice', 0)), 'sl_limit_price': float(p.get('slLimitPrice', 0)), 'adl_percentile': p['adl'], # closer to 100 means higher chance of auto-deleveraging 'created_at_ts': p['createdAt'], 'updated_at_ts': p['updatedAt'], } LOCAL_RECENT_POSITIONS = upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='position_id', seq_check_field='sequence_id') LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS) VAL_KEY.set(VK_POSITIONS, VAL_KEY_OBJ) await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON) continue case _: logging.warning(f'UNMATCHED OTHER MSG: {data}') else: logging.info(f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): logging.warning(f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) continue except Exception as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) async def main(): global VAL_KEY global CON if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) else: VAL_KEY = None logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: await extended_db.create_fr_extended_user_balance(CON=CON) await extended_db.create_fr_extended_user_order(CON=CON) await extended_db.create_fr_extended_user_position(CON=CON) await extended_db.create_fr_extended_user_trade(CON=CON) await ws_stream() else: CON = None logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") await ws_stream() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Stream stopped")