"""Polymarket 5-minute BTC up/down market algo engine.

Reads market state that separate feed handlers publish to Valkey, decides
entries/exits from the short-term Binance price slope, and works two-legged
orders (UP and DOWN outcome tokens) through the Polymarket CLOB client.
"""
import asyncio
import json
from dataclasses import dataclass, asdict
import logging
import math
import os
import time
from datetime import datetime, timezone
from typing import AsyncContextManager
import traceback

import numpy as np
import pandas as pd
import requests
import talib
import valkey
from dotenv import load_dotenv
from py_clob_client.clob_types import (
    OrderArgs,
    OrderType,
    PartialCreateOrderOptions,
    PostOrdersArgs,
    BalanceAllowanceParams,
    OpenOrderParams,
    MarketOrderArgs,
)
from py_clob_client.order_builder.constants import BUY, SELL
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from functools import wraps

import modules.api as api


### Custom Order Args ###
# @dataclass
# class Custom_OrderArgs(OrderArgs):
#     max_price: float = 0.00
#     post_only: bool = False

@dataclass
class Custom_PostOrdersArgs(PostOrdersArgs):
    """PostOrdersArgs plus the plain order parameters.

    `post_order` reads these fields back after posting so it can annotate the
    exchange response with what was actually sent.
    """
    token_id: str = ''
    price: float = 0.00
    # max_price: float = 0.00
    size: float = 0.00
    side: str = ''


### Database ###
CLIENT = None
CON: AsyncContextManager | None = None
VAL_KEY = None

### Logging ###
load_dotenv()
# Fall back to the current directory instead of failing with
# "NoneType + str" when LOGS_PATH is not set.
LOG_FILEPATH: str = os.path.join(os.getenv("LOGS_PATH", "."), 'Polymarket_Algo.log')

### ALGO CONFIG / CONSTANTS ###
SLOPE_YES_THRESH = 0.00750  # In Percent % Chg (e.g. 0.02 == 0.02%)
SLOPE_YES_THRESH_1 = 0.00750  # In Percent % Chg (e.g. 0.02 == 0.02%)
ENDTIME_BUFFER_SEC = 30  # Stop trading, cancel all open orders and exit positions this many seconds before mkt settles.
TGT_PX_INDEX_DIFF_THRESH = 0.05  # In Percent % Chg (e.g. 0.02 == 0.02%)
DEFAULT_ORDER_SIZE = 6  # In USDe
MIN_ORDER_SIZE = 5  # In USDe
TGT_PROFIT_CENTS = 0.08
# CHASE_TO_BUY_CENTS = 0.05
MAX_ALLOWED_POLY_PX = 0.90
MAX_LEG_LIVE_SEC = 3.25

### GLOBALS ###
LOOP_LAST_ROUTE: str = ''
ORDER_LOCK = 0
FIRST_LOOP_NEW_MKT = False
SLUG_END_TIME = 0
FREE_CASH: float = 0
POLY_BINANCE = {}
POLY_REF = {}
POLY_CLOB = {}
POLY_CLOB_DOWN = {}
USER_TRADES = {}
USER_ORDERS = {}
SLOPE_HIST = []
LOCAL_ACTIVE_ORDERS = []
LOCAL_MATCHED_ORDERS = []
LOCAL_TOKEN_BALANCES = {}
# LOCAL_ACTIVE_POSITIONS = []
ACTIVE_BALANCES_EXIST = False  ### REMOVE ###

# Upper bound on SLOPE_HIST so the diagnostic history cannot grow without
# limit while the main loop spins.
_SLOPE_HIST_MAX_LEN = 10_000


