3 Commits

Author SHA1 Message Date
259ea93479 saving 2026-04-01 17:37:19 +00:00
8d7d99d749 transfer to aws 2026-03-29 16:27:58 +00:00
c051130867 ionos_last_commit 2026-03-27 17:57:12 +00:00
24 changed files with 4399 additions and 337 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 337 KiB

317
database.ipynb Normal file
View File

@@ -0,0 +1,317 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "4cae6bf1",
"metadata": {},
"outputs": [],
"source": [
"from sqlalchemy import create_engine, text\n",
"import pandas as pd\n",
"from datetime import datetime"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "f5040527",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connection successful\n"
]
}
],
"source": [
"### MYSQL ###\n",
"engine = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')\n",
"try:\n",
" with engine.connect() as conn:\n",
" print(\"Connection successful\")\n",
"except Exception as e:\n",
" print(f\"Connection failed: {e}\") "
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "72059b3f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connection successful\n"
]
}
],
"source": [
"### MYSQL ###\n",
"engine_inter_storage = create_engine('mysql+pymysql://root:pwd@100.84.226.40/polymarket')\n",
"try:\n",
"    with engine_inter_storage.connect() as conn:\n",
" print(\"Connection successful\")\n",
"except Exception as e:\n",
" print(f\"Connection failed: {e}\") "
]
},
{
"cell_type": "code",
"execution_count": 48,
"id": "b723a51f",
"metadata": {},
"outputs": [],
"source": [
"# with engine.connect() as conn:\n",
"# print(\"Connection successful\")\n",
"# sql = text(\"TRUNCATE TABLE coinbase_btcusd_trades;\")\n",
"# conn.execute(sql)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5c23110d",
"metadata": {},
"outputs": [],
"source": [
"q_binance = '''\n",
"SELECT * FROM binance_btcusd_trades;\n",
"'''\n",
"q_coinbase = '''\n",
"SELECT * FROM coinbase_btcusd_trades;\n",
"'''\n",
"q_rtds = '''\n",
"SELECT * FROM poly_rtds_cl_btcusd;\n",
"'''\n",
"q_clob = '''\n",
"SELECT * FROM poly_btcusd_trades;\n",
"'''"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "a866e9ca",
"metadata": {},
"outputs": [],
"source": [
"# df_binance = pd.read_sql(q_binance, con=engine)\n",
"df_coinbase = pd.read_sql(q_coinbase, con=engine)\n",
"df_rtds = pd.read_sql(q_rtds, con=engine)\n",
"df_clob = pd.read_sql(q_clob, con=engine)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "954a3c3c",
"metadata": {},
"outputs": [],
"source": [
"# df_binance['timestamp_arrival'] = pd.to_datetime(df_binance['timestamp_arrival'], unit='ms')\n",
"df_coinbase['timestamp_arrival'] = pd.to_datetime(df_coinbase['timestamp_arrival'], unit='ms')\n",
"df_rtds['timestamp_arrival'] = pd.to_datetime(df_rtds['timestamp_arrival'], unit='ms')\n",
"df_clob['timestamp_arrival'] = pd.to_datetime(df_clob['timestamp_arrival'], unit='ms')"
]
},
{
"cell_type": "code",
"execution_count": 57,
"id": "50c6339f",
"metadata": {},
"outputs": [],
"source": [
"def copy_table_data_btw_servers(df, table_name, engine_destination) -> None:\n",
" rows_imported = df.to_sql(name=table_name, con=engine_destination, if_exists='append')\n",
" if rows_imported == len(df):\n",
" print(f'SUCCESS: COPIED {rows_imported} to table \"{table_name}\" on INTERSERVER_STORAGE')\n",
" else:\n",
" raise ValueError(f'FAILED: COPIED {rows_imported} rows to table {table_name} on INTERSERVER_STORAGE; EXPECTED {len(df)}')\n",
" \n",
"def truncate_table(engine, table):\n",
" with engine.connect() as conn:\n",
" sql = text(f\"TRUNCATE TABLE {table};\")\n",
" conn.execute(sql)\n",
" conn.commit()"
]
},
{
"cell_type": "code",
"execution_count": 61,
"id": "d0399a96",
"metadata": {},
"outputs": [],
"source": [
"def backup_all_tables(engine_origin, engine_destination, tables_to_copy):\n",
" for t in tables_to_copy:\n",
" q = f'''\n",
" SELECT * FROM {t};\n",
" '''\n",
" df = pd.read_sql(q, con=engine_origin)\n",
" print('-------------------------------------------------------------------------')\n",
" print(f'Loaded Data for Table: {t}...Attempting to Transfer to Destination Server')\n",
" copy_table_data_btw_servers(\n",
" df=df,\n",
" table_name=t,\n",
" engine_destination=engine_destination,\n",
" )\n",
" print(f'Attempting to Truncate Table: {t}...')\n",
" \n",
"    ### FOR REALTIME - instead of truncate, need to delete rows using a condition (e.g. delete all rows <= max timestamp arrival in the DF)\n",
" \n",
" truncate_table(\n",
" engine=engine_origin,\n",
" table=t,\n",
" )\n",
" print(f'...Successfully Truncated Table: {t}')\n",
" print(f'Done Transferring Data for Table: {t}')\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 59,
"id": "0de1629a",
"metadata": {},
"outputs": [],
"source": [
"tables_to_copy = [\n",
" # 'binance_btcusd_trades',\n",
" # 'coinbase_btcusd_trades',\n",
" 'poly_btcusd_trades',\n",
" 'poly_rtds_cl_btcusd',\n",
" # 'user_stream_orders',\n",
" # 'user_stream_trades',\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-------------------------------------------------------------------------\n",
"Loaded Data for Table: poly_btcusd_trades...Attempting to Transfer to Destination Server\n",
"SUCCESS: COPIED 720568 to table \"poly_btcusd_trades\" on INTERSERVER_STORAGE\n",
"Attempting to Truncate Table: poly_btcusd_trades...\n",
"...Successfully Truncated Table: poly_btcusd_trades\n",
"Done Transferring Data for Table: poly_btcusd_trades\n",
"-------------------------------------------------------------------------\n",
"Loaded Data for Table: poly_rtds_cl_btcusd...Attempting to Transfer to Destination Server\n",
"SUCCESS: COPIED 73771 to table \"poly_rtds_cl_btcusd\" on INTERSERVER_STORAGE\n",
"Attempting to Truncate Table: poly_rtds_cl_btcusd...\n",
"...Successfully Truncated Table: poly_rtds_cl_btcusd\n",
"Done Transferring Data for Table: poly_rtds_cl_btcusd\n"
]
}
],
"source": [
"backup_all_tables(\n",
" engine_origin=engine,\n",
" engine_destination=engine_inter_storage,\n",
" tables_to_copy=tables_to_copy\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cd0b40d2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SUCCESS COPIED 326007 to binance_btcusd_trades to INTERSERVER_STORAGE\n"
]
}
],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "48b47799",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "ad030f88",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "cafc5060",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5ba7be5f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

51
docker-compose.yml Normal file
View File

@@ -0,0 +1,51 @@
services:
ws_binance:
container_name: ws_binance
restart: "unless-stopped"
build:
context: ./
dockerfile: ./ws_binance/Dockerfile
volumes:
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
network_mode: "host"
ws_clob:
container_name: ws_clob
restart: "unless-stopped"
build:
context: ./
dockerfile: ./ws_clob/Dockerfile
volumes:
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
network_mode: "host"
ws_rtds:
container_name: ws_rtds
restart: "unless-stopped"
build:
context: ./
dockerfile: ./ws_rtds/Dockerfile
volumes:
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
network_mode: "host"
ws_user:
container_name: ws_user
restart: "unless-stopped"
build:
context: ./
dockerfile: ./ws_user/Dockerfile
volumes:
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
network_mode: "host"
ng:
container_name: ng
restart: "unless-stopped"
build:
context: ./
dockerfile: ./ng/Dockerfile
volumes:
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
network_mode: "host"

808
main.py Normal file
View File

@@ -0,0 +1,808 @@
import asyncio
import json
from dataclasses import dataclass
import logging
import math
import os
import time
from datetime import datetime, timezone
from typing import AsyncContextManager
import traceback
import numpy as np
import pandas as pd
import requests
import talib
import valkey
from dotenv import load_dotenv
from py_clob_client.clob_types import (
OrderArgs,
OrderType,
PartialCreateOrderOptions,
PostOrdersArgs,
BalanceAllowanceParams
)
from py_clob_client.order_builder.constants import BUY, SELL
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from functools import wraps
import modules.api as api
### Custom Order Args ###
@dataclass
class Custom_OrderArgs(OrderArgs):
    # Extra field on top of the CLOB OrderArgs: ceiling price used when
    # re-pegging (chasing) a working BUY order in
    # active_orders_no_positions_route. SELL orders pass 0.00.
    max_price: float = 0.00
### Database ###
# Shared singletons initialised in main(): CLOB client, async DB connection,
# and the Valkey (Redis-compatible) client used as the market-data bus.
CLIENT = None
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
# NOTE(review): os.getenv("LOGS_PATH") returns None when the variable is
# missing, which makes this concatenation raise TypeError at import time --
# confirm LOGS_PATH is always present in the .env.
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_5min_Algo.log'
### ALGO CONFIG / CONSTANTS ###
SLOPE_YES_THRESH = 0.01 # In Percent % Chg (e.g. 0.02 == 0.02%)
ENDTIME_BUFFER_SEC = 30 # Stop trading, cancel all open orders and exit positions this many seconds before mkt settles.
TGT_PX_INDEX_DIFF_THRESH = 0.1 # In Percent % Chg (e.g. 0.02 == 0.02%)
DEFAULT_ORDER_SIZE = 10 # In USDe
MIN_ORDER_SIZE = 5 # In USDe
TGT_PROFIT_CENTS = 0.02 # Offset added to book px for take-profit sells
CHASE_TO_BUY_CENTS = 0.05 # Max distance a working BUY may be chased upward
MAX_ALLOWED_POLY_PX = 0.90 # Skip entries when either side trades above this
### GLOBALS ###
# Mutable run-time state shared across the async routes below; refreshed
# each loop in run_algo from Valkey / the user stream.
ORDER_LOCK = 0
SLUG_END_TIME = 0
FREE_CASH: float = 0
POLY_BINANCE = {}
POLY_REF = {}
POLY_CLOB = {}
POLY_CLOB_DOWN = {}
USER_TRADES = {}
USER_ORDERS = {}
SLOPE_HIST = []
LOCAL_ACTIVE_ORDERS = []
LOCAL_TOKEN_BALANCES = {}
# LOCAL_ACTIVE_POSITIONS = []
ACTIVE_BALANCES_EXIST = False ### REMOVE
### Decorators ###
def async_timeit(func):
    """Decorator that prints the wall-clock runtime (in ms) of an async callable."""
    @wraps(func)
    async def timed(*args, **kwargs):
        t0 = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Always report, even when the wrapped call raises.
            elapsed_ms = (time.perf_counter() - t0) * 1000
            print(f"Function '{func.__name__}' executed in {elapsed_ms:.4f} ms")
    return timed
### Database Funcs ###
# @async_timeit
async def create_executions_orders_table(
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Create the executions_orders audit table if it does not already exist.

    Args:
        CON: open async DB connection (parameter intentionally shadows the
             module-level CON; callers pass it explicitly).
        engine: target dialect; only 'mysql' is implemented, anything else raises.

    No-ops (with a log line) when CON is None so the algo can run without a DB.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
    else:
        if engine == 'mysql':
            logging.info('Creating Table if Does Not Exist: executions_orders')
            # One row per order submission; the resp_* columns mirror the
            # fields of the CLOB post_orders response (see post_order).
            await CON.execute(text("""
                CREATE TABLE IF NOT EXISTS executions_orders (
                    timestamp_sent BIGINT,
                    token_id VARCHAR(100),
                    limit_price DOUBLE,
                    size DOUBLE,
                    side VARCHAR(8),
                    order_type VARCHAR(8),
                    post_only BOOL,
                    resp_errorMsg VARCHAR(100),
                    resp_orderID VARCHAR(100),
                    resp_takingAmount DOUBLE,
                    resp_makingAmount DOUBLE,
                    resp_status VARCHAR(20),
                    resp_success BOOL
                );
            """))
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
# @async_timeit
async def insert_executions_orders_table(
    timestamp_sent: int,
    token_id: str,
    limit_price: float,
    size: float,
    side: str,
    order_type: str,
    post_only: bool,
    resp_errorMsg: str,
    resp_orderID: str,
    resp_takingAmount: float,
    resp_makingAmount: float,
    resp_status: str,
    resp_success: bool,
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Insert one order-submission audit row into executions_orders.

    Each argument maps 1:1 onto a column of the table created by
    create_executions_orders_table. Commits immediately. Skips silently
    (with a log line) when CON is None; raises ValueError for any engine
    other than 'mysql'.
    """
    # Bind values by name so the driver handles quoting/escaping.
    params={
        'timestamp_sent': timestamp_sent,
        'token_id': token_id,
        'limit_price': limit_price,
        'size': size,
        'side': side,
        'order_type': order_type,
        'post_only': post_only,
        'resp_errorMsg': resp_errorMsg,
        'resp_orderID': resp_orderID,
        'resp_takingAmount': resp_takingAmount,
        'resp_makingAmount': resp_makingAmount,
        'resp_status': resp_status,
        'resp_success': resp_success,
    }
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
    else:
        if engine == 'mysql':
            await CON.execute(text("""
                INSERT INTO executions_orders
                (
                    timestamp_sent,
                    token_id,
                    limit_price,
                    size,
                    side,
                    order_type,
                    post_only,
                    resp_errorMsg,
                    resp_orderID,
                    resp_takingAmount,
                    resp_makingAmount,
                    resp_status,
                    resp_success
                )
                VALUES
                (
                    :timestamp_sent,
                    :token_id,
                    :limit_price,
                    :size,
                    :side,
                    :order_type,
                    :post_only,
                    :resp_errorMsg,
                    :resp_orderID,
                    :resp_takingAmount,
                    :resp_makingAmount,
                    :resp_status,
                    :resp_success
                )
            """),
            parameters=params
            )
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
### Functions ###
# @async_timeit
async def slope_decision() -> tuple[bool, str]:
    """Gate entries on the short-term slope of the Binance price feed.

    Computes the 1-second and 5-second percentage changes from POLY_BINANCE's
    trade history; both must exceed SLOPE_YES_THRESH (expressed in percent)
    in absolute value. Returns (enter, side) where side is 'UP'/'DOWN', or
    (False, '') when not entering. Side effect: appends the 1s slope to
    SLOPE_HIST.
    """
    hist_trades = np.array(POLY_BINANCE.get('hist_trades', []))
    # Robustness fix: an empty (or single-row) history previously crashed on
    # the 2-D indexing below before any trades had been collected.
    if hist_trades.ndim != 2 or hist_trades.shape[0] < 2:
        logging.info('Not Enough Trade History For Slope Decision')
        return False, ''
    # NOTE(review): the *1000 implies history timestamps are in seconds while
    # timestamp_value below is used as milliseconds -- confirm the units.
    if ( np.max(hist_trades[:, 0] )*1000 ) - ( np.min(hist_trades[:, 0])*1000 ) < 5:
        logging.info('Max - Min Trade In History is < 5 Seconds Apart')
        return False, ''
    last_px = POLY_BINANCE['value']
    last_px_ts = POLY_BINANCE['timestamp_value']
    # Nearest historical price ~1 second ago.
    ts_min_1_sec = last_px_ts - 1000
    price_min_1_sec_index = (np.abs(hist_trades[:, 0] - ts_min_1_sec)).argmin()
    price_min_1_sec = hist_trades[:, 1][price_min_1_sec_index]
    # Nearest historical price ~5 seconds ago.
    ts_min_5_sec = last_px_ts - 5000
    price_min_5_sec_index = (np.abs(hist_trades[:, 0] - ts_min_5_sec)).argmin()
    price_min_5_sec = hist_trades[:, 1][price_min_5_sec_index]
    slope = (last_px - price_min_1_sec) / price_min_1_sec
    slope_5 = (last_px - price_min_5_sec) / price_min_5_sec
    # NOTE(review): SLOPE_HIST grows without bound for the process lifetime.
    SLOPE_HIST.append(slope)
    print(f'Slope Hist Avg: {np.mean(SLOPE_HIST):.4%}')
    print(f'Slope Hist Max: {np.max(SLOPE_HIST):.4%}')
    print(f'Slope Hist Std: {np.std(SLOPE_HIST):.4%}')
    slope_1_buy = abs(slope) >= ( SLOPE_YES_THRESH / 100)
    slope_5_buy = abs(slope_5) >= ( SLOPE_YES_THRESH / 100)
    print(f'SLOPE_1: {slope:.4%} == {slope_1_buy}; SLOPE_5: {slope_5:.4%} == {slope_5_buy};')
    ### DECISION ###
    if slope_1_buy and slope_5_buy:
        print(f'🤑🤑🤑🤑🤑🤑🤑🤑🤑🤑 Slope: {slope_5:.4%};')
        side = 'UP' if slope > 0.00 else 'DOWN'
        return True, side
    else:
        return False, ''
# @async_timeit
async def cancel_all_orders(CLIENT):
    """Cancel every resting order, retrying once before giving up.

    Raises Exception only if BOTH attempts report orders left un-canceled.
    NOTE: a response without a 'not_canceled' key is treated as a failure
    (the .get default is True).
    """
    logging.info('Attempting to Cancel All Orders')
    cxl_resp = CLIENT.cancel_all()
    if bool(cxl_resp.get('not_canceled', True)):
        logging.warning(f'*** Cancel Request FAILED, trying again: {cxl_resp}')
        cxl_resp = CLIENT.cancel_all()
        # Bug fix: previously the retry result was ignored and this always
        # raised, even when the second attempt succeeded.
        if bool(cxl_resp.get('not_canceled', True)):
            raise Exception('*** Cancel Request FAILED')
    logging.info(f'Cancel Successful: {cxl_resp}')
# @async_timeit
async def cancel_single_order_by_id(CLIENT, order_id):
    """Cancel one resting order by id and update LOCAL_ACTIVE_ORDERS.

    Returns:
        True  -- the order could not be canceled because it already matched
                 (i.e. it filled); callers use this to mark the order MATCHED
                 or to skip re-posting (see active_orders_no_positions_route).
        False -- the order was canceled, was already gone, or is not tracked
                 locally.

    Raises Exception on any other cancel failure.
    """
    global LOCAL_ACTIVE_ORDERS
    logging.info(f'Attempting to Cancel Single Order: {order_id}')
    cxl_resp = CLIENT.cancel(order_id=order_id)
    for idx, o in enumerate(LOCAL_ACTIVE_ORDERS):
        if o.get('orderID') == order_id:
            if bool(cxl_resp.get('not_canceled', True)):
                if cxl_resp.get('not_canceled', {}).get(order_id, None) == "matched orders can't be canceled":
                    logging.info(f'Cancel request failed b/c already matched: {cxl_resp}')
                    # Bug fix: callers treat a truthy return as "order filled"
                    # (order_matched / order_filled at the call sites); this
                    # previously returned False, so fills were never flagged.
                    return True
                elif cxl_resp.get('not_canceled', {}).get(order_id, None) == "order can't be found - already canceled or matched":
                    logging.info(f'Cancel request failed b/c already matched or cancelled: {cxl_resp}')
                    LOCAL_ACTIVE_ORDERS.pop(idx)
                    return False
                else:
                    logging.warning(f'*** Cancel Request FAILED, shutting down: {cxl_resp}')
                    raise Exception('*** Cancel Request FAILED - SHUTDOWN')
            else:
                LOCAL_ACTIVE_ORDERS.pop(idx)
                logging.info(f'Cancel Successful: {cxl_resp}')
                return False
    # Order id not tracked locally; nothing to update.
    return False
# @async_timeit
async def flatten_open_positions(CLIENT, token_id_up, token_id_down):
    """Sell out any held UP/DOWN tokens one tick below the current book price.

    Positions of MIN_ORDER_SIZE or less are skipped, so dust can remain after
    this call (see no_orders_active_positions_route, which zeroes them locally).
    """
    up = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
    down = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
    ### Submit orders to flatten outstanding balances ###
    if up > MIN_ORDER_SIZE:
        logging.info(f'Flattening Up Position: {up}')
        await post_order(
            CLIENT = CLIENT,
            tick_size = POLY_CLOB['tick_size'],
            neg_risk = POLY_CLOB['neg_risk'],
            OrderArgs_list = [Custom_OrderArgs(
                token_id=token_id_up,
                price=float(POLY_CLOB['price'])-0.01,
                size=up,
                side=SELL,
            )]
        )
    if down > MIN_ORDER_SIZE:
        logging.info(f'Flattening Down Position: {down}')
        await post_order(
            CLIENT = CLIENT,
            # NOTE(review): tick_size/neg_risk come from the UP market dict
            # even for the DOWN leg -- presumably identical for both sides of
            # the same market; confirm.
            tick_size = POLY_CLOB['tick_size'],
            neg_risk = POLY_CLOB['neg_risk'],
            OrderArgs_list = [Custom_OrderArgs(
                token_id=token_id_down,
                price=float(POLY_CLOB_DOWN['price'])-0.01,
                size=down,
                side=SELL,
            )]
        )
# @async_timeit
async def get_balance_by_token_id(CLIENT, token_id):
    """Return the conditional-token balance for *token_id*, in whole tokens."""
    params = BalanceAllowanceParams(
        asset_type='CONDITIONAL',
        token_id=token_id,
    )
    resp = CLIENT.get_balance_allowance(params)
    # Balances are reported in 6-decimal fixed point.
    return int(resp['balance']) / 1_000_000
# @async_timeit
async def get_usde_balance(CLIENT):
    """Return the free USDe collateral balance, in whole USDe."""
    params = BalanceAllowanceParams(
        asset_type='COLLATERAL'
    )
    resp = CLIENT.get_balance_allowance(params)
    # Balances are reported in 6-decimal fixed point.
    return int(resp['balance']) / 1_000_000
@async_timeit
async def check_for_open_positions(CLIENT, token_id_up, token_id_down):
    """Refresh LOCAL_TOKEN_BALANCES from the exchange.

    Returns True when either side of the market is currently held.
    Raises ValueError when either token id is missing.
    """
    global LOCAL_TOKEN_BALANCES
    if token_id_up is None or token_id_down is None:
        logging.critical('Token Id is None, Exiting')
        raise ValueError('Token Id is None, Exiting')
    bal_up = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
    bal_down = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
    # Normalise falsy balances (None/0) to 0 in the local cache.
    LOCAL_TOKEN_BALANCES = {
        token_id_up: bal_up or 0,
        token_id_down: bal_down or 0,
    }
    logging.info(f'LOCAL_TOKEN_BALANCES: {LOCAL_TOKEN_BALANCES}')
    return abs(bal_up) > 0 or abs(bal_down) > 0
@async_timeit
async def post_order(CLIENT, OrderArgs_list: list[Custom_OrderArgs], tick_size: float | str, neg_risk: bool):
    """Build, sign and POST a batch of GTC orders, then update local state.

    On success each response dict is enriched with the originating order's
    token_id/price/size/side and either appended to LOCAL_ACTIVE_ORDERS
    (live, or matched awaiting confirm) or, when immediately CONFIRMED,
    folded into LOCAL_TOKEN_BALANCES. Raises ValueError on any errorMsg.
    """
    global LOCAL_ACTIVE_ORDERS
    global LOCAL_TOKEN_BALANCES
    orders = []
    for oa in OrderArgs_list:
        orders.append(
            PostOrdersArgs(
                order=CLIENT.create_order(
                    order_args=oa,
                    options=PartialCreateOrderOptions(
                        tick_size=str(tick_size),
                        neg_risk=neg_risk
                    ),
                ),
                orderType=OrderType.GTC,
                postOnly=False,
            ),
        )
    ### POST
    response = CLIENT.post_orders(orders)
    for idx, d in enumerate(response):
        if d['errorMsg'] == '':
            # Carry the request-side fields on the response dict so later
            # routes can manage the order without a second lookup.
            d['token_id'] = OrderArgs_list[idx].token_id
            d['price'] = OrderArgs_list[idx].price
            d['size'] = OrderArgs_list[idx].size
            d['side'] = str(OrderArgs_list[idx].side).upper()
            if d['status'].upper() =='MATCHED':
                ### Order Immediately Matched, Can Put in Offsetting Order Depending on State ###
                print('******** ORDER APPEND TO LOCAL - MATCHED ********* ')
                LOCAL_ACTIVE_ORDERS.append(d)
            if d['status'].upper() == 'CONFIRMED':
                # Immediate fill: apply signed size to the local balance cache.
                current_balance = float(LOCAL_TOKEN_BALANCES.get(d['token_id'], 0.00))
                if d['side'] == 'BUY':
                    size = float(d['size'])
                else:
                    size = float(d['size']) * -1
                LOCAL_TOKEN_BALANCES[d['token_id']] = current_balance + size
                print('******** TRADE FILLED, BAL UPDATED ********* ')
            else:
                # NOTE(review): a MATCHED response is not CONFIRMED, so it also
                # falls into this else and is appended a second time -- confirm
                # whether these two ifs should instead be an if/elif chain.
                print('******** ORDER APPEND TO LOCAL - LIVE ********* ')
                LOCAL_ACTIVE_ORDERS.append(d)
        else:
            raise ValueError(f'Order entry failed: {d}')
    logging.info(f'Order Posted Resp: {response}')
    print(f'Order Posted Resp: {response}')
### Routes ###
async def no_orders_no_positions_route():
    """Entry route: no resting orders and no inventory.

    Runs the gating checks (price band, index-vs-target distance, slope) and,
    if all pass, posts a chasing BUY on the side the slope points to.
    Returns False when a gate blocks entry.
    """
    global ORDER_LOCK
    ### Check for Price Bands ###
    up_px = float(POLY_CLOB.get('price', 0))
    down_px = float(POLY_CLOB_DOWN.get('price', 0))
    if (up_px > MAX_ALLOWED_POLY_PX) or (down_px > MAX_ALLOWED_POLY_PX):
        logging.info(f'Outside max allowed px: {MAX_ALLOWED_POLY_PX}')
        return False
    ### Check for Index vs. Target Px ###
    tgt_px = float(POLY_CLOB.get('target_price', 0))
    ref_val = POLY_REF.get('value')
    # Robustness fix: the reference feed can be unpopulated between sessions;
    # previously float(None) raised TypeError here.
    if ref_val is None:
        logging.info('Reference index value missing, skipping entry check')
        return False
    ref_px = float(ref_val)
    tgt_px_diff_to_index = ( abs( tgt_px - ref_px ) / tgt_px)
    if tgt_px_diff_to_index > (TGT_PX_INDEX_DIFF_THRESH / 100):
        logging.info(f'Tgt Diff to Index Outside Limit ({TGT_PX_INDEX_DIFF_THRESH}%); Diff {tgt_px_diff_to_index:.4%}; Index: {ref_px:.2f}; Tgt: {tgt_px:.2f}')
        return False
    ### Check Slope ###
    slope_bool, slope_side = await slope_decision()
    if not slope_bool:
        logging.info('Failed Slope Check')
        return False
    if slope_side == 'UP':
        token_id = POLY_CLOB.get('token_id_up', None)
        book_px = up_px
    else:
        token_id = POLY_CLOB.get('token_id_down', None)
        # Bug fix: the DOWN side was previously priced off the UP book
        # (POLY_CLOB['price']); use the DOWN book like the other routes do.
        book_px = down_px
    ### Order Entry ###
    px = book_px + 0.01
    order = Custom_OrderArgs(
        token_id=token_id,
        price=px,
        size=DEFAULT_ORDER_SIZE,
        side=BUY,
        max_price = px + CHASE_TO_BUY_CENTS
    )
    ### ADD CHECK FOR MKT MOVED AWAY FROM OPPORTUNITY ###
    if ORDER_LOCK:
        logging.info(f'BUY ORDER BLOCKED BY LOCK: {order}')
    else:
        logging.info(f'Attempting BUY Order {order}')
        await post_order(
            CLIENT = CLIENT,
            tick_size = POLY_CLOB['tick_size'],
            neg_risk = POLY_CLOB['neg_risk'],
            OrderArgs_list = [order]
        )
    # ORDER_LOCK = ORDER_LOCK + 1
async def active_orders_no_positions_route():
    """Order-management route: working orders but no inventory yet.

    Sanity-checks the local order list (max two orders, at most one per side,
    else kill), then for each order: chase a BUY one tick when the market is
    above it (bounded by max_price), re-peg a SELL one tick lower when the
    market trades below it, or cancel/mark-matched as appropriate.
    """
    if len(LOCAL_ACTIVE_ORDERS) > 2:
        logging.critical('More than two active orders, shutting down')
        await kill_algo()
    # Count buys and sells; more than one of either means local state is corrupt.
    b_c = 0
    s_c = 0
    for o in LOCAL_ACTIVE_ORDERS:
        if o['side'] == 'BUY':
            b_c = b_c + 1
        elif o['side'] == 'SELL':
            s_c = s_c + 1
    if (b_c > 1) or (s_c > 1):
        logging.critical(f'More than one active buy or more than one active sell: b_c {b_c}; s_c{s_c}')
        await kill_algo()
    for o in LOCAL_ACTIVE_ORDERS:
        logging.info(f'Working on order ({o['side']}): {o['orderID']}')
        if o.get('status').upper() == 'MATCHED':
            # Filled but not yet confirmed on the user stream; leave it alone.
            logging.info('Order is matched, awaiting confirm or kickback')
        elif o.get('status').upper() == 'FAILED':
            raise ValueError(f'Trade FAILED after matching: {o}')
        else:
            orig_px = float(o['price'])
            orig_size = float(o['size'])
            if o['side'] == 'BUY':
                # Price against the book for whichever side this order is on.
                if POLY_CLOB['token_id_up'] == o['token_id']:
                    clob_px = float(POLY_CLOB['price'])
                else:
                    clob_px = float(POLY_CLOB_DOWN['price'])
                if clob_px >= orig_px:
                    logging.info(f"Market px: ({clob_px} is above buy order px: {orig_px:.2f})")
                    if o.get('max_price', 0) > clob_px:
                        logging.info(f"Market px: ({clob_px} has moved too far away from original target, cancelling and resetting algo: {o.get('max_price', 0) :.2f})")
                        order_matched = await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
                        if order_matched:
                            o['status'] = 'MATCHED'
                        else:
                            # Chase: replace the canceled order one tick higher.
                            px = orig_px+0.01
                            await post_order(
                                CLIENT = CLIENT,
                                tick_size = POLY_CLOB['tick_size'],
                                neg_risk = POLY_CLOB['neg_risk'],
                                OrderArgs_list = [Custom_OrderArgs(
                                    token_id=o['token_id'],
                                    price=px,
                                    size=orig_size,
                                    side=BUY,
                                    max_price=o['max_price']
                                )]
                            )
                else:
                    # Market traded below our bid: pull the order.
                    await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
            elif o['side'] == 'SELL':
                if POLY_CLOB['token_id_up'] == o['token_id']:
                    clob_px = float(POLY_CLOB['price'])
                else:
                    clob_px = float(POLY_CLOB_DOWN['price'])
                if clob_px <= orig_px:
                    logging.info(f"Market px: ({clob_px} is below sell order px: {orig_px:.2f})")
                    order_filled = await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
                    if not order_filled:
                        # Re-peg one tick lower to stay in front of the market.
                        await post_order(
                            CLIENT = CLIENT,
                            tick_size = POLY_CLOB['tick_size'],
                            neg_risk = POLY_CLOB['neg_risk'],
                            OrderArgs_list = [Custom_OrderArgs(
                                token_id=o['token_id'],
                                price=orig_px-0.01,
                                size=orig_size,
                                side=SELL,
                                max_price = 0.00
                            )]
                        )
async def no_orders_active_positions_route():
    """Exit route: inventory held with no working orders.

    Posts a take-profit SELL for each held token at the current book price
    plus TGT_PROFIT_CENTS. Balances below MIN_ORDER_SIZE are zeroed in the
    local cache and skipped (too small to sell).
    """
    global LOCAL_TOKEN_BALANCES
    OrderArgs_list = []
    logging.warning(f'LOCAL_TOKEN_BALANCES: {LOCAL_TOKEN_BALANCES}')
    for k, v in LOCAL_TOKEN_BALANCES.items():
        # Re-check the live balance; the local cache can lag fills.
        size = await get_balance_by_token_id(CLIENT=CLIENT, token_id=k)
        if size >= MIN_ORDER_SIZE:
            if POLY_CLOB['token_id_up'] == k:
                clob_px = float(POLY_CLOB['price'])
            else:
                clob_px = float(POLY_CLOB_DOWN['price'])
            OrderArgs_list.append(
                Custom_OrderArgs(
                    token_id=k,
                    price=clob_px + TGT_PROFIT_CENTS,
                    size=size,
                    # Consistency fix: use the shared SELL constant like every
                    # other order built in this module.
                    side=SELL,
                )
            )
        else:
            LOCAL_TOKEN_BALANCES[k] = 0.00
            logging.info(f'Wants to flatten small amount, skipping: {v}')
    if OrderArgs_list:
        logging.info(f'Posting orders to close: {OrderArgs_list}')
        await post_order(
            CLIENT = CLIENT,
            tick_size = POLY_CLOB['tick_size'],
            neg_risk = POLY_CLOB['neg_risk'],
            OrderArgs_list = OrderArgs_list
        )
async def active_orders_active_positions_route():
    # Placeholder: the orders-and-positions state is currently served by
    # active_orders_no_positions_route in run_algo's else branch.
    pass
async def kill_algo():
    """Emergency stop: cancel all resting orders, sell out any inventory,
    then raise so the caller's exception handling terminates the loop."""
    logging.info('Killing algo...')
    await cancel_all_orders(CLIENT=CLIENT)
    await flatten_open_positions(
        CLIENT=CLIENT,
        token_id_up = POLY_CLOB.get('token_id_up', None),
        token_id_down = POLY_CLOB.get('token_id_down', None),
    )
    logging.info('...algo killed')
    raise Exception('Algo Killed')
async def run_algo():
    """Main trading loop.

    Each pass (~1s): refresh market/user-stream snapshots from Valkey,
    reconcile LOCAL_ACTIVE_ORDERS and LOCAL_TOKEN_BALANCES against the user
    stream, get flat inside the end-of-session buffer, then dispatch to the
    route matching the (orders?, positions?) state.
    """
    global POLY_BINANCE
    global POLY_REF
    global POLY_CLOB
    global POLY_CLOB_DOWN
    global USER_TRADES
    global USER_ORDERS
    global SLOPE_HIST
    global ACTIVE_BALANCES_EXIST
    global LOCAL_ACTIVE_ORDERS
    global LOCAL_TOKEN_BALANCES
    # global LOCAL_ACTIVE_POSITIONS
    print(f'token_id_up: {POLY_CLOB.get('token_id_up', None)}')
    print(f'token_id_down: {POLY_CLOB.get('token_id_down', None)}')
    POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))
    # Seed position state from the exchange before entering the loop.
    ACTIVE_BALANCES_EXIST = await check_for_open_positions(
        CLIENT=CLIENT,
        token_id_up=POLY_CLOB.get('token_id_up', None),
        token_id_down=POLY_CLOB.get('token_id_down', None),
    )
    try:
        while True:
            loop_start = time.time()
            print('__________Start___________')
            # Latest snapshots published by the websocket collector services.
            POLY_BINANCE = json.loads(VAL_KEY.get('poly_binance_btcusd'))
            POLY_REF = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
            POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))
            POLY_CLOB_DOWN = json.loads(VAL_KEY.get('poly_5min_btcusd_down'))
            USER_TRADES = json.loads(VAL_KEY.get('poly_user_trades'))
            USER_ORDERS = VAL_KEY.get('poly_user_orders')
            USER_ORDERS = json.loads(USER_ORDERS) if USER_ORDERS is not None else []
            ### CHANGE METHOD FROM BUY-SELL TO BUY UP - BUY DOWN
            ### DO THIS TO AVOID DELAY WITH FILL CONFIRMS
            ### Manage Local vs User Stream Orders ###
            print(f'LOCAL_ACTIVE_ORDERS: {LOCAL_ACTIVE_ORDERS}')
            # NOTE(review): LOCAL_ACTIVE_ORDERS.pop(idx) inside this enumerate
            # skips the element after the popped one until the next loop pass.
            for idx, o in enumerate(LOCAL_ACTIVE_ORDERS):
                # Match this local order against the user-stream order/trade feeds.
                user_order = next((item for item in USER_ORDERS if item["id"] == o['orderID']), None)
                user_trade = next( ( item for item in USER_TRADES if ( o['orderID'] == item['taker_order_id'] ) or ( o["orderID"] == json.loads(item['maker_orders'])[0]['order_id'] ) ), None )
                print(f'USER TRADE: {user_trade}')
                if user_trade is not None:
                    trade_status = str(user_trade['status']).upper()
                    logging.info(f'Updated Trade Status: {o['status']} --> {trade_status}; {o['orderID']}')
                    if trade_status == 'CONFIRMED':
                        # Fill confirmed: drop the order and fold the signed
                        # size into the local balance cache.
                        LOCAL_ACTIVE_ORDERS.pop(idx)
                        token_id = user_trade['asset_id']
                        current_balance = float(LOCAL_TOKEN_BALANCES.get(token_id, 0.00))
                        if user_trade['side'] == 'BUY':
                            size = float(user_trade['size'])
                        else:
                            size = float(user_trade['size']) * -1
                        LOCAL_TOKEN_BALANCES[token_id] = current_balance + size
                        logging.info('Order FILLED!')
                    elif trade_status == 'MATCHED':
                        logging.info(f'Order Matched...awaiting confirm: {trade_status}')
                    else:
                        logging.info(f'Trade status but not filled: trade= {user_trade}; order={o}')
                elif user_order is not None:
                    order_status = str(user_order['status']).upper()
                    logging.info(f'Updated Order Status: {o['status']} --> {order_status}; {o['orderID']}')
                    if order_status == 'CANCELED':
                        LOCAL_ACTIVE_ORDERS.pop(idx)
                        logging.info('Order Canceled')
                    else:
                        logging.info('Order Live or Trade Awaiting Confirm')
            ### UPDATES CAN COME THRU EITHER ORDER OR TRADE CHANNELS - NEED TO UPDATE TO HANDLE TRADE CHANNLE
            token_id_up = POLY_CLOB.get('token_id_up', 0)
            token_id_down = POLY_CLOB.get('token_id_down', 0)
            if (token_id_up is None) or (token_id_down is None):
                # Between sessions the market metadata can be empty; wait it out.
                print('Missing Token Ids for Market, sleeping 1 sec and retrying...')
                time.sleep(1)
                ACTIVE_BALANCES_EXIST = {}
                continue
            else:
                if (LOCAL_TOKEN_BALANCES.get(token_id_up) is None):
                    LOCAL_TOKEN_BALANCES[token_id_up] = 0.00
                if (LOCAL_TOKEN_BALANCES.get(token_id_down) is None):
                    LOCAL_TOKEN_BALANCES[token_id_down] = 0.00
                ACTIVE_BALANCES_EXIST = (LOCAL_TOKEN_BALANCES.get(token_id_up) > 0) or (LOCAL_TOKEN_BALANCES.get(token_id_down) > 0)
            ### Check for Endtime Buffer ###
            if ENDTIME_BUFFER_SEC > POLY_CLOB.get('sec_remaining', 0):
                # Too close to settlement: get flat and wait for the next market.
                if LOCAL_ACTIVE_ORDERS:
                    print('buffer zone - orders cancel')
                    await cancel_all_orders(CLIENT=CLIENT)
                if ACTIVE_BALANCES_EXIST:
                    print('buffer zone - flatten positions')
                    await flatten_open_positions(
                        CLIENT=CLIENT,
                        token_id_up = POLY_CLOB.get('token_id_up', None),
                        token_id_down = POLY_CLOB.get('token_id_down', None),
                    )
                print('buffer zone, sleeping until next session')
                time.sleep(1)
                continue
            ### Execution Route ###
            # NOTE(review): time.sleep blocks the event loop; fine while this
            # is the only coroutine, but use asyncio.sleep if others are added.
            if not(LOCAL_ACTIVE_ORDERS) and not(ACTIVE_BALANCES_EXIST): # No Orders, No Positions
                print('ROUTE: no_orders_no_positions_route')
                await no_orders_no_positions_route()
            ### Open Orders Route ###
            elif LOCAL_ACTIVE_ORDERS and not(ACTIVE_BALANCES_EXIST): # Orders, No Positions
                print('ROUTE: active_orders_no_positions_route')
                await active_orders_no_positions_route()
            ### Open Positions Route ###
            elif not(LOCAL_ACTIVE_ORDERS) and ACTIVE_BALANCES_EXIST: # No Orders, Positions
                # NOTE(review): print says no_orders_no_positions_route but this
                # branch dispatches no_orders_active_positions_route.
                print('ROUTE: no_orders_no_positions_route')
                await no_orders_active_positions_route()
            ### Open Orders and Open Positions Route ###
            else:
                print('ROUTE: active_orders_active_positions_route')
                # NOTE(review): dispatches active_orders_no_positions_route,
                # not the (stub) active_orders_active_positions_route -- confirm.
                await active_orders_no_positions_route() # Orders and Positions
            print(f'__________________________ (Algo Engine ms: {(time.time() - loop_start)*1000})')
            time.sleep(1)
    except KeyboardInterrupt:
        # Manual stop: pull all orders, leave any positions as-is.
        print('...algo stopped')
        await cancel_all_orders(CLIENT=CLIENT)
    except Exception as e:
        logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
        logging.error(traceback.format_exc())
        await cancel_all_orders(CLIENT=CLIENT)
async def main():
    """Wire up the CLOB client, Valkey bus and MySQL connection, then run the algo."""
    global CLIENT
    global VAL_KEY
    global CON
    CLIENT = api.create_client()
    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
    # Hold one connection open for the lifetime of run_algo; it is bound to
    # the module-level CON global used by the DB helpers.
    async with engine.connect() as CON:
        await create_executions_orders_table(CON=CON)
        await run_algo()
if __name__ == '__main__':
    # Epoch ms at process start; used only for the startup log line.
    START_TIME = round(datetime.now().timestamp()*1000)
    # NOTE(review): this info call runs before basicConfig, so it triggers the
    # default root handler configuration first; force=True below replaces it.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # fresh log file on each run
    )
    logging.info(f"STARTED: {START_TIME}")
    asyncio.run(main())

117
ng.py Normal file
View File

@@ -0,0 +1,117 @@
import os
from nicegui import ui, app
from sqlalchemy import create_engine
# import requests
import json
# import time
# import re
import valkey
# import asyncio
# import datetime as dt
# from random import random
# from nicegui_modules import data
# from nicegui_modules import ui_components
# from glide import GlideClient, NodeAddress, GlideClientConfiguration
LISTENING_CLIENT = None
LH_PAIR = 'BTC'
RH_PAIR = 'USD'
DEFAULT_TO_DARKMODE: bool = True
ALLOW_BODY_SCROLL: bool = True
LOOKBACK: int = 60
LOOKBACK_RT_TV_MAX_POINTS: int = 300
REFRESH_INTERVAL_SEC: int = 10
REFRESH_INTERVAL_RT_SEC: int = 1/30
ENGINE = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')
VALKEY_R = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
# VALKEY_P = VALKEY_R.pubsub()
# VALKEY_P.subscribe('mexc_mkt_bookTicker')
def root():
    """NiceGUI root entry: static assets, head includes, and page routing."""
    # max_cache_age=0 so edited CSS/JS are picked up without a hard refresh.
    app.add_static_files(max_cache_age=0, url_path='/static', local_directory=os.path.join(os.path.dirname(__file__), 'nicegui_modules/static'))
    # darkreader-lock stops the DarkReader extension from re-theming the app;
    # lightweight-charts is loaded from the unpkg CDN, our JS from /static.
    ui.add_head_html('''
<meta name="darkreader-lock">
<link rel="stylesheet" type="text/css" href="/static/styles.css">
<script type="text/javascript" src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>
<script src="/static/script.js"></script>
'''
    )
    # Apply the configured default for body scrolling before building pages.
    update_body_scroll(bool_override=ALLOW_BODY_SCROLL)
    ui.sub_pages({
        '/': rt_chart_page,
    }).classes('w-full')
