From d2068b1c73b687cbfe088f755b2300c1caaffd25 Mon Sep 17 00:00:00 2001 From: stevekeyharvey Date: Tue, 21 Apr 2026 20:22:33 +0000 Subject: [PATCH] setting up feed handlers, more to come --- .gitignore | 1 + .vscode/settings.json | 4 + apex.ipynb | 198 +++++++++++++++ main.py | 84 +++++++ mexc.ipynb | 198 ++++++++++----- modules/__pycache__/apex_api.cpython-313.pyc | Bin 0 -> 1821 bytes modules/apex_api.py | 37 +++ ws_apex.py | 248 ++++++++++++++++++ ws_aster.py | 249 +++++++++++++++++++ ws_mexc.py | 244 ++++++++++++++++++ 10 files changed, 1197 insertions(+), 66 deletions(-) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 apex.ipynb create mode 100644 main.py create mode 100644 modules/__pycache__/apex_api.cpython-313.pyc create mode 100644 modules/apex_api.py create mode 100644 ws_apex.py create mode 100644 ws_aster.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2eea525 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..4b5a294 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda" +} \ No newline at end of file diff --git a/apex.ipynb b/apex.ipynb new file mode 100644 index 0000000..4c3a02c --- /dev/null +++ b/apex.ipynb @@ -0,0 +1,198 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "id": "b6c46d40", + "metadata": {}, + "outputs": [], + "source": [ + "import modules.apex_api as apex_api" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "7fb6d9dc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Authenticating...\n", + "...Authenticated\n" + ] + } + ], + "source": [ + "client = apex_api.apex_create_client()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "d5a1203a", + "metadata": {}, + "outputs": [], + "source": [ + "# print(\"*** POSTING ORDER ***\")\n", + "# createOrderRes = client.create_order_v3(\n", + "# symbol=\"ETH-USDT\", \n", + "# side=\"BUY\",\n", + "# type=\"LIMIT\",\n", + "# size=\"0.01\",\n", + "# price=\"2100\",\n", + "# )\n", + "# print(createOrderRes)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c21254eb", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'data': {'totalEquityValue': '13.840000000000000000',\n", + " 'availableBalance': '13.840000000000000000',\n", + " 'initialMargin': '0',\n", + " 'maintenanceMargin': '0',\n", + " 'walletBalance': '',\n", + " 'realizedPnl': '-5.399416243793950000',\n", + " 'unrealizedPnl': '0.00',\n", + " 'totalRisk': '0',\n", + " 'totalValueWithoutDiscount': '13.840000000000000000',\n", + " 'liabilities': '13.840000000000000000',\n", + " 'totalAvailableBalance': '13.840000000000000000'},\n", + " 'timeCost': 6327944}" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.get_account_balance_v3()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "7cba63d4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'data': [], 'timeCost': 3984811}" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.open_orders_v3()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "b072c0de", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timeCost': 4389124}" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.delete_open_orders_v3(symbol=\"ETH-USDT\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "5ea177f8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "TOKEN: USDT == 13.840000000000000000\n", + "TOKEN: USDC == 0.000000000000000000\n" + ] + } + ], + "source": [ + "account_and_pos = client.get_account_v3()\n", + "for c in account_and_pos['contractWallets']:\n", + " print(f'TOKEN: {c['token']} == {c['balance']}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "70eb3b4f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fefca500", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dc048386", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..6e9004d --- /dev/null +++ b/main.py @@ -0,0 +1,84 @@ +import asyncio +import json +import logging +import math +import os +import time +import traceback +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from typing import AsyncContextManager +from dotenv import load_dotenv + +import numpy as np +import pandas as pd +import requests +# import talib +import valkey +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + + +### Database ### +CLIENT = None +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log' + + +async def run_algo(): + + try: + while True: + loop_start = time.time() + print('__________Start___________') + + MEXC_FUND_RATE = json.loads(VAL_KEY.get('fund_rate_mexc')) + MEXC_TICKER = json.loads(VAL_KEY.get('fut_ticker_mexc')) + APEX_TICKER = json.loads(VAL_KEY.get('fut_ticker_apex')) + print(f'MEXC FUND RATE: {MEXC_FUND_RATE}') + print(f'MEXC TICKER: {MEXC_TICKER}') + print(f'APEX TICKER: {APEX_TICKER}') + + time.sleep(5) + + print(f'__________________________ (Algo Engine ms: {(time.time() - loop_start)*1000})') + except KeyboardInterrupt: + print('...algo stopped') + # await cancel_all_orders(CLIENT=CLIENT) + except Exception as e: + logging.critical(f'*** ALGO ENGINE CRASHED: {e}') + logging.error(traceback.format_exc()) + # await cancel_all_orders(CLIENT=CLIENT) + +async def main(): + global CLIENT + global VAL_KEY + global CON + + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') + + async with engine.connect() as CON: + # await create_executions_orders_table(CON=CON) + await run_algo() + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + asyncio.run(main()) + \ No newline at end of file diff --git a/mexc.ipynb b/mexc.ipynb index 9672584..fbd027c 100644 --- a/mexc.ipynb +++ b/mexc.ipynb @@ -2,39 +2,52 @@ "cells": [ { "cell_type": "code", - "execution_count": 2, + "execution_count": 15, "id": "7a3f41bd", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", - "import requests" + "import requests\n", + "from datetime import datetime" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 44, "id": "3b48e1ce", "metadata": {}, "outputs": [], "source": [ - "url_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'" + "mexc_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'\n", + "edgex_all_tickers = 'https://pro.edgex.exchange/api/v1/public/quote/getTicker/?contractId=10000001'" ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 34, "id": "ab38d984", "metadata": {}, "outputs": [], "source": [ - "r = requests.get(url_all_tickers)\n", + "r = requests.get(mexc_all_tickers)\n", "data = r.json()['data']" ] }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 49, + "id": "2976b377", + "metadata": {}, + "outputs": [], + "source": [ + "r = requests.get(edgex_all_tickers)\n", + "data = r.json()['data']" + ] + }, + { + "cell_type": "code", + "execution_count": 4, "id": "1139b1a3", "metadata": {}, "outputs": [], @@ -45,19 +58,19 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": null, "id": "b00512dc", "metadata": {}, "outputs": [], "source": [ "df_trim = df[['symbol','fundingRate_pct','volume24']].copy()\n", - "df_trim = df_trim.loc[df_trim['volume24'] > 10_000]\n" + "df_trim = df_trim.loc[df_trim['volume24'] > 10_000]" ] }, { "cell_type": "code", - "execution_count": 23, - "id": "43b053d0", + "execution_count": 9, + "id": "f7b44068", "metadata": {}, "outputs": [ { @@ -88,34 +101,34 @@ " \n", " \n", " \n", - " 0\n", - " BTC_USDT\n", - " -0.0007\n", - " 208341522\n", + " 55\n", + " DRIFT_USDT\n", + " -1.1360\n", + " 3308466\n", " \n", " \n", - " 1\n", - " ETH_USDT\n", - " 0.0012\n", - " 27134917\n", + " 10\n", + " RED_USDT\n", + " -1.1138\n", + " 105826673\n", " \n", " \n", - " 2\n", - " XAUT_USDT\n", - " 0.0050\n", - " 429520428\n", + " 157\n", + " PIXEL_USDT\n", + " -0.4059\n", + " 12415472\n", " \n", " \n", - " 3\n", - " SOL_USDT\n", - " -0.0016\n", - " 188591783\n", + " 105\n", + " NIL_USDT\n", + " -0.3846\n", + " 31438005\n", " \n", " \n", - " 4\n", - " SILVER_USDT\n", - " 0.0000\n", - " 634682239\n", + " 33\n", + " SUPER_USDT\n", + " -0.3718\n", + " 2469502\n", " \n", " \n", " ...\n", @@ -124,34 +137,34 @@ " ...\n", " \n", " \n", - " 837\n", - " ENS_USDC\n", - " 0.0100\n", - " 196467\n", + " 12\n", + " PLAY_USDT\n", + " 0.0838\n", + " 22168649\n", " \n", " \n", - " 838\n", - " KAITO_USDC\n", - " -0.0106\n", - " 245467\n", + " 414\n", + " PUMPBTC_USDT\n", + " 0.0913\n", + " 745129\n", " \n", " \n", - " 839\n", - " BIO_USDC\n", - " 0.0050\n", - " 586982\n", + " 398\n", + " QQQSTOCK_USDT\n", + " 0.0969\n", + " 59485\n", " \n", " \n", - " 840\n", - " ETC_USDC\n", - " 0.0100\n", - " 117338\n", + " 222\n", + " GUA_USDT\n", + " 0.0996\n", + " 45623\n", " \n", " \n", - " 841\n", - " MNT_USDC\n", - " 0.0100\n", - " 150912\n", + " 265\n", + " BROCCOLIF3B_USDT\n", + " 0.1647\n", + " 3936939\n", " \n", " \n", "\n", @@ -159,35 +172,88 @@ "" ], "text/plain": [ - " symbol fundingRate_pct volume24\n", - "0 BTC_USDT -0.0007 208341522\n", - "1 ETH_USDT 0.0012 27134917\n", - "2 XAUT_USDT 0.0050 429520428\n", - "3 SOL_USDT -0.0016 188591783\n", - "4 SILVER_USDT 0.0000 634682239\n", - ".. ... ... ...\n", - "837 ENS_USDC 0.0100 196467\n", - "838 KAITO_USDC -0.0106 245467\n", - "839 BIO_USDC 0.0050 586982\n", - "840 ETC_USDC 0.0100 117338\n", - "841 MNT_USDC 0.0100 150912\n", + " symbol fundingRate_pct volume24\n", + "55 DRIFT_USDT -1.1360 3308466\n", + "10 RED_USDT -1.1138 105826673\n", + "157 PIXEL_USDT -0.4059 12415472\n", + "105 NIL_USDT -0.3846 31438005\n", + "33 SUPER_USDT -0.3718 2469502\n", + ".. ... ... ...\n", + "12 PLAY_USDT 0.0838 22168649\n", + "414 PUMPBTC_USDT 0.0913 745129\n", + "398 QQQSTOCK_USDT 0.0969 59485\n", + "222 GUA_USDT 0.0996 45623\n", + "265 BROCCOLIF3B_USDT 0.1647 3936939\n", "\n", "[837 rows x 3 columns]" ] }, - "execution_count": 23, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "df_trim" + "df_trim.sort_values('fundingRate_pct', ascending=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "6775a03f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1775583908" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "round(datetime.now().timestamp())" ] }, { "cell_type": "code", "execution_count": null, - "id": "f7b44068", + "id": "edca3b9f", + "metadata": {}, + "outputs": [], + "source": [ + "### PRIVATE API ###\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "51e50e0d", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21aabf30", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, "metadata": {}, "outputs": [], "source": [] diff --git a/modules/__pycache__/apex_api.cpython-313.pyc b/modules/__pycache__/apex_api.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ef172e67187d68c6517ef644e5421cea7d9e863c GIT binary patch literal 1821 zcmah}%}*Og6rc5ecx_la#!Uj@BMVKNT1B`mNgAqt$N^Kp!B}3L207Ad>m6W=*|lcZ zB;n*kFU=)sq$oZ1+FSpC9IB>=9F0H%O&v8-Q&qjCB2}yQ(l-lhbBM&qp8dVAnfGRX zZ}t@>ieO~F|K0jpKCIgyBS5|Mma z$c5)*BKug(MMy-GZtxdETKEk5LLku59=G_pot&RXx56I z3ejYykiD)i%;#tInZ?C|KA)M*(^x*c_}M}+H#_Ca5ZP6A3{y8958E4gDiQ3}iCtkE z1%nmK!+*aQihF>{Twu6(2vk&Nc7JHx#X~!2;cyN{Q3_D!I|9cM_}a(&@@-%4*mZ@r zZ|GQGvF*#3BSCg$|ALuR?hbS%q)8LJEtG^}JkWK%pgqx%0@PkxEQwlJ1hp*zl6@2| zCj-A%?kAPP*LzC578&EU=!9qsLkM;`8N_tHoqGWL>5e`?9s7}c>JD;KVbybJ7?pbi z|1dHjGg~hA2O6!bB*)Nr`@S%zfjuKHbIa%>{w!MNQc8X^k*RyD*!HZ7;aT>|*w`3N z6f;+@Ea`>f?6u5dR?lUZXt&SHOkviS;%!OGP8PF^hqPtpLPpaHGsTRS1vW=*?A>(8 znr@jij=fdAc1ZsZf>U%kSJH=C;DJ2(IfXy^oGhDZ}N(>jL1QLUisJywR zAA~UEN71BFvvi2lA;@G!7*%)k{Ip?Jv8j5FYIJJD>P^dARaxdL>#q;g%@a`Uk?OIT zijgqeN`yRZkA@tVN-LN(8DYIb7y}6F3aXeGO2Im0!qSeLc-g2|J>4)(f?b!eCZf@b zV=r4PE_6GM!8*E8sW^4pV{#;WtAcBu<=8Yn>DV@|Fq|dCA*l#qTqQkhVr(O?vDKtX z)q)bKw7CWc<`TwSl3*4NyQ~S+V9<&5s2=4fSFxKmYWS88CplKTO?!L+j+4Do2Ro8CU~w7#?%yyqP)mq^Io}c@&FJ8cFw{EL z`_Q^?HP!L$Qx_Twt!Ve%)ScA5{C4#9#$-#0-(9-1)I9yxwvuX0w-U*RefRsC{qJli z-fhf1SNeAb(vR}Z8*5uiwK2V?BzKkJEoJ!IH+M%aZ;f2uRwf$L&$@f=z5mrhBfA%X z=gL4!?r$ZB_Y$Yr@7(z(XSN0w|LhWD*SN;?iZeb*AKd0pZaa3hDPR5X1p`o8q HttpPrivateSign: + load_dotenv() + print("Authenticating...") + eth_private_key = os.getenv("RABBY_PRIVATE_KEY") + key = os.getenv("APEX_API_KEY") + secret = os.getenv("APEX_API_SECRET") + passphrase = os.getenv("APEX_API_PASSPHRASE") + + client = HttpPrivate_v3(APEX_OMNI_HTTP_MAIN, network_id=NETWORKID_MAIN, eth_private_key=eth_private_key) + zkKeys = client.derive_zk_key(client.default_address) + + seeds = zkKeys['seeds'] + l2Key = zkKeys['l2Key'] + + client = HttpPrivateSign( + APEX_OMNI_HTTP_MAIN, + network_id=NETWORKID_MAIN, + zk_seeds=seeds, + zk_l2Key=l2Key, + api_key_credentials={'key': key, 'secret': secret, 'passphrase': passphrase}) + + try: + client.configs_v3() + client.get_account_v3() + except Exception as e: + raise ConnectionError(f'Failed to authenticate with APEX OMNI: {e}') + + print("...Authenticated") + + return client \ No newline at end of file diff --git a/ws_apex.py b/ws_apex.py new file mode 100644 index 0000000..5ed877e --- /dev/null +++ b/ws_apex.py @@ -0,0 +1,248 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = True +# VK_FUND_RATE = 'fund_rate_apex' +VK_TICKER = 'fut_ticker_apex' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log' + +### CONSTANTS ### +PING_INTERVAL_SEC = 15 + +### Globals ### +WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2×tamp=" +TICKER_SNAPSHOT_DATA_LAST: dict = {} + +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def heartbeat(ws): + while True: + await asyncio.sleep(PING_INTERVAL_SEC) + logging.info("SENDING PING...") + ping_msg = {"op":"ping","args":[ str(round(datetime.now().timestamp()*1000)) ]} + await ws.send(json.dumps(ping_msg)) + +async def ws_stream(): + global TICKER_SNAPSHOT_DATA_LAST + + async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'): + logging.info(f"Connected to {WSS_URL}") + + asyncio.create_task(heartbeat(ws=websocket)) + + subscribe_msg = { + "op": "subscribe", + "args": ["instrumentInfo.H.ETHUSDT"] + } + + await websocket.send(json.dumps(subscribe_msg)) + + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + if data.get('op', None) == 'ping': + pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]} + logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}') + await websocket.send(json.dumps(pong_msg)) + continue + elif data.get('success', None): + # logging.info('CONNECTION SUCCESFUL RESP MSG') + continue + + msg_type = data.get('type', None) + if msg_type is not None: + match msg_type: + case 'snapshot': + TICKER_SNAPSHOT_DATA_LAST = data['data'] + + nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], + 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), + 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), + 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), + 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), + 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), + 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), + 'nextFundingTime_ts_ms': nextFundingTime_ts, + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + continue + case 'delta': + TICKER_SNAPSHOT_DATA_LAST.update(data['data']) + + nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], + 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), + 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), + 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), + 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), + 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), + 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), + 'nextFundingTime_ts_ms': nextFundingTime_ts, + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + continue + case _: + logging.warning(f'UNMATCHED OTHER MSG: {data}') + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_aster.py b/ws_aster.py new file mode 100644 index 0000000..9a151a7 --- /dev/null +++ b/ws_aster.py @@ -0,0 +1,249 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = True +# VK_FUND_RATE = 'fund_rate_apex' +VK_TICKER = 'fut_ticker_apex' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log' + +### CONSTANTS ### +PING_INTERVAL_SEC = 15 + +### Globals ### +WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2×tamp=" +TICKER_SNAPSHOT_DATA_LAST: dict = {} + +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def heartbeat(ws): + while True: + await asyncio.sleep(PING_INTERVAL_SEC) + logging.info("SENDING PING...") + ping_msg = {"op":"ping","args":[ str(round(datetime.now().timestamp()*1000)) ]} + await ws.send(json.dumps(ping_msg)) + +async def ws_stream(): + global TICKER_SNAPSHOT_DATA_LAST + + async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'): + logging.info(f"Connected to {WSS_URL}") + + asyncio.create_task(heartbeat(ws=websocket)) + + subscribe_msg = { + "op": "subscribe", + "args": ["instrumentInfo.H.ETHUSDT"] + } + + await websocket.send(json.dumps(subscribe_msg)) + + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + print(message) + # if isinstance(message, str): + # try: + # data = json.loads(message) + # if data.get('op', None) == 'ping': + # pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]} + # logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}') + # await websocket.send(json.dumps(pong_msg)) + # continue + # elif data.get('success', None): + # # logging.info('CONNECTION SUCCESFUL RESP MSG') + # continue + + # msg_type = data.get('type', None) + # if msg_type is not None: + # match msg_type: + # case 'snapshot': + # TICKER_SNAPSHOT_DATA_LAST = data['data'] + + # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) + # VAL_KEY_OBJ = json.dumps({ + # 'timestamp_arrival': ts_arrival, + # 'timestamp_msg': data['ts'], + # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], + # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), + # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), + # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), + # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), + # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), + # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), + # 'nextFundingTime_ts_ms': nextFundingTime_ts, + # }) + # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + # continue + # case 'delta': + # TICKER_SNAPSHOT_DATA_LAST.update(data['data']) + + # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000) + # VAL_KEY_OBJ = json.dumps({ + # 'timestamp_arrival': ts_arrival, + # 'timestamp_msg': data['ts'], + # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'], + # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']), + # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']), + # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']), + # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']), + # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']), + # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']), + # 'nextFundingTime_ts_ms': nextFundingTime_ts, + # }) + # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + # continue + # case _: + # logging.warning(f'UNMATCHED OTHER MSG: {data}') + # else: + # logging.info(f'Initial or unexpected data struct, skipping: {data}') + # continue + # except (json.JSONDecodeError, ValueError): + # logging.warning(f'Message not in JSON format, skipping: {message}') + # continue + # else: + # raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_mexc.py b/ws_mexc.py index e69de29..38d1dec 100644 --- a/ws_mexc.py +++ b/ws_mexc.py @@ -0,0 +1,244 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = True +VK_FUND_RATE = 'fund_rate_mexc' +VK_TICKER = 'fut_ticker_mexc' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_MEXC.log' + +### CONSTANTS ### +PING_INTERVAL_SEC = 20 + +### Globals ### +WSS_URL = "wss://contract.mexc.com/edge" +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def heartbeat(ws): + while True: + await asyncio.sleep(PING_INTERVAL_SEC) + logging.info("SENDING PING...") + ping_msg = { + "method": "ping" + } + await ws.send(json.dumps(ping_msg)) + + +async def ws_stream(): + # global HIST_TRADES + + async for websocket in websockets.connect(WSS_URL): + logging.info(f"Connected to {WSS_URL}") + + asyncio.create_task(heartbeat(ws=websocket)) + + subscribe_msg = { + "method": "sub.ticker", + "param": { + "symbol": "ETH_USDT" + }, + # "gzip": false + } + await websocket.send(json.dumps(subscribe_msg)) + subscribe_msg = { + "method": "sub.funding.rate", + "param": { + "symbol": "ETH_USDT" + }, + # "gzip": false + } + await websocket.send(json.dumps(subscribe_msg)) + + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + channel = data.get('channel', None) + if channel is not None: + match channel: + case 'push.ticker': + # logging.info(f'TICKER: {data}') + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'timestamp_data': data['data']['timestamp'], + 'symbol': data['data']['symbol'], + 'lastPrice': float(data['data']['lastPrice']), + 'fairPrice': float(data['data']['fairPrice']), + 'indexPrice': float(data['data']['indexPrice']), + 'volume24': float(data['data']['volume24']), + 'bid1': float(data['data']['bid1']), + 'ask1': float(data['data']['ask1']), + 'fundingRate': float(data['data']['fundingRate']), + + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + case 'push.funding.rate': + # logging.info(f'FUNDING RATE: {data}') + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'symbol': data['data']['symbol'], + 'rate': float(data['data']['rate']), + 'nextSettleTime_ts_ms': data['data']['nextSettleTime'], + }) + VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ) + case 'pong': + logging.info(f'PING RESPONSE: {data}') + case _: + logging.info(f'OTHER: {data}') + + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file