### Decorators ###
def async_timeit(func):
    """Decorate a coroutine function so every call logs its wall time in ms."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on success and on exception, so failures are timed too.
            end_time = time.perf_counter()
            total_time = (end_time - start_time) * 1000
            logging.info(f"Function '{func.__name__}' executed in {total_time:.4f} ms")
    return wrapper


### Private helpers ###
def _now_ms() -> int:
    """Return the current wall-clock time as integer epoch milliseconds."""
    return round(datetime.now().timestamp() * 1000)


def _remove_by_identity(items: list, target) -> bool:
    """Delete `target` (matched with `is`) from `items`; return True if found.

    Used instead of `list.pop(idx)` with an index captured earlier, which
    removes the wrong element once the list has been mutated.
    """
    for i, item in enumerate(items):
        if item is target:
            del items[i]
            return True
    return False


def _adjust_local_balance(token_id, side, size) -> None:
    """Add (BUY) or subtract (any other side) `size` in LOCAL_TOKEN_BALANCES."""
    current_balance = float(LOCAL_TOKEN_BALANCES.get(token_id, 0.00))
    signed_size = float(size) if side == 'BUY' else float(size) * -1
    LOCAL_TOKEN_BALANCES[token_id] = current_balance + signed_size


def _build_limit_order(token_id, price, size, side, order_type, post_only):
    """Sign a limit order with CLIENT and wrap it in Custom_PostOrdersArgs."""
    return Custom_PostOrdersArgs(
        order=CLIENT.create_order(
            order_args=OrderArgs(
                token_id=token_id,
                price=price,
                size=size,
                side=side,
            ),
            options=PartialCreateOrderOptions(
                tick_size=str(POLY_CLOB['tick_size']),
                neg_risk=POLY_CLOB['neg_risk'],
            ),
        ),
        orderType=order_type,
        postOnly=post_only,
        token_id=token_id,
        price=price,
        # max_price = 0.99,
        size=size,
        side=side,
    )


def _find_user_trade(order: dict, trade: dict):
    """Return the fill record inside `trade` that belongs to `order`, or None.

    For MAKER trades the matching entry of the JSON-encoded `maker_orders`
    list is returned, annotated with the trade's status and with `size` set
    from `matched_amount`. Otherwise the trade itself is returned when its
    `taker_order_id` equals the order id.
    """
    if trade['trader_side'] == 'MAKER':
        user_trade = next(
            (
                item for item in json.loads(trade['maker_orders'])
                if order['orderID'] == item['order_id']
            ),
            None,
        )
        if user_trade:
            user_trade['status'] = trade['status']
            user_trade['size'] = float(user_trade['matched_amount'])
        return user_trade
    if trade['taker_order_id'] == order['orderID']:
        return trade
    return None


def _reconcile_active_orders() -> None:
    """Apply USER_TRADES status updates to the orders in LOCAL_ACTIVE_ORDERS.

    CONFIRMED fills leave the active list and update the local balance;
    MATCHED fills move to LOCAL_MATCHED_ORDERS. A snapshot is iterated and
    removal is by identity, so mutating the list cannot skip or mis-remove
    orders.
    """
    if not USER_TRADES:
        return
    for o in list(LOCAL_ACTIVE_ORDERS):
        for t in USER_TRADES:
            user_trade = _find_user_trade(o, t)
            if not user_trade:
                continue
            trade_status = str(user_trade['status']).upper()
            if trade_status != o['status'].upper():
                logging.info(f"Updated Trade Status: {o['status'].upper()} --> {trade_status}; {o['orderID']}")
                logging.info(f'Trade Details: {user_trade}')
                o['status'] = trade_status
            if trade_status == 'CONFIRMED':
                _remove_by_identity(LOCAL_ACTIVE_ORDERS, o)
                _adjust_local_balance(user_trade['asset_id'], user_trade['side'], user_trade['size'])
                logging.info('Order FILLED! - IN LOCAL_ACTIVE_ORDERS')
            elif trade_status == 'MATCHED':
                logging.info(f'Order MATCHED. Moving to LOCAL_MATCHED_ORDERS. Trade Status: {trade_status}')
                # Several trades can reference one order; track it only once.
                if not any(m is o for m in LOCAL_MATCHED_ORDERS):
                    LOCAL_MATCHED_ORDERS.append(o)
                _remove_by_identity(LOCAL_ACTIVE_ORDERS, o)
            elif trade_status == 'MINED':
                logging.info(f'Order Mined ...awaiting confirm: {trade_status}')
            else:
                logging.info(f'Trade status but not filled: trade= {user_trade}; order={o}')


def _reconcile_matched_orders() -> None:
    """Apply USER_TRADES status updates to the orders in LOCAL_MATCHED_ORDERS.

    CONFIRMED fills leave the matched list and update the local balance;
    MATCHED / MINED are intermediate states and are left in place.
    """
    if not USER_TRADES:
        return
    for o in list(LOCAL_MATCHED_ORDERS):
        for t in USER_TRADES:
            user_trade = _find_user_trade(o, t)
            if not user_trade:
                continue
            trade_status = str(user_trade['status']).upper()
            if trade_status != o['status'].upper():
                logging.info(f"Updated Trade Status: {o['status']} --> {trade_status}; {o['orderID']}")
                o['status'] = trade_status
            if trade_status == 'CONFIRMED':
                _remove_by_identity(LOCAL_MATCHED_ORDERS, o)
                _adjust_local_balance(user_trade['asset_id'], user_trade['side'], user_trade['size'])
                logging.info('Matched order CONFIRMED! - IN LOCAL_MATCHED_ORDERS')
            elif trade_status in ('MATCHED', 'MINED'):
                # Still awaiting confirmation.
                pass
            else:
                logging.info(f'Trade status but not filled: trade= {user_trade}; order={o}')


### Database Funcs ###
# @async_timeit
async def create_executions_orders_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the `executions_orders` table if it does not exist.

    Args:
        CON: open async DB connection; when None the statement is skipped.
        engine: backend name; only 'mysql' is implemented.

    Raises:
        ValueError: if `engine` is anything other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    logging.info('Creating Table if Does Not Exist: executions_orders')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS executions_orders (
            timestamp_sent BIGINT,
            token_id VARCHAR(100),
            limit_price DOUBLE,
            size DOUBLE,
            side VARCHAR(8),
            order_type VARCHAR(8),
            post_only BOOL,
            resp_errorMsg VARCHAR(100),
            resp_orderID VARCHAR(100),
            resp_takingAmount DOUBLE,
            resp_makingAmount DOUBLE,
            resp_status VARCHAR(20),
            resp_success BOOL
        );
    """))
    await CON.commit()


