Files
Funding_Rate/ws_aster.py

209 lines
8.9 KiB
Python

import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import time
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
import modules.db as db
import modules.aster_db as aster_db
### Allow only ipv4 ###
def allowed_gai_family():
    """Return the only address family name resolution may use: IPv4.

    Installed over urllib3's ``allowed_gai_family`` hook right below, so that
    lookups made through urllib3 never pick an IPv6 address.
    """
    ipv4_only = socket.AF_INET
    return ipv4_only
urllib3_cn.allowed_gai_family = allowed_gai_family

### Database ###
# main() raises NotImplementedError when either switch is False.
USE_DB: bool = True
USE_VK: bool = True
# Both handles are only declared here; main() binds them before ws_stream() runs.
CON: AsyncContextManager
VAL_KEY: valkey.Valkey
# Valkey keys written by ws_stream().
VK_FUND_RATE = 'fund_rate_aster'
VK_TICKER = 'fut_ticker_aster'
VK_LAST_TRADE = 'fut_last_trade_aster'

### Logging ###
load_dotenv()
# os.getenv returns None when LOGS_PATH is unset, which used to yield the
# literal path "None/Fund_Rate_Aster.log"; fall back to the current directory.
LOG_FILEPATH: str = f'{os.getenv(key="LOGS_PATH") or "."}/Fund_Rate_Aster.log'

### CONSTANTS ###
# Initial symbol; ws_stream() may replace it (and the stream names) at runtime.
SYMBOL: str = 'ETHUSDT'
STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s'
STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker'
STREAM_TRADES: str = f'{SYMBOL.lower()}@aggTrade'

### Globals ###
# Built once from the initial stream names; it is NOT rebuilt on symbol change.
WSS_URL: str = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}/{STREAM_TRADES}"
# When True, ws_stream() follows the symbol published under 'fr_algo_working_symbol'.
ALLOW_SYMBOL_CHG: bool = True
### Funcs ###
async def subscribe_streams(websocket, streams: list[str]) -> None:
    """Send a SUBSCRIBE request for ``streams`` over ``websocket``.

    The request id is the local clock in epoch milliseconds. Only the send is
    awaited here; no acknowledgement from the server is read.
    """
    logging.info(f'Trying to sub: {streams}')
    request_id = int(round(datetime.now().timestamp() * 1000))
    request = json.dumps(
        obj={"method": "SUBSCRIBE", "params": streams, "id": request_id}
    )
    await websocket.send(request)
    logging.info(f'Success sub: {streams}')
async def unsubscribe_streams(websocket, streams: list[str]) -> None:
    """Send an UNSUBSCRIBE request for ``streams`` over ``websocket``.

    The request id is the local clock in epoch milliseconds. Only the send is
    awaited here; no acknowledgement from the server is read.
    """
    logging.info(f'Trying to unsub: {streams}')
    request_id = int(round(datetime.now().timestamp() * 1000))
    request = json.dumps(
        obj={"method": "UNSUBSCRIBE", "params": streams, "id": request_id}
    )
    await websocket.send(request)
    logging.info(f'Success unsub: {streams}')
### Websocket ###
def _stream_names(symbol: str) -> list[str]:
    """Return the [markPrice, bookTicker, aggTrade] stream names for ``symbol``."""
    prefix = symbol.lower()
    return [f'{prefix}@markPrice@1s', f'{prefix}@bookTicker', f'{prefix}@aggTrade']


def _streams_in_url(url: str) -> list[str]:
    """Return the stream names listed in the ``streams`` query parameter of ``url``."""
    from urllib.parse import parse_qs, urlsplit  # local: not imported at file level

    values = parse_qs(urlsplit(url).query).get('streams')
    if not values:
        return []
    return [name for name in values[0].split('/') if name]


def _requested_symbol() -> str | None:
    """Return the ASTER symbol stored under ``fr_algo_working_symbol``, if usable.

    Returns None when the key is absent or its value is not JSON shaped like
    ``{"ASTER": {"symbol": "<non-empty str>"}}``, so a missing or malformed
    entry leaves the current subscription untouched instead of raising.
    """
    raw = VAL_KEY.get(name='fr_algo_working_symbol')
    if raw is None:
        return None
    try:
        symbol = json.loads(s=raw)['ASTER']['symbol']
    except (ValueError, KeyError, TypeError):
        symbol = None
    if not isinstance(symbol, str) or not symbol:
        logging.warning(f'Unusable fr_algo_working_symbol value, keeping {SYMBOL}: {raw!r}')
        return None
    return symbol


