From 484fe4ba0b6aa641b0c2e5f12254dd4348a9e07e Mon Sep 17 00:00:00 2001 From: stevekeyharvey Date: Mon, 27 Apr 2026 17:57:58 +0000 Subject: [PATCH] bug fixes --- _On_Ice/algo_config_backup.json | 13 ++ algo.ipynb | 194 ++++++++++++++++------- algo/Dockerfile | 2 +- algo_config.json | 11 +- algo_orchestrator.py | 13 +- algo_orchestrator/Dockerfile | 2 +- docker-compose-algo.yml | 15 ++ docker-compose.yml | 48 +++--- engine_health.py | 0 main.py | 76 ++++++--- modules/aster_db.py | 33 +++- modules/extended_db.py | 29 ++++ modules/structs.py | 1 + order_engine.ipynb | 272 ++++++++++++++++++++++++++++++++ ws_aster.py | 100 +++--------- ws_extended_trades.py | 134 ++++++++++++++++ ws_extended_trades/Dockerfile | 19 +++ ws_extended_user.py | 1 - 18 files changed, 775 insertions(+), 188 deletions(-) create mode 100644 _On_Ice/algo_config_backup.json create mode 100644 docker-compose-algo.yml create mode 100644 engine_health.py create mode 100644 order_engine.ipynb create mode 100644 ws_extended_trades.py create mode 100644 ws_extended_trades/Dockerfile diff --git a/_On_Ice/algo_config_backup.json b/_On_Ice/algo_config_backup.json new file mode 100644 index 0000000..6f9bf52 --- /dev/null +++ b/_On_Ice/algo_config_backup.json @@ -0,0 +1,13 @@ +{ + "Config_Updated_Timestamp": 1777098091913, + "Allow_Ordering_Aster": false, + "Allow_Ordering_Extend": false, + "Loop_Sleep_Sec": 5.00, + "Max_Target_Notional": 0.00, + "Min_Time_To_Funding_Minutes": 10, + "Price_Worsener_Aster": 0.0, + "Price_Worsener_Extend": 0.0, + "Target_Open_Cash_Position": 10, + "Print_Summary_Each_Loop" : true, + "Flip_Side_For_Testing": false +} \ No newline at end of file diff --git a/algo.ipynb b/algo.ipynb index 4486d98..42d9f46 100644 --- a/algo.ipynb +++ b/algo.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 7, + "execution_count": 76, "id": "d1eed397", "metadata": {}, "outputs": [], @@ -18,7 +18,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 77, "id": "c6151613", "metadata": {}, "outputs": [], @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": null, "id": "d83c61e5", "metadata": {}, "outputs": [ @@ -38,87 +38,50 @@ "1" ] }, - "execution_count": 31, + "execution_count": 131, "metadata": {}, "output_type": "execute_result" } ], "source": [ "config_update = {\n", - " 'Min_Time_To_Funding_Minutes': 60,\n", - " 'Print_Summary_Each_Loop': True,\n", - " 'Allow_Ordering_Aster': True,\n", - " 'Allow_Ordering_Extend': True,\n", - " 'Loop_Sleep_Sec': 0.0,\n", - " 'Flip_Side_For_Testing': False,\n", - " 'Price_Worsener_Extend': 0.0,\n", + " 'Min_Time_To_Funding_Minutes': 7,\n", + " # 'Allow_Ordering_Aster': True,\n", + " # 'Allow_Ordering_Extend': True,\n", + " 'Loop_Sleep_Sec': 0.00,\n", + "# 'Flip_Side_For_Testing': False,\n", + "# 'Price_Worsener_Extend': 0.0,\n", + " 'Log_Summary_Each_Loop': False,\n", + " 'Print_Summary_Each_Loop': False,\n", "}\n", "VAL_KEY.publish('fr_orchestrator_input', json.dumps(config_update))" ] }, { "cell_type": "code", - "execution_count": null, - "id": "d2fdd7d2", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 11, + "execution_count": 93, "id": "45fae761", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "Algo_Config(Config_Updated_Timestamp=1777151524162, Allow_Ordering_Aster=True, Allow_Ordering_Extend=True, Loop_Sleep_Sec=1, Max_Target_Notional=0.0, Min_Time_To_Funding_Minutes=60, Price_Worsener_Aster=0.0, Price_Worsener_Extend=0.0, Target_Open_Cash_Position=10)" + "5.0" ] }, - "execution_count": 11, + "execution_count": 93, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "json.loads(VAL_KEY.get('fr_orchestrator_output'), object_hook=lambda d: structs.Algo_Config(**d))" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'Config_Updated_Timestamp': 1777098091913,\n", - " 'Allow_Ordering_Aster': False,\n", - " 'Allow_Ordering_Extend': False,\n", - " 'Loop_Sleep_Sec': 1,\n", - " 'Max_Target_Notional': 0.0,\n", - " 'Min_Time_To_Funding_Minutes': 60,\n", - " 'Price_Worsener_Aster': 0.0,\n", - " 'Price_Worsener_Extend': 0.0,\n", - " 'Target_Open_Cash_Position': 10,\n", - " 'Print_Summary_Each_Loop': False,\n", - " 'Flip_Side_For_Testing': False}" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "asdict(ALGO_CONFIG)" + "json.loads(VAL_KEY.get('fr_orchestrator_output'), object_hook=lambda d: structs.Algo_Config(**d)).Loop_Sleep_Sec" ] }, { "cell_type": "code", "execution_count": null, - "id": "d2e26271", + "id": "98c500cc", "metadata": {}, "outputs": [], "source": [] @@ -126,9 +89,130 @@ { "cell_type": "code", "execution_count": null, + "id": "f2cf3325", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 31, "id": "a0df43de", "metadata": {}, "outputs": [], + "source": [ + "pos = json.loads(VAL_KEY.get('fr_aster_user_positions'))" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "ca526c8a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'timestamp_arrival': 1777303258987,\n", + " 'timestamp_msg': 1777303258979,\n", + " 'timestamp_transaction': 1777303258950,\n", + " 'event_reason_type': 'ORDER',\n", + " 'symbol': 'ETHUSDT',\n", + " 'position_amount': 0.226,\n", + " 'entry_price': 2284.28,\n", + " 'accumulated_realized_pre_fees': 8.24392002,\n", + " 'unrealized_pnl': 0.0,\n", + " 'margin_type': 'cross',\n", + " 'isolated_wallet': 0.0,\n", + " 'position_side': 'BOTH'}]" + ] + }, + "execution_count": 32, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pos" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "f788b6df", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Timestamp('2026-04-27 15:20:58.987000')" + ] + }, + "execution_count": 33, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "pd.to_datetime(1777303258987, unit='ms')" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "855f980b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Timestamp('2026-04-27 15:20:58.979000')" + ] + }, + "execution_count": 34, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "pd.to_datetime(1777303258979, unit='ms')" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Timestamp('2026-04-27 15:20:58.950000')" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "pd.to_datetime(1777303258950, unit='ms')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [] }, { @@ -155,7 +239,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.12" + "version": "3.13.13" } }, "nbformat": 4, diff --git a/algo/Dockerfile b/algo/Dockerfile index b758fb8..ed37646 100644 --- a/algo/Dockerfile +++ b/algo/Dockerfile @@ -15,5 +15,5 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . . # Finally, run gunicorn. -CMD [ "python", "-u" ,"main.py"] +CMD [ "python", "main.py"] # CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/algo_config.json b/algo_config.json index e2a9366..e712764 100644 --- a/algo_config.json +++ b/algo_config.json @@ -1,13 +1,14 @@ { - "Config_Updated_Timestamp": 1777098091913, + "Config_Updated_Timestamp": 1777312620005, "Allow_Ordering_Aster": true, "Allow_Ordering_Extend": true, - "Loop_Sleep_Sec": 0.00, - "Max_Target_Notional": 0.00, - "Min_Time_To_Funding_Minutes": 10, + "Loop_Sleep_Sec": 0.0, + "Max_Target_Notional": 0.0, + "Min_Time_To_Funding_Minutes": 7, "Price_Worsener_Aster": 0.0, "Price_Worsener_Extend": 0.0, "Target_Open_Cash_Position": 10, - "Print_Summary_Each_Loop" : false, + "Log_Summary_Each_Loop": false, + "Print_Summary_Each_Loop": false, "Flip_Side_For_Testing": false } \ No newline at end of file diff --git a/algo_orchestrator.py b/algo_orchestrator.py index ccd76e4..12969ec 100644 --- a/algo_orchestrator.py +++ b/algo_orchestrator.py @@ -42,14 +42,19 @@ async def orchestrator() -> None: timestamp = round(datetime.now().timestamp()*1000) data = json.loads(message['data']) # channel = message['channel'] - + + with open('/algo_local_drive/algo_config.json', 'r', encoding='utf-8') as f: + # ALGO_CONFIG = json.load(f, object_hook=lambda d: Algo_Config(**d)) + ALGO_CONFIG = json.load(f) + ALGO_CONFIG['Config_Updated_Timestamp'] = timestamp + for k, v in data.items(): if ALGO_CONFIG.get(k, None) is not None: ALGO_CONFIG[k] = v - ALGO_CONFIG['Config_Updated_Timestamp'] = timestamp VAL_KEY.set(VK_OUT, json.dumps(ALGO_CONFIG)) - with open('algo_config.json', 'w', encoding='utf-8') as f: + with open('/algo_local_drive/algo_config.json', 'w', encoding='utf-8') as f: + # print('SAVING FILE') json.dump(ALGO_CONFIG, f, indent=4) print(f"Algo Config Updated @ {timestamp}; {data}") @@ -70,7 +75,7 @@ async def main() -> None: VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) # engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') - with open('algo_config.json', 'r', encoding='utf-8') as f: + with open('/algo_local_drive/algo_config.json', 'r', encoding='utf-8') as f: # ALGO_CONFIG = json.load(f, object_hook=lambda d: Algo_Config(**d)) ALGO_CONFIG = json.load(f) ALGO_CONFIG['Config_Updated_Timestamp'] = round(datetime.now().timestamp()*1000) diff --git a/algo_orchestrator/Dockerfile b/algo_orchestrator/Dockerfile index bd579d7..7f00306 100644 --- a/algo_orchestrator/Dockerfile +++ b/algo_orchestrator/Dockerfile @@ -15,5 +15,5 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . . # Finally, run gunicorn. -CMD [ "python", "-u" ,"algo_orchestrator.py"] +CMD [ "python", "algo_orchestrator.py"] # CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/docker-compose-algo.yml b/docker-compose-algo.yml new file mode 100644 index 0000000..ff0dc9b --- /dev/null +++ b/docker-compose-algo.yml @@ -0,0 +1,15 @@ +# tail -f Fund_Rate_Algo.log +# docker compose -f docker-compose-algo.yml up --build + +services: + algo: + container_name: algo + restart: "no" + build: + context: ./ + dockerfile: ./algo/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + - ./:/algo_local_drive:rw # Read-write access to data + network_mode: "host" diff --git a/docker-compose.yml b/docker-compose.yml index b6c42fb..cfa8096 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,23 +1,24 @@ -# tail -f Fund_Rate_Algo.log Fund_Rate_Aster_User.log Fund_Rate_Aster.log Fund_Rate_Extended_FR.log Fund_Rate_Extended_OB.log Fund_Rate_Extended_User.log +# tail -f Fund_Rate_Algo.log Fund_Rate_Aster_User.log Fund_Rate_Aster.log Fund_Rate_Extended_FR.log Fund_Rate_Extended_OB.log Fund_Rate_Extended_Trades.log Fund_Rate_Extended_User.log services: - algo: - container_name: algo - restart: "no" - build: - context: ./ - dockerfile: ./algo/Dockerfile - depends_on: - - algo_orchestrator - - ws_aster - - ws_aster_user - - ws_extended_fund_rate - - ws_extended_orderbook - - ws_extended_user - volumes: - - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data - - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data - network_mode: "host" + # algo: + # container_name: algo + # restart: "no" + # build: + # context: ./ + # dockerfile: ./algo/Dockerfile + # depends_on: + # - algo_orchestrator + # - ws_aster + # - ws_aster_user + # - ws_extended_fund_rate + # - ws_extended_orderbook + # - ws_extended_trades + # - ws_extended_user + # volumes: + # - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + # - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + # network_mode: "host" algo_orchestrator: container_name: algo_orchestrator restart: "unless-stopped" @@ -27,6 +28,7 @@ services: volumes: - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + - ./:/algo_local_drive:rw # Read-write access to data network_mode: "host" ws_aster: container_name: ws_aster @@ -68,6 +70,16 @@ services: - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to dataw - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data network_mode: "host" + ws_extended_trades: + container_name: ws_extended_trades + restart: "unless-stopped" + build: + context: ./ + dockerfile: ./ws_extended_trades/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to dataw + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + network_mode: "host" ws_extended_user: container_name: ws_extended_user restart: "unless-stopped" diff --git a/engine_health.py b/engine_health.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index 82b09dc..6ab8d70 100644 --- a/main.py +++ b/main.py @@ -38,6 +38,7 @@ LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log' ### Algo Config ### ALGO_CONFIG: structs.Algo_Config = None +MIN_TIME_TO_FUNDING: int ### CONSTANTS ### ASTER = structs.Perpetual_Exchange( @@ -64,6 +65,12 @@ EXTEND_AVAIL_COLLATERAL = 0 ASTER_NOTIONAL_POSITION = 0 EXTEND_NOTIONAL_POSITION = 0 +ASTER_NOTIONAL_OBJ: dict | None = None +EXTEND_NOTIONAL_OBJ: dict | None = None + +ASTER_UNREALIZED_PNL = 0 +EXTEND_UNREALIZED_PNL = 0 + ASTER_OPEN_ORDERS = [] EXTEND_OPEN_ORDERS = [] @@ -111,9 +118,13 @@ async def get_aster_collateral(): ASTER_AVAIL_COLLATERAL = float([d for d in r if d.get('asset')==ASTER.rh_asset][0].get('availableBalance')) async def get_aster_notional_position(resp: dict | None = None): + global ASTER_NOTIONAL_OBJ global ASTER_NOTIONAL_POSITION + global ASTER_UNREALIZED_PNL global ASTER_MULT + previous_notional_obj = ASTER_NOTIONAL_OBJ + if not resp: fut_acct_positionRisk = { "url": "/fapi/v3/positionRisk", @@ -123,29 +134,38 @@ async def get_aster_notional_position(resp: dict | None = None): } } resp = await aster_auth.post_authenticated_url(fut_acct_positionRisk) + d = [x for x in resp if x.get('symbol', None) == ASTER.symbol][0] + d['timestamp_arrival'] = round(datetime.now().timestamp()*1000) + else: + d = [x for x in resp if x.get('symbol', None) == ASTER.symbol][0] + + if previous_notional_obj is not None: + if previous_notional_obj['timestamp_arrival'] > d['timestamp_arrival']: + # logging.info(f'ASTER NOTIONAL: prev timestamp ({pd.to_datetime(previous_notional_obj['timestamp_arrival'], unit='ms')}) > new timestamp ({pd.to_datetime(d['timestamp_arrival'], unit='ms')}); skipping') + return - d = [x for x in resp if x.get('symbol', None) == ASTER.symbol][0] + ASTER_NOTIONAL_OBJ = d if len(d) < 1: logging.info(f'BAD NOTIONAL - ASTER CHANGE: Empty d: {d}; resp: {resp}') await kill_algo() - aster_unrealized_pnl = float(d['unrealized_pnl']) if d.get('unrealized_pnl') is not None else float(d['unRealizedProfit']) + ASTER_UNREALIZED_PNL = float(d['unrealized_pnl']) if d.get('unrealized_pnl') is not None else float(d['unRealizedProfit']) if d.get('notional') is not None: - notional = float(d['notional']) + ASTER_NOTIONAL_POSITION = float(d['notional']) - ASTER_UNREALIZED_PNL else: - notional = float(d['position_amount'])*float(d['entry_price']) + ASTER_NOTIONAL_POSITION = float(d['position_amount'])*float(d['entry_price']) previous_notional_position = ASTER_NOTIONAL_POSITION - ASTER_NOTIONAL_POSITION = notional - aster_unrealized_pnl - if not resp: - ASTER_MULT = float(d['leverage']) - if abs(ASTER_NOTIONAL_POSITION) > ALGO_CONFIG.Max_Target_Notional*1.01: - logging.info(f'BAD NOTIONAL - ASTER CHANGE: {ASTER_NOTIONAL_POSITION}; UR PNL: {aster_unrealized_pnl}; MULT: {ASTER_MULT}; d: {d}; resp: {resp}') + # if not resp: # this can never evaluate + # ASTER_MULT = float(d['leverage']) + # if abs(ASTER_NOTIONAL_POSITION) > ALGO_CONFIG.Max_Target_Notional*1.01: + if abs(ASTER_NOTIONAL_POSITION) > ALGO_CONFIG.Max_Target_Notional*2.01: + logging.info(f'BAD NOTIONAL - ASTER CHANGE: {previous_notional_position} -> {ASTER_NOTIONAL_POSITION}; UR PNL: {ASTER_UNREALIZED_PNL}; MULT: {ASTER_MULT}; d: {d}; resp: {resp}') await kill_algo() - if ASTER_NOTIONAL_POSITION != previous_notional_position: - logging.info(f'ASTER NOTIONAL CHANGE: {previous_notional_position} -> {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {aster_unrealized_pnl:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') + # if ASTER_NOTIONAL_POSITION != previous_notional_position: + logging.info(f'ASTER NOTIONAL CHANGE: {previous_notional_position} -> {ASTER_NOTIONAL_POSITION:.2f}; UR PNL: {ASTER_UNREALIZED_PNL:.2f}; MULT: {ASTER_MULT:.0f}; resp: {bool(resp)}') async def get_extend_collateral(): global EXTEND_AVAIL_COLLATERAL @@ -155,6 +175,7 @@ async def get_extend_collateral(): async def get_extend_notional(resp: dict | None = None): global EXTEND_NOTIONAL_POSITION + global EXTEND_UNREALIZED_PNL global EXTEND_MULT if not resp: @@ -166,7 +187,7 @@ async def get_extend_notional(resp: dict | None = None): logging.info('get_extend_notional - No Positions') else: pos_dict = pos_dict[0] - unrealized_pnl = pos_dict.get('unrealised_pnl', 0) + EXTEND_UNREALIZED_PNL = pos_dict.get('unrealised_pnl', 0) previous_notional_position = EXTEND_NOTIONAL_POSITION position_side = pos_dict['side'] # LONG or SHORT notional_pos_abs = abs(float(pos_dict['value'])) @@ -177,10 +198,13 @@ async def get_extend_notional(resp: dict | None = None): else: logging.info(f'EXTEND BAD SIDE ON POSITION UPDATE: {pos_dict}') - EXTEND_NOTIONAL_POSITION = notional_pos_sided - float(unrealized_pnl) + EXTEND_NOTIONAL_POSITION = notional_pos_sided - float(EXTEND_UNREALIZED_PNL) EXTEND_MULT = pos_dict.get('leverage', EXTEND_MULT) + if abs(EXTEND_NOTIONAL_POSITION) > ALGO_CONFIG.Max_Target_Notional*2.01: + logging.info(f'BAD NOTIONAL - EXTEND CHANGE: {previous_notional_position} -> {EXTEND_NOTIONAL_POSITION}; UR PNL: {EXTEND_UNREALIZED_PNL}; MULT: {EXTEND_MULT}; d: {pos_dict}; resp: {resp}') + await kill_algo() if EXTEND_NOTIONAL_POSITION != previous_notional_position: - logging.info(f'EXTEND NOTIONAL CHANGE: {previous_notional_position} -> {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {unrealized_pnl:.2f}; MULT: {EXTEND_MULT:.0f}; resp: {bool(resp)}') + logging.info(f'EXTEND NOTIONAL CHANGE: {previous_notional_position} -> {EXTEND_NOTIONAL_POSITION:.2f}; UR PNL: {EXTEND_UNREALIZED_PNL:.2f}; MULT: {EXTEND_MULT:.0f}; resp: {bool(resp)}') ### EXCHANGE INFO ### async def get_aster_exch_info(): @@ -226,9 +250,11 @@ async def kill_algo(): logging.info('ALGO KILL FLAG ACTIVATED; CANCELLING OPEN ORDERS AND SHUTTING DOWN') raise ValueError('KILL FLAG ACTIVATED') + ### ALGO LOOP ### async def run_algo(): global ALGO_CONFIG + global MIN_TIME_TO_FUNDING global ASTER_OPEN_ORDERS global EXTEND_OPEN_ORDERS @@ -288,13 +314,15 @@ async def run_algo(): now_ms = round(datetime.now().timestamp()*1000) time_to_funding_ms = min([ASTER_FUND_RATE_TIME, EXTEND_FUND_RATE_TIME]) - now_ms if ( time_to_funding_ms > MIN_TIME_TO_FUNDING ) and (not ASTER_OPEN_ORDERS) and (not EXTEND_OPEN_ORDERS): - print(f'Outside action window (minutes) and no active order (sleeping for 5 sec): {pd.to_datetime(time_to_funding_ms, unit='ms').minute} > {pd.to_datetime(MIN_TIME_TO_FUNDING, unit='ms').minute}') + logging.info(f'Outside action window (minutes) and no active order (sleeping for 5 sec): {pd.to_datetime(time_to_funding_ms, unit='ms').minute} > {pd.to_datetime(MIN_TIME_TO_FUNDING, unit='ms').minute}') time.sleep(5) continue if len(ASTER_WS_POS_UPDATES) > 0: + # await get_aster_notional_position() await get_aster_notional_position(resp=ASTER_WS_POS_UPDATES) ###### *** returned 0 notional even though had a position, need to handle and safety check to not order above max notional. + ##### NEED TO UPDATE SO IT TAKES THE LATEST MSG, ie drop the WS msg if its older than the exisiting one from the API. if len(EXTEND_WS_POS_UPDATES) > 0: await get_extend_notional(resp=EXTEND_WS_POS_UPDATES) @@ -304,7 +332,7 @@ async def run_algo(): if ASTER_WS_ORDER_UPDATES is not None: for idx, o in enumerate(ASTER_OPEN_ORDERS): order_id = o.get('order_id') if o.get('order_id') is not None else o['orderId'] - order_orig_status = o['status'] ### Got a keyerror on this + order_orig_status = o.get('status') if o.get('status') is not None else o['order_status'] order_update = [ou for ou in ASTER_WS_ORDER_UPDATES if ou.get('order_id', None) == order_id] if len(order_update) > 0: @@ -411,8 +439,8 @@ async def run_algo(): EXTEND_TOB_PX = float(EXTENDED_TICKER_DICT['best_bid_px']) - ASTER_TGT_TAIL = ASTER_TGT_NOTIONAL - ASTER_NOTIONAL_POSITION - EXTEND_TGT_TAIL = EXTEND_TGT_NOTIONAL - EXTEND_NOTIONAL_POSITION + ASTER_TGT_TAIL = ASTER_TGT_NOTIONAL - ( float(ASTER_NOTIONAL_POSITION) + float(ASTER_UNREALIZED_PNL) ) + EXTEND_TGT_TAIL = EXTEND_TGT_NOTIONAL - ( float(EXTEND_NOTIONAL_POSITION) + float(EXTEND_UNREALIZED_PNL) ) ASTER_TGT_TAIL_BASE_QTY = Decimal(str(float(ASTER_TGT_TAIL) / float(ASTER_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) EXTEND_TGT_TAIL_BASE_QTY = Decimal(str(float(EXTEND_TGT_TAIL) / float(EXTEND_TOB_PX))).quantize(Decimal(str(0.001)), rounding=ROUND_DOWN) @@ -426,9 +454,10 @@ async def run_algo(): OUT: print | logging.info = logging.info if use_logging else print OUT(f''' + LOOP SLEEP (SEC): {ALGO_CONFIG.Loop_Sleep_Sec} FLIP SIDES FOR TESTING?: {ALGO_CONFIG.Flip_Side_For_Testing} {pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')-datetime.now()):}) | {pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')-datetime.now()):}) - ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips] + ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips] ASTER: {'LONG PAYS SHORT' if ASTER_FUND_RATE > 0 else 'SHORT PAYS LONG'} | EXTEND: {'LONG PAYS SHORT' if EXTEND_FUND_RATE > 0 else 'SHORT PAYS LONG'} ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ] ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ] @@ -440,7 +469,7 @@ async def run_algo(): TGT NOTIONAL: $ {ALGO_CONFIG.Max_Target_Notional if not Flags.NET_FUNDING_IS_ZERO else 0.00} ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL:.4f} ] - ASTER: {ASTER_TGT_NOTIONAL:.4f} - {ASTER_NOTIONAL_POSITION:.4f} = Tail: {ASTER_TGT_TAIL:4f} | EXTEND: {EXTEND_TGT_NOTIONAL:.4f} - {EXTEND_NOTIONAL_POSITION:.4f} = Tail: {EXTEND_TGT_TAIL:4f} + ASTER: {ASTER_TGT_NOTIONAL:.2f} - {ASTER_NOTIONAL_POSITION:.2f} + {ASTER_UNREALIZED_PNL:.2f} = {ASTER_TGT_TAIL:2f} | EXTEND: {EXTEND_TGT_NOTIONAL:.2f} - {EXTEND_NOTIONAL_POSITION:.2f} + {EXTEND_UNREALIZED_PNL:.2f} = {EXTEND_TGT_TAIL:2f} ASTER: {ASTER_TGT_TAIL_BASE_QTY:.4f} > {MAX_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL_BASE_QTY:.4f} > {MAX_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ] --- ASTER OPEN ORDERS --- @@ -449,8 +478,10 @@ async def run_algo(): --- EXTEND OPEN ORDERS --- {EXTEND_OPEN_ORDERS} ''') + if ALGO_CONFIG.Log_Summary_Each_Loop: + print_summary(use_logging=True) if ALGO_CONFIG.Print_Summary_Each_Loop: - print_summary() + print_summary(use_logging=False) # print_summary() ### ROUTES ### @@ -510,6 +541,7 @@ async def run_algo(): order_resp = await aster_auth.post_authenticated_url(post_order) if order_resp.get('orderId', None) is not None: order_resp['original_price'] = price + order_resp['order_status'] = order_resp['status'] ASTER_OPEN_ORDERS.append(order_resp) utils.send_tg_alert(f'FR_ALGO - ASTER Order. Start_$: {ASTER_NOTIONAL_POSITION:.2f}; Value: {float(ASTER_TGT_TAIL_BASE_QTY)*float(price):.2f}; Price: {float(price):.2f}') logging.info(f'ASTER ORDER PLACED SUCCESS: {order_resp}') @@ -594,7 +626,7 @@ async def run_algo(): logging.info('EXTEND HAS NO TAIL BUT OPEN ORDERS - CANCELLING OPEN ORDERS') await extend_cancel_all_orders() - print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000})') + # print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000}); Sleeping for sec: {ALGO_CONFIG.Loop_Sleep_Sec}') time.sleep(ALGO_CONFIG.Loop_Sleep_Sec) diff --git a/modules/aster_db.py b/modules/aster_db.py index 93c0b04..237ec0d 100644 --- a/modules/aster_db.py +++ b/modules/aster_db.py @@ -139,9 +139,30 @@ async def create_fr_aster_user_account_pos( else: raise ValueError('Only MySQL engine is implemented') - - - - - - +### Mkt Trades Table #### +async def create_fr_aster_mkt_trades( + CON: AsyncContextManager, + engine: str = 'mysql', # mysql | duckdb + ) -> None: + if CON is None: + logging.info("NO DB CONNECTION, SKIPPING Create Statements") + else: + if engine == 'mysql': + logging.info('Creating Table if Does Not Exist: fr_aster_mkt_trades') + await CON.execute(text(""" + CREATE TABLE IF NOT EXISTS fr_aster_mkt_trades ( + timestamp_arrival BIGINT, + timestamp_msg BIGINT, + timestamp_trade BIGINT, + symbol VARCHAR(20), + aggregate_trade_id VARCHAR(100), + price DOUBLE, + qty DOUBLE, + first_trade_id VARCHAR(100), + last_trade_id VARCHAR(100), + is_buyer_mkt_maker BOOL + ); + """)) + await CON.commit() + else: + raise ValueError('Only MySQL engine is implemented') diff --git a/modules/extended_db.py b/modules/extended_db.py index fd5f67d..2697270 100644 --- a/modules/extended_db.py +++ b/modules/extended_db.py @@ -150,3 +150,32 @@ async def create_fr_extended_user_position( else: raise ValueError('Only MySQL engine is implemented') +### Market Trades Table #### +async def create_fr_extended_mkt_trades( + CON: AsyncContextManager, + engine: str = 'mysql', # mysql | duckdb + ) -> None: + if CON is None: + logging.info("NO DB CONNECTION, SKIPPING Create Statements") + else: + if engine == 'mysql': + logging.info('Creating Table if Does Not Exist: fr_extended_mkt_trades') + await CON.execute(text(""" + CREATE TABLE IF NOT EXISTS fr_extended_mkt_trades ( + sequence_id INT, + timestamp_arrival BIGINT, + timestamp_msg BIGINT, + timestamp_trade BIGINT, + symbol VARCHAR(20), + side_taker VARCHAR(20), + trade_type VARCHAR(20), + price DOUBLE, + qty DOUBLE, + trade_id VARCHAR(100), + is_buyer_mkt_maker BOOL + ); + """)) + await CON.commit() + else: + raise ValueError('Only MySQL engine is implemented') + diff --git a/modules/structs.py b/modules/structs.py index 0d55047..2e3092e 100644 --- a/modules/structs.py +++ b/modules/structs.py @@ -17,6 +17,7 @@ class Algo_Config: Price_Worsener_Extend: float Target_Open_Cash_Position: int + Log_Summary_Each_Loop: bool = False Print_Summary_Each_Loop: bool = False Flip_Side_For_Testing: bool = False diff --git a/order_engine.ipynb b/order_engine.ipynb new file mode 100644 index 0000000..17c97b0 --- /dev/null +++ b/order_engine.ipynb @@ -0,0 +1,272 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 32, + "id": "68966247", + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "import pandas as pd\n", + "import numpy as np\n", + "import json\n", + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "02cd5305", + "metadata": {}, + "outputs": [], + "source": [ + "### Extended Trades History ###\n", + "candleType = 'trades'\n", + "market = 'ETH-USD'\n", + "params = {\n", + " 'interval': \"1m\",\n", + " 'limit': 100,\n", + "}\n", + "r = requests.get(f'https://api.starknet.extended.exchange/api/v1/info/candles/{market}/{candleType}', params=params)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "5603b04d", + "metadata": {}, + "outputs": [], + "source": [ + "### Aster Trades History ###\n", + "params = {\n", + " 'symbol': \"ETHUSDT\",\n", + " 'limit': 1000,\n", + "}\n", + "r = requests.get('https://fapi.asterdex.com/fapi/v3/trades', params=params)" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "a3ad1819", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idpriceqtyquoteQtytimeisBuyerMaker
0745065472311.020.044101.682026-04-27 14:22:45.650True
1745065482311.000.0049.242026-04-27 14:22:45.650True
2745065492310.910.0036.932026-04-27 14:22:45.650True
3745065502310.900.0049.242026-04-27 14:22:45.650True
4745065512310.800.0049.242026-04-27 14:22:45.700True
.....................
995745075422312.100.0049.242026-04-27 14:34:12.500True
996745075432312.182.4425646.342026-04-27 14:34:13.443True
997745075442312.2410.09923351.312026-04-27 14:34:13.600True
998745075452312.133.1207213.842026-04-27 14:34:14.568True
999745075462312.196.22814400.312026-04-27 14:34:15.988True
\n", + "

1000 rows × 6 columns

\n", + "
" + ], + "text/plain": [ + " id price qty quoteQty time isBuyerMaker\n", + "0 74506547 2311.02 0.044 101.68 2026-04-27 14:22:45.650 True\n", + "1 74506548 2311.00 0.004 9.24 2026-04-27 14:22:45.650 True\n", + "2 74506549 2310.91 0.003 6.93 2026-04-27 14:22:45.650 True\n", + "3 74506550 2310.90 0.004 9.24 2026-04-27 14:22:45.650 True\n", + "4 74506551 2310.80 0.004 9.24 2026-04-27 14:22:45.700 True\n", + ".. ... ... ... ... ... ...\n", + "995 74507542 2312.10 0.004 9.24 2026-04-27 14:34:12.500 True\n", + "996 74507543 2312.18 2.442 5646.34 2026-04-27 14:34:13.443 True\n", + "997 74507544 2312.24 10.099 23351.31 2026-04-27 14:34:13.600 True\n", + "998 74507545 2312.13 3.120 7213.84 2026-04-27 14:34:14.568 True\n", + "999 74507546 2312.19 6.228 14400.31 2026-04-27 14:34:15.988 True\n", + "\n", + "[1000 rows x 6 columns]" + ] + }, + "execution_count": 34, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "l = json.loads(r.text)\n", + "df = pd.DataFrame(l)\n", + "df['time'] = pd.to_datetime(df['time'], unit='ms')\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3c908942", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "60f4608a", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76624896", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5ade3c15", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ws_aster.py b/ws_aster.py index 47514d6..6ce296e 100644 --- a/ws_aster.py +++ b/ws_aster.py @@ -15,7 +15,8 @@ from sqlalchemy.ext.asyncio import create_async_engine import valkey import os from dotenv import load_dotenv - +import modules.db as db +import modules.aster_db as aster_db ### Allow only ipv4 ### def allowed_gai_family(): @@ -23,10 +24,11 @@ def allowed_gai_family(): urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### -USE_DB: bool = False +USE_DB: bool = True USE_VK: bool = True VK_FUND_RATE = 'fund_rate_aster' VK_TICKER = 'fut_ticker_aster' +VK_LAST_TRADE = 'fut_last_trade_aster' CON: AsyncContextManager | None = None VAL_KEY = None @@ -38,80 +40,10 @@ LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster.log' SYMBOL: str = 'ETHUSDT' STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s' STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker' +STREAM_TRADES: str = f'{SYMBOL.lower()}@aggTrade' ### Globals ### -WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}" -# WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}" - -# HIST_TRADES = np.empty((0, 3)) -# HIST_TRADES_LOOKBACK_SEC = 6 - -# ### Database Funcs ### -# async def create_rtds_btcusd_table( -# CON: AsyncContextManager, -# engine: str = 'mysql', # mysql | duckdb -# ) -> None: -# if CON is None: -# logging.info("NO DB CONNECTION, SKIPPING Create Statements") -# else: -# if engine == 'mysql': -# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') -# await CON.execute(text(""" -# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( -# timestamp_arrival BIGINT, -# timestamp_msg BIGINT, -# timestamp_value BIGINT, -# value DOUBLE, -# qty DOUBLE -# ); -# """)) -# await CON.commit() -# else: -# raise ValueError('Only MySQL engine is implemented') - -# async def insert_rtds_btcusd_table( -# timestamp_arrival: int, -# timestamp_msg: int, -# timestamp_value: int, -# value: float, -# qty: float, -# CON: AsyncContextManager, -# engine: str = 'mysql', # mysql | duckdb -# ) -> None: -# params={ -# 'timestamp_arrival': timestamp_arrival, -# 'timestamp_msg': timestamp_msg, -# 'timestamp_value': timestamp_value, -# 'value': value, -# 'qty': qty, -# } -# if CON is None: -# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") -# else: -# if engine == 'mysql': -# await CON.execute(text(""" -# INSERT INTO binance_btcusd_trades -# ( -# timestamp_arrival, -# timestamp_msg, -# timestamp_value, -# value, -# qty -# ) -# VALUES -# ( -# :timestamp_arrival, -# :timestamp_msg, -# :timestamp_value, -# :value, -# :qty -# ) -# """), -# parameters=params -# ) -# await CON.commit() -# else: -# raise ValueError('Only MySQL engine is implemented') +WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}/{STREAM_TRADES}" ### Websocket ### async def ws_stream(): @@ -156,6 +88,24 @@ async def ws_stream(): }) VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) continue + case c if c == STREAM_TRADES: + # print(f'MKT_TRADE: {data}') + trade_obj = { + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['data']['E'], + 'timestamp_trade': data['data']['T'], + 'symbol': data['data']['s'], + 'aggregate_trade_id': data['data']['a'], + 'price': float(data['data']['p']), + 'qty': float(data['data']['q']), + 'first_trade_id': data['data']['f'], + 'last_trade_id': data['data']['l'], + 'is_buyer_mkt_maker': bool(data['data']['m']), + } + # VAL_KEY.set(VK_LAST_TRADE, json.dumps(trade_obj)) + if USE_DB: + await db.insert_df_to_mysql(table_name='fr_aster_mkt_trades', params=trade_obj, CON=CON) + continue case _: logging.warning(f'UNMATCHED OTHER MSG: {data}') else: @@ -188,7 +138,7 @@ async def main(): if USE_DB: engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') async with engine.connect() as CON: - # await create_rtds_btcusd_table(CON=CON) + await aster_db.create_fr_aster_mkt_trades(CON=CON) await ws_stream() else: CON = None diff --git a/ws_extended_trades.py b/ws_extended_trades.py new file mode 100644 index 0000000..d0d103b --- /dev/null +++ b/ws_extended_trades.py @@ -0,0 +1,134 @@ +import asyncio +import json +import logging +import os +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +import valkey +import websockets +from dotenv import load_dotenv +from sqlalchemy.ext.asyncio import create_async_engine + +import modules.db as db +import modules.extended_db as extended_db + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = True +USE_VK: bool = True + +VK_LAST_TRADE = 'fut_last_trade_extended' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_Trades.log' + +### CONSTANTS ### +WS_SYMBOL: str = 'ETH-USD' + +### Globals ### +WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/publicTrades/{WS_SYMBOL}" + +### Websocket ### +async def ws_stream(): + async for websocket in websockets.connect(WSS_URL): + logging.info(f"Connected to {WSS_URL}") + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + if data.get('data', None) is not None: + if data['seq'] == 1: # Skip first msg that has historical trades + continue + list_for_df = [] + for t in data['data']: + trade_obj = { + 'sequence_id': data['seq'], + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'timestamp_trade': t['T'], + 'symbol': t['m'], + 'side_taker': t['S'], + 'trade_type': t['tT'], + 'price': float(t['p']), + 'qty': float(t['q']), + 'trade_id': t['i'], + 'is_buyer_mkt_maker': True if t['S']=='SELL' else False, + } + list_for_df.append(trade_obj) + # VAL_KEY.set(VK_LAST_TRADE, json.dumps(trade_obj)) + if USE_DB: + await db.insert_df_to_mysql(table_name='fr_extended_mkt_trades', params=list_for_df, CON=CON) + pass + continue + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + async with engine.connect() as CON: + await extended_db.create_fr_extended_mkt_trades(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_extended_trades/Dockerfile b/ws_extended_trades/Dockerfile new file mode 100644 index 0000000..56445e6 --- /dev/null +++ b/ws_extended_trades/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.13-slim + +RUN apt-get update && \ + apt-get install -y build-essential + +RUN gcc --version +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Finally, run gunicorn. +CMD [ "python", "ws_extended_trades.py"] +# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/ws_extended_user.py b/ws_extended_user.py index 2ce5a54..8050d7a 100644 --- a/ws_extended_user.py +++ b/ws_extended_user.py @@ -198,7 +198,6 @@ async def ws_stream(): VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS) VAL_KEY.set(VK_POSITIONS, VAL_KEY_OBJ) - await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON) continue case _: