Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 259ea93479 | |||
| 8d7d99d749 | |||
| c051130867 |
BIN
Screenshot 2026-03-29 at 12.35.51 AM.png
Normal file
BIN
Screenshot 2026-03-29 at 12.35.51 AM.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 337 KiB |
317
database.ipynb
Normal file
317
database.ipynb
Normal file
@@ -0,0 +1,317 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"id": "4cae6bf1",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from sqlalchemy import create_engine, text\n",
|
||||
"import pandas as pd\n",
|
||||
"from datetime import datetime"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 11,
|
||||
"id": "f5040527",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Connection successful\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"### MYSQL ###\n",
|
||||
"engine = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')\n",
|
||||
"try:\n",
|
||||
" with engine.connect() as conn:\n",
|
||||
" print(\"Connection successful\")\n",
|
||||
"except Exception as e:\n",
|
||||
" print(f\"Connection failed: {e}\") "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 10,
|
||||
"id": "72059b3f",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Connection successful\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"### MYSQL ###\n",
|
||||
"engine_inter_storage = create_engine('mysql+pymysql://root:pwd@100.84.226.40/polymarket')\n",
|
||||
"try:\n",
|
||||
" with engine.connect() as conn:\n",
|
||||
" print(\"Connection successful\")\n",
|
||||
"except Exception as e:\n",
|
||||
" print(f\"Connection failed: {e}\") "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 48,
|
||||
"id": "b723a51f",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# with engine.connect() as conn:\n",
|
||||
"# print(\"Connection successful\")\n",
|
||||
"# sql = text(\"TRUNCATE TABLE coinbase_btcusd_trades;\")\n",
|
||||
"# conn.execute(sql)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "5c23110d",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"q_binance = '''\n",
|
||||
"SELECT * FROM binance_btcusd_trades;\n",
|
||||
"'''\n",
|
||||
"q_coinbase = '''\n",
|
||||
"SELECT * FROM coinbase_btcusd_trades;\n",
|
||||
"'''\n",
|
||||
"q_rtds = '''\n",
|
||||
"SELECT * FROM poly_rtds_cl_btcusd;\n",
|
||||
"'''\n",
|
||||
"q_clob = '''\n",
|
||||
"SELECT * FROM poly_btcusd_trades;\n",
|
||||
"'''"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 24,
|
||||
"id": "a866e9ca",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# df_binance = pd.read_sql(q_binance, con=engine)\n",
|
||||
"df_coinbase = pd.read_sql(q_coinbase, con=engine)\n",
|
||||
"df_rtds = pd.read_sql(q_rtds, con=engine)\n",
|
||||
"df_clob = pd.read_sql(q_clob, con=engine)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "954a3c3c",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# df_binance['timestamp_arrival'] = pd.to_datetime(df_binance['timestamp_arrival'], unit='ms')\n",
|
||||
"df_coinbase['timestamp_arrival'] = pd.to_datetime(df_coinbase['timestamp_arrival'], unit='ms')\n",
|
||||
"df_rtds['timestamp_arrival'] = pd.to_datetime(df_rtds['timestamp_arrival'], unit='ms')\n",
|
||||
"df_clob['timestamp_arrival'] = pd.to_datetime(df_clob['timestamp_arrival'], unit='ms')"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 57,
|
||||
"id": "50c6339f",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def copy_table_data_btw_servers(df, table_name, engine_destination) -> None:\n",
|
||||
" rows_imported = df.to_sql(name=table_name, con=engine_destination, if_exists='append')\n",
|
||||
" if rows_imported == len(df):\n",
|
||||
" print(f'SUCCESS: COPIED {rows_imported} to table \"{table_name}\" on INTERSERVER_STORAGE')\n",
|
||||
" else:\n",
|
||||
" raise ValueError(f'FAILED: COPIED {rows_imported} rows to table {table_name} on INTERSERVER_STORAGE; EXPECTED {len(df)}')\n",
|
||||
" \n",
|
||||
"def truncate_table(engine, table):\n",
|
||||
" with engine.connect() as conn:\n",
|
||||
" sql = text(f\"TRUNCATE TABLE {table};\")\n",
|
||||
" conn.execute(sql)\n",
|
||||
" conn.commit()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 61,
|
||||
"id": "d0399a96",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def backup_all_tables(engine_origin, engine_destination, tables_to_copy):\n",
|
||||
" for t in tables_to_copy:\n",
|
||||
" q = f'''\n",
|
||||
" SELECT * FROM {t};\n",
|
||||
" '''\n",
|
||||
" df = pd.read_sql(q, con=engine_origin)\n",
|
||||
" print('-------------------------------------------------------------------------')\n",
|
||||
" print(f'Loaded Data for Table: {t}...Attempting to Transfer to Destination Server')\n",
|
||||
" copy_table_data_btw_servers(\n",
|
||||
" df=df,\n",
|
||||
" table_name=t,\n",
|
||||
" engine_destination=engine_destination,\n",
|
||||
" )\n",
|
||||
" print(f'Attempting to Truncate Table: {t}...')\n",
|
||||
" \n",
|
||||
" ### FOR REALTIME - instead of truncate, need to delete rows using a conditon (e.g. delete all rows <= max timestamp arrival in the DF)\n",
|
||||
" \n",
|
||||
" truncate_table(\n",
|
||||
" engine=engine_origin,\n",
|
||||
" table=t,\n",
|
||||
" )\n",
|
||||
" print(f'...Successfully Truncated Table: {t}')\n",
|
||||
" print(f'Done Transferring Data for Table: {t}')\n",
|
||||
" \n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 59,
|
||||
"id": "0de1629a",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tables_to_copy = [\n",
|
||||
" # 'binance_btcusd_trades',\n",
|
||||
" # 'coinbase_btcusd_trades',\n",
|
||||
" 'poly_btcusd_trades',\n",
|
||||
" 'poly_rtds_cl_btcusd',\n",
|
||||
" # 'user_stream_orders',\n",
|
||||
" # 'user_stream_trades',\n",
|
||||
"]"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 60,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"-------------------------------------------------------------------------\n",
|
||||
"Loaded Data for Table: poly_btcusd_trades...Attempting to Transfer to Destination Server\n",
|
||||
"SUCCESS: COPIED 720568 to table \"poly_btcusd_trades\" on INTERSERVER_STORAGE\n",
|
||||
"Attempting to Truncate Table: poly_btcusd_trades...\n",
|
||||
"...Successfully Truncated Table: poly_btcusd_trades\n",
|
||||
"Done Transferring Data for Table: poly_btcusd_trades\n",
|
||||
"-------------------------------------------------------------------------\n",
|
||||
"Loaded Data for Table: poly_rtds_cl_btcusd...Attempting to Transfer to Destination Server\n",
|
||||
"SUCCESS: COPIED 73771 to table \"poly_rtds_cl_btcusd\" on INTERSERVER_STORAGE\n",
|
||||
"Attempting to Truncate Table: poly_rtds_cl_btcusd...\n",
|
||||
"...Successfully Truncated Table: poly_rtds_cl_btcusd\n",
|
||||
"Done Transferring Data for Table: poly_rtds_cl_btcusd\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"backup_all_tables(\n",
|
||||
" engine_origin=engine,\n",
|
||||
" engine_destination=engine_inter_storage,\n",
|
||||
" tables_to_copy=tables_to_copy\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "cd0b40d2",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"SUCCESS COPIED 326007 to binance_btcusd_trades to INTERSERVER_STORAGE\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "48b47799",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "ad030f88",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "cafc5060",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "5ba7be5f",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "py_313",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.13.12"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
||||
51
docker-compose.yml
Normal file
51
docker-compose.yml
Normal file
@@ -0,0 +1,51 @@
|
||||
services:
|
||||
ws_binance:
|
||||
container_name: ws_binance
|
||||
restart: "unless-stopped"
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./ws_binance/Dockerfile
|
||||
volumes:
|
||||
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
|
||||
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data
|
||||
network_mode: "host"
|
||||
ws_clob:
|
||||
container_name: ws_clob
|
||||
restart: "unless-stopped"
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./ws_clob/Dockerfile
|
||||
volumes:
|
||||
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
|
||||
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data
|
||||
network_mode: "host"
|
||||
ws_rtds:
|
||||
container_name: ws_rtds
|
||||
restart: "unless-stopped"
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./ws_rtds/Dockerfile
|
||||
volumes:
|
||||
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
|
||||
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data
|
||||
network_mode: "host"
|
||||
ws_user:
|
||||
container_name: ws_user
|
||||
restart: "unless-stopped"
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./ws_user/Dockerfile
|
||||
volumes:
|
||||
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
|
||||
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data
|
||||
network_mode: "host"
|
||||
ng:
|
||||
container_name: ng
|
||||
restart: "unless-stopped"
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./ng/Dockerfile
|
||||
volumes:
|
||||
- /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
|
||||
- /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data
|
||||
network_mode: "host"
|
||||
808
main.py
Normal file
808
main.py
Normal file
@@ -0,0 +1,808 @@
|
||||
import asyncio
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import AsyncContextManager
|
||||
import traceback
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import requests
|
||||
import talib
|
||||
import valkey
|
||||
from dotenv import load_dotenv
|
||||
from py_clob_client.clob_types import (
|
||||
OrderArgs,
|
||||
OrderType,
|
||||
PartialCreateOrderOptions,
|
||||
PostOrdersArgs,
|
||||
BalanceAllowanceParams
|
||||
)
|
||||
from py_clob_client.order_builder.constants import BUY, SELL
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
from functools import wraps
|
||||
import modules.api as api
|
||||
|
||||
### Custom Order Args ###
|
||||
@dataclass
|
||||
class Custom_OrderArgs(OrderArgs):
|
||||
max_price: float = 0.00
|
||||
|
||||
|
||||
### Database ###
|
||||
CLIENT = None
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_5min_Algo.log'
|
||||
|
||||
### ALGO CONFIG / CONSTANTS ###
|
||||
SLOPE_YES_THRESH = 0.01 # In Percent % Chg (e.g. 0.02 == 0.02%)
|
||||
ENDTIME_BUFFER_SEC = 30 # Stop trading, cancel all open orders and exit positions this many seconds before mkt settles.
|
||||
TGT_PX_INDEX_DIFF_THRESH = 0.1 # In Percent % Chg (e.g. 0.02 == 0.02%)
|
||||
DEFAULT_ORDER_SIZE = 10 # In USDe
|
||||
MIN_ORDER_SIZE = 5 # In USDe
|
||||
TGT_PROFIT_CENTS = 0.02
|
||||
CHASE_TO_BUY_CENTS = 0.05
|
||||
MAX_ALLOWED_POLY_PX = 0.90
|
||||
|
||||
### GLOBALS ###
|
||||
ORDER_LOCK = 0
|
||||
|
||||
SLUG_END_TIME = 0
|
||||
|
||||
FREE_CASH: float = 0
|
||||
|
||||
POLY_BINANCE = {}
|
||||
POLY_REF = {}
|
||||
POLY_CLOB = {}
|
||||
POLY_CLOB_DOWN = {}
|
||||
USER_TRADES = {}
|
||||
USER_ORDERS = {}
|
||||
SLOPE_HIST = []
|
||||
|
||||
LOCAL_ACTIVE_ORDERS = []
|
||||
LOCAL_TOKEN_BALANCES = {}
|
||||
# LOCAL_ACTIVE_POSITIONS = []
|
||||
|
||||
ACTIVE_BALANCES_EXIST = False ### REMOVE
|
||||
|
||||
|
||||
|
||||
### Decorators ###
|
||||
def async_timeit(func):
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
return await func(*args, **kwargs)
|
||||
finally:
|
||||
end_time = time.perf_counter()
|
||||
total_time = (end_time - start_time)*1000
|
||||
print(f"Function '{func.__name__}' executed in {total_time:.4f} ms")
|
||||
|
||||
return wrapper
|
||||
|
||||
### Database Funcs ###
|
||||
# @async_timeit
|
||||
async def create_executions_orders_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: executions_orders')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS executions_orders (
|
||||
timestamp_sent BIGINT,
|
||||
token_id VARCHAR(100),
|
||||
limit_price DOUBLE,
|
||||
size DOUBLE,
|
||||
side VARCHAR(8),
|
||||
order_type VARCHAR(8),
|
||||
post_only BOOL,
|
||||
resp_errorMsg VARCHAR(100),
|
||||
resp_orderID VARCHAR(100),
|
||||
resp_takingAmount DOUBLE,
|
||||
resp_makingAmount DOUBLE,
|
||||
resp_status VARCHAR(20),
|
||||
resp_success BOOL
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
# @async_timeit
|
||||
async def insert_executions_orders_table(
|
||||
timestamp_sent: int,
|
||||
token_id: str,
|
||||
limit_price: float,
|
||||
size: float,
|
||||
side: str,
|
||||
order_type: str,
|
||||
post_only: bool,
|
||||
resp_errorMsg: str,
|
||||
resp_orderID: str,
|
||||
resp_takingAmount: float,
|
||||
resp_makingAmount: float,
|
||||
resp_status: str,
|
||||
resp_success: bool,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_sent': timestamp_sent,
|
||||
'token_id': token_id,
|
||||
'limit_price': limit_price,
|
||||
'size': size,
|
||||
'side': side,
|
||||
'order_type': order_type,
|
||||
'post_only': post_only,
|
||||
'resp_errorMsg': resp_errorMsg,
|
||||
'resp_orderID': resp_orderID,
|
||||
'resp_takingAmount': resp_takingAmount,
|
||||
'resp_makingAmount': resp_makingAmount,
|
||||
'resp_status': resp_status,
|
||||
'resp_success': resp_success,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO executions_orders
|
||||
(
|
||||
timestamp_sent,
|
||||
token_id,
|
||||
limit_price,
|
||||
size,
|
||||
side,
|
||||
order_type,
|
||||
post_only,
|
||||
resp_errorMsg,
|
||||
resp_orderID,
|
||||
resp_takingAmount,
|
||||
resp_makingAmount,
|
||||
resp_status,
|
||||
resp_success
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_sent,
|
||||
:token_id,
|
||||
:limit_price,
|
||||
:size,
|
||||
:side,
|
||||
:order_type,
|
||||
:post_only,
|
||||
:resp_errorMsg,
|
||||
:resp_orderID,
|
||||
:resp_takingAmount,
|
||||
:resp_makingAmount,
|
||||
:resp_status,
|
||||
:resp_success
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
### Functions ###
|
||||
# @async_timeit
|
||||
async def slope_decision() -> list[bool, str]:
|
||||
hist_trades = np.array(POLY_BINANCE.get('hist_trades', []))
|
||||
|
||||
if ( np.max(hist_trades[:, 0] )*1000 ) - ( np.min(hist_trades[:, 0])*1000 ) < 5:
|
||||
logging.info('Max - Min Trade In History is < 5 Seconds Apart')
|
||||
return False, ''
|
||||
|
||||
last_px = POLY_BINANCE['value']
|
||||
last_px_ts = POLY_BINANCE['timestamp_value']
|
||||
|
||||
ts_min_1_sec = last_px_ts - 1000
|
||||
price_min_1_sec_index = (np.abs(hist_trades[:, 0] - ts_min_1_sec)).argmin()
|
||||
price_min_1_sec = hist_trades[:, 1][price_min_1_sec_index]
|
||||
|
||||
ts_min_5_sec = last_px_ts - 5000
|
||||
price_min_5_sec_index = (np.abs(hist_trades[:, 0] - ts_min_5_sec)).argmin()
|
||||
price_min_5_sec = hist_trades[:, 1][price_min_5_sec_index]
|
||||
|
||||
slope = (last_px - price_min_1_sec) / price_min_1_sec
|
||||
slope_5 = (last_px - price_min_5_sec) / price_min_5_sec
|
||||
SLOPE_HIST.append(slope)
|
||||
|
||||
# print(f'Avg Binance: {np.mean(hist_trades[:, 1])}')
|
||||
# print(f'Len Hist : {len(hist_trades[:, 1])}')
|
||||
# print(f'First Hist : {pd.to_datetime(np.min(hist_trades[:, 0]), unit='ms')}')
|
||||
# print(f'Latest Hist: {pd.to_datetime(np.max(hist_trades[:, 0]), unit='ms')}')
|
||||
print(f'Slope Hist Avg: {np.mean(SLOPE_HIST):.4%}')
|
||||
print(f'Slope Hist Max: {np.max(SLOPE_HIST):.4%}')
|
||||
print(f'Slope Hist Std: {np.std(SLOPE_HIST):.4%}')
|
||||
slope_1_buy = abs(slope) >= ( SLOPE_YES_THRESH / 100)
|
||||
slope_5_buy = abs(slope_5) >= ( SLOPE_YES_THRESH / 100)
|
||||
|
||||
print(f'SLOPE_1: {slope:.4%} == {slope_1_buy}; SLOPE_5: {slope_5:.4%} == {slope_5_buy};')
|
||||
|
||||
### DECISION ###
|
||||
if slope_1_buy and slope_5_buy:
|
||||
print(f'🤑🤑🤑🤑🤑🤑🤑🤑🤑🤑 Slope: {slope_5:.4%};')
|
||||
side = 'UP' if slope > 0.00 else 'DOWN'
|
||||
return True, side
|
||||
else:
|
||||
return False, ''
|
||||
|
||||
# @async_timeit
|
||||
async def cancel_all_orders(CLIENT):
|
||||
logging.info('Attempting to Cancel All Orders')
|
||||
cxl_resp = CLIENT.cancel_all()
|
||||
if bool(cxl_resp.get('not_canceled', True)):
|
||||
logging.warning(f'*** Cancel Request FAILED, trying again and shutting down: {cxl_resp}')
|
||||
cxl_resp = CLIENT.cancel_all()
|
||||
raise Exception('*** Cancel Request FAILED')
|
||||
logging.info(f'Cancel Successful: {cxl_resp}')
|
||||
|
||||
# @async_timeit
|
||||
async def cancel_single_order_by_id(CLIENT, order_id):
|
||||
global LOCAL_ACTIVE_ORDERS
|
||||
|
||||
logging.info(f'Attempting to Cancel Single Order: {order_id}')
|
||||
cxl_resp = CLIENT.cancel(order_id=order_id)
|
||||
|
||||
for idx, o in enumerate(LOCAL_ACTIVE_ORDERS):
|
||||
if o.get('orderID') == order_id:
|
||||
if bool(cxl_resp.get('not_canceled', True)):
|
||||
if cxl_resp.get('not_canceled', {}).get(order_id, None) == "matched orders can't be canceled":
|
||||
logging.info(f'Cancel request failed b/c already matched: {cxl_resp}')
|
||||
return False
|
||||
elif cxl_resp.get('not_canceled', {}).get(order_id, None) == "order can't be found - already canceled or matched":
|
||||
logging.info(f'Cancel request failed b/c already matched or cancelled: {cxl_resp}')
|
||||
LOCAL_ACTIVE_ORDERS.pop(idx)
|
||||
return False
|
||||
else:
|
||||
logging.warning(f'*** Cancel Request FAILED, shutting down: {cxl_resp}')
|
||||
raise Exception('*** Cancel Request FAILED - SHUTDONW')
|
||||
else:
|
||||
LOCAL_ACTIVE_ORDERS.pop(idx)
|
||||
logging.info(f'Cancel Successful: {cxl_resp}')
|
||||
return False
|
||||
|
||||
# @async_timeit
|
||||
async def flatten_open_positions(CLIENT, token_id_up, token_id_down):
|
||||
up = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
|
||||
down = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
|
||||
|
||||
### Submit orders to flatten outstanding balances ###
|
||||
if up > MIN_ORDER_SIZE:
|
||||
logging.info(f'Flattening Up Position: {up}')
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = [Custom_OrderArgs(
|
||||
token_id=token_id_up,
|
||||
price=float(POLY_CLOB['price'])-0.01,
|
||||
size=up,
|
||||
side=SELL,
|
||||
)]
|
||||
)
|
||||
if down > MIN_ORDER_SIZE:
|
||||
logging.info(f'Flattening Down Position: {down}')
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = [Custom_OrderArgs(
|
||||
token_id=token_id_down,
|
||||
price=float(POLY_CLOB_DOWN['price'])-0.01,
|
||||
size=down,
|
||||
side=SELL,
|
||||
|
||||
)]
|
||||
)
|
||||
|
||||
# @async_timeit
|
||||
async def get_balance_by_token_id(CLIENT, token_id):
|
||||
collateral = CLIENT.get_balance_allowance(
|
||||
BalanceAllowanceParams(
|
||||
asset_type='CONDITIONAL',
|
||||
token_id=token_id,
|
||||
)
|
||||
)
|
||||
return int(collateral['balance']) / 1_000_000
|
||||
|
||||
# @async_timeit
|
||||
async def get_usde_balance(CLIENT):
|
||||
collateral = CLIENT.get_balance_allowance(
|
||||
BalanceAllowanceParams(
|
||||
asset_type='COLLATERAL'
|
||||
)
|
||||
)
|
||||
return int(collateral['balance']) / 1_000_000
|
||||
|
||||
@async_timeit
|
||||
async def check_for_open_positions(CLIENT, token_id_up, token_id_down):
|
||||
global LOCAL_TOKEN_BALANCES
|
||||
|
||||
if token_id_up is None or token_id_down is None:
|
||||
logging.critical('Token Id is None, Exiting')
|
||||
raise ValueError('Token Id is None, Exiting')
|
||||
# return False
|
||||
up = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
|
||||
down = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
|
||||
|
||||
LOCAL_TOKEN_BALANCES = {
|
||||
token_id_up: up if up else 0,
|
||||
token_id_down: down if down else 0,
|
||||
}
|
||||
|
||||
logging.info(f'LOCAL_TOKEN_BALANCES: {LOCAL_TOKEN_BALANCES}')
|
||||
|
||||
if ( abs(up) > 0 ) or ( abs(down) > 0 ):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@async_timeit
|
||||
async def post_order(CLIENT, OrderArgs_list: list[Custom_OrderArgs], tick_size: float | str, neg_risk: bool):
|
||||
global LOCAL_ACTIVE_ORDERS
|
||||
global LOCAL_TOKEN_BALANCES
|
||||
|
||||
orders = []
|
||||
for oa in OrderArgs_list:
|
||||
orders.append(
|
||||
PostOrdersArgs(
|
||||
order=CLIENT.create_order(
|
||||
order_args=oa,
|
||||
options=PartialCreateOrderOptions(
|
||||
tick_size=str(tick_size),
|
||||
neg_risk=neg_risk
|
||||
),
|
||||
),
|
||||
orderType=OrderType.GTC,
|
||||
postOnly=False,
|
||||
),
|
||||
)
|
||||
|
||||
### POST
|
||||
response = CLIENT.post_orders(orders)
|
||||
for idx, d in enumerate(response):
|
||||
if d['errorMsg'] == '':
|
||||
d['token_id'] = OrderArgs_list[idx].token_id
|
||||
d['price'] = OrderArgs_list[idx].price
|
||||
d['size'] = OrderArgs_list[idx].size
|
||||
d['side'] = str(OrderArgs_list[idx].side).upper()
|
||||
|
||||
if d['status'].upper() =='MATCHED':
|
||||
### Order Immediately Matched, Can Put in Offsetting Order Depending on State ###
|
||||
print('******** ORDER APPEND TO LOCAL - MATCHED ********* ')
|
||||
LOCAL_ACTIVE_ORDERS.append(d)
|
||||
|
||||
if d['status'].upper() == 'CONFIRMED':
|
||||
current_balance = float(LOCAL_TOKEN_BALANCES.get(d['token_id'], 0.00))
|
||||
if d['side'] == 'BUY':
|
||||
size = float(d['size'])
|
||||
else:
|
||||
size = float(d['size']) * -1
|
||||
|
||||
LOCAL_TOKEN_BALANCES[d['token_id']] = current_balance + size
|
||||
print('******** TRADE FILLED, BAL UPDATED ********* ')
|
||||
else:
|
||||
print('******** ORDER APPEND TO LOCAL - LIVE ********* ')
|
||||
LOCAL_ACTIVE_ORDERS.append(d)
|
||||
else:
|
||||
raise ValueError(f'Order entry failed: {d}')
|
||||
|
||||
logging.info(f'Order Posted Resp: {response}')
|
||||
print(f'Order Posted Resp: {response}')
|
||||
|
||||
|
||||
### Routes ###
|
||||
async def no_orders_no_positions_route():
|
||||
global ORDER_LOCK
|
||||
|
||||
### Check for Price Bands ###
|
||||
up_px = float(POLY_CLOB.get('price', 0))
|
||||
down_px = float(POLY_CLOB_DOWN.get('price', 0))
|
||||
|
||||
if (up_px > MAX_ALLOWED_POLY_PX) or (down_px > MAX_ALLOWED_POLY_PX):
|
||||
logging.info(f'Outside max allowed px: {MAX_ALLOWED_POLY_PX}')
|
||||
return False
|
||||
|
||||
### Check for Index vs. Target Px ###
|
||||
tgt_px = float(POLY_CLOB.get('target_price', 0))
|
||||
ref_px = float(POLY_REF.get('value'))
|
||||
tgt_px_diff_to_index = ( abs( tgt_px - ref_px ) / tgt_px)
|
||||
if tgt_px_diff_to_index > (TGT_PX_INDEX_DIFF_THRESH / 100):
|
||||
logging.info(f'Tgt Diff to Index Outside Limit ({TGT_PX_INDEX_DIFF_THRESH}%); Diff {tgt_px_diff_to_index:.4%}; Index: {ref_px:.2f}; Tgt: {tgt_px:.2f}')
|
||||
return False
|
||||
|
||||
|
||||
### Check Slope ###
|
||||
slope_bool, slope_side = await slope_decision()
|
||||
if not slope_bool:
|
||||
logging.info('Failed Slope Check')
|
||||
return False
|
||||
token_id = POLY_CLOB.get('token_id_up', None) if slope_side=='UP' else POLY_CLOB.get('token_id_down', None)
|
||||
|
||||
|
||||
### Order Entry ###
|
||||
px = float(POLY_CLOB['price'])+0.01
|
||||
order = Custom_OrderArgs(
|
||||
token_id=token_id,
|
||||
price=px,
|
||||
size=DEFAULT_ORDER_SIZE,
|
||||
side=BUY,
|
||||
max_price = px + CHASE_TO_BUY_CENTS
|
||||
)
|
||||
|
||||
### ADD CHECK FOR MKT MOVED AWAY FROM OPPORTNITY ###
|
||||
|
||||
if ORDER_LOCK:
|
||||
logging.info(f'BUY ORDER BLOCKED BY LOCK: {order}')
|
||||
|
||||
else:
|
||||
logging.info(f'Attempting BUY Order {order}')
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = [order]
|
||||
)
|
||||
# ORDER_LOCK = ORDER_LOCK + 1
|
||||
|
||||
async def active_orders_no_positions_route():
|
||||
if len(LOCAL_ACTIVE_ORDERS) > 2:
|
||||
logging.critical('More than two active orders, shutting down')
|
||||
await kill_algo()
|
||||
b_c = 0
|
||||
s_c = 0
|
||||
for o in LOCAL_ACTIVE_ORDERS:
|
||||
if o['side'] == 'BUY':
|
||||
b_c = b_c + 1
|
||||
elif o['side'] == 'SELL':
|
||||
s_c = s_c + 1
|
||||
if (b_c > 1) or (s_c > 1):
|
||||
logging.critical(f'More than one active buy or more than one active sell: b_c {b_c}; s_c{s_c}')
|
||||
await kill_algo()
|
||||
|
||||
for o in LOCAL_ACTIVE_ORDERS:
|
||||
logging.info(f'Working on order ({o['side']}): {o['orderID']}')
|
||||
|
||||
if o.get('status').upper() == 'MATCHED':
|
||||
logging.info('Order is matched, awaiting confirm or kickback')
|
||||
|
||||
elif o.get('status').upper() == 'FAILED':
|
||||
raise ValueError(f'Trade FAILED after matching: {o}')
|
||||
else:
|
||||
orig_px = float(o['price'])
|
||||
orig_size = float(o['size'])
|
||||
if o['side'] == 'BUY':
|
||||
if POLY_CLOB['token_id_up'] == o['token_id']:
|
||||
clob_px = float(POLY_CLOB['price'])
|
||||
else:
|
||||
clob_px = float(POLY_CLOB_DOWN['price'])
|
||||
|
||||
if clob_px >= orig_px:
|
||||
logging.info(f"Market px: ({clob_px} is above buy order px: {orig_px:.2f})")
|
||||
if o.get('max_price', 0) > clob_px:
|
||||
logging.info(f"Market px: ({clob_px} has moved too far away from original target, cancelling and resetting algo: {o.get('max_price', 0) :.2f})")
|
||||
|
||||
order_matched = await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
|
||||
|
||||
if order_matched:
|
||||
o['status'] = 'MATCHED'
|
||||
else:
|
||||
px = orig_px+0.01
|
||||
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = [Custom_OrderArgs(
|
||||
token_id=o['token_id'],
|
||||
price=px,
|
||||
size=orig_size,
|
||||
side=BUY,
|
||||
max_price=o['max_price']
|
||||
|
||||
)]
|
||||
)
|
||||
else:
|
||||
await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
|
||||
elif o['side'] == 'SELL':
|
||||
if POLY_CLOB['token_id_up'] == o['token_id']:
|
||||
clob_px = float(POLY_CLOB['price'])
|
||||
else:
|
||||
clob_px = float(POLY_CLOB_DOWN['price'])
|
||||
|
||||
if clob_px <= orig_px:
|
||||
logging.info(f"Market px: ({clob_px} is below sell order px: {orig_px:.2f})")
|
||||
|
||||
order_filled = await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
|
||||
if not order_filled:
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = [Custom_OrderArgs(
|
||||
token_id=o['token_id'],
|
||||
price=orig_px-0.01,
|
||||
size=orig_size,
|
||||
side=SELL,
|
||||
max_price = 0.00
|
||||
)]
|
||||
)
|
||||
|
||||
|
||||
async def no_orders_active_positions_route():
|
||||
'''
|
||||
Succesful Buy, now neeed to take profit and exit
|
||||
'''
|
||||
global LOCAL_TOKEN_BALANCES
|
||||
|
||||
OrderArgs_list = []
|
||||
|
||||
logging.warning(f'LOCAL_TOKEN_BALANCES: {LOCAL_TOKEN_BALANCES}')
|
||||
|
||||
for k, v in LOCAL_TOKEN_BALANCES.items():
|
||||
size = await get_balance_by_token_id(CLIENT=CLIENT, token_id=k)
|
||||
if size >= MIN_ORDER_SIZE:
|
||||
if POLY_CLOB['token_id_up'] == k:
|
||||
clob_px = float(POLY_CLOB['price'])
|
||||
else:
|
||||
clob_px = float(POLY_CLOB_DOWN['price'])
|
||||
|
||||
OrderArgs_list.append(
|
||||
Custom_OrderArgs(
|
||||
token_id=k,
|
||||
price=clob_px + TGT_PROFIT_CENTS,
|
||||
size=size,
|
||||
side='SELL',
|
||||
)
|
||||
)
|
||||
else:
|
||||
LOCAL_TOKEN_BALANCES[k] = 0.00
|
||||
logging.info(f'Wants to flatten small amount, skipping: {v}')
|
||||
|
||||
if OrderArgs_list:
|
||||
logging.info(f'Posting orders to close: {OrderArgs_list}')
|
||||
await post_order(
|
||||
CLIENT = CLIENT,
|
||||
tick_size = POLY_CLOB['tick_size'],
|
||||
neg_risk = POLY_CLOB['neg_risk'],
|
||||
OrderArgs_list = OrderArgs_list
|
||||
)
|
||||
|
||||
async def active_orders_active_positions_route():
|
||||
pass
|
||||
|
||||
async def kill_algo():
|
||||
logging.info('Killing algo...')
|
||||
await cancel_all_orders(CLIENT=CLIENT)
|
||||
await flatten_open_positions(
|
||||
CLIENT=CLIENT,
|
||||
token_id_up = POLY_CLOB.get('token_id_up', None),
|
||||
token_id_down = POLY_CLOB.get('token_id_down', None),
|
||||
)
|
||||
logging.info('...algo killed')
|
||||
raise Exception('Algo Killed')
|
||||
|
||||
async def run_algo():
|
||||
global POLY_BINANCE
|
||||
global POLY_REF
|
||||
global POLY_CLOB
|
||||
global POLY_CLOB_DOWN
|
||||
global USER_TRADES
|
||||
global USER_ORDERS
|
||||
|
||||
global SLOPE_HIST
|
||||
global ACTIVE_BALANCES_EXIST
|
||||
|
||||
global LOCAL_ACTIVE_ORDERS
|
||||
global LOCAL_TOKEN_BALANCES
|
||||
# global LOCAL_ACTIVE_POSITIONS
|
||||
|
||||
|
||||
print(f'token_id_up: {POLY_CLOB.get('token_id_up', None)}')
|
||||
print(f'token_id_down: {POLY_CLOB.get('token_id_down', None)}')
|
||||
|
||||
POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))
|
||||
|
||||
ACTIVE_BALANCES_EXIST = await check_for_open_positions(
|
||||
CLIENT=CLIENT,
|
||||
token_id_up=POLY_CLOB.get('token_id_up', None),
|
||||
token_id_down=POLY_CLOB.get('token_id_down', None),
|
||||
)
|
||||
|
||||
try:
|
||||
while True:
|
||||
loop_start = time.time()
|
||||
print('__________Start___________')
|
||||
POLY_BINANCE = json.loads(VAL_KEY.get('poly_binance_btcusd'))
|
||||
POLY_REF = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
|
||||
POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))
|
||||
POLY_CLOB_DOWN = json.loads(VAL_KEY.get('poly_5min_btcusd_down'))
|
||||
USER_TRADES = json.loads(VAL_KEY.get('poly_user_trades'))
|
||||
USER_ORDERS = VAL_KEY.get('poly_user_orders')
|
||||
USER_ORDERS = json.loads(USER_ORDERS) if USER_ORDERS is not None else []
|
||||
|
||||
### CHANGE METHOD FROM BUY-SELL TO BUY UP - BUY DOWN
|
||||
### DO THIS TO AVOID DELAY WITH FILL CONFIRMS
|
||||
|
||||
### Manage Local vs User Stream Orders ###
|
||||
print(f'LOCAL_ACTIVE_ORDERS: {LOCAL_ACTIVE_ORDERS}')
|
||||
for idx, o in enumerate(LOCAL_ACTIVE_ORDERS):
|
||||
user_order = next((item for item in USER_ORDERS if item["id"] == o['orderID']), None)
|
||||
user_trade = next( ( item for item in USER_TRADES if ( o['orderID'] == item['taker_order_id'] ) or ( o["orderID"] == json.loads(item['maker_orders'])[0]['order_id'] ) ), None )
|
||||
|
||||
print(f'USER TRADE: {user_trade}')
|
||||
|
||||
if user_trade is not None:
|
||||
trade_status = str(user_trade['status']).upper()
|
||||
logging.info(f'Updated Trade Status: {o['status']} --> {trade_status}; {o['orderID']}')
|
||||
if trade_status == 'CONFIRMED':
|
||||
LOCAL_ACTIVE_ORDERS.pop(idx)
|
||||
|
||||
token_id = user_trade['asset_id']
|
||||
current_balance = float(LOCAL_TOKEN_BALANCES.get(token_id, 0.00))
|
||||
|
||||
if user_trade['side'] == 'BUY':
|
||||
size = float(user_trade['size'])
|
||||
else:
|
||||
size = float(user_trade['size']) * -1
|
||||
|
||||
LOCAL_TOKEN_BALANCES[token_id] = current_balance + size
|
||||
|
||||
# px = user_trade['price']
|
||||
# LOCAL_ACTIVE_POSITIONS.append({
|
||||
# 'token_id': token_id,
|
||||
# 'order_id': o['orderID'],
|
||||
# 'associate_trades': user_order['associate_trades'],
|
||||
# 'size_matched': user_order['size_matched'],
|
||||
# 'price': px,
|
||||
# 'timestamp_value': user_order['timestamp'],
|
||||
# })
|
||||
logging.info('Order FILLED!')
|
||||
elif trade_status == 'MATCHED':
|
||||
logging.info(f'Order Matched...awaiting confirm: {trade_status}')
|
||||
else:
|
||||
logging.info(f'Trade status but not filled: trade= {user_trade}; order={o}')
|
||||
|
||||
elif user_order is not None:
|
||||
order_status = str(user_order['status']).upper()
|
||||
logging.info(f'Updated Order Status: {o['status']} --> {order_status}; {o['orderID']}')
|
||||
# LOCAL_ACTIVE_ORDERS[idx]['status'] = order_status
|
||||
# if order_status == 'MATCHED':
|
||||
# LOCAL_ACTIVE_ORDERS.pop(idx)
|
||||
|
||||
# token_id = user_order['asset_id']
|
||||
# current_balance = float(LOCAL_TOKEN_BALANCES.get(token_id, 0.00))
|
||||
|
||||
# if user_order['side'] == 'BUY':
|
||||
# size = float(user_order['size_matched'])
|
||||
# else:
|
||||
# size = float(user_order['size_matched']) * -1
|
||||
|
||||
# LOCAL_TOKEN_BALANCES[token_id] = current_balance + size
|
||||
|
||||
# # px = user_order['price']
|
||||
# # LOCAL_ACTIVE_POSITIONS.append({
|
||||
# # 'token_id': token_id,
|
||||
# # 'order_id': o['orderID'],
|
||||
# # 'associate_trades': user_order['associate_trades'],
|
||||
# # 'size_matched': user_order['size_matched'],
|
||||
# # 'price': px,
|
||||
# # 'timestamp_value': user_order['timestamp'],
|
||||
# # })
|
||||
# logging.info('Order FILLED!')
|
||||
if order_status == 'CANCELED':
|
||||
LOCAL_ACTIVE_ORDERS.pop(idx)
|
||||
logging.info('Order Canceled')
|
||||
else:
|
||||
logging.info('Order Live or Trade Awaiting Confirm')
|
||||
|
||||
### UPDATES CAN COME THRU EITHER ORDER OR TRADE CHANNELS - NEED TO UPDATE TO HANDLE TRADE CHANNLE
|
||||
|
||||
token_id_up = POLY_CLOB.get('token_id_up', 0)
|
||||
token_id_down = POLY_CLOB.get('token_id_down', 0)
|
||||
|
||||
if (token_id_up is None) or (token_id_down is None):
|
||||
print('Missing Token Ids for Market, sleeping 1 sec and retrying...')
|
||||
time.sleep(1)
|
||||
ACTIVE_BALANCES_EXIST = {}
|
||||
continue
|
||||
else:
|
||||
if (LOCAL_TOKEN_BALANCES.get(token_id_up) is None):
|
||||
LOCAL_TOKEN_BALANCES[token_id_up] = 0.00
|
||||
if (LOCAL_TOKEN_BALANCES.get(token_id_down) is None):
|
||||
LOCAL_TOKEN_BALANCES[token_id_down] = 0.00
|
||||
ACTIVE_BALANCES_EXIST = (LOCAL_TOKEN_BALANCES.get(token_id_up) > 0) or (LOCAL_TOKEN_BALANCES.get(token_id_down) > 0)
|
||||
|
||||
### Check for Endtime Buffer ###
|
||||
if ENDTIME_BUFFER_SEC > POLY_CLOB.get('sec_remaining', 0):
|
||||
if LOCAL_ACTIVE_ORDERS:
|
||||
print('buffer zone - orders cancel')
|
||||
await cancel_all_orders(CLIENT=CLIENT)
|
||||
if ACTIVE_BALANCES_EXIST:
|
||||
print('buffer zone - flatten positions')
|
||||
await flatten_open_positions(
|
||||
CLIENT=CLIENT,
|
||||
token_id_up = POLY_CLOB.get('token_id_up', None),
|
||||
token_id_down = POLY_CLOB.get('token_id_down', None),
|
||||
)
|
||||
|
||||
print('buffer zone, sleeping until next session')
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
### Execution Route ###
|
||||
if not(LOCAL_ACTIVE_ORDERS) and not(ACTIVE_BALANCES_EXIST): # No Orders, No Positions
|
||||
print('ROUTE: no_orders_no_positions_route')
|
||||
await no_orders_no_positions_route()
|
||||
|
||||
### Open Orders Route ###
|
||||
elif LOCAL_ACTIVE_ORDERS and not(ACTIVE_BALANCES_EXIST): # Orders, No Positions
|
||||
print('ROUTE: active_orders_no_positions_route')
|
||||
await active_orders_no_positions_route()
|
||||
|
||||
### Open Positions Route ###
|
||||
elif not(LOCAL_ACTIVE_ORDERS) and ACTIVE_BALANCES_EXIST: # No Orders, Positions
|
||||
print('ROUTE: no_orders_no_positions_route')
|
||||
await no_orders_active_positions_route()
|
||||
|
||||
### Open Orders and Open Positions Route ###
|
||||
else:
|
||||
print('ROUTE: active_orders_active_positions_route')
|
||||
await active_orders_no_positions_route() # Orders and Positions
|
||||
|
||||
print(f'__________________________ (Algo Engine ms: {(time.time() - loop_start)*1000})')
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print('...algo stopped')
|
||||
await cancel_all_orders(CLIENT=CLIENT)
|
||||
except Exception as e:
|
||||
logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
await cancel_all_orders(CLIENT=CLIENT)
|
||||
|
||||
|
||||
async def main():
|
||||
global CLIENT
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
CLIENT = api.create_client()
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
|
||||
async with engine.connect() as CON:
|
||||
await create_executions_orders_table(CON=CON)
|
||||
await run_algo()
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
Binary file not shown.
117
ng.py
Normal file
117
ng.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import os
|
||||
from nicegui import ui, app
|
||||
from sqlalchemy import create_engine
|
||||
# import requests
|
||||
import json
|
||||
# import time
|
||||
# import re
|
||||
import valkey
|
||||
# import asyncio
|
||||
# import datetime as dt
|
||||
# from random import random
|
||||
# from nicegui_modules import data
|
||||
# from nicegui_modules import ui_components
|
||||
# from glide import GlideClient, NodeAddress, GlideClientConfiguration
|
||||
|
||||
|
||||
LISTENING_CLIENT = None
|
||||
LH_PAIR = 'BTC'
|
||||
RH_PAIR = 'USD'
|
||||
|
||||
DEFAULT_TO_DARKMODE: bool = True
|
||||
ALLOW_BODY_SCROLL: bool = True
|
||||
LOOKBACK: int = 60
|
||||
LOOKBACK_RT_TV_MAX_POINTS: int = 300
|
||||
REFRESH_INTERVAL_SEC: int = 10
|
||||
REFRESH_INTERVAL_RT_SEC: int = 1/30
|
||||
|
||||
ENGINE = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')
|
||||
VALKEY_R = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
||||
# VALKEY_P = VALKEY_R.pubsub()
|
||||
# VALKEY_P.subscribe('mexc_mkt_bookTicker')
|
||||
|
||||
|
||||
def root():
|
||||
app.add_static_files(max_cache_age=0, url_path='/static', local_directory=os.path.join(os.path.dirname(__file__), 'nicegui_modules/static'))
|
||||
ui.add_head_html('''
|
||||
<meta name="darkreader-lock">
|
||||
<link rel="stylesheet" type="text/css" href="/static/styles.css">
|
||||
<script type="text/javascript" src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>
|
||||
<script src="/static/script.js"></script>
|
||||
'''
|
||||
)
|
||||
|
||||
# ui.add_head_html('<meta name="darkreader-lock">')
|
||||
update_body_scroll(bool_override=ALLOW_BODY_SCROLL)
|
||||
|
||||
ui.sub_pages({
|
||||
'/': rt_chart_page,
|
||||
}).classes('w-full')
|
||||
|
||||
|
||||
async def update_tv():
|
||||
series_update = json.loads(VALKEY_R.get('poly_rtds_cl_btcusd'))
|
||||
series_update_b = json.loads(VALKEY_R.get('poly_binance_btcusd'))
|
||||
series_update_c = json.loads(VALKEY_R.get('poly_5min_btcusd'))
|
||||
timestamp = round( ( series_update['timestamp_arrival'] / 1000 ) , 2)
|
||||
timestamp_b = round( ( series_update_b['timestamp_arrival'] / 1000 ) , 2)
|
||||
timestamp_c = round( ( series_update_c['timestamp_arrival'] / 1000 ) , 2)
|
||||
value = float(series_update['value'])
|
||||
value_b = float(series_update_b['value'])
|
||||
value_c = float(series_update_c['price'])
|
||||
|
||||
data_dict = {
|
||||
'timestamp': timestamp,
|
||||
'timestamp_b': timestamp_b,
|
||||
'timestamp_c': timestamp_c,
|
||||
'value': value,
|
||||
'value_b': value_b,
|
||||
'value_c': value_c,
|
||||
'target': series_update_c['target_price'],
|
||||
'LOOKBACK_RT_TV_MAX_POINTS': LOOKBACK_RT_TV_MAX_POINTS,
|
||||
}
|
||||
|
||||
ui.run_javascript(f'await update_tv(data_dict={data_dict});')
|
||||
|
||||
|
||||
def update_body_scroll(e=None, bool_override=False):
|
||||
if e is None:
|
||||
if bool_override:
|
||||
ui.query('body').style('height: 100%; overflow-y: auto;')
|
||||
else:
|
||||
ui.query('body').style('height: 100%; overflow-y: hidden;')
|
||||
else:
|
||||
if e.value:
|
||||
ui.query('body').style('height: 100%; overflow-y: auto;')
|
||||
else:
|
||||
ui.query('body').style('height: 100%; overflow-y: hidden;')
|
||||
|
||||
# async def refresh_lookback_funcs(lookback: int = LOOKBACK):
|
||||
# lookback = app.storage.user.get('lookback', lookback)
|
||||
|
||||
# await data.trades_pnl_graph.refresh(ENGINE=ENGINE, LH_PAIR=LH_PAIR, RH_PAIR=RH_PAIR, lookback=lookback)
|
||||
# await ui_components.er_table.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||
# await ui_components.trades_table.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||
# await ui_components.er_stats.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||
|
||||
async def rt_chart_page():
|
||||
global LOOKBACK
|
||||
|
||||
LOOKBACK = app.storage.user.get('lookback', LOOKBACK)
|
||||
timer = ui.timer(REFRESH_INTERVAL_RT_SEC, update_tv)
|
||||
|
||||
with ui.row():
|
||||
with ui.column():
|
||||
ui.switch('☸︎', value=ALLOW_BODY_SCROLL, on_change=lambda e: update_body_scroll(e))
|
||||
with ui.column():
|
||||
ui.switch('▶️', value=True).bind_value_to(timer, 'active')
|
||||
with ui.column().style('position: absolute; right: 20px; font-family: monospace; align-self: center;'):
|
||||
ui.label('Atwater Trading: Orderbook')
|
||||
|
||||
with ui.grid(columns=16).classes('w-full gap-0 auto-fit'):
|
||||
with ui.card().tight().classes('w-full col-span-full no-shadow border border-black-200').style('overflow: auto;'):
|
||||
ui.html('<div id="tv" style="width:100%; height:800px;"></div>', sanitize=False).classes('w-full')
|
||||
ui.run_javascript('await create_tv();')
|
||||
|
||||
|
||||
ui.run(root, storage_secret="123ABC", reload=True, dark=True, title='Atwater Trading')
|
||||
19
ng/Dockerfile
Normal file
19
ng/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ng.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
223
nicegui_modules/static/script.js
Normal file
223
nicegui_modules/static/script.js
Normal file
@@ -0,0 +1,223 @@
|
||||
async function waitForVariable(variableName, timeout = 5000) {
|
||||
const startTime = Date.now();
|
||||
while (typeof window[variableName] === 'undefined') {
|
||||
if (Date.now() - startTime > timeout) {
|
||||
throw new Error(`Variable '${variableName}' not defined within ${timeout}ms`);
|
||||
}
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
console.log(`Variable '${variableName}' is now defined.`);
|
||||
}
|
||||
|
||||
async function update_tv(data_dict) {
|
||||
|
||||
window.data.push({ time: data_dict.timestamp, value: data_dict.value });
|
||||
window.data_b.push({ time: data_dict.timestamp_b, value: data_dict.value_b });
|
||||
window.data_c.push({ time: data_dict.timestamp_c, value: data_dict.value_c });
|
||||
window.data_tgt.push({ time: data_dict.timestamp_c, value: data_dict.target });
|
||||
window.lineSeries.update({ time: data_dict.timestamp, value: data_dict.value });
|
||||
window.lineSeries_b.update({ time: data_dict.timestamp_b, value: data_dict.value_b });
|
||||
window.lineSeries_c.update({ time: data_dict.timestamp_c, value: data_dict.value_c });
|
||||
window.lineSeries_tgt.update({ time: data_dict.timestamp_c, value: data_dict.target });
|
||||
|
||||
// midPriceLine.applyOptions({
|
||||
// price: data_dict.mid_px,
|
||||
// color: '#c78228',
|
||||
// lineWidth: 3,
|
||||
// lineStyle: LightweightCharts.LineStyle.Dashed,
|
||||
// axisLabelVisible: true,
|
||||
// });
|
||||
|
||||
window.chart.timeScale().scrollToRealTime();
|
||||
// const currentRange = window.chart.timeScale().getVisibleLogicalRange();
|
||||
// window.chart.timeScale().fitContent();
|
||||
// window.chart.timeScale().setVisibleLogicalRange(currentRange);
|
||||
|
||||
const MAX_DATA_POINTS = data_dict.LOOKBACK_RT_TV_MAX_POINTS;
|
||||
if (window.lineSeries.data().length > MAX_DATA_POINTS) {
|
||||
window.lineSeries.setData(lineSeries.data().slice(-MAX_DATA_POINTS));
|
||||
}
|
||||
if (window.lineSeries_b.data().length > MAX_DATA_POINTS) {
|
||||
window.lineSeries_b.setData(lineSeries_b.data().slice(-MAX_DATA_POINTS));
|
||||
}
|
||||
if (window.lineSeries_c.data().length > MAX_DATA_POINTS) {
|
||||
window.lineSeries_c.setData(lineSeries_c.data().slice(-MAX_DATA_POINTS));
|
||||
}
|
||||
if (window.lineSeries_tgt.data().length > MAX_DATA_POINTS) {
|
||||
window.lineSeries_tgt.setData(lineSeries_tgt.data().slice(-MAX_DATA_POINTS));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
async function create_tv() {
|
||||
window.chart = LightweightCharts.createChart(document.getElementById('tv'),
|
||||
{
|
||||
autoSize: true,
|
||||
toolbox: true,
|
||||
timeScale: {
|
||||
timeVisible: true, // Shows HH:mm on x-axis
|
||||
secondsVisible: true // Optional: show seconds
|
||||
},
|
||||
rightPriceScale: {
|
||||
visible: true,
|
||||
autoScale: true
|
||||
},
|
||||
leftPriceScale: {
|
||||
visible: true
|
||||
},
|
||||
|
||||
layout: {
|
||||
background: { type: 'solid', color: '#222' },
|
||||
textColor: '#DDD',
|
||||
},
|
||||
grid: {
|
||||
vertLines: {
|
||||
color: '#e1e1e1', // Set vertical line color
|
||||
visible: true,
|
||||
style: 2, // 0: Solid, 1: Dashed, 2: Dotted, 3: LargeDashed, 4: SparseDotted
|
||||
},
|
||||
horzLines: {
|
||||
color: '#e1e1e1', // Set horizontal line color
|
||||
visible: true,
|
||||
style: 2,
|
||||
},
|
||||
},
|
||||
|
||||
crosshair: { mode: LightweightCharts.CrosshairMode.Normal },
|
||||
}
|
||||
);
|
||||
window.lineSeries = chart.addSeries(LightweightCharts.LineSeries, {
|
||||
color: '#94fcdf',
|
||||
priceScaleId: 'right'
|
||||
// topColor: '#94fcdf',
|
||||
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||
// invertFilledArea: false
|
||||
});
|
||||
window.lineSeries_b = chart.addSeries(LightweightCharts.LineSeries, {
|
||||
color: '#dd7525',
|
||||
priceScaleId: 'right'
|
||||
// topColor: '#94fcdf',
|
||||
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||
// invertFilledArea: false
|
||||
});
|
||||
window.lineSeries_c = chart.addSeries(LightweightCharts.LineSeries, {
|
||||
color: '#ea0707',
|
||||
priceScaleId: 'left',
|
||||
autoscaleInfoProvider: () => ({
|
||||
priceRange: {
|
||||
minValue: 0.0,
|
||||
maxValue: 1.0
|
||||
}
|
||||
})
|
||||
// topColor: '#94fcdf',
|
||||
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||
// invertFilledArea: false
|
||||
});
|
||||
window.lineSeries_tgt = chart.addSeries(LightweightCharts.LineSeries, {
|
||||
color: '#ffffff',
|
||||
priceScaleId: 'right',
|
||||
lineStyle: LightweightCharts.LineStyle.Dashed
|
||||
// topColor: '#94fcdf',
|
||||
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||
// invertFilledArea: false
|
||||
});
|
||||
// window.midPriceLine_Config = {
|
||||
// price: 0,
|
||||
// color: '#c78228',
|
||||
// lineWidth: 3,
|
||||
// lineStyle: LightweightCharts.LineStyle.Dashed,
|
||||
// axisLabelVisible: false,
|
||||
// };
|
||||
// window.midPriceLine = window.lineSeries.createPriceLine(midPriceLine_Config);
|
||||
window.data = [];
|
||||
window.data_b = [];
|
||||
window.data_c = [];
|
||||
window.data_tgt = [];
|
||||
window.lineSeries.setData(window.data);
|
||||
window.lineSeries_b.setData(window.data_b);
|
||||
window.lineSeries_c.setData(window.data_c);
|
||||
window.lineSeries_tgt.setData(window.data_tgt);
|
||||
|
||||
// Create and style the tooltip html element
|
||||
const container = document.getElementById('tv');
|
||||
|
||||
window.toolTipWidth = 200;
|
||||
|
||||
const toolTip = document.createElement('div');
|
||||
toolTip.style = `width: ${window.toolTipWidth}px; height: 100%; position: absolute; display: none; padding: 8px; box-sizing: border-box; font-size: 12px; text-align: left; z-index: 1000; top: 12px; left: 12px; pointer-events: none; border-radius: 4px 4px 0px 0px; border-bottom: none; box-shadow: 0 2px 5px 0 rgba(117, 134, 150, 0.45);font-family: -apple-system, BlinkMacSystemFont, 'Trebuchet MS', Roboto, Ubuntu, sans-serif; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale;`;
|
||||
toolTip.style.background = `rgba(${'0, 0, 0'}, 0.25)`;
|
||||
toolTip.style.color = 'white';
|
||||
toolTip.style.borderColor = 'rgba( 239, 83, 80, 1)';
|
||||
container.appendChild(toolTip);
|
||||
|
||||
// update tooltip
|
||||
window.chart.subscribeCrosshairMove(async param => {
|
||||
|
||||
if (
|
||||
param.point === undefined ||
|
||||
!param.time ||
|
||||
param.point.x < 0 ||
|
||||
param.point.x > container.clientWidth ||
|
||||
param.point.y < 0 ||
|
||||
param.point.y > container.clientHeight
|
||||
) {
|
||||
toolTip.style.display = 'none';
|
||||
} else {
|
||||
|
||||
// toolTip.style.height = '100%';
|
||||
toolTip.style.alignContent = 'center';
|
||||
|
||||
const dateStr = new Date(param.time*1000).toISOString();
|
||||
|
||||
let data = await param.seriesData.get(window.lineSeries);
|
||||
if (data === undefined) {
|
||||
data = {}
|
||||
data.value = 0
|
||||
console.log('data is UNDEFINED, SETTING TO 0')
|
||||
};
|
||||
|
||||
let data_b = await param.seriesData.get(window.lineSeries_b);
|
||||
if (data_b === undefined) {
|
||||
data_b = {}
|
||||
data_b.value = 0
|
||||
console.log('data is UNDEFINED, SETTING TO 0')
|
||||
};
|
||||
|
||||
const value_px = data.value
|
||||
const value_px_b = window.data_b.value
|
||||
const value_px_c = window.data_c.value
|
||||
const value_px_tgt = window.data_tgt.value
|
||||
|
||||
toolTip.style.display = 'block';
|
||||
// <div style="color: ${'rgba( 239, 83, 80, 1)'}">
|
||||
// Atwater Trading
|
||||
// </div>
|
||||
toolTip.innerHTML = `
|
||||
<div style="font-size: 24px; margin: 4px 0px; color: ${'white'}">
|
||||
Chainlink: ${Math.round(100 * value_px) / 100}
|
||||
Binance: ${Math.round(100 * value_px_b) / 100}
|
||||
</div>
|
||||
<div style="color: ${'white'}">
|
||||
${dateStr}
|
||||
</div>
|
||||
`;
|
||||
|
||||
let left = param.point.x; // relative to timeScale
|
||||
const timeScaleWidth = chart.timeScale().width();
|
||||
const priceScaleWidth = chart.priceScale('left').width();
|
||||
const halfTooltipWidth = toolTipWidth / 2;
|
||||
left += priceScaleWidth - halfTooltipWidth;
|
||||
left = Math.min(left, priceScaleWidth + timeScaleWidth - toolTipWidth);
|
||||
left = Math.max(left, priceScaleWidth);
|
||||
|
||||
toolTip.style.left = left + 'px';
|
||||
toolTip.style.top = 0 + 'px';
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
|
||||
window.chart.timeScale().fitContent();
|
||||
|
||||
console.log("TV Created!")
|
||||
};
|
||||
33
nicegui_modules/static/styles.css
Normal file
33
nicegui_modules/static/styles.css
Normal file
@@ -0,0 +1,33 @@
|
||||
/* Sticky Quasar Table for Dark Mode */
|
||||
.table-sticky-dark .q-table__top,
|
||||
.table-sticky-dark .q-table__bottom,
|
||||
.table-sticky-dark thead tr:first-child th {
|
||||
background-color: black;
|
||||
}
|
||||
.table-sticky-dark thead tr th {
|
||||
position: sticky;
|
||||
z-index: 1;
|
||||
}
|
||||
.table-sticky-dark thead tr:first-child th {
|
||||
top: 0;
|
||||
}
|
||||
.table-sticky-dark tbody {
|
||||
scroll-margin-top: 48px;
|
||||
}
|
||||
|
||||
/* Sticky Quasar Table for Light Mode */
|
||||
/* .table-sticky-light .q-table__top,
|
||||
.table-sticky-light .q-table__bottom,
|
||||
.table-sticky-light thead tr:first-child th {
|
||||
background-color: rgb(229, 223, 223);
|
||||
}
|
||||
.table-sticky-light thead tr th {
|
||||
position: sticky;
|
||||
z-index: 1;
|
||||
}
|
||||
.table-sticky-light thead tr:first-child th {
|
||||
top: 0;
|
||||
}
|
||||
.table-sticky-light tbody {
|
||||
scroll-margin-top: 48px;
|
||||
} */
|
||||
1188
order_entry.ipynb
1188
order_entry.ipynb
File diff suppressed because it is too large
Load Diff
22
requirements.txt
Normal file
22
requirements.txt
Normal file
@@ -0,0 +1,22 @@
|
||||
pandas
|
||||
rel
|
||||
websockets
|
||||
pyarrow
|
||||
plotly
|
||||
mysql-connector-python
|
||||
sqlalchemy
|
||||
requests
|
||||
pymysql
|
||||
scipy
|
||||
asyncmy
|
||||
cryptography
|
||||
TA-Lib
|
||||
valkey
|
||||
nicegui
|
||||
py_clob_client
|
||||
# google
|
||||
# google-api-core==2.30.0
|
||||
# google-api-python-client==2.190.0
|
||||
# googleapis-common-protos==1.72.0
|
||||
# grpcio==1.76.0
|
||||
# grpcio-tools==1.76.0
|
||||
63
test.py
Normal file
63
test.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import asyncio
|
||||
import json
|
||||
import websockets
|
||||
import time
|
||||
|
||||
# Credentials
|
||||
API_KEY = "019d2ad3-3755-744b-ace8-ad0f08c958dd"
|
||||
API_SECRET = "vXT1UeliaP89z9vcxDtdv47422mftijJkrJYE7CFqvA="
|
||||
API_PASSPHRASE = "57e703b801f22333d1a66a48c3a71773d3d3a42825ddcf330c3325856bc99756"
|
||||
WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
|
||||
|
||||
async def heartbeat(websocket):
|
||||
"""Sends a heartbeat every 10 seconds to keep the connection alive."""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
await websocket.send(json.dumps({}))
|
||||
# print("Heartbeat sent")
|
||||
except Exception:
|
||||
break
|
||||
|
||||
async def connect_polymarket_user_ws():
|
||||
while True: # Outer loop for reconnection
|
||||
try:
|
||||
async with websockets.connect(WS_URL) as websocket:
|
||||
subscribe_message = {
|
||||
"type": "user",
|
||||
"auth": {
|
||||
"apiKey": API_KEY,
|
||||
"secret": API_SECRET,
|
||||
"passphrase": API_PASSPHRASE
|
||||
},
|
||||
"markets": []
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_message))
|
||||
print(f"[{time.strftime('%H:%M:%S')}] Subscription sent...")
|
||||
|
||||
# Start the heartbeat task in the background
|
||||
heartbeat_task = asyncio.create_task(heartbeat(websocket))
|
||||
|
||||
async for message in websocket:
|
||||
data = json.loads(message)
|
||||
|
||||
# Log the specific reason if it's an error message
|
||||
if data.get("type") == "error":
|
||||
print(f"Server Error: {data.get('message')}")
|
||||
break
|
||||
|
||||
if data: # Ignore empty heartbeat responses from server
|
||||
print(f"Update: {data}")
|
||||
|
||||
heartbeat_task.cancel()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Connection lost: {e}. Retrying in 5s...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(connect_polymarket_user_ws())
|
||||
except KeyboardInterrupt:
|
||||
print("Stopped by user.")
|
||||
150
ws.py
150
ws.py
@@ -1,150 +0,0 @@
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import pandas as pd
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
import websockets
|
||||
import numpy as np
|
||||
import talib
|
||||
import requests
|
||||
|
||||
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
|
||||
SLUG_END_TIME = 0
|
||||
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
def format_timestamp(total_seconds) -> str:
|
||||
minutes, seconds = divmod(total_seconds, 60)
|
||||
return f"{minutes} minutes and {seconds} seconds"
|
||||
|
||||
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
|
||||
interval_secs = interval_mins * 60
|
||||
seconds = dt.timestamp()
|
||||
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
|
||||
|
||||
return rounded_seconds
|
||||
|
||||
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
|
||||
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
|
||||
market = r.json()['markets'][0]
|
||||
token_ids = json.loads(market.get("clobTokenIds", "[]"))
|
||||
outcomes = json.loads(market.get("outcomes", "[]"))
|
||||
d = dict(zip(outcomes, token_ids))
|
||||
d['isActive'] = market['negRisk']
|
||||
d['MinTickSize'] = market['orderPriceMinTickSize']
|
||||
d['isNegRisk'] = market['negRisk']
|
||||
d['ConditionId'] = market['conditionId']
|
||||
d['EndDateTime'] = market['endDate']
|
||||
|
||||
return d, market
|
||||
|
||||
def gen_slug():
|
||||
slug_prefix = 'btc-updown-5m-'
|
||||
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
|
||||
return slug_prefix + str(slug_time_id)
|
||||
|
||||
|
||||
async def polymarket_stream():
|
||||
global SLUG_END_TIME
|
||||
global HIST_TRADES
|
||||
|
||||
slug_full = gen_slug()
|
||||
market_details, market = get_mkt_details_by_slug(slug_full)
|
||||
TARGET_ASSET_ID = market_details['Up']
|
||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
|
||||
|
||||
async with websockets.connect(WSS_URL) as websocket:
|
||||
print(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"assets_ids": [TARGET_ASSET_ID],
|
||||
"type": "market",
|
||||
"custom_feature_enabled": True
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
current_ts = round(datetime.now().timestamp())
|
||||
sec_remaining = SLUG_END_TIME - current_ts
|
||||
|
||||
if sec_remaining <= 0:
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
print('*** Attempting to unsub from past 5min')
|
||||
update_unsub_msg = {
|
||||
"operation": 'unsubscribe',
|
||||
"assets_ids": [TARGET_ASSET_ID],
|
||||
"custom_feature_enabled": True
|
||||
}
|
||||
await websocket.send(json.dumps(update_unsub_msg))
|
||||
|
||||
print('*** Attempting to SUB to new 5min')
|
||||
slug_full = gen_slug()
|
||||
market_details, market = get_mkt_details_by_slug(slug_full)
|
||||
TARGET_ASSET_ID = market_details['Up']
|
||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||
|
||||
update_sub_msg = {
|
||||
"operation": 'subscribe',
|
||||
"assets_ids": [TARGET_ASSET_ID],
|
||||
"custom_feature_enabled": True
|
||||
}
|
||||
await websocket.send(json.dumps(update_sub_msg))
|
||||
|
||||
|
||||
if isinstance(message, str):
|
||||
data = json.loads(message)
|
||||
|
||||
if isinstance(data, dict):
|
||||
# print(data.get("event_type", None))
|
||||
pass
|
||||
elif isinstance(data, list):
|
||||
print('initial book: ')
|
||||
print(data)
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||
|
||||
event_type = data.get("event_type", None)
|
||||
|
||||
if event_type == "price_change":
|
||||
# print("📈 Price Change")
|
||||
# print(pd.DataFrame(data['price_changes']))
|
||||
pass
|
||||
elif event_type == "best_bid_ask":
|
||||
# print(pd.DataFrame([data]))
|
||||
pass
|
||||
elif event_type == "last_trade_price":
|
||||
px = float(data['price'])
|
||||
qty = float(data['size'])
|
||||
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
|
||||
SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
|
||||
print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
||||
elif event_type == "book":
|
||||
pass
|
||||
elif event_type == "new_market":
|
||||
print('Received new_market')
|
||||
elif event_type == "market_resolved":
|
||||
print(f"Received: {event_type}")
|
||||
print(data)
|
||||
elif event_type == "tick_size_change": # may want for CLOB order routing
|
||||
print(f"Received: {event_type}")
|
||||
print(data)
|
||||
else:
|
||||
print(f"Received: {event_type}")
|
||||
print(data)
|
||||
|
||||
except websockets.ConnectionClosed:
|
||||
print("Connection closed by server.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
asyncio.run(polymarket_stream())
|
||||
except KeyboardInterrupt:
|
||||
print("Stream stopped.")
|
||||
213
ws_binance.py
Normal file
213
ws_binance.py
Normal file
@@ -0,0 +1,213 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import AsyncContextManager
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||
from sqlalchemy import text
|
||||
import websockets
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
import valkey
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
### Allow only ipv4 ###
|
||||
def allowed_gai_family():
|
||||
return socket.AF_INET
|
||||
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
VK_CHANNEL = 'poly_binance_btcusd'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_Binance_Trades.log'
|
||||
|
||||
### Globals ###
|
||||
WSS_URL = "wss://stream.binance.com:9443/ws/BTCUSDT@aggTrade"
|
||||
HIST_TRADES = np.empty((0, 3))
|
||||
HIST_TRADES_LOOKBACK_SEC = 5
|
||||
|
||||
### Database Funcs ###
|
||||
async def create_rtds_btcusd_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
|
||||
timestamp_arrival BIGINT,
|
||||
timestamp_msg BIGINT,
|
||||
timestamp_value BIGINT,
|
||||
value DOUBLE,
|
||||
qty DOUBLE
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_rtds_btcusd_table(
|
||||
timestamp_arrival: int,
|
||||
timestamp_msg: int,
|
||||
timestamp_value: int,
|
||||
value: float,
|
||||
qty: float,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_arrival': timestamp_arrival,
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'value': value,
|
||||
'qty': qty,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO binance_btcusd_trades
|
||||
(
|
||||
timestamp_arrival,
|
||||
timestamp_msg,
|
||||
timestamp_value,
|
||||
value,
|
||||
qty
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:timestamp_msg,
|
||||
:timestamp_value,
|
||||
:value,
|
||||
:qty
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
### Websocket ###
|
||||
async def binance_trades_stream():
|
||||
global HIST_TRADES
|
||||
|
||||
async for websocket in websockets.connect(WSS_URL):
|
||||
logging.info(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"method": "SUBSCRIBE",
|
||||
"params": ["btcusdt@aggTrade"],
|
||||
"id": 1
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||
if isinstance(message, str):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if data.get('T', None) is not None:
|
||||
timestamp_msg = data['E']
|
||||
timestamp_value = data['T']
|
||||
last_px = float(data['p'])
|
||||
qty = float(data['q'])
|
||||
# print(f'🤑 BTC Binance Last Px: {last_px:_.4f}; TS: {pd.to_datetime(data['T'], unit='ms')}')
|
||||
HIST_TRADES = np.append(HIST_TRADES, np.array([[timestamp_value, last_px, qty]]), axis=0)
|
||||
hist_trades_lookback_ts_ms = round(datetime.now().timestamp() - HIST_TRADES_LOOKBACK_SEC)*1000
|
||||
HIST_TRADES = HIST_TRADES[HIST_TRADES[:, 0] >= hist_trades_lookback_ts_ms]
|
||||
VAL_KEY_OBJ = json.dumps({
|
||||
'timestamp_arrival': ts_arrival,
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'value': last_px,
|
||||
'qty': qty,
|
||||
'hist_trades': HIST_TRADES.tolist()
|
||||
})
|
||||
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
await insert_rtds_btcusd_table(
|
||||
CON=CON,
|
||||
timestamp_arrival=ts_arrival,
|
||||
timestamp_msg=timestamp_msg,
|
||||
timestamp_value=timestamp_value,
|
||||
value=last_px,
|
||||
qty=qty,
|
||||
)
|
||||
else:
|
||||
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
logging.warning(f'Message not in JSON format, skipping: {message}')
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||
except websockets.ConnectionClosed as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
continue
|
||||
except Exception as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
# published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
# logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_rtds_btcusd_table(CON=CON)
|
||||
await binance_trades_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await binance_trades_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Stream stopped")
|
||||
19
ws_binance/Dockerfile
Normal file
19
ws_binance/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ws_binance.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
378
ws_clob.py
Normal file
378
ws_clob.py
Normal file
@@ -0,0 +1,378 @@
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import logging
|
||||
import pandas as pd
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
import websockets
|
||||
import numpy as np
|
||||
import talib
|
||||
import requests
|
||||
from typing import AsyncContextManager
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
from sqlalchemy import text
|
||||
import valkey
|
||||
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
VK_CHANNEL = 'poly_5min_btcusd'
|
||||
VK_CHANNEL_DOWN = 'poly_5min_btcusd_down'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_5min.log'
|
||||
|
||||
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
|
||||
SLUG_END_TIME = 0
|
||||
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
HIST_TRADES_DOWN = np.empty((0, 2))
|
||||
MIN_TICK_SIZE = 0.01
|
||||
NEG_RISK = False
|
||||
|
||||
TARGET_PX = 0
|
||||
|
||||
TARGET_ASSET_ID = None
|
||||
TARGET_ASSET_ID_DOWN = None
|
||||
|
||||
def format_timestamp(total_seconds) -> str:
|
||||
minutes, seconds = divmod(total_seconds, 60)
|
||||
|
||||
return f"{minutes} minutes and {seconds} seconds"
|
||||
|
||||
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
|
||||
interval_secs = interval_mins * 60
|
||||
seconds = dt.timestamp()
|
||||
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
|
||||
|
||||
return rounded_seconds
|
||||
|
||||
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
|
||||
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
|
||||
market = r.json()['markets'][0]
|
||||
token_ids = json.loads(market.get("clobTokenIds", "[]"))
|
||||
outcomes = json.loads(market.get("outcomes", "[]"))
|
||||
d = dict(zip(outcomes, token_ids))
|
||||
d['isActive'] = market['negRisk']
|
||||
d['MinTickSize'] = market['orderPriceMinTickSize']
|
||||
d['OrderMinSize'] = market['orderMinSize']
|
||||
d['isNegRisk'] = market['negRisk']
|
||||
d['ConditionId'] = market['conditionId']
|
||||
d['EndDateTime'] = market['endDate']
|
||||
# d['Liquidity'] = market['liquidity']
|
||||
d['LiquidityClob'] = market['liquidityClob']
|
||||
d['VolumeNum'] = market['volumeNum']
|
||||
d['Volume24hr'] = market['volume24hr']
|
||||
print(market)
|
||||
|
||||
return d, market
|
||||
|
||||
def gen_slug():
|
||||
slug_prefix = 'btc-updown-5m-'
|
||||
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
|
||||
|
||||
return slug_prefix + str(slug_time_id)
|
||||
|
||||
|
||||
### Database Funcs ###
|
||||
async def create_poly_btcusd_trades_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: poly_btcusd_trades')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS poly_btcusd_trades (
|
||||
timestamp_arrival BIGINT,
|
||||
timestamp_msg BIGINT,
|
||||
timestamp_value BIGINT,
|
||||
price DOUBLE,
|
||||
qty DOUBLE,
|
||||
side_taker VARCHAR(8),
|
||||
up_or_down VARCHAR(8)
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_poly_btcusd_trades_table(
|
||||
timestamp_arrival: int,
|
||||
timestamp_msg: int,
|
||||
timestamp_value: int,
|
||||
price: float,
|
||||
qty: float,
|
||||
side_taker: str,
|
||||
up_or_down: str,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_arrival': timestamp_arrival,
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'price': price,
|
||||
'qty': qty,
|
||||
'side_taker': side_taker,
|
||||
'up_or_down': up_or_down,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO poly_btcusd_trades
|
||||
(
|
||||
timestamp_arrival,
|
||||
timestamp_msg,
|
||||
timestamp_value,
|
||||
price,
|
||||
qty,
|
||||
side_taker,
|
||||
up_or_down
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:timestamp_msg,
|
||||
:timestamp_value,
|
||||
:price,
|
||||
:qty,
|
||||
:side_taker,
|
||||
:up_or_down
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
|
||||
async def polymarket_stream():
|
||||
global SLUG_END_TIME
|
||||
global TARGET_PX
|
||||
global HIST_TRADES
|
||||
global HIST_TRADES_DOWN
|
||||
global MIN_TICK_SIZE
|
||||
global NEG_RISK
|
||||
global TARGET_ASSET_ID
|
||||
global TARGET_ASSET_ID_DOWN
|
||||
|
||||
slug_full = gen_slug()
|
||||
market_details, _ = get_mkt_details_by_slug(slug_full)
|
||||
CONDITION_ID = market_details['ConditionId']
|
||||
TARGET_ASSET_ID = market_details['Up']
|
||||
TARGET_ASSET_ID_DOWN = market_details['Down']
|
||||
MIN_TICK_SIZE = market_details['MinTickSize']
|
||||
NEG_RISK = market_details['isNegRisk']
|
||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
|
||||
|
||||
async for websocket in websockets.connect(WSS_URL):
|
||||
print(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
|
||||
"type": "market",
|
||||
"custom_feature_enabled": False
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
print(f"Subscribed to Assets: Up {TARGET_ASSET_ID}; Down: {TARGET_ASSET_ID_DOWN}")
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||
sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())
|
||||
|
||||
if sec_remaining <= 0:
|
||||
ref_data = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
|
||||
TARGET_PX = float(ref_data['value'])
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
print('*** Attempting to unsub from past 5min')
|
||||
update_unsub_msg = {
|
||||
"operation": 'unsubscribe',
|
||||
"assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
|
||||
"custom_feature_enabled": False
|
||||
}
|
||||
await websocket.send(json.dumps(update_unsub_msg))
|
||||
|
||||
print('*** Attempting to SUB to new 5min')
|
||||
slug_full = gen_slug()
|
||||
market_details, market = get_mkt_details_by_slug(slug_full)
|
||||
CONDITION_ID = market_details['ConditionId']
|
||||
TARGET_ASSET_ID = market_details['Up']
|
||||
TARGET_ASSET_ID_DOWN = market_details['Down']
|
||||
MIN_TICK_SIZE = market_details['MinTickSize']
|
||||
NEG_RISK = market_details['isNegRisk']
|
||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||
|
||||
update_sub_msg = {
|
||||
"operation": 'subscribe',
|
||||
"assets_ids": [TARGET_ASSET_ID, TARGET_ASSET_ID_DOWN],
|
||||
"custom_feature_enabled": False
|
||||
}
|
||||
await websocket.send(json.dumps(update_sub_msg))
|
||||
|
||||
if isinstance(message, str):
|
||||
data = json.loads(message)
|
||||
if isinstance(data, list):
|
||||
print('initial book:')
|
||||
print(data)
|
||||
continue
|
||||
|
||||
event_type = data.get("event_type", None)
|
||||
|
||||
if event_type == "price_change":
|
||||
# print("📈 Price Change")
|
||||
# print(pd.DataFrame(data['price_changes']))
|
||||
continue
|
||||
elif event_type == "best_bid_ask":
|
||||
# print(pd.DataFrame([data]))
|
||||
continue
|
||||
elif event_type == "last_trade_price":
|
||||
token_id = data['asset_id']
|
||||
ts_msg = int(data['timestamp'])
|
||||
ts_value = int(ts_msg)
|
||||
px = float(data['price'])
|
||||
qty = float(data['size'])
|
||||
side_taker = data['side']
|
||||
if token_id == TARGET_ASSET_ID:
|
||||
up_or_down = 'UP'
|
||||
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
|
||||
# print(f"✨ Last Px: {px:.2f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
||||
# print(f'Up: {TARGET_ASSET_ID}')
|
||||
# print(f'Down: {TARGET_ASSET_ID_DOWN}')
|
||||
# SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
|
||||
# print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
||||
if USE_VK:
|
||||
VAL_KEY_OBJ = json.dumps({
|
||||
'timestamp_arrival': ts_arrival,
|
||||
'timestamp_msg': ts_msg,
|
||||
'timestamp_value': ts_value,
|
||||
'price': px,
|
||||
'qty': qty,
|
||||
'side_taker': side_taker,
|
||||
'sec_remaining': sec_remaining,
|
||||
'target_price': TARGET_PX,
|
||||
'condition_id': CONDITION_ID,
|
||||
'token_id_up': TARGET_ASSET_ID,
|
||||
'token_id_down': TARGET_ASSET_ID_DOWN,
|
||||
'tick_size': MIN_TICK_SIZE,
|
||||
'neg_risk': NEG_RISK,
|
||||
})
|
||||
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
elif token_id == TARGET_ASSET_ID_DOWN:
|
||||
up_or_down = 'DOWN'
|
||||
HIST_TRADES_DOWN = np.append(HIST_TRADES_DOWN, np.array([[px, qty]]), axis=0)
|
||||
if USE_VK:
|
||||
VAL_KEY_OBJ = json.dumps({
|
||||
'timestamp_arrival': ts_arrival,
|
||||
'timestamp_msg': ts_msg,
|
||||
'timestamp_value': ts_value,
|
||||
'price': px,
|
||||
'qty': qty,
|
||||
'side_taker': side_taker,
|
||||
'sec_remaining': sec_remaining,
|
||||
'target_price': TARGET_PX,
|
||||
'condition_id': CONDITION_ID,
|
||||
'token_id_up': TARGET_ASSET_ID,
|
||||
'token_id_down': TARGET_ASSET_ID_DOWN,
|
||||
'tick_size': MIN_TICK_SIZE,
|
||||
'neg_risk': NEG_RISK,
|
||||
})
|
||||
VAL_KEY.set(VK_CHANNEL_DOWN, VAL_KEY_OBJ)
|
||||
else:
|
||||
logging.warning('Token Id from Market Does Not Match Pricing Data Id')
|
||||
|
||||
if USE_DB:
|
||||
await insert_poly_btcusd_trades_table(
|
||||
CON=CON,
|
||||
timestamp_arrival=ts_arrival,
|
||||
timestamp_msg=ts_msg,
|
||||
timestamp_value=ts_value,
|
||||
price=px,
|
||||
qty=qty,
|
||||
side_taker=side_taker,
|
||||
up_or_down=up_or_down
|
||||
)
|
||||
|
||||
elif event_type == "book":
|
||||
continue
|
||||
elif event_type == "new_market":
|
||||
print('Received new_market')
|
||||
continue
|
||||
elif event_type == "market_resolved":
|
||||
print(f"Received: {event_type}")
|
||||
print(data)
|
||||
continue
|
||||
elif event_type == "tick_size_change": # may want for CLOB order routing
|
||||
print(f"Received: {event_type}")
|
||||
print(data)
|
||||
continue
|
||||
else:
|
||||
print(f"*********** REC UNMAPPED EVENT: {event_type}")
|
||||
print(data)
|
||||
continue
|
||||
elif isinstance(data, dict):
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||
|
||||
except websockets.ConnectionClosed as e:
|
||||
print(f"Connection closed by server. Exception: {e}")
|
||||
continue
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
# published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
# logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_poly_btcusd_trades_table(CON=CON)
|
||||
await polymarket_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await polymarket_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt as e:
|
||||
print(f"Stream stopped: {e}")
|
||||
19
ws_clob/Dockerfile
Normal file
19
ws_clob/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ws_clob.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
227
ws_coinbase.py
Normal file
227
ws_coinbase.py
Normal file
@@ -0,0 +1,227 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import AsyncContextManager
|
||||
import os
|
||||
# import numpy as np
|
||||
import pandas as pd
|
||||
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||
from sqlalchemy import text
|
||||
import websockets
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
import valkey
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
### Allow only ipv4 ###
|
||||
def allowed_gai_family():
|
||||
return socket.AF_INET
|
||||
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
VK_CHANNEL = 'poly_coinbase_btcusd'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_coinbase_Trades.log'
|
||||
|
||||
### Globals ###
|
||||
WSS_URL = "wss://ws-feed.exchange.coinbase.com"
|
||||
# HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
### Database Funcs ###
|
||||
async def create_rtds_btcusd_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: coinbase_btcusd_trades')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS coinbase_btcusd_trades (
|
||||
timestamp_arrival BIGINT,
|
||||
timestamp_msg BIGINT,
|
||||
timestamp_value BIGINT,
|
||||
value DOUBLE,
|
||||
qty DOUBLE,
|
||||
side VARCHAR(8)
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_rtds_btcusd_table(
|
||||
timestamp_arrival: int,
|
||||
timestamp_msg: int,
|
||||
timestamp_value: int,
|
||||
value: float,
|
||||
qty: float,
|
||||
side: str,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_arrival': timestamp_arrival,
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'value': value,
|
||||
'qty': qty,
|
||||
'side': side,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO coinbase_btcusd_trades
|
||||
(
|
||||
timestamp_arrival,
|
||||
timestamp_msg,
|
||||
timestamp_value,
|
||||
value,
|
||||
qty,
|
||||
side
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:timestamp_msg,
|
||||
:timestamp_value,
|
||||
:value,
|
||||
:qty,
|
||||
:side
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
|
||||
### Websocket ###
|
||||
async def coinbase_trades_stream():
|
||||
global HIST_TRADES
|
||||
|
||||
async with websockets.connect(WSS_URL) as websocket:
|
||||
logging.info(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"type": "subscribe",
|
||||
"product_ids": ["BTC-USD"],
|
||||
"channels": [
|
||||
{
|
||||
"name": "ticker",
|
||||
"product_ids": ["BTC-USD"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
if isinstance(message, str) or isinstance(message, bytes):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if data.get('price', None) is not None:
|
||||
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||
ts_msg = round(datetime.strptime(data['time'], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()*1000)
|
||||
ts_value = ts_msg
|
||||
last_px = float(data['price'])
|
||||
qty = float(data['last_size'])
|
||||
side = data['side']
|
||||
print(f'🤑 BTC Coinbase Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
|
||||
if USE_VK:
|
||||
VAL_KEY_OBJ = json.dumps({
|
||||
'timestamp_arrival': ts_arrival,
|
||||
'timestamp_msg': ts_msg,
|
||||
'timestamp_value': ts_value,
|
||||
'value': last_px,
|
||||
'qty': qty,
|
||||
'side': side,
|
||||
})
|
||||
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
if USE_DB:
|
||||
await insert_rtds_btcusd_table(
|
||||
CON=CON,
|
||||
timestamp_arrival=ts_arrival,
|
||||
timestamp_msg=ts_msg,
|
||||
timestamp_value=ts_value,
|
||||
value=last_px,
|
||||
qty=qty,
|
||||
side=side,
|
||||
)
|
||||
# elif data.get('op'):
|
||||
# if data['op'] == 'PING':
|
||||
# pong = {"op": "PONG", "timestamp": ts_arrival}
|
||||
# await websocket.send(json.dumps(pong))
|
||||
# logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
|
||||
else:
|
||||
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(message)} not expected: {message}')
|
||||
except websockets.ConnectionClosed as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
except Exception as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_rtds_btcusd_table(CON=CON)
|
||||
await coinbase_trades_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await coinbase_trades_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Stream stopped")
|
||||
19
ws_coinbase/Dockerfile
Normal file
19
ws_coinbase/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ws_coinbase.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
222
ws_pionex.py
Normal file
222
ws_pionex.py
Normal file
@@ -0,0 +1,222 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import AsyncContextManager
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||
from sqlalchemy import text
|
||||
import websockets
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
import valkey
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
### Allow only ipv4 ###
|
||||
def allowed_gai_family():
|
||||
return socket.AF_INET
|
||||
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
VK_CHANNEL = 'poly_pionex_btcusd'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_Pionex_Trades.log'
|
||||
|
||||
### Globals ###
|
||||
WSS_URL = "wss://ws.pionex.com/wsPub"
|
||||
# HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
### Database Funcs ###
|
||||
async def create_rtds_btcusd_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: pionex_btcusd_trades')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS pionex_btcusd_trades (
|
||||
timestamp_msg BIGINT,
|
||||
timestamp_value BIGINT,
|
||||
value DOUBLE,
|
||||
qty DOUBLE,
|
||||
side VARCHAR(8)
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_rtds_btcusd_table(
|
||||
timestamp_msg: int,
|
||||
timestamp_value: int,
|
||||
value: float,
|
||||
qty: float,
|
||||
side: str,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'value': value,
|
||||
'qty': qty,
|
||||
'side': side,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO pionex_btcusd_trades
|
||||
(
|
||||
timestamp_msg,
|
||||
timestamp_value,
|
||||
value,
|
||||
qty,
|
||||
side
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_msg,
|
||||
:timestamp_value,
|
||||
:value,
|
||||
:qty,
|
||||
:side
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
|
||||
### Websocket ###
|
||||
async def pionex_trades_stream():
|
||||
global HIST_TRADES
|
||||
|
||||
async with websockets.connect(WSS_URL) as websocket:
|
||||
logging.info(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"op": "SUBSCRIBE",
|
||||
"topic": "TRADE",
|
||||
"symbol": "BTC_USDT"
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
if isinstance(message, str) or isinstance(message, bytes):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if data.get('data', None) is not None:
|
||||
ts_msg = data['timestamp']
|
||||
data = data['data']
|
||||
ts_value = data[0]['timestamp']
|
||||
last_px = float(data[0]['price'])
|
||||
qty = float(data[0]['size'])
|
||||
side = data[0]['side']
|
||||
print(f'🤑 BTC Pionex Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
|
||||
print(ts_value)
|
||||
if USE_VK:
|
||||
VAL_KEY.publish(VK_CHANNEL, json.dumps({
|
||||
'timestamp_msg': ts_msg,
|
||||
'timestamp_value': ts_value,
|
||||
'value': last_px,
|
||||
'qty': qty,
|
||||
'side': side,
|
||||
}))
|
||||
VAL_KEY.set(VK_CHANNEL, json.dumps({
|
||||
'timestamp_msg': ts_msg,
|
||||
'timestamp_value': ts_value,
|
||||
'value': last_px,
|
||||
'qty': qty,
|
||||
'side': side,
|
||||
}))
|
||||
if USE_DB:
|
||||
await insert_rtds_btcusd_table(
|
||||
CON=CON,
|
||||
timestamp_msg=ts_msg,
|
||||
timestamp_value=ts_value,
|
||||
value=last_px,
|
||||
qty=qty,
|
||||
side=side,
|
||||
)
|
||||
elif data.get('op'):
|
||||
if data['op'] == 'PING':
|
||||
pong = {"op": "PONG", "timestamp": round(datetime.now().timestamp()*1000)}
|
||||
await websocket.send(json.dumps(pong))
|
||||
logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
|
||||
else:
|
||||
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(message)} not expected: {message}')
|
||||
except websockets.ConnectionClosed as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
except Exception as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_rtds_btcusd_table(CON=CON)
|
||||
await pionex_trades_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await pionex_trades_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Stream stopped")
|
||||
179
ws_rtds.py
179
ws_rtds.py
@@ -1,24 +1,111 @@
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import pandas as pd
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
import websockets
|
||||
import logging
|
||||
import socket
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import AsyncContextManager
|
||||
|
||||
import numpy as np
|
||||
import talib
|
||||
import requests
|
||||
import pandas as pd
|
||||
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||
from sqlalchemy import text
|
||||
import websockets
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
import valkey
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
### Allow only ipv4 ###
|
||||
def allowed_gai_family():
|
||||
return socket.AF_INET
|
||||
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
VK_CHANNEL = 'poly_rtds_cl_btcusd'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_RTDS.log'
|
||||
|
||||
### Globals ###
|
||||
WSS_URL = "wss://ws-live-data.polymarket.com"
|
||||
# HIST_TRADES = np.empty((0, 2))
|
||||
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
### Database Funcs ###
|
||||
async def create_rtds_btcusd_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: poly_rtds_cl_btcusd')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS poly_rtds_cl_btcusd (
|
||||
timestamp_arrival BIGINT,
|
||||
timestamp_msg BIGINT,
|
||||
timestamp_value BIGINT,
|
||||
value DOUBLE
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_rtds_btcusd_table(
|
||||
timestamp_arrival: int,
|
||||
timestamp_msg: int,
|
||||
timestamp_value: int,
|
||||
value: int,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
params={
|
||||
'timestamp_arrival': timestamp_arrival,
|
||||
'timestamp_msg': timestamp_msg,
|
||||
'timestamp_value': timestamp_value,
|
||||
'value': value,
|
||||
}
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO poly_rtds_cl_btcusd
|
||||
(
|
||||
timestamp_arrival,
|
||||
timestamp_msg,
|
||||
timestamp_value,
|
||||
value
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:timestamp_msg,
|
||||
:timestamp_value,
|
||||
:value
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
|
||||
### Websocket ###
|
||||
async def rtds_stream():
|
||||
global HIST_TRADES
|
||||
|
||||
async with websockets.connect(WSS_URL) as websocket:
|
||||
print(f"Connected to {WSS_URL}")
|
||||
async for websocket in websockets.connect(WSS_URL):
|
||||
logging.info(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"action": "subscribe",
|
||||
@@ -39,23 +126,79 @@ async def rtds_stream():
|
||||
try:
|
||||
data = json.loads(message)
|
||||
if data['payload'].get('value', None) is not None:
|
||||
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
|
||||
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||
# print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['payload']['timestamp'], unit='ms')}')
|
||||
VAL_KEY_OBJ = json.dumps({
|
||||
'timestamp_arrival': ts_arrival,
|
||||
'timestamp_msg': data['timestamp'],
|
||||
'timestamp_value': data['payload']['timestamp'],
|
||||
'value': data['payload']['value'],
|
||||
})
|
||||
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
await insert_rtds_btcusd_table(
|
||||
CON=CON,
|
||||
timestamp_arrival=ts_arrival,
|
||||
timestamp_msg=data['timestamp'],
|
||||
timestamp_value=data['payload']['timestamp'],
|
||||
value=data['payload']['value'],
|
||||
)
|
||||
else:
|
||||
print(f'Initial or unexpected data struct, skipping: {data}')
|
||||
# logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||
logging.info('Initial or unexpected data struct, skipping')
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
print(f'Message not in JSON format, skipping: {message}')
|
||||
logging.warning(f'Message not in JSON format, skipping: {message}')
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||
|
||||
except websockets.ConnectionClosed as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
continue
|
||||
except Exception as e:
|
||||
logging.error(f'Connection closed: {e}')
|
||||
logging.error(traceback.format_exc())
|
||||
|
||||
except websockets.ConnectionClosed:
|
||||
print("Connection closed by server.")
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
# published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
# logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_rtds_btcusd_table(CON=CON)
|
||||
await rtds_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await rtds_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(rtds_stream())
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
print("Stream stopped.")
|
||||
logging.info("Stream stopped")
|
||||
19
ws_rtds/Dockerfile
Normal file
19
ws_rtds/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ws_rtds.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
431
ws_user.py
Normal file
431
ws_user.py
Normal file
@@ -0,0 +1,431 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import AsyncContextManager
|
||||
|
||||
import numpy as np
|
||||
import valkey
|
||||
import websockets
|
||||
from dotenv import load_dotenv
|
||||
from py_clob_client.client import ClobClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
### Database ###
|
||||
USE_DB: bool = True
|
||||
USE_VK: bool = True
|
||||
|
||||
LOCAL_LIVE_ORDERS = []
|
||||
LOCAL_RECENT_TRADES = []
|
||||
LOCAL_RECENT_TRADES_LOOKBACK_SEC = 10
|
||||
|
||||
VK_LIVE_ORDERS = 'poly_user_orders'
|
||||
VK_RECENT_TRADES = 'poly_user_trades'
|
||||
CON: AsyncContextManager | None = None
|
||||
VAL_KEY = None
|
||||
|
||||
### Logging ###
|
||||
load_dotenv()
|
||||
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Polymarket_User.log'
|
||||
|
||||
# https://docs.polymarket.com/market-data/websocket/user-channel
|
||||
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/user"
|
||||
API_CREDS = {}
|
||||
|
||||
HIST_TRADES = np.empty((0, 2))
|
||||
TARGET_PX = 0
|
||||
|
||||
### Database Funcs ###
|
||||
async def create_user_trades_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: user_stream_trades')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS user_stream_trades (
|
||||
-- event_type VARCHAR(8),
|
||||
timestamp_arrival BIGINT,
|
||||
type VARCHAR(20),
|
||||
id VARCHAR(100),
|
||||
taker_order_id VARCHAR(100),
|
||||
market VARCHAR(100),
|
||||
asset_id VARCHAR(100),
|
||||
side VARCHAR(8),
|
||||
size DOUBLE,
|
||||
price DOUBLE,
|
||||
fee_rate_bps DOUBLE,
|
||||
status VARCHAR(20),
|
||||
matchtime BIGINT,
|
||||
last_update BIGINT,
|
||||
outcome VARCHAR(20),
|
||||
owner VARCHAR(100),
|
||||
trade_owner VARCHAR(100),
|
||||
maker_address VARCHAR(100),
|
||||
transaction_hash VARCHAR(100),
|
||||
bucket_index INT,
|
||||
maker_orders JSON NULL,
|
||||
trader_side VARCHAR(8),
|
||||
timestamp BIGINT
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_user_trades_table(
|
||||
params: dict,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO user_stream_trades
|
||||
(
|
||||
timestamp_arrival,
|
||||
type,
|
||||
id,
|
||||
taker_order_id,
|
||||
market,
|
||||
asset_id,
|
||||
side,
|
||||
size,
|
||||
price,
|
||||
fee_rate_bps,
|
||||
status,
|
||||
matchtime,
|
||||
last_update,
|
||||
outcome,
|
||||
owner,
|
||||
trade_owner,
|
||||
maker_address,
|
||||
transaction_hash,
|
||||
bucket_index,
|
||||
maker_orders,
|
||||
trader_side,
|
||||
timestamp
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:type,
|
||||
:id,
|
||||
:taker_order_id,
|
||||
:market,
|
||||
:asset_id,
|
||||
:side,
|
||||
:size,
|
||||
:price,
|
||||
:fee_rate_bps,
|
||||
:status,
|
||||
:matchtime,
|
||||
:last_update,
|
||||
:outcome,
|
||||
:owner,
|
||||
:trade_owner,
|
||||
:maker_address,
|
||||
:transaction_hash,
|
||||
:bucket_index,
|
||||
:maker_orders,
|
||||
:trader_side,
|
||||
:timestamp
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
|
||||
async def create_user_orders_table(
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
logging.info('Creating Table if Does Not Exist: user_stream_orders')
|
||||
await CON.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS user_stream_orders (
|
||||
-- event_type VARCHAR(8),
|
||||
timestamp_arrival BIGINT,
|
||||
id VARCHAR(100),
|
||||
owner VARCHAR(100),
|
||||
market VARCHAR(100),
|
||||
asset_id VARCHAR(100),
|
||||
side VARCHAR(8),
|
||||
order_owner VARCHAR(100),
|
||||
original_size DOUBLE,
|
||||
size_matched DOUBLE,
|
||||
price DOUBLE,
|
||||
associate_trades JSON NULL,
|
||||
outcome VARCHAR(20),
|
||||
type VARCHAR(20),
|
||||
created_at BIGINT,
|
||||
expiration VARCHAR(20),
|
||||
order_type VARCHAR(8),
|
||||
status VARCHAR(20),
|
||||
maker_address VARCHAR(100),
|
||||
timestamp BIGINT
|
||||
);
|
||||
"""))
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
async def insert_user_orders_table(
|
||||
params: dict,
|
||||
CON: AsyncContextManager,
|
||||
engine: str = 'mysql', # mysql | duckdb
|
||||
) -> None:
|
||||
if CON is None:
|
||||
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||
else:
|
||||
if engine == 'mysql':
|
||||
await CON.execute(text("""
|
||||
INSERT INTO user_stream_orders
|
||||
(
|
||||
timestamp_arrival,
|
||||
id,
|
||||
owner,
|
||||
market,
|
||||
asset_id,
|
||||
side,
|
||||
order_owner,
|
||||
original_size,
|
||||
size_matched,
|
||||
price,
|
||||
associate_trades,
|
||||
outcome,
|
||||
type,
|
||||
created_at,
|
||||
expiration,
|
||||
order_type,
|
||||
status,
|
||||
maker_address,
|
||||
timestamp
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
:timestamp_arrival,
|
||||
:id,
|
||||
:owner,
|
||||
:market,
|
||||
:asset_id,
|
||||
:side,
|
||||
:order_owner,
|
||||
:original_size,
|
||||
:size_matched,
|
||||
:price,
|
||||
:associate_trades,
|
||||
:outcome,
|
||||
:type,
|
||||
:created_at,
|
||||
:expiration,
|
||||
:order_type,
|
||||
:status,
|
||||
:maker_address,
|
||||
:timestamp
|
||||
)
|
||||
"""),
|
||||
parameters=params
|
||||
)
|
||||
await CON.commit()
|
||||
else:
|
||||
raise ValueError('Only MySQL engine is implemented')
|
||||
|
||||
### Helpers ###
|
||||
def live_orders_only(orders: list[dict]) -> list[dict]:
|
||||
return [d for d in orders if d.get('status')=='LIVE']
|
||||
|
||||
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict):
|
||||
for index, item in enumerate(list_of_dicts):
|
||||
if item.get('id') == new_dict.get('id'):
|
||||
list_of_dicts[index] = new_dict
|
||||
return list_of_dicts
|
||||
|
||||
list_of_dicts.append(new_dict)
|
||||
return list_of_dicts
|
||||
|
||||
|
||||
async def polymarket_stream():
|
||||
global TARGET_PX
|
||||
global HIST_TRADES
|
||||
global LOCAL_LIVE_ORDERS
|
||||
global LOCAL_RECENT_TRADES
|
||||
|
||||
POLY_API_KEY = API_CREDS.api_key
|
||||
POLY_API_SECRET = API_CREDS.api_secret
|
||||
POLY_API_PASS = API_CREDS.api_passphrase
|
||||
|
||||
async for websocket in websockets.connect(WSS_URL):
|
||||
print(f"Connected to {WSS_URL}")
|
||||
|
||||
subscribe_msg = {
|
||||
"auth": {
|
||||
"apiKey": POLY_API_KEY,
|
||||
"secret": POLY_API_SECRET,
|
||||
"passphrase": POLY_API_PASS,
|
||||
},
|
||||
"type": "user",
|
||||
"markets": []
|
||||
}
|
||||
|
||||
await websocket.send(json.dumps(subscribe_msg))
|
||||
print("Subscribed to User Data")
|
||||
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||
if isinstance(message, str):
|
||||
data = json.loads(message)
|
||||
if data == {}: # Handle empty server ping - return pong
|
||||
await websocket.send(json.dumps({}))
|
||||
print('SENT HEARTBEAT PING')
|
||||
continue
|
||||
data['timestamp_arrival'] = ts_arrival
|
||||
|
||||
event_type = data.get('event_type', None)
|
||||
match event_type:
|
||||
case 'trade':
|
||||
logging.info(f'TRADE: {data}')
|
||||
# trade_status = data.get('status')
|
||||
# match trade_status: # Raise TELEGRAM ALERT ???
|
||||
# case 'MATCHED':
|
||||
# pass
|
||||
# case 'MINED':
|
||||
# pass
|
||||
# case 'CONFIRMED':
|
||||
# pass
|
||||
# case 'RETRYING':
|
||||
# pass
|
||||
# case 'FAILED':
|
||||
# pass
|
||||
|
||||
### Convert Datatypes ###
|
||||
data['size'] = float(data['size'])
|
||||
data['price'] = float(data['price'])
|
||||
data['fee_rate_bps'] = float(data['fee_rate_bps'])
|
||||
data['matchtime'] = int(data['match_time'])
|
||||
data['last_update'] = int(data['last_update'])
|
||||
data['timestamp'] = int(data['timestamp'])
|
||||
data['maker_orders'] = json.dumps(data['maker_orders']) if data['maker_orders'] else None
|
||||
|
||||
LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, data)
|
||||
LOOKBACK_MIN_TS_MS = ts_arrival-LOCAL_RECENT_TRADES_LOOKBACK_SEC*1000
|
||||
LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
|
||||
|
||||
print("---------------------")
|
||||
print(LOCAL_RECENT_TRADES)
|
||||
print("---------------------")
|
||||
|
||||
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES)
|
||||
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
VAL_KEY.set(VK_RECENT_TRADES, VAL_KEY_OBJ)
|
||||
|
||||
logging.info(f'User Trade Update: {data}')
|
||||
|
||||
### Insert into DB ###
|
||||
await insert_user_trades_table(
|
||||
params=data,
|
||||
CON=CON
|
||||
)
|
||||
case 'order':
|
||||
logging.info(f'ORDER: {data}')
|
||||
### Convert Datatypes ###
|
||||
data['original_size'] = float(data['original_size'])
|
||||
data['size_matched'] = float(data['size_matched'])
|
||||
data['price'] = float(data['price'])
|
||||
data['associate_trades'] = json.dumps(data['associate_trades']) if data['associate_trades'] else None
|
||||
data['created_at'] = int(data['created_at'])
|
||||
data['timestamp'] = int(data['timestamp'])
|
||||
|
||||
### Match on Status - Pass Live orders to Valkey for Algo Engine ###
|
||||
order_status = data.get('status')
|
||||
match order_status:
|
||||
case 'live':
|
||||
LOCAL_LIVE_ORDERS = upsert_list_of_dicts_by_id(LOCAL_LIVE_ORDERS, data)
|
||||
LOCAL_LIVE_ORDERS = live_orders_only(LOCAL_LIVE_ORDERS)
|
||||
VAL_KEY_OBJ = json.dumps(LOCAL_LIVE_ORDERS)
|
||||
# VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||
VAL_KEY.set(VK_LIVE_ORDERS, VAL_KEY_OBJ)
|
||||
logging.info(f'Order(s) RESTING: {data}')
|
||||
case 'matched':
|
||||
logging.info(f'Order(s) MATCHED: {data}')
|
||||
case 'delayed':
|
||||
raise ValueError(f'Order Status of "delayed" which is not expected for non-sports orders: {data}')
|
||||
case 'unmatched':
|
||||
raise ValueError(f'Order Status of "unmatched" which is not expected for non-sports orders: {data}')
|
||||
|
||||
### Insert into DB ###
|
||||
await insert_user_orders_table(
|
||||
params=data,
|
||||
CON=CON,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||
|
||||
except websockets.ConnectionClosed as e:
|
||||
print(f"Connection closed by server. Exception: {e}")
|
||||
|
||||
async def main():
|
||||
global VAL_KEY
|
||||
global CON
|
||||
global API_CREDS
|
||||
|
||||
private_key = os.getenv("PRIVATE_KEY")
|
||||
host = "https://clob.polymarket.com"
|
||||
chain_id = 137 # Polygon mainnet
|
||||
|
||||
temp_client = ClobClient(host, key=private_key, chain_id=chain_id)
|
||||
API_CREDS = temp_client.create_or_derive_api_creds()
|
||||
|
||||
if USE_VK:
|
||||
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||
# published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
# logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||
else:
|
||||
VAL_KEY = None
|
||||
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||
|
||||
if USE_DB:
|
||||
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||
async with engine.connect() as CON:
|
||||
await create_user_trades_table(CON=CON)
|
||||
await create_user_orders_table(CON=CON)
|
||||
await polymarket_stream()
|
||||
else:
|
||||
CON = None
|
||||
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||
await polymarket_stream()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
START_TIME = round(datetime.now().timestamp()*1000)
|
||||
|
||||
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||
|
||||
logging.basicConfig(
|
||||
force=True,
|
||||
filename=LOG_FILEPATH,
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
filemode='w'
|
||||
)
|
||||
logging.info(f"STARTED: {START_TIME}")
|
||||
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt as e:
|
||||
print(f"Stream stopped: {e}")
|
||||
19
ws_user/Dockerfile
Normal file
19
ws_user/Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y build-essential
|
||||
|
||||
RUN gcc --version
|
||||
RUN rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
# Finally, run gunicorn.
|
||||
CMD [ "python", "ws_user.py"]
|
||||
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]
|
||||
Reference in New Issue
Block a user