# Files
# Polymarket/ws_user.py
# 2026-04-01 17:37:19 +00:00
#
# 431 lines
# 15 KiB
# Python
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import valkey
import websockets
from dotenv import load_dotenv
from py_clob_client.client import ClobClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
### Database ###
USE_DB: bool = True   # persist stream events into MySQL
USE_VK: bool = True   # mirror live state into Valkey
LOCAL_LIVE_ORDERS = []    # in-memory snapshot of currently LIVE orders
LOCAL_RECENT_TRADES = []  # in-memory snapshot of recent trade events
LOCAL_RECENT_TRADES_LOOKBACK_SEC = 10  # trades older than this (by arrival time) are dropped
VK_LIVE_ORDERS = 'poly_user_orders'    # Valkey key holding the live-orders snapshot
VK_RECENT_TRADES = 'poly_user_trades'  # Valkey key holding the recent-trades snapshot
CON: AsyncContextManager | None = None  # async DB connection, assigned in main()
VAL_KEY = None                          # Valkey client, assigned in main()
### Logging ###
load_dotenv()
# Fall back to the current directory when LOGS_PATH is unset; the original
# `os.getenv("LOGS_PATH") + '/...'` raised TypeError at import time if the
# environment variable was missing.
LOG_FILEPATH: str = os.getenv("LOGS_PATH", ".") + '/Polymarket_User.log'
# https://docs.polymarket.com/market-data/websocket/user-channel
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
API_CREDS = {}  # replaced with the py_clob_client credentials object in main()
HIST_TRADES = np.empty((0, 2))  # placeholder, not used in this module's logic
TARGET_PX = 0                   # placeholder, not used in this module's logic
### Database Funcs ###
async def create_user_trades_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the `user_stream_trades` table when it does not already exist.

    Logs and returns immediately when `CON` is None (no DB configured);
    raises ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: user_stream_trades')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS user_stream_trades (
            -- event_type VARCHAR(8),
            timestamp_arrival BIGINT,
            type VARCHAR(20),
            id VARCHAR(100),
            taker_order_id VARCHAR(100),
            market VARCHAR(100),
            asset_id VARCHAR(100),
            side VARCHAR(8),
            size DOUBLE,
            price DOUBLE,
            fee_rate_bps DOUBLE,
            status VARCHAR(20),
            matchtime BIGINT,
            last_update BIGINT,
            outcome VARCHAR(20),
            owner VARCHAR(100),
            trade_owner VARCHAR(100),
            maker_address VARCHAR(100),
            transaction_hash VARCHAR(100),
            bucket_index INT,
            maker_orders JSON NULL,
            trader_side VARCHAR(8),
            timestamp BIGINT
        );
    """))
    await CON.commit()
async def insert_user_trades_table(
    params: dict,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one trade-event row (bound from `params`) into `user_stream_trades`.

    Logs and returns immediately when `CON` is None (no DB configured);
    raises ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    await CON.execute(
        text("""
            INSERT INTO user_stream_trades
            (
                timestamp_arrival,
                type,
                id,
                taker_order_id,
                market,
                asset_id,
                side,
                size,
                price,
                fee_rate_bps,
                status,
                matchtime,
                last_update,
                outcome,
                owner,
                trade_owner,
                maker_address,
                transaction_hash,
                bucket_index,
                maker_orders,
                trader_side,
                timestamp
            )
            VALUES
            (
                :timestamp_arrival,
                :type,
                :id,
                :taker_order_id,
                :market,
                :asset_id,
                :side,
                :size,
                :price,
                :fee_rate_bps,
                :status,
                :matchtime,
                :last_update,
                :outcome,
                :owner,
                :trade_owner,
                :maker_address,
                :transaction_hash,
                :bucket_index,
                :maker_orders,
                :trader_side,
                :timestamp
            )
        """),
        parameters=params,
    )
    await CON.commit()
async def create_user_orders_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the `user_stream_orders` table when it does not already exist.

    Logs and returns immediately when `CON` is None (no DB configured);
    raises ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: user_stream_orders')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS user_stream_orders (
            -- event_type VARCHAR(8),
            timestamp_arrival BIGINT,
            id VARCHAR(100),
            owner VARCHAR(100),
            market VARCHAR(100),
            asset_id VARCHAR(100),
            side VARCHAR(8),
            order_owner VARCHAR(100),
            original_size DOUBLE,
            size_matched DOUBLE,
            price DOUBLE,
            associate_trades JSON NULL,
            outcome VARCHAR(20),
            type VARCHAR(20),
            created_at BIGINT,
            expiration VARCHAR(20),
            order_type VARCHAR(8),
            status VARCHAR(20),
            maker_address VARCHAR(100),
            timestamp BIGINT
        );
    """))
    await CON.commit()
