import asyncio import json import logging import os import socket import traceback from datetime import datetime from typing import AsyncContextManager import numpy as np import pandas as pd import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore import valkey import websockets from dotenv import load_dotenv from sqlalchemy.ext.asyncio import create_async_engine import modules.db as db import modules.extended_db as extended_db ### Allow only ipv4 ### def allowed_gai_family(): return socket.AF_INET urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = True USE_VK: bool = True VK_LAST_TRADE = 'fut_last_trade_extended' CON: AsyncContextManager VAL_KEY: valkey.Valkey ### Logging ### load_dotenv() LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Extended_Trades.log' ### CONSTANTS ### SYMBOL: str = 'ETH-USD' ### Globals ### ALLOW_SYMBOL_CHG: bool = True ### Websocket ### async def ws_stream(): global SYMBOL while True: CHANGE_SYMBOL: bool = False WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/publicTrades/{SYMBOL}" async for websocket in websockets.connect(WSS_URL): if CHANGE_SYMBOL: break logging.info(f"Connected to {WSS_URL}") try: async for message in websocket: ### Update Symbol if Algo Outputs Change ### if ALLOW_SYMBOL_CHG: vk_get: str = VAL_KEY.get(name='fr_algo_working_symbol') # ty:ignore[invalid-assignment] if vk_get: best_symbol_by_exchange: dict = json.loads(s=vk_get) best_symbol: str = best_symbol_by_exchange['EXTEND']['symbol'] if best_symbol != SYMBOL: logging.info(f'Symbol Change: {SYMBOL} -> {best_symbol}') SYMBOL = best_symbol CHANGE_SYMBOL = True await websocket.close() break else: logging.warning('Extend Trades WS: "fr_algo_working_symbol" is None; not switching to new symbol...') ts_arrival = round(datetime.now().timestamp()*1000) if isinstance(message, str): try: data = json.loads(message) if data.get('data', None) is not None: # print(data) if data['seq'] == 1: # Skip first msg that has historical trades continue list_for_df = [] for t in data['data']: trade_obj = { 'sequence_id': data['seq'], 'timestamp_arrival': ts_arrival, 'timestamp_msg': data['ts'], 'timestamp_trade': t['T'], 'symbol': t['m'], 'side_taker': t['S'], 'trade_type': t['tT'], 'price': float(t['p']), 'qty': float(t['q']), 'trade_id': t['i'], 'is_buyer_mkt_maker': True if t['S']=='SELL' else False, } list_for_df.append(trade_obj) # VAL_KEY.set(VK_LAST_TRADE, json.dumps(trade_obj)) if USE_DB: await db.insert_df_to_mysql(table_name='fr_extended_mkt_trades', params=list_for_df, CON=CON) pass continue else: logging.info(f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): logging.warning(f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) continue except Exception as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) async def main(): global VAL_KEY global CON if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) else: logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") raise NotImplementedError('Cannot run without VK') if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: await extended_db.create_fr_extended_mkt_trades(CON=CON) await ws_stream() else: logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") raise NotImplementedError('Cannot run without DB') if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Stream stopped")