"""Stream ApeX Omni public instrument info over a websocket.

Connects to the ApeX Omni public quote endpoint, subscribes to the
``instrumentInfo.H.ETHUSDT`` topic, keeps the connection alive with periodic
application-level pings, and (currently) prints every raw message to stdout.
The Valkey publishing logic is kept below as commented-out code.
"""

import asyncio
import json
import logging
import os
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager

import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn  # type: ignore
import valkey
import websockets
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


### Allow only ipv4 ###
def allowed_gai_family():
    """Return ``socket.AF_INET`` so urllib3 resolves hosts to IPv4 only."""
    return socket.AF_INET


# Monkeypatch urllib3's address-family selection with the IPv4-only version.
urllib3_cn.allowed_gai_family = allowed_gai_family

### Database ###
USE_DB: bool = False
USE_VK: bool = True
# VK_FUND_RATE = 'fund_rate_apex'
VK_TICKER = 'fut_ticker_apex'

CON: AsyncContextManager | None = None
VAL_KEY = None

### Logging ###
load_dotenv()
# Fall back to the current directory when LOGS_PATH is unset; concatenating
# None with a string would otherwise raise an opaque TypeError at import time.
LOG_FILEPATH: str = os.getenv("LOGS_PATH", ".") + '/Fund_Rate_Apex.log'

### CONSTANTS ###
PING_INTERVAL_SEC = 15

### Globals ###
# The connection timestamp is appended to this prefix at connect time.
# (Previously the "&timestamp=" part was corrupted into "×tamp=".)
WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2&timestamp="
TICKER_SNAPSHOT_DATA_LAST: dict = {}
# HIST_TRADES = np.empty((0, 3))
# HIST_TRADES_LOOKBACK_SEC = 6

# ### Database Funcs ###
# async def create_rtds_btcusd_table(
#     CON: AsyncContextManager,
#     engine: str = 'mysql', # mysql | duckdb
# ) -> None:
#     if CON is None:
#         logging.info("NO DB CONNECTION, SKIPPING Create Statements")
#     else:
#         if engine == 'mysql':
#             logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
#             await CON.execute(text("""
#                 CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
#                     timestamp_arrival BIGINT,
#                     timestamp_msg BIGINT,
#                     timestamp_value BIGINT,
#                     value DOUBLE,
#                     qty DOUBLE
#                 );
#             """))
#             await CON.commit()
#         else:
#             raise ValueError('Only MySQL engine is implemented')

# async def insert_rtds_btcusd_table(
#     timestamp_arrival: int,
#     timestamp_msg: int,
#     timestamp_value: int,
#     value: float,
#     qty: float,
#     CON: AsyncContextManager,
#     engine: str = 'mysql', # mysql | duckdb
# ) -> None:
#     params={
#         'timestamp_arrival': timestamp_arrival,
#         'timestamp_msg': timestamp_msg,
#         'timestamp_value': timestamp_value,
#         'value': value,
#         'qty': qty,
#     }
#     if CON is None:
#         logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
#     else:
#         if engine == 'mysql':
#             await CON.execute(text("""
#                 INSERT INTO binance_btcusd_trades
#                 (
#                     timestamp_arrival,
#                     timestamp_msg,
#                     timestamp_value,
#                     value,
#                     qty
#                 )
#                 VALUES
#                 (
#                     :timestamp_arrival,
#                     :timestamp_msg,
#                     :timestamp_value,
#                     :value,
#                     :qty
#                 )
#             """),
#             parameters=params
#             )
#             await CON.commit()
#         else:
#             raise ValueError('Only MySQL engine is implemented')


def _now_ms() -> int:
    """Return the current local wall-clock time as integer epoch milliseconds."""
    return round(datetime.now().timestamp() * 1000)


### Websocket ###
async def heartbeat(ws):
    """Send an application-level ping on ``ws`` every ``PING_INTERVAL_SEC``.

    Runs until cancelled, or returns once the connection is found closed, so
    a dead socket does not leave an unretrieved task exception behind.

    Args:
        ws: An open websocket connection exposing an async ``send`` method.
    """
    while True:
        await asyncio.sleep(PING_INTERVAL_SEC)
        logging.info("SENDING PING...")
        ping_msg = {"op": "ping", "args": [str(_now_ms())]}
        try:
            await ws.send(json.dumps(ping_msg))
        except websockets.ConnectionClosed:
            logging.info("Heartbeat stopped: connection closed")
            return