async def insert_user_orders_table(
    params: dict,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one order-event row (bound from `params`) into `user_stream_orders`.

    Logs and returns immediately when `CON` is None (no DB configured);
    raises ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    await CON.execute(
        text("""
            INSERT INTO user_stream_orders
            (
                timestamp_arrival,
                id,
                owner,
                market,
                asset_id,
                side,
                order_owner,
                original_size,
                size_matched,
                price,
                associate_trades,
                outcome,
                type,
                created_at,
                expiration,
                order_type,
                status,
                maker_address,
                timestamp
            )
            VALUES
            (
                :timestamp_arrival,
                :id,
                :owner,
                :market,
                :asset_id,
                :side,
                :order_owner,
                :original_size,
                :size_matched,
                :price,
                :associate_trades,
                :outcome,
                :type,
                :created_at,
                :expiration,
                :order_type,
                :status,
                :maker_address,
                :timestamp
            )
        """),
        parameters=params,
    )
    await CON.commit()
### Helpers ###
def live_orders_only(orders: list[dict]) -> list[dict]:
    """Return only the orders whose status is LIVE (case-insensitive).

    The websocket handler in this file matches on the lowercase status
    'live' while this filter originally required the uppercase 'LIVE';
    whichever casing the feed actually sends, one of the two was wrong.
    Comparing case-insensitively reconciles them and remains backward
    compatible with uppercase statuses. Orders missing 'status' are dropped,
    as before.
    """
    return [d for d in orders if str(d.get('status', '')).upper() == 'LIVE']
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict):
    """Insert or replace `new_dict` in `list_of_dicts`, keyed on 'id'.

    Mutates the list in place and returns it: the first entry whose 'id'
    equals `new_dict`'s 'id' is overwritten; when no entry matches,
    `new_dict` is appended at the end.
    """
    target_id = new_dict.get('id')
    for idx, existing in enumerate(list_of_dicts):
        if existing.get('id') == target_id:
            list_of_dicts[idx] = new_dict
            break
    else:
        list_of_dicts.append(new_dict)
    return list_of_dicts