def _publish(data, ts_arrival: int) -> None:
    """Store one decoded combined-stream message in Valkey, keyed by its stream.

    Mark-price messages go to ``VK_FUND_RATE`` and book-ticker messages to
    ``VK_TICKER``. Anything else is logged or dropped; nothing is raised for
    a message that lacks the expected fields.
    """
    channel = data.get('stream', None) if isinstance(data, dict) else None
    if channel is None:
        # e.g. the reply to a SUBSCRIBE/UNSUBSCRIBE request carries no 'stream'.
        logging.info(f'Initial or unexpected data struct, skipping: {data}')
        return
    payload = data.get('data')
    try:
        if channel == STREAM_MARKPRICE:
            key = VK_FUND_RATE
            record = {
                'timestamp_arrival': ts_arrival,
                'timestamp_msg': payload['E'],
                'symbol': payload['s'],
                'mark_price': payload['p'],
                'index_price': payload['i'],
                'estimated_settle_price': payload['P'],
                'funding_rate': payload['r'],
                'next_funding_time_ts_ms': payload['T'],
            }
        elif channel == STREAM_BOOKTICKER:
            key = VK_TICKER
            record = {
                'timestamp_arrival': ts_arrival,
                'timestamp_msg': payload['E'],
                'timestamp_transaction': payload['T'],
                'orderbook_update_id': payload['u'],
                'symbol': payload['s'],
                'best_bid_px': payload['b'],
                'best_bid_qty': payload['B'],
                'best_ask_px': payload['a'],
                'best_ask_qty': payload['A'],
            }
        elif channel == STREAM_TRADES:
            # aggTrade messages are received but deliberately dropped: publishing
            # to VK_LAST_TRADE and inserting into 'fr_aster_mkt_trades' is disabled.
            return
        else:
            logging.warning(f'UNMATCHED OTHER MSG: {data}')
            return
    except (KeyError, TypeError):
        # A payload without the expected fields is skipped, not treated as a
        # connection failure.
        logging.warning(f'Stream message missing expected fields, skipping: {data}')
        return
    VAL_KEY.set(key, json.dumps(record))


async def ws_stream():
    """Consume the Aster combined stream forever and mirror it into Valkey.

    Connects to ``WSS_URL`` and reconnects whenever the connection drops or a
    message raises. While ``ALLOW_SYMBOL_CHG`` is true, every incoming message
    first checks the symbol published under ``fr_algo_working_symbol``; on a
    change the old streams are unsubscribed, the new ones subscribed, and the
    triggering message is skipped.

    Raises nothing to the caller under normal operation: errors are logged and
    followed by a reconnect.
    """
    global SYMBOL
    global STREAM_MARKPRICE
    global STREAM_BOOKTICKER
    global STREAM_TRADES
    # WSS_URL is fixed, so every (re)connect starts subscribed to these streams.
    url_streams = _streams_in_url(WSS_URL)
    async for websocket in websockets.connect(WSS_URL, ping_interval=5):
        logging.info(msg=f"Connected to {WSS_URL}")
        try:
            # After a symbol change the URL still names the original streams;
            # swap them for the active ones so a reconnect keeps the new symbol.
            active_streams = [STREAM_MARKPRICE, STREAM_BOOKTICKER, STREAM_TRADES]
            if active_streams != url_streams:
                if url_streams:
                    await unsubscribe_streams(websocket=websocket, streams=url_streams)
                await subscribe_streams(websocket=websocket, streams=active_streams)
            async for message in websocket:
                ### Update Symbol if Algo Outputs Change ###
                if ALLOW_SYMBOL_CHG:
                    # NOTE(review): this is a synchronous Valkey read on every message.
                    best_symbol = _requested_symbol()
                    if best_symbol is not None and best_symbol != SYMBOL:
                        logging.info(f'Symbol Change: {SYMBOL} -> {best_symbol}')
                        old_streams = [STREAM_MARKPRICE, STREAM_BOOKTICKER, STREAM_TRADES]
                        # Update all globals together before awaiting, so a failed
                        # send cannot leave SYMBOL and STREAM_* out of step.
                        SYMBOL = best_symbol
                        STREAM_MARKPRICE, STREAM_BOOKTICKER, STREAM_TRADES = _stream_names(SYMBOL)
                        await unsubscribe_streams(websocket=websocket, streams=old_streams)
                        await subscribe_streams(
                            websocket=websocket,
                            streams=[STREAM_MARKPRICE, STREAM_BOOKTICKER, STREAM_TRADES],
                        )
                        continue
                ts_arrival = round(datetime.now().timestamp()*1000)
                if not isinstance(message, str):
                    # Report the type of the frame itself, not of a previous payload.
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, ValueError):
                    logging.warning(f'Message not in JSON format, skipping: {message}')
                    continue
                _publish(data, ts_arrival)
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            # Any other failure also falls through to the next reconnect attempt.
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Bind the Valkey client and DB connection, then run the websocket stream.

    Sets the module globals ``VAL_KEY`` and ``CON``, creates the trades table
    through ``aster_db.create_fr_aster_mkt_trades`` and awaits ``ws_stream()``.

    Raises:
        NotImplementedError: if ``USE_VK`` or ``USE_DB`` is False.
    """
    global VAL_KEY
    global CON
    if not USE_VK:
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
        raise NotImplementedError('Cannot run without VK')
    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        raise NotImplementedError('Cannot run without DB')
    # NOTE(review): the database credentials are hard-coded in this URL.
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    try:
        async with engine.connect() as CON:
            await aster_db.create_fr_aster_mkt_trades(CON=CON)
            await ws_stream()
    finally:
        # Release the engine's connection pool on error or cancellation too.
        await engine.dispose()
if __name__ == '__main__':
    # Script entry point: configure file logging, then run main() until interrupted.
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging before the first log call; an earlier logging.info()
    # would go to an implicit default handler and never reach the log file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # the log file is truncated on every start
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")