"""Stream Aster futures market data for a single symbol.

Subscribes to the combined mark-price, book-ticker and aggregated-trade
websocket streams for ``SYMBOL``. Mark-price and book-ticker snapshots are
written as JSON strings to Valkey keys; aggregated trades are inserted into
the ``fr_aster_mkt_trades`` table through ``modules.db``.
"""
import asyncio
import json
import logging
import os
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager

import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn  # type: ignore
import valkey
import websockets
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

import modules.db as db
import modules.aster_db as aster_db


### Allow only ipv4 ###
def allowed_gai_family():
    """Return ``socket.AF_INET`` so urllib3 resolves hosts over IPv4 only."""
    return socket.AF_INET


urllib3_cn.allowed_gai_family = allowed_gai_family

### Database ###
USE_DB: bool = True
USE_VK: bool = True

VK_FUND_RATE = 'fund_rate_aster'
VK_TICKER = 'fut_ticker_aster'
VK_LAST_TRADE = 'fut_last_trade_aster'

# Both are populated by main() before ws_stream() starts.
CON: AsyncContextManager | None = None
VAL_KEY = None

### Logging ###
load_dotenv()
_LOGS_PATH = os.getenv("LOGS_PATH")
if _LOGS_PATH is None:
    # Fail with a clear message instead of "NoneType + str" TypeError.
    raise RuntimeError("LOGS_PATH environment variable is not set")
LOG_FILEPATH: str = _LOGS_PATH + '/Fund_Rate_Aster.log'

### CONSTANTS ###
SYMBOL: str = 'ETHUSDT'
STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s'
STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker'
STREAM_TRADES: str = f'{SYMBOL.lower()}@aggTrade'

### Globals ###
WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}/{STREAM_TRADES}"


### Websocket ###
def _publish(key: str, payload: dict) -> None:
    """Store *payload* as a JSON string under *key* in Valkey.

    Does nothing when no Valkey client is configured (``USE_VK`` is False),
    so a disabled cache no longer raises on every message.
    """
    if VAL_KEY is not None:
        VAL_KEY.set(key, json.dumps(payload))


async def _handle_message(message: str, ts_arrival: int) -> None:
    """Decode one text frame and route it by its ``stream`` field.

    Args:
        message: Raw text frame received from the websocket.
        ts_arrival: Local arrival time in epoch milliseconds.

    A ``ValueError`` raised while handling the frame (including
    ``json.JSONDecodeError`` and failed ``float()`` conversions) is logged
    and the frame is skipped. Other exceptions, such as a ``KeyError`` for
    a missing payload field, propagate to the caller.
    """
    try:
        data = json.loads(message)
        channel = data.get('stream', None)

        if channel is None:
            logging.info(f'Initial or unexpected data struct, skipping: {data}')
        elif channel == STREAM_MARKPRICE:
            d = data['data']
            _publish(VK_FUND_RATE, {
                'timestamp_arrival': ts_arrival,
                'timestamp_msg': d['E'],
                'symbol': d['s'],
                'mark_price': d['p'],
                'index_price': d['i'],
                'estimated_settle_price': d['P'],
                'funding_rate': d['r'],
                'next_funding_time_ts_ms': d['T'],
            })
        elif channel == STREAM_BOOKTICKER:
            d = data['data']
            _publish(VK_TICKER, {
                'timestamp_arrival': ts_arrival,
                'timestamp_msg': d['E'],
                'timestamp_transaction': d['T'],
                'orderbook_update_id': d['u'],
                'symbol': d['s'],
                'best_bid_px': d['b'],
                'best_bid_qty': d['B'],
                'best_ask_px': d['a'],
                'best_ask_qty': d['A'],
            })
        elif channel == STREAM_TRADES:
            d = data['data']
            trade_obj = {
                'timestamp_arrival': ts_arrival,
                'timestamp_msg': d['E'],
                'timestamp_trade': d['T'],
                'symbol': d['s'],
                'aggregate_trade_id': d['a'],
                'price': float(d['p']),
                'qty': float(d['q']),
                'first_trade_id': d['f'],
                'last_trade_id': d['l'],
                'is_buyer_mkt_maker': bool(d['m']),
            }
            if USE_DB:
                await db.insert_df_to_mysql(
                    table_name='fr_aster_mkt_trades',
                    params=trade_obj,
                    CON=CON,
                )
        else:
            logging.warning(f'UNMATCHED OTHER MSG: {data}')
    except (json.JSONDecodeError, ValueError):
        logging.warning(f'Message not in JSON format, skipping: {message}')


async def ws_stream():
    """Consume the combined websocket stream, reconnecting on failure.

    Iterates ``websockets.connect(WSS_URL)`` so that a new connection is
    opened each time the current one ends. Every received frame is
    timestamped on arrival and passed to ``_handle_message``. Any exception
    raised while reading or handling frames is logged with its traceback and
    the loop moves on to the next connection.
    """
    async for websocket in websockets.connect(WSS_URL):
        logging.info(f"Connected to {WSS_URL}")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp() * 1000)
                if not isinstance(message, str):
                    # Report the frame's own type; `data` is not defined here.
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
                await _handle_message(message, ts_arrival)
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            # Not necessarily a closed connection (e.g. a missing payload
            # field), so do not label it as one.
            logging.error(f'Unexpected error, reconnecting: {e}')
            logging.error(traceback.format_exc())


async def main():
    """Set up the Valkey client and database connection, then run the stream.

    Populates the module-level ``VAL_KEY`` and ``CON`` globals according to
    ``USE_VK`` and ``USE_DB``. When the database is enabled, the trades table
    is created via ``aster_db.create_fr_aster_mkt_trades`` before streaming
    and the engine is disposed once streaming ends.
    """
    global VAL_KEY
    global CON

    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")

    if USE_DB:
        # NOTE(review): credentials are hard-coded in the URL; consider
        # loading them from the environment like LOGS_PATH.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        try:
            async with engine.connect() as CON:
                await aster_db.create_fr_aster_mkt_trades(CON=CON)
                await ws_stream()
        finally:
            # Release pooled connections when the stream stops or fails.
            await engine.dispose()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()


if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp() * 1000)
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    # Logged after basicConfig so the record actually reaches the log file;
    # before configuration the root logger drops INFO messages.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")