import asyncio
import json
import logging
import math
import os
import time
import traceback
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from decimal import ROUND_DOWN, Decimal
from typing import Any, AsyncContextManager

import numpy as np
import pandas as pd
import requests
# import talib
import valkey
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from x10.models.order import OrderSide

import modules.utils as utils
import modules.aster_auth as aster_auth
import modules.extended_auth as extend_auth

### Database ###
# Populated by main() at startup; None until then.
EXTEND_CLIENT = None
CON: AsyncContextManager | None = None
VAL_KEY = None

### Logging ###
load_dotenv()
# Fall back to the current directory when LOGS_PATH is not set, instead of
# failing at import time with "NoneType + str".
LOG_FILEPATH: str = os.getenv("LOGS_PATH", ".") + '/Fund_Rate_Algo.log'

### CONSTANTS ###
# Master switches: no order is posted to an exchange unless its flag is True.
ASTER_ALLOW_ORDERING: bool = False
EXTEND_ALLOW_ORDERING: bool = False
LOOP_SLEEP_SEC = 1
# Price offset moving a limit order away from top of book (0 = quote at top of book).
PRICE_WORSENER_ASTER = 0.00
PRICE_WORSENER_EXTEND = 0.0
# The algo only acts when the next funding event is closer than this.
MIN_TIME_TO_FUNDING: int = 1000 * 60 * 7  # 7 minutes, in milliseconds.
ASTER_LH_ASSET: str = 'ETH' ASTER_RH_ASSET: str = 'USDT' ASTER_TICKER: str = ASTER_LH_ASSET + ASTER_RH_ASSET EXTEND_LH_ASSET: str = 'ETH' EXTEND_RH_ASSET: str = 'USD' EXTEND_TICKER: str = EXTEND_LH_ASSET + '-' + EXTEND_RH_ASSET TARGET_OPEN_CASH_POSITION: float = 10 # Each side (alpha and hedge) ### GLOBALS ### ASTER_MULT = 150 EXTEND_MULT = 50 MAX_TARGET_NOTIONAL = min([ASTER_MULT, EXTEND_MULT]) * TARGET_OPEN_CASH_POSITION ASTER_MIN_ORDER_QTY = 0.001 EXTEND_MIN_ORDER_QTY = 0.01 ASTER_AVAIL_COLLATERAL = 0 EXTEND_AVAIL_COLLATERAL = 0 ASTER_NOTIONAL_POSITION = 0 EXTEND_NOTIONAL_POSITION = 0 ASTER_OPEN_ORDERS = [] EXTEND_OPEN_ORDERS = [] # ASTER_OPEN_POSITIONS = [] # EXTEND_OPEN_POSITIONS = [] @dataclass(kw_only=True) class Valkey_Stream: channel: str data: Any = None none_fill: Any = None async def update(self): r = VAL_KEY.get(self.channel) self.data = json.loads(r) if r is not None else self.none_fill @dataclass(kw_only=True) class Position: market: str notional: float qty: float @dataclass(kw_only=True) class Open_Positions: Valkey: Valkey_Stream Positions: list[Position] = field(default_factory = list) async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Collateral ### @dataclass(kw_only=True) class Asset: symbol: str balance: float # min_order_qty: float @dataclass(kw_only=True) class Collateral: Valkey: Valkey_Stream # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Assets: list[Asset] = field(default_factory = list) async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Orders ### @dataclass(kw_only=True) class Order: symbol: str order_id: str client_order_id: str side: str order_type: str original_qty: float original_price: float order_status: str last_filled_qty: float last_filled_price: float commission: float trade_is_maker: bool @dataclass(kw_only=True) class Order_Updates: # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Valkey: Valkey_Stream Orders: list[Order] = field(default_factory = list) async def 
update(self) -> None: self.Valkey = await self.Valkey.update() ### Funding Rate ### @dataclass(kw_only=True) class Funding_Rate: # Last_Updated_Ts_Ms: int # Last_Pulled_Ts_Ms: int Valkey: Valkey_Stream timestamp_arrival: int timestamp_msg: int symbol: str funding_rate: float next_funding_time_ts_ms: int mark_price: float index_price: float estimated_settle_price: float async def update(self) -> None: self.Valkey = await self.Valkey.update() ### Markets Info ### @dataclass(kw_only=True) class Market: symbol: str min_order_qty: float @dataclass(kw_only=True) class Markets_Details: Markets: list[Market] = field(default_factory=list) ### Exchanges ### @dataclass(kw_only=True) class Perpetual_Exchange: Order_Updates: Order_Updates Position_Updates: Open_Positions Collateral_Updates: Collateral Funding_Rate: Funding_Rate Markets: Markets_Details mult: int lh_asset: str rh_asset: str symbol_asset_separator: str = '' symbol: str async def update(self): await self.Collateral_Updates.update() await self.Order_Updates.update() await self.Position_Updates.update() await self.Funding_Rate.update() def __post_init__(self) -> None: self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' @dataclass(kw_only=True) class Aster(Perpetual_Exchange): name: str = 'Aster' lh_asset: str = 'ETH' rh_asset: str = 'USDT' def __post_init__(self): super().__post_init__() self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) @dataclass(kw_only=True) class Extend(Perpetual_Exchange): name: str = 'Extended' lh_asset: str = 'ETH' rh_asset: str = 'USD' symbol_asset_separator: str = '-' 
def __post_init__(self): super().__post_init__() self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) # EXCHANGES: list = [ Aster(), Extend() ] ### FLAGS ### @dataclass(kw_only=True) class Flags: LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False NET_FUNDING_IS_ZERO: bool = False Flags = Flags() ### UTILS ### def round_decimal_down(value, decimal_places): # Construct precision string like '0.01' for 2 places fmt = f'0.{"0" * decimal_places}' if decimal_places > 0 else '0' precision = Decimal(fmt) return Decimal(str(value)).quantize(precision, rounding=ROUND_DOWN) ### OPEN ORDERS ### async def get_aster_open_orders(): global ASTER_OPEN_ORDERS fut_acct_openOrders = { "url": "/fapi/v3/openOrders", "method": "GET", "params": {} } ASTER_OPEN_ORDERS = await aster_auth.post_authenticated_url(fut_acct_openOrders) async def get_extend_open_orders(): global EXTEND_OPEN_ORDERS EXTEND_OPEN_ORDERS = list(dict(await EXTEND_CLIENT.account.get_open_orders()).get('data', 0)) ### WALLLET ### async def get_aster_collateral(): global ASTER_AVAIL_COLLATERAL fut_acct_balances = { "url": "/fapi/v3/balance", "method": "GET", "params": {} } r = await aster_auth.post_authenticated_url(fut_acct_balances) ASTER_AVAIL_COLLATERAL = float([d for d in r if d.get('asset')==ASTER_RH_ASSET][0].get('availableBalance')) async def get_aster_notional_position(resp: dict | None = None): global ASTER_NOTIONAL_POSITION global ASTER_MULT if not resp: fut_acct_positionRisk = { "url": "/fapi/v3/positionRisk", "method": "GET", "params": { 'symbol': ASTER_TICKER, } } resp = await aster_auth.post_authenticated_url(fut_acct_positionRisk) d 
= [x for x in resp if x.get('symbol', None) == ASTER_TICKER][0] if len(d) < 1: logging.info(f'BAD NOTIONAL - ASTER CHANGE: Empty d: {d}; resp: {resp}') kill_algo() aster_unrealized_pnl = float(d['unrealized_pnl']) if d.get('unrealized_pnl') is not None else float(d['unRealizedProfit']) if d.get('notional') is not None: notional = float(d['notional']) else: notional = float(d['position_amount'])*float(d['entry_price']) previous_notional_position = ASTER_NOTIONAL_POSITION ASTER_NOTIONAL_POSITION = notional - aster_unrealized_pnl if not resp: ASTER_MULT = float(d['leverage']) if abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: logging.info(f'BAD NOTIONAL - ASTER CHANGE: {ASTER_NOTIONAL_POSITION}; UR PNL: {aster_unrealized_pnl}; MULT: {ASTER_MULT}; d: {d}; resp: {resp}') kill_algo() if ASTER_NOTIONAL_POSITION != previous_notional_position: logging.info(f'ASTER NOTIONAL CHANGE: {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {aster_unrealized_pnl:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') async def get_extend_collateral(): global EXTEND_AVAIL_COLLATERAL get_bals = dict(dict(await EXTEND_CLIENT.account.get_balance()).get('data', {})) EXTEND_AVAIL_COLLATERAL = get_bals.get('available_for_trade', 0) if get_bals.get('collateral_name', None)==EXTEND_RH_ASSET else 0 async def get_extend_notional(resp: dict | None = None): global EXTEND_NOTIONAL_POSITION global EXTEND_MULT if not resp: resp = dict(await EXTEND_CLIENT.account.get_positions()).get('data', {}) pos_dict = [dict(d) for d in resp if dict(d).get('market') == EXTEND_TICKER] pos_dict = pos_dict[0] unrealized_pnl = pos_dict.get('unrealised_pnl', 0) previous_notional_position = EXTEND_NOTIONAL_POSITION EXTEND_NOTIONAL_POSITION = float(pos_dict.get('value', 0)) - float(unrealized_pnl) EXTEND_MULT = pos_dict.get('leverage', EXTEND_MULT) if EXTEND_NOTIONAL_POSITION != previous_notional_position: logging.info(f'EXTEND NOTIONAL CHANGE: {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {unrealized_pnl:.2f}; MULT: 
{EXTEND_MULT:.0f}; resp: {bool(resp)}') ### EXCHANGE INFO ### async def get_aster_exch_info(): global ASTER_MIN_ORDER_QTY fut_acct_exchangeInfo = { "url": "/fapi/v3/exchangeInfo", "method": "GET", "params": {} } r = await aster_auth.post_authenticated_url(fut_acct_exchangeInfo) s = r['symbols'] d = [d for d in s if d.get('symbol', None) == 'ETHUSDT'][0] f = [f for f in d['filters'] if f.get('filterType', None) == 'LOT_SIZE'][0] ASTER_MIN_ORDER_QTY = float(f['minQty']) async def get_extend_exch_info(): global EXTEND_MIN_ORDER_QTY r = await EXTEND_CLIENT.markets_info.get_markets_dict() EXTEND_MIN_ORDER_QTY = float(r['ETH-USD'].trading_config.min_order_size) ### CANCEL ORDERS ### async def aster_cancel_all_orders(): cancel_all_open_orders = { "url": "/fapi/v3/allOpenOrders", "method": "DELETE", "params": { 'symbol': 'ETHUSDT', } } r = await aster_auth.post_authenticated_url(cancel_all_open_orders) logging.info(f'ASTER CANCEL ALL OPEN ORDERS RESP: {r}') async def extend_cancel_all_orders(): r = await EXTEND_CLIENT.orders.mass_cancel(markets=[EXTEND_TICKER]) logging.info(f'EXTEND CANCEL ALL OPEN ORDERS RESP: {r}') ### KILL ALGO ### async def kill_algo(): await aster_cancel_all_orders() await extend_cancel_all_orders() logging.info('ALGO KILL FLAG ACTIVATED; CANCELLING OPEN ORDERS AND SHUTTING DOWN') raise ValueError('KILL FLAG ACTIVATED') ### ROUTES ### # async def aster_remainder_route(): # # Check open orders...cancel replace or new order? # # Check collateral to confirm you have enough money to trade # # if CR, what should be the new price? has it changed? maybe no action needed? how long has it been working? # # if not enough collateral then need to liquidate and kill algo - flip flag # # if good to order, then create and post order. 
#     # ADD to LOCAL OPEN ORDERS LIST
#     pass

# async def extend_remainder_route():
#     pass


### ALGO LOOP ###
def _read_feed(key: str, default: Any = None, *, required: bool = False) -> Any:
    """Return the decoded JSON stored under `key` in Valkey.

    A missing key yields `default`, unless `required` is set, in which case a
    ValueError is raised (the loop cannot price anything without that feed).
    """
    raw = VAL_KEY.get(key)
    if raw is None:
        if required:
            raise ValueError(f'MISSING FEED DATA IN VALKEY: {key}')
        return default
    return json.loads(raw)


def _order_status(order: dict):
    """Return an order's status; REST entries use 'status', WS updates may use 'order_status'."""
    status = order.get('status')
    return status if status is not None else order.get('order_status')


async def _reconcile_open_orders(exch, open_orders, updates, *, alt_id_key, closed_statuses, refresh):
    """Apply websocket order updates to a locally tracked open-order list.

    The list is rebuilt and swapped in at the end, so removing an order never
    shifts the entries that are still to be visited.

    Args:
        exch: exchange label used in log messages.
        open_orders: tracked open orders; mutated in place.
        updates: order-update records read from the websocket feed.
        alt_id_key: id key used by REST entries that have no 'order_id'.
        closed_statuses: upper-case statuses meaning the order is gone unfilled.
        refresh: coroutine function awaited after a (partial) fill to re-pull
            collateral and notional.
    """
    remaining = []
    for order in list(open_orders):
        current = dict(order)
        order_id = current.get('order_id') if current.get('order_id') is not None else current.get(alt_id_key)
        update = None
        if order_id is not None:
            # Compared as strings: the id may be numeric in one source and text in the other.
            update = next((dict(u) for u in updates if str(dict(u).get('order_id', None)) == str(order_id)), None)
        if update is None:
            remaining.append(order)
            continue

        old_status = _order_status(current)
        new_status = _order_status(update)
        unchanged = old_status is not None and new_status is not None and old_status.upper() == new_status.upper()
        if new_status is None or unchanged:
            remaining.append(order)
            continue

        logging.info(f'{exch} ORDER ({order_id}): {old_status} -> {new_status}')
        status = new_status.upper()
        if status in closed_statuses:
            logging.info(f'{exch} ORDER CANCELLED or EXPIRED: {order_id}')
        elif status == 'PARTIALLY_FILLED':
            logging.info(f'{exch} ORDER PARTIALLY FILLED: {order_id}')
            remaining.append(update)
            await refresh()
        elif status == 'FILLED':
            logging.info(f'{exch} ORDER FILLED: {order_id}')
            await refresh()
        else:
            # NOTE(review): the message says KILLING but nothing is killed
            # here - confirm whether kill_algo() should be awaited.
            logging.critical(f'{exch} ORDER STATUS CHG TO UNEXPECTED VALUE, KILLING... ({order_id}): {old_status} -> {new_status}')
            remaining.append(update)
    open_orders[:] = remaining


async def _refresh_aster_account() -> None:
    """Re-pull Aster collateral and notional position over REST."""
    await get_aster_collateral()
    await get_aster_notional_position()


async def _refresh_extend_account() -> None:
    """Re-pull Extended collateral and notional position over REST."""
    await get_extend_collateral()
    await get_extend_notional()


async def run_algo():
    """Main loop: read feeds, reconcile local state, and route orders.

    Each pass picks the "alpha" exchange by funding rate, targets
    +/-MAX_TARGET_NOTIONAL there and the opposite on the other exchange, and
    works a single limit order per exchange toward that target. Any exception
    is logged, alerted, and followed by `kill_algo()`, which itself raises.
    """
    try:
        while True:
            loop_start = time.time()
            print('__________Start___________')

            ### Load Data from Feedhandlers ###
            ASTER_FUND_RATE_DICT = _read_feed('fund_rate_aster', required=True)
            EXTENDED_FUND_RATE_DICT = _read_feed('fund_rate_extended', required=True)
            ASTER_FUND_RATE = float(ASTER_FUND_RATE_DICT.get('funding_rate', 0))
            EXTEND_FUND_RATE = float(EXTENDED_FUND_RATE_DICT.get('funding_rate', 0))
            ASTER_FUND_RATE_TIME = float(ASTER_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0))
            EXTEND_FUND_RATE_TIME = float(EXTENDED_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0))
            ASTER_TICKER_DICT = _read_feed('fut_ticker_aster', required=True)
            EXTENDED_TICKER_DICT = _read_feed('fut_ticker_extended', required=True)

            ### Manage Local Notionals Using Updates from WS ###
            ASTER_WS_POS_UPDATES = _read_feed('fr_aster_user_positions', [])
            EXTEND_WS_POS_UPDATES = _read_feed('fr_extended_user_positions', [])

            ### Manage Local Orders Using Updates from WS ###
            ASTER_WS_ORDER_UPDATES = _read_feed('fr_aster_user_orders', [])
            EXTEND_WS_ORDER_UPDATES = _read_feed('fr_extended_user_orders', [])

            # CHECK NO MORE THAN 1 OPEN ORDER ON EITHER EXCHANGE #
            if len(ASTER_OPEN_ORDERS) > 1 or len(EXTEND_OPEN_ORDERS) > 1:
                logging.info(f'MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS ({len(ASTER_OPEN_ORDERS)}): {ASTER_OPEN_ORDERS}; EXTEND_OPEN_ORDERS ({len(EXTEND_OPEN_ORDERS)}): {EXTEND_OPEN_ORDERS}')
                await kill_algo()
                # Safeguard only: kill_algo() always raises.
                raise ValueError('NOT HERE: MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS')

            ### CHECK TIME TO FUNDING AND WHETHER TO BE ACTIVE ###
            now_ms = round(datetime.now().timestamp() * 1000)
            time_to_funding_ms = min([ASTER_FUND_RATE_TIME, EXTEND_FUND_RATE_TIME]) - now_ms
            if (time_to_funding_ms > MIN_TIME_TO_FUNDING) and (not ASTER_OPEN_ORDERS) and (not EXTEND_OPEN_ORDERS):
                print(f'Outside action window (minutes) and no active order (sleeping for 5 sec): {time_to_funding_ms / 60_000:.1f} > {MIN_TIME_TO_FUNDING / 60_000:.1f}')
                # asyncio.sleep yields to the event loop; time.sleep would block it.
                await asyncio.sleep(5)
                continue

            if len(ASTER_WS_POS_UPDATES) > 0:
                await get_aster_notional_position(resp=ASTER_WS_POS_UPDATES)
            ###### *** returned 0 notional even though had a position, need to handle and safety check to not order above max notional.
            if len(EXTEND_WS_POS_UPDATES) > 0:
                await get_extend_notional(resp=EXTEND_WS_POS_UPDATES)

            await _reconcile_open_orders(
                'ASTER', ASTER_OPEN_ORDERS, ASTER_WS_ORDER_UPDATES,
                alt_id_key='orderId',
                closed_statuses=('CANCELED', 'EXPIRED'),
                refresh=_refresh_aster_account,
            )
            await _reconcile_open_orders(
                'EXTEND', EXTEND_OPEN_ORDERS, EXTEND_WS_ORDER_UPDATES,
                alt_id_key='id',
                closed_statuses=('CANCELLED', 'EXPIRED', 'REJECTED'),
                refresh=_refresh_extend_account,
            )

            ASTER_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if ASTER_FUND_RATE > 0 else 'SHORT PAYS LONG'
            EXTEND_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if EXTEND_FUND_RATE > 0 else 'SHORT PAYS LONG'

            min_between_fundings = round((abs(ASTER_FUND_RATE_TIME - EXTEND_FUND_RATE_TIME) / 1000 / 60))
            FUNDINGS_AT_SAME_TIME_NEXT_HR = min_between_fundings < 5
            # FUNDINGS_AT_SAME_TIME_NEXT_HR = ( (ASTER_FUND_RATE_TIME < 60*60*1000) and (EXTEND_FUND_RATE < 60*60*1000) )

            # Aster is the alpha leg only when both venues fund together and
            # its rate is the larger one; otherwise Extended is.
            if (abs(ASTER_FUND_RATE) > abs(EXTEND_FUND_RATE)) and FUNDINGS_AT_SAME_TIME_NEXT_HR:
                ALPHA_EXCH = 'ASTER'
                ALPHA_FUND_RATE = ASTER_FUND_RATE
            else:
                ALPHA_EXCH = 'EXTEND'
                ALPHA_FUND_RATE = EXTEND_FUND_RATE

            # Negative funding: shorts pay longs, so the alpha leg goes long.
            if ALPHA_FUND_RATE < 0:
                ALPHA_CARRY_SIDE = 'BUY'
                ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL
            else:
                ALPHA_CARRY_SIDE = 'SELL'
                ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL * -1

            if FUNDINGS_AT_SAME_TIME_NEXT_HR:
                NEXT_NET_FUNDING_RATE = ASTER_FUND_RATE + EXTEND_FUND_RATE
            else:
                NEXT_NET_FUNDING_RATE = EXTEND_FUND_RATE

            Flags.NET_FUNDING_IS_ZERO = NEXT_NET_FUNDING_RATE == 0.00
            if Flags.NET_FUNDING_IS_ZERO:
                logging.info('NET FUNDING = 0.00; Cancelling Open Orders; Wait Until Non-Zero.')
                ALPHA_TGT_NOTIONAL = 0.00

            if ALPHA_EXCH == 'EXTEND':
                ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL * -1
                EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL
                if ALPHA_CARRY_SIDE == 'BUY':
                    ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_ask_px'])
                    EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_bid_px'])
                else:
                    ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_bid_px'])
                    EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_ask_px'])
            else:
                ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL
                EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL * -1
                if ALPHA_CARRY_SIDE == 'BUY':
                    ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_bid_px'])
                    EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_ask_px'])
                else:
                    ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_ask_px'])
                    EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_bid_px'])

            # "Tail" = notional still to be traded to reach the target.
            ASTER_TGT_TAIL = ASTER_TGT_NOTIONAL - ASTER_NOTIONAL_POSITION
            EXTEND_TGT_TAIL = EXTEND_TGT_NOTIONAL - EXTEND_NOTIONAL_POSITION
            ASTER_TGT_TAIL_BASE_QTY = Decimal(str(float(ASTER_TGT_TAIL) / float(ASTER_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN)
            EXTEND_TGT_TAIL_BASE_QTY = Decimal(str(float(EXTEND_TGT_TAIL) / float(EXTEND_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN)
            ASTER_TGT_TAIL_ORDERABLE = abs(ASTER_TGT_TAIL_BASE_QTY) >= ASTER_MIN_ORDER_QTY
            EXTEND_TGT_TAIL_ORDERABLE = abs(EXTEND_TGT_TAIL_BASE_QTY) >= EXTEND_MIN_ORDER_QTY

            # Epoch-based timestamps are naive UTC, so the countdown must be
            # taken against naive UTC "now", not local time.
            utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
            aster_funding_at = pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')
            extend_funding_at = pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')
            print(f'''
            {aster_funding_at} ({aster_funding_at - utc_now}) | {extend_funding_at} ({extend_funding_at - utc_now})
            ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips]
            ASTER: {ASTER_PAYOUT_DIRECTION_STR} | EXTEND: {EXTEND_PAYOUT_DIRECTION_STR}
            ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ]
            ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ]
            SAME TIME? : {FUNDINGS_AT_SAME_TIME_NEXT_HR} [ Minutes Between Fundings: {min_between_fundings} ]
            NET FUNDING : {NEXT_NET_FUNDING_RATE:.6%} [{NEXT_NET_FUNDING_RATE*10_000:.2f}bps] [{NEXT_NET_FUNDING_RATE*1_000_000:.0f}pips]; Is Zero?: {Flags.NET_FUNDING_IS_ZERO}
            ALPHA SIDE : {ALPHA_EXCH} [{ALPHA_CARRY_SIDE}]
            TGT NOTIONAL: $ {MAX_TARGET_NOTIONAL if not Flags.NET_FUNDING_IS_ZERO else 0.00}
            ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ]
            ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:.4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:.4f}
            ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {ASTER_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {EXTEND_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ]
            --- ASTER OPEN ORDERS ---
            {ASTER_OPEN_ORDERS}
            --- EXTEND OPEN ORDERS ---
            {EXTEND_OPEN_ORDERS}
            ''')

            ### ROUTES ###
            # ASTER
            if ASTER_TGT_TAIL_ORDERABLE and ASTER_ALLOW_ORDERING:
                symbol = ASTER_TICKER
                side = 'BUY' if ASTER_TGT_TAIL_BASE_QTY > 0.00 else 'SELL'
                qty = str(abs(ASTER_TGT_TAIL_BASE_QTY))
                price = ASTER_TOB_PX - PRICE_WORSENER_ASTER if side == 'BUY' else ASTER_TOB_PX + PRICE_WORSENER_ASTER

                # NOTE(review): this check only logs; it does not block the
                # order. Confirm whether it should, given that flipping sides
                # legitimately trades more than the max notional.
                if abs(abs(float(ASTER_TGT_TAIL_BASE_QTY)) * float(price)) + abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL * 1.01:
                    logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - ASTER: {ASTER_NOTIONAL_POSITION} + {float(ASTER_TGT_TAIL_BASE_QTY)*float(price)} (qty: {float(ASTER_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})')

                # await aster_remainder_route()
                if ASTER_OPEN_ORDERS:
                    working = ASTER_OPEN_ORDERS[0]
                    open_order_id = working.get('order_id') if working.get('order_id') is not None else working['orderId']
                    open_order_px = float(working.get('price')) if working.get('price') is not None else float(working['original_price'])
                    if round(open_order_px - float(price), 2) == 0.00:
                        logging.info('ASTER OPEN ORDER NO PX CHG; SKIPPING')
                        place_order = False
                    else:
                        # Cancel/replace: only re-post once the cancel is confirmed.
                        cancel_order = {
                            "url": "/fapi/v3/order",
                            "method": "DELETE",
                            "params": {
                                'symbol': ASTER_TICKER,
                                'orderId': open_order_id,
                            }
                        }
                        cr = await aster_auth.post_authenticated_url(cancel_order)
                        if cr.get('status', None) == 'CANCELED':
                            ASTER_OPEN_ORDERS.pop(0)
                            place_order = True
                        else:
                            logging.warning(f'ASTER ORDER FAILED TO CANCEL DURING CR ({open_order_id}): RESP {cr}')
                            place_order = False
                else:
                    place_order = True

                if ASTER_TGT_TAIL_BASE_QTY == 0.00:
                    place_order = False
                    logging.info('ASTER TRYNG TO ORDER 0.00 BASE QTY, SKIPPING')

                if place_order:
                    price = Decimal(str(price)).quantize(Decimal(str(0.01)), rounding=ROUND_DOWN)
                    post_order = {
                        "url": "/fapi/v3/order",
                        "method": "POST",
                        "params": {
                            'symbol': symbol,
                            'side': side,
                            'type': 'LIMIT',
                            'timeInForce': 'GTC',
                            'quantity': qty,
                            'price': price,
                        }
                    }
                    order_resp = await aster_auth.post_authenticated_url(post_order)
                    if order_resp.get('orderId', None) is not None:
                        order_resp['original_price'] = price
                        ASTER_OPEN_ORDERS.append(order_resp)
                        utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {float(ASTER_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}')
                        logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}')
                    else:
                        logging.warning(f'ASTER ORDER NOT ACCEPTED: {order_resp}')
                else:
                    logging.warning('ASTER PLACE ORDER CHECKS FAILED, SKIPPING')
            elif not ASTER_TGT_TAIL_ORDERABLE and ASTER_OPEN_ORDERS:
                logging.info('ASTER HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS')
                await aster_cancel_all_orders()

            # EXTEND
            if EXTEND_TGT_TAIL_ORDERABLE and EXTEND_ALLOW_ORDERING:
                symbol = EXTEND_TICKER
                side = OrderSide.BUY if EXTEND_TGT_TAIL_BASE_QTY > 0.00 else OrderSide.SELL
                qty = Decimal(str(abs(EXTEND_TGT_TAIL_BASE_QTY)))
                # Compare against the enum member, not the string 'BUY'.
                price = EXTEND_TOB_PX - PRICE_WORSENER_EXTEND if side == OrderSide.BUY else EXTEND_TOB_PX + PRICE_WORSENER_EXTEND

                # NOTE(review): log-only check, same caveat as on the Aster leg.
                if abs(float(EXTEND_TGT_TAIL_BASE_QTY) * float(price)) + abs(float(EXTEND_NOTIONAL_POSITION)) > MAX_TARGET_NOTIONAL * 1.01:
                    logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - EXTEND: {EXTEND_NOTIONAL_POSITION:.2f} + {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f} (qty: {float(EXTEND_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})')

                # await extend_remainder_route()
                if EXTEND_OPEN_ORDERS:
                    open_order_dict = dict(EXTEND_OPEN_ORDERS[0])
                    open_order_id = open_order_dict['external_id']
                    open_order_px = float(open_order_dict['price'])
                    place_order = True
                else:
                    open_order_id = None
                    open_order_px = 0
                    place_order = True

                if place_order:
                    price = Decimal(str(price)).quantize(Decimal(str(0.01)), rounding=ROUND_DOWN)
                    if round(open_order_px - float(price), 2) == 0.00:
                        logging.info('EXTEND OPEN ORDER NO PX CHG; SKIPPING')
                    else:
                        # Passing previous_order_id replaces the working order, if any.
                        order_resp = await EXTEND_CLIENT.place_order(
                            market_name=symbol,
                            amount_of_synthetic=qty,
                            price=price,
                            side=side,
                            taker_fee=Decimal("0.00025"),
                            previous_order_id=open_order_id,
                        )
                        order_resp_dict = dict(order_resp)
                        if order_resp_dict.get('status', None) == 'OK':
                            if EXTEND_OPEN_ORDERS:
                                EXTEND_OPEN_ORDERS.pop(0)
                            order_dict = dict(order_resp_dict['data'])
                            order_dict['status'] = 'NEW'
                            order_dict['price'] = str(price)
                            EXTEND_OPEN_ORDERS.append(order_dict)
                            utils.send_tg_alert(f'FR_ALGO - EXTEND Order. Start_$: {EXTEND_NOTIONAL_POSITION:.2f}; Value: {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}')
                            logging.info(f'EXTEND ORDER PLACED SUCCESS: {order_dict}')
                        else:
                            logging.warning(f'EXTEND ORDER NOT ACCEPTED: {order_resp_dict}')
                else:
                    logging.warning('EXTEND PLACE ORDER CHECKS FAILED, SKIPPING')
            elif not EXTEND_TGT_TAIL_ORDERABLE and EXTEND_OPEN_ORDERS:
                logging.info('EXTEND HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS')
                await extend_cancel_all_orders()

            print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000})')
            await asyncio.sleep(LOOP_SLEEP_SEC)

    except KeyboardInterrupt:
        logging.info('CANCELLING OPEN ORDERS')
        await kill_algo()
    except Exception as e:
        logging.error(traceback.format_exc())
        logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
        logging.info('CANCELLING OPEN ORDERS')
        utils.send_tg_alert(f'FR_ALGO_CRASHED: {str(e)}')
        await kill_algo()


### MAIN STARTUP ###
async def main():
    """Create the exchange, Valkey and DB clients, load starting state, then run the loop."""
    global EXTEND_CLIENT
    global VAL_KEY
    global CON
    _, EXTEND_CLIENT = await extend_auth.create_auth_account_and_trading_client()
    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    # NOTE(review): database credentials are hard-coded in this URL -
    # consider loading them from the environment.
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    async with engine.connect() as CON:
        ### ASTER SETUP ###
        await get_aster_collateral()
        await get_aster_notional_position()
        await get_aster_exch_info()
        await get_aster_open_orders()

        ### EXTEND SETUP ###
        await get_extend_collateral()
        await get_extend_notional()
        await get_extend_exch_info()
        await get_extend_open_orders()

        await run_algo()


if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp() * 1000)
    # Configure logging first so the start-up lines below reach the log file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    asyncio.run(main())