async def update_tv():
    """Push the latest cached ticks from Valkey into the browser chart.

    Reads the newest Chainlink (RTDS), Binance, and Polymarket 5-minute
    snapshots from Valkey and forwards one combined payload to the
    `update_tv` JavaScript function defined in static/script.js.
    """
    series_update = json.loads(VALKEY_R.get('poly_rtds_cl_btcusd'))
    series_update_b = json.loads(VALKEY_R.get('poly_binance_btcusd'))
    series_update_c = json.loads(VALKEY_R.get('poly_5min_btcusd'))
    data_dict = {
        # Feeds store arrival times in ms; lightweight-charts wants UNIX seconds.
        'timestamp': round(series_update['timestamp_arrival'] / 1000, 2),
        'timestamp_b': round(series_update_b['timestamp_arrival'] / 1000, 2),
        'timestamp_c': round(series_update_c['timestamp_arrival'] / 1000, 2),
        'value': float(series_update['value']),
        'value_b': float(series_update_b['value']),
        'value_c': float(series_update_c['price']),
        'target': series_update_c['target_price'],
        'LOOKBACK_RT_TV_MAX_POINTS': LOOKBACK_RT_TV_MAX_POINTS,
    }
    # BUG FIX: serialize with json.dumps instead of interpolating the Python
    # dict repr — repr emits None/True/False, which are not valid JavaScript
    # (JSON uses null/true/false), so e.g. a null target broke the JS call.
    ui.run_javascript(f'await update_tv(data_dict={json.dumps(data_dict)});')
