Files
Polymarket/ws_clob.py

378 lines
14 KiB
Python
Raw Permalink Normal View History

2026-03-29 16:27:58 +00:00
import asyncio
import json
import math
import logging
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
from typing import AsyncContextManager
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
import valkey
2026-04-01 17:37:19 +00:00
import os
from dotenv import load_dotenv
2026-03-29 16:27:58 +00:00
### Database ###
USE_DB: bool = True   # record trades to MySQL
USE_VK: bool = True   # publish latest trade to Valkey
VK_CHANNEL = 'poly_5min_btcusd'            # Valkey key for the latest Up-token trade
VK_CHANNEL_DOWN = 'poly_5min_btcusd_down'  # Valkey key for the latest Down-token trade
CON: AsyncContextManager | None = None     # set by main() when USE_DB
VAL_KEY = None                             # Valkey client, set by main() when USE_VK
### Logging ###
load_dotenv()
# LOGS_PATH comes from the environment / .env. Fall back to the working
# directory instead of crashing with `None + str` when it is unset or empty.
LOG_FILEPATH: str = os.path.join(os.getenv("LOGS_PATH") or ".", 'Polymarket_5min.log')

WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
# State of the currently subscribed 5-minute market (mutated by polymarket_stream).
SLUG_END_TIME = 0                    # market end, Unix seconds (UTC)
HIST_TRADES = np.empty((0, 2))       # rows of [price, qty] for Up-token trades
HIST_TRADES_DOWN = np.empty((0, 2))  # rows of [price, qty] for Down-token trades
MIN_TICK_SIZE = 0.01
NEG_RISK = False
TARGET_PX = 0                        # reference price read at rollover; 0 = unknown
TARGET_ASSET_ID = None               # CLOB token id of the 'Up' outcome
TARGET_ASSET_ID_DOWN = None          # CLOB token id of the 'Down' outcome
2026-03-29 16:27:58 +00:00
def format_timestamp(total_seconds) -> str:
    """Render a duration given in seconds as '<m> minutes and <s> seconds'.

    Uses floor-division semantics, so a float input yields float parts
    (e.g. 125.0 -> '2.0 minutes and 5.0 seconds').
    """
    whole_minutes = total_seconds // 60
    leftover = total_seconds % 60
    return f"{whole_minutes} minutes and {leftover} seconds"
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
    """Floor ``dt`` to the start of its ``interval_mins``-minute bucket.

    Returns the bucket start as a Unix timestamp in seconds (an int when
    ``interval_mins`` is an int).
    """
    bucket_len = interval_mins * 60
    return math.floor(dt.timestamp() / bucket_len) * bucket_len
def get_mkt_details_by_slug(slug: str, timeout: float = 10.0) -> tuple[dict, dict]:
    """Fetch an event by slug from the Polymarket Gamma API and summarise its first market.

    Args:
        slug: event slug, e.g. 'btc-updown-5m-<unix start>'.
        timeout: HTTP timeout in seconds. Without one, `requests` can hang forever.

    Returns:
        ``(details, market)``. ``details`` maps each outcome name (e.g. 'Up',
        'Down') to its CLOB token id and also holds 'isActive', 'MinTickSize',
        'OrderMinSize', 'isNegRisk', 'ConditionId', 'EndDateTime',
        'LiquidityClob', 'VolumeNum' and 'Volume24hr'. ``market`` is the raw
        market dict.

    Raises:
        requests.HTTPError: non-2xx response (e.g. the slug is not listed yet).
        requests.Timeout: the API did not answer within ``timeout``.
        KeyError / IndexError: the payload lacks the expected fields.
    """
    r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}", timeout=timeout)
    # Fail with a clear HTTP error instead of a KeyError on the error body.
    r.raise_for_status()
    market = r.json()['markets'][0]
    # Both fields are JSON-encoded lists inside the JSON payload; they are index-aligned.
    token_ids = json.loads(market.get("clobTokenIds", "[]"))
    outcomes = json.loads(market.get("outcomes", "[]"))
    d = dict(zip(outcomes, token_ids))
    # Previously copied from 'negRisk' by mistake.
    d['isActive'] = market.get('active')
    d['MinTickSize'] = market['orderPriceMinTickSize']
    d['OrderMinSize'] = market['orderMinSize']
    d['isNegRisk'] = market['negRisk']
    d['ConditionId'] = market['conditionId']
    d['EndDateTime'] = market['endDate']
    d['LiquidityClob'] = market['liquidityClob']
    d['VolumeNum'] = market['volumeNum']
    d['Volume24hr'] = market['volume24hr']
    print(market)
    return d, market
def gen_slug():
    """Return the slug of the BTC Up/Down 5-minute market covering the current UTC time.

    Format: 'btc-updown-5m-<Unix seconds of the current 5-minute window start>'.
    """
    window_start = time_round_down(dt=datetime.now(timezone.utc))
    return f"btc-updown-5m-{window_start}"
