from x10.utils.http import WrappedApiResponse from x10.perpetual.trading_client.trading_client import PerpetualTradingClient import asyncio import json import logging import math import os import time import traceback from datetime import datetime, timezone from decimal import ROUND_DOWN, ROUND_UP, ROUND_HALF_UP, Decimal from typing import AsyncContextManager from dataclasses import dataclass, asdict, field from typing import Any import numpy as np import pandas as pd import requests # import talib import valkey from dotenv import load_dotenv from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine from x10.models.order import OrderSide, PlacedOrderModel import modules.utils as utils import modules.aster_auth as aster_auth import modules.extended_auth as extend_auth import modules.structs as structs ### Clients ### EXTEND_CLIENT: PerpetualTradingClient CON: AsyncContextManager VAL_KEY: valkey.Valkey ### Logging ### load_dotenv() LOG_FILEPATH: str = f'{os.getenv(key="LOGS_PATH")}/Fund_Rate_Algo.log' ### Algo Config ### Config: structs.Algo_Config Algo_Status: structs.Algo_Status ### Exchanges ### Aster: structs.Perpetual_Exchange Extend: structs.Perpetual_Exchange ### Globals ### Open_Symbols: list[str] = [] Last_Aster_Fill_Time_Ts: float = 0.00 Funding_Rates_Min_Remaining_Factor_Pcts: list[float] = [] Aster_Open_Orders: list[dict] = [] Extend_Open_Orders: list[dict] = [] ### Flags ### Flags = structs.Flags() ### Algo ### async def output_algo_status(status: str) -> None: global Algo_Status Algo_Status.last_update_ts_ms = int(round(datetime.now().timestamp()*1000, 2)) Algo_Status.status = status VAL_KEY.set('algo_status', json.dumps(Algo_Status.model_dump())) def create_exchange_objs_from_dict(exchanges_dict: dict) -> tuple[structs.Perpetual_Exchange, structs.Perpetual_Exchange]: Aster = structs.Perpetual_Exchange( mult = int(exchanges_dict['max_leverage_ast']), lh_asset = exchanges_dict['lh_asset_ast'], rh_asset = exchanges_dict['rh_asset_ast'], symbol_asset_separator = '', initial_funding_rate=float(exchanges_dict['funding_rate_ast']), fund_rate_at_same_time=bool(exchanges_dict['next_funding_at_same_time']), min_price=float(exchanges_dict['min_price_ast']), min_order_size=float(exchanges_dict['min_order_size_ast']), min_lot_size=float(exchanges_dict['min_lot_size_ast']), min_notional=float(exchanges_dict['min_notional_ast']), buy_ratio=float(exchanges_dict['buy_ratio_ast']), buy_ratio_std=float(exchanges_dict['buy_ratio_std']), ) Extend = structs.Perpetual_Exchange( mult = int(exchanges_dict['max_leverage_ext']), lh_asset = exchanges_dict['lh_asset_ext'], rh_asset = exchanges_dict['rh_asset_ext'], symbol_asset_separator = '-', initial_funding_rate=float(exchanges_dict['funding_rate_ext']), fund_rate_at_same_time=bool(exchanges_dict['next_funding_at_same_time']), min_price=float(exchanges_dict['min_price_ext']), min_order_size=float(exchanges_dict['min_order_size_ext']), min_lot_size=float(exchanges_dict['min_lot_size_ext']), min_notional=float(exchanges_dict['min_notional_ext']), buy_ratio=float(exchanges_dict['buy_ratio_ext']), buy_ratio_std=float(exchanges_dict['buy_ratio_std']), ) return Aster, Extend async def symbol_switch(best_symbol_by_exchange_aster: structs.Perpetual_Exchange, best_symbol_by_exchange_extend: structs.Perpetual_Exchange): global Config global Aster global Extend if (best_symbol_by_exchange_aster.symbol != Aster.symbol) or (best_symbol_by_exchange_extend.symbol != Extend.symbol): if abs( Aster.notional_position ) > 0.00 or abs( Extend.notional_position ) > 0.00: if Config.Logging.Print_Summary_Each_Loop: print(f'Symbol switch [{Aster.symbol} > {best_symbol_by_exchange_aster.symbol}] - Flattening Positions') ### Check if its worth switching - is this truly the best move fees included etc? Config.Overrides.Flatten_Open_Positions_Opportunistic = True else: logging.info('Balances Flattened - Updating to Trade New Symbols:') logging.info(f' ASTER.symbol -> {best_symbol_by_exchange_aster.symbol}') logging.info(f' EXTEND.symbol -> {best_symbol_by_exchange_extend.symbol}') await aster_cancel_all_orders() await extend_cancel_all_orders() Config.Overrides.Flatten_Open_Positions_Opportunistic = False if Open_Symbols: logging.info(f'OPEN SYMBOLS: {Open_Symbols}') master_data = json.loads(s=VAL_KEY.get(name='fr_engine_best_fund_rate_master')) # ty:ignore[invalid-argument-type] open_symbol_to_work = Open_Symbols[0] current_pos_master_ast = [d for d in master_data if d.get('symbol_ext') == open_symbol_to_work][0] Aster, Extend = create_exchange_objs_from_dict(exchanges_dict=current_pos_master_ast) Open_Symbols.pop(0) await get_aster_notional_position() await get_extend_notional() else: Aster = best_symbol_by_exchange_aster Extend = best_symbol_by_exchange_extend VAL_KEY.set(name='fr_algo_working_symbol', value=json.dumps(obj={'ASTER': asdict(obj=Aster), 'EXTEND': asdict(obj=Extend)})) def calc_fr_minutes_remaining_factor( min_start_procedure: int = 15, min_to_end_procedure: int = 15, factor_exp_pct: float = 0.25 ): factors = [np.float64(0.00)] for x in range(min_start_procedure+1,61-min_to_end_procedure): y = (x)**(np.log(x)*factor_exp_pct) factors.append(y) pcts = list(factors / np.max(factors)) for x in range(61-min_to_end_procedure, 61): pcts.append(1) pcts.reverse() return pcts def get_fr_factor_by_minute(min_left: int, factor_pcts: list[float]) -> Decimal: return Decimal(str(factor_pcts[min(len(factor_pcts) - 1, min_left)])).quantize(Decimal('0.0001')) @dataclass(kw_only=True) class Target: exchange: str # ASTER | EXTEND symbol: str # e.g. BTC-USD side: str = '' notional_tgt: Decimal notional_tail: Decimal = Decimal('0.00') min_order_notional: Decimal = Decimal('0.00') min_order_base: Decimal = Decimal('0.00') min_order_price: Decimal = Decimal('0.00') min_lot_size: Decimal = Decimal('0.00') def __post_init__(self): if self.exchange == 'ASTER': self.notional_tail = self.notional_tgt - Decimal( str(Aster.notional_position) ) #+ Decimal( str(Aster.unrealized_pnl) ) self.min_order_notional = Decimal(str(Aster.min_notional)) min_order_base = Decimal(str(Aster.min_order_size)) min_order_price = Decimal(str(Aster.min_price)) self.min_lot_size = Decimal(str(Aster.min_lot_size)) else: self.notional_tail = self.notional_tgt - Decimal( str(Extend.notional_position) ) #+ Decimal( str(Extend.unrealized_pnl) ) self.min_order_notional = Decimal(str(Extend.min_notional)) min_order_base = Decimal(str(Extend.min_order_size)) min_order_price = Decimal(str(Extend.min_price)) self.min_lot_size = Decimal(str(Extend.min_lot_size)) self.min_order_base = Decimal(str(int(min_order_base))) if min_order_base == int(min_order_base) else Decimal(str(min_order_base)) self.min_order_price = Decimal(str(int(min_order_price))) if min_order_price == int(min_order_price) else Decimal(str(min_order_price)) self.side = 'BUY' if self.notional_tail > Decimal('0.00') else 'SELL' def base_tgt(self, price: Decimal) -> Decimal: tgt = Decimal(str(float(self.notional_tgt) / float(price))).quantize(Decimal(str(self.min_order_base))) if self.min_lot_size > 0: tgt = tgt - (tgt % self.min_lot_size) return tgt.quantize(Decimal(str(self.min_order_base))) def base_tail(self, price: Decimal) -> Decimal: tail = Decimal(str(float(self.notional_tail) / float(price))).quantize(Decimal(str(self.min_order_base))) if Decimal(str(self.min_lot_size)) > 0: tail = tail - (tail % self.min_lot_size) return tail.quantize(Decimal(str(self.min_order_base))) def is_flattening(self) -> bool: return self.notional_tgt == Decimal('0.00') def is_reduce_only(self, price: Decimal) -> bool: if abs(self.notional_tail) < abs(self.min_order_notional): return True if abs(self.base_tail(price=price)) < abs(self.min_order_base): return True return False def is_orderable(self, price: Decimal) -> bool: if self.notional_tail == Decimal('0.00'): return False if self.base_tail(price=price) == Decimal('0.00'): return False if self.exchange == 'ASTER' and not(Config.Overrides.Allow_Ordering_Aster): return False if self.exchange == 'EXTEND' and not(Config.Overrides.Allow_Ordering_Extend): return False if self.is_reduce_only(price=price) and not(self.is_flattening()): return False return True @dataclass(kw_only=True) class Signal: signal: bool exchange: str # ASTER | EXTEND side: str # BUY | SELL symbol: str # e.g. BTC-USD expected_alpha: Decimal # e.g. BTC-USD model_ratio: Decimal current_ratio: Decimal def signal_alpha_over_taker( Aster: structs.Perpetual_Exchange, Extend: structs.Perpetual_Exchange, aster_ticker_dict: dict, extend_ticker_dict: dict, funding_rate_exch: str, funding_rate_side: str, funding_rate: Decimal = Decimal('0.00'), funding_rate_switch: Decimal = Decimal('0.00'), taker_fee: Decimal = Decimal(str(0.00025)), alpha_hurdle_adj: Decimal = Decimal('0.00'), ) -> Signal: if Config.Overrides.Flatten_Open_Positions_Opportunistic: if Decimal(str(Aster.notional_position)) > 0: aster_buy_fund_rate_return = abs(funding_rate_switch) * -1 extend_buy_fund_rate_return = abs(funding_rate_switch) # funding_rate_exch = 'EXTEND' # funding_rate_side = 'BUY' else: # Decimal(str(Aster.notional_position)) < 0: aster_buy_fund_rate_return = abs(funding_rate_switch) extend_buy_fund_rate_return = abs(funding_rate_switch) * -1 # funding_rate_exch = 'ASTER' # funding_rate_side = 'BUY' else: if funding_rate_exch == 'ASTER': if funding_rate_side == 'BUY': aster_buy_fund_rate_return = abs(funding_rate) extend_buy_fund_rate_return = abs(funding_rate) * -1 else: aster_buy_fund_rate_return = abs(funding_rate) * -1 extend_buy_fund_rate_return = abs(funding_rate) else: # funding_rate_exch == 'EXTEND': if funding_rate_side == 'BUY': aster_buy_fund_rate_return = abs(funding_rate) * -1 extend_buy_fund_rate_return = abs(funding_rate) else: aster_buy_fund_rate_return = abs(funding_rate) extend_buy_fund_rate_return = abs(funding_rate) * -1 aster_mid_px: Decimal = ( Decimal(str(aster_ticker_dict['best_ask_px'])) + Decimal(str(aster_ticker_dict['best_bid_px'])) ) / 2 extend_mid_px: Decimal = ( Decimal(str(extend_ticker_dict['best_ask_px'])) + Decimal(str(extend_ticker_dict['best_bid_px'])) ) / 2 aster_buy_ratio: Decimal = Decimal(str((extend_mid_px / aster_mid_px) - 1)) extend_buy_ratio: Decimal = Decimal(str(aster_buy_ratio*-1)) aster_buy_ratio_min_taker_hurdle = ( aster_buy_ratio + aster_buy_fund_rate_return ) - taker_fee - alpha_hurdle_adj extend_buy_ratio_min_taker_hurdle = ( extend_buy_ratio + extend_buy_fund_rate_return ) - taker_fee - alpha_hurdle_adj aster_buy_expected_alpha: Decimal = ( aster_buy_ratio_min_taker_hurdle - Decimal(str(Aster.buy_ratio)) ).quantize(Decimal('0.000001'), rounding='ROUND_DOWN') # Decimal Price % Diff (x Qty = Alpha $) extend_buy_expected_alpha: Decimal = ( extend_buy_ratio_min_taker_hurdle - Decimal(str(Extend.buy_ratio)) ).quantize(Decimal('0.000001'), rounding='ROUND_DOWN') # Decimal Price % Diff (x Qty = Alpha $) # logging.info(f'aster_buy_ratio_min_taker_hurdle: ( {aster_buy_ratio} - {aster_buy_fund_rate_return} ) - {taker_fee} - {alpha_hurdle_adj} = {aster_buy_ratio_min_taker_hurdle}') # logging.info(f'extend_buy_ratio_min_taker_hurdle: ( {extend_buy_ratio} - {extend_buy_fund_rate_return} ) - {taker_fee} - {alpha_hurdle_adj} = {extend_buy_ratio_min_taker_hurdle}') # logging.info(f'aster_buy_expected_alpha: {aster_buy_ratio_min_taker_hurdle} - {Decimal(str(Aster.buy_ratio))} = {aster_buy_expected_alpha}') # logging.info(f'extend_buy_expected_alpha: {extend_buy_ratio_min_taker_hurdle} - {Decimal(str(Extend.buy_ratio))} = {extend_buy_expected_alpha}') # aster_buy_ratio_min_taker_hurdle : ( 0.000886878630659394261895260 + 0.000037000000000000005 ) - 0.00025 - 0.0 = 0.000673878630659394266895260 # extend_buy_ratio_min_taker_hurdle : ( -0.000886878630659394261895260 + -0.000037000000000000005 ) - 0.00025 - 0.0 = -0.001173878630659394266895260 # aster_buy_expected_alpha : 0.000673878630659394266895260 - 0.0014628375 = -0.0007 # extend_buy_expected_alpha : -0.001173878630659394266895260 - -0.0014628375 = 0.0002 if aster_buy_expected_alpha > 0: signal: bool = True exchange: str = 'ASTER' side: str = 'BUY' symbol: str = Extend.symbol # USING EXT SYMBOL AS DEFAULT expected_alpha: Decimal = aster_buy_expected_alpha model_ratio: Decimal = Decimal(str(Aster.buy_ratio)) current_ratio: Decimal = aster_buy_ratio_min_taker_hurdle elif extend_buy_expected_alpha > 0: signal: bool = True exchange: str = 'EXTEND' side: str = 'BUY' symbol: str = Extend.symbol expected_alpha: Decimal = extend_buy_expected_alpha model_ratio: Decimal = Decimal(str(Extend.buy_ratio)) current_ratio: Decimal = extend_buy_ratio_min_taker_hurdle else: if max([aster_buy_expected_alpha,extend_buy_expected_alpha]) == aster_buy_expected_alpha: signal: bool = False exchange: str = 'ASTER' side: str = 'BUY' symbol: str = Extend.symbol # USING EXT SYMBOL AS DEFAULT expected_alpha: Decimal = aster_buy_expected_alpha model_ratio: Decimal = Decimal(str(Aster.buy_ratio)) current_ratio: Decimal = aster_buy_ratio_min_taker_hurdle else: signal: bool = False exchange: str = 'EXTEND' side: str = 'BUY' symbol: str = Extend.symbol expected_alpha: Decimal = extend_buy_expected_alpha model_ratio: Decimal = Decimal(str(Extend.buy_ratio)) current_ratio: Decimal = extend_buy_ratio_min_taker_hurdle return Signal( signal = signal, exchange = exchange, side = side, symbol = symbol, expected_alpha = expected_alpha, model_ratio = model_ratio, current_ratio = current_ratio, ) ### Define Ordering Logic ### ''' Notes - handle increasing vs flattening - if increasing, set not reduce only - if flattening, set as reduce only and make sure allowed to trade below min notional, and qty calc should be exact - handle opportunistic vs immediate - handle cancel-replace manually for aster and sometimes manually for extend (e.g. cant change certain things on an existing order) - gracefully handle err responses (well known err codes e.g.) and response errors (e.g. json fails to parse) ''' async def cancel_aster_order(open_order_id: str): global Aster_Open_Orders start = time.time() cancel_order: dict = { "url": "/fapi/v3/order", "method": "DELETE", "params": { 'symbol': Aster.symbol, 'orderId': open_order_id, } } cr: dict = await aster_auth.post_authenticated_url(cancel_order) # ty:ignore[invalid-assignment] if cr.get('status', None) == 'CANCELED': Aster_Open_Orders.pop(0) else: logging.warning(f'ASTER ORDER FAILED TO CANCEL DURING CR ({open_order_id}): RESP {cr}') logging.info(f'TIMING - cancel_aster_order: {(time.time() - start)*1000:.2f}') async def post_aster_order( symbol: str, side: str, qty: Decimal, price: Decimal, reduceOnly: bool, postOnly: bool ): global Aster_Open_Orders global Aster if postOnly: timeInForce = 'GTX' else: timeInForce = 'GTC' post_order = { "url": "/fapi/v3/order", "method": "POST", "params": { 'symbol': symbol, 'side': side, 'type': 'LIMIT', 'timeInForce': timeInForce, 'quantity': abs(qty), 'price': price, 'reduceOnly': reduceOnly } } order_resp: dict = await aster_auth.post_authenticated_url(post_order) # ty:ignore[invalid-assignment] if order_resp.get('orderId', None) is not None: order_resp['order_id'] = order_resp['orderId'] order_resp['original_price'] = price order_resp['price'] = price order_resp['order_status'] = order_resp['status'] Aster_Open_Orders.append(order_resp) Aster.just_rejected_count = 0 utils.send_tg_alert(f'FR_ALGO - ASTER Order ({order_resp['orderId']}). Start_$: {Aster.notional_position:.4f}; {side}: {float(qty)*float(price):.4f}; Price: {float(price):.4f}') logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}') # print_summary(use_logging=True) else: logging.critical(f'*** Aster Order Response Abnormal: {order_resp}; post_order: {post_order}') await kill_algo() async def cancel_extend_order(order_id: str): r = EXTEND_CLIENT.orders.cancel_order(order_id=order_id) r = dict(r) if r.get('status', None) == 'OK': logging.info(f'EXTEND ORDER CANCELLED: {order_id}') else: logging.warning(f'EXTEND ORDER FAILED TO CANCEL DURING CR ({order_id}): RESP {r}') async def post_extend_order( symbol: str, side: str, qty: Decimal, price: Decimal, reduceOnly: bool, postOnly: bool, cxl_prev_order_id: str | None = None, ): global Extend_Open_Orders global Extend side = OrderSide.BUY if side == 'BUY' else OrderSide.SELL taker_fee = Decimal("0.00025") try: order_resp: WrappedApiResponse[PlacedOrderModel] = await EXTEND_CLIENT.place_order( market_name=symbol, amount_of_synthetic=abs(qty), price=price, side=side, taker_fee=taker_fee, previous_order_id=cxl_prev_order_id, post_only=postOnly, reduce_only=reduceOnly ) order_resp_dict = dict(order_resp) except Exception as e: logging.critical(f'e: {e}; market_name: {symbol}; amount_of_synthetic: {qty}; price: {price}; side: {side}; taker_fee: {taker_fee}; previous_order_id: {cxl_prev_order_id}; post_only: {postOnly}; reduce_only: {reduceOnly}') if '1140' in str(e): # Error response from https://api.starknet.extended.exchange/api/v1/user/order: {"status":"ERROR","error":{"code":1140,"message":"New order cost exceeds available balance","debugInfo":"Order cost 19.458300 exceeds available for trade 1.467180\nOrder price = 43.825, mark price = 43.832222240625 estimated market price = 44.045"}} logging.info('EXTEND New order - cost exceeds available balance') await kill_algo() if '1121' in str(e): # Error response from https://api.starknet.extended.exchange/api/v1/user/order: code 400 - {"status":"ERROR","error":{"code":1121,"message":"Invalid quantity, wrong size increment"}} logging.info('EXTEND New order - Invalid quantity') await kill_algo() if '1123' in str(e): # Error response from https://api.starknet.extended.exchange/api/v1/user/order: code 400 - {"status":"ERROR","error":{"code":1123,"message":"Invalid quantity precision"}} logging.info('EXTEND New order - Invalid quantity precision') await kill_algo() elif '1142' in str(e): # 'Error response from https://api.starknet.extended.exchange/api/v1/user/order: code 400 - {"status":"ERROR","error":{"code":1142,"message":"Edit order not found"}};' logging.info('EXTEND EDIT ORDER, NOT FOUND, CANCELLING and continuing') await extend_cancel_all_orders() # if Extend_Open_Orders: # Extend_Open_Orders.pop(0) # time.sleep(0.1) return if order_resp_dict.get('status', None) == 'ERROR': if order_resp_dict['error']['code']==1142: logging.info('Cant find edit order for Extend, skipping cancel.') else: logging.critical(f'*** Extend Order Response Abnormal: {order_resp};') await kill_algo() if order_resp_dict.get('status', None) == 'OK': if Extend_Open_Orders: Extend_Open_Orders.pop(0) order_dict = dict(order_resp_dict['data']) order_dict['status'] = 'NEW' order_dict['price'] = str(price) order_dict['qty'] = str(qty) order_dict['filled_qty'] = str(0) order_dict['side'] = str(side) Extend_Open_Orders.append(order_dict) Extend.just_rejected_count = 0 utils.send_tg_alert(f'FR_ALGO - EXTEND Order ({order_dict.get('id', None)}). Start_$: {Extend.notional_position:.2f}; {str(side)}: {float(qty)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'EXTEND ORDER PLACED SUCCESS: {order_dict}') # print_summary(use_logging=True) else: logging.critical(f'*** Extend Order Response Abnormal: {order_resp};') await kill_algo() ### OPEN ORDERS ### async def handle_order_updates(exch: str, local_open_orders: list[dict], ws_open_orders: list[dict], ws_pos_updates: list[dict]) -> list[dict]: # exch = 'ASTER' | 'EXTEND' global Aster global Extend global Last_Aster_Fill_Time_Ts if ws_open_orders: for idx, o in enumerate(local_open_orders): o = dict(o) if o.get('order_id') is not None: ws_order_id_field = 'order_id' elif o.get('orderId') is not None: ws_order_id_field = 'orderId' else: ws_order_id_field = 'id' order_id = o[ws_order_id_field] order_orig_status: str = o.get('status') if o.get('status') is not None else o['order_status'] # ty:ignore[invalid-assignment] order_update: list[dict] = [dict(ou) for ou in ws_open_orders if dict(ou).get('order_id', None) == order_id] if len(order_update) > 0: order_update: dict = order_update[0] order_update_status: str = order_update.get('status') if order_update.get('status') is not None else order_update['order_status'] # ty:ignore[invalid-assignment] order_status_changed: bool = order_orig_status.upper() != order_update_status.upper() local_open_orders[idx]['order_id'] = order_id local_open_orders[idx]['status'] = order_update_status local_open_orders[idx]['price'] = order_update.get('price', 0) if order_update.get('price') is not None else order_update['original_price'] if order_status_changed: logging.info(f'{exch} ORDER ({order_id}): {order_orig_status} -> {order_update_status}') local_open_orders[idx] = order_update if order_update_status in ['CANCELLED','CANCELED']: logging.info(f'{exch} ORDER CANCELLED: {order_id}') local_open_orders.pop(idx) # utils.send_tg_alert(f'FR_ALGO - {exch} REJECTED ({order_id})') elif order_update_status in ['EXPIRED','REJECTED']: logging.info(f'{exch} ORDER REJECTED or EXPIRED: {order_id}') local_open_orders.pop(idx) if exch=='ASTER': Aster.just_rejected_count = Aster.just_rejected_count + 1 Config.Config.Price_Worsener_Aster=1 else: Extend.just_rejected_count = Extend.just_rejected_count + 1 # Config.Config.Price_Worsener_Extend=1 if Aster.just_rejected_count > 1 or Extend.just_rejected_count > 1: time.sleep(1) Aster.just_rejected_count = 0 Extend.just_rejected_count = 0 utils.send_tg_alert(f'FR_ALGO - {exch} REJECTED ({order_id})') elif order_update_status in ['PARTIALLY_FILLED']: logging.info(f'{exch} ORDER PARTIALLY FILLED: {order_id}') # await get_aster_collateral() if exch=='ASTER': await get_aster_notional_position(resp=ws_pos_updates) Last_Aster_Fill_Time_Ts = datetime.now().timestamp()*1000 else: await get_extend_notional(resp=ws_pos_updates) utils.send_tg_alert(f'FR_ALGO - {exch} PARTIALLY FILLED ({order_id})') elif order_update_status in ['FILLED']: logging.info(f'{exch} ORDER FILLED: {order_id}') local_open_orders.pop(idx) # await get_aster_collateral() if exch=='ASTER': # await aster_cancel_all_orders() await get_aster_notional_position(resp=ws_pos_updates) Last_Aster_Fill_Time_Ts = datetime.now().timestamp()*1000 else: await extend_cancel_all_orders() await get_extend_notional() utils.send_tg_alert(f'FR_ALGO - {exch} FILLED ({order_id})') else: logging.critical(f'{exch} ORDER STATUS CHG TO UNEXPECTED VALUE, KILLING... ({order_id}): {order_orig_status} -> {order_update_status}') await kill_algo() return local_open_orders async def get_aster_open_orders(): global Aster_Open_Orders fut_acct_openOrders = { "url": "/fapi/v3/openOrders", "method": "GET", "params": {} } Aster_Open_Orders = await aster_auth.post_authenticated_url(fut_acct_openOrders) # ty:ignore[invalid-assignment] async def get_extend_open_orders(): global Extend_Open_Orders Extend_Open_Orders = list(dict(await EXTEND_CLIENT.account.get_open_orders()).get('data', 0)) ### WALLLET ### async def get_aster_account_open_symbols() -> list[str]: fut_acct_positionRisk: dict = { "url": "/fapi/v3/positionRisk", "method": "GET", "params": { 'symbol':'' } } try: resp: list = await aster_auth.post_authenticated_url(req=fut_acct_positionRisk) # ty:ignore[invalid-assignment] except Exception as e: logging.critical(f'JSONDecodeError trying to get Aster open orders: {e}; resp: {resp}') await kill_algo() resp: list = [] ld = [ utils.symbol_to_extend_fmt(x['symbol']) for x in resp if abs(float(x.get('positionAmt', 0))) > 0] return ld async def get_aster_notional_position(resp: list | None = None): global Aster previous_notional_obj = Aster.notional_obj previous_notional_position = Aster.notional_position if resp: pos_dict = [x for x in resp if x.get('symbol', None) == Aster.symbol] if pos_dict: pos_dict = pos_dict[0] else: pos_dict = {} pos_dict['side'] = 'LONG' pos_dict['entry_price'] = 0.00 pos_dict['position_amount'] = 0.00 pos_dict['unrealized_pnl'] = 0.00 pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) # logging.info('get_aster_notional - No Positions') else: logging.info('Getting Aster Notionals from API') fut_acct_positionRisk: dict = { "url": "/fapi/v3/positionRisk", "method": "GET", "params": { 'symbol': Aster.symbol, } } try: resp: list = await aster_auth.post_authenticated_url(req=fut_acct_positionRisk) # ty:ignore[invalid-assignment] except Exception as e: logging.critical(f'JSONDecodeError trying to get Aster notional: {e}; resp: {resp}') await kill_algo() resp: list = [] pos_dict = [x for x in resp if x.get('symbol', None) == Aster.symbol] if pos_dict: pos_dict = pos_dict[0] else: pos_dict = {} pos_dict['side'] = 'LONG' pos_dict['entry_price'] = 0.00 pos_dict['position_amount'] = 0.00 pos_dict['unrealized_pnl'] = 0.00 logging.info('get_aster_notional - No Positions') pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) if previous_notional_obj: if previous_notional_obj['timestamp_arrival'] > pos_dict['timestamp_arrival']: # logging.info(f'ASTER NOTIONAL: prev timestamp ({pd.to_datetime(previous_notional_obj['timestamp_arrival'], unit='ms')}) > new timestamp ({pd.to_datetime(pos_dict['timestamp_arrival'], unit='ms')}); skipping') return Aster.notional_obj = pos_dict if len(pos_dict) < 1: logging.info(f'BAD NOTIONAL - ASTER CHANGE: Empty pos_dict: {pos_dict}; resp: {resp}') await kill_algo() try: Aster.unrealized_pnl = float(pos_dict['unrealized_pnl']) if pos_dict.get('unrealized_pnl') is not None else float(pos_dict['unRealizedProfit']) except Exception as e: logging.critical(f'Aster.unrealized_pnl pos_dict: {pos_dict}') raise ValueError(e) if pos_dict.get('notional') is not None: Aster.notional_position = float(pos_dict['notional']) #- Aster.unrealized_pnl else: Aster.notional_position = float(pos_dict['position_amount'])*float(pos_dict['entry_price']) if pos_dict.get('leverage') is not None: Aster.mult = int(pos_dict['leverage']) if abs(Aster.notional_position) > Config.Config.Max_Target_Notional*Config.Config.Max_Order_Over_Notional_Ratio: logging.info(f'BAD NOTIONAL - ASTER CHANGE: {previous_notional_position} -> {Aster.notional_position}; UR PNL: {Aster.unrealized_pnl}; MULT: {Aster.mult}; pos_dict: {pos_dict}; resp: {resp}; max_tgt_notional: {Config.Config.Max_Target_Notional}') await kill_algo() if Aster.notional_position != previous_notional_position: logging.info(f'ASTER NOTIONAL CHANGE: {previous_notional_position:.2f} -> {Aster.notional_position:.2f}; UR PNL: {Aster.unrealized_pnl:.2f}; MULT: {Aster.mult:.0f}; resp: {bool(resp)}') async def get_extend_account_open_symbols() -> list[str]: resp = dict(await EXTEND_CLIENT.account.get_positions()).get('data', []) ld = [x.market for x in list(resp) if abs(float(x.size)) > 0] return ld async def set_comb_open_symbols() -> None: global Open_Symbols open_aster_symbols = await get_aster_account_open_symbols() open_extend_symbols = await get_extend_account_open_symbols() Open_Symbols = list(set(open_aster_symbols + open_extend_symbols)) async def get_extend_notional(resp: list | None = None): global Extend previous_notional_obj = Extend.notional_obj previous_notional_position = Extend.notional_position if not resp: resp = dict(await EXTEND_CLIENT.account.get_positions()).get('data', []) pos_dict = [dict(d) for d in resp if dict(d).get('market') == Extend.symbol] if pos_dict: pos_dict = pos_dict[0] pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) else: pos_dict = {} pos_dict['side'] = 'LONG' pos_dict['value'] = 0.00 pos_dict['unrealised_pnl'] = 0.00 pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) logging.info('get_extend_notional - No Positions') else: pos_dict = [dict(d) for d in resp if dict(d).get('market') == Extend.symbol] if pos_dict: pos_dict = pos_dict[0] else: pos_dict = {} pos_dict['side'] = 'LONG' pos_dict['value'] = 0.00 pos_dict['unrealised_pnl'] = 0.00 pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) # logging.info('get_extend_notional - No Positions') # pos_dict['timestamp_arrival'] = round(datetime.now().timestamp()*1000) if previous_notional_obj: if previous_notional_obj['timestamp_arrival'] > pos_dict['timestamp_arrival']: # logging.info(f'EXTEND NOTIONAL: prev timestamp ({pd.to_datetime(previous_notional_obj['timestamp_arrival'], unit='ms')}) > new timestamp ({pd.to_datetime(pos_dict['timestamp_arrival'], unit='ms')}); skipping') return else: previous_notional_obj = {} Extend.notional_obj = pos_dict Extend.unrealized_pnl = pos_dict.get('unrealised_pnl', 0) position_side = pos_dict['side'] # LONG or SHORT notional_pos_abs = abs(float(pos_dict['value'])) if position_side == 'LONG': notional_pos_sided = notional_pos_abs elif position_side == 'SHORT': notional_pos_sided = notional_pos_abs * -1 else: logging.info(f'EXTEND BAD SIDE ON POSITION UPDATE: {pos_dict}') Extend.notional_position = notional_pos_sided - float(Extend.unrealized_pnl) Extend.mult = pos_dict.get('leverage', Extend.mult) if abs(Extend.notional_position) > Config.Config.Max_Target_Notional*Config.Config.Max_Order_Over_Notional_Ratio: logging.info(f'BAD NOTIONAL - EXTEND CHANGE: {previous_notional_position} -> {Extend.notional_position}; UR PNL: {Extend.unrealized_pnl}; MULT: {Extend.mult}; pos_dict: {pos_dict}; resp: {resp}') await kill_algo() if Extend.notional_position != previous_notional_position: logging.info(f'EXTEND NOTIONAL CHANGE: {previous_notional_position} [{previous_notional_obj.get('timestamp_arrival')}] -> {Extend.notional_position:.2f} [{Extend.notional_obj['timestamp_arrival']}]; UR PNL: {Extend.unrealized_pnl:.2f}; MULT: {Extend.mult}; resp: {bool(resp)}') ### EXCHANGE INFO ### async def get_aster_exch_info(symbol_override: str | None = None): global Aster if symbol_override: Aster.symbol = utils.symbol_to_aster_fmt(symbol_override) fut_acct_exchangeInfo: dict = { "url": "/fapi/v3/exchangeInfo", "method": "GET", "params": {} } r: dict = await aster_auth.post_authenticated_url(fut_acct_exchangeInfo) # ty:ignore[invalid-assignment] s: list = r['symbols'] d: dict = [d for d in s if d.get('symbol', None) == Aster.symbol][0] f: dict = [f for f in d['filters'] if f.get('filterType', None) == 'LOT_SIZE'][0] q: dict = [f for f in d['filters'] if f.get('filterType', None) == 'PRICE_FILTER'][0] n: dict = [f for f in d['filters'] if f.get('filterType', None) == 'MIN_NOTIONAL'][0] min_qty = float(f['minQty']) min_qty = int(min_qty) if min_qty == int(min_qty) else min_qty min_price = float(q['minPrice']) min_price = int(min_price) if min_price == int(min_price) else min_price Aster.min_order_size = min_qty Aster.min_price = min_price Aster.min_notional = float(n['notional']) async def get_extend_exch_info(symbol_override: str | None = None): global Extend if symbol_override: Extend.symbol = utils.symbol_to_extend_fmt(symbol_override) r = await EXTEND_CLIENT.markets_info.get_markets_dict() Extend.min_order_size = float(r[Extend.symbol].trading_config.min_order_size) Extend.min_price = float(r[Extend.symbol].trading_config.min_price_change) ### CANCEL ORDERS ### async def aster_cancel_all_orders(): cancel_all_open_orders = { "url": "/fapi/v3/allOpenOrders", "method": "DELETE", "params": { 'symbol': Aster.symbol, } } r = await aster_auth.post_authenticated_url(cancel_all_open_orders) logging.info(f'ASTER CANCEL ALL OPEN ORDERS RESP: {r}') async def extend_cancel_all_orders(): r = await EXTEND_CLIENT.orders.mass_cancel(markets=[Extend.symbol]) logging.info(f'EXTEND CANCEL ALL OPEN ORDERS RESP: {r}') ### KILL ALGO ### async def kill_algo(): await aster_cancel_all_orders() await extend_cancel_all_orders() logging.info('ALGO KILL FLAG ACTIVATED; CANCELLING OPEN ORDERS AND SHUTTING DOWN') await output_algo_status('STOPPED') raise ValueError('KILL FLAG ACTIVATED') ### ALGO LOOP ### async def run_algo(): global Config global Algo_Status global Aster global Extend global Open_Symbols global Last_Aster_Fill_Time_Ts global Aster_Open_Orders global Extend_Open_Orders global Flags global Funding_Rates_Min_Remaining_Factor_Pcts try: while True: loop_start = time.time() # print('__________Start___________') ### Load Algo Config ### Config = json.loads(VAL_KEY.get('fr_orchestrator_output')) # ty:ignore[invalid-argument-type] Config = structs.Algo_Config(**Config) Config.Config.Max_Target_Notional = float(min([Aster.mult, Extend.mult]) * Config.Config.Target_Open_Cash_Position) min_time_to_funding = Config.Config.Min_Time_To_Funding_Minutes * 60 * 1000 ### Load Data from Feedhandlers ### best_symbol_by_exchange: dict = json.loads(s=VAL_KEY.get(name='fr_engine_best_fund_rate_output')) # ty:ignore[invalid-argument-type] best_symbol_by_exchange_aster = structs.Perpetual_Exchange(**best_symbol_by_exchange['ASTER']) best_symbol_by_exchange_extend = structs.Perpetual_Exchange(**best_symbol_by_exchange['EXTEND']) if Config.Overrides.Allow_Symbol_Change: await symbol_switch(best_symbol_by_exchange_aster=best_symbol_by_exchange_aster, best_symbol_by_exchange_extend=best_symbol_by_exchange_extend) ### Fund Rates aster_fund_rate_dict: Any = VAL_KEY.get('fund_rate_aster') aster_fund_rate_dict: dict = json.loads(s=aster_fund_rate_dict) if aster_fund_rate_dict is not None else {} if aster_fund_rate_dict.get('symbol', None) != Aster.symbol: aster_fund_rate: float = Aster.initial_funding_rate # logging.info(f'ASTER Symbol mismatch: {ASTER_FUND_RATE_DICT}; expected symbol: {ASTER.symbol}') # raise ValueError(f'ASTER Symbol mismatch: {ASTER_FUND_RATE_DICT}; expected symbol: {ASTER.symbol}') else: aster_fund_rate: float = float(aster_fund_rate_dict.get('funding_rate', 0)) extend_fund_rate_dict: Any = VAL_KEY.get('fund_rate_extended') extend_fund_rate_dict: dict = json.loads(s=extend_fund_rate_dict) if extend_fund_rate_dict is not None else {} if extend_fund_rate_dict.get('symbol', None) != Extend.symbol: extend_fund_rate: float = Extend.initial_funding_rate # logging.info(f'ASTER Symbol mismatch: {EXTENDED_FUND_RATE_DICT}; expected symbol: {EXTEND.symbol}') # raise ValueError(f'ASTER Symbol mismatch: {EXTENDED_FUND_RATE_DICT}; expected symbol: {EXTEND.symbol}') else: extend_fund_rate: float = float(extend_fund_rate_dict.get('funding_rate', 0)) if Config.Overrides.Flip_Side_For_Testing: aster_fund_rate = aster_fund_rate * -1 extend_fund_rate = extend_fund_rate * -1 aster_fund_rate_time = float(aster_fund_rate_dict.get('next_funding_time_ts_ms', 0)) aster_fund_rate_time = aster_fund_rate_time+(60*60*1000) if aster_fund_rate_time < (datetime.now().timestamp()*1000) else aster_fund_rate_time extend_fund_rate_time = max([float(extend_fund_rate_dict.get('next_funding_time_ts_ms', 0)), 0]) extend_fund_rate_time = extend_fund_rate_time+(60*60*1000) if extend_fund_rate_time < (datetime.now().timestamp()*1000) else extend_fund_rate_time now_ms = round(datetime.now().timestamp()*1000) time_to_funding_ms = min([aster_fund_rate_time, extend_fund_rate_time]) - now_ms time_to_funding_minutes = int(time_to_funding_ms/1000/60) min_between_fundings = round((abs(aster_fund_rate_time - extend_fund_rate_time) / 1000 / 60)) next_funding_at_same_time = min_between_fundings < 5 def calc_next_net_fund_rate(next_funding_at_same_time: bool, fund_rate_ast: float, fund_rate_ext: float) -> tuple[float, str, str]: if next_funding_at_same_time: net_fr = max([fund_rate_ast, fund_rate_ext]) - min([fund_rate_ast, fund_rate_ext]) fr_best_exch = 'ASTER' if max([abs(fund_rate_ast), abs(fund_rate_ext)]) == abs(fund_rate_ast) else 'EXTEND' fr_best_side = 'BUY' if net_fr < 0 else 'SELL' return net_fr, fr_best_exch, fr_best_side else: fr_best_exch = 'EXTEND' fr_best_side = 'BUY' if fund_rate_ext < 0 else 'SELL' return fund_rate_ext, fr_best_exch, fr_best_side next_net_funding_rate, fr_best_exch, fr_best_side = calc_next_net_fund_rate(next_funding_at_same_time, fund_rate_ast=aster_fund_rate, fund_rate_ext=extend_fund_rate) Flags.NET_FUNDING_IS_ZERO = ( next_net_funding_rate >= ( (Config.Config.Min_Fund_Rate_Pct_To_Trade*-1) / 100) ) and ( next_net_funding_rate <= ( Config.Config.Min_Fund_Rate_Pct_To_Trade / 100 ) ) # Tickers aster_ticker_dict: Any = VAL_KEY.get('fut_ticker_aster') aster_ticker_dict: dict = json.loads(s=aster_ticker_dict) if aster_ticker_dict is not None else {} if ( aster_ticker_dict.get('symbol', None) != Aster.symbol ) and not(Config.Overrides.Flatten_Open_Positions): logging.warning(f'ASTER Symbol mismatch: {aster_ticker_dict}; expected symbol: {Aster.symbol}') VAL_KEY.set(name='fr_algo_working_symbol', value=json.dumps(obj={'ASTER': asdict(obj=Aster), 'EXTEND': asdict(obj=Extend)})) time.sleep(5) continue # raise ValueError(f'ASTER Symbol mismatch: {ASTER_TICKER_DICT}; expected symbol: {ASTER.symbol}') extend_ticker_dict: Any = VAL_KEY.get('fut_ticker_extended') extend_ticker_dict: dict = json.loads(s=extend_ticker_dict) if extend_ticker_dict is not None else {} if ( extend_ticker_dict.get('symbol', None) != Extend.symbol) and not(Config.Overrides.Flatten_Open_Positions): logging.warning(f'EXTEND Symbol mismatch: {extend_ticker_dict}; expected symbol: {Extend.symbol}') VAL_KEY.set(name='fr_algo_working_symbol', value=json.dumps(obj={'ASTER': asdict(obj=Aster), 'EXTEND': asdict(obj=Extend)})) time.sleep(5) continue # raise ValueError(f'EXTEND Symbol mismatch: {EXTENDED_TICKER_DICT}; expected symbol: {EXTEND.symbol}') ### Load Local Notional Updates from WS ### aster_ws_pos_updates: Any = VAL_KEY.get(name='fr_aster_user_positions') aster_ws_pos_updates: list = json.loads(s=aster_ws_pos_updates) if aster_ws_pos_updates is not None else [] extend_ws_pos_updates: Any = VAL_KEY.get('fr_extended_user_positions') extend_ws_pos_updates: list = json.loads(extend_ws_pos_updates) if extend_ws_pos_updates is not None else [] if len(aster_ws_pos_updates) > 0: await get_aster_notional_position(resp=aster_ws_pos_updates) if len(extend_ws_pos_updates) > 0: await get_extend_notional(resp=extend_ws_pos_updates) ### Load Local Order Updates from WS ### aster_ws_order_updates: Any = VAL_KEY.get('fr_aster_user_orders') aster_ws_order_updates: list = json.loads(aster_ws_order_updates) if aster_ws_order_updates is not None else [] extend_ws_order_updates: Any = VAL_KEY.get('fr_extended_user_orders') extend_ws_order_updates: list = json.loads(extend_ws_order_updates) if extend_ws_order_updates is not None else [] ### Update Local Open Orders w Changes from WS ### Aster_Open_Orders = await handle_order_updates(exch='ASTER', local_open_orders=Aster_Open_Orders, ws_open_orders=aster_ws_order_updates, ws_pos_updates=aster_ws_pos_updates) Extend_Open_Orders = await handle_order_updates(exch='EXTEND', local_open_orders=Extend_Open_Orders, ws_open_orders=extend_ws_order_updates, ws_pos_updates=extend_ws_pos_updates) ### CHECK NO MORE THAN 1 OPEN ORDER ON EITHER EXCHANGE ### if len(Aster_Open_Orders) > 1 or len(Extend_Open_Orders) > 1: logging.info(f'MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS ({len(Aster_Open_Orders)}): {Aster_Open_Orders}; EXTEND_OPEN_ORDERS ({len(Extend_Open_Orders)}): {Extend_Open_Orders}') await kill_algo() raise ValueError('NOT HERE: MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS') ### Decisions ### fr_factor: Decimal = get_fr_factor_by_minute(time_to_funding_minutes, Funding_Rates_Min_Remaining_Factor_Pcts) funding_rate_factored = Decimal(str(next_net_funding_rate)) * fr_factor funding_rate_switch_net, _, _ = calc_next_net_fund_rate(best_symbol_by_exchange_aster.fund_rate_at_same_time, fund_rate_ast=float(best_symbol_by_exchange_aster.initial_funding_rate), fund_rate_ext=float(best_symbol_by_exchange_extend.initial_funding_rate)) funding_rate_switch_net_factored = Decimal(str(funding_rate_switch_net)) * fr_factor # funding_rate_switch_net_factored = Decimal(str(funding_rate_switch_net)) * Decimal('15') signal: Signal = signal_alpha_over_taker( Aster=Aster, Extend=Extend, aster_ticker_dict=aster_ticker_dict, extend_ticker_dict=extend_ticker_dict, funding_rate_exch=fr_best_exch, funding_rate_side=fr_best_side, funding_rate=funding_rate_factored, funding_rate_switch=funding_rate_switch_net_factored, alpha_hurdle_adj=Decimal(str(Config.Config.Min_Fund_Rate_Pct_To_Trade)), ) Algo_Status.expected_alpha = float(signal.expected_alpha) Algo_Status.model_ratio = float(signal.model_ratio) Algo_Status.current_ratio = float(signal.current_ratio) if signal.signal: ### True signal, standard target alpha_target_notional = Decimal(str(Config.Config.Max_Target_Notional)) else: ### False signal, set target equal to current position if signal.exchange == 'ASTER': alpha_target_notional = Decimal(str(Aster.notional_position)) else: alpha_target_notional = Decimal(str(Aster.notional_position*-1)) ### Apply Overrides ### if signal.exchange == 'ASTER': aster_tgt = alpha_target_notional else: aster_tgt = Decimal(str(alpha_target_notional*-1)) if Config.Overrides.Flatten_Open_Positions: aster_tgt = Decimal('0.00') elif Config.Overrides.Flatten_Open_Positions_Opportunistic: if signal.signal: if signal.exchange == 'EXTEND' and Decimal(str(Aster.notional_position)) > 0: aster_tgt = Decimal('0.00') if signal.exchange == 'EXTEND' and Decimal(str(Aster.notional_position)) < 0: pass if signal.exchange == 'ASTER' and Decimal(str(Aster.notional_position)) > 0: pass if signal.exchange == 'ASTER' and Decimal(str(Aster.notional_position)) < 0: aster_tgt = Decimal('0.00') aster_target = Target( exchange = 'ASTER', symbol = Aster.symbol, notional_tgt = aster_tgt, ) extend_target = Target( exchange = 'EXTEND', symbol = Extend.symbol, notional_tgt = Decimal(str(Aster.notional_position*-1)), ) ### Logging ### def print_summary(use_logging: bool = False): OUT: Any = logging.info if use_logging else print # ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ] OUT(f''' FLIP SIDES FOR TESTING?: {Config.Overrides.Flip_Side_For_Testing}; ASTER ORDER ENABLED? {Config.Overrides.Allow_Ordering_Aster}; EXTEND ORDER ENABLED? {Config.Overrides.Allow_Ordering_Extend} MKT : Aster: {Aster.symbol:<10} (best: {best_symbol_by_exchange_aster.symbol}) | Extend: {Extend.symbol:<10} (best: {best_symbol_by_exchange_extend.symbol}) FR SWITCH : {funding_rate_switch_net:.6%} [{funding_rate_switch_net*10_000:.2f}bps] [{funding_rate_switch_net*1_000_000:.0f}pips] {pd.to_datetime(aster_fund_rate_time, unit='ms')} ({(pd.to_datetime(aster_fund_rate_time, unit='ms')-datetime.now()):}) | {pd.to_datetime(extend_fund_rate_time, unit='ms')} ({(pd.to_datetime(extend_fund_rate_time, unit='ms')-datetime.now()):}) ASTER: {aster_fund_rate:.6%} [{aster_fund_rate*10_000:.2f}bps] [{aster_fund_rate*1_000_000:.0f}pips] | EXTEND: {extend_fund_rate:.6%} [{extend_fund_rate*10_000:.2f}bps] [{extend_fund_rate*1_000_000:.0f}pips] ASTER: {'LONG PAYS SHORT' if aster_fund_rate > 0 else 'SHORT PAYS LONG'} | EXTEND: {'LONG PAYS SHORT' if extend_fund_rate > 0 else 'SHORT PAYS LONG'} ASTER: [ Notional Position $ : {Aster.notional_position:.4f} ] | EXTEND: [ Notional Position $ : {Extend.notional_position:.4f} ] SAME TIME? : {next_funding_at_same_time} [ Minutes Between Fundings: {min_between_fundings} ] NET FUNDING : {next_net_funding_rate:.6%} [{next_net_funding_rate*10_000:.2f}bps] [{next_net_funding_rate*1_000_000:.0f}pips]; Is Zero?: {Flags.NET_FUNDING_IS_ZERO} [Min: {Config.Config.Min_Fund_Rate_Pct_To_Trade}] FR BEST : {fr_best_side:<5} at {fr_best_exch}; FR Factor %: {fr_factor:.4%} * {next_net_funding_rate:.4%} = {float(fr_factor)*float(next_net_funding_rate):.6%} ALPHA SIDE : {signal.side:<5} at {signal.exchange} ALPHA ALPHA : {signal.expected_alpha: .6f} [{signal.expected_alpha*100:.2f}bps] [{signal.expected_alpha*10_000:.2f}pips]; Current {signal.current_ratio:.6f} [{signal.current_ratio*10_000:.2f}scl] {">" if signal.side=='BUY' else "<"} Model {signal.model_ratio:.6f} [{signal.model_ratio*10_000:.2f}scl] ALPHA St.Dev: Buy Ratio Std: {Aster.buy_ratio_std:.4%} *** ALPHA SIGNAL: {signal.signal} *** ASTER: [ Notional Position $ : {Aster.notional_position:05.4f} ] | EXTEND: [ Notional Position $ : {Extend.notional_position:05.4f} ] ASTER: {Aster.notional_position:05.4f} -> {aster_target.notional_tgt:05.4f} [ Remain: {aster_target.notional_tail:05.4f} ] | EXTEND: {Extend.notional_position:05.4f} -> {extend_target.notional_tgt:05.4f} [ Remain: {extend_target.notional_tail:05.4f} ] ASTER: {aster_target.notional_tgt:.2f} - {Aster.notional_position:.2f} + {Aster.unrealized_pnl:.2f} = {aster_target.notional_tail:2f} | EXTEND: {extend_target.notional_tgt:.2f} - {Extend.notional_position:.2f} + {Extend.unrealized_pnl:.2f} = {extend_target.notional_tail:2f} TGT NOTIONAL: $ {abs(alpha_target_notional):.2f}; Flatten Open Positions Flag? {Config.Overrides.Flatten_Open_Positions}; Opportunistic? {Config.Overrides.Flatten_Open_Positions_Opportunistic} --- ASTER OPEN ORDERS --- {Aster_Open_Orders} --- EXTEND OPEN ORDERS --- {Extend_Open_Orders} ''') if Config.Logging.Log_Summary_Each_Loop: print_summary(use_logging=True) if Config.Logging.Print_Summary_Each_Loop: print_summary(use_logging=False) ### ASTER async def aster_order_logic(): skip = False side = aster_target.side price_worsener: Decimal = Decimal(str(Aster.min_price)) * Decimal(str(int(Config.Config.Price_Worsener_Aster))) if side == 'BUY': tob_px: Decimal = Decimal(str(aster_ticker_dict['best_bid_px'])) price: Decimal = tob_px - price_worsener else: tob_px: Decimal = Decimal(str(aster_ticker_dict['best_ask_px'])) price: Decimal = tob_px + price_worsener qty: Decimal = aster_target.base_tail(price=price) ### CHECKS ### if abs( (qty*price) + Decimal(str(Aster.notional_position)) ) > Config.Config.Max_Target_Notional*Config.Config.Max_Order_Over_Notional_Ratio: logging.info(f'TRYING TO ORDER OVER MAX NOTIONAL - ASTER: {Aster.notional_position} + {(qty*price) + Decimal(str(Aster.notional_position))} (qty: {aster_target.base_tail(price=price)}; px: {price})') skip = True await kill_algo() if not(aster_target.is_orderable(price=price)) and Aster_Open_Orders: logging.info('ASTER HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS; SKIPPING') skip = True await aster_cancel_all_orders() if aster_target.is_orderable(price=price) and aster_target.base_tail(price=price) == Decimal('0.00'): logging.info(f'ASTER TRYNG TO ORDER 0.00 BASE QTY, SKIPPING: base_qty: {aster_target.base_tail(price=price)}; {aster_target}') skip = True await kill_algo() if aster_target.is_orderable(price=price) and price == Decimal(str(0.00)).quantize(Decimal(str(aster_target.min_order_price)), rounding="ROUND_DOWN"): logging.info(f'ASTER TRYNG TO ORDER with A PRICE OF 0.00, SKIPPING: {aster_target}') skip = True await kill_algo() ### Order Path ### if aster_target.is_orderable(price=price) and not(skip): if Aster_Open_Orders: # Cancel Open Order? open_order_dict = dict(Aster_Open_Orders[0]) open_order_id = str(open_order_dict['order_id']) try: open_order_px = float(open_order_dict['price']) except Exception as e: logging.critical(f'Aster cant find price on order obj: {open_order_dict}; e: {e}') await kill_algo() if Decimal(str(float(open_order_px))) == price: skip = True if Config.Logging.Print_Summary_Each_Loop: print('ASTER OPEN ORDER NO PX CHG; SKIPPING') else: await cancel_aster_order(open_order_id) skip = False if not (skip): print_summary(use_logging=True) await post_aster_order( symbol=Aster.symbol, side=side, qty=qty, price=price, reduceOnly=aster_target.is_reduce_only(price=price), postOnly=True, ) ### EXTEND async def extend_order_logic(): skip = False side = extend_target.side time_since_last_aster_fill_ms = ( datetime.now().timestamp()*1000 ) - Last_Aster_Fill_Time_Ts if time_since_last_aster_fill_ms > ( 1000 * Config.Config.Switch_To_Taker_Seconds ): # Change to allow taker orders if its been more than x seconds post_only = False price_worsener: Decimal = Decimal(str(Extend.min_price)) * Decimal(str(int(Config.Config.Price_Worsener_Extend))) else: post_only = True price_worsener = Decimal('0') if side == 'BUY': tob_px: Decimal = Decimal(str(extend_ticker_dict['best_bid_px'])) price: Decimal = tob_px - price_worsener else: tob_px: Decimal = Decimal(str(extend_ticker_dict['best_ask_px'])) price: Decimal = tob_px + price_worsener qty: Decimal = extend_target.base_tail(price=price) ### CHECKS ### if abs( (qty*price) + Decimal(str(Extend.notional_position)) ) > Config.Config.Max_Target_Notional*Config.Config.Max_Order_Over_Notional_Ratio: logging.info(f'TRYING TO ORDER OVER MAX NOTIONAL - EXTEND: {Extend.notional_position} + {(qty*price) + Decimal(str(Extend.notional_position))} (qty: {extend_target.base_tail(price=price)}; px: {price})') skip = True await kill_algo() if not(extend_target.is_orderable(price=price)) and Extend_Open_Orders: logging.info('EXTEND HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS; SKIPPING') skip = True await extend_cancel_all_orders() if extend_target.is_orderable(price=price) and extend_target.base_tail(price=price) == Decimal('0.00'): logging.info(f'EXTEND TRYNG TO ORDER 0.00 BASE QTY, SKIPPING: base_qty: {extend_target.base_tail(price=price)}; {extend_target}') skip = True await kill_algo() if extend_target.is_orderable(price=price) and price == Decimal(str(0.00)).quantize(Decimal(str(extend_target.min_order_price)), rounding="ROUND_DOWN"): logging.info(f'EXTEND TRYNG TO ORDER with A PRICE OF 0.00, SKIPPING: {extend_target}') skip = True await kill_algo() ### Order Path ### if extend_target.is_orderable(price=price) and not(skip): open_order_id = None if Extend_Open_Orders: # Cancel Open Order? open_order_dict = dict(Extend_Open_Orders[0]) open_order_id = str(open_order_dict['order_id']) open_order_px = float(open_order_dict['price']) if Decimal(str(float(open_order_px))) == price: skip = True if Config.Logging.Print_Summary_Each_Loop: print('EXTEND OPEN ORDER NO PX CHG; SKIPPING') else: skip = False if not (skip): print_summary(use_logging=True) await post_extend_order( symbol=Extend.symbol, side=side, qty=qty, price=price, reduceOnly=extend_target.is_reduce_only(price=price), postOnly=post_only, cxl_prev_order_id=open_order_id ) ### Output Algo Status ### await output_algo_status('WORKING') ### CHECK TIME TO FUNDING AND WHETHER TO BE ACTIVE ### if ( time_to_funding_ms > min_time_to_funding ) and (not Aster_Open_Orders) and (not Extend_Open_Orders): logging.info(f'Outside action window (minutes) and no active order (sleeping for 5 sec): {pd.to_datetime(time_to_funding_ms, unit='ms').minute} > {pd.to_datetime(min_time_to_funding, unit='ms').minute}') time.sleep(5) else: await aster_order_logic() await extend_order_logic() if Aster_Open_Orders or Extend_Open_Orders: if Config.Logging.Print_Summary_Each_Loop: print(f'_____ Open Orders _____ (Algo Engine ms: {(time.time() - loop_start)*1000:.2f}); Continuing...') continue else: if Config.Logging.Print_Summary_Each_Loop: print(f'_____ End No Open Orders _____ (Algo Engine ms: {(time.time() - loop_start)*1000:.2f}); Sleeping for sec: {Config.Config.Loop_Sleep_Sec:.0f}') time.sleep(Config.Config.Loop_Sleep_Sec) except KeyboardInterrupt: logging.info('CANCELLING OPEN ORDERS') await output_algo_status('STOPPING') await kill_algo() except Exception as e: logging.error(traceback.format_exc()) logging.critical(f'*** ALGO ENGINE CRASHED: {e}') logging.info('CANCELLING OPEN ORDERS') await output_algo_status('STOPPING') utils.send_tg_alert(f'FR_ALGO_CRASHED: {str(e)}') await kill_algo() ### MAIN STARTUP ### async def main(): global EXTEND_CLIENT global VAL_KEY global CON global Config global Algo_Status global Aster global Extend global Open_Symbols global Funding_Rates_Min_Remaining_Factor_Pcts _, EXTEND_CLIENT = await extend_auth.create_auth_account_and_trading_client() VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') await set_comb_open_symbols() # Open_Symbols = ['HYPE-USD'] best_symbol_by_exchange: dict = json.loads(s=VAL_KEY.get(name='fr_engine_best_fund_rate_output')) # ty:ignore[invalid-argument-type] if Open_Symbols: logging.info(f'OPEN SYMBOLS: {Open_Symbols}') master_data = json.loads(s=VAL_KEY.get(name='fr_engine_best_fund_rate_master')) # ty:ignore[invalid-argument-type] open_symbol_to_work = Open_Symbols[0] current_pos_master_ast = [d for d in master_data if d.get('symbol_ext') == open_symbol_to_work][0] Aster, Extend = create_exchange_objs_from_dict(exchanges_dict=current_pos_master_ast) Open_Symbols.pop(0) else: Aster = structs.Perpetual_Exchange(**best_symbol_by_exchange['ASTER']) Extend = structs.Perpetual_Exchange(**best_symbol_by_exchange['EXTEND']) # await get_aster_exch_info(symbol_override=Open_Symbols[0]) # await get_extend_exch_info(symbol_override=Open_Symbols[0]) with open('algo_config.json', mode='r', encoding='utf-8') as file: Config = json.load(file) Config = structs.Algo_Config(**Config) Config.Config.Max_Target_Notional = float(min([Aster.mult, Extend.mult]) * Config.Config.Target_Open_Cash_Position) # logging.info(f'Initial Algo Config: {ALGO_CONFIG}') VAL_KEY.set(name='fr_orchestrator_output', value=json.dumps(obj=Config.model_dump())) VAL_KEY.set(name='fr_algo_working_symbol', value=json.dumps(obj={'ASTER': asdict(obj=Aster), 'EXTEND': asdict(obj=Extend)})) Funding_Rates_Min_Remaining_Factor_Pcts = calc_fr_minutes_remaining_factor( min_start_procedure = 30, min_to_end_procedure = 7, factor_exp_pct = 0.50 ) Algo_Status = structs.Algo_Status( last_update_ts_ms = int(round(datetime.now().timestamp()*1000, 2)), status = 'WORKING', expected_alpha = 0.00, model_ratio = 0.00, current_ratio = 0.00, ) await output_algo_status('WORKING') async with engine.connect() as CON: ### ASTER SETUP ### # await get_aster_collateral() await get_aster_notional_position() await get_aster_exch_info() await get_aster_open_orders() ### EXTEND SETUP ### # await get_extend_collateral() await get_extend_notional() await get_extend_exch_info() await get_extend_open_orders() await run_algo() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") asyncio.run(main())