def update_body_scroll(e=None, bool_override=False):
    """Enable or disable vertical scrolling of the page body.

    Used both at page-build time (``e is None`` — take *bool_override*) and
    as a switch ``on_change`` handler (take the event's ``value``).
    """
    # Collapse the four duplicated branches of the original into one path.
    allow_scroll = bool_override if e is None else e.value
    overflow = 'auto' if allow_scroll else 'hidden'
    ui.query('body').style(f'height: 100%; overflow-y: {overflow};')
# async def refresh_lookback_funcs(lookback: int = LOOKBACK):
# lookback = app.storage.user.get('lookback', lookback)
# await data.trades_pnl_graph.refresh(ENGINE=ENGINE, LH_PAIR=LH_PAIR, RH_PAIR=RH_PAIR, lookback=lookback)
# await ui_components.er_table.refresh(ENGINE=ENGINE, lookback=lookback)
# await ui_components.trades_table.refresh(ENGINE=ENGINE, lookback=lookback)
# await ui_components.er_stats.refresh(ENGINE=ENGINE, lookback=lookback)
async def rt_chart_page():
    """Realtime chart page: a controls row plus a full-width chart pane."""
    global LOOKBACK
    # Per-user persisted lookback overrides the module default.
    LOOKBACK = app.storage.user.get('lookback', LOOKBACK)
    # Polls Valkey at ~30 fps and pushes the latest ticks to the JS chart.
    timer = ui.timer(REFRESH_INTERVAL_RT_SEC, update_tv)
    with ui.row():
        with ui.column():
            # Toggle body scrolling on/off.
            ui.switch('☸︎', value=ALLOW_BODY_SCROLL, on_change=lambda e: update_body_scroll(e))
        with ui.column():
            # Pause/resume the realtime refresh timer.
            ui.switch('▶️', value=True).bind_value_to(timer, 'active')
        with ui.column().style('position: absolute; right: 20px; font-family: monospace; align-self: center;'):
            ui.label('Atwater Trading: Orderbook')
    with ui.grid(columns=16).classes('w-full gap-0 auto-fit'):
        with ui.card().tight().classes('w-full col-span-full no-shadow border border-black-200').style('overflow: auto;'):
            # Container that create_tv() (static/script.js) renders into.
            ui.html('<div id="tv" style="width:100%; height:800px;"></div>', sanitize=False).classes('w-full')
    ui.run_javascript('await create_tv();')
ui.run(root, storage_secret="123ABC", reload=True, dark=True, title='Atwater Trading')

19
ng/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# Build tools are needed to compile wheels with no prebuilt binaries (e.g. TA-Lib).
# BUG FIX: install, verify, and clean the apt cache in ONE layer — the previous
# separate `RUN rm -rf /var/lib/apt/lists/*` did not shrink the image because
# the lists were already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    gcc --version && \
    rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ng.py"]

View File

@@ -0,0 +1,223 @@
async function waitForVariable(variableName, timeout = 5000) {
    // Poll every 100 ms until window[variableName] exists; give up after
    // the timeout and surface the failure as an Error.
    const deadline = Date.now() + timeout;
    while (typeof window[variableName] === 'undefined') {
        if (Date.now() > deadline) {
            throw new Error(`Variable '${variableName}' not defined within ${timeout}ms`);
        }
        await new Promise(resolve => setTimeout(resolve, 100));
    }
    console.log(`Variable '${variableName}' is now defined.`);
}
async function update_tv(data_dict) {
    // One tick per plotted series: [series, backing buffer, time (UNIX s), value].
    // The series and buffers live on window (created by create_tv).
    const ticks = [
        [window.lineSeries, window.data, data_dict.timestamp, data_dict.value],
        [window.lineSeries_b, window.data_b, data_dict.timestamp_b, data_dict.value_b],
        [window.lineSeries_c, window.data_c, data_dict.timestamp_c, data_dict.value_c],
        [window.lineSeries_tgt, window.data_tgt, data_dict.timestamp_c, data_dict.target],
    ];
    // Append to every raw buffer first, then feed every series (same order
    // of side effects as the original hand-unrolled version).
    for (const [, buffer, time, value] of ticks) {
        buffer.push({ time: time, value: value });
    }
    for (const [series, , time, value] of ticks) {
        series.update({ time: time, value: value });
    }
    window.chart.timeScale().scrollToRealTime();
    // Cap each series at the configured number of visible points.
    const MAX_DATA_POINTS = data_dict.LOOKBACK_RT_TV_MAX_POINTS;
    for (const [series] of ticks) {
        if (series.data().length > MAX_DATA_POINTS) {
            series.setData(series.data().slice(-MAX_DATA_POINTS));
        }
    }
};
async function create_tv() {
    // Build the realtime chart: four line series (Chainlink, Binance,
    // Polymarket contract price on the left [0,1] scale, dashed target)
    // plus a crosshair-driven tooltip.
    window.chart = LightweightCharts.createChart(document.getElementById('tv'),
        {
            autoSize: true,
            toolbox: true,
            timeScale: {
                timeVisible: true,    // show HH:mm on the x-axis
                secondsVisible: true
            },
            rightPriceScale: {
                visible: true,
                autoScale: true
            },
            leftPriceScale: {
                visible: true
            },
            layout: {
                background: { type: 'solid', color: '#222' },
                textColor: '#DDD',
            },
            grid: {
                // style 2 = dotted (0 solid, 1 dashed, 3 large-dashed, 4 sparse-dotted)
                vertLines: { color: '#e1e1e1', visible: true, style: 2 },
                horzLines: { color: '#e1e1e1', visible: true, style: 2 },
            },
            crosshair: { mode: LightweightCharts.CrosshairMode.Normal },
        }
    );
    // Chainlink spot price (right scale).
    window.lineSeries = chart.addSeries(LightweightCharts.LineSeries, {
        color: '#94fcdf',
        priceScaleId: 'right'
    });
    // Binance spot price (right scale).
    window.lineSeries_b = chart.addSeries(LightweightCharts.LineSeries, {
        color: '#dd7525',
        priceScaleId: 'right'
    });
    // Polymarket contract price, pinned to [0, 1] on the left scale.
    window.lineSeries_c = chart.addSeries(LightweightCharts.LineSeries, {
        color: '#ea0707',
        priceScaleId: 'left',
        autoscaleInfoProvider: () => ({
            priceRange: {
                minValue: 0.0,
                maxValue: 1.0
            }
        })
    });
    // Target/reference price (dashed, right scale).
    window.lineSeries_tgt = chart.addSeries(LightweightCharts.LineSeries, {
        color: '#ffffff',
        priceScaleId: 'right',
        lineStyle: LightweightCharts.LineStyle.Dashed
    });
    // Raw data buffers mirrored by update_tv().
    window.data = [];
    window.data_b = [];
    window.data_c = [];
    window.data_tgt = [];
    window.lineSeries.setData(window.data);
    window.lineSeries_b.setData(window.data_b);
    window.lineSeries_c.setData(window.data_c);
    window.lineSeries_tgt.setData(window.data_tgt);
    // Create and style the floating tooltip element.
    const container = document.getElementById('tv');
    window.toolTipWidth = 200;
    const toolTip = document.createElement('div');
    toolTip.style = `width: ${window.toolTipWidth}px; height: 100%; position: absolute; display: none; padding: 8px; box-sizing: border-box; font-size: 12px; text-align: left; z-index: 1000; top: 12px; left: 12px; pointer-events: none; border-radius: 4px 4px 0px 0px; border-bottom: none; box-shadow: 0 2px 5px 0 rgba(117, 134, 150, 0.45);font-family: -apple-system, BlinkMacSystemFont, 'Trebuchet MS', Roboto, Ubuntu, sans-serif; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale;`;
    toolTip.style.background = `rgba(${'0, 0, 0'}, 0.25)`;
    toolTip.style.color = 'white';
    toolTip.style.borderColor = 'rgba( 239, 83, 80, 1)';
    container.appendChild(toolTip);
    // Update the tooltip as the crosshair moves.
    window.chart.subscribeCrosshairMove(async param => {
        if (
            param.point === undefined ||
            !param.time ||
            param.point.x < 0 ||
            param.point.x > container.clientWidth ||
            param.point.y < 0 ||
            param.point.y > container.clientHeight
        ) {
            toolTip.style.display = 'none';
        } else {
            toolTip.style.alignContent = 'center';
            const dateStr = new Date(param.time * 1000).toISOString();
            let data = await param.seriesData.get(window.lineSeries);
            if (data === undefined) {
                data = {};
                data.value = 0;
                console.log('data is UNDEFINED, SETTING TO 0');
            };
            let data_b = await param.seriesData.get(window.lineSeries_b);
            if (data_b === undefined) {
                data_b = {};
                data_b.value = 0;
                // BUG FIX: message previously said 'data' here, masking which
                // series was missing.
                console.log('data_b is UNDEFINED, SETTING TO 0');
            };
            const value_px = data.value;
            // BUG FIX: previously read window.data_b.value — window.data_b is
            // an Array, so .value was always undefined and the Binance figure
            // rendered as NaN. Use the crosshair sample fetched above.
            const value_px_b = data_b.value;
            toolTip.style.display = 'block';
            toolTip.innerHTML = `
                <div style="font-size: 24px; margin: 4px 0px; color: ${'white'}">
                Chainlink: ${Math.round(100 * value_px) / 100}
                Binance: ${Math.round(100 * value_px_b) / 100}
                </div>
                <div style="color: ${'white'}">
                ${dateStr}
                </div>
            `;
            // Clamp the tooltip horizontally inside the chart pane
            // (param.point.x is relative to the time scale).
            let left = param.point.x;
            const timeScaleWidth = chart.timeScale().width();
            const priceScaleWidth = chart.priceScale('left').width();
            const halfTooltipWidth = toolTipWidth / 2;
            left += priceScaleWidth - halfTooltipWidth;
            left = Math.min(left, priceScaleWidth + timeScaleWidth - toolTipWidth);
            left = Math.max(left, priceScaleWidth);
            toolTip.style.left = left + 'px';
            toolTip.style.top = 0 + 'px';
        }
    });
    window.chart.timeScale().fitContent();
    console.log("TV Created!");
};

