import asyncio import json import logging import os from datetime import datetime from typing import AsyncContextManager import numpy as np import valkey import websockets from dotenv import load_dotenv from py_clob_client.client import ClobClient from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine ### Database ### USE_DB: bool = True USE_VK: bool = True LOCAL_LIVE_ORDERS = [] LOCAL_RECENT_TRADES = [] LOCAL_RECENT_TRADES_LOOKBACK_SEC = 10 VK_LIVE_ORDERS = 'poly_user_orders' VK_RECENT_TRADES = 'poly_user_trades' CON: AsyncContextManager | None = None VAL_KEY = None ### Logging ### load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_User.log' # https://docs.polymarket.com/market-data/websocket/user-channel WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user" API_CREDS = {} HIST_TRADES = np.empty((0, 2)) TARGET_PX = 0 ### Database Funcs ### async def create_user_trades_table( CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: if CON is None: logging.info("NO DB CONNECTION, SKIPPING Create Statements") else: if engine == 'mysql': logging.info('Creating Table if Does Not Exist: user_stream_trades') await CON.execute(text(""" CREATE TABLE IF NOT EXISTS user_stream_trades ( -- event_type VARCHAR(8), timestamp_arrival BIGINT, type VARCHAR(20), id VARCHAR(100), taker_order_id VARCHAR(100), market VARCHAR(100), asset_id VARCHAR(100), side VARCHAR(8), size DOUBLE, price DOUBLE, fee_rate_bps DOUBLE, status VARCHAR(20), matchtime BIGINT, last_update BIGINT, outcome VARCHAR(20), owner VARCHAR(100), trade_owner VARCHAR(100), maker_address VARCHAR(100), transaction_hash VARCHAR(100), bucket_index INT, maker_orders JSON NULL, trader_side VARCHAR(8), timestamp BIGINT ); """)) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') async def insert_user_trades_table( params: dict, CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: if CON is None: logging.info("NO DB CONNECTION, SKIPPING Insert Statements") else: if engine == 'mysql': await CON.execute(text(""" INSERT INTO user_stream_trades ( timestamp_arrival, type, id, taker_order_id, market, asset_id, side, size, price, fee_rate_bps, status, matchtime, last_update, outcome, owner, trade_owner, maker_address, transaction_hash, bucket_index, maker_orders, trader_side, timestamp ) VALUES ( :timestamp_arrival, :type, :id, :taker_order_id, :market, :asset_id, :side, :size, :price, :fee_rate_bps, :status, :matchtime, :last_update, :outcome, :owner, :trade_owner, :maker_address, :transaction_hash, :bucket_index, :maker_orders, :trader_side, :timestamp ) """), parameters=params ) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') async def create_user_orders_table( CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: if CON is None: logging.info("NO DB CONNECTION, SKIPPING Create Statements") else: if engine == 'mysql': logging.info('Creating Table if Does Not Exist: user_stream_orders') await CON.execute(text(""" CREATE TABLE IF NOT EXISTS user_stream_orders ( -- event_type VARCHAR(8), timestamp_arrival BIGINT, id VARCHAR(100), owner VARCHAR(100), market VARCHAR(100), asset_id VARCHAR(100), side VARCHAR(8), order_owner VARCHAR(100), original_size DOUBLE, size_matched DOUBLE, price DOUBLE, associate_trades JSON NULL, outcome VARCHAR(20), type VARCHAR(20), created_at BIGINT, expiration VARCHAR(20), order_type VARCHAR(8), status VARCHAR(20), maker_address VARCHAR(100), timestamp BIGINT ); """)) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') async def insert_user_orders_table( params: dict, CON: AsyncContextManager, engine: str = 'mysql', # mysql | duckdb ) -> None: if CON is None: logging.info("NO DB CONNECTION, SKIPPING Insert Statements") else: if engine == 'mysql': await CON.execute(text(""" INSERT INTO user_stream_orders ( timestamp_arrival, id, owner, market, asset_id, side, order_owner, original_size, size_matched, price, associate_trades, outcome, type, created_at, expiration, order_type, status, maker_address, timestamp ) VALUES ( :timestamp_arrival, :id, :owner, :market, :asset_id, :side, :order_owner, :original_size, :size_matched, :price, :associate_trades, :outcome, :type, :created_at, :expiration, :order_type, :status, :maker_address, :timestamp ) """), parameters=params ) await CON.commit() else: raise ValueError('Only MySQL engine is implemented') ### Helpers ### def live_orders_only(orders: list[dict]) -> list[dict]: return [d for d in orders if d.get('status')=='LIVE'] def upsert_list_of_dicts_by_id(list_of_dicts, new_dict): for index, item in enumerate(list_of_dicts): if item.get('id') == new_dict.get('id'): list_of_dicts[index] = new_dict return list_of_dicts list_of_dicts.append(new_dict) return list_of_dicts async def polymarket_stream(): global TARGET_PX global HIST_TRADES global LOCAL_LIVE_ORDERS global LOCAL_RECENT_TRADES POLY_API_KEY = API_CREDS.api_key POLY_API_SECRET = API_CREDS.api_secret POLY_API_PASS = API_CREDS.api_passphrase async for websocket in websockets.connect(WSS_URL): print(f"Connected to {WSS_URL}") subscribe_msg = { "auth": { "apiKey": POLY_API_KEY, "secret": POLY_API_SECRET, "passphrase": POLY_API_PASS, }, "type": "user", "markets": [] } await websocket.send(json.dumps(subscribe_msg)) print("Subscribed to User Data") try: async for message in websocket: ts_arrival = round(datetime.now().timestamp()*1000) if isinstance(message, str): data = json.loads(message) if data == {}: # Handle empty server ping - return pong await websocket.send(json.dumps({})) print('SENT HEARTBEAT PING') continue data['timestamp_arrival'] = ts_arrival event_type = data.get('event_type', None) match event_type: case 'trade': # logging.info(f'TRADE: {data}') # trade_status = data.get('status') # match trade_status: # Raise TELEGRAM ALERT ??? # case 'MATCHED': # pass # case 'MINED': # pass # case 'CONFIRMED': # pass # case 'RETRYING': # pass # case 'FAILED': # pass ### Convert Datatypes ### data['size'] = float(data['size']) data['price'] = float(data['price']) data['fee_rate_bps'] = float(data['fee_rate_bps']) data['matchtime'] = int(data['match_time']) data['last_update'] = int(data['last_update']) data['timestamp'] = int(data['timestamp']) data['maker_orders'] = json.dumps(data['maker_orders']) if data['maker_orders'] else None LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, data) LOOKBACK_MIN_TS_MS = ts_arrival-LOCAL_RECENT_TRADES_LOOKBACK_SEC*1000 LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] # print("---------------------") # print(LOCAL_RECENT_TRADES) # print("---------------------") VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES) # VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ) VAL_KEY.set(VK_RECENT_TRADES, VAL_KEY_OBJ) logging.info(f'User Trade Update: {data}') ### Insert into DB ### await insert_user_trades_table( params=data, CON=CON ) case 'order': logging.info(f'ORDER: {data}') ### Convert Datatypes ### data['original_size'] = float(data['original_size']) data['size_matched'] = float(data['size_matched']) data['price'] = float(data['price']) data['associate_trades'] = json.dumps(data['associate_trades']) if data['associate_trades'] else None data['created_at'] = int(data['created_at']) data['timestamp'] = int(data['timestamp']) ### Match on Status - Pass Live orders to Valkey for Algo Engine ### order_status = data.get('status') match order_status: case 'live': LOCAL_LIVE_ORDERS = upsert_list_of_dicts_by_id(LOCAL_LIVE_ORDERS, data) LOCAL_LIVE_ORDERS = live_orders_only(LOCAL_LIVE_ORDERS) VAL_KEY_OBJ = json.dumps(LOCAL_LIVE_ORDERS) # VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ) VAL_KEY.set(VK_LIVE_ORDERS, VAL_KEY_OBJ) logging.info(f'Order(s) RESTING: {data}') case 'matched': logging.info(f'Order(s) MATCHED: {data}') case 'delayed': raise ValueError(f'Order Status of "delayed" which is not expected for non-sports orders: {data}') case 'unmatched': raise ValueError(f'Order Status of "unmatched" which is not expected for non-sports orders: {data}') ### Insert into DB ### await insert_user_orders_table( params=data, CON=CON, ) else: raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: print(f"Connection closed by server. Exception: {e}") async def main(): global VAL_KEY global CON global API_CREDS private_key = os.getenv("PRIVATE_KEY") host = "https://clob.polymarket.com" chain_id = 137 # Polygon mainnet temp_client = ClobClient(host, key=private_key, chain_id=chain_id) API_CREDS = temp_client.create_or_derive_api_creds() if USE_VK: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) # published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}") else: VAL_KEY = None logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') async with engine.connect() as CON: await create_user_trades_table(CON=CON) await create_user_orders_table(CON=CON) await polymarket_stream() else: CON = None logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") await polymarket_stream() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") try: asyncio.run(main()) except KeyboardInterrupt as e: print(f"Stream stopped: {e}")