"""Aster futures websocket listener.

Connects to the Aster combined-stream websocket, and for every message:

* mark-price array messages are stored in Valkey under ``fund_rate_aster_all``
  and the entry for the working ``SYMBOL`` under ``VK_FUND_RATE``;
* book-ticker messages for the working symbol are stored under ``VK_TICKER``;
* aggregate-trade messages are currently received but not persisted.

When ``ALLOW_SYMBOL_CHG`` is set, the working symbol is re-read from the
Valkey key ``fr_algo_working_symbol`` on every message and the per-symbol
streams are re-subscribed when it changes.
"""

import asyncio
import json
import logging
import os
import socket
import sys
import time
import traceback
from datetime import datetime
from typing import AsyncContextManager

import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn  # type: ignore
import valkey
import websockets
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

import modules.aster_db as aster_db
import modules.db as db


### Allow only ipv4 ###
def allowed_gai_family():
    """Return ``socket.AF_INET`` so the patched urllib3 hook only picks IPv4."""
    return socket.AF_INET


urllib3_cn.allowed_gai_family = allowed_gai_family

### Database ###
USE_DB: bool = True
USE_VK: bool = True
CON: AsyncContextManager  # assigned in main() from engine.connect()
VAL_KEY: valkey.Valkey  # assigned in main()
VK_FUND_RATE = 'fund_rate_aster'
VK_TICKER = 'fut_ticker_aster'
VK_LAST_TRADE = 'fut_last_trade_aster'

### Logging ###
load_dotenv()
# NOTE(review): if LOGS_PATH is unset this becomes 'None/Fund_Rate_Aster.log' - confirm
# that the environment always provides it.
LOG_FILEPATH: str = f'{os.getenv(key="LOGS_PATH")}/Fund_Rate_Aster.log'

### CONSTANTS ###
SYMBOL: str = 'ENAUSDT'
STREAM_MARKPRICE: str = '!markPrice@arr@1s'
# STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s'
STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker'
STREAM_TRADES: str = f'{SYMBOL.lower()}@aggTrade'

### Globals ###
WSS_URL: str = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}/{STREAM_TRADES}"
ALLOW_SYMBOL_CHG: bool = True

# Per-symbol streams that are baked into WSS_URL. Every fresh connection starts
# with exactly these, regardless of what SYMBOL has been switched to since.
_URL_STREAM_BOOKTICKER: str = STREAM_BOOKTICKER
_URL_STREAM_TRADES: str = STREAM_TRADES


### Funcs ###
async def _send_stream_request(websocket, method: str, streams: list[str]) -> None:
    """Send a ``method`` request (SUBSCRIBE / UNSUBSCRIBE) for ``streams``.

    The request id is the current wall-clock time in milliseconds.
    """
    msg = {
        "method": method,
        "params": streams,
        "id": int(round(datetime.now().timestamp() * 1000)),
    }
    await websocket.send(json.dumps(msg))


async def subscribe_streams(websocket, streams: list[str]) -> None:
    """Send a SUBSCRIBE request for ``streams`` over ``websocket``.

    The "Success" log line only means the request was sent; the server's
    reply is not awaited here.
    """
    logging.info(f'Trying to sub: {streams}')
    await _send_stream_request(websocket, "SUBSCRIBE", streams)
    logging.info(f'Success sub: {streams}')


async def unsubscribe_streams(websocket, streams: list[str]) -> None:
    """Send an UNSUBSCRIBE request for ``streams`` over ``websocket``.

    The "Success" log line only means the request was sent; the server's
    reply is not awaited here.
    """
    logging.info(f'Trying to unsub: {streams}')
    await _send_stream_request(websocket, "UNSUBSCRIBE", streams)
    logging.info(f'Success unsub: {streams}')


async def _resync_subscriptions(websocket) -> None:
    """Align a fresh connection's per-symbol streams with the current globals.

    A new connection always starts on the streams named in ``WSS_URL``. If the
    working symbol was changed earlier, swap the URL's streams for the current
    ``STREAM_BOOKTICKER`` / ``STREAM_TRADES`` so incoming channels match.
    """
    url_streams = [_URL_STREAM_BOOKTICKER, _URL_STREAM_TRADES]
    wanted_streams = [STREAM_BOOKTICKER, STREAM_TRADES]
    if wanted_streams != url_streams:
        await unsubscribe_streams(websocket=websocket, streams=url_streams)
        await subscribe_streams(websocket=websocket, streams=wanted_streams)


def _read_target_symbol() -> str:
    """Return the ASTER symbol stored under ``fr_algo_working_symbol``.

    Exits the process with status 1 when the key is missing or empty.
    Raises whatever ``json.loads`` / key lookup raise on a malformed value.
    """
    fr_algo_working_symbol = VAL_KEY.get(name='fr_algo_working_symbol')
    if not fr_algo_working_symbol:
        logging.critical(f'fr_algo_working_symbol is empty - killing: {fr_algo_working_symbol}')
        sys.exit(1)
    best_symbol_by_exchange: dict = json.loads(fr_algo_working_symbol)  # ty:ignore[invalid-argument-type]
    return best_symbol_by_exchange['ASTER']['symbol']


def _publish_mark_price(data: dict, ts_arrival: int) -> None:
    """Store the full mark-price payload and the working symbol's entry.

    Messages whose ``data`` field is missing or empty are logged and skipped.
    """
    rows = data.get('data')
    if not rows:
        logging.warning(f'Data["data"] is None: {data}')
        return  # nothing to publish; do not index into a missing payload

    VAL_KEY.set('fund_rate_aster_all', json.dumps(rows))

    d = next((row for row in rows if row.get('s') == SYMBOL), None)
    if d is None:
        return
    VAL_KEY_OBJ = json.dumps({
        'timestamp_arrival': ts_arrival,
        'timestamp_msg': d['E'],
        'symbol': d['s'],
        'mark_price': d['p'],
        'index_price': d['i'],
        'estimated_settle_price': d['P'],
        'funding_rate': d['r'],
        'next_funding_time_ts_ms': d['T'],
    })
    VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ)