View File

@@ -0,0 +1,33 @@
/* Sticky Quasar Table for Dark Mode.
   Pins the header row (and the q-table top/bottom bars) while the body
   scrolls; the opaque black background keeps rows from showing through
   the pinned header. */
.table-sticky-dark .q-table__top,
.table-sticky-dark .q-table__bottom,
.table-sticky-dark thead tr:first-child th {
    background-color: black;
}
.table-sticky-dark thead tr th {
    position: sticky;
    z-index: 1; /* keep the header above scrolling body rows */
}
.table-sticky-dark thead tr:first-child th {
    top: 0;
}
.table-sticky-dark tbody {
    /* offset anchor scrolling below the pinned header */
    scroll-margin-top: 48px;
}
/* Sticky Quasar Table for Light Mode */
/* .table-sticky-light .q-table__top,
.table-sticky-light .q-table__bottom,
.table-sticky-light thead tr:first-child th {
background-color: rgb(229, 223, 223);
}
.table-sticky-light thead tr th {
position: sticky;
z-index: 1;
}
.table-sticky-light thead tr:first-child th {
top: 0;
}
.table-sticky-light tbody {
scroll-margin-top: 48px;
} */

File diff suppressed because it is too large Load Diff

22
requirements.txt Normal file
View File

@@ -0,0 +1,22 @@
pandas
rel
websockets
pyarrow
plotly
mysql-connector-python
sqlalchemy
requests
pymysql
scipy
asyncmy
cryptography
TA-Lib
valkey
nicegui
py_clob_client
# google
# google-api-core==2.30.0
# google-api-python-client==2.190.0
# googleapis-common-protos==1.72.0
# grpcio==1.76.0
# grpcio-tools==1.76.0

63
test.py Normal file
View File

@@ -0,0 +1,63 @@
import asyncio
import json
import websockets
import time
# Credentials
# SECURITY NOTE(review): live API credentials are hard-coded in source and
# committed to version control — move them to environment variables or a
# secrets manager, and rotate the exposed keys.
API_KEY = "019d2ad3-3755-744b-ace8-ad0f08c958dd"
API_SECRET = "vXT1UeliaP89z9vcxDtdv47422mftijJkrJYE7CFqvA="
API_PASSPHRASE = "57e703b801f22333d1a66a48c3a71773d3d3a42825ddcf330c3325856bc99756"
# Polymarket CLOB user-channel websocket endpoint.
WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
async def heartbeat(websocket):
    """Send an empty JSON frame every 10 s so the server keeps the socket open."""
    while True:
        try:
            await asyncio.sleep(10)
            await websocket.send(json.dumps({}))
        except Exception:
            # Socket closed or send failed: stop pinging silently.
            return
async def connect_polymarket_user_ws():
    """Maintain a subscribed Polymarket user-channel websocket, reconnecting forever.

    Subscribes with the module credentials, starts a background heartbeat,
    prints every non-empty update, and retries 5 s after any failure.
    """
    while True:  # outer loop: reconnect after any failure
        try:
            async with websockets.connect(WS_URL) as websocket:
                subscribe_message = {
                    "type": "user",
                    "auth": {
                        "apiKey": API_KEY,
                        "secret": API_SECRET,
                        "passphrase": API_PASSPHRASE
                    },
                    "markets": []
                }
                await websocket.send(json.dumps(subscribe_message))
                print(f"[{time.strftime('%H:%M:%S')}] Subscription sent...")
                # Keep-alive pings run in the background while we read.
                heartbeat_task = asyncio.create_task(heartbeat(websocket))
                try:
                    async for message in websocket:
                        data = json.loads(message)
                        # Log the specific reason if it's an error message.
                        if data.get("type") == "error":
                            print(f"Server Error: {data.get('message')}")
                            break
                        if data:  # ignore empty heartbeat responses from server
                            print(f"Update: {data}")
                finally:
                    # BUG FIX: the task was previously cancelled only when the
                    # read loop exited normally; an exception in the loop left
                    # the heartbeat task running (leaked) until reconnect.
                    heartbeat_task.cancel()
        except Exception as e:
            print(f"Connection lost: {e}. Retrying in 5s...")
            await asyncio.sleep(5)
if __name__ == "__main__":
    # Run the user-channel listener until Ctrl-C.
    try:
        asyncio.run(connect_polymarket_user_ws())
    except KeyboardInterrupt:
        print("Stopped by user.")

150
ws.py
View File

@@ -1,150 +0,0 @@
import asyncio
import json
import math
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
SLUG_END_TIME = 0
HIST_TRADES = np.empty((0, 2))
def format_timestamp(total_seconds) -> str:
    """Render a second count as '<m> minutes and <s> seconds'."""
    minutes = total_seconds // 60
    seconds = total_seconds - minutes * 60
    return f"{minutes} minutes and {seconds} seconds"
def time_round_down(dt, interval_mins=5) -> int:
    """Floor *dt* to the start of its interval; returns a UNIX timestamp in seconds."""
    bucket = interval_mins * 60
    # Floor division is equivalent to math.floor(seconds / bucket) here.
    return int(dt.timestamp() // bucket) * bucket
def get_mkt_details_by_slug(slug: str) -> tuple[dict, dict]:
    """Fetch a Polymarket gamma-API market by event slug.

    Returns ``(details, raw_market)`` where *details* maps each outcome name
    (e.g. 'Up'/'Down') to its CLOB token id, plus selected metadata keys.
    Raises on network failure or if the event has no markets.
    """
    r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
    market = r.json()['markets'][0]
    token_ids = json.loads(market.get("clobTokenIds", "[]"))
    outcomes = json.loads(market.get("outcomes", "[]"))
    d = dict(zip(outcomes, token_ids))
    # BUG FIX: 'isActive' was previously copied from the negRisk flag; use the
    # market's own `active` field.
    d['isActive'] = market['active']
    d['MinTickSize'] = market['orderPriceMinTickSize']
    d['isNegRisk'] = market['negRisk']
    d['ConditionId'] = market['conditionId']
    d['EndDateTime'] = market['endDate']
    return d, market
def gen_slug():
    """Slug of the currently-active BTC 5-minute up/down market."""
    bucket_start = time_round_down(dt=datetime.now(timezone.utc))
    return f'btc-updown-5m-{bucket_start}'
async def polymarket_stream():
    """Stream the live BTC 5-minute up/down market from the Polymarket CLOB.

    Subscribes to the current market's 'Up' token, rolls the subscription to
    the next 5-minute market at expiry, and prints last-trade metrics.
    """
    global SLUG_END_TIME
    global HIST_TRADES
    slug_full = gen_slug()
    market_details, market = get_mkt_details_by_slug(slug_full)
    TARGET_ASSET_ID = market_details['Up']
    # Market expiry as a UNIX timestamp (endDate is UTC, 'Z'-suffixed).
    SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
    # NOTE(review): same-quote nesting inside this f-string requires Python >= 3.12.
    print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
    async with websockets.connect(WSS_URL) as websocket:
        print(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "assets_ids": [TARGET_ASSET_ID],
            "type": "market",
            "custom_feature_enabled": True
        }
        await websocket.send(json.dumps(subscribe_msg))
        print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
        try:
            async for message in websocket:
                current_ts = round(datetime.now().timestamp())
                sec_remaining = SLUG_END_TIME - current_ts
                if sec_remaining <= 0:
                    # Current market expired: clear trade history and roll the
                    # subscription to the next 5-minute market.
                    HIST_TRADES = np.empty((0, 2))
                    print('*** Attempting to unsub from past 5min')
                    update_unsub_msg = {
                        "operation": 'unsubscribe',
                        "assets_ids": [TARGET_ASSET_ID],
                        "custom_feature_enabled": True
                    }
                    await websocket.send(json.dumps(update_unsub_msg))
                    print('*** Attempting to SUB to new 5min')
                    slug_full = gen_slug()
                    market_details, market = get_mkt_details_by_slug(slug_full)
                    TARGET_ASSET_ID = market_details['Up']
                    SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
                    update_sub_msg = {
                        "operation": 'subscribe',
                        "assets_ids": [TARGET_ASSET_ID],
                        "custom_feature_enabled": True
                    }
                    await websocket.send(json.dumps(update_sub_msg))
                if isinstance(message, str):
                    data = json.loads(message)
                    if isinstance(data, dict):
                        # print(data.get("event_type", None))
                        pass
                    elif isinstance(data, list):
                        # First frame after subscribing is the initial book snapshot.
                        print('initial book: ')
                        print(data)
                        continue
                    else:
                        raise ValueError(f'Type: {type(data)} not expected: {message}')
                event_type = data.get("event_type", None)
                if event_type == "price_change":
                    pass
                elif event_type == "best_bid_ask":
                    pass
                elif event_type == "last_trade_price":
                    px = float(data['price'])
                    qty = float(data['size'])
                    HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
                    # NOTE(review): despite the name, SMA holds talib.ROC (rate
                    # of change over 10 trades), not a moving average.
                    SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
                    print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
                elif event_type == "book":
                    pass
                elif event_type == "new_market":
                    print('Received new_market')
                elif event_type == "market_resolved":
                    print(f"Received: {event_type}")
                    print(data)
                elif event_type == "tick_size_change":  # may want for CLOB order routing
                    print(f"Received: {event_type}")
                    print(data)
                else:
                    print(f"Received: {event_type}")
                    print(data)
        except websockets.ConnectionClosed:
            print("Connection closed by server.")
if __name__ == '__main__':
    # Run the market stream until Ctrl-C.
    try:
        asyncio.run(polymarket_stream())
    except KeyboardInterrupt:
        print("Stream stopped.")

213
ws_binance.py Normal file
View File

@@ -0,0 +1,213 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Force urllib3/requests name resolution to IPv4 only."""
    return socket.AF_INET
# Monkey-patch requests' bundled urllib3 so all address lookups use IPv4.
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True   # persist ticks to MySQL
USE_VK: bool = True   # publish the latest tick to Valkey
VK_CHANNEL = 'poly_binance_btcusd'  # Valkey key holding the latest tick
CON: AsyncContextManager | None = None  # async DB connection (set in main)
VAL_KEY = None  # Valkey client (set in main)
### Logging ###
load_dotenv()
# NOTE(review): raises TypeError if LOGS_PATH is unset — consider a default.
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_Binance_Trades.log'
### Globals ###
WSS_URL = "wss://stream.binance.com:9443/ws/BTCUSDT@aggTrade"
HIST_TRADES = np.empty((0, 3))  # rows: [trade_time_ms, price, qty]
HIST_TRADES_LOOKBACK_SEC = 5    # rolling window kept in HIST_TRADES
### Database Funcs ###
async def create_rtds_btcusd_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Ensure the binance_btcusd_trades table exists.

    No-ops (with a log line) when no DB connection is configured; only the
    MySQL backend is implemented.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
    ddl = """
        CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            value DOUBLE,
            qty DOUBLE
        );
    """
    await CON.execute(text(ddl))
    await CON.commit()
async def insert_rtds_btcusd_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    value: float,
    qty: float,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one Binance trade row; no-op (logged) without a DB connection."""
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    params = {
        'timestamp_arrival': timestamp_arrival,
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'value': value,
        'qty': qty,
    }
    stmt = """
        INSERT INTO binance_btcusd_trades
        (timestamp_arrival, timestamp_msg, timestamp_value, value, qty)
        VALUES
        (:timestamp_arrival, :timestamp_msg, :timestamp_value, :value, :qty)
    """
    await CON.execute(text(stmt), parameters=params)
    await CON.commit()