# @async_timeit
async def insert_executions_orders_table(
    timestamp_sent: int,
    token_id: str,
    limit_price: float,
    size: float,
    side: str,
    order_type: str,
    post_only: bool,
    resp_errorMsg: str,
    resp_orderID: str,
    resp_takingAmount: float,
    resp_makingAmount: float,
    resp_status: str,
    resp_success: bool,
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Insert one row into `executions_orders` using bound parameters.

    The first thirteen arguments map one-to-one onto the table columns.

    Args:
        CON: open async DB connection; when None the insert is skipped.
        engine: backend name; only 'mysql' is implemented.

    Raises:
        ValueError: if `engine` is anything other than 'mysql'.
    """
    params = {
        'timestamp_sent': timestamp_sent,
        'token_id': token_id,
        'limit_price': limit_price,
        'size': size,
        'side': side,
        'order_type': order_type,
        'post_only': post_only,
        'resp_errorMsg': resp_errorMsg,
        'resp_orderID': resp_orderID,
        'resp_takingAmount': resp_takingAmount,
        'resp_makingAmount': resp_makingAmount,
        'resp_status': resp_status,
        'resp_success': resp_success,
    }
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')

    await CON.execute(
        text("""
            INSERT INTO executions_orders (
                timestamp_sent, token_id, limit_price, size, side, order_type,
                post_only, resp_errorMsg, resp_orderID, resp_takingAmount,
                resp_makingAmount, resp_status, resp_success
            ) VALUES (
                :timestamp_sent, :token_id, :limit_price, :size, :side, :order_type,
                :post_only, :resp_errorMsg, :resp_orderID, :resp_takingAmount,
                :resp_makingAmount, :resp_status, :resp_success
            )
        """),
        parameters=params,
    )
    await CON.commit()


### Functions ###
# @async_timeit
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id'):
    """Replace the first dict whose `id` key equals `new_dict`'s, else append.

    The list is mutated in place and also returned.

    Note: when neither dict contains the key, both lookups yield None and
    compare equal, so the first element is replaced. Pass the real key name
    (for example 'orderID') when the dicts do not carry an 'id' field.
    """
    for index, item in enumerate(list_of_dicts):
        if item.get(id) == new_dict.get(id):
            list_of_dicts[index] = new_dict
            return list_of_dicts
    list_of_dicts.append(new_dict)
    return list_of_dicts


# @async_timeit
async def slope_decision(slope_yes_thresh) -> tuple[bool, str]:
    """Decide whether the Binance price is trending strongly enough to trade.

    Computes the relative price change from the trade nearest to 1 s and to
    5 s before the latest price; both must reach `slope_yes_thresh` (given
    in percent) in absolute value.

    Returns:
        (True, 'UP' | 'DOWN') when both slopes clear the threshold, the side
        taken from the sign of the 1 s slope; otherwise (False, '').
    """
    hist_trades = np.array(POLY_BINANCE.get('hist_trades', []))
    # An empty or malformed history cannot be column-indexed.
    if hist_trades.ndim != 2 or hist_trades.shape[0] == 0:
        logging.debug('No Binance trade history available, skipping slope check')
        return False, ''

    trade_ts = hist_trades[:, 0]
    trade_px = hist_trades[:, 1]
    min_trade_hist_ts = np.min(trade_ts)
    max_trade_hist_ts = np.max(trade_ts)
    # NOTE(review): timestamps look like milliseconds (the lookbacks below
    # subtract 1000 / 5000), so `< 5` is 5 ms although the message says
    # seconds. Left unchanged because the right threshold depends on how much
    # history the feed handler retains -- confirm before changing.
    if (max_trade_hist_ts - min_trade_hist_ts) < 5:
        logging.info(f'Max - Min Trade In History is < 5 Seconds Apart: {max_trade_hist_ts} - {min_trade_hist_ts}')
        return False, ''

    last_px = POLY_BINANCE['value']
    last_px_ts = POLY_BINANCE['timestamp_value']

    # Reference prices: the trades closest in time to 1 s and 5 s ago.
    price_min_1_sec = trade_px[np.abs(trade_ts - (last_px_ts - 1000)).argmin()]
    price_min_5_sec = trade_px[np.abs(trade_ts - (last_px_ts - 5000)).argmin()]

    slope = (last_px - price_min_1_sec) / price_min_1_sec
    slope_5 = (last_px - price_min_5_sec) / price_min_5_sec

    SLOPE_HIST.append(slope)
    if len(SLOPE_HIST) > _SLOPE_HIST_MAX_LEN:
        del SLOPE_HIST[:-_SLOPE_HIST_MAX_LEN]

    threshold = slope_yes_thresh / 100
    slope_1_buy = abs(slope) >= threshold
    slope_5_buy = abs(slope_5) >= threshold

    ### DECISION ###
    if slope_1_buy and slope_5_buy:
        side = 'UP' if slope > 0.00 else 'DOWN'
        # Slope_1 previously printed slope_5 by mistake.
        msg = f'🤑 {_now_ms()}: Slope_1: {slope:.4%}; Slope_5: {slope_5:.4%}; SIDE: {side}'
        print(msg)
        logging.info(msg)
        return True, side

    if abs(slope) >= (0.001 / 100):
        print(f'{_now_ms()}: SLOPE_1: {slope:.4%}; SLOPE_5: {slope_5:.4%};')
    return False, ''


# @async_timeit
async def cancel_all_orders(CLIENT):
    """Cancel every open order through `CLIENT.cancel_all()`.

    Raises:
        Exception: when the response reports anything under 'not_canceled'
            (or lacks the key); a second cancel is attempted first.
    """
    logging.info('Attempting to Cancel All Orders')
    cxl_resp = CLIENT.cancel_all()
    if bool(cxl_resp.get('not_canceled', True)):
        logging.warning(f'*** Cancel Request FAILED, trying again and shutting down: {cxl_resp}')
        cxl_resp = CLIENT.cancel_all()
        raise Exception('*** Cancel Request FAILED')
    logging.info(f'Cancel Successful: {cxl_resp}')


# @async_timeit
async def cancel_single_order_by_id(CLIENT, order_id):
    """Cancel one order and reconcile LOCAL_ACTIVE_ORDERS with the outcome.

    Returns:
        True when the order turned out to be matched (it stays in
        LOCAL_ACTIVE_ORDERS for the caller to move), False when it was
        cancelled (and removed from LOCAL_ACTIVE_ORDERS), None when
        `order_id` is not tracked locally.

    Raises:
        ValueError: the cancel failed and the exchange still reports the
            order with a status other than MATCHED / CANCELED.
        Exception: the cancel failed for an unrecognised reason.
    """
    global LOCAL_ACTIVE_ORDERS
    logging.info(f'Attempting to Cancel Single Order: {order_id}')
    cxl_resp = CLIENT.cancel(order_id=order_id)

    for idx, o in enumerate(LOCAL_ACTIVE_ORDERS):
        if o.get('orderID') != order_id:
            continue

        if not bool(cxl_resp.get('not_canceled', True)):
            del LOCAL_ACTIVE_ORDERS[idx]
            logging.info(f'Cancel Successful: {cxl_resp}')
            return False

        reason = cxl_resp.get('not_canceled', {}).get(order_id, None)
        if reason == "matched orders can't be canceled":
            # Update the tracked order directly. The previous upsert used the
            # default key 'id', which these dicts do not have, so it
            # overwrote the first element of the list instead.
            o['status'] = 'MATCHED'
            logging.info(f'Cancel request failed b/c already matched: {cxl_resp}')
            return True

        if reason == "order can't be found - already canceled or matched":
            logging.info(f'Cancel request failed b/c already matched or cancelled: {cxl_resp}')
            # GET ORDER STATUS
            order_status = CLIENT.get_orders(
                OpenOrderParams(id=o['orderID'])
            )[0]['status'].upper()
            logging.info(f"Fetched status from CLOB: {order_status} for order: {o['orderID']}")
            if order_status == 'MATCHED':
                logging.info('Order is MATCHED')
                return True
            if order_status == 'CANCELED':
                logging.info('Order is CANCELED')
                del LOCAL_ACTIVE_ORDERS[idx]
                return False
            raise ValueError(f"ORDER CXL FAILED AND ORDER STILL SHOWS AS LIVE: {cxl_resp}; STATUS: {order_status}; ID: {o.get('orderID')}")

        logging.warning(f'*** Cancel Request FAILED, shutting down: {cxl_resp}')
        raise Exception('*** Cancel Request FAILED - SHUTDONW')


# @async_timeit
async def flatten_open_positions(CLIENT, token_id_up, token_id_down):
    """Sell any UP / DOWN token balance with FAK market orders at price 0.01."""
    up_size = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
    down_size = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
    up_size = round(up_size, 2)
    # Previously round(up_size, 2), which sold the UP quantity on the DOWN
    # token.
    down_size = round(down_size, 2)

    logging.info('*********FLATTENING*********')
    logging.info(f'UP BALANCE = {up_size}')
    logging.info(f'DOWN BALANCE = {down_size}')

    ### Submit orders to flatten outstanding balances ###
    order_list = []
    if up_size:
        logging.info(f'Flattening Up Position: {up_size}')
        order_list.append(MarketOrderArgs(
            token_id=token_id_up,
            amount=up_size,
            price=0.01,
            side=SELL,
            order_type=OrderType.FAK,
        ))
    if down_size:
        order_list.append(MarketOrderArgs(
            token_id=token_id_down,
            amount=down_size,
            price=0.01,
            side=SELL,
            order_type=OrderType.FAK,
        ))
        logging.info(f'Flattening Down Position: {down_size}')

    if order_list:
        await post_order(
            CLIENT=CLIENT,
            PostOrdersArgs_list=order_list,
            is_mkt_order_list=True,
        )


# @async_timeit
async def get_balance_by_token_id(CLIENT, token_id):
    """Return the conditional-token balance for `token_id`.

    The raw balance is divided by 1,000,000; anything below 0.01 is reported
    as 0.00.
    """
    collateral = CLIENT.get_balance_allowance(
        BalanceAllowanceParams(
            asset_type='CONDITIONAL',
            token_id=token_id,
        )
    )
    balance = float(collateral['balance']) / 1_000_000
    logging.info(f'Balance: {balance}; Collateral: {collateral}')
    return balance if balance >= 0.01 else 0.00


# @async_timeit
async def get_usde_balance(CLIENT):
    """Return the collateral balance (raw integer balance / 1,000,000)."""
    collateral = CLIENT.get_balance_allowance(
        BalanceAllowanceParams(
            asset_type='COLLATERAL'
        )
    )
    return int(collateral['balance']) / 1_000_000


@async_timeit
async def check_for_open_positions(CLIENT, token_id_up, token_id_down):
    """Refresh LOCAL_TOKEN_BALANCES from the exchange.

    Returns:
        True if either token has a non-zero balance, else False.

    Raises:
        ValueError: if either token id is None.
    """
    global LOCAL_TOKEN_BALANCES
    if token_id_up is None or token_id_down is None:
        logging.critical('Token Id is None, Exiting')
        raise ValueError('Token Id is None, Exiting')

    up = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_up)
    down = await get_balance_by_token_id(CLIENT=CLIENT, token_id=token_id_down)
    LOCAL_TOKEN_BALANCES = {
        token_id_up: up if up else 0,
        token_id_down: down if down else 0,
    }
    logging.info(f'LOCAL_TOKEN_BALANCES: {LOCAL_TOKEN_BALANCES}')
    return (abs(up) > 0) or (abs(down) > 0)


@async_timeit
async def post_order(CLIENT, PostOrdersArgs_list: list[Custom_PostOrdersArgs], is_mkt_order_list: bool = False) -> list[dict]:
    """Post orders and record each response in the local order books.

    Args:
        CLIENT: CLOB client.
        PostOrdersArgs_list: orders to send. Limit orders go out in one
            `post_orders` batch; with `is_mkt_order_list` each entry is sent
            through `post_order` individually.
        is_mkt_order_list: True when the entries are MarketOrderArgs (their
            size is read from `.amount` instead of `.size`).

    Returns:
        The list of response dicts, annotated in place with timestamps,
        token id, outcome, price, size and side for accepted orders.

    Raises:
        ValueError: an accepted order's token id is neither the UP nor the
            DOWN token, or the exchange rejected an order for a reason other
            than a post-only order crossing the book.
    """
    global LOCAL_ACTIVE_ORDERS
    global LOCAL_MATCHED_ORDERS
    global LOCAL_TOKEN_BALANCES

    ### POST
    timestamp_post_sent = _now_ms()
    if is_mkt_order_list:
        # NOTE(review): entries here are MarketOrderArgs passed straight to
        # CLIENT.post_order -- confirm this client accepts unsigned order
        # args there.
        response = [CLIENT.post_order(o) for o in PostOrdersArgs_list]
    else:
        response = CLIENT.post_orders(PostOrdersArgs_list)

    for idx, d in enumerate(response):
        sent = PostOrdersArgs_list[idx]
        if d['errorMsg'] == '':
            d['timestamp_post_sent'] = timestamp_post_sent
            d['timestamp_post_resp_rec'] = _now_ms()
            d['token_id'] = sent.token_id
            if d['token_id'] == POLY_CLOB['token_id_up']:
                d['outcome'] = "UP"
            elif d['token_id'] == POLY_CLOB['token_id_down']:
                d['outcome'] = "DOWN"
            else:
                d['outcome'] = "UNKNOWN"
                raise ValueError(f'UNKNOWN outcome for order: {d}')
            d['price'] = sent.price
            d['size'] = sent.amount if is_mkt_order_list else sent.size
            d['side'] = str(sent.side).upper()

            status = d['status'].upper()
            if status == 'MATCHED':
                ### Order Immediately Matched, Can Put in Offsetting Order Depending on State ###
                logging.info('******** ORDER APPEND TO LOCAL - MATCHED ********* ')
                LOCAL_MATCHED_ORDERS.append(d)
            elif status == 'CONFIRMED':
                _adjust_local_balance(d['token_id'], d['side'], d['size'])
                logging.info('******** TRADE FILLED, BAL UPDATED ********* ')
            else:
                logging.info('******** ORDER APPEND TO LOCAL - LIVE ********* ')
                LOCAL_ACTIVE_ORDERS.append(d)
        elif d['errorMsg'] == "invalid post-only order: order crosses book":
            await cancel_all_orders(CLIENT=CLIENT)
            logging.info(f'invalid post-only order: order crosses book. posted: {sent.price}')
        else:
            await cancel_all_orders(CLIENT=CLIENT)
            raise ValueError(f'Order entry failed: {d}')

    logging.info(f'🚨 Order Posted Resp: {response}')
    return response


### Routes ###
# @async_timeit
async def no_orders_route(entry_or_exit: str = 'ENTRY'):
    """Open ('ENTRY') or unwind ('EXIT') a two-legged position.

    ENTRY is skipped when either outcome trades above MAX_ALLOWED_POLY_PX or
    when the market's target price is further than TGT_PX_INDEX_DIFF_THRESH
    percent from the reference index. Unless an EXIT holds less than
    MIN_ORDER_SIZE on either token, the slope signal must fire and selects
    which leg is priced aggressively; otherwise both legs are sold with FAK
    orders at 0.01.

    Returns:
        False when a pre-trade check rejects the order; None otherwise.

    Raises:
        ValueError: on ENTRY when the target price or the reference price is
            missing.
    """
    global ORDER_LOCK

    ### Check for Price Bands ###
    up_last_px = float(POLY_CLOB.get('price', 0))
    down_last_px = float(POLY_CLOB_DOWN.get('price', 0))

    if entry_or_exit == 'ENTRY':
        if (up_last_px > MAX_ALLOWED_POLY_PX) or (down_last_px > MAX_ALLOWED_POLY_PX):
            logging.info(f'Outside max allowed px: {MAX_ALLOWED_POLY_PX}')
            return False

        ### Check for Index vs. Target Px ###
        tgt_px = float(POLY_CLOB.get('target_price', 0))
        ref_value = POLY_REF.get('value')
        if tgt_px == 0 or ref_value is None:
            # Explicit error instead of ZeroDivisionError / float(None).
            raise ValueError(f'Missing target or reference px: target_price={tgt_px}; ref={ref_value}')
        ref_px = float(ref_value)
        tgt_px_diff_to_index = abs(tgt_px - ref_px) / tgt_px
        if tgt_px_diff_to_index > (TGT_PX_INDEX_DIFF_THRESH / 100):
            logging.info(f'Tgt Diff to Index Outside Limit ({TGT_PX_INDEX_DIFF_THRESH}%); Diff {tgt_px_diff_to_index:.4%}; Index: {ref_px:.2f}; Tgt: {tgt_px:.2f}')
            return False

    token_id_up = POLY_CLOB.get('token_id_up', None)
    token_id_down = POLY_CLOB.get('token_id_down', None)

    if entry_or_exit == 'EXIT':
        size_up = round(LOCAL_TOKEN_BALANCES[token_id_up], 6)
        size_down = round(LOCAL_TOKEN_BALANCES[token_id_down], 6)
        size_less_than_min = (size_up < MIN_ORDER_SIZE) or (size_down < MIN_ORDER_SIZE)
    else:
        size_less_than_min = False

    if not size_less_than_min:
        ### Check Slope ###
        slope_bool, slope_side = await slope_decision(slope_yes_thresh=SLOPE_YES_THRESH)
        if not slope_bool:
            return False
    else:
        slope_bool, slope_side = False, 'MKT'

    ### Order Entry ###
    # The *_post_only flags are currently False everywhere ("# T" marks the
    # leg that was once toggled to True).
    if slope_side == 'MKT':
        side = SELL
        # NOTE(review): min() is 0 when only one token is held, which would
        # submit zero-size orders -- confirm the intended residual size.
        size = min([size_up, size_down])
        up_px = 0.01
        down_px = 0.01
        up_post_only = False
        down_post_only = False  # T
        order_type = OrderType.FAK
        logging.info(f'Flattening Residuals - Mkt Order: ({size} ({side}) ({slope_side}))')
    elif slope_side == 'UP':
        if entry_or_exit == 'ENTRY':
            side = BUY
            size = DEFAULT_ORDER_SIZE
            up_px = round(up_last_px + 0.01, 2)
            down_px = round(down_last_px - TGT_PROFIT_CENTS + 0.01, 2)
            up_post_only = False
            down_post_only = False  # T
            order_type = OrderType.GTC
        else:  # entry_or_exit == 'EXIT'
            side = SELL
            size = size_up
            logging.info(f'Flattening Residuals - Limit Order: ({size} ({side}) ({slope_side}))')
            up_px = round(up_last_px + TGT_PROFIT_CENTS + 0.01, 2)
            down_px = round(down_last_px - 0.01, 2)
            order_type = OrderType.GTC
            up_post_only = False  # T
            down_post_only = False
    else:  # slope_side == 'DOWN'
        if entry_or_exit == 'ENTRY':
            side = BUY
            size = DEFAULT_ORDER_SIZE
            up_px = round(up_last_px - TGT_PROFIT_CENTS + 0.01, 2)
            down_px = round(down_last_px + 0.01, 2)
            up_post_only = False  # T
            down_post_only = False
            order_type = OrderType.GTC
        else:  # entry_or_exit == 'EXIT'
            side = SELL
            size = size_down
            logging.info(f'Flattening Residuals - Limit Order: ({size} ({side}) ({slope_side}))')
            up_px = round(up_last_px - 0.01, 2)
            down_px = round(down_last_px + TGT_PROFIT_CENTS + 0.01, 2)
            order_type = OrderType.GTC
            up_post_only = False
            down_post_only = False  # T

    up_leg = _build_limit_order(token_id_up, up_px, size, side, order_type, up_post_only)
    down_leg = _build_limit_order(token_id_down, down_px, size, side, order_type, down_post_only)

    ### ADD CHECK FOR MKT MOVED AWAY FROM OPPORTUNITY ###
    # Re-read the freshest price from Valkey; abandon when the market has
    # already moved above the price of the aggressive leg.
    if slope_side == 'MKT':
        order_list = [up_leg, down_leg]
    elif slope_side == 'UP':
        order_list = [up_leg, down_leg]
        vk_px = float(json.loads(VAL_KEY.get('poly_5min_btcusd'))['price'])
        if up_px < vk_px:
            logging.info(f'ABANDONED BUY ORDERS: UP px moved from {up_px} -> {vk_px}')
            return False
        logging.info(f'NOT ABANDONED BUY ORDERS: UP px moved from {up_px} -> {vk_px}')
    else:
        order_list = [down_leg, up_leg]
        vk_px = float(json.loads(VAL_KEY.get('poly_5min_btcusd_down'))['price'])
        if down_px < vk_px:
            logging.info(f'ABANDONED BUY ORDERS: DOWN px moved from {down_px} -> {vk_px}')
            return False
        # This message previously reported the UP price.
        logging.info(f'NOT ABANDONED BUY ORDERS: DOWN px moved from {down_px} -> {vk_px}')

    logging.info('PRICES AT TIME OF ORDER:')
    logging.info(f'Current TS: {_now_ms()}')
    logging.info(f'up_last_px: {up_last_px}; order up_px: {up_px}')
    logging.info(f'down_last_px: {down_last_px}; order down_px: {down_px}')
    logging.info(f'TGT_PROFIT_CENTS: {TGT_PROFIT_CENTS}')

    if ORDER_LOCK:
        logging.info(f'BUY ORDER BLOCKED BY LOCK: {order_list}')
    else:
        logging.info(f'Attempting {entry_or_exit} Orders {order_list}')
        await post_order(
            CLIENT=CLIENT,
            PostOrdersArgs_list=order_list,
        )
        # ORDER_LOCK = ORDER_LOCK + 1


# @async_timeit
async def active_orders_route():
    """Work resting orders: promote matched ones, re-price stale ones.

    A LIVE order is cancelled and re-posted 0.02 through the last price when
    it has rested longer than MAX_LEG_LIVE_SEC or when the slope signal (at
    half the entry threshold) points at the order's own outcome.

    Raises:
        ValueError: an order has a status other than MATCHED / LIVE, or a
            side other than BUY / SELL.
        Exception: via `kill_algo` when more than two active or matched
            orders are tracked.
    """
    global LOCAL_ACTIVE_ORDERS
    global LOCAL_MATCHED_ORDERS

    if len(LOCAL_ACTIVE_ORDERS) > 2:
        logging.critical('More than two active orders, shutting down')
        await kill_algo('More than two active orders, shutting down')
    if len(LOCAL_MATCHED_ORDERS) > 2:
        logging.critical('More than two matched orders, shutting down')
        await kill_algo('More than two matched orders, shutting down')

    # Iterate a snapshot: orders are removed from and appended to
    # LOCAL_ACTIVE_ORDERS while being worked, which made the previous
    # enumerate()/pop(idx) skip or remove the wrong order.
    for o in list(LOCAL_ACTIVE_ORDERS):
        status = str(o.get('status')).upper()

        if status == 'MATCHED':
            logging.info(f"Active Order MATCHED. Moving to LOCAL_MATCHED_ORDERS. Order Id: {o['orderID']}")
            LOCAL_MATCHED_ORDERS.append(o)
            _remove_by_identity(LOCAL_ACTIVE_ORDERS, o)
            continue

        if status == 'FAILED':
            raise ValueError(f'Trade FAILED after matching: {o}')
        if status == 'RETRYING':
            raise ValueError(f'Trade RETRYING after matching: {o}')
        if status != 'LIVE':
            raise ValueError(f'Unexpected Order Status: {o}')

        sec_order_live = (_now_ms() - o['timestamp_post_sent']) / 1000
        logging.info(f"Working on order ({o['side']}) ({o['outcome']}): {o['orderID']}; SEC ALIVE: {sec_order_live:.2f}")

        ### Check Conditions to Immediately Replace Order at Mkt (Abandon Target Px) ###
        replace_w_order_at_mkt = False
        if sec_order_live > MAX_LEG_LIVE_SEC:  # Abandon if lived longer than x seconds
            logging.info(f"Order live > max sec ({sec_order_live} > {MAX_LEG_LIVE_SEC}); {o['side']}) ({o['outcome']}): {o['orderID']}")
            replace_w_order_at_mkt = True
        else:
            # Abandon if slope has reversed
            slope_bool, slope_side = await slope_decision(slope_yes_thresh=SLOPE_YES_THRESH / 2)
            if slope_bool and (slope_side == o['outcome']):
                logging.info(f"SLOPE MOVED AWAY FROM TGT ORDER, replacing at mkt; ({o['side']}) ({o['outcome']}): {o['orderID']}")
                replace_w_order_at_mkt = True

        if not replace_w_order_at_mkt:
            continue

        order_matched = await cancel_single_order_by_id(CLIENT=CLIENT, order_id=o['orderID'])
        if order_matched:
            logging.info(f'Order is MATCHED after being worked: {o}')
            LOCAL_MATCHED_ORDERS.append(o)
            _remove_by_identity(LOCAL_ACTIVE_ORDERS, o)
            continue

        token_id = o['token_id']
        if POLY_CLOB['token_id_up'] == token_id:
            clob_px = float(POLY_CLOB['price'])
        else:
            clob_px = float(POLY_CLOB_DOWN['price'])

        # Cross the last price by 0.02 in the order's own direction.
        if o['side'] == 'BUY':
            px = round(clob_px + 0.02, 2)
            side = BUY
        elif o['side'] == 'SELL':
            px = round(clob_px - 0.02, 2)
            side = SELL
        else:
            raise ValueError(f'Unexpected Order Side: {o}')
        size = float(o['size'])
        post_only = False

        order_type = OrderType.FAK if size < MIN_ORDER_SIZE else OrderType.GTC

        logging.info(f"REPLACING ORDER. Orig Px {o['price']} -> Mkt Px: {clob_px}; {o['side']}) ({o['outcome']}): {o['orderID']}")
        order = _build_limit_order(token_id, px, size, side, order_type, post_only)
        await post_order(
            CLIENT=CLIENT,
            PostOrdersArgs_list=[order],
        )


@async_timeit
async def kill_algo(msg: str = 'No kill msg provided'):
    """Cancel all orders, flatten positions, then raise to stop the algo.

    Raises:
        Exception: always, carrying `msg`.
    """
    logging.info('Killing algo...')
    await cancel_all_orders(CLIENT=CLIENT)
    await flatten_open_positions(
        CLIENT=CLIENT,
        token_id_up=POLY_CLOB.get('token_id_up', None),
        token_id_down=POLY_CLOB.get('token_id_down', None),
    )
    logging.info(f'...algo killed: {msg}')
    raise Exception(f'Algo Killed: {msg}')


@async_timeit
async def clob_client_caching(token_id):
    """Request tick size, neg-risk flag and fee rate for `token_id`.

    The results are discarded; the calls are made only for their effect on
    the client (presumably it caches them per token -- TODO confirm).
    """
    CLIENT.get_tick_size(token_id=token_id)
    CLIENT.get_neg_risk(token_id=token_id)
    CLIENT.get_fee_rate_bps(token_id=token_id)


# @async_timeit
async def loop_check_route_switch(route_name):
    """Log route changes; refresh balances when switching into the EXIT route."""
    global LOOP_LAST_ROUTE
    if LOOP_LAST_ROUTE != route_name:
        print(f'SWITCHING ROUTES: {LOOP_LAST_ROUTE} -> {route_name}')
        logging.info(f'SWITCHING ROUTES: {LOOP_LAST_ROUTE} -> {route_name}')
        if route_name == 'no_orders_route_EXIT':
            await check_for_open_positions(
                CLIENT=CLIENT,
                token_id_up=POLY_CLOB.get('token_id_up', None),
                token_id_down=POLY_CLOB.get('token_id_down', None),
            )
    LOOP_LAST_ROUTE = route_name


async def run_algo():
    """Run the main trading loop until interrupted or an error occurs.

    Each pass reloads market and user state from Valkey, reconciles local
    orders against user trades, then dispatches to the entry, active-order,
    awaiting-confirmation or exit route. Within ENDTIME_BUFFER_SEC of
    settlement it cancels orders and flattens positions instead.

    Raises:
        ValueError: token ids are missing at start-up.
        Exception: via `kill_algo` when the target price is missing at
            start-up.
    """
    global POLY_BINANCE
    global POLY_REF
    global POLY_CLOB
    global POLY_CLOB_DOWN
    global USER_TRADES
    global USER_ORDERS
    global SLOPE_HIST
    global ACTIVE_BALANCES_EXIST
    global FIRST_LOOP_NEW_MKT
    global LOCAL_ACTIVE_ORDERS
    global LOCAL_MATCHED_ORDERS
    global LOCAL_TOKEN_BALANCES
    # global LOCAL_ACTIVE_POSITIONS

    POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))

    ### Get Token IDs ###
    token_id_up = POLY_CLOB.get('token_id_up', None)
    token_id_down = POLY_CLOB.get('token_id_down', None)
    if (token_id_up is None) or (token_id_down is None):
        raise ValueError(f'Token ID is None: UP: {token_id_up}; DOWN: {token_id_down}')
    logging.info(f'token_id_up: {token_id_up}')
    logging.info(f'token_id_down: {token_id_down}')

    ### CLOB Client Caching ###
    await clob_client_caching(token_id=token_id_up)
    await clob_client_caching(token_id=token_id_down)

    ### Get Initial Balances ###
    ACTIVE_BALANCES_EXIST = await check_for_open_positions(
        CLIENT=CLIENT,
        token_id_up=token_id_up,
        token_id_down=token_id_down,
    )

    ### Check for missing target px (Poly 5min Target BTC Px Target) ###
    if float(POLY_CLOB.get('target_price') or 0) <= 1.00:
        # Must be awaited: a bare call only creates a coroutine object and
        # the algo would keep running without a target price.
        await kill_algo('Missing target_price, check CLOB feedhandler')

    try:
        while True:
            POLY_BINANCE = json.loads(VAL_KEY.get('poly_binance_btcusd'))
            POLY_REF = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
            POLY_CLOB = json.loads(VAL_KEY.get('poly_5min_btcusd'))
            POLY_CLOB_DOWN = json.loads(VAL_KEY.get('poly_5min_btcusd_down'))

            USER_TRADES = VAL_KEY.get('poly_user_trades')
            USER_TRADES = json.loads(USER_TRADES) if USER_TRADES is not None else []
            USER_ORDERS = VAL_KEY.get('poly_user_orders')
            USER_ORDERS = json.loads(USER_ORDERS) if USER_ORDERS is not None else []

            ### Check for Token Id
            token_id_up = POLY_CLOB.get('token_id_up', None)
            token_id_down = POLY_CLOB.get('token_id_down', None)
            if (token_id_up is None) or (token_id_down is None):
                logging.info(f'Missing Token Ids for Market (token_id_up: {token_id_up}; token_id_down: {token_id_down}), sleeping 1 sec and retrying...')
                # asyncio.sleep yields to the event loop; time.sleep blocked it.
                await asyncio.sleep(1)
                ACTIVE_BALANCES_EXIST = False
                continue

            if FIRST_LOOP_NEW_MKT:
                ### CLOB Client Caching ###
                await clob_client_caching(token_id=token_id_up)
                await clob_client_caching(token_id=token_id_down)
                FIRST_LOOP_NEW_MKT = False

            _reconcile_active_orders()
            _reconcile_matched_orders()

            ### CHECK BALANCES ###
            if LOCAL_TOKEN_BALANCES.get(token_id_up) is None:
                LOCAL_TOKEN_BALANCES[token_id_up] = 0.00
            if LOCAL_TOKEN_BALANCES.get(token_id_down) is None:
                LOCAL_TOKEN_BALANCES[token_id_down] = 0.00
            # Parenthesis fixed: the DOWN side used abs(x > 0), which is
            # False for a negative balance.
            ACTIVE_BALANCES_EXIST = (
                abs(LOCAL_TOKEN_BALANCES[token_id_up]) > 0
                or abs(LOCAL_TOKEN_BALANCES[token_id_down]) > 0
            )

            ### Check for Endtime Buffer ###
            if ENDTIME_BUFFER_SEC > POLY_CLOB.get('sec_remaining', 0):
                FIRST_LOOP_NEW_MKT = True
                if LOCAL_ACTIVE_ORDERS:
                    print('buffer zone - orders cancel')
                    await cancel_all_orders(CLIENT=CLIENT)
                if ACTIVE_BALANCES_EXIST:
                    print('buffer zone - flatten positions')
                    await flatten_open_positions(
                        CLIENT=CLIENT,
                        token_id_up=token_id_up,
                        token_id_down=token_id_down,
                    )
                print('buffer zone, sleeping until next session')
                await asyncio.sleep(1)
                continue

            ### ENTRY Route ###
            if not LOCAL_ACTIVE_ORDERS and not LOCAL_MATCHED_ORDERS and not ACTIVE_BALANCES_EXIST:
                # No Orders, No Matched, No Positions
                await loop_check_route_switch('no_orders_route_ENTRY')
                await no_orders_route(entry_or_exit='ENTRY')

            ### Open Orders Route ###
            elif LOCAL_ACTIVE_ORDERS:
                await loop_check_route_switch('active_orders_route')
                await active_orders_route()

            ### Matched Orders Route - Waiting ###
            elif LOCAL_MATCHED_ORDERS:
                await loop_check_route_switch('matched_orders_awaiting_confirms')

            ### Open Positions Route - EXIT ###
            elif ACTIVE_BALANCES_EXIST:
                # No Orders, No Matches, Positions
                await loop_check_route_switch('no_orders_route_EXIT')
                await no_orders_route(entry_or_exit='EXIT')

            else:
                print('ROUTE: NOT IMPLEMENTED')

    except KeyboardInterrupt:
        print('...algo stopped')
        await cancel_all_orders(CLIENT=CLIENT)
    except asyncio.CancelledError:
        # asyncio.run() turns Ctrl+C into a task cancellation; cancel the
        # resting orders, then let the cancellation propagate.
        print('...algo stopped')
        await cancel_all_orders(CLIENT=CLIENT)
        raise
    except Exception as e:
        logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
        logging.error(traceback.format_exc())
        await cancel_all_orders(CLIENT=CLIENT)


async def main():
    """Create the CLOB client, Valkey and DB connections, then run the algo.

    The database URL is read from the POLYMARKET_DB_URL environment variable
    and falls back to the previous hard-coded local URL.
    """
    global CLIENT
    global VAL_KEY
    global CON

    CLIENT = api.create_client()
    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    engine = create_async_engine(
        os.getenv('POLYMARKET_DB_URL', 'mysql+asyncmy://root:pwd@localhost/polymarket')
    )
    try:
        async with engine.connect() as CON:
            await create_executions_orders_table(CON=CON)
            await run_algo()
    finally:
        # Release the connection pool even when the algo exits with an error.
        await engine.dispose()


if __name__ == '__main__':
    START_TIME = _now_ms()
    # Configure logging before the first message; a logging call made before
    # basicConfig goes to the default WARNING-level handler and is dropped.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    asyncio.run(main())