async def polymarket_stream():
global TARGET_PX
global HIST_TRADES
global LOCAL_LIVE_ORDERS
global LOCAL_RECENT_TRADES
POLY_API_KEY = API_CREDS.api_key
POLY_API_SECRET = API_CREDS.api_secret
POLY_API_PASS = API_CREDS.api_passphrase
async for websocket in websockets.connect(WSS_URL):
print(f"Connected to {WSS_URL}")
subscribe_msg = {
"auth": {
"apiKey": POLY_API_KEY,
"secret": POLY_API_SECRET,
"passphrase": POLY_API_PASS,
},
"type": "user",
"markets": []
}
await websocket.send(json.dumps(subscribe_msg))
print("Subscribed to User Data")
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
data = json.loads(message)
if data == {}: # Handle empty server ping - return pong
await websocket.send(json.dumps({}))
print('SENT HEARTBEAT PING')
continue
data['timestamp_arrival'] = ts_arrival
event_type = data.get('event_type', None)
match event_type:
case 'trade':
logging.info(f'TRADE: {data}')
# trade_status = data.get('status')
# match trade_status: # Raise TELEGRAM ALERT ???
# case 'MATCHED':
# pass
# case 'MINED':
# pass
# case 'CONFIRMED':
# pass
# case 'RETRYING':
# pass
# case 'FAILED':
# pass
### Convert Datatypes ###
data['size'] = float(data['size'])
data['price'] = float(data['price'])
data['fee_rate_bps'] = float(data['fee_rate_bps'])
data['matchtime'] = int(data['match_time'])
data['last_update'] = int(data['last_update'])
data['timestamp'] = int(data['timestamp'])
data['maker_orders'] = json.dumps(data['maker_orders']) if data['maker_orders'] else None
LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, data)
LOOKBACK_MIN_TS_MS = ts_arrival-LOCAL_RECENT_TRADES_LOOKBACK_SEC*1000
LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
print("---------------------")
print(LOCAL_RECENT_TRADES)
print("---------------------")
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES)
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_RECENT_TRADES, VAL_KEY_OBJ)
logging.info(f'User Trade Update: {data}')
### Insert into DB ###
await insert_user_trades_table(
params=data,
CON=CON
)
case 'order':
logging.info(f'ORDER: {data}')
### Convert Datatypes ###
data['original_size'] = float(data['original_size'])
data['size_matched'] = float(data['size_matched'])
data['price'] = float(data['price'])
data['associate_trades'] = json.dumps(data['associate_trades']) if data['associate_trades'] else None
data['created_at'] = int(data['created_at'])
data['timestamp'] = int(data['timestamp'])
### Match on Status - Pass Live orders to Valkey for Algo Engine ###
order_status = data.get('status')
match order_status:
case 'live':
LOCAL_LIVE_ORDERS = upsert_list_of_dicts_by_id(LOCAL_LIVE_ORDERS, data)
LOCAL_LIVE_ORDERS = live_orders_only(LOCAL_LIVE_ORDERS)
VAL_KEY_OBJ = json.dumps(LOCAL_LIVE_ORDERS)
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_LIVE_ORDERS, VAL_KEY_OBJ)
logging.info(f'Order(s) RESTING: {data}')
case 'matched':
logging.info(f'Order(s) MATCHED: {data}')
case 'delayed':
raise ValueError(f'Order Status of "delayed" which is not expected for non-sports orders: {data}')
case 'unmatched':
raise ValueError(f'Order Status of "unmatched" which is not expected for non-sports orders: {data}')
### Insert into DB ###
await insert_user_orders_table(
params=data,
CON=CON,
)
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
print(f"Connection closed by server. Exception: {e}")
async def main():
    """Bootstrap API credentials, optional Valkey and DB connections, then
    run the user-channel stream until it terminates.

    Assigns the module globals API_CREDS, VAL_KEY and CON that
    `polymarket_stream` reads.
    """
    global VAL_KEY
    global CON
    global API_CREDS
    private_key = os.getenv("PRIVATE_KEY")
    host = "https://clob.polymarket.com"
    chain_id = 137  # Polygon mainnet
    temp_client = ClobClient(host, key=private_key, chain_id=chain_id)
    API_CREDS = temp_client.create_or_derive_api_creds()
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await polymarket_stream()
    else:
        # NOTE(review): DB credentials are hard-coded here — consider moving
        # them into the .env file alongside PRIVATE_KEY.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes the with-target assignment set the module
        # global that the stream and insert helpers read.
        async with engine.connect() as CON:
            await create_user_trades_table(CON=CON)
            await create_user_orders_table(CON=CON)
            await polymarket_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp() * 1000)
    # FIX: configure logging BEFORE the first logging call. The original
    # logged the file path before basicConfig(), so that INFO record was
    # silently dropped (an unconfigured root logger only emits WARNING and
    # above via the lastResort handler).
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # truncate the log on every start
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt as e:
        print(f"Stream stopped: {e}")