### Websocket ###
async def binance_trades_stream():
    """Stream BTCUSDT aggTrades from Binance; cache in Valkey, persist to MySQL.

    Keeps a rolling HIST_TRADES window of the last HIST_TRADES_LOOKBACK_SEC
    seconds. Reconnects automatically (iterating websockets.connect()).
    """
    global HIST_TRADES
    async for websocket in websockets.connect(WSS_URL):
        logging.info(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "method": "SUBSCRIBE",
            "params": ["btcusdt@aggTrade"],
            "id": 1
        }
        await websocket.send(json.dumps(subscribe_msg))
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                if isinstance(message, str):
                    try:
                        data = json.loads(message)
                        # aggTrade payloads carry 'T' (trade time); control
                        # frames (subscribe acks) do not.
                        if data.get('T', None) is not None:
                            timestamp_msg = data['E']    # event time (ms)
                            timestamp_value = data['T']  # trade time (ms)
                            last_px = float(data['p'])
                            qty = float(data['q'])
                            HIST_TRADES = np.append(HIST_TRADES, np.array([[timestamp_value, last_px, qty]]), axis=0)
                            # Drop trades older than the lookback window.
                            hist_trades_lookback_ts_ms = round(datetime.now().timestamp() - HIST_TRADES_LOOKBACK_SEC)*1000
                            HIST_TRADES = HIST_TRADES[HIST_TRADES[:, 0] >= hist_trades_lookback_ts_ms]
                            VAL_KEY_OBJ = json.dumps({
                                'timestamp_arrival': ts_arrival,
                                'timestamp_msg': timestamp_msg,
                                'timestamp_value': timestamp_value,
                                'value': last_px,
                                'qty': qty,
                                'hist_trades': HIST_TRADES.tolist()
                            })
                            VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
                            await insert_rtds_btcusd_table(
                                CON=CON,
                                timestamp_arrival=ts_arrival,
                                timestamp_msg=timestamp_msg,
                                timestamp_value=timestamp_value,
                                value=last_px,
                                qty=qty,
                            )
                        else:
                            logging.info(f'Initial or unexpected data struct, skipping: {data}')
                            continue
                    except (json.JSONDecodeError, ValueError):
                        logging.warning(f'Message not in JSON format, skipping: {message}')
                        continue
                else:
                    # BUG FIX: previously referenced `data`, which is unbound
                    # when the frame is not a str — report the frame's own type.
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            # BUG FIX: message previously duplicated 'Connection closed'.
            logging.error(f'Unexpected stream error: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Initialize Valkey and MySQL per the feature flags, then run the stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        # NOTE(review): DB credentials are hard-coded in the DSN — move to env config.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # One connection kept open for the stream's whole lifetime.
        async with engine.connect() as CON:
            await create_rtds_btcusd_table(CON=CON)
            await binance_trades_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await binance_trades_stream()
if __name__ == '__main__':
    # Stream start time in epoch milliseconds.
    START_TIME = round(datetime.now().timestamp()*1000)
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    # BUG FIX: this was logged BEFORE basicConfig; an unconfigured root logger
    # defaults to the WARNING threshold, so the INFO record was dropped.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")

19
ws_binance/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# Build tools are needed to compile wheels with no prebuilt binaries (e.g. TA-Lib).
# BUG FIX: install, verify, and clean the apt cache in ONE layer — the previous
# separate `RUN rm -rf /var/lib/apt/lists/*` did not shrink the image because
# the lists were already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    gcc --version && \
    rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ws_binance.py"]

378
ws_clob.py Normal file
View File

@@ -0,0 +1,378 @@
import asyncio
import json
import math
import logging
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
from typing import AsyncContextManager
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
import valkey
import os
from dotenv import load_dotenv
### Database ###
USE_DB: bool = True   # persist trades to MySQL
USE_VK: bool = True   # publish the latest tick to Valkey
VK_CHANNEL = 'poly_5min_btcusd'            # latest 'Up' tick
VK_CHANNEL_DOWN = 'poly_5min_btcusd_down'  # latest 'Down' tick
CON: AsyncContextManager | None = None  # async DB connection (set in main)
VAL_KEY = None  # Valkey client (set in main)
### Logging ###
load_dotenv()
# NOTE(review): raises TypeError if LOGS_PATH is unset — consider a default.
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_5min.log'
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
SLUG_END_TIME = 0                    # UNIX s when the current 5-min market expires
HIST_TRADES = np.empty((0, 2))       # rows: [price, qty] for the 'Up' token
HIST_TRADES_DOWN = np.empty((0, 2))  # rows: [price, qty] for the 'Down' token
MIN_TICK_SIZE = 0.01
NEG_RISK = False
TARGET_PX = 0            # Chainlink reference price captured at market rollover
TARGET_ASSET_ID = None       # CLOB token id for 'Up'
TARGET_ASSET_ID_DOWN = None  # CLOB token id for 'Down'
def format_timestamp(total_seconds) -> str:
    """Human-readable '<m> minutes and <s> seconds' for a second count."""
    minutes, seconds = total_seconds // 60, total_seconds % 60
    return f"{minutes} minutes and {seconds} seconds"
def time_round_down(dt, interval_mins=5) -> int:
    """Floor *dt* to its interval boundary, as a UNIX timestamp in seconds."""
    interval_secs = interval_mins * 60
    # int(x // b) * b is equivalent to math.floor(x / b) * b.
    return int(dt.timestamp() // interval_secs) * interval_secs
def get_mkt_details_by_slug(slug: str) -> tuple[dict, dict]:
    """Fetch a Polymarket gamma-API market by event slug.

    Returns ``(details, raw_market)`` where *details* maps each outcome name
    (e.g. 'Up'/'Down') to its CLOB token id, plus selected metadata keys.
    Raises on network failure or if the event has no markets.
    """
    r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
    market = r.json()['markets'][0]
    token_ids = json.loads(market.get("clobTokenIds", "[]"))
    outcomes = json.loads(market.get("outcomes", "[]"))
    d = dict(zip(outcomes, token_ids))
    # BUG FIX: 'isActive' was previously copied from the negRisk flag; use the
    # market's own `active` field.
    d['isActive'] = market['active']
    d['MinTickSize'] = market['orderPriceMinTickSize']
    d['OrderMinSize'] = market['orderMinSize']
    d['isNegRisk'] = market['negRisk']
    d['ConditionId'] = market['conditionId']
    d['EndDateTime'] = market['endDate']
    d['LiquidityClob'] = market['liquidityClob']
    d['VolumeNum'] = market['volumeNum']
    d['Volume24hr'] = market['volume24hr']
    # (Removed a debug `print(market)` that dumped the full payload to stdout.)
    return d, market
def gen_slug():
    """Slug of the currently-active BTC 5-minute up/down market."""
    return 'btc-updown-5m-' + str(time_round_down(dt=datetime.now(timezone.utc)))
### Database Funcs ###
async def create_poly_btcusd_trades_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Ensure the poly_btcusd_trades table exists.

    No-ops (with a log line) when no DB connection is configured; only the
    MySQL backend is implemented.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: poly_btcusd_trades')
    ddl = """
        CREATE TABLE IF NOT EXISTS poly_btcusd_trades (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            price DOUBLE,
            qty DOUBLE,
            side_taker VARCHAR(8),
            up_or_down VARCHAR(8)
        );
    """
    await CON.execute(text(ddl))
    await CON.commit()
async def insert_poly_btcusd_trades_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    price: float,
    qty: float,
    side_taker: str,
    up_or_down: str,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one Polymarket trade row; no-op (logged) without a DB connection."""
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    params = {
        'timestamp_arrival': timestamp_arrival,
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'price': price,
        'qty': qty,
        'side_taker': side_taker,
        'up_or_down': up_or_down,
    }
    stmt = """
        INSERT INTO poly_btcusd_trades
        (timestamp_arrival, timestamp_msg, timestamp_value, price, qty, side_taker, up_or_down)
        VALUES
        (:timestamp_arrival, :timestamp_msg, :timestamp_value, :price, :qty, :side_taker, :up_or_down)
    """
    await CON.execute(text(stmt), parameters=params)
    await CON.commit()
async def polymarket_stream():
    """Stream the live BTC 5-minute up/down market from the Polymarket CLOB.

    Subscribes to the current market's Up and Down token ids, rolls the
    subscription to the next market at expiry (capturing the Chainlink
    reference price as the new target), publishes last-trade ticks to
    Valkey, and records them in MySQL.
    """
    global SLUG_END_TIME
    global TARGET_PX
    global HIST_TRADES
    global HIST_TRADES_DOWN
    global MIN_TICK_SIZE
    global NEG_RISK
    global TARGET_ASSET_ID
    global TARGET_ASSET_ID_DOWN
    slug_full = gen_slug()
    market_details, _ = get_mkt_details_by_slug(slug_full)
    CONDITION_ID = market_details['ConditionId']
    TARGET_ASSET_ID = market_details['Up']
    TARGET_ASSET_ID_DOWN = market_details['Down']
    MIN_TICK_SIZE = market_details['MinTickSize']
    NEG_RISK = market_details['isNegRisk']
    # Market expiry as a UNIX timestamp (endDate is UTC, 'Z'-suffixed).
    SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
    # BUG FIX: mixed quote styles — same-quote nesting inside f-strings is
    # only legal from Python 3.12.
    print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit="s")} *********')
    # Iterating connect() reconnects automatically after a dropped socket.
    async for websocket in websockets.connect(WSS_URL):
        print(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
            "type": "market",
            "custom_feature_enabled": False
        }
        await websocket.send(json.dumps(subscribe_msg))
        print(f"Subscribed to Assets: Up {TARGET_ASSET_ID}; Down: {TARGET_ASSET_ID_DOWN}")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())
                if sec_remaining <= 0:
                    # Current market expired: capture the Chainlink price as the
                    # next market's target and roll the subscription.
                    ref_data = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
                    TARGET_PX = float(ref_data['value'])
                    HIST_TRADES = np.empty((0, 2))
                    print('*** Attempting to unsub from past 5min')
                    update_unsub_msg = {
                        "operation": 'unsubscribe',
                        "assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
                        "custom_feature_enabled": False
                    }
                    await websocket.send(json.dumps(update_unsub_msg))
                    print('*** Attempting to SUB to new 5min')
                    slug_full = gen_slug()
                    market_details, market = get_mkt_details_by_slug(slug_full)
                    CONDITION_ID = market_details['ConditionId']
                    TARGET_ASSET_ID = market_details['Up']
                    TARGET_ASSET_ID_DOWN = market_details['Down']
                    MIN_TICK_SIZE = market_details['MinTickSize']
                    NEG_RISK = market_details['isNegRisk']
                    SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
                    update_sub_msg = {
                        "operation": 'subscribe',
                        "assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
                        "custom_feature_enabled": False
                    }
                    await websocket.send(json.dumps(update_sub_msg))
                if isinstance(message, str):
                    data = json.loads(message)
                    if isinstance(data, list):
                        # First frame after (re)subscribing is the book snapshot.
                        print('initial book:')
                        print(data)
                        continue
                    event_type = data.get("event_type", None)
                    if event_type == "price_change":
                        continue
                    elif event_type == "best_bid_ask":
                        continue
                    elif event_type == "last_trade_price":
                        token_id = data['asset_id']
                        ts_msg = int(data['timestamp'])
                        ts_value = int(ts_msg)
                        px = float(data['price'])
                        qty = float(data['size'])
                        side_taker = data['side']
                        if token_id == TARGET_ASSET_ID:
                            up_or_down = 'UP'
                            HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
                            if USE_VK:
                                VAL_KEY_OBJ = json.dumps({
                                    'timestamp_arrival': ts_arrival,
                                    'timestamp_msg': ts_msg,
                                    'timestamp_value': ts_value,
                                    'price': px,
                                    'qty': qty,
                                    'side_taker': side_taker,
                                    'sec_remaining': sec_remaining,
                                    'target_price': TARGET_PX,
                                    'condition_id': CONDITION_ID,
                                    'token_id_up': TARGET_ASSET_ID,
                                    'token_id_down': TARGET_ASSET_ID_DOWN,
                                    'tick_size': MIN_TICK_SIZE,
                                    'neg_risk': NEG_RISK,
                                })
                                VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
                        elif token_id == TARGET_ASSET_ID_DOWN:
                            up_or_down = 'DOWN'
                            HIST_TRADES_DOWN = np.append(HIST_TRADES_DOWN, np.array([[px, qty]]), axis=0)
                            if USE_VK:
                                VAL_KEY_OBJ = json.dumps({
                                    'timestamp_arrival': ts_arrival,
                                    'timestamp_msg': ts_msg,
                                    'timestamp_value': ts_value,
                                    'price': px,
                                    'qty': qty,
                                    'side_taker': side_taker,
                                    'sec_remaining': sec_remaining,
                                    'target_price': TARGET_PX,
                                    'condition_id': CONDITION_ID,
                                    'token_id_up': TARGET_ASSET_ID,
                                    'token_id_down': TARGET_ASSET_ID_DOWN,
                                    'tick_size': MIN_TICK_SIZE,
                                    'neg_risk': NEG_RISK,
                                })
                                VAL_KEY.set(VK_CHANNEL_DOWN, VAL_KEY_OBJ)
                        else:
                            logging.warning('Token Id from Market Does Not Match Pricing Data Id')
                            # BUG FIX: skip the insert below — `up_or_down` is
                            # unbound on this path and previously raised
                            # NameError at the insert call.
                            continue
                        if USE_DB:
                            await insert_poly_btcusd_trades_table(
                                CON=CON,
                                timestamp_arrival=ts_arrival,
                                timestamp_msg=ts_msg,
                                timestamp_value=ts_value,
                                price=px,
                                qty=qty,
                                side_taker=side_taker,
                                up_or_down=up_or_down
                            )
                    elif event_type == "book":
                        continue
                    elif event_type == "new_market":
                        print('Received new_market')
                        continue
                    elif event_type == "market_resolved":
                        print(f"Received: {event_type}")
                        print(data)
                        continue
                    elif event_type == "tick_size_change":  # may want for CLOB order routing
                        print(f"Received: {event_type}")
                        print(data)
                        continue
                    else:
                        print(f"*********** REC UNMAPPED EVENT: {event_type}")
                        print(data)
                        continue
                else:
                    # BUG FIX: the old branch tested `isinstance(data, dict)`
                    # with `data` possibly unbound, and otherwise raised —
                    # which would kill the stream. Skip non-text frames instead.
                    logging.warning(f'Unexpected non-text frame (type {type(message)}), skipping: {message!r}')
                    continue
        except websockets.ConnectionClosed as e:
            print(f"Connection closed by server. Exception: {e}")
            continue
async def main():
    """Initialize Valkey and MySQL per the USE_VK / USE_DB flags, then run the stream.

    Sets the module-level VAL_KEY and CON handles that polymarket_stream() reads.
    """
    global VAL_KEY
    global CON
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await polymarket_stream()
    else:
        db_engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes this assignment visible to the stream coroutine.
        async with db_engine.connect() as CON:
            await create_poly_btcusd_trades_table(CON=CON)
            await polymarket_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging BEFORE the first logging call: the original code
    # called logging.info() first, so that record went to the root logger's
    # default (stderr) handler and never reached LOG_FILEPATH.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt as e:
        print(f"Stream stopped: {e}")

19
ws_clob/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# apt-get update/install and the lists cleanup must share ONE layer:
# a separate `RUN rm -rf /var/lib/apt/lists/*` cannot shrink the image
# because the lists are already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    rm -rf /var/lib/apt/lists/*
# Sanity check that the toolchain installed correctly.
RUN gcc --version
WORKDIR /app
# Copy requirements first so source edits don't invalidate the pip layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ws_clob.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

227
ws_coinbase.py Normal file
View File

@@ -0,0 +1,227 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import os
# import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Force requests/urllib3 name resolution to IPv4 only."""
    family = socket.AF_INET
    return family
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_coinbase_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_coinbase_Trades.log'
### Globals ###
WSS_URL = "wss://ws-feed.exchange.coinbase.com"
# HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the coinbase_btcusd_trades table when it does not yet exist.

    Logged no-op when CON is None; raises ValueError for engines other
    than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: coinbase_btcusd_trades')
    ddl = text("""
        CREATE TABLE IF NOT EXISTS coinbase_btcusd_trades (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            value DOUBLE,
            qty DOUBLE,
            side VARCHAR(8)
        );
    """)
    await CON.execute(ddl)
    await CON.commit()
async def insert_rtds_btcusd_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    value: float,
    qty: float,
    side: str,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert a single trade row into coinbase_btcusd_trades.

    Logged no-op when CON is None; raises ValueError for engines other
    than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    row = {
        'timestamp_arrival': timestamp_arrival,
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'value': value,
        'qty': qty,
        'side': side,
    }
    await CON.execute(
        text("""
            INSERT INTO coinbase_btcusd_trades
            (
                timestamp_arrival,
                timestamp_msg,
                timestamp_value,
                value,
                qty,
                side
            )
            VALUES
            (
                :timestamp_arrival,
                :timestamp_msg,
                :timestamp_value,
                :value,
                :qty,
                :side
            )
        """),
        parameters=row
    )
    await CON.commit()
### Websocket ###
async def coinbase_trades_stream():
    """Stream BTC-USD ticker trades from Coinbase and fan them out.

    Subscribes to the public `ticker` channel; for each trade, prints a
    heartbeat line, publishes/sets the payload on Valkey (when USE_VK)
    and inserts a row into MySQL (when USE_DB). Connection errors are
    logged; the coroutine then returns (no auto-reconnect here).
    """
    # Local import: module level only imports `datetime` from datetime.
    from datetime import timezone
    async with websockets.connect(WSS_URL) as websocket:
        logging.info(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": [
                {
                    "name": "ticker",
                    "product_ids": ["BTC-USD"]
                }
            ]
        }
        await websocket.send(json.dumps(subscribe_msg))
        try:
            async for message in websocket:
                if isinstance(message, (str, bytes)):
                    try:
                        data = json.loads(message)
                        if data.get('price', None) is not None:
                            ts_arrival = round(datetime.now().timestamp()*1000)
                            # BUGFIX: Coinbase `time` is ISO-8601 UTC ("...Z").
                            # A naive strptime().timestamp() would interpret it in
                            # the host's LOCAL zone, skewing ts_msg by the UTC
                            # offset — mark the datetime as UTC explicitly.
                            msg_dt = datetime.strptime(data['time'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
                            ts_msg = round(msg_dt.timestamp()*1000)
                            ts_value = ts_msg  # ticker has no separate value timestamp
                            last_px = float(data['price'])
                            qty = float(data['last_size'])
                            side = data['side']
                            print(f"🤑 BTC Coinbase Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};")
                            if USE_VK:
                                VAL_KEY_OBJ = json.dumps({
                                    'timestamp_arrival': ts_arrival,
                                    'timestamp_msg': ts_msg,
                                    'timestamp_value': ts_value,
                                    'value': last_px,
                                    'qty': qty,
                                    'side': side,
                                })
                                VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
                                VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
                            if USE_DB:
                                await insert_rtds_btcusd_table(
                                    CON=CON,
                                    timestamp_arrival=ts_arrival,
                                    timestamp_msg=ts_msg,
                                    timestamp_value=ts_value,
                                    value=last_px,
                                    qty=qty,
                                    side=side,
                                )
                        else:
                            # Subscription acks and other non-trade frames have no 'price'.
                            logging.info(f'Initial or unexpected data struct, skipping: {data}')
                            continue
                    except (json.JSONDecodeError, ValueError) as e:
                        logging.warning(f'Message not in JSON format, skipping: {message}; exception: {e}')
                        continue
                else:
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
        except Exception as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Initialize Valkey and MySQL per the USE_VK / USE_DB flags, then run the stream."""
    global VAL_KEY
    global CON
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
        # Announce startup so subscribers can see the feed coming online.
        published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await coinbase_trades_stream()
    else:
        db_engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes this assignment visible to the stream coroutine.
        async with db_engine.connect() as CON:
            await create_rtds_btcusd_table(CON=CON)
            await coinbase_trades_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging BEFORE the first logging call: the original code
    # called logging.info() first, so that record went to the root logger's
    # default (stderr) handler and never reached LOG_FILEPATH.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")

19
ws_coinbase/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# apt-get update/install and the lists cleanup must share ONE layer:
# a separate `RUN rm -rf /var/lib/apt/lists/*` cannot shrink the image
# because the lists are already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    rm -rf /var/lib/apt/lists/*
# Sanity check that the toolchain installed correctly.
RUN gcc --version
WORKDIR /app
# Copy requirements first so source edits don't invalidate the pip layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ws_coinbase.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

222
ws_pionex.py Normal file
View File

@@ -0,0 +1,222 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Force requests/urllib3 name resolution to IPv4 only."""
    family = socket.AF_INET
    return family
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_pionex_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_Pionex_Trades.log'
### Globals ###
WSS_URL = "wss://ws.pionex.com/wsPub"
# HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the pionex_btcusd_trades table when it does not yet exist.

    Logged no-op when CON is None; raises ValueError for engines other
    than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: pionex_btcusd_trades')
    ddl = text("""
        CREATE TABLE IF NOT EXISTS pionex_btcusd_trades (
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            value DOUBLE,
            qty DOUBLE,
            side VARCHAR(8)
        );
    """)
    await CON.execute(ddl)
    await CON.commit()
async def insert_rtds_btcusd_table(
    timestamp_msg: int,
    timestamp_value: int,
    value: float,
    qty: float,
    side: str,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert a single trade row into pionex_btcusd_trades.

    Logged no-op when CON is None; raises ValueError for engines other
    than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    row = {
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'value': value,
        'qty': qty,
        'side': side,
    }
    await CON.execute(
        text("""
            INSERT INTO pionex_btcusd_trades
            (
                timestamp_msg,
                timestamp_value,
                value,
                qty,
                side
            )
            VALUES
            (
                :timestamp_msg,
                :timestamp_value,
                :value,
                :qty,
                :side
            )
        """),
        parameters=row
    )
    await CON.commit()
### Websocket ###
async def pionex_trades_stream():
    """Stream BTC_USDT trades from Pionex and fan them out.

    Subscribes to the TRADE topic; for each trade frame, prints a
    heartbeat line, publishes/sets the payload on Valkey (when USE_VK)
    and inserts a row into MySQL (when USE_DB). Answers server PINGs
    with PONGs. Connection errors are logged; the coroutine then returns.
    """
    async with websockets.connect(WSS_URL) as websocket:
        logging.info(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "op": "SUBSCRIBE",
            "topic": "TRADE",
            "symbol": "BTC_USDT"
        }
        await websocket.send(json.dumps(subscribe_msg))
        try:
            async for message in websocket:
                if isinstance(message, (str, bytes)):
                    try:
                        data = json.loads(message)
                        if data.get('data', None) is not None:
                            ts_msg = data['timestamp']
                            data = data['data']
                            # NOTE(review): only data[0] is consumed — confirm
                            # the feed never batches more than one trade per frame.
                            ts_value = data[0]['timestamp']
                            last_px = float(data[0]['price'])
                            qty = float(data[0]['size'])
                            side = data[0]['side']
                            print(f"🤑 BTC Pionex Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};")
                            print(ts_value)
                            if USE_VK:
                                # Serialize once; publish and set share the payload
                                # (the original built two identical json.dumps).
                                VAL_KEY_OBJ = json.dumps({
                                    'timestamp_msg': ts_msg,
                                    'timestamp_value': ts_value,
                                    'value': last_px,
                                    'qty': qty,
                                    'side': side,
                                })
                                VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
                                VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
                            if USE_DB:
                                await insert_rtds_btcusd_table(
                                    CON=CON,
                                    timestamp_msg=ts_msg,
                                    timestamp_value=ts_value,
                                    value=last_px,
                                    qty=qty,
                                    side=side,
                                )
                        elif data.get('op'):
                            # Keep-alive handshake required by the exchange.
                            if data['op'] == 'PING':
                                pong = {"op": "PONG", "timestamp": round(datetime.now().timestamp()*1000)}
                                await websocket.send(json.dumps(pong))
                                logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
                        else:
                            logging.info(f'Initial or unexpected data struct, skipping: {data}')
                            continue
                    except (json.JSONDecodeError, ValueError) as e:
                        logging.warning(f'Message not in JSON format, skipping: {message}; exception: {e}')
                        continue
                else:
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
        except Exception as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Initialize Valkey and MySQL per the USE_VK / USE_DB flags, then run the stream."""
    global VAL_KEY
    global CON
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
        # Announce startup so subscribers can see the feed coming online.
        published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await pionex_trades_stream()
    else:
        db_engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes this assignment visible to the stream coroutine.
        async with db_engine.connect() as CON:
            await create_rtds_btcusd_table(CON=CON)
            await pionex_trades_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging BEFORE the first logging call: the original code
    # called logging.info() first, so that record went to the root logger's
    # default (stderr) handler and never reached LOG_FILEPATH.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")

View File

@@ -1,24 +1,111 @@
import asyncio
import json
import math
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import talib
import requests
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_rtds_cl_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_RTDS.log'
### Globals ###
WSS_URL = "wss://ws-live-data.polymarket.com"
# HIST_TRADES = np.empty((0, 2))
HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Create the poly_rtds_cl_btcusd table if it does not already exist.

    Logged no-op when CON is None; raises ValueError for any engine
    other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
    else:
        if engine == 'mysql':
            logging.info('Creating Table if Does Not Exist: poly_rtds_cl_btcusd')
            # Stores the Chainlink BTC/USD value feed relayed by Polymarket RTDS.
            await CON.execute(text("""
                CREATE TABLE IF NOT EXISTS poly_rtds_cl_btcusd (
                    timestamp_arrival BIGINT,
                    timestamp_msg BIGINT,
                    timestamp_value BIGINT,
                    value DOUBLE
                );
            """))
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
    timestamp_arrival: int,
    timestamp_msg: int,
    timestamp_value: int,
    value: float,  # annotation fixed: column is DOUBLE and callers pass floats
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Insert one Chainlink price observation into poly_rtds_cl_btcusd.

    Logged no-op when CON is None; raises ValueError for any engine
    other than 'mysql'. Commits after every insert.
    """
    params={
        'timestamp_arrival': timestamp_arrival,
        'timestamp_msg': timestamp_msg,
        'timestamp_value': timestamp_value,
        'value': value,
    }
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
    else:
        if engine == 'mysql':
            await CON.execute(text("""
                INSERT INTO poly_rtds_cl_btcusd
                (
                    timestamp_arrival,
                    timestamp_msg,
                    timestamp_value,
                    value
                )
                VALUES
                (
                    :timestamp_arrival,
                    :timestamp_msg,
                    :timestamp_value,
                    :value
                )
            """),
            parameters=params
            )
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def rtds_stream():
global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket:
print(f"Connected to {WSS_URL}")
async for websocket in websockets.connect(WSS_URL):
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
"action": "subscribe",
@@ -39,23 +126,79 @@ async def rtds_stream():
try:
data = json.loads(message)
if data['payload'].get('value', None) is not None:
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
ts_arrival = round(datetime.now().timestamp()*1000)
# print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['payload']['timestamp'], unit='ms')}')
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['timestamp'],
'timestamp_value': data['payload']['timestamp'],
'value': data['payload']['value'],
})
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
await insert_rtds_btcusd_table(
CON=CON,
timestamp_arrival=ts_arrival,
timestamp_msg=data['timestamp'],
timestamp_value=data['payload']['timestamp'],
value=data['payload']['value'],
)
else:
print(f'Initial or unexpected data struct, skipping: {data}')
# logging.info(f'Initial or unexpected data struct, skipping: {data}')
logging.info('Initial or unexpected data struct, skipping')
continue
except (json.JSONDecodeError, ValueError):
print(f'Message not in JSON format, skipping: {message}')
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except websockets.ConnectionClosed:
print("Connection closed by server.")
async def main():
    """Wire up Valkey / MySQL per USE_VK / USE_DB, then run the RTDS stream.

    Sets the module-level VAL_KEY and CON handles that rtds_stream() reads.
    """
    global VAL_KEY
    global CON
    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
        # Startup announcement disabled; this feed only uses SET, not PUBLISH.
        # published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        # logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes this assignment visible to the stream coroutine.
        async with engine.connect() as CON:
            await create_rtds_btcusd_table(CON=CON)
            await rtds_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await rtds_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(rtds_stream())
asyncio.run(main())
except KeyboardInterrupt:
print("Stream stopped.")
logging.info("Stream stopped")

19
ws_rtds/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# apt-get update/install and the lists cleanup must share ONE layer:
# a separate `RUN rm -rf /var/lib/apt/lists/*` cannot shrink the image
# because the lists are already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    rm -rf /var/lib/apt/lists/*
# Sanity check that the toolchain installed correctly.
RUN gcc --version
WORKDIR /app
# Copy requirements first so source edits don't invalidate the pip layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ws_rtds.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

431
ws_user.py Normal file
View File

@@ -0,0 +1,431 @@
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import valkey
import websockets
from dotenv import load_dotenv
from py_clob_client.client import ClobClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
### Database ###
USE_DB: bool = True
USE_VK: bool = True
LOCAL_LIVE_ORDERS = []
LOCAL_RECENT_TRADES = []
LOCAL_RECENT_TRADES_LOOKBACK_SEC = 10
VK_LIVE_ORDERS = 'poly_user_orders'
VK_RECENT_TRADES = 'poly_user_trades'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_User.log'
# https://docs.polymarket.com/market-data/websocket/user-channel
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
API_CREDS = {}
HIST_TRADES = np.empty((0, 2))
TARGET_PX = 0
### Database Funcs ###
async def create_user_trades_table(
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Create the user_stream_trades table if it does not already exist.

    Archives `trade` events from the Polymarket user websocket channel.
    Logged no-op when CON is None; raises ValueError for any engine
    other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
    else:
        if engine == 'mysql':
            logging.info('Creating Table if Does Not Exist: user_stream_trades')
            await CON.execute(text("""
                CREATE TABLE IF NOT EXISTS user_stream_trades (
                    -- event_type VARCHAR(8),
                    timestamp_arrival BIGINT,
                    type VARCHAR(20),
                    id VARCHAR(100),
                    taker_order_id VARCHAR(100),
                    market VARCHAR(100),
                    asset_id VARCHAR(100),
                    side VARCHAR(8),
                    size DOUBLE,
                    price DOUBLE,
                    fee_rate_bps DOUBLE,
                    status VARCHAR(20),
                    matchtime BIGINT,
                    last_update BIGINT,
                    outcome VARCHAR(20),
                    owner VARCHAR(100),
                    trade_owner VARCHAR(100),
                    maker_address VARCHAR(100),
                    transaction_hash VARCHAR(100),
                    bucket_index INT,
                    maker_orders JSON NULL,
                    trader_side VARCHAR(8),
                    timestamp BIGINT
                );
            """))
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
async def insert_user_trades_table(
    params: dict,
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Insert one user trade event into user_stream_trades.

    `params` is passed straight through as the bind-parameter dict, so it
    must contain every named parameter below (callers pass the mutated
    websocket event dict). Logged no-op when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
    else:
        if engine == 'mysql':
            await CON.execute(text("""
                INSERT INTO user_stream_trades
                (
                    timestamp_arrival,
                    type,
                    id,
                    taker_order_id,
                    market,
                    asset_id,
                    side,
                    size,
                    price,
                    fee_rate_bps,
                    status,
                    matchtime,
                    last_update,
                    outcome,
                    owner,
                    trade_owner,
                    maker_address,
                    transaction_hash,
                    bucket_index,
                    maker_orders,
                    trader_side,
                    timestamp
                )
                VALUES
                (
                    :timestamp_arrival,
                    :type,
                    :id,
                    :taker_order_id,
                    :market,
                    :asset_id,
                    :side,
                    :size,
                    :price,
                    :fee_rate_bps,
                    :status,
                    :matchtime,
                    :last_update,
                    :outcome,
                    :owner,
                    :trade_owner,
                    :maker_address,
                    :transaction_hash,
                    :bucket_index,
                    :maker_orders,
                    :trader_side,
                    :timestamp
                )
            """),
            parameters=params
            )
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
async def create_user_orders_table(
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Create the user_stream_orders table if it does not already exist.

    Archives `order` events from the Polymarket user websocket channel.
    Logged no-op when CON is None; raises ValueError for any engine
    other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
    else:
        if engine == 'mysql':
            logging.info('Creating Table if Does Not Exist: user_stream_orders')
            await CON.execute(text("""
                CREATE TABLE IF NOT EXISTS user_stream_orders (
                    -- event_type VARCHAR(8),
                    timestamp_arrival BIGINT,
                    id VARCHAR(100),
                    owner VARCHAR(100),
                    market VARCHAR(100),
                    asset_id VARCHAR(100),
                    side VARCHAR(8),
                    order_owner VARCHAR(100),
                    original_size DOUBLE,
                    size_matched DOUBLE,
                    price DOUBLE,
                    associate_trades JSON NULL,
                    outcome VARCHAR(20),
                    type VARCHAR(20),
                    created_at BIGINT,
                    expiration VARCHAR(20),
                    order_type VARCHAR(8),
                    status VARCHAR(20),
                    maker_address VARCHAR(100),
                    timestamp BIGINT
                );
            """))
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
async def insert_user_orders_table(
    params: dict,
    CON: AsyncContextManager,
    engine: str = 'mysql', # mysql | duckdb
) -> None:
    """Insert one user order event into user_stream_orders.

    `params` is passed straight through as the bind-parameter dict, so it
    must contain every named parameter below (callers pass the mutated
    websocket event dict). Logged no-op when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
    else:
        if engine == 'mysql':
            await CON.execute(text("""
                INSERT INTO user_stream_orders
                (
                    timestamp_arrival,
                    id,
                    owner,
                    market,
                    asset_id,
                    side,
                    order_owner,
                    original_size,
                    size_matched,
                    price,
                    associate_trades,
                    outcome,
                    type,
                    created_at,
                    expiration,
                    order_type,
                    status,
                    maker_address,
                    timestamp
                )
                VALUES
                (
                    :timestamp_arrival,
                    :id,
                    :owner,
                    :market,
                    :asset_id,
                    :side,
                    :order_owner,
                    :original_size,
                    :size_matched,
                    :price,
                    :associate_trades,
                    :outcome,
                    :type,
                    :created_at,
                    :expiration,
                    :order_type,
                    :status,
                    :maker_address,
                    :timestamp
                )
            """),
            parameters=params
            )
            await CON.commit()
        else:
            raise ValueError('Only MySQL engine is implemented')
### Helpers ###
def live_orders_only(orders: list[dict]) -> list[dict]:
    """Return a new list containing only orders whose status is exactly 'LIVE'."""
    live = []
    for order in orders:
        if order.get('status') == 'LIVE':
            live.append(order)
    return live
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict):
    """Replace the first entry whose 'id' matches new_dict's 'id', else append.

    Mutates `list_of_dicts` in place and returns the same list.
    """
    target_id = new_dict.get('id')
    for pos, existing in enumerate(list_of_dicts):
        if existing.get('id') == target_id:
            list_of_dicts[pos] = new_dict
            break
    else:
        list_of_dicts.append(new_dict)
    return list_of_dicts
async def polymarket_stream():
    """Consume the Polymarket user channel: mirror our orders and trades.

    Maintains the in-process LOCAL_LIVE_ORDERS / LOCAL_RECENT_TRADES caches,
    mirrors them into Valkey for downstream consumers, and archives every
    event into MySQL. `async for websocket in websockets.connect(...)`
    reconnects automatically after connection drops.
    """
    # TARGET_PX and HIST_TRADES are declared global but not used below.
    global TARGET_PX
    global HIST_TRADES
    global LOCAL_LIVE_ORDERS
    global LOCAL_RECENT_TRADES
    # API_CREDS is populated in main() from ClobClient.create_or_derive_api_creds().
    POLY_API_KEY = API_CREDS.api_key
    POLY_API_SECRET = API_CREDS.api_secret
    POLY_API_PASS = API_CREDS.api_passphrase
    async for websocket in websockets.connect(WSS_URL):
        print(f"Connected to {WSS_URL}")
        subscribe_msg = {
            "auth": {
                "apiKey": POLY_API_KEY,
                "secret": POLY_API_SECRET,
                "passphrase": POLY_API_PASS,
            },
            "type": "user",
            "markets": []  # empty list: subscribe to all of our markets
        }
        await websocket.send(json.dumps(subscribe_msg))
        print("Subscribed to User Data")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                if isinstance(message, str):
                    data = json.loads(message)
                    if data == {}: # Handle empty server ping - return pong
                        await websocket.send(json.dumps({}))
                        print('SENT HEARTBEAT PING')
                        continue
                    data['timestamp_arrival'] = ts_arrival
                    event_type = data.get('event_type', None)
                    # NOTE(review): event types other than 'trade'/'order' fall
                    # through the match silently — confirm that is intended.
                    match event_type:
                        case 'trade':
                            logging.info(f'TRADE: {data}')
                            # trade_status = data.get('status')
                            # match trade_status: # Raise TELEGRAM ALERT ???
                            #     case 'MATCHED':
                            #         pass
                            #     case 'MINED':
                            #         pass
                            #     case 'CONFIRMED':
                            #         pass
                            #     case 'RETRYING':
                            #         pass
                            #     case 'FAILED':
                            #         pass
                            ### Convert Datatypes ###
                            data['size'] = float(data['size'])
                            data['price'] = float(data['price'])
                            data['fee_rate_bps'] = float(data['fee_rate_bps'])
                            # key renamed: DB column is `matchtime`, feed sends `match_time`
                            data['matchtime'] = int(data['match_time'])
                            data['last_update'] = int(data['last_update'])
                            data['timestamp'] = int(data['timestamp'])
                            data['maker_orders'] = json.dumps(data['maker_orders']) if data['maker_orders'] else None
                            # Upsert into the rolling cache, then drop entries older
                            # than LOCAL_RECENT_TRADES_LOOKBACK_SEC (by arrival time).
                            LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, data)
                            LOOKBACK_MIN_TS_MS = ts_arrival-LOCAL_RECENT_TRADES_LOOKBACK_SEC*1000
                            LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
                            print("---------------------")
                            print(LOCAL_RECENT_TRADES)
                            print("---------------------")
                            VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES)
                            # VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
                            VAL_KEY.set(VK_RECENT_TRADES, VAL_KEY_OBJ)
                            logging.info(f'User Trade Update: {data}')
                            ### Insert into DB ###
                            # `data` carries extra keys beyond the bind params —
                            # presumably ignored by the driver; verify.
                            await insert_user_trades_table(
                                params=data,
                                CON=CON
                            )
                        case 'order':
                            logging.info(f'ORDER: {data}')
                            ### Convert Datatypes ###
                            data['original_size'] = float(data['original_size'])
                            data['size_matched'] = float(data['size_matched'])
                            data['price'] = float(data['price'])
                            data['associate_trades'] = json.dumps(data['associate_trades']) if data['associate_trades'] else None
                            data['created_at'] = int(data['created_at'])
                            data['timestamp'] = int(data['timestamp'])
                            ### Match on Status - Pass Live orders to Valkey for Algo Engine ###
                            order_status = data.get('status')
                            match order_status:
                                case 'live':
                                    # NOTE(review): live_orders_only() filters on
                                    # status == 'LIVE' (uppercase) while this case
                                    # matches 'live' — confirm the feed's casing;
                                    # one of the two may never match.
                                    LOCAL_LIVE_ORDERS = upsert_list_of_dicts_by_id(LOCAL_LIVE_ORDERS, data)
                                    LOCAL_LIVE_ORDERS = live_orders_only(LOCAL_LIVE_ORDERS)
                                    VAL_KEY_OBJ = json.dumps(LOCAL_LIVE_ORDERS)
                                    # VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
                                    VAL_KEY.set(VK_LIVE_ORDERS, VAL_KEY_OBJ)
                                    logging.info(f'Order(s) RESTING: {data}')
                                case 'matched':
                                    logging.info(f'Order(s) MATCHED: {data}')
                                case 'delayed':
                                    raise ValueError(f'Order Status of "delayed" which is not expected for non-sports orders: {data}')
                                case 'unmatched':
                                    raise ValueError(f'Order Status of "unmatched" which is not expected for non-sports orders: {data}')
                            ### Insert into DB ###
                            await insert_user_orders_table(
                                params=data,
                                CON=CON,
                            )
                else:
                    raise ValueError(f'Type: {type(data)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            print(f"Connection closed by server. Exception: {e}")
async def main():
    """Derive CLOB API creds, wire Valkey/MySQL per flags, then run the user stream."""
    global VAL_KEY
    global CON
    global API_CREDS
    # Bootstrap a throwaway client purely to derive the websocket auth creds.
    private_key = os.getenv("PRIVATE_KEY")
    clob_host = "https://clob.polymarket.com"
    polygon_chain_id = 137  # Polygon mainnet
    bootstrap_client = ClobClient(clob_host, key=private_key, chain_id=polygon_chain_id)
    API_CREDS = bootstrap_client.create_or_derive_api_creds()
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await polymarket_stream()
    else:
        db_engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `global CON` above makes this assignment visible to the stream coroutine.
        async with db_engine.connect() as CON:
            await create_user_trades_table(CON=CON)
            await create_user_orders_table(CON=CON)
            await polymarket_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging BEFORE the first logging call: the original code
    # called logging.info() first, so that record went to the root logger's
    # default (stderr) handler and never reached LOG_FILEPATH.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt as e:
        print(f"Stream stopped: {e}")

19
ws_user/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
# apt-get update/install and the lists cleanup must share ONE layer:
# a separate `RUN rm -rf /var/lib/apt/lists/*` cannot shrink the image
# because the lists are already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    rm -rf /var/lib/apt/lists/*
# Sanity check that the toolchain installed correctly.
RUN gcc --version
WORKDIR /app
# Copy requirements first so source edits don't invalidate the pip layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "ws_user.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]