diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2eea525
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..4b5a294
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+ "python-envs.defaultEnvManager": "ms-python.python:conda",
+ "python-envs.defaultPackageManager": "ms-python.python:conda"
+}
\ No newline at end of file
diff --git a/apex.ipynb b/apex.ipynb
new file mode 100644
index 0000000..4c3a02c
--- /dev/null
+++ b/apex.ipynb
@@ -0,0 +1,198 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "b6c46d40",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import modules.apex_api as apex_api"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "7fb6d9dc",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Authenticating...\n",
+ "...Authenticated\n"
+ ]
+ }
+ ],
+ "source": [
+ "client = apex_api.apex_create_client()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "d5a1203a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# print(\"*** POSTING ORDER ***\")\n",
+ "# createOrderRes = client.create_order_v3(\n",
+ "# symbol=\"ETH-USDT\", \n",
+ "# side=\"BUY\",\n",
+ "# type=\"LIMIT\",\n",
+ "# size=\"0.01\",\n",
+ "# price=\"2100\",\n",
+ "# )\n",
+ "# print(createOrderRes)\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c21254eb",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'data': {'totalEquityValue': '13.840000000000000000',\n",
+ " 'availableBalance': '13.840000000000000000',\n",
+ " 'initialMargin': '0',\n",
+ " 'maintenanceMargin': '0',\n",
+ " 'walletBalance': '',\n",
+ " 'realizedPnl': '-5.399416243793950000',\n",
+ " 'unrealizedPnl': '0.00',\n",
+ " 'totalRisk': '0',\n",
+ " 'totalValueWithoutDiscount': '13.840000000000000000',\n",
+ " 'liabilities': '13.840000000000000000',\n",
+ " 'totalAvailableBalance': '13.840000000000000000'},\n",
+ " 'timeCost': 6327944}"
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "client.get_account_balance_v3()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "7cba63d4",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'data': [], 'timeCost': 3984811}"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "client.open_orders_v3()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "b072c0de",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'timeCost': 4389124}"
+ ]
+ },
+ "execution_count": 12,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "client.delete_open_orders_v3(symbol=\"ETH-USDT\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "5ea177f8",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "TOKEN: USDT == 13.840000000000000000\n",
+ "TOKEN: USDC == 0.000000000000000000\n"
+ ]
+ }
+ ],
+ "source": [
+ "account_and_pos = client.get_account_v3()\n",
+ "for c in account_and_pos['contractWallets']:\n",
+ " print(f'TOKEN: {c['token']} == {c['balance']}')"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "70eb3b4f",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "fefca500",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "dc048386",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "py_313",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.13.12"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..6e9004d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,84 @@
+import asyncio
+import json
+import logging
+import math
+import os
+import time
+import traceback
+from dataclasses import asdict, dataclass
+from datetime import datetime, timezone
+from typing import AsyncContextManager
+from dotenv import load_dotenv
+
+import numpy as np
+import pandas as pd
+import requests
+# import talib
+import valkey
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import create_async_engine
+
+
+### Database ###
+CLIENT = None
+CON: AsyncContextManager | None = None
+VAL_KEY = None
+
+### Logging ###
+load_dotenv()
+LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log'
+
+
+async def run_algo():
+
+ try:
+ while True:
+ loop_start = time.time()
+ print('__________Start___________')
+
+ MEXC_FUND_RATE = json.loads(VAL_KEY.get('fund_rate_mexc'))
+ MEXC_TICKER = json.loads(VAL_KEY.get('fut_ticker_mexc'))
+ APEX_TICKER = json.loads(VAL_KEY.get('fut_ticker_apex'))
+ print(f'MEXC FUND RATE: {MEXC_FUND_RATE}')
+ print(f'MEXC TICKER: {MEXC_TICKER}')
+ print(f'APEX TICKER: {APEX_TICKER}')
+
+ time.sleep(5)
+
+ print(f'__________________________ (Algo Engine ms: {(time.time() - loop_start)*1000})')
+ except KeyboardInterrupt:
+ print('...algo stopped')
+ # await cancel_all_orders(CLIENT=CLIENT)
+ except Exception as e:
+ logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
+ logging.error(traceback.format_exc())
+ # await cancel_all_orders(CLIENT=CLIENT)
+
+async def main():
+ global CLIENT
+ global VAL_KEY
+ global CON
+
+ VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
+ engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
+
+ async with engine.connect() as CON:
+ # await create_executions_orders_table(CON=CON)
+ await run_algo()
+
+if __name__ == '__main__':
+ START_TIME = round(datetime.now().timestamp()*1000)
+
+ logging.info(f'Log FilePath: {LOG_FILEPATH}')
+
+ logging.basicConfig(
+ force=True,
+ filename=LOG_FILEPATH,
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ filemode='w'
+ )
+ logging.info(f"STARTED: {START_TIME}")
+
+ asyncio.run(main())
+
\ No newline at end of file
diff --git a/mexc.ipynb b/mexc.ipynb
index 9672584..fbd027c 100644
--- a/mexc.ipynb
+++ b/mexc.ipynb
@@ -2,39 +2,52 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 15,
"id": "7a3f41bd",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
- "import requests"
+ "import requests\n",
+ "from datetime import datetime"
]
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": 44,
"id": "3b48e1ce",
"metadata": {},
"outputs": [],
"source": [
- "url_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'"
+ "mexc_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'\n",
+ "edgex_all_tickers = 'https://pro.edgex.exchange/api/v1/public/quote/getTicker/?contractId=10000001'"
]
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 34,
"id": "ab38d984",
"metadata": {},
"outputs": [],
"source": [
- "r = requests.get(url_all_tickers)\n",
+ "r = requests.get(mexc_all_tickers)\n",
"data = r.json()['data']"
]
},
{
"cell_type": "code",
- "execution_count": 21,
+ "execution_count": 49,
+ "id": "2976b377",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "r = requests.get(edgex_all_tickers)\n",
+ "data = r.json()['data']"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
"id": "1139b1a3",
"metadata": {},
"outputs": [],
@@ -45,19 +58,19 @@
},
{
"cell_type": "code",
- "execution_count": 22,
+ "execution_count": null,
"id": "b00512dc",
"metadata": {},
"outputs": [],
"source": [
"df_trim = df[['symbol','fundingRate_pct','volume24']].copy()\n",
- "df_trim = df_trim.loc[df_trim['volume24'] > 10_000]\n"
+ "df_trim = df_trim.loc[df_trim['volume24'] > 10_000]"
]
},
{
"cell_type": "code",
- "execution_count": 23,
- "id": "43b053d0",
+ "execution_count": 9,
+ "id": "f7b44068",
"metadata": {},
"outputs": [
{
@@ -88,34 +101,34 @@
" \n",
"
\n",
" \n",
- " | 0 | \n",
- " BTC_USDT | \n",
- " -0.0007 | \n",
- " 208341522 | \n",
+ " 55 | \n",
+ " DRIFT_USDT | \n",
+ " -1.1360 | \n",
+ " 3308466 | \n",
"
\n",
" \n",
- " | 1 | \n",
- " ETH_USDT | \n",
- " 0.0012 | \n",
- " 27134917 | \n",
+ " 10 | \n",
+ " RED_USDT | \n",
+ " -1.1138 | \n",
+ " 105826673 | \n",
"
\n",
" \n",
- " | 2 | \n",
- " XAUT_USDT | \n",
- " 0.0050 | \n",
- " 429520428 | \n",
+ " 157 | \n",
+ " PIXEL_USDT | \n",
+ " -0.4059 | \n",
+ " 12415472 | \n",
"
\n",
" \n",
- " | 3 | \n",
- " SOL_USDT | \n",
- " -0.0016 | \n",
- " 188591783 | \n",
+ " 105 | \n",
+ " NIL_USDT | \n",
+ " -0.3846 | \n",
+ " 31438005 | \n",
"
\n",
" \n",
- " | 4 | \n",
- " SILVER_USDT | \n",
- " 0.0000 | \n",
- " 634682239 | \n",
+ " 33 | \n",
+ " SUPER_USDT | \n",
+ " -0.3718 | \n",
+ " 2469502 | \n",
"
\n",
" \n",
" | ... | \n",
@@ -124,34 +137,34 @@
" ... | \n",
"
\n",
" \n",
- " | 837 | \n",
- " ENS_USDC | \n",
- " 0.0100 | \n",
- " 196467 | \n",
+ " 12 | \n",
+ " PLAY_USDT | \n",
+ " 0.0838 | \n",
+ " 22168649 | \n",
"
\n",
" \n",
- " | 838 | \n",
- " KAITO_USDC | \n",
- " -0.0106 | \n",
- " 245467 | \n",
+ " 414 | \n",
+ " PUMPBTC_USDT | \n",
+ " 0.0913 | \n",
+ " 745129 | \n",
"
\n",
" \n",
- " | 839 | \n",
- " BIO_USDC | \n",
- " 0.0050 | \n",
- " 586982 | \n",
+ " 398 | \n",
+ " QQQSTOCK_USDT | \n",
+ " 0.0969 | \n",
+ " 59485 | \n",
"
\n",
" \n",
- " | 840 | \n",
- " ETC_USDC | \n",
- " 0.0100 | \n",
- " 117338 | \n",
+ " 222 | \n",
+ " GUA_USDT | \n",
+ " 0.0996 | \n",
+ " 45623 | \n",
"
\n",
" \n",
- " | 841 | \n",
- " MNT_USDC | \n",
- " 0.0100 | \n",
- " 150912 | \n",
+ " 265 | \n",
+ " BROCCOLIF3B_USDT | \n",
+ " 0.1647 | \n",
+ " 3936939 | \n",
"
\n",
" \n",
"\n",
@@ -159,35 +172,88 @@
""
],
"text/plain": [
- " symbol fundingRate_pct volume24\n",
- "0 BTC_USDT -0.0007 208341522\n",
- "1 ETH_USDT 0.0012 27134917\n",
- "2 XAUT_USDT 0.0050 429520428\n",
- "3 SOL_USDT -0.0016 188591783\n",
- "4 SILVER_USDT 0.0000 634682239\n",
- ".. ... ... ...\n",
- "837 ENS_USDC 0.0100 196467\n",
- "838 KAITO_USDC -0.0106 245467\n",
- "839 BIO_USDC 0.0050 586982\n",
- "840 ETC_USDC 0.0100 117338\n",
- "841 MNT_USDC 0.0100 150912\n",
+ " symbol fundingRate_pct volume24\n",
+ "55 DRIFT_USDT -1.1360 3308466\n",
+ "10 RED_USDT -1.1138 105826673\n",
+ "157 PIXEL_USDT -0.4059 12415472\n",
+ "105 NIL_USDT -0.3846 31438005\n",
+ "33 SUPER_USDT -0.3718 2469502\n",
+ ".. ... ... ...\n",
+ "12 PLAY_USDT 0.0838 22168649\n",
+ "414 PUMPBTC_USDT 0.0913 745129\n",
+ "398 QQQSTOCK_USDT 0.0969 59485\n",
+ "222 GUA_USDT 0.0996 45623\n",
+ "265 BROCCOLIF3B_USDT 0.1647 3936939\n",
"\n",
"[837 rows x 3 columns]"
]
},
- "execution_count": 23,
+ "execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "df_trim"
+ "df_trim.sort_values('fundingRate_pct', ascending=True)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "6775a03f",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "1775583908"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "round(datetime.now().timestamp())"
]
},
{
"cell_type": "code",
"execution_count": null,
- "id": "f7b44068",
+ "id": "edca3b9f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "### PRIVATE API ###\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "51e50e0d",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "21aabf30",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": []
diff --git a/modules/__pycache__/apex_api.cpython-313.pyc b/modules/__pycache__/apex_api.cpython-313.pyc
new file mode 100644
index 0000000..ef172e6
Binary files /dev/null and b/modules/__pycache__/apex_api.cpython-313.pyc differ
diff --git a/modules/apex_api.py b/modules/apex_api.py
new file mode 100644
index 0000000..4ef32b8
--- /dev/null
+++ b/modules/apex_api.py
@@ -0,0 +1,37 @@
+from apexomni.http_private_v3 import HttpPrivate_v3
+from apexomni.http_private_sign import HttpPrivateSign
+from apexomni.constants import APEX_OMNI_HTTP_MAIN, NETWORKID_MAIN
+
+from dotenv import load_dotenv
+import os
+
+def apex_create_client() -> HttpPrivateSign:
+ load_dotenv()
+ print("Authenticating...")
+ eth_private_key = os.getenv("RABBY_PRIVATE_KEY")
+ key = os.getenv("APEX_API_KEY")
+ secret = os.getenv("APEX_API_SECRET")
+ passphrase = os.getenv("APEX_API_PASSPHRASE")
+
+ client = HttpPrivate_v3(APEX_OMNI_HTTP_MAIN, network_id=NETWORKID_MAIN, eth_private_key=eth_private_key)
+ zkKeys = client.derive_zk_key(client.default_address)
+
+ seeds = zkKeys['seeds']
+ l2Key = zkKeys['l2Key']
+
+ client = HttpPrivateSign(
+ APEX_OMNI_HTTP_MAIN,
+ network_id=NETWORKID_MAIN,
+ zk_seeds=seeds,
+ zk_l2Key=l2Key,
+ api_key_credentials={'key': key, 'secret': secret, 'passphrase': passphrase})
+
+ try:
+ client.configs_v3()
+ client.get_account_v3()
+ except Exception as e:
+ raise ConnectionError(f'Failed to authenticate with APEX OMNI: {e}')
+
+ print("...Authenticated")
+
+ return client
\ No newline at end of file
diff --git a/ws_apex.py b/ws_apex.py
new file mode 100644
index 0000000..5ed877e
--- /dev/null
+++ b/ws_apex.py
@@ -0,0 +1,248 @@
+import asyncio
+import json
+import logging
+import socket
+import traceback
+from datetime import datetime
+from typing import AsyncContextManager
+
+import numpy as np
+import pandas as pd
+import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
+from sqlalchemy import text
+import websockets
+from sqlalchemy.ext.asyncio import create_async_engine
+import valkey
+import os
+from dotenv import load_dotenv
+
+
+### Allow only ipv4 ###
+def allowed_gai_family():
+ return socket.AF_INET
+urllib3_cn.allowed_gai_family = allowed_gai_family
+
+### Database ###
+USE_DB: bool = False
+USE_VK: bool = True
+# VK_FUND_RATE = 'fund_rate_apex'
+VK_TICKER = 'fut_ticker_apex'
+CON: AsyncContextManager | None = None
+VAL_KEY = None
+
+### Logging ###
+load_dotenv()
+LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log'
+
+### CONSTANTS ###
+PING_INTERVAL_SEC = 15
+
+### Globals ###
+WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2×tamp="
+TICKER_SNAPSHOT_DATA_LAST: dict = {}
+
+# HIST_TRADES = np.empty((0, 3))
+# HIST_TRADES_LOOKBACK_SEC = 6
+
+# ### Database Funcs ###
+# async def create_rtds_btcusd_table(
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
+# else:
+# if engine == 'mysql':
+# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
+# await CON.execute(text("""
+# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
+# timestamp_arrival BIGINT,
+# timestamp_msg BIGINT,
+# timestamp_value BIGINT,
+# value DOUBLE,
+# qty DOUBLE
+# );
+# """))
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+# async def insert_rtds_btcusd_table(
+# timestamp_arrival: int,
+# timestamp_msg: int,
+# timestamp_value: int,
+# value: float,
+# qty: float,
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# params={
+# 'timestamp_arrival': timestamp_arrival,
+# 'timestamp_msg': timestamp_msg,
+# 'timestamp_value': timestamp_value,
+# 'value': value,
+# 'qty': qty,
+# }
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
+# else:
+# if engine == 'mysql':
+# await CON.execute(text("""
+# INSERT INTO binance_btcusd_trades
+# (
+# timestamp_arrival,
+# timestamp_msg,
+# timestamp_value,
+# value,
+# qty
+# )
+# VALUES
+# (
+# :timestamp_arrival,
+# :timestamp_msg,
+# :timestamp_value,
+# :value,
+# :qty
+# )
+# """),
+# parameters=params
+# )
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+### Websocket ###
+async def heartbeat(ws):
+ while True:
+ await asyncio.sleep(PING_INTERVAL_SEC)
+ logging.info("SENDING PING...")
+ ping_msg = {"op":"ping","args":[ str(round(datetime.now().timestamp()*1000)) ]}
+ await ws.send(json.dumps(ping_msg))
+
+async def ws_stream():
+ global TICKER_SNAPSHOT_DATA_LAST
+
+ async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'):
+ logging.info(f"Connected to {WSS_URL}")
+
+ asyncio.create_task(heartbeat(ws=websocket))
+
+ subscribe_msg = {
+ "op": "subscribe",
+ "args": ["instrumentInfo.H.ETHUSDT"]
+ }
+
+ await websocket.send(json.dumps(subscribe_msg))
+
+ try:
+ async for message in websocket:
+ ts_arrival = round(datetime.now().timestamp()*1000)
+ if isinstance(message, str):
+ try:
+ data = json.loads(message)
+ if data.get('op', None) == 'ping':
+ pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]}
+ logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}')
+ await websocket.send(json.dumps(pong_msg))
+ continue
+ elif data.get('success', None):
+ # logging.info('CONNECTION SUCCESFUL RESP MSG')
+ continue
+
+ msg_type = data.get('type', None)
+ if msg_type is not None:
+ match msg_type:
+ case 'snapshot':
+ TICKER_SNAPSHOT_DATA_LAST = data['data']
+
+ nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
+ VAL_KEY_OBJ = json.dumps({
+ 'timestamp_arrival': ts_arrival,
+ 'timestamp_msg': data['ts'],
+ 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
+ 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
+ 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
+ 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
+ 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
+ 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
+ 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
+ 'nextFundingTime_ts_ms': nextFundingTime_ts,
+ })
+ VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
+ continue
+ case 'delta':
+ TICKER_SNAPSHOT_DATA_LAST.update(data['data'])
+
+ nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
+ VAL_KEY_OBJ = json.dumps({
+ 'timestamp_arrival': ts_arrival,
+ 'timestamp_msg': data['ts'],
+ 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
+ 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
+ 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
+ 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
+ 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
+ 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
+ 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
+ 'nextFundingTime_ts_ms': nextFundingTime_ts,
+ })
+ VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
+ continue
+ case _:
+ logging.warning(f'UNMATCHED OTHER MSG: {data}')
+ else:
+ logging.info(f'Initial or unexpected data struct, skipping: {data}')
+ continue
+ except (json.JSONDecodeError, ValueError):
+ logging.warning(f'Message not in JSON format, skipping: {message}')
+ continue
+ else:
+ raise ValueError(f'Type: {type(data)} not expected: {message}')
+ except websockets.ConnectionClosed as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+ continue
+ except Exception as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+
+
+async def main():
+ global VAL_KEY
+ global CON
+
+ if USE_VK:
+ VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
+ else:
+ VAL_KEY = None
+ logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
+
+ if USE_DB:
+ engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
+ async with engine.connect() as CON:
+ # await create_rtds_btcusd_table(CON=CON)
+ await ws_stream()
+ else:
+ CON = None
+ logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
+ await ws_stream()
+
+
+if __name__ == '__main__':
+ START_TIME = round(datetime.now().timestamp()*1000)
+
+ logging.info(f'Log FilePath: {LOG_FILEPATH}')
+
+ logging.basicConfig(
+ force=True,
+ filename=LOG_FILEPATH,
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ filemode='w'
+ )
+ logging.info(f"STARTED: {START_TIME}")
+
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ logging.info("Stream stopped")
\ No newline at end of file
diff --git a/ws_aster.py b/ws_aster.py
new file mode 100644
index 0000000..9a151a7
--- /dev/null
+++ b/ws_aster.py
@@ -0,0 +1,249 @@
+import asyncio
+import json
+import logging
+import socket
+import traceback
+from datetime import datetime
+from typing import AsyncContextManager
+
+import numpy as np
+import pandas as pd
+import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
+from sqlalchemy import text
+import websockets
+from sqlalchemy.ext.asyncio import create_async_engine
+import valkey
+import os
+from dotenv import load_dotenv
+
+
+### Allow only ipv4 ###
+def allowed_gai_family():
+ return socket.AF_INET
+urllib3_cn.allowed_gai_family = allowed_gai_family
+
+### Database ###
+USE_DB: bool = False
+USE_VK: bool = True
+# VK_FUND_RATE = 'fund_rate_apex'
+VK_TICKER = 'fut_ticker_apex'
+CON: AsyncContextManager | None = None
+VAL_KEY = None
+
+### Logging ###
+load_dotenv()
+LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log'
+
+### CONSTANTS ###
+PING_INTERVAL_SEC = 15
+
+### Globals ###
+WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2×tamp="
+TICKER_SNAPSHOT_DATA_LAST: dict = {}
+
+# HIST_TRADES = np.empty((0, 3))
+# HIST_TRADES_LOOKBACK_SEC = 6
+
+# ### Database Funcs ###
+# async def create_rtds_btcusd_table(
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
+# else:
+# if engine == 'mysql':
+# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
+# await CON.execute(text("""
+# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
+# timestamp_arrival BIGINT,
+# timestamp_msg BIGINT,
+# timestamp_value BIGINT,
+# value DOUBLE,
+# qty DOUBLE
+# );
+# """))
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+# async def insert_rtds_btcusd_table(
+# timestamp_arrival: int,
+# timestamp_msg: int,
+# timestamp_value: int,
+# value: float,
+# qty: float,
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# params={
+# 'timestamp_arrival': timestamp_arrival,
+# 'timestamp_msg': timestamp_msg,
+# 'timestamp_value': timestamp_value,
+# 'value': value,
+# 'qty': qty,
+# }
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
+# else:
+# if engine == 'mysql':
+# await CON.execute(text("""
+# INSERT INTO binance_btcusd_trades
+# (
+# timestamp_arrival,
+# timestamp_msg,
+# timestamp_value,
+# value,
+# qty
+# )
+# VALUES
+# (
+# :timestamp_arrival,
+# :timestamp_msg,
+# :timestamp_value,
+# :value,
+# :qty
+# )
+# """),
+# parameters=params
+# )
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+### Websocket ###
+async def heartbeat(ws):
+ while True:
+ await asyncio.sleep(PING_INTERVAL_SEC)
+ logging.info("SENDING PING...")
+ ping_msg = {"op":"ping","args":[ str(round(datetime.now().timestamp()*1000)) ]}
+ await ws.send(json.dumps(ping_msg))
+
+async def ws_stream():
+ global TICKER_SNAPSHOT_DATA_LAST
+
+ async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'):
+ logging.info(f"Connected to {WSS_URL}")
+
+ asyncio.create_task(heartbeat(ws=websocket))
+
+ subscribe_msg = {
+ "op": "subscribe",
+ "args": ["instrumentInfo.H.ETHUSDT"]
+ }
+
+ await websocket.send(json.dumps(subscribe_msg))
+
+ try:
+ async for message in websocket:
+ ts_arrival = round(datetime.now().timestamp()*1000)
+ print(message)
+ # if isinstance(message, str):
+ # try:
+ # data = json.loads(message)
+ # if data.get('op', None) == 'ping':
+ # pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]}
+ # logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}')
+ # await websocket.send(json.dumps(pong_msg))
+ # continue
+ # elif data.get('success', None):
+ # # logging.info('CONNECTION SUCCESFUL RESP MSG')
+ # continue
+
+ # msg_type = data.get('type', None)
+ # if msg_type is not None:
+ # match msg_type:
+ # case 'snapshot':
+ # TICKER_SNAPSHOT_DATA_LAST = data['data']
+
+ # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
+ # VAL_KEY_OBJ = json.dumps({
+ # 'timestamp_arrival': ts_arrival,
+ # 'timestamp_msg': data['ts'],
+ # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
+ # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
+ # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
+ # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
+ # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
+ # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
+ # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
+ # 'nextFundingTime_ts_ms': nextFundingTime_ts,
+ # })
+ # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
+ # continue
+ # case 'delta':
+ # TICKER_SNAPSHOT_DATA_LAST.update(data['data'])
+
+ # nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
+ # VAL_KEY_OBJ = json.dumps({
+ # 'timestamp_arrival': ts_arrival,
+ # 'timestamp_msg': data['ts'],
+ # 'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
+ # 'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
+ # 'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
+ # 'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
+ # 'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
+ # 'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
+ # 'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
+ # 'nextFundingTime_ts_ms': nextFundingTime_ts,
+ # })
+ # VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
+ # continue
+ # case _:
+ # logging.warning(f'UNMATCHED OTHER MSG: {data}')
+ # else:
+ # logging.info(f'Initial or unexpected data struct, skipping: {data}')
+ # continue
+ # except (json.JSONDecodeError, ValueError):
+ # logging.warning(f'Message not in JSON format, skipping: {message}')
+ # continue
+ # else:
+ # raise ValueError(f'Type: {type(data)} not expected: {message}')
+ except websockets.ConnectionClosed as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+ continue
+ except Exception as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+
+
+async def main():
+ global VAL_KEY
+ global CON
+
+ if USE_VK:
+ VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
+ else:
+ VAL_KEY = None
+ logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
+
+ if USE_DB:
+ engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
+ async with engine.connect() as CON:
+ # await create_rtds_btcusd_table(CON=CON)
+ await ws_stream()
+ else:
+ CON = None
+ logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
+ await ws_stream()
+
+
+if __name__ == '__main__':
+ START_TIME = round(datetime.now().timestamp()*1000)
+
+ logging.info(f'Log FilePath: {LOG_FILEPATH}')
+
+ logging.basicConfig(
+ force=True,
+ filename=LOG_FILEPATH,
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ filemode='w'
+ )
+ logging.info(f"STARTED: {START_TIME}")
+
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ logging.info("Stream stopped")
\ No newline at end of file
diff --git a/ws_mexc.py b/ws_mexc.py
index e69de29..38d1dec 100644
--- a/ws_mexc.py
+++ b/ws_mexc.py
@@ -0,0 +1,244 @@
+import asyncio
+import json
+import logging
+import socket
+import traceback
+from datetime import datetime
+from typing import AsyncContextManager
+
+import numpy as np
+import pandas as pd
+import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
+from sqlalchemy import text
+import websockets
+from sqlalchemy.ext.asyncio import create_async_engine
+import valkey
+import os
+from dotenv import load_dotenv
+
+
+### Allow only ipv4 ###
+def allowed_gai_family():
+ return socket.AF_INET
+urllib3_cn.allowed_gai_family = allowed_gai_family
+
+### Database ###
+USE_DB: bool = False
+USE_VK: bool = True
+VK_FUND_RATE = 'fund_rate_mexc'
+VK_TICKER = 'fut_ticker_mexc'
+CON: AsyncContextManager | None = None
+VAL_KEY = None
+
+### Logging ###
+load_dotenv()
+LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_MEXC.log'
+
+### CONSTANTS ###
+PING_INTERVAL_SEC = 20
+
+### Globals ###
+WSS_URL = "wss://contract.mexc.com/edge"
+# HIST_TRADES = np.empty((0, 3))
+# HIST_TRADES_LOOKBACK_SEC = 6
+
+# ### Database Funcs ###
+# async def create_rtds_btcusd_table(
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
+# else:
+# if engine == 'mysql':
+# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
+# await CON.execute(text("""
+# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
+# timestamp_arrival BIGINT,
+# timestamp_msg BIGINT,
+# timestamp_value BIGINT,
+# value DOUBLE,
+# qty DOUBLE
+# );
+# """))
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+# async def insert_rtds_btcusd_table(
+# timestamp_arrival: int,
+# timestamp_msg: int,
+# timestamp_value: int,
+# value: float,
+# qty: float,
+# CON: AsyncContextManager,
+# engine: str = 'mysql', # mysql | duckdb
+# ) -> None:
+# params={
+# 'timestamp_arrival': timestamp_arrival,
+# 'timestamp_msg': timestamp_msg,
+# 'timestamp_value': timestamp_value,
+# 'value': value,
+# 'qty': qty,
+# }
+# if CON is None:
+# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
+# else:
+# if engine == 'mysql':
+# await CON.execute(text("""
+# INSERT INTO binance_btcusd_trades
+# (
+# timestamp_arrival,
+# timestamp_msg,
+# timestamp_value,
+# value,
+# qty
+# )
+# VALUES
+# (
+# :timestamp_arrival,
+# :timestamp_msg,
+# :timestamp_value,
+# :value,
+# :qty
+# )
+# """),
+# parameters=params
+# )
+# await CON.commit()
+# else:
+# raise ValueError('Only MySQL engine is implemented')
+
+### Websocket ###
+async def heartbeat(ws):
+ while True:
+ await asyncio.sleep(PING_INTERVAL_SEC)
+ logging.info("SENDING PING...")
+ ping_msg = {
+ "method": "ping"
+ }
+ await ws.send(json.dumps(ping_msg))
+
+
+async def ws_stream():
+ # global HIST_TRADES
+
+ async for websocket in websockets.connect(WSS_URL):
+ logging.info(f"Connected to {WSS_URL}")
+
+ asyncio.create_task(heartbeat(ws=websocket))
+
+ subscribe_msg = {
+ "method": "sub.ticker",
+ "param": {
+ "symbol": "ETH_USDT"
+ },
+ # "gzip": false
+ }
+ await websocket.send(json.dumps(subscribe_msg))
+ subscribe_msg = {
+ "method": "sub.funding.rate",
+ "param": {
+ "symbol": "ETH_USDT"
+ },
+ # "gzip": false
+ }
+ await websocket.send(json.dumps(subscribe_msg))
+
+ try:
+ async for message in websocket:
+ ts_arrival = round(datetime.now().timestamp()*1000)
+ if isinstance(message, str):
+ try:
+ data = json.loads(message)
+ channel = data.get('channel', None)
+ if channel is not None:
+ match channel:
+ case 'push.ticker':
+ # logging.info(f'TICKER: {data}')
+ VAL_KEY_OBJ = json.dumps({
+ 'timestamp_arrival': ts_arrival,
+ 'timestamp_msg': data['ts'],
+ 'timestamp_data': data['data']['timestamp'],
+ 'symbol': data['data']['symbol'],
+ 'lastPrice': float(data['data']['lastPrice']),
+ 'fairPrice': float(data['data']['fairPrice']),
+ 'indexPrice': float(data['data']['indexPrice']),
+ 'volume24': float(data['data']['volume24']),
+ 'bid1': float(data['data']['bid1']),
+ 'ask1': float(data['data']['ask1']),
+ 'fundingRate': float(data['data']['fundingRate']),
+
+ })
+ VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
+ case 'push.funding.rate':
+ # logging.info(f'FUNDING RATE: {data}')
+ VAL_KEY_OBJ = json.dumps({
+ 'timestamp_arrival': ts_arrival,
+ 'timestamp_msg': data['ts'],
+ 'symbol': data['data']['symbol'],
+ 'rate': float(data['data']['rate']),
+ 'nextSettleTime_ts_ms': data['data']['nextSettleTime'],
+ })
+ VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ)
+ case 'pong':
+ logging.info(f'PING RESPONSE: {data}')
+ case _:
+ logging.info(f'OTHER: {data}')
+
+ else:
+ logging.info(f'Initial or unexpected data struct, skipping: {data}')
+ continue
+ except (json.JSONDecodeError, ValueError):
+ logging.warning(f'Message not in JSON format, skipping: {message}')
+ continue
+ else:
+ raise ValueError(f'Type: {type(data)} not expected: {message}')
+ except websockets.ConnectionClosed as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+ continue
+ except Exception as e:
+ logging.error(f'Connection closed: {e}')
+ logging.error(traceback.format_exc())
+
+
+async def main():
+ global VAL_KEY
+ global CON
+
+ if USE_VK:
+ VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
+ else:
+ VAL_KEY = None
+ logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
+
+ if USE_DB:
+ engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
+ async with engine.connect() as CON:
+ # await create_rtds_btcusd_table(CON=CON)
+ await ws_stream()
+ else:
+ CON = None
+ logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
+ await ws_stream()
+
+
+if __name__ == '__main__':
+ START_TIME = round(datetime.now().timestamp()*1000)
+
+ logging.info(f'Log FilePath: {LOG_FILEPATH}')
+
+ logging.basicConfig(
+ force=True,
+ filename=LOG_FILEPATH,
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ filemode='w'
+ )
+ logging.info(f"STARTED: {START_TIME}")
+
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ logging.info("Stream stopped")
\ No newline at end of file