Files
Funding_Rate/ws_aster_user.py
2026-04-23 03:11:52 +00:00

301 lines
17 KiB
Python

import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
import modules.aster_auth as aster_auth
import modules.aster_db as aster_db
import modules.db as db
### Allow only ipv4 ###
def allowed_gai_family():
    """Address-family chooser for urllib3: always resolve over IPv4."""
    ipv4_family = socket.AF_INET
    return ipv4_family
# Monkey-patch urllib3's getaddrinfo family chooser so requests connects over IPv4 only.
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True   # record events to MySQL
USE_VK: bool = True   # publish rolling snapshots to Valkey
# Valkey keys under which JSON snapshots of recent user-stream events are published.
VK_ORDERS_TRADES = 'fr_aster_user_orders'
VK_MARGIN_CALLS = 'fr_aster_user_margin_calls'
VK_BALANCES = 'fr_aster_user_balances'
VK_POSITIONS = 'fr_aster_user_positions'
CON: AsyncContextManager | None = None   # async DB connection, populated in main()
VAL_KEY = None                           # Valkey client, populated in main()
### Logging ###
load_dotenv()
# BUGFIX: os.getenv() returns None when LOGS_PATH is unset, which previously
# crashed here with an opaque `TypeError: unsupported operand` on the string
# concatenation. Fail fast with an actionable message instead.
_logs_path = os.getenv("LOGS_PATH")
if _logs_path is None:
    raise RuntimeError("LOGS_PATH environment variable is not set; required to build the log file path")
LOG_FILEPATH: str = _logs_path + '/Fund_Rate_Aster_User.log'
### CONSTANTS ###
WSS_URL = "wss://fstream.asterdex.com/ws/"
LOCAL_RECENT_UPDATES_LOOKBACK_SEC = 30   # retention window for LOCAL_RECENT_* snapshots
### Globals ###
LISTEN_KEY: str | None = None            # current user-data-stream listen key
LISTEN_KEY_LAST_UPDATE_TS_S: int = 0     # epoch seconds of the last successful key refresh
LISTEN_KEY_PUT_INTERVAL_SEC = 1800       # refresh cadence (30 minutes)
# Rolling in-memory caches of recent events, trimmed to the lookback window.
LOCAL_RECENT_ORDERS: list = []
LOCAL_RECENT_MARGIN_CALLS: list = []
LOCAL_RECENT_BALANCES: list = []
LOCAL_RECENT_POSITIONS: list = []
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id'):
    """Insert *new_dict* into *list_of_dicts*, keyed by the *id* field.

    If an entry whose value under key *id* equals *new_dict*'s is already
    present, the first such entry is replaced in place; otherwise *new_dict*
    is appended. The list is mutated and also returned for convenience.
    """
    target = new_dict.get(id)
    for pos, existing in enumerate(list_of_dicts):
        if existing.get(id) == target:
            list_of_dicts[pos] = new_dict
            break
    else:
        # No match found anywhere in the list: append as a new entry.
        list_of_dicts.append(new_dict)
    return list_of_dicts
def get_new_listen_key() -> str:
    """Request a fresh user-data-stream listen key from the REST API.

    Side effect: records the refresh time in LISTEN_KEY_LAST_UPDATE_TS_S
    (epoch seconds) on success.

    Returns:
        The new listen key string.

    Raises:
        ValueError: if the authenticated response has no 'listenKey' field.
    """
    global LISTEN_KEY_LAST_UPDATE_TS_S
    listen_key_request = {
        "url": "/fapi/v3/listenKey",
        "method": "POST",
        "params": {}
    }
    r = aster_auth.post_authenticated_url(listen_key_request)
    listen_key = r.get('listenKey', None)
    # BUGFIX: was print(), which bypassed the module's log file; use logging
    # like the rest of the module.
    # NOTE(review): this logs a credential-like token — consider redacting.
    logging.info(f'LISTEN KEY: {listen_key}')
    if listen_key is None:
        raise ValueError(f'Listen Key is None; Failed to Update. response: {r}')
    LISTEN_KEY_LAST_UPDATE_TS_S = round(datetime.now().timestamp())
    return listen_key
