# Files
# Polymarket/ws.py
# 2026-03-27 12:40:49 -04:00

# 150 lines
# 5.7 KiB
# Python

import asyncio
import json
import math
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
# Websocket endpoint that polymarket_stream() connects to.
WSS_URL: str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"

# End time (epoch seconds) of the market currently being streamed;
# stays 0 until polymarket_stream() overwrites it.
SLUG_END_TIME: int = 0

# Trade history for the current market: one row per trade, columns are
# [price, size]. Starts with zero rows.
HIST_TRADES: np.ndarray = np.empty(shape=(0, 2))
def format_timestamp(total_seconds) -> str:
    """Render a duration in seconds as ``"<m> minutes and <s> seconds"``.

    Args:
        total_seconds: Duration in seconds. The value is split with
            ``divmod(total_seconds, 60)``, so the parts are formatted with
            whatever numeric type is passed in (ints print as ints).

    Returns:
        The formatted string, e.g. ``"2 minutes and 5 seconds"`` for 125.
    """
    minutes, seconds = divmod(total_seconds, 60)
    return f"{minutes} minutes and {seconds} seconds"
def time_round_down(dt: datetime, interval_mins: int = 5) -> int:
    """Floor a datetime to the start of its interval, as epoch seconds.

    Args:
        dt: The datetime to round. Its ``timestamp()`` is used, so an aware
            datetime is interpreted in its own timezone.
        interval_mins: Interval width in minutes (default 5).

    Returns:
        The largest multiple of ``interval_mins * 60`` that is less than or
        equal to ``dt.timestamp()``, in seconds since the epoch.
    """
    interval_secs = interval_mins * 60
    seconds = dt.timestamp()
    # floor (not round) so the result never lands in a future interval.
    return math.floor(seconds / interval_secs) * interval_secs
def get_mkt_details_by_slug(slug: str, timeout: float = 10.0) -> tuple[dict, dict]:
    """Fetch an event by slug from the Gamma API and summarise its first market.

    Args:
        slug: Event slug, appended to the ``/events/slug/`` endpoint.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``(details, market)`` tuple. ``details`` maps each outcome name to
        its CLOB token id (e.g. ``{'Up': ..., 'Down': ...}``) and also carries
        the keys ``isActive``, ``MinTickSize``, ``isNegRisk``, ``ConditionId``
        and ``EndDateTime``. ``market`` is the raw market object from the
        response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failure or timeout.
        KeyError, IndexError: If the response lacks the expected fields.
    """
    response = requests.get(
        f"https://gamma-api.polymarket.com/events/slug/{slug}",
        timeout=timeout,
    )
    # Fail on the HTTP status rather than on a confusing KeyError further down.
    response.raise_for_status()
    market = response.json()['markets'][0]

    # Both fields are parsed with json.loads, i.e. they arrive as JSON-encoded
    # strings rather than as lists.
    token_ids = json.loads(market.get("clobTokenIds", "[]"))
    outcomes = json.loads(market.get("outcomes", "[]"))

    details = dict(zip(outcomes, token_ids))
    # Previously this copied market['negRisk'], so isActive just mirrored
    # isNegRisk instead of reporting the market's active flag.
    details['isActive'] = market['active']
    details['MinTickSize'] = market['orderPriceMinTickSize']
    details['isNegRisk'] = market['negRisk']
    details['ConditionId'] = market['conditionId']
    details['EndDateTime'] = market['endDate']
    return details, market
def gen_slug() -> str:
    """Build the slug for the 5-minute BTC up/down market covering "now".

    Returns:
        ``'btc-updown-5m-'`` followed by the current UTC time floored to the
        default interval of ``time_round_down``, in epoch seconds.
    """
    slug_prefix = 'btc-updown-5m-'
    slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
    return slug_prefix + str(slug_time_id)
async def _load_current_market() -> tuple[str, int]:
    """Return ``(up_token_id, end_epoch_seconds)`` for the current market."""
    # get_mkt_details_by_slug does blocking HTTP; run it in a worker thread so
    # the event loop stays free to service the websocket while we wait.
    market_details, _market = await asyncio.to_thread(
        get_mkt_details_by_slug, gen_slug()
    )
    end_dt = datetime.strptime(
        market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ'
    ).replace(tzinfo=timezone.utc)
    return market_details['Up'], round(end_dt.timestamp())