### Database Funcs ###
async def create_poly_btcusd_trades_table(
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Create the ``poly_btcusd_trades`` table if it does not already exist.

    Args:
        CON: open async SQLAlchemy connection, or ``None`` when the database is
            disabled (the call then only logs and returns).
        engine: SQL dialect; only 'mysql' is implemented.

    Raises:
        ValueError: ``CON`` is given and ``engine`` is not 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    logging.info('Creating Table if Does Not Exist: poly_btcusd_trades')
    ddl = text("""
CREATE TABLE IF NOT EXISTS poly_btcusd_trades (
timestamp_arrival BIGINT,
timestamp_msg BIGINT,
timestamp_value BIGINT,
price DOUBLE,
qty DOUBLE,
side_taker VARCHAR(8),
up_or_down VARCHAR(8)
);
""")
    await CON.execute(ddl)
    await CON.commit()
async def insert_poly_btcusd_trades_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    price: float,
    qty: float,
    side_taker: str,
    up_or_down: str,
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Insert one trade row into ``poly_btcusd_trades`` and commit it.

    Args:
        timestamp_arrival: local receive time, ms since epoch.
        timestamp_msg / timestamp_value: timestamps taken from the feed message.
        price, qty: trade price and size.
        side_taker: taker side as reported by the feed.
        up_or_down: 'UP' or 'DOWN' outcome token label.
        CON: open async SQLAlchemy connection, or ``None`` to skip (logs only).
        engine: SQL dialect; only 'mysql' is implemented.

    Raises:
        ValueError: ``CON`` is given and ``engine`` is not 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    row = dict(
        timestamp_arrival=timestamp_arrival,
        timestamp_msg=timestamp_msg,
        timestamp_value=timestamp_value,
        price=price,
        qty=qty,
        side_taker=side_taker,
        up_or_down=up_or_down,
    )
    # Bound parameters, never string-formatted values.
    stmt = text("""
INSERT INTO poly_btcusd_trades
(
timestamp_arrival,
timestamp_msg,
timestamp_value,
price,
qty,
side_taker,
up_or_down
)
VALUES
(
:timestamp_arrival,
:timestamp_msg,
:timestamp_value,
:price,
:qty,
:side_taker,
:up_or_down
)
""")
    await CON.execute(stmt, parameters=row)
    await CON.commit()
def _market_sub_msg(operation: str | None = None) -> dict:
    """Build a market-channel payload for the current Up and Down token ids.

    ``operation=None`` gives the initial subscription sent right after
    connecting. 'subscribe' / 'unsubscribe' give the in-session updates sent
    at market rollover.
    """
    msg = {} if operation is None else {"operation": operation}
    msg["assets_ids"] = [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN]
    if operation is None:
        msg["type"] = "market"
    msg["custom_feature_enabled"] = False
    return msg


async def _load_current_market() -> str:
    """Look up the market for the current 5-minute slug and store its details in the module globals.

    Updates TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN, MIN_TICK_SIZE, NEG_RISK and
    SLUG_END_TIME. Returns the market's condition id.
    """
    global SLUG_END_TIME, MIN_TICK_SIZE, NEG_RISK, TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN
    slug_full = gen_slug()
    # get_mkt_details_by_slug does a blocking HTTP request. Run it in a worker
    # thread so the event loop (and the websocket keepalive) is not frozen.
    market_details, _ = await asyncio.to_thread(get_mkt_details_by_slug, slug_full)
    TARGET_ASSET_ID = market_details['Up']
    TARGET_ASSET_ID_DOWN = market_details['Down']
    MIN_TICK_SIZE = market_details['MinTickSize']
    NEG_RISK = market_details['isNegRisk']
    SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
    print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit="s")} *********')
    return market_details['ConditionId']


def _read_target_px():
    """Return float(value) of the JSON stored in Valkey under 'poly_rtds_cl_btcusd'.

    Returns 0 (the module's initial "unknown" TARGET_PX) and logs a warning
    when Valkey is disabled or the key is missing, instead of crashing the
    stream on ``None``.
    """
    if VAL_KEY is None:
        logging.warning('Valkey disabled; TARGET_PX unavailable for new market')
        return 0
    raw = VAL_KEY.get('poly_rtds_cl_btcusd')
    if raw is None:
        logging.warning('poly_rtds_cl_btcusd missing in Valkey; TARGET_PX unavailable for new market')
        return 0
    return float(json.loads(raw)['value'])