async def listen_key_interval():
    """Background task: refresh LISTEN_KEY every LISTEN_KEY_PUT_INTERVAL_SEC seconds.

    Runs forever; intended to be spawned with asyncio.create_task().
    """
    global LISTEN_KEY
    while True:
        await asyncio.sleep(LISTEN_KEY_PUT_INTERVAL_SEC)
        try:
            LISTEN_KEY = get_new_listen_key()
        except Exception as e:
            # BUGFIX: this task is fired via create_task() and never awaited,
            # so an exception here previously killed the refresher silently and
            # the key eventually expired. Log and retry on the next tick.
            logging.error(f'Listen key refresh failed: {e}')
            logging.error(traceback.format_exc())
### Websocket ###
async def ws_stream():
global LISTEN_KEY
global LOCAL_RECENT_ORDERS
global LOCAL_RECENT_MARGIN_CALLS
global LOCAL_RECENT_BALANCES
global LOCAL_RECENT_POSITIONS
LISTEN_KEY = get_new_listen_key()
async for websocket in websockets.connect(WSS_URL+LISTEN_KEY):
logging.info(f"Connected to {WSS_URL}")
asyncio.create_task(listen_key_interval())
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
try:
data = json.loads(message)
channel = data.get('e', None)
if channel is not None:
LOOKBACK_MIN_TS_MS = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000)
match channel:
case 'ORDER_TRADE_UPDATE':
logging.info(f'ORDER_TRADE_UPDATE: {data}')
new_order_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'symbol': data['o']["s"], # "BTCUSDT", // Symbol
'client_order_id': data['o']["c"], # "TEST", // Client Order Id
'side': data['o']["S"], # "SELL", // Side
'order_type': data['o']["o"], # "TRAILING_STOP_MARKET", // Order Type
'time_in_force': data['o']["f"], # "GTC", // Time in Force
'original_qty': float(data['o']["q"]), # "0.001", // Original Quantity
'original_price': float(data['o']["p"]), # "0", // Original Price
'avg_price': float(data['o']["ap"]), # :"0", // Average Price
'stop_price': float(data['o'].get("sp", 0)), # :"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order
'execution_type': data['o']["x"], # "NEW", // Execution Type
'order_status': data['o']["X"], # "NEW", // Order Status
'order_id': data['o']["i"], # 8886774, // Order Id
'last_filled_qty': float(data['o']["l"]), # "0", // Order Last Filled Quantity
'filled_accumulated_qty': float(data['o']["z"]), # "0", // Order Filled Accumulated Quantity
'last_filled_price': float(data['o']["L"]), # "0", // Last Filled Price
'commission_asset': data['o'].get("N", None), # "USDT", // Commission Asset, will not push if no commission
'commission': float(data['o'].get("n",0)), # "0", // Commission, will not push if no commission
'order_trade_time_ts': data['o']["T"], # 1568879465651, // Order Trade Time
'trade_id': data['o']["t"], # 0, // Trade Id
'bid_notional': float(data['o']["b"]), # "0", // Bids Notional
'ask_notional': float(data['o']["a"]), # "9.91", // Ask Notional
'trade_is_maker': data['o']["m"], # false, // Is this trade the maker side?
'trade_is_reduce_only': data['o']["R"], # false, // Is this reduce only
'stop_px_working_type': data['o']["wt"], # :"CONTRACT_PRICE", // Stop Price Working Type
'original_order_type': data['o']["ot"], # :"TRAILING_STOP_MARKET", // Original Order Type
'position_side': data['o']["ps"], # :"LONG", // Position Side
'pushed_w_conditional_order': bool(data['o'].get("cp", False)), # :false, // If Close-All, pushed with conditional order
'activation_price': float(data['o'].get("AP", 0)), # :"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order
'callback_rate': float(data['o'].get("cr", 0)), # :"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order
'realized_profit': float(data['o']["rp"]), # :"0" // Realized Profit of the trade
}
LOCAL_RECENT_ORDERS = upsert_list_of_dicts_by_id(LOCAL_RECENT_ORDERS, new_order_update, id='client_order_id')
LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS)
VAL_KEY.set(VK_ORDERS_TRADES, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON)
continue
case 'MARGIN_CALL':
logging.info(f'MARGIN_CALL: {data}')
list_for_df = []
for p in list(data['p']):
margin_call_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'cross_wallet_balance': float(data.get('cw', 0)),
'symbol': p["s"], # "ETHUSDT", // Symbol
'position_side': p["ps"], # :"LONG", // Position Side
'position_amount': float(p["pa"]), # :"1.327", // Position Amount
'margin_type': p["mt"], # :"CROSSED", // Margin Type
'isolated_wallet': float(p.get("iw", 0)), # :"0", // Isolated Wallet (if isolated position)
'mark_price': float(p["mp"]), # :"187.17127", // Mark Price
'unrealized_pnl': float(p["up"]), # :"-1.166074", // Unrealized PnL
'maint_margin_required': float(p["mm"]), # :"1.614445" // Maintenance Margin Required
}
list_for_df.append(margin_call_update)
LOCAL_RECENT_MARGIN_CALLS = upsert_list_of_dicts_by_id(LOCAL_RECENT_MARGIN_CALLS, margin_call_update, id='symbol')
LOCAL_RECENT_MARGIN_CALLS = [t for t in LOCAL_RECENT_MARGIN_CALLS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_MARGIN_CALLS)
VAL_KEY.set(VK_MARGIN_CALLS, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_aster_user_margin', params=list_for_df, CON=CON)
continue
case 'ACCOUNT_UPDATE':
logging.info(f'ACCOUNT_UPDATE: {data}')
list_for_df_bal = []
list_for_df_pos = []
### Balance Updates ###
if len(list(data['a']['B'])) > 0:
for b in list(data['a']['B']):
balance_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'event_reason_type': data['a']["m"],
'asset': b['a'],
'wallet_balance': float(b['wb']),
'cross_wallet_balance': float(b.get('cw', 0)),
'balance_change_excl_pnl_comms': float(b['bc']),
}
list_for_df_bal.append(balance_update)
LOCAL_RECENT_BALANCES = upsert_list_of_dicts_by_id(LOCAL_RECENT_BALANCES, balance_update, id='asset')
LOCAL_RECENT_BALANCES = [t for t in LOCAL_RECENT_BALANCES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY.set(VK_BALANCES, json.dumps(LOCAL_RECENT_BALANCES))
### Position Updates ###
if len(list(data['a']['P'])) > 0:
for p in list(data['a']['P']):
position_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'event_reason_type': data['a']["m"],
'symbol': p['s'],
'position_amount': float(p['pa']),
'entry_price': float(p['ep']),
'accumulated_realized_pre_fees': float(p['cr']),
'unrealized_pnl': float(p['up']),
'margin_type': p['mt'],
'isolated_wallet': float(p.get('iw', 0)),
'position_side': p['ps'],
}
list_for_df_pos.append(position_update)
LOCAL_RECENT_POSITIONS = upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='symbol')
LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY.set(VK_POSITIONS, json.dumps(LOCAL_RECENT_POSITIONS))
if balance_update:
await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON)
if position_update:
await db.insert_df_to_mysql(table_name='fr_aster_user_account_pos', params=list_for_df_bal, CON=CON)
continue
case 'listenKeyExpired':
raise('Listen Key Has Expired; Failed to Update Properly. Restarting.')
case _:
logging.warning(f'UNMATCHED OTHER MSG: {data}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
    """Wire up the Valkey client and the async MySQL engine according to the
    USE_VK / USE_DB flags, then run the websocket stream forever."""
    global VAL_KEY
    global CON
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
        return
    # NOTE(review): DB credentials are hard-coded here — consider env vars.
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    async with engine.connect() as CON:
        # Ensure all destination tables exist before consuming the stream.
        await aster_db.create_fr_aster_user_order_trade_table(CON=CON)
        await aster_db.create_fr_aster_user_margin_table(CON=CON)
        await aster_db.create_fr_aster_user_account_bal(CON=CON)
        await aster_db.create_fr_aster_user_account_pos(CON=CON)
        await ws_stream()
if __name__ == '__main__':
    # Epoch-millisecond timestamp marking process start.
    START_TIME = round(datetime.now().timestamp() * 1000)
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    # force=True replaces any handlers installed by the pre-config call above;
    # filemode='w' truncates the log on every restart.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        filemode='w',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")