def _publish_book_ticker(data: dict, ts_arrival: int) -> None:
    """Store the best bid/ask snapshot from a book-ticker message."""
    tick = data['data']
    VAL_KEY_OBJ = json.dumps({
        'timestamp_arrival': ts_arrival,
        'timestamp_msg': tick['E'],
        'timestamp_transaction': tick['T'],
        'orderbook_update_id': tick['u'],
        'symbol': tick['s'],
        'best_bid_px': tick['b'],
        'best_bid_qty': tick['B'],
        'best_ask_px': tick['a'],
        'best_ask_qty': tick['A'],
    })
    VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)


def _dispatch(data, ts_arrival: int) -> None:
    """Route one decoded message to the handler for its ``stream`` channel."""
    channel = data.get('stream', None) if isinstance(data, dict) else None
    if channel is None:
        # Replies to SUBSCRIBE/UNSUBSCRIBE and any non-object payload land here.
        logging.info(f'Initial or unexpected data struct, skipping: {data}')
        return

    if channel == STREAM_MARKPRICE:
        _publish_mark_price(data, ts_arrival)
    elif channel == STREAM_BOOKTICKER:
        _publish_book_ticker(data, ts_arrival)
    elif channel == STREAM_TRADES:
        # Trade persistence is currently disabled; the previous implementation
        # is kept below for reference.
        # trade_obj = {
        #     'timestamp_arrival': ts_arrival,
        #     'timestamp_msg': data['data']['E'],
        #     'timestamp_trade': data['data']['T'],
        #     'symbol': data['data']['s'],
        #     'aggregate_trade_id': data['data']['a'],
        #     'price': float(data['data']['p']),
        #     'qty': float(data['data']['q']),
        #     'first_trade_id': data['data']['f'],
        #     'last_trade_id': data['data']['l'],
        #     'is_buyer_mkt_maker': bool(data['data']['m']),
        # }
        # # VAL_KEY.set(VK_LAST_TRADE, json.dumps(trade_obj))
        # if USE_DB:
        #     await db.insert_df_to_mysql(table_name='fr_aster_mkt_trades', params=trade_obj, CON=CON)
        pass
    else:
        logging.warning(f'UNMATCHED OTHER MSG: {data}')


### Websocket ###
async def ws_stream():
    """Consume the Aster websocket forever, publishing updates to Valkey.

    Iterating ``websockets.connect`` yields a new connection after each one
    ends, so this coroutine only returns by raising (e.g. ``SystemExit`` from
    an empty ``fr_algo_working_symbol``, or cancellation).
    """
    global SYMBOL
    global STREAM_BOOKTICKER
    global STREAM_TRADES

    async for websocket in websockets.connect(WSS_URL, ping_interval=5):
        logging.info(msg=f"Connected to {WSS_URL}")
        try:
            # A reconnect starts on the URL's streams; re-apply any symbol switch.
            await _resync_subscriptions(websocket)

            async for message in websocket:
                ### Update Symbol if Algo Outputs Change ###
                if ALLOW_SYMBOL_CHG:
                    best_symbol: str = _read_target_symbol()
                    if best_symbol != SYMBOL:
                        logging.info(f'Symbol Change: {SYMBOL} -> {best_symbol}')
                        SYMBOL = best_symbol
                        await unsubscribe_streams(websocket=websocket, streams=[STREAM_BOOKTICKER, STREAM_TRADES])
                        # STREAM_MARKPRICE = f'{SYMBOL.lower()}@markPrice@1s'
                        STREAM_BOOKTICKER = f'{SYMBOL.lower()}@bookTicker'
                        STREAM_TRADES = f'{SYMBOL.lower()}@aggTrade'
                        await subscribe_streams(websocket=websocket, streams=[STREAM_BOOKTICKER, STREAM_TRADES])
                        # The message in hand belongs to the old subscription set.
                        continue

                ts_arrival = round(datetime.now().timestamp() * 1000)

                if not isinstance(message, str):
                    # Report the frame that was actually received (not a stale/unbound `data`).
                    raise ValueError(f'Type: {type(message)} not expected: {message}')

                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, ValueError):
                    logging.warning(f'Message not in JSON format, skipping: {message}')
                    continue

                _dispatch(data, ts_arrival)
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            # Not necessarily a closed connection: any handler error ends up here,
            # after which the outer loop moves on to a new connection.
            logging.error(f'Unexpected error, reconnecting: {e}')
            logging.error(traceback.format_exc())


async def main():
    """Create the Valkey client and DB connection, then run the stream loop.

    Raises:
        NotImplementedError: if ``USE_VK`` or ``USE_DB`` is disabled.
    """
    global VAL_KEY
    global CON

    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
        raise NotImplementedError('Cannot run without VK')

    if USE_DB:
        # NOTE(review): credentials are hard-coded in the URL - consider loading
        # them from the environment.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        try:
            async with engine.connect() as CON:
                await aster_db.create_fr_aster_mkt_trades(CON=CON)
                await ws_stream()
        finally:
            # Release the engine's connection pool on any exit path.
            await engine.dispose()
    else:
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        raise NotImplementedError('Cannot run without DB')


if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp() * 1000)
    # Configure logging first so the lines below reach the log file at INFO level.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w',
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")