async def polymarket_stream():
    """Stream Polymarket CLOB market-channel events for the current BTC Up/Down 5-minute market.

    Each 'last_trade_price' event for the Up/Down token is:
      * appended as [price, qty] to HIST_TRADES / HIST_TRADES_DOWN,
      * published to Valkey under VK_CHANNEL / VK_CHANNEL_DOWN when USE_VK,
      * inserted into poly_btcusd_trades when USE_DB.

    When a message arrives after the current market's end time, the function
    reads the new TARGET_PX, clears both trade histories, unsubscribes the old
    tokens and subscribes to the next market. On ConnectionClosed it
    reconnects via websockets.connect's async iterator and resubscribes.
    """
    global TARGET_PX
    global HIST_TRADES
    global HIST_TRADES_DOWN

    condition_id = await _load_current_market()

    async for websocket in websockets.connect(WSS_URL):
        print(f"Connected to {WSS_URL}")
        await websocket.send(json.dumps(_market_sub_msg()))
        print(f"Subscribed to Assets: Up {TARGET_ASSET_ID}; Down: {TARGET_ASSET_ID_DOWN}")
        try:
            async for message in websocket:
                now_s = datetime.now().timestamp()
                ts_arrival = round(now_s * 1000)
                sec_remaining = SLUG_END_TIME - round(now_s)

                # Compare the raw clock, not the rounded sec_remaining. Rounding
                # half-up hits 0 up to 0.5 s before the end, while gen_slug() still
                # floors to the old window, so the rollover would repeat on every message.
                if now_s >= SLUG_END_TIME:
                    TARGET_PX = _read_target_px()
                    HIST_TRADES = np.empty((0, 2))
                    HIST_TRADES_DOWN = np.empty((0, 2))  # was never reset before
                    print('*** Attempting to unsub from past 5min')
                    await websocket.send(json.dumps(_market_sub_msg('unsubscribe')))
                    print('*** Attempting to SUB to new 5min')
                    condition_id = await _load_current_market()
                    await websocket.send(json.dumps(_market_sub_msg('subscribe')))
                    # Report time left in the *new* market for this message.
                    sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())

                data = json.loads(message)  # json.loads accepts str and bytes frames
                if isinstance(data, list):
                    print('initial book:')
                    print(data)
                    continue
                if not isinstance(data, dict):
                    raise ValueError(f'Type: {type(data)} not expected: {message}')

                event_type = data.get("event_type", None)
                if event_type in ("price_change", "best_bid_ask", "book"):
                    continue
                if event_type == "new_market":
                    print('Received new_market')
                    continue
                # tick_size_change may be wanted later for CLOB order routing.
                if event_type in ("market_resolved", "tick_size_change"):
                    print(f"Received: {event_type}")
                    print(data)
                    continue
                if event_type != "last_trade_price":
                    print(f"*********** REC UNMAPPED EVENT: {event_type}")
                    print(data)
                    continue

                token_id = data['asset_id']
                ts_msg = int(data['timestamp'])
                ts_value = ts_msg  # the feed carries one timestamp; mirror it
                px = float(data['price'])
                qty = float(data['size'])
                side_taker = data['side']

                if token_id == TARGET_ASSET_ID:
                    up_or_down, vk_channel = 'UP', VK_CHANNEL
                    HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
                elif token_id == TARGET_ASSET_ID_DOWN:
                    up_or_down, vk_channel = 'DOWN', VK_CHANNEL_DOWN
                    HIST_TRADES_DOWN = np.append(HIST_TRADES_DOWN, np.array([[px, qty]]), axis=0)
                else:
                    # e.g. a late trade for tokens just unsubscribed at rollover.
                    # Skip it. Falling through used an unbound or stale up_or_down.
                    logging.warning('Token Id from Market Does Not Match Pricing Data Id: %s', token_id)
                    continue

                if USE_VK:
                    VAL_KEY.set(vk_channel, json.dumps({
                        'timestamp_arrival': ts_arrival,
                        'timestamp_msg': ts_msg,
                        'timestamp_value': ts_value,
                        'price': px,
                        'qty': qty,
                        'side_taker': side_taker,
                        'sec_remaining': sec_remaining,
                        'target_price': TARGET_PX,
                        'condition_id': condition_id,
                        'token_id_up': TARGET_ASSET_ID,
                        'token_id_down': TARGET_ASSET_ID_DOWN,
                        'tick_size': MIN_TICK_SIZE,
                        'neg_risk': NEG_RISK,
                    }))
                if USE_DB:
                    await insert_poly_btcusd_trades_table(
                        CON=CON,
                        timestamp_arrival=ts_arrival,
                        timestamp_msg=ts_msg,
                        timestamp_value=ts_value,
                        price=px,
                        qty=qty,
                        side_taker=side_taker,
                        up_or_down=up_or_down,
                    )
        except websockets.ConnectionClosed as e:
            print(f"Connection closed by server. Exception: {e}")
            continue
2026-03-29 16:27:58 +00:00
async def main():
    """Open the optional Valkey / MySQL connections and run polymarket_stream().

    Sets the module globals VAL_KEY and CON, which polymarket_stream() reads.
    The async SQLAlchemy engine is disposed on exit, including on cancellation.
    """
    global VAL_KEY
    global CON
    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")

    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await polymarket_stream()
        return

    # Allow credentials to come from the environment rather than source code.
    # The default keeps the previously hard-coded URL for backward compatibility.
    db_url = os.getenv('POLY_DB_URL', 'mysql+asyncmy://root:pwd@localhost/polymarket')
    engine = create_async_engine(db_url)
    try:
        async with engine.connect() as CON:
            await create_poly_btcusd_trades_table(CON=CON)
            await polymarket_stream()
    finally:
        # Release the engine's pooled connections; otherwise they are left open on shutdown.
        await engine.dispose()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)  # ms since epoch
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    # Must come after basicConfig. Logged earlier, logging.info() implicitly
    # configured a WARNING-level stderr handler, so this INFO line was dropped.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt as e:
        print(f"Stream stopped: {e}")