"""Stream Coinbase BTC-USD ticker updates into Valkey and MySQL.

Connects to the Coinbase websocket feed, subscribes to the ``ticker``
channel for ``BTC-USD`` and, for every message carrying a ``price``:

* prints a one-line summary,
* publishes/sets the trade as JSON on the Valkey channel ``VK_CHANNEL``
  (when ``USE_VK`` is true),
* inserts a row into ``coinbase_btcusd_trades`` (when ``USE_DB`` is true).
"""
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime, timezone
from typing import AsyncContextManager

import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn  # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey


### Allow only ipv4 ###
def allowed_gai_family():
    """Return ``socket.AF_INET`` so urllib3 only resolves IPv4 addresses."""
    return socket.AF_INET


# Monkey-patch urllib3's address-family selector with the IPv4-only version.
urllib3_cn.allowed_gai_family = allowed_gai_family

### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_coinbase_btcusd'
CON: AsyncContextManager | None = None  # set by main() when USE_DB is true
VAL_KEY = None  # set by main() when USE_VK is true

### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_coinbase_Trades.log'

### Globals ###
WSS_URL = "wss://ws-feed.exchange.coinbase.com"
# HIST_TRADES = np.empty((0, 2))

# Wire format of the ``time`` field; the trailing ``Z`` marks the value as UTC.
_MSG_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


### Database Funcs ###
async def create_rtds_btcusd_table(
    CON: AsyncContextManager | None,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the ``coinbase_btcusd_trades`` table if it does not exist.

    Args:
        CON: Open async DB connection; when ``None`` the call only logs
            and returns.
        engine: Database engine name. Only ``'mysql'`` is implemented.

    Raises:
        ValueError: If a connection is given and ``engine`` is not
            ``'mysql'``.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    logging.info('Creating Table if Does Not Exist: coinbase_btcusd_trades')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS coinbase_btcusd_trades (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            value DOUBLE,
            qty DOUBLE,
            side VARCHAR(8)
        );
    """))
    await CON.commit()


async def insert_rtds_btcusd_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    value: float,
    qty: float,
    side: str,
    CON: AsyncContextManager | None,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one trade row into ``coinbase_btcusd_trades`` and commit.

    Args:
        timestamp_arrival: Local receive time, epoch milliseconds.
        timestamp_msg: Message time, epoch milliseconds.
        timestamp_value: Value time, epoch milliseconds.
        value: Last traded price.
        qty: Last traded size.
        side: Trade side string as delivered by the feed.
        CON: Open async DB connection; when ``None`` the call only logs
            and returns.
        engine: Database engine name. Only ``'mysql'`` is implemented.

    Raises:
        ValueError: If a connection is given and ``engine`` is not
            ``'mysql'``.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    params = {
        'timestamp_arrival': timestamp_arrival,
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'value': value,
        'qty': qty,
        'side': side,
    }
    # Bound parameters keep feed-supplied values out of the SQL text.
    await CON.execute(
        text("""
            INSERT INTO coinbase_btcusd_trades (
                timestamp_arrival,
                timestamp_msg,
                timestamp_value,
                value,
                qty,
                side
            )
            VALUES (
                :timestamp_arrival,
                :timestamp_msg,
                :timestamp_value,
                :value,
                :qty,
                :side
            )
        """),
        parameters=params,
    )
    await CON.commit()


### Websocket ###
def _parse_ticker(data: dict) -> dict:
    """Turn a decoded ticker message into the row/payload field dict.

    Reads ``time``, ``price``, ``last_size`` and ``side`` from ``data``.

    Raises:
        KeyError: If one of the fields read is absent.
        TypeError: If a field has a type that cannot be converted.
        ValueError: If the time or a number cannot be parsed.
    """
    ts_arrival = round(datetime.now().timestamp() * 1000)
    # The wire time ends in "Z" (UTC). strptime yields a naive datetime, and
    # naive .timestamp() would interpret it as *local* time, skewing the
    # result by the host's UTC offset - so attach UTC explicitly first.
    msg_time = datetime.strptime(data['time'], _MSG_TIME_FORMAT)
    ts_msg = round(msg_time.replace(tzinfo=timezone.utc).timestamp() * 1000)
    return {
        'timestamp_arrival': ts_arrival,
        'timestamp_msg': ts_msg,
        'timestamp_value': ts_msg,  # value time is taken to be the message time
        'value': float(data['price']),
        'qty': float(data['last_size']),
        'side': data['side'],
    }


async def _handle_message(message: str | bytes) -> None:
    """Decode one websocket frame and fan it out to stdout, Valkey and the DB.

    Frames without a ``price`` (or that do not decode to a JSON object) are
    logged and skipped. JSON/number/time parsing errors propagate as
    ``ValueError`` for the caller to log.
    """
    data = json.loads(message)
    if not isinstance(data, dict) or data.get('price', None) is None:
        logging.info(f'Initial or unexpected data struct, skipping: {data}')
        return

    try:
        trade = _parse_ticker(data)
    except (KeyError, TypeError) as e:
        # A priced message lacking the trade fields must not kill the stream.
        logging.warning(
            f'Ticker message missing or malformed field, skipping: {data}; exception: {e}'
        )
        return

    last_px = trade['value']
    ts_value = trade['timestamp_value']
    side = trade['side']
    print(f'🤑 BTC Coinbase Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit="ms")}; Side: {side};')

    if USE_VK:
        VAL_KEY_OBJ = json.dumps(trade)
        # Publish for live subscribers and also store the latest value under
        # the same name so it can be read with a plain GET.
        VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
        VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)

    if USE_DB:
        await insert_rtds_btcusd_table(
            CON=CON,
            timestamp_arrival=trade['timestamp_arrival'],
            timestamp_msg=trade['timestamp_msg'],
            timestamp_value=trade['timestamp_value'],
            value=trade['value'],
            qty=trade['qty'],
            side=trade['side'],
        )


async def coinbase_trades_stream():
    """Subscribe to the BTC-USD ticker channel and process messages until the
    connection ends or an unexpected error occurs.

    Errors are logged with a traceback; the coroutine then returns rather
    than re-raising.
    """
    async with websockets.connect(WSS_URL) as websocket:
        logging.info(f"Connected to {WSS_URL}")

        subscribe_msg = {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": [
                {
                    "name": "ticker",
                    "product_ids": ["BTC-USD"],
                }
            ],
        }
        await websocket.send(json.dumps(subscribe_msg))

        try:
            async for message in websocket:
                if not isinstance(message, (str, bytes)):
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
                try:
                    await _handle_message(message)
                except (json.JSONDecodeError, ValueError) as e:
                    logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
                    continue
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
        except Exception as e:
            logging.error(f'Unexpected error, stream stopped: {e}')
            logging.error(traceback.format_exc())


async def main():
    """Set up Valkey and the DB connection per ``USE_VK``/``USE_DB``, then run
    the ticker stream.

    Stores the Valkey client and DB connection in the module globals
    ``VAL_KEY`` and ``CON`` that the message handler reads.
    """
    global VAL_KEY
    global CON

    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
        published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")

    if USE_DB:
        # NOTE(review): credentials are hard-coded in the URL; consider
        # loading them from the environment or a secrets store.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        try:
            async with engine.connect() as CON:
                await create_rtds_btcusd_table(CON=CON)
                await coinbase_trades_stream()
        finally:
            # Release pooled connections once the stream has ended.
            await engine.dispose()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await coinbase_trades_stream()


if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)

    # Configure logging BEFORE the first log call: logging.info() on an
    # unconfigured root logger installs a default WARNING-level handler, so
    # an INFO message emitted earlier would be silently dropped.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")