import asyncio import json import logging import socket import traceback from datetime import datetime from typing import AsyncContextManager import numpy as np import pandas as pd import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore from sqlalchemy import text import websockets from sqlalchemy.ext.asyncio import create_async_engine import valkey import os from dotenv import load_dotenv ### Allow only ipv4 ### def allowed_gai_family(): return socket.AF_INET urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = True USE_VK: bool = True VK_CHANNEL = 'poly_binance_btcusd' CON: AsyncContextManager | None = None VAL_KEY = None ### Logging ### load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_Binance_Trades.log' ### Globals ### WSS_URL = "wss://stream.binance.com:9443/ws/BTCUSDT@aggTrade" HIST_TRADES = np.empty((0, 3)) HIST_TRADES_LOOKBACK_SEC = 5 ### Database Funcs ### async def create_rtds_btcusd_table( CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: if CON is None: logging.info("NO DB CONNECTION, SKIPPING Create Statements") else: if engine == 'mysql': logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') await CON.execute(text(""" CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( timestamp_arrival BIGINT, timestamp_msg BIGINT, timestamp_value BIGINT, value DOUBLE, qty DOUBLE ); """)) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') async def insert_rtds_btcusd_table( timestamp_arrival: int, timestamp_msg: int, timestamp_value: int, value: float, qty: float, CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: params={ 'timestamp_arrival': timestamp_arrival, 'timestamp_msg': timestamp_msg, 'timestamp_value': timestamp_value, 'value': value, 'qty': qty, } if CON is None: logging.info("NO DB CONNECTION, SKIPPING Insert Statements") else: if engine == 'mysql': await CON.execute(text(""" INSERT INTO binance_btcusd_trades ( timestamp_arrival, timestamp_msg, timestamp_value, value, qty ) VALUES ( :timestamp_arrival, :timestamp_msg, :timestamp_value, :value, :qty ) """), parameters=params ) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') ### Websocket ### async def binance_trades_stream(): global HIST_TRADES async for websocket in websockets.connect(WSS_URL): logging.info(f"Connected to {WSS_URL}") subscribe_msg = { "method": "SUBSCRIBE", "params": ["btcusdt@aggTrade"], "id": 1 } await websocket.send(json.dumps(subscribe_msg)) try: async for message in websocket: ts_arrival = round(datetime.now().timestamp()*1000) if isinstance(message, str): try: data = json.loads(message) if data.get('T', None) is not None: timestamp_msg = data['E'] timestamp_value = data['T'] last_px = float(data['p']) qty = float(data['q']) # print(f'🤑 BTC Binance Last Px: {last_px:_.4f}; TS: {pd.to_datetime(data['T'], unit='ms')}') HIST_TRADES = np.append(HIST_TRADES, np.array([[timestamp_value, last_px, qty]]), axis=0) hist_trades_lookback_ts_ms = round(datetime.now().timestamp() - HIST_TRADES_LOOKBACK_SEC)*1000 HIST_TRADES = HIST_TRADES[HIST_TRADES[:, 0] >= hist_trades_lookback_ts_ms] VAL_KEY_OBJ = json.dumps({ 'timestamp_arrival': ts_arrival, 'timestamp_msg': timestamp_msg, 'timestamp_value': timestamp_value, 'value': last_px, 'qty': qty, 'hist_trades': HIST_TRADES.tolist() }) # VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ) VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ) await insert_rtds_btcusd_table( CON=CON, timestamp_arrival=ts_arrival, timestamp_msg=timestamp_msg, timestamp_value=timestamp_value, value=last_px, qty=qty, ) else: logging.info(f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): logging.warning(f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) continue except Exception as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) async def main(): global VAL_KEY global CON if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) # published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}") else: VAL_KEY = None logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') async with engine.connect() as CON: await create_rtds_btcusd_table(CON=CON) await binance_trades_stream() else: CON = None logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") await binance_trades_stream() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Stream stopped")