async def polymarket_stream():
    """Stream market events for the current 5-minute market's 'Up' token.

    Looks up the current market, subscribes to its 'Up' asset on ``WSS_URL``
    and prints each trade with a 10-period rate of change over the prices seen
    so far. Once the market's end time has passed, the next incoming message
    triggers a rollover: the trade history is cleared, the old asset is
    unsubscribed and the new market's asset is subscribed on the same socket.

    Side effects:
        Rebinds the module globals ``SLUG_END_TIME`` and ``HIST_TRADES``.

    Raises:
        ValueError: If a text frame decodes to something other than a dict or
            a list.
    """
    global SLUG_END_TIME
    global HIST_TRADES

    TARGET_ASSET_ID, SLUG_END_TIME = await _load_current_market()
    # Double-quoted so the nested unit='s' also parses before Python 3.12.
    print(f"********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********")

    async with websockets.connect(WSS_URL) as websocket:
        print(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "assets_ids": [TARGET_ASSET_ID],
            "type": "market",
            "custom_feature_enabled": True,
        }
        await websocket.send(json.dumps(subscribe_msg))
        print(f"Subscribed to Asset: {TARGET_ASSET_ID}")

        try:
            async for message in websocket:
                current_ts = round(datetime.now().timestamp())
                sec_remaining = SLUG_END_TIME - current_ts

                # The rollover check only runs when a message arrives.
                if sec_remaining <= 0:
                    HIST_TRADES = np.empty((0, 2))
                    print('*** Attempting to unsub from past 5min')
                    update_unsub_msg = {
                        "operation": 'unsubscribe',
                        "assets_ids": [TARGET_ASSET_ID],
                        "custom_feature_enabled": True,
                    }
                    await websocket.send(json.dumps(update_unsub_msg))

                    print('*** Attempting to SUB to new 5min')
                    TARGET_ASSET_ID, SLUG_END_TIME = await _load_current_market()
                    update_sub_msg = {
                        "operation": 'subscribe',
                        "assets_ids": [TARGET_ASSET_ID],
                        "custom_feature_enabled": True,
                    }
                    await websocket.send(json.dumps(update_sub_msg))
                    # Refresh the countdown against the new market; otherwise
                    # the value printed below would still be <= 0.
                    sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())
                    # NOTE(review): `message` was received before the switch,
                    # so it presumably belongs to the previous market but is
                    # still processed below - confirm whether to drop it.

                # Skip non-text frames: falling through would re-process the
                # previous message's `data` (or hit a NameError on frame one).
                if not isinstance(message, str):
                    continue

                data = json.loads(message)
                if isinstance(data, list):
                    print('initial book: ')
                    print(data)
                    continue
                if not isinstance(data, dict):
                    raise ValueError(f'Type: {type(data)} not expected: {message}')

                event_type = data.get("event_type")
                if event_type in ("price_change", "best_bid_ask", "book"):
                    # Received but intentionally not acted on.
                    pass
                elif event_type == "last_trade_price":
                    px = float(data['price'])
                    qty = float(data['size'])
                    HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
                    # Rate of change over the price column; only the latest
                    # value is reported.
                    roc = talib.ROC(HIST_TRADES[:, 0], timeperiod=10)[-1]
                    print(f"✨ Last Px: {px:.2f}; ROC: {roc:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
                elif event_type == "new_market":
                    print('Received new_market')
                else:
                    # Covers market_resolved, tick_size_change (may be wanted
                    # for CLOB order routing) and any unrecognised event type.
                    print(f"Received: {event_type}")
                    print(data)
        except websockets.ConnectionClosed:
            print("Connection closed by server.")
# Script entry point: run the stream until it ends or the user hits Ctrl-C.
if __name__ == '__main__':
    try:
        asyncio.run(polymarket_stream())
    except KeyboardInterrupt:
        print("Stream stopped.")