async def ws_stream():
    """Connect, subscribe, and consume messages, reconnecting on failure.

    Iterating over ``websockets.connect(...)`` yields a fresh connection each
    time the previous one ends. For each connection a heartbeat task is
    started, the instrument-info topic is subscribed, and every incoming
    message is printed. The heartbeat task is always cancelled when the
    connection's loop body exits, so tasks do not pile up across reconnects.
    """
    global TICKER_SNAPSHOT_DATA_LAST

    # NOTE(review): the URL (including its timestamp, in whole seconds) is
    # built once here and reused for every reconnect attempt.
    async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'):
        logging.info(f"Connected to {WSS_URL}")
        # Keep a reference so the task is not garbage-collected mid-flight
        # and can be cancelled when this connection ends.
        heartbeat_task = asyncio.create_task(heartbeat(ws=websocket))

        try:
            subscribe_msg = {
                "op": "subscribe",
                "args": ["instrumentInfo.H.ETHUSDT"]
            }
            # Inside the try so a drop during subscribe triggers a reconnect
            # instead of escaping the reconnect loop.
            await websocket.send(json.dumps(subscribe_msg))

            async for message in websocket:
                # Only consumed by the commented-out handler below.
                ts_arrival = _now_ms()
                print(message)
                # if isinstance(message, str):
                #     try:
                #         data = json.loads(message)
                #         if data.get('op', None) == 'ping':
                #             pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]}
                #             logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}')
                #             await websocket.send(json.dumps(pong_msg))
                #             continue
                #         elif data.get('success', None):
                #             # logging.info('CONNECTION SUCCESFUL RESP MSG')
                #             continue
                #         msg_type = data.get('type', None)
                #         if msg_type is not None:
                #             match msg_type:
                #                 case 'snapshot':
                #                     TICKER_SNAPSHOT_DATA_LAST = data['data']
                #                     nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
                #                     VAL_KEY_OBJ = json.dumps({
                #                         'timestamp_arrival': ts_arrival,
                #                         'timestamp_msg': data['ts'],
                #                         'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
                #                         'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
                #                         'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
                #                         'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
                #                         'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
                #                         'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
                #                         'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
                #                         'nextFundingTime_ts_ms': nextFundingTime_ts,
                #                     })
                #                     VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
                #                     continue
                #                 case 'delta':
                #                     TICKER_SNAPSHOT_DATA_LAST.update(data['data'])
                #                     nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
                #                     VAL_KEY_OBJ = json.dumps({
                #                         'timestamp_arrival': ts_arrival,
                #                         'timestamp_msg': data['ts'],
                #                         'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
                #                         'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
                #                         'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
                #                         'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
                #                         'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
                #                         'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
                #                         'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
                #                         'nextFundingTime_ts_ms': nextFundingTime_ts,
                #                     })
                #                     VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
                #                     continue
                #                 case _:
                #                     logging.warning(f'UNMATCHED OTHER MSG: {data}')
                #         else:
                #             logging.info(f'Initial or unexpected data struct, skipping: {data}')
                #             continue
                #     except (json.JSONDecodeError, ValueError):
                #         logging.warning(f'Message not in JSON format, skipping: {message}')
                #         continue
                # else:
                #     raise ValueError(f'Type: {type(data)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            # Not a connection-closed event; log it as such, then fall through
            # so the outer loop moves on to the next connection.
            logging.error(f'Unexpected error in stream: {e}')
            logging.error(traceback.format_exc())
        finally:
            # Stop this connection's heartbeat before moving on.
            heartbeat_task.cancel()


async def main():
    """Initialise the optional Valkey/DB globals and run the stream.

    Sets the module-level ``VAL_KEY`` and ``CON`` according to ``USE_VK`` and
    ``USE_DB``, then awaits ``ws_stream()``.
    """
    global VAL_KEY
    global CON

    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")

    if USE_DB:
        # NOTE(review): credentials are hard-coded in this URL; consider
        # loading them from the environment.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        async with engine.connect() as CON:
            # await create_rtds_btcusd_table(CON=CON)
            await ws_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()


if __name__ == '__main__':
    START_TIME = _now_ms()

    # Configure logging first: a logging call made before basicConfig is
    # dropped at the default WARNING level and never reaches the log file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")