import asyncio import json import math import pandas as pd import os from datetime import datetime, timezone import websockets import numpy as np import talib import requests WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market" SLUG_END_TIME = 0 HIST_TRADES = np.empty((0, 2)) def format_timestamp(total_seconds) -> str: minutes, seconds = divmod(total_seconds, 60) return f"{minutes} minutes and {seconds} seconds" def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds interval_secs = interval_mins * 60 seconds = dt.timestamp() rounded_seconds = math.floor(seconds / interval_secs) * interval_secs return rounded_seconds def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False} r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}") market = r.json()['markets'][0] token_ids = json.loads(market.get("clobTokenIds", "[]")) outcomes = json.loads(market.get("outcomes", "[]")) d = dict(zip(outcomes, token_ids)) d['isActive'] = market['negRisk'] d['MinTickSize'] = market['orderPriceMinTickSize'] d['isNegRisk'] = market['negRisk'] d['ConditionId'] = market['conditionId'] d['EndDateTime'] = market['endDate'] return d, market def gen_slug(): slug_prefix = 'btc-updown-5m-' slug_time_id = time_round_down(dt=datetime.now(timezone.utc)) return slug_prefix + str(slug_time_id) async def polymarket_stream(): global SLUG_END_TIME global HIST_TRADES slug_full = gen_slug() market_details, market = get_mkt_details_by_slug(slug_full) TARGET_ASSET_ID = market_details['Up'] SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp()) print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********') async with websockets.connect(WSS_URL) as websocket: print(f"Connected to {WSS_URL}") subscribe_msg = { "assets_ids": [TARGET_ASSET_ID], "type": "market", "custom_feature_enabled": True } await websocket.send(json.dumps(subscribe_msg)) print(f"Subscribed to Asset: {TARGET_ASSET_ID}") try: async for message in websocket: current_ts = round(datetime.now().timestamp()) sec_remaining = SLUG_END_TIME - current_ts if sec_remaining <= 0: HIST_TRADES = np.empty((0, 2)) print('*** Attempting to unsub from past 5min') update_unsub_msg = { "operation": 'unsubscribe', "assets_ids": [TARGET_ASSET_ID], "custom_feature_enabled": True } await websocket.send(json.dumps(update_unsub_msg)) print('*** Attempting to SUB to new 5min') slug_full = gen_slug() market_details, market = get_mkt_details_by_slug(slug_full) TARGET_ASSET_ID = market_details['Up'] SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp()) update_sub_msg = { "operation": 'subscribe', "assets_ids": [TARGET_ASSET_ID], "custom_feature_enabled": True } await websocket.send(json.dumps(update_sub_msg)) if isinstance(message, str): data = json.loads(message) if isinstance(data, dict): # print(data.get("event_type", None)) pass elif isinstance(data, list): print('initial book: ') print(data) continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') event_type = data.get("event_type", None) if event_type == "price_change": # print("📈 Price Change") # print(pd.DataFrame(data['price_changes'])) pass elif event_type == "best_bid_ask": # print(pd.DataFrame([data])) pass elif event_type == "last_trade_price": px = float(data['price']) qty = float(data['size']) HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0) SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1] print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}") elif event_type == "book": pass elif event_type == "new_market": print('Received new_market') elif event_type == "market_resolved": print(f"Received: {event_type}") print(data) elif event_type == "tick_size_change": # may want for CLOB order routing print(f"Received: {event_type}") print(data) else: print(f"Received: {event_type}") print(data) except websockets.ConnectionClosed: print("Connection closed by server.") if __name__ == '__main__': try: asyncio.run(polymarket_stream()) except KeyboardInterrupt: print("Stream stopped.")