Files
Polymarket/ws_user.py

431 lines
15 KiB
Python
Raw Normal View History

2026-04-01 17:37:19 +00:00
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import valkey
import websockets
from dotenv import load_dotenv
from py_clob_client.client import ClobClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
### Database ###
USE_DB: bool = True
USE_VK: bool = True
LOCAL_LIVE_ORDERS = []
LOCAL_RECENT_TRADES = []
LOCAL_RECENT_TRADES_LOOKBACK_SEC = 10
VK_LIVE_ORDERS = 'poly_user_orders'
VK_RECENT_TRADES = 'poly_user_trades'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_User.log'
# https://docs.polymarket.com/market-data/websocket/user-channel
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
API_CREDS = {}
HIST_TRADES = np.empty((0, 2))
TARGET_PX = 0
### Database Funcs ###
async def create_user_trades_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: user_stream_trades')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS user_stream_trades (
-- event_type VARCHAR(8),
timestamp_arrival BIGINT,
type VARCHAR(20),
id VARCHAR(100),
taker_order_id VARCHAR(100),
market VARCHAR(100),
asset_id VARCHAR(100),
side VARCHAR(8),
size DOUBLE,
price DOUBLE,
fee_rate_bps DOUBLE,
status VARCHAR(20),
matchtime BIGINT,
last_update BIGINT,
outcome VARCHAR(20),
owner VARCHAR(100),
trade_owner VARCHAR(100),
maker_address VARCHAR(100),
transaction_hash VARCHAR(100),
bucket_index INT,
maker_orders JSON NULL,
trader_side VARCHAR(8),
timestamp BIGINT
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_user_trades_table(
params: dict,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO user_stream_trades
(
timestamp_arrival,
type,
id,
taker_order_id,
market,
asset_id,
side,
size,
price,
fee_rate_bps,
status,
matchtime,
last_update,
outcome,
owner,
trade_owner,
maker_address,
transaction_hash,
bucket_index,
maker_orders,
trader_side,
timestamp
)
VALUES
(
:timestamp_arrival,
:type,
:id,
:taker_order_id,
:market,
:asset_id,
:side,
:size,
:price,
:fee_rate_bps,
:status,
:matchtime,
:last_update,
:outcome,
:owner,
:trade_owner,
:maker_address,
:transaction_hash,
:bucket_index,
:maker_orders,
:trader_side,
:timestamp
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def create_user_orders_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: user_stream_orders')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS user_stream_orders (
-- event_type VARCHAR(8),
timestamp_arrival BIGINT,
id VARCHAR(100),
owner VARCHAR(100),
market VARCHAR(100),
asset_id VARCHAR(100),
side VARCHAR(8),
order_owner VARCHAR(100),
original_size DOUBLE,
size_matched DOUBLE,
price DOUBLE,
associate_trades JSON NULL,
outcome VARCHAR(20),
type VARCHAR(20),
created_at BIGINT,
expiration VARCHAR(20),
order_type VARCHAR(8),
status VARCHAR(20),
maker_address VARCHAR(100),
timestamp BIGINT
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_user_orders_table(
params: dict,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO user_stream_orders
(
timestamp_arrival,
id,
owner,
market,
asset_id,
side,
order_owner,
original_size,
size_matched,
price,
associate_trades,
outcome,
type,
created_at,
expiration,
order_type,
status,
maker_address,
timestamp
)
VALUES
(
:timestamp_arrival,
:id,
:owner,
:market,
:asset_id,
:side,
:order_owner,
:original_size,
:size_matched,
:price,
:associate_trades,
:outcome,
:type,
:created_at,
:expiration,
:order_type,
:status,
:maker_address,
:timestamp
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Helpers ###
def live_orders_only(orders: list[dict]) -> list[dict]:
return [d for d in orders if d.get('status')=='LIVE']
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict):
for index, item in enumerate(list_of_dicts):
if item.get('id') == new_dict.get('id'):
list_of_dicts[index] = new_dict
return list_of_dicts
list_of_dicts.append(new_dict)
return list_of_dicts
async def polymarket_stream():
global TARGET_PX
global HIST_TRADES
global LOCAL_LIVE_ORDERS
global LOCAL_RECENT_TRADES
POLY_API_KEY = API_CREDS.api_key
POLY_API_SECRET = API_CREDS.api_secret
POLY_API_PASS = API_CREDS.api_passphrase
async for websocket in websockets.connect(WSS_URL):
print(f"Connected to {WSS_URL}")
subscribe_msg = {
"auth": {
"apiKey": POLY_API_KEY,
"secret": POLY_API_SECRET,
"passphrase": POLY_API_PASS,
},
"type": "user",
"markets": []
}
await websocket.send(json.dumps(subscribe_msg))
print("Subscribed to User Data")
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
data = json.loads(message)
if data == {}: # Handle empty server ping - return pong
await websocket.send(json.dumps({}))
print('SENT HEARTBEAT PING')
continue
data['timestamp_arrival'] = ts_arrival
event_type = data.get('event_type', None)
match event_type:
case 'trade':
2026-04-02 02:38:01 +00:00
# logging.info(f'TRADE: {data}')
2026-04-01 17:37:19 +00:00
# trade_status = data.get('status')
# match trade_status: # Raise TELEGRAM ALERT ???
# case 'MATCHED':
# pass
# case 'MINED':
# pass
# case 'CONFIRMED':
# pass
# case 'RETRYING':
# pass
# case 'FAILED':
# pass
### Convert Datatypes ###
data['size'] = float(data['size'])
data['price'] = float(data['price'])
data['fee_rate_bps'] = float(data['fee_rate_bps'])
data['matchtime'] = int(data['match_time'])
data['last_update'] = int(data['last_update'])
data['timestamp'] = int(data['timestamp'])
data['maker_orders'] = json.dumps(data['maker_orders']) if data['maker_orders'] else None
LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, data)
LOOKBACK_MIN_TS_MS = ts_arrival-LOCAL_RECENT_TRADES_LOOKBACK_SEC*1000
LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
2026-04-02 02:38:01 +00:00
# print("---------------------")
# print(LOCAL_RECENT_TRADES)
# print("---------------------")
2026-04-01 17:37:19 +00:00
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES)
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_RECENT_TRADES, VAL_KEY_OBJ)
logging.info(f'User Trade Update: {data}')
### Insert into DB ###
await insert_user_trades_table(
params=data,
CON=CON
)
case 'order':
logging.info(f'ORDER: {data}')
### Convert Datatypes ###
data['original_size'] = float(data['original_size'])
data['size_matched'] = float(data['size_matched'])
data['price'] = float(data['price'])
data['associate_trades'] = json.dumps(data['associate_trades']) if data['associate_trades'] else None
data['created_at'] = int(data['created_at'])
data['timestamp'] = int(data['timestamp'])
### Match on Status - Pass Live orders to Valkey for Algo Engine ###
order_status = data.get('status')
match order_status:
case 'live':
LOCAL_LIVE_ORDERS = upsert_list_of_dicts_by_id(LOCAL_LIVE_ORDERS, data)
LOCAL_LIVE_ORDERS = live_orders_only(LOCAL_LIVE_ORDERS)
VAL_KEY_OBJ = json.dumps(LOCAL_LIVE_ORDERS)
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_LIVE_ORDERS, VAL_KEY_OBJ)
logging.info(f'Order(s) RESTING: {data}')
case 'matched':
logging.info(f'Order(s) MATCHED: {data}')
case 'delayed':
raise ValueError(f'Order Status of "delayed" which is not expected for non-sports orders: {data}')
case 'unmatched':
raise ValueError(f'Order Status of "unmatched" which is not expected for non-sports orders: {data}')
### Insert into DB ###
await insert_user_orders_table(
params=data,
CON=CON,
)
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
print(f"Connection closed by server. Exception: {e}")
async def main():
global VAL_KEY
global CON
global API_CREDS
private_key = os.getenv("PRIVATE_KEY")
host = "https://clob.polymarket.com"
chain_id = 137 # Polygon mainnet
temp_client = ClobClient(host, key=private_key, chain_id=chain_id)
API_CREDS = temp_client.create_or_derive_api_creds()
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
# published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_user_trades_table(CON=CON)
await create_user_orders_table(CON=CON)
await polymarket_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await polymarket_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(main())
except KeyboardInterrupt as e:
print(f"Stream stopped: {e}")