diff --git a/modules/apex_api.py b/_On_Ice/apex_api.py similarity index 100% rename from modules/apex_api.py rename to _On_Ice/apex_api.py diff --git a/mexc.ipynb b/_On_Ice/mexc.ipynb similarity index 100% rename from mexc.ipynb rename to _On_Ice/mexc.ipynb diff --git a/ws_apex.py b/_On_Ice/ws_apex.py similarity index 100% rename from ws_apex.py rename to _On_Ice/ws_apex.py diff --git a/ws_mexc.py b/_On_Ice/ws_mexc.py similarity index 100% rename from ws_mexc.py rename to _On_Ice/ws_mexc.py diff --git a/algo.ipynb b/algo.ipynb new file mode 100644 index 0000000..e69de29 diff --git a/algo_orchestrator.py b/algo_orchestrator.py new file mode 100644 index 0000000..b8c7316 --- /dev/null +++ b/algo_orchestrator.py @@ -0,0 +1,80 @@ +import asyncio +import json +import logging +import os +import time +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import valkey +from dotenv import load_dotenv +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + + +### Database ### +EXTEND_CLIENT = None +CON: AsyncContextManager | None = None +VAL_KEY = None +VK_IN = 'fr_orchestrator_input' +VK_OUT = 'fr_orchestrator_output' + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo_Orchestrator.log' + +### ALGO GLOBALS ### +ASTER_ALLOW_ORDERING: bool = True +EXTEND_ALLOW_ORDERING: bool = True +LOOP_SLEEP_SEC = 1 + +async def orchestrator() -> None: + try: + VK_PUBSUB = VAL_KEY.pubsub() + VK_PUBSUB.subscribe(VK_IN) + + print(f"Subscribed to '{VK_IN}'. Waiting for messages...") + + for message in VK_PUBSUB.listen(): + # Valkey sends a 'subscribe' message first; we usually want to skip it + if message['type'] == 'message': + data = message['data'] + channel = message['channel'] + print(f"[{channel}] Received: {data}") + + except valkey.exceptions.ConnectionError as e: + print(f"Could not connect to Valkey. Please check the publish server is up; {e}") + except KeyboardInterrupt: + logging.info('ORCHESTRATOR SHUTTING DOWN...') + except Exception as e: + logging.error(traceback.format_exc()) + logging.critical(f'*** ORCHESTRATOR CRASHED: {e}') + +### MAIN STARTUP ### +async def main() -> None: + global EXTEND_CLIENT + global VAL_KEY + global CON + + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + + async with engine.connect() as CON: + await orchestrator() + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + asyncio.run(main()) \ No newline at end of file diff --git a/apex.ipynb b/apex.ipynb deleted file mode 100644 index 4c3a02c..0000000 --- a/apex.ipynb +++ /dev/null @@ -1,198 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 2, - "id": "b6c46d40", - "metadata": {}, - "outputs": [], - "source": [ - "import modules.apex_api as apex_api" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "7fb6d9dc", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Authenticating...\n", - "...Authenticated\n" - ] - } - ], - "source": [ - "client = apex_api.apex_create_client()" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "d5a1203a", - "metadata": {}, - "outputs": [], - "source": [ - "# print(\"*** POSTING ORDER ***\")\n", - "# createOrderRes = client.create_order_v3(\n", - "# symbol=\"ETH-USDT\", \n", - "# side=\"BUY\",\n", - "# type=\"LIMIT\",\n", - "# size=\"0.01\",\n", - "# price=\"2100\",\n", - "# )\n", - "# print(createOrderRes)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c21254eb", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'data': {'totalEquityValue': '13.840000000000000000',\n", - " 'availableBalance': '13.840000000000000000',\n", - " 'initialMargin': '0',\n", - " 'maintenanceMargin': '0',\n", - " 'walletBalance': '',\n", - " 'realizedPnl': '-5.399416243793950000',\n", - " 'unrealizedPnl': '0.00',\n", - " 'totalRisk': '0',\n", - " 'totalValueWithoutDiscount': '13.840000000000000000',\n", - " 'liabilities': '13.840000000000000000',\n", - " 'totalAvailableBalance': '13.840000000000000000'},\n", - " 'timeCost': 6327944}" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "client.get_account_balance_v3()" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "7cba63d4", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'data': [], 'timeCost': 3984811}" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "client.open_orders_v3()" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "b072c0de", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'timeCost': 4389124}" - ] - }, - "execution_count": 12, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "client.delete_open_orders_v3(symbol=\"ETH-USDT\")" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "5ea177f8", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "TOKEN: USDT == 13.840000000000000000\n", - "TOKEN: USDC == 0.000000000000000000\n" - ] - } - ], - "source": [ - "account_and_pos = client.get_account_v3()\n", - "for c in account_and_pos['contractWallets']:\n", - " print(f'TOKEN: {c['token']} == {c['balance']}')" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "70eb3b4f", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "fefca500", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dc048386", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "py_313", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.13.12" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/main.py b/main.py index 8ce916e..432d5ea 100644 --- a/main.py +++ b/main.py @@ -36,8 +36,8 @@ load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log' ### CONSTANTS ### -ASTER_ALLOW_ORDERING: bool = True -EXTEND_ALLOW_ORDERING: bool = True +ASTER_ALLOW_ORDERING: bool = False +EXTEND_ALLOW_ORDERING: bool = False LOOP_SLEEP_SEC = 1 PRICE_WORSENER_ASTER = 0.00 PRICE_WORSENER_EXTEND = 0.0 @@ -102,6 +102,7 @@ class Open_Positions: class Asset: symbol: str balance: float + # min_order_qty: float @dataclass(kw_only=True) class Collateral: @@ -157,50 +158,77 @@ class Funding_Rate: async def update(self) -> None: self.Valkey = await self.Valkey.update() +### Markets Info ### +@dataclass(kw_only=True) +class Market: + symbol: str + min_order_qty: float + +@dataclass(kw_only=True) +class Markets_Details: + Markets: list[Market] = field(default_factory=list) + + ### Exchanges ### @dataclass(kw_only=True) class Perpetual_Exchange: Order_Updates: Order_Updates Position_Updates: Open_Positions - Collateral: Collateral + Collateral_Updates: Collateral Funding_Rate: Funding_Rate + Markets: Markets_Details mult: int - + lh_asset: str + rh_asset: str + symbol_asset_separator: str = '' + symbol: str + async def update(self): - await self.Collateral.update() - await self.Orders.update() - await self.Positions.update() + await self.Collateral_Updates.update() + await self.Order_Updates.update() + await self.Position_Updates.update() await self.Funding_Rate.update() + def __post_init__(self) -> None: + self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' + + @dataclass(kw_only=True) class Aster(Perpetual_Exchange): name: str = 'Aster' + lh_asset: str = 'ETH' + rh_asset: str = 'USDT' def __post_init__(self): - self.Collateral = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) - self.Orders = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) - self.Positions = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) -# @dataclass(kw_only=True) -# class Exchanges: -# Aster: Aster -# Extend: Perpetual_Exchange - -# async def update(self): -# await self.Aster.update() -# await self.Extend.update() +@dataclass(kw_only=True) +class Extend(Perpetual_Exchange): + name: str = 'Extended' + lh_asset: str = 'ETH' + rh_asset: str = 'USD' + symbol_asset_separator: str = '-' + def __post_init__(self): + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) +# EXCHANGES: list = [ Aster(), Extend() ] ### FLAGS ### @dataclass(kw_only=True) class Flags: LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False NET_FUNDING_IS_ZERO: bool = False - # list = field(init=False) Flags = Flags() @@ -508,7 +536,8 @@ async def run_algo(): NEXT_NET_FUNDING_RATE = calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR) Flags.NET_FUNDING_IS_ZERO = NEXT_NET_FUNDING_RATE == 0.00 if Flags.NET_FUNDING_IS_ZERO: - logging.info('NET FUNDING = 0.00; Flattening Open Positions; Wait Until Non-Zero.') + logging.info('NET FUNDING = 0.00; Cancelling Open Orders; Wait Until Non-Zero.') + ALPHA_TGT_NOTIONAL = 0.00 if ALPHA_EXCH == 'EXTEND': @@ -554,6 +583,7 @@ async def run_algo(): TGT NOTIONAL: $ {MAX_TARGET_NOTIONAL if not Flags.NET_FUNDING_IS_ZERO else 0.00} ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ] + ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:4f} ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {ASTER_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {EXTEND_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ] --- ASTER OPEN ORDERS --- @@ -572,8 +602,9 @@ async def run_algo(): qty = str(abs(ASTER_TGT_TAIL_BASE_QTY)) price = ASTER_TOB_PX - PRICE_WORSENER_ASTER if side == 'BUY' else ASTER_TOB_PX + PRICE_WORSENER_ASTER - if abs(qty*price) + abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: - logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - ASTER: {ASTER_NOTIONAL_POSITION} + {qty*price} (qty: {qty}; px: {price})') + if abs(abs(float(ASTER_TGT_TAIL_BASE_QTY))*float(price)) + abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: + pass + logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - ASTER: {ASTER_NOTIONAL_POSITION} + {float(ASTER_TGT_TAIL_BASE_QTY)*float(price)} (qty: {float(ASTER_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') # await aster_remainder_route() if ASTER_OPEN_ORDERS: open_order_id = ASTER_OPEN_ORDERS[0].get('order_id') if ASTER_OPEN_ORDERS[0].get('order_id') is not None else ASTER_OPEN_ORDERS[0]['orderId'] @@ -622,7 +653,7 @@ async def run_algo(): if order_resp.get('orderId', None) is not None: order_resp['original_price'] = price ASTER_OPEN_ORDERS.append(order_resp) - utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {abs(qty*price):.2f}; Price: {str(price):.2f}') + utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {float(ASTER_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}') else: logging.warning('ASTER PLACE ORDER CHECKS FAILED, SKIPPING') @@ -638,8 +669,9 @@ async def run_algo(): qty = Decimal(str(abs(EXTEND_TGT_TAIL_BASE_QTY))) price = EXTEND_TOB_PX - PRICE_WORSENER_EXTEND if side == 'BUY' else EXTEND_TOB_PX + PRICE_WORSENER_EXTEND - if abs(qty*price) + abs(EXTEND_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: - logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - EXTEND: {EXTEND_NOTIONAL_POSITION} + {qty*price} (qty: {qty}; px: {price})') + if abs(float(EXTEND_TGT_TAIL_BASE_QTY)*float(price)) + abs(float(EXTEND_NOTIONAL_POSITION)) > MAX_TARGET_NOTIONAL*1.01: + logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - EXTEND: {EXTEND_NOTIONAL_POSITION:.2f} + {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f} (qty: {float(EXTEND_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') + pass # await extend_remainder_route() if EXTEND_OPEN_ORDERS: open_order_dict = dict(EXTEND_OPEN_ORDERS[0]) @@ -673,7 +705,7 @@ async def run_algo(): order_dict['price'] = str(price) EXTEND_OPEN_ORDERS.append(order_dict) - utils.send_tg_alert(f'FR_ALGO - EXTEND Order. Start_$: {EXTEND_NOTIONAL_POSITION:.2f}; Value: {abs(qty*price):.2f}; Price: {str(price):.2f}') + utils.send_tg_alert(f'FR_ALGO - EXTEND Order. Start_$: {EXTEND_NOTIONAL_POSITION:.2f}; Value: {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'EXTEND ORDER PLACED SUCCESS: {order_dict}') else: logging.warning('EXTEND PLACE ORDER CHECKS FAILED, SKIPPING') diff --git a/main_update.py b/main_update.py new file mode 100644 index 0000000..432d5ea --- /dev/null +++ b/main_update.py @@ -0,0 +1,771 @@ +import asyncio +import json +import logging +import math +import os +import time +import traceback +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from decimal import ROUND_DOWN, Decimal +from typing import AsyncContextManager + +from typing import Any +import numpy as np +import pandas as pd +import requests + +# import talib +import valkey +from dotenv import load_dotenv +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine +from x10.models.order import OrderSide + +import modules.utils as utils +import modules.aster_auth as aster_auth +import modules.extended_auth as extend_auth + +### Database ### +EXTEND_CLIENT = None +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log' + +### CONSTANTS ### +ASTER_ALLOW_ORDERING: bool = False +EXTEND_ALLOW_ORDERING: bool = False +LOOP_SLEEP_SEC = 1 +PRICE_WORSENER_ASTER = 0.00 +PRICE_WORSENER_EXTEND = 0.0 + +MIN_TIME_TO_FUNDING: int = 1000 * 60 * 7 # 5 minutes. + +ASTER_LH_ASSET: str = 'ETH' +ASTER_RH_ASSET: str = 'USDT' +ASTER_TICKER: str = ASTER_LH_ASSET + ASTER_RH_ASSET +EXTEND_LH_ASSET: str = 'ETH' +EXTEND_RH_ASSET: str = 'USD' +EXTEND_TICKER: str = EXTEND_LH_ASSET + '-' + EXTEND_RH_ASSET + +TARGET_OPEN_CASH_POSITION: float = 10 # Each side (alpha and hedge) + +### GLOBALS ### +ASTER_MULT = 150 +EXTEND_MULT = 50 +MAX_TARGET_NOTIONAL = min([ASTER_MULT, EXTEND_MULT]) * TARGET_OPEN_CASH_POSITION + +ASTER_MIN_ORDER_QTY = 0.001 +EXTEND_MIN_ORDER_QTY = 0.01 + +ASTER_AVAIL_COLLATERAL = 0 +EXTEND_AVAIL_COLLATERAL = 0 + +ASTER_NOTIONAL_POSITION = 0 +EXTEND_NOTIONAL_POSITION = 0 + +ASTER_OPEN_ORDERS = [] +EXTEND_OPEN_ORDERS = [] + +# ASTER_OPEN_POSITIONS = [] +# EXTEND_OPEN_POSITIONS = [] + +@dataclass(kw_only=True) +class Valkey_Stream: + channel: str + data: Any = None + none_fill: Any = None + + async def update(self): + r = VAL_KEY.get(self.channel) + self.data = json.loads(r) if r is not None else self.none_fill + +@dataclass(kw_only=True) +class Position: + market: str + notional: float + qty: float + +@dataclass(kw_only=True) +class Open_Positions: + Valkey: Valkey_Stream + Positions: list[Position] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + +### Collateral ### +@dataclass(kw_only=True) +class Asset: + symbol: str + balance: float + # min_order_qty: float + +@dataclass(kw_only=True) +class Collateral: + Valkey: Valkey_Stream + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Assets: list[Asset] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + +### Orders ### +@dataclass(kw_only=True) +class Order: + symbol: str + order_id: str + client_order_id: str + side: str + order_type: str + original_qty: float + original_price: float + order_status: str + last_filled_qty: float + last_filled_price: float + commission: float + trade_is_maker: bool + +@dataclass(kw_only=True) +class Order_Updates: + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Valkey: Valkey_Stream + Orders: list[Order] = field(default_factory = list) + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + +### Funding Rate ### +@dataclass(kw_only=True) +class Funding_Rate: + # Last_Updated_Ts_Ms: int + # Last_Pulled_Ts_Ms: int + Valkey: Valkey_Stream + timestamp_arrival: int + timestamp_msg: int + symbol: str + funding_rate: float + next_funding_time_ts_ms: int + mark_price: float + index_price: float + estimated_settle_price: float + + async def update(self) -> None: + self.Valkey = await self.Valkey.update() + +### Markets Info ### +@dataclass(kw_only=True) +class Market: + symbol: str + min_order_qty: float + +@dataclass(kw_only=True) +class Markets_Details: + Markets: list[Market] = field(default_factory=list) + + +### Exchanges ### +@dataclass(kw_only=True) +class Perpetual_Exchange: + Order_Updates: Order_Updates + Position_Updates: Open_Positions + Collateral_Updates: Collateral + Funding_Rate: Funding_Rate + Markets: Markets_Details + mult: int + lh_asset: str + rh_asset: str + symbol_asset_separator: str = '' + symbol: str + + async def update(self): + await self.Collateral_Updates.update() + await self.Order_Updates.update() + await self.Position_Updates.update() + await self.Funding_Rate.update() + + def __post_init__(self) -> None: + self.symbol = f'{self.lh_asset.upper()}{self.symbol_asset_separator}{self.rh_asset.upper()}' + + +@dataclass(kw_only=True) +class Aster(Perpetual_Exchange): + name: str = 'Aster' + lh_asset: str = 'ETH' + rh_asset: str = 'USDT' + + def __post_init__(self): + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) + + +@dataclass(kw_only=True) +class Extend(Perpetual_Exchange): + name: str = 'Extended' + lh_asset: str = 'ETH' + rh_asset: str = 'USD' + symbol_asset_separator: str = '-' + + def __post_init__(self): + super().__post_init__() + self.Order_Updates = Order_Updates(Valkey=Valkey_Stream(channel = 'fr_aster_user_balances', none_fills = [])) + self.Collateral_Updates = Collateral(Valkey=Valkey_Stream(channel = 'fr_aster_user_orders', none_fills = [])) + self.Position_Updates = Open_Positions(Valkey=Valkey_Stream(channel = 'fr_aster_user_positions', none_fills = [])) + self.Funding_Rate - Funding_Rate(Valkey=Valkey_Stream(channel = 'fund_rate_aster', none_fills = None)) + + +# EXCHANGES: list = [ Aster(), Extend() ] + +### FLAGS ### +@dataclass(kw_only=True) +class Flags: + LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False + NET_FUNDING_IS_ZERO: bool = False +Flags = Flags() + + +### UTILS ### +def round_decimal_down(value, decimal_places): + # Construct precision string like '0.01' for 2 places + fmt = f'0.{"0" * decimal_places}' if decimal_places > 0 else '0' + precision = Decimal(fmt) + return Decimal(str(value)).quantize(precision, rounding=ROUND_DOWN) + +### OPEN ORDERS ### +async def get_aster_open_orders(): + global ASTER_OPEN_ORDERS + + fut_acct_openOrders = { + "url": "/fapi/v3/openOrders", + "method": "GET", + "params": {} + } + ASTER_OPEN_ORDERS = await aster_auth.post_authenticated_url(fut_acct_openOrders) + +async def get_extend_open_orders(): + global EXTEND_OPEN_ORDERS + + EXTEND_OPEN_ORDERS = list(dict(await EXTEND_CLIENT.account.get_open_orders()).get('data', 0)) + +### WALLLET ### +async def get_aster_collateral(): + global ASTER_AVAIL_COLLATERAL + + fut_acct_balances = { + "url": "/fapi/v3/balance", + "method": "GET", + "params": {} + } + r = await aster_auth.post_authenticated_url(fut_acct_balances) + ASTER_AVAIL_COLLATERAL = float([d for d in r if d.get('asset')==ASTER_RH_ASSET][0].get('availableBalance')) + +async def get_aster_notional_position(resp: dict | None = None): + global ASTER_NOTIONAL_POSITION + global ASTER_MULT + + if not resp: + fut_acct_positionRisk = { + "url": "/fapi/v3/positionRisk", + "method": "GET", + "params": { + 'symbol': ASTER_TICKER, + } + } + resp = await aster_auth.post_authenticated_url(fut_acct_positionRisk) + + d = [x for x in resp if x.get('symbol', None) == ASTER_TICKER][0] + if len(d) < 1: + logging.info(f'BAD NOTIONAL - ASTER CHANGE: Empty d: {d}; resp: {resp}') + kill_algo() + + aster_unrealized_pnl = float(d['unrealized_pnl']) if d.get('unrealized_pnl') is not None else float(d['unRealizedProfit']) + + if d.get('notional') is not None: + notional = float(d['notional']) + else: + notional = float(d['position_amount'])*float(d['entry_price']) + + previous_notional_position = ASTER_NOTIONAL_POSITION + ASTER_NOTIONAL_POSITION = notional - aster_unrealized_pnl + if not resp: + ASTER_MULT = float(d['leverage']) + if abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: + logging.info(f'BAD NOTIONAL - ASTER CHANGE: {ASTER_NOTIONAL_POSITION}; UR PNL: {aster_unrealized_pnl}; MULT: {ASTER_MULT}; d: {d}; resp: {resp}') + kill_algo() + if ASTER_NOTIONAL_POSITION != previous_notional_position: + logging.info(f'ASTER NOTIONAL CHANGE: {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {aster_unrealized_pnl:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') + +async def get_extend_collateral(): + global EXTEND_AVAIL_COLLATERAL + + get_bals = dict(dict(await EXTEND_CLIENT.account.get_balance()).get('data', {})) + EXTEND_AVAIL_COLLATERAL = get_bals.get('available_for_trade', 0) if get_bals.get('collateral_name', None)==EXTEND_RH_ASSET else 0 + +async def get_extend_notional(resp: dict | None = None): + global EXTEND_NOTIONAL_POSITION + global EXTEND_MULT + + if not resp: + resp = dict(await EXTEND_CLIENT.account.get_positions()).get('data', {}) + + pos_dict = [dict(d) for d in resp if dict(d).get('market') == EXTEND_TICKER] + pos_dict = pos_dict[0] + unrealized_pnl = pos_dict.get('unrealised_pnl', 0) + previous_notional_position = EXTEND_NOTIONAL_POSITION + EXTEND_NOTIONAL_POSITION = float(pos_dict.get('value', 0)) - float(unrealized_pnl) + EXTEND_MULT = pos_dict.get('leverage', EXTEND_MULT) + if EXTEND_NOTIONAL_POSITION != previous_notional_position: + logging.info(f'EXTEND NOTIONAL CHANGE: {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {unrealized_pnl:.2f}; MULT: {EXTEND_MULT:.0f}; resp: {bool(resp)}') + + +### EXCHANGE INFO ### +async def get_aster_exch_info(): + global ASTER_MIN_ORDER_QTY + + fut_acct_exchangeInfo = { + "url": "/fapi/v3/exchangeInfo", + "method": "GET", + "params": {} + } + r = await aster_auth.post_authenticated_url(fut_acct_exchangeInfo) + s = r['symbols'] + d = [d for d in s if d.get('symbol', None) == 'ETHUSDT'][0] + f = [f for f in d['filters'] if f.get('filterType', None) == 'LOT_SIZE'][0] + ASTER_MIN_ORDER_QTY = float(f['minQty']) + +async def get_extend_exch_info(): + global EXTEND_MIN_ORDER_QTY + + r = await EXTEND_CLIENT.markets_info.get_markets_dict() + EXTEND_MIN_ORDER_QTY = float(r['ETH-USD'].trading_config.min_order_size) + +### CANCEL ORDERS ### +async def aster_cancel_all_orders(): + cancel_all_open_orders = { + "url": "/fapi/v3/allOpenOrders", + "method": "DELETE", + "params": { + 'symbol': 'ETHUSDT', + } + } + r = await aster_auth.post_authenticated_url(cancel_all_open_orders) + logging.info(f'ASTER CANCEL ALL OPEN ORDERS RESP: {r}') + +async def extend_cancel_all_orders(): + r = await EXTEND_CLIENT.orders.mass_cancel(markets=[EXTEND_TICKER]) + logging.info(f'EXTEND CANCEL ALL OPEN ORDERS RESP: {r}') + +### KILL ALGO ### +async def kill_algo(): + await aster_cancel_all_orders() + await extend_cancel_all_orders() + logging.info('ALGO KILL FLAG ACTIVATED; CANCELLING OPEN ORDERS AND SHUTTING DOWN') + raise ValueError('KILL FLAG ACTIVATED') + +### ROUTES ### +# async def aster_remainder_route(): +# # Check open orders...cancel replace or new order? +# # Check collateral to confirm you have enough money to trade +# # if CR, what should be the new price? has it changed? maybe no action needed? how long has it been working? +# # if not enough collateral then need to liquidate and kill algo - flip flag + +# # if good to order, then create and post order. ADD to LOCAL OPEN ORDERS LIST + + +# pass + +# async def extend_remainder_route(): +# pass + + +### ALGO LOOP ### +async def run_algo(): + + try: + while True: + loop_start = time.time() + print('__________Start___________') + + ### Load Data from Feedhandlers ### + ASTER_FUND_RATE_DICT = json.loads(VAL_KEY.get('fund_rate_aster')) + EXTENDED_FUND_RATE_DICT = json.loads(VAL_KEY.get('fund_rate_extended')) + + ASTER_FUND_RATE = float(ASTER_FUND_RATE_DICT.get('funding_rate', 0)) + EXTEND_FUND_RATE = float(EXTENDED_FUND_RATE_DICT.get('funding_rate', 0)) + ASTER_FUND_RATE_TIME = float(ASTER_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0)) + EXTEND_FUND_RATE_TIME = float(EXTENDED_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0)) + + ASTER_TICKER_DICT = json.loads(VAL_KEY.get('fut_ticker_aster')) + EXTENDED_TICKER_DICT = json.loads(VAL_KEY.get('fut_ticker_extended')) + + ### Manage Local Collateral Using Updates from WS ### + ASTER_WS_COLLATERAL_UPDATES = VAL_KEY.get('fr_aster_user_positions') + ASTER_WS_COLLATERAL_UPDATES = json.loads(ASTER_WS_COLLATERAL_UPDATES) if ASTER_WS_COLLATERAL_UPDATES is not None else [] + EXTEND_WS_COLLATERAL_UPDATES = VAL_KEY.get('fr_extended_user_positions') + EXTEND_WS_COLLATERAL_UPDATES = json.loads(EXTEND_WS_COLLATERAL_UPDATES) if EXTEND_WS_COLLATERAL_UPDATES is not None else [] + + ### Manage Local Notionals Using Updates from WS ### + ASTER_WS_POS_UPDATES = VAL_KEY.get('fr_aster_user_positions') + ASTER_WS_POS_UPDATES = json.loads(ASTER_WS_POS_UPDATES) if ASTER_WS_POS_UPDATES is not None else [] + EXTEND_WS_POS_UPDATES = VAL_KEY.get('fr_extended_user_positions') + EXTEND_WS_POS_UPDATES = json.loads(EXTEND_WS_POS_UPDATES) if EXTEND_WS_POS_UPDATES is not None else [] + + ### Manage Local Orders Using Updates from WS ### + ASTER_WS_ORDER_UPDATES = VAL_KEY.get('fr_aster_user_orders') + ASTER_WS_ORDER_UPDATES = json.loads(ASTER_WS_ORDER_UPDATES) if ASTER_WS_ORDER_UPDATES is not None else [] + EXTEND_WS_ORDER_UPDATES = VAL_KEY.get('fr_extended_user_orders') + EXTEND_WS_ORDER_UPDATES = json.loads(EXTEND_WS_ORDER_UPDATES) if EXTEND_WS_ORDER_UPDATES is not None else [] + + # CHECK NO MORE THAN 1 OPEN ORDER ON EITHER EXCHANGE # + if len(ASTER_OPEN_ORDERS) > 1 or len(EXTEND_OPEN_ORDERS) > 1: + logging.info(f'MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS ({len(ASTER_OPEN_ORDERS)}): {ASTER_OPEN_ORDERS}; EXTEND_OPEN_ORDERS ({len(EXTEND_OPEN_ORDERS)}): {EXTEND_OPEN_ORDERS}') + await kill_algo() + raise ValueError('NOT HERE: MORE THAN 1 ORDER OPEN - KILLING ALGO: ASTER_OPEN_ORDERS') + + ### CHECK TIME TO FUNDING AND WHETHER TO BE ACTIVE ### + now_ms = round(datetime.now().timestamp()*1000) + time_to_funding_ms = min([ASTER_FUND_RATE_TIME, EXTEND_FUND_RATE_TIME]) - now_ms + if ( time_to_funding_ms > MIN_TIME_TO_FUNDING ) and (not ASTER_OPEN_ORDERS) and (not EXTEND_OPEN_ORDERS): + print(f'Outside action window (minutes) and no active order (sleeping for 5 sec): {pd.to_datetime(time_to_funding_ms, unit='ms').minute} > {pd.to_datetime(MIN_TIME_TO_FUNDING, unit='ms').minute}') + time.sleep(5) + continue + + if len(ASTER_WS_POS_UPDATES) > 0: + await get_aster_notional_position(resp=ASTER_WS_POS_UPDATES) + ###### *** returned 0 notional even though had a position, need to handle and safety check to not order above max notional. + + if len(EXTEND_WS_POS_UPDATES) > 0: + await get_extend_notional(resp=EXTEND_WS_POS_UPDATES) + + if ASTER_WS_ORDER_UPDATES is not None: + for idx, o in enumerate(ASTER_OPEN_ORDERS): + order_id = o.get('order_id') if o.get('order_id') is not None else o.get('orderId') + order_orig_status = o['status'] + order_update = [ou for ou in ASTER_WS_ORDER_UPDATES if ou.get('order_id', None) == order_id] + + if len(order_update) > 0: + order_update = order_update[0] + order_update_status = order_update.get('status') if order_update.get('status') is not None else order_update.get('order_status') + order_status_changed = order_orig_status.upper() != order_update_status.upper() + + if order_status_changed: + logging.info(f'ASTER ORDER ({order_id}): {order_orig_status} -> {order_update_status}') + ASTER_OPEN_ORDERS[idx] = order_update + if order_update_status in ['CANCELED','EXPIRED']: + logging.info(f'ASTER ORDER CANCELLED or EXPIRED: {order_id}') + ASTER_OPEN_ORDERS.pop(idx) + elif order_update_status in ['PARTIALLY_FILLED']: + logging.info(f'ASTER ORDER PARTIALLY FILLED: {order_id}') + await get_aster_collateral() + await get_aster_notional_position() + elif order_update_status in ['FILLED']: + logging.info(f'ASTER ORDER FILLED: {order_id}') + ASTER_OPEN_ORDERS.pop(idx) + await get_aster_collateral() + await get_aster_notional_position() + else: + logging.critical(f'EXTEND ORDER STATUS CHG TO UNEXPECTED VALUE, KILLING... ({order_id}): {order_orig_status} -> {order_update_status}') + if EXTEND_WS_ORDER_UPDATES is not None: + for idx, o in enumerate(EXTEND_OPEN_ORDERS): + o = dict(o) + order_id = o.get('order_id') if o.get('order_id') is not None else o.get('id') + order_orig_status = o['status'] + order_update = [dict(ou) for ou in EXTEND_WS_ORDER_UPDATES if dict(ou).get('order_id', None) == order_id] + + if len(order_update) > 0: + order_update = order_update[0] + order_update_status = order_update.get('status') + order_status_changed = order_orig_status.upper() != order_update_status.upper() + + if order_status_changed: + logging.info(f'EXTEND ORDER ({order_id}): {order_orig_status} -> {order_update_status}') + EXTEND_OPEN_ORDERS[idx] = order_update + if order_update_status in ['CANCELLED','EXPIRED','REJECTED']: + logging.info(f'EXTEND ORDER CANCELLED or EXPIRED: {order_id}') + EXTEND_OPEN_ORDERS.pop(idx) + elif order_update_status in ['PARTIALLY_FILLED']: + logging.info(f'EXTEND ORDER PARTIALLY FILLED: {order_id}') + await get_extend_collateral() + await get_extend_notional() + elif order_update_status in ['FILLED']: + logging.info(f'EXTEND ORDER FILLED: {order_id}') + EXTEND_OPEN_ORDERS.pop(idx) + await get_extend_collateral() + await get_extend_notional() + else: + logging.critical(f'EXTEND ORDER STATUS CHG TO UNEXPECTED VALUE, KILLING... ({order_id}): {order_orig_status} -> {order_update_status}') + + + ASTER_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if ASTER_FUND_RATE > 0 else 'SHORT PAYS LONG' + EXTEND_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if EXTEND_FUND_RATE > 0 else 'SHORT PAYS LONG' + + min_between_fundings = round((abs(ASTER_FUND_RATE_TIME - EXTEND_FUND_RATE_TIME) / 1000 / 60)) + FUNDINGS_AT_SAME_TIME_NEXT_HR = min_between_fundings < 5 + # FUNDINGS_AT_SAME_TIME_NEXT_HR = ( (ASTER_FUND_RATE_TIME < 60*60*1000) and (EXTEND_FUND_RATE < 60*60*1000) ) + + + if ( abs(ASTER_FUND_RATE) > abs(EXTEND_FUND_RATE) ) and FUNDINGS_AT_SAME_TIME_NEXT_HR: + ALPHA_EXCH = 'ASTER' + ALPHA_FUND_RATE = ASTER_FUND_RATE + else: + ALPHA_EXCH = 'EXTEND' + ALPHA_FUND_RATE = EXTEND_FUND_RATE + + if ALPHA_FUND_RATE < 0: + ALPHA_CARRY_SIDE = 'BUY' + ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL + else: + ALPHA_CARRY_SIDE = 'SELL' + ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL*-1 + + + def calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR: bool) -> float: + if FUNDINGS_AT_SAME_TIME_NEXT_HR: + return ASTER_FUND_RATE + EXTEND_FUND_RATE + else: + return EXTEND_FUND_RATE + + NEXT_NET_FUNDING_RATE = calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR) + Flags.NET_FUNDING_IS_ZERO = NEXT_NET_FUNDING_RATE == 0.00 + if Flags.NET_FUNDING_IS_ZERO: + logging.info('NET FUNDING = 0.00; Cancelling Open Orders; Wait Until Non-Zero.') + + ALPHA_TGT_NOTIONAL = 0.00 + + if ALPHA_EXCH == 'EXTEND': + ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL*-1 + EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL + if ALPHA_CARRY_SIDE == 'BUY': + ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_ask_px']) + EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_bid_px']) + else: + ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_bid_px']) + EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_ask_px']) + else: + ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL + EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL*-1 + if ALPHA_CARRY_SIDE == 'BUY': + ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_bid_px']) + EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_ask_px']) + else: + ASTER_TOB_PX = float(ASTER_TICKER_DICT['best_ask_px']) + EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_bid_px']) + + + ASTER_TGT_TAIL = ASTER_TGT_NOTIONAL - ASTER_NOTIONAL_POSITION + EXTEND_TGT_TAIL = EXTEND_TGT_NOTIONAL - EXTEND_NOTIONAL_POSITION + + ASTER_TGT_TAIL_BASE_QTY = Decimal(str(float(ASTER_TGT_TAIL) / float(ASTER_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) + EXTEND_TGT_TAIL_BASE_QTY = Decimal(str(float(EXTEND_TGT_TAIL) / float(EXTEND_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) + + ASTER_TGT_TAIL_ORDERABLE = abs(ASTER_TGT_TAIL_BASE_QTY) >= ASTER_MIN_ORDER_QTY + EXTEND_TGT_TAIL_ORDERABLE = abs(EXTEND_TGT_TAIL_BASE_QTY) >= EXTEND_MIN_ORDER_QTY + + print(f''' + {pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')-datetime.now()):}) | {pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')-datetime.now()):}) + ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips] + ASTER: {ASTER_PAYOUT_DIRECTION_STR} | EXTEND: {EXTEND_PAYOUT_DIRECTION_STR} + ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ] + ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ] + + SAME TIME? : {FUNDINGS_AT_SAME_TIME_NEXT_HR} [ Minutes Between Fundings: {min_between_fundings} ] + NET FUNDING : {NEXT_NET_FUNDING_RATE:.6%} [{NEXT_NET_FUNDING_RATE*10_000:.2f}bps] [{NEXT_NET_FUNDING_RATE*1_000_000:.0f}pips]; Is Zero?: {Flags.NET_FUNDING_IS_ZERO} + ALPHA SIDE : {ALPHA_EXCH} [{ALPHA_CARRY_SIDE}] + + TGT NOTIONAL: $ {MAX_TARGET_NOTIONAL if not Flags.NET_FUNDING_IS_ZERO else 0.00} + + ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ] + ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:4f} + ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {ASTER_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {EXTEND_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ] + + --- ASTER OPEN ORDERS --- + {ASTER_OPEN_ORDERS} + + --- EXTEND OPEN ORDERS --- + {EXTEND_OPEN_ORDERS} + ''') + + + ### ROUTES ### + # ASTER + if ASTER_TGT_TAIL_ORDERABLE and ASTER_ALLOW_ORDERING: + symbol = ASTER_TICKER + side = 'BUY' if ASTER_TGT_TAIL_BASE_QTY > 0.00 else 'SELL' + qty = str(abs(ASTER_TGT_TAIL_BASE_QTY)) + price = ASTER_TOB_PX - PRICE_WORSENER_ASTER if side == 'BUY' else ASTER_TOB_PX + PRICE_WORSENER_ASTER + + if abs(abs(float(ASTER_TGT_TAIL_BASE_QTY))*float(price)) + abs(ASTER_NOTIONAL_POSITION) > MAX_TARGET_NOTIONAL*1.01: + pass + logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - ASTER: {ASTER_NOTIONAL_POSITION} + {float(ASTER_TGT_TAIL_BASE_QTY)*float(price)} (qty: {float(ASTER_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') + # await aster_remainder_route() + if ASTER_OPEN_ORDERS: + open_order_id = ASTER_OPEN_ORDERS[0].get('order_id') if ASTER_OPEN_ORDERS[0].get('order_id') is not None else ASTER_OPEN_ORDERS[0]['orderId'] + open_order_px = float(ASTER_OPEN_ORDERS[0].get('price')) if ASTER_OPEN_ORDERS[0].get('price') is not None else float(ASTER_OPEN_ORDERS[0]['original_price']) + if round(open_order_px - float(price), 2) == 0.00: + logging.info('ASTER OPEN ORDER NO PX CHG; SKIPPING') + place_order = False + else: + cancel_order = { + "url": "/fapi/v3/order", + "method": "DELETE", + "params": { + 'symbol': ASTER_TICKER, + 'orderId': open_order_id, + } + } + cr = await aster_auth.post_authenticated_url(cancel_order) + if cr.get('status', None) == 'CANCELED': + ASTER_OPEN_ORDERS.pop(0) + place_order = True + else: + logging.warning(f'ASTER ORDER FAILED TO CANCEL DURING CR ({open_order_id}): RESP {cr}') + place_order = False + else: + place_order = True + + if ASTER_TGT_TAIL_BASE_QTY == 0.00: + place_order = False + logging.info('ASTER TRYNG TO ORDER 0.00 BASE QTY, SKIPPING') + + if place_order: + price = Decimal(str(price)).quantize(Decimal(str(0.01)), rounding=ROUND_DOWN) + post_order = { + "url": "/fapi/v3/order", + "method": "POST", + "params": { + 'symbol': symbol, + 'side': side, + 'type': 'LIMIT', + 'timeInForce': 'GTC', + 'quantity': qty, + 'price': price, + } + } + order_resp = await aster_auth.post_authenticated_url(post_order) + if order_resp.get('orderId', None) is not None: + order_resp['original_price'] = price + ASTER_OPEN_ORDERS.append(order_resp) + utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {float(ASTER_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') + logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}') + else: + logging.warning('ASTER PLACE ORDER CHECKS FAILED, SKIPPING') + + elif not(ASTER_TGT_TAIL_ORDERABLE) and ASTER_OPEN_ORDERS: + logging.info('ASTER HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS') + await aster_cancel_all_orders() + + # EXTEND + if EXTEND_TGT_TAIL_ORDERABLE and EXTEND_ALLOW_ORDERING: + symbol = EXTEND_TICKER + side = OrderSide.BUY if EXTEND_TGT_TAIL_BASE_QTY > 0.00 else OrderSide.SELL + qty = Decimal(str(abs(EXTEND_TGT_TAIL_BASE_QTY))) + price = EXTEND_TOB_PX - PRICE_WORSENER_EXTEND if side == 'BUY' else EXTEND_TOB_PX + PRICE_WORSENER_EXTEND + + if abs(float(EXTEND_TGT_TAIL_BASE_QTY)*float(price)) + abs(float(EXTEND_NOTIONAL_POSITION)) > MAX_TARGET_NOTIONAL*1.01: + logging.info(f'TRYING TO ORDER OVER MAX NOTIOANL - EXTEND: {EXTEND_NOTIONAL_POSITION:.2f} + {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f} (qty: {float(EXTEND_TGT_TAIL_BASE_QTY):.2f}; px: {float(price):.2f})') + pass + # await extend_remainder_route() + if EXTEND_OPEN_ORDERS: + open_order_dict = dict(EXTEND_OPEN_ORDERS[0]) + open_order_id = open_order_dict['external_id'] + open_order_px = float(open_order_dict['price']) + place_order = True + else: + open_order_id = None + open_order_px = 0 + place_order = True + if place_order: + price = Decimal(str(price)).quantize(Decimal(str(0.01)), rounding=ROUND_DOWN) + if round(open_order_px - float(price), 2) == 0.00: + logging.info('EXTEND OPEN ORDER NO PX CHG; SKIPPING') + else: + order_resp = await EXTEND_CLIENT.place_order( + market_name=symbol, + amount_of_synthetic=qty, + price=price, + side=side, + taker_fee=Decimal("0.00025"), + previous_order_id=open_order_id, + ) + order_resp_dict = dict(order_resp) + if order_resp_dict.get('status', None) == 'OK': + if EXTEND_OPEN_ORDERS: + EXTEND_OPEN_ORDERS.pop(0) + + order_dict = dict(order_resp_dict['data']) + order_dict['status'] = 'NEW' + order_dict['price'] = str(price) + + EXTEND_OPEN_ORDERS.append(order_dict) + utils.send_tg_alert(f'FR_ALGO - EXTEND Order. Start_$: {EXTEND_NOTIONAL_POSITION:.2f}; Value: {float(EXTEND_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') + logging.info(f'EXTEND ORDER PLACED SUCCESS: {order_dict}') + else: + logging.warning('EXTEND PLACE ORDER CHECKS FAILED, SKIPPING') + + elif not(EXTEND_TGT_TAIL_ORDERABLE) and EXTEND_OPEN_ORDERS: + logging.info('EXTEND HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS') + await extend_cancel_all_orders() + + print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000})') + + time.sleep(LOOP_SLEEP_SEC) + + except KeyboardInterrupt: + logging.info('CANCELLING OPEN ORDERS') + await kill_algo() + except Exception as e: + logging.error(traceback.format_exc()) + logging.critical(f'*** ALGO ENGINE CRASHED: {e}') + logging.info('CANCELLING OPEN ORDERS') + utils.send_tg_alert(f'FR_ALGO_CRASHED: {str(e)}') + await kill_algo() + + +### MAIN STARTUP ### +async def main(): + global EXTEND_CLIENT + global VAL_KEY + global CON + + _, EXTEND_CLIENT = await extend_auth.create_auth_account_and_trading_client() + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + + async with engine.connect() as CON: + ### ASTER SETUP ### + await get_aster_collateral() + await get_aster_notional_position() + await get_aster_exch_info() + await get_aster_open_orders() + ### EXTEND SETUP ### + await get_extend_collateral() + await get_extend_notional() + await get_extend_exch_info() + await get_extend_open_orders() + + await run_algo() + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + asyncio.run(main()) + \ No newline at end of file diff --git a/modules/__pycache__/utils.cpython-313.pyc b/modules/__pycache__/utils.cpython-313.pyc index 565d26c..34797c6 100644 Binary files a/modules/__pycache__/utils.cpython-313.pyc and b/modules/__pycache__/utils.cpython-313.pyc differ diff --git a/ws_aster_user.py b/ws_aster_user.py index 676a379..be0594f 100644 --- a/ws_aster_user.py +++ b/ws_aster_user.py @@ -222,9 +222,9 @@ async def ws_stream(): LOCAL_RECENT_POSITIONS = utils.upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='symbol', seq_check_field='timestamp_msg') LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS] VAL_KEY.set(VK_POSITIONS, json.dumps(LOCAL_RECENT_POSITIONS)) - if balance_update: + if list_for_df_bal: await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON) - if position_update: + if list_for_df_pos: await db.insert_df_to_mysql(table_name='fr_aster_user_account_pos', params=list_for_df_pos, CON=CON) continue case 'listenKeyExpired':