From 64266366dd599ed7be18ebd17d4529edc3edc955 Mon Sep 17 00:00:00 2001 From: stevekeyharvey Date: Wed, 22 Apr 2026 01:33:35 +0000 Subject: [PATCH] adding aster --- aster.ipynb | 0 ws_aster.py | 141 +++++++++++++++++++------------------------------ ws_extended.py | 0 3 files changed, 54 insertions(+), 87 deletions(-) create mode 100644 aster.ipynb create mode 100644 ws_extended.py diff --git a/aster.ipynb b/aster.ipynb new file mode 100644 index 0000000..e69de29 diff --git a/ws_aster.py b/ws_aster.py index 9a151a7..cc529bc 100644 --- a/ws_aster.py +++ b/ws_aster.py @@ -25,21 +25,23 @@ urllib3_cn.allowed_gai_family = allowed_gai_family ### Database ### USE_DB: bool = False USE_VK: bool = True -# VK_FUND_RATE = 'fund_rate_apex' -VK_TICKER = 'fut_ticker_apex' +VK_FUND_RATE = 'fund_rate_aster' +VK_TICKER = 'fut_ticker_aster' CON: AsyncContextManager | None = None VAL_KEY = None ### Logging ### load_dotenv() -LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log' +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster.log' ### CONSTANTS ### -PING_INTERVAL_SEC = 15 +SYMBOL: str = 'ETHUSDT' +STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s' +STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker' ### Globals ### -WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2×tamp=" -TICKER_SNAPSHOT_DATA_LAST: dict = {} +WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}" +# WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}" # HIST_TRADES = np.empty((0, 3)) # HIST_TRADES_LOOKBACK_SEC = 6 @@ -112,93 +114,58 @@ TICKER_SNAPSHOT_DATA_LAST: dict = {} # raise ValueError('Only MySQL engine is implemented') ### Websocket ### -async def heartbeat(ws): - while True: - await asyncio.sleep(PING_INTERVAL_SEC) - logging.info("SENDING PING...") - ping_msg = {"op":"ping","args":[ str(round(datetime.now().timestamp()*1000)) ]} - await ws.send(json.dumps(ping_msg)) - async def ws_stream(): - global TICKER_SNAPSHOT_DATA_LAST - - async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'): + async for websocket in websockets.connect(WSS_URL): logging.info(f"Connected to {WSS_URL}") - asyncio.create_task(heartbeat(ws=websocket)) - - subscribe_msg = { - "op": "subscribe", - "args": ["instrumentInfo.H.ETHUSDT"] - } - - await websocket.send(json.dumps(subscribe_msg)) - try: async for message in websocket: ts_arrival = round(datetime.now().timestamp()*1000) - print(message) - # if isinstance(message, str): - # try: - # data = json.loads(message) - # if data.get('op', None) == 'ping': - # pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]} - # logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}') - # await websocket.send(json.dumps(pong_msg)) - # continue - # elif data.get('success', None): - # # logging.info('CONNECTION SUCCESFUL RESP MSG') - # continue - - # msg_type = data.get('type', None) - # if msg_type is not None: - # match msg_type: - # case 'snapshot': - # TICKER_SNAPSHOT_DATA_LAST = data['data'] - - # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) - # VAL_KEY_OBJ = json.dumps({ - # 'timestamp_arrival': ts_arrival, - # 'timestamp_msg': data['ts'], - # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], - # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), - # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), - # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), - # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), - # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), - # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), - # 'nextFundingTime_ts_ms': nextFundingTime_ts, - # }) - # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) - # continue - # case 'delta': - # TICKER_SNAPSHOT_DATA_LAST.update(data['data']) - - # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) - # VAL_KEY_OBJ = json.dumps({ - # 'timestamp_arrival': ts_arrival, - # 'timestamp_msg': data['ts'], - # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], - # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), - # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), - # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), - # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), - # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), - # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), - # 'nextFundingTime_ts_ms': nextFundingTime_ts, - # }) - # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) - # continue - # case _: - # logging.warning(f'UNMATCHED OTHER MSG: {data}') - # else: - # logging.info(f'Initial or unexpected data struct, skipping: {data}') - # continue - # except (json.JSONDecodeError, ValueError): - # logging.warning(f'Message not in JSON format, skipping: {message}') - # continue - # else: - # raise ValueError(f'Type: {type(data)} not expected: {message}') + if isinstance(message, str): + try: + data = json.loads(message) + channel = data.get('stream', None) + if channel is not None: + match channel: + case c if c == STREAM_MARKPRICE: + print(f'MP: {data}') + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['data']['E'], + 'symbol': data['data']['s'], + 'mark_price': data['data']['p'], + 'index_price': data['data']['i'], + 'estimated_settle_price': data['data']['P'], + 'funding_rate': data['data']['r'], + 'next_funding_time_ts_ms': data['data']['T'], + }) + VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ) + continue + case c if c == STREAM_BOOKTICKER: + # print(f'BT: {data}') + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['data']['E'], + 'timestamp_transaction': data['data']['T'], + 'orderbook_update_id': data['data']['u'], + 'symbol': data['data']['s'], + 'best_bid_px': data['data']['b'], + 'best_bid_qty': data['data']['B'], + 'best_ask_px': data['data']['a'], + 'best_ask_qty': data['data']['A'], + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + continue + case _: + logging.warning(f'UNMATCHED OTHER MSG: {data}') + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') except websockets.ConnectionClosed as e: logging.error(f'Connection closed: {e}') logging.error(traceback.format_exc()) diff --git a/ws_extended.py b/ws_extended.py new file mode 100644 index 0000000..e69de29