Compare commits

...

5 Commits

Author SHA1 Message Date
ff209603b6 initial algo logic 2026-04-23 06:39:51 +00:00
539e6004cf feedhandlers extended 2026-04-23 03:11:52 +00:00
408a63fe58 aster auth hell 2026-04-22 05:24:40 +00:00
64266366dd adding aster 2026-04-22 01:33:35 +00:00
d2068b1c73 setting up feed handlers, more to come 2026-04-21 20:22:33 +00:00
34 changed files with 3661 additions and 66 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.env

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"python-envs.defaultEnvManager": "ms-python.python:conda",
"python-envs.defaultPackageManager": "ms-python.python:conda"
}

198
apex.ipynb Normal file
View File

@@ -0,0 +1,198 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "b6c46d40",
"metadata": {},
"outputs": [],
"source": [
"import modules.apex_api as apex_api"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "7fb6d9dc",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Authenticating...\n",
"...Authenticated\n"
]
}
],
"source": [
"client = apex_api.apex_create_client()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "d5a1203a",
"metadata": {},
"outputs": [],
"source": [
"# print(\"*** POSTING ORDER ***\")\n",
"# createOrderRes = client.create_order_v3(\n",
"# symbol=\"ETH-USDT\", \n",
"# side=\"BUY\",\n",
"# type=\"LIMIT\",\n",
"# size=\"0.01\",\n",
"# price=\"2100\",\n",
"# )\n",
"# print(createOrderRes)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c21254eb",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'data': {'totalEquityValue': '13.840000000000000000',\n",
" 'availableBalance': '13.840000000000000000',\n",
" 'initialMargin': '0',\n",
" 'maintenanceMargin': '0',\n",
" 'walletBalance': '',\n",
" 'realizedPnl': '-5.399416243793950000',\n",
" 'unrealizedPnl': '0.00',\n",
" 'totalRisk': '0',\n",
" 'totalValueWithoutDiscount': '13.840000000000000000',\n",
" 'liabilities': '13.840000000000000000',\n",
" 'totalAvailableBalance': '13.840000000000000000'},\n",
" 'timeCost': 6327944}"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client.get_account_balance_v3()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "7cba63d4",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'data': [], 'timeCost': 3984811}"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client.open_orders_v3()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "b072c0de",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'timeCost': 4389124}"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client.delete_open_orders_v3(symbol=\"ETH-USDT\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "5ea177f8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"TOKEN: USDT == 13.840000000000000000\n",
"TOKEN: USDC == 0.000000000000000000\n"
]
}
],
"source": [
"account_and_pos = client.get_account_v3()\n",
"for c in account_and_pos['contractWallets']:\n",
" print(f'TOKEN: {c['token']} == {c['balance']}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "70eb3b4f",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "fefca500",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc048386",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

357
aster.ipynb Normal file
View File

@@ -0,0 +1,357 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "3a269644",
"metadata": {},
"outputs": [],
"source": [
"import modules.aster_auth as aster_auth"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "4395fabb",
"metadata": {},
"outputs": [],
"source": [
"listen_key_request = {\n",
" \"url\": \"/fapi/v3/listenKey\",\n",
" \"method\": \"POST\",\n",
" \"params\": {}\n",
"}\n",
"cancel_all_open_orders = {\n",
" \"url\": \"/fapi/v3/allOpenOrders\",\n",
" \"method\": \"DELETE\",\n",
" \"params\": {\n",
" 'symbol': 'ETHUSDT',\n",
" }\n",
"}\n",
"fut_acct_balances = {\n",
" \"url\": \"/fapi/v3/balance\",\n",
" \"method\": \"GET\",\n",
" \"params\": {}\n",
"}\n",
"commission_rate = {\n",
" \"url\": \"/fapi/v3/commissionRate\",\n",
" \"method\": \"GET\",\n",
" \"params\": {\n",
" 'symbol': 'ETHUSDT',\n",
" }\n",
"}\n",
"post_order = {\n",
" \"url\": \"/fapi/v3/order\",\n",
" \"method\": \"POST\",\n",
" \"params\": {\n",
" 'symbol': 'ETHUSDT',\n",
" 'side': 'BUY',\n",
" 'type': 'LIMIT',\n",
" 'timeInForce': 'GTC',\n",
" 'quantity': '0.01',\n",
" 'price': '2100',\n",
" }\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2122885a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'orderId': 17341253499,\n",
" 'symbol': 'ETHUSDT',\n",
" 'status': 'NEW',\n",
" 'clientOrderId': 'J1SvsvvDhlDAqwGau1T77F',\n",
" 'price': '2100',\n",
" 'avgPrice': '0.00000',\n",
" 'origQty': '0.010',\n",
" 'executedQty': '0',\n",
" 'cumQty': '0',\n",
" 'cumQuote': '0',\n",
" 'timeInForce': 'GTC',\n",
" 'type': 'LIMIT',\n",
" 'reduceOnly': False,\n",
" 'closePosition': False,\n",
" 'side': 'BUY',\n",
" 'positionSide': 'BOTH',\n",
" 'stopPrice': '0',\n",
" 'workingType': 'CONTRACT_PRICE',\n",
" 'priceProtect': False,\n",
" 'origType': 'LIMIT',\n",
" 'updateTime': 1776838867250,\n",
" 'newChainData': {'hash': '0xa2c23513491587b2961bfa77d2077489a14522d7ce359a9e3635e7c4f0d12a22'}}"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# j = aster_auth_api.post_authenticated_url(post_order)\n",
"# j"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "45d68bf4",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1776836583531"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from datetime import datetime\n",
"round(datetime.now().timestamp())"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1800"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"30*60"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "67a3bbb2",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 23,
"id": "e958e7da",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"### User Data Stream ###\n",
"# After posting limit order to rest\n",
"after_order_post = json.loads('{\"e\":\"ORDER_TRADE_UPDATE\",\"T\":1776836992350,\"E\":1776836992388,\"o\":{\"s\":\"ETHUSDT\",\"c\":\"KFi375tZh5kzHsQJWKMJHe\",\"S\":\"BUY\",\"o\":\"LIMIT\",\"f\":\"GTC\",\"q\":\"0.010\",\"p\":\"2100\",\"ap\":\"0\",\"sp\":\"0\",\"x\":\"NEW\",\"X\":\"NEW\",\"i\":17341121450,\"l\":\"0\",\"z\":\"0\",\"L\":\"0\",\"T\":1776836992350,\"t\":0,\"b\":\"21\",\"a\":\"0\",\"m\":false,\"R\":false,\"wt\":\"CONTRACT_PRICE\",\"ot\":\"LIMIT\",\"ps\":\"BOTH\",\"cp\":false,\"rp\":\"0\",\"pP\":false,\"si\":0,\"ss\":0,\"h\":\"0xeec41a368072d5a47c13b3ad83703eded2e2b8cf9ff427095beb8b7684968db0\"}}')\n",
"\n",
"# After order cancellation in GUI\n",
"after_order_cxl = json.loads('{\"e\":\"ORDER_TRADE_UPDATE\",\"T\":1776836998500,\"E\":1776836998533,\"o\":{\"s\":\"ETHUSDT\",\"c\":\"KFi375tZh5kzHsQJWKMJHe\",\"S\":\"BUY\",\"o\":\"LIMIT\",\"f\":\"GTC\",\"q\":\"0.010\",\"p\":\"2100\",\"ap\":\"0\",\"sp\":\"0\",\"x\":\"CANCELED\",\"X\":\"CANCELED\",\"i\":17341121450,\"l\":\"0\",\"z\":\"0\",\"L\":\"0\",\"T\":1776836998500,\"t\":0,\"b\":\"0\",\"a\":\"0\",\"m\":false,\"R\":false,\"wt\":\"CONTRACT_PRICE\",\"ot\":\"LIMIT\",\"ps\":\"BOTH\",\"cp\":false,\"rp\":\"0\",\"pP\":false,\"si\":0,\"ss\":0,\"h\":\"0x116bb48a4b41420aebbc76343d6f55f22d1ccbc36b04462e6172571fda836599\"}}')\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'e': 'ORDER_TRADE_UPDATE',\n",
" 'T': 1776836992350,\n",
" 'E': 1776836992388,\n",
" 'o': {'s': 'ETHUSDT',\n",
" 'c': 'KFi375tZh5kzHsQJWKMJHe',\n",
" 'S': 'BUY',\n",
" 'o': 'LIMIT',\n",
" 'f': 'GTC',\n",
" 'q': '0.010',\n",
" 'p': '2100',\n",
" 'ap': '0',\n",
" 'sp': '0',\n",
" 'x': 'NEW',\n",
" 'X': 'NEW',\n",
" 'i': 17341121450,\n",
" 'l': '0',\n",
" 'z': '0',\n",
" 'L': '0',\n",
" 'T': 1776836992350,\n",
" 't': 0,\n",
" 'b': '21',\n",
" 'a': '0',\n",
" 'm': False,\n",
" 'R': False,\n",
" 'wt': 'CONTRACT_PRICE',\n",
" 'ot': 'LIMIT',\n",
" 'ps': 'BOTH',\n",
" 'cp': False,\n",
" 'rp': '0',\n",
" 'pP': False,\n",
" 'si': 0,\n",
" 'ss': 0,\n",
" 'h': '0xeec41a368072d5a47c13b3ad83703eded2e2b8cf9ff427095beb8b7684968db0'}}"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"after_order_post"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "1ea320f2",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'e': 'ORDER_TRADE_UPDATE',\n",
" 'T': 1776836998500,\n",
" 'E': 1776836998533,\n",
" 'o': {'s': 'ETHUSDT',\n",
" 'c': 'KFi375tZh5kzHsQJWKMJHe',\n",
" 'S': 'BUY',\n",
" 'o': 'LIMIT',\n",
" 'f': 'GTC',\n",
" 'q': '0.010',\n",
" 'p': '2100',\n",
" 'ap': '0',\n",
" 'sp': '0',\n",
" 'x': 'CANCELED',\n",
" 'X': 'CANCELED',\n",
" 'i': 17341121450,\n",
" 'l': '0',\n",
" 'z': '0',\n",
" 'L': '0',\n",
" 'T': 1776836998500,\n",
" 't': 0,\n",
" 'b': '0',\n",
" 'a': '0',\n",
" 'm': False,\n",
" 'R': False,\n",
" 'wt': 'CONTRACT_PRICE',\n",
" 'ot': 'LIMIT',\n",
" 'ps': 'BOTH',\n",
" 'cp': False,\n",
" 'rp': '0',\n",
" 'pP': False,\n",
" 'si': 0,\n",
" 'ss': 0,\n",
" 'h': '0x116bb48a4b41420aebbc76343d6f55f22d1ccbc36b04462e6172571fda836599'}}"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"after_order_cxl"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "07ef4360",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Timestamp('2019-09-19 07:51:05.651000')"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"\n",
"pd.to_datetime(1568879465651, unit='ms')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "284b7266",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "98ed3a27",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "940ceba7",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

65
docker-compose.yml Normal file
View File

@@ -0,0 +1,65 @@
# Tail all feed-handler logs at once:
# tail -f Fund_Rate_Aster_User.log Fund_Rate_Aster.log Fund_Rate_Extended_FR.log Fund_Rate_Extended_OB.log Fund_Rate_Extended_User.log
services:
  # Aster market-data feed handler
  ws_aster:
    container_name: ws_aster
    restart: "unless-stopped"
    build:
      context: ./
      dockerfile: ./ws_aster/Dockerfile
    volumes:
      - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
      - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
    network_mode: "host"
  # Aster user-data (account/order) feed handler
  ws_aster_user:
    container_name: ws_aster_user
    restart: "unless-stopped"
    build:
      context: ./
      dockerfile: ./ws_aster_user/Dockerfile
    volumes:
      - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
      - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
    network_mode: "host"
  # Extended funding-rate feed handler
  ws_extended_fund_rate:
    container_name: ws_extended_fund_rate
    restart: "unless-stopped"
    build:
      context: ./
      dockerfile: ./ws_extended_fund_rate/Dockerfile
    volumes:
      - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
      - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
    network_mode: "host"
  # Extended orderbook feed handler
  ws_extended_orderbook:
    container_name: ws_extended_orderbook
    restart: "unless-stopped"
    build:
      context: ./
      dockerfile: ./ws_extended_orderbook/Dockerfile
    volumes:
      - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
      - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
    network_mode: "host"
  # Extended user-data feed handler
  ws_extended_user:
    container_name: ws_extended_user
    restart: "unless-stopped"
    build:
      context: ./
      dockerfile: ./ws_extended_user/Dockerfile
    volumes:
      - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
      - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
    network_mode: "host"
  # ng:
  #   container_name: ng
  #   restart: "unless-stopped"
  #   build:
  #     context: ./
  #     dockerfile: ./ng/Dockerfile
  #   volumes:
  #     - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data
  #     - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to logs
  #   network_mode: "host"

394
extended.ipynb Normal file
View File

@@ -0,0 +1,394 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"id": "6c70a8c3",
"metadata": {},
"outputs": [],
"source": [
"\n",
"import asyncio\n",
"import requests\n",
"from x10.config import MAINNET_CONFIG, TESTNET_CONFIG\n",
"from x10.core.stark_account import StarkPerpetualAccount\n",
"from x10.perpetual.trading_client import PerpetualTradingClient\n",
"from x10.models.order import OrderSide, OrderType\n",
"import time\n",
"from dotenv import load_dotenv\n",
"import os\n",
"import uuid\n",
"import asyncio\n",
"import logging\n",
"from decimal import Decimal\n",
"import modules.extended_auth as extend_auth\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "ff971ca9",
"metadata": {},
"outputs": [],
"source": [
"load_dotenv()\n",
"\n",
"API_KEY = os.getenv('EXTENDED_API_KEY')\n",
"PUBLIC_KEY = os.getenv('EXTENDED_STARK_KEY_PUBLIC') # public Stark key (l2Key from account info)\n",
"PRIVATE_KEY = os.getenv('EXTENDED_STARK_KEY_PRIVATE') # private Stark key (hex)\n",
"VAULT = int(os.getenv('EXTENDED_VAULT_NUMBER')) # l2Vault from account info (integer)\n",
"\n",
"CONFIG = MAINNET_CONFIG\n",
"\n",
"ORDER_MARKET = \"BTC-USD\"\n",
"ORDER_SIDE = OrderSide.BUY\n",
"ORDER_QTY = Decimal(\"0.001\")\n",
"ORDER_PRICE = Decimal(\"75000\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "fc2c6d2b",
"metadata": {},
"outputs": [],
"source": [
"client, trading_client = await extend_auth.create_auth_account_and_trading_client()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c366706f",
"metadata": {},
"outputs": [],
"source": [
"# placed_order = await trading_client.place_order(\n",
"# market_name=ORDER_MARKET,\n",
"# amount_of_synthetic=ORDER_QTY,\n",
"# price=ORDER_PRICE,\n",
"# side=ORDER_SIDE,\n",
"# taker_fee=Decimal(\"0.00025\")\n",
"# )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "7cd3413d",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "a7212988",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 1,
"id": "2b5d7d21",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'type': 'SNAPSHOT',\n",
" 'data': {'t': 'SNAPSHOT',\n",
" 'm': 'ETH-USD',\n",
" 'b': [{'q': '362.381', 'p': '2318.1'}],\n",
" 'a': [{'q': '70.572', 'p': '2318.2'}],\n",
" 'd': '1'},\n",
" 'ts': 1776822518279,\n",
" 'seq': 14}"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"{\"type\":\"SNAPSHOT\",\"data\":{\"t\":\"SNAPSHOT\",\"m\":\"ETH-USD\",\"b\":[{\"q\":\"362.381\",\"p\":\"2318.1\"}],\"a\":[{\"q\":\"70.572\",\"p\":\"2318.2\"}],\"d\":\"1\"},\"ts\":1776822518279,\"seq\":14}"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "19f51572",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'data': {'m': 'ETH-USD', 'f': '-0.000001', 'T': 1776823319595},\n",
" 'ts': 1776823319595,\n",
" 'seq': 2}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"{'data': {'m': 'ETH-USD', 'f': '-0.000001', 'T': 1776823319595}, 'ts': 1776823319595, 'seq': 2}"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "68006231",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Timestamp('2026-04-22 03:00:00')"
]
},
"execution_count": 41,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"\n",
"pd.to_datetime(1776826800000, unit='ms')"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "5561153b",
"metadata": {},
"outputs": [],
"source": [
"def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds\n",
" interval_secs = interval_mins * 60\n",
" seconds = dt.timestamp()\n",
" rounded_seconds = math.floor(seconds / interval_secs) * interval_secs\n",
" \n",
" return rounded_seconds"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "bd83b59f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1776826800000"
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"(time_round_down(dt=datetime.now(timezone.utc), interval_mins=60)+(60*60))*1000"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "086dbfb3",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "2d734f79",
"metadata": {},
"outputs": [],
"source": [
"\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "e6b59c51",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"### USER DATA STREAM ###\n",
"order = json.loads('{\"type\":\"ORDER\",\"data\":{\"isSnapshot\":true,\"orders\":[]},\"ts\":1776839215443,\"seq\":1}')\n",
"position = json.loads('{\"type\":\"POSITION\",\"data\":{\"isSnapshot\":true,\"positions\":[]},\"ts\":1776839215443,\"seq\":2}')\n",
"balance = json.loads('{\"type\":\"BALANCE\",\"data\":{\"isSnapshot\":true,\"balance\":{\"collateralName\":\"USD\",\"balance\":\"25.979998\",\"status\":\"ACTIVE\",\"equity\":\"25.979998\",\"spotEquity\":\"0\",\"spotEquityForAvailableForTrade\":\"0\",\"availableForTrade\":\"25.979998\",\"availableForWithdrawal\":\"25.979998\",\"unrealisedPnl\":\"0\",\"initialMargin\":\"0.000000\",\"marginRatio\":\"0.0000\",\"updatedTime\":1776822250184,\"exposure\":\"0\",\"leverage\":\"0.0000\"}},\"ts\":1776839215443,\"seq\":3}')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "ee8420c4",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'type': 'ORDER',\n",
" 'data': {'isSnapshot': True, 'orders': []},\n",
" 'ts': 1776839215443,\n",
" 'seq': 1}"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"order"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "e8d20d9f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'type': 'POSITION',\n",
" 'data': {'isSnapshot': True, 'positions': []},\n",
" 'ts': 1776839215443,\n",
" 'seq': 2}"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"position"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'type': 'BALANCE',\n",
" 'data': {'isSnapshot': True,\n",
" 'balance': {'collateralName': 'USD',\n",
" 'balance': '25.979998',\n",
" 'status': 'ACTIVE',\n",
" 'equity': '25.979998',\n",
" 'spotEquity': '0',\n",
" 'spotEquityForAvailableForTrade': '0',\n",
" 'availableForTrade': '25.979998',\n",
" 'availableForWithdrawal': '25.979998',\n",
" 'unrealisedPnl': '0',\n",
" 'initialMargin': '0.000000',\n",
" 'marginRatio': '0.0000',\n",
" 'updatedTime': 1776822250184,\n",
" 'exposure': '0',\n",
" 'leverage': '0.0000'}},\n",
" 'ts': 1776839215443,\n",
" 'seq': 3}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"balance"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fb16dfb9",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "665377af",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "af5c751b",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

282
main.py Normal file
View File

@@ -0,0 +1,282 @@
import asyncio
import json
import logging
import math
import os
import time
import traceback
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import AsyncContextManager
from dotenv import load_dotenv
import numpy as np
import pandas as pd
import requests
# import talib
import valkey
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
import modules.aster_auth as aster_auth
import modules.extended_auth as extend_auth
### Database ###
EXTEND_CLIENT = None  # Extended exchange trading client; assigned in main()
CON: AsyncContextManager | None = None  # async SQLAlchemy connection; bound in main()
VAL_KEY = None  # Valkey (Redis-compatible) client; assigned in main()
### Logging ###
load_dotenv()
# NOTE(review): if LOGS_PATH is unset, os.getenv() returns None and the '+'
# below raises TypeError at import time — confirm the env var is always set,
# or add a default.
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo.log'
### CONSTANTS ###
ASTER_LH_ASSET: str = 'ETH'    # Aster base asset
ASTER_RH_ASSET: str = 'USDT'   # Aster quote/collateral asset
ASTER_TICKER: str = ASTER_LH_ASSET + ASTER_RH_ASSET  # Aster symbol format, e.g. 'ETHUSDT'
EXTEND_LH_ASSET: str = 'ETH'   # Extended base asset
EXTEND_RH_ASSET: str = 'USD'   # Extended collateral asset
EXTEND_TICKER: str = EXTEND_LH_ASSET + '-' + EXTEND_RH_ASSET  # Extended market format, e.g. 'ETH-USD'
TARGET_OPEN_CASH_POSITION: float = 10 # Each side (alpha and hedge)
### GLOBALS ###
ASTER_MULT = 150   # Aster leverage multiplier; refreshed by get_aster_notional_position()
EXTEND_MULT = 50   # Extended leverage multiplier; refreshed by get_extend_notional()
# Cap notional by the LOWER of the two venues' leverage so both legs can be filled.
MAX_TARGET_NOTIONAL = min([ASTER_MULT, EXTEND_MULT]) * TARGET_OPEN_CASH_POSITION
ASTER_MIN_ORDER_QTY = 0.001  # refreshed from exchangeInfo LOT_SIZE filter in get_aster_exch_info()
EXTEND_MIN_ORDER_QTY = 0.01  # refreshed from markets info in get_extend_exch_info()
ASTER_AVAIL_COLLATERAL = 0       # available margin on Aster; refreshed from /fapi/v3/balance
ASTER_NOTIONAL_POSITION = 0      # current Aster position notional; refreshed from /fapi/v3/positionRisk
EXTEND_AVAIL_COLLATERAL = 0      # available margin on Extended
EXTEND_NOTIONAL_POSITION = 0     # current Extended position notional
ASTER_OPEN_POSITIONS = []   # local caches of open positions/orders (not yet populated here)
EXTEND_OPEN_POSITIONS = []
ASTER_OPEN_ORDERS = []
EXTEND_OPEN_ORDERS = []
### FLAGS ###
LIQUIDATE_POS_AND_KILL_ALGO_FLAG: bool = False  # set True to unwind positions and stop the algo
async def aster_remainder_route():
    """Route the outstanding (remainder) order quantity on Aster.

    Not yet implemented — placeholder. Planned steps per the notes below.
    """
    # Check open orders...cancel replace or new order?
    # Check collateral to confirm you have enough money to trade
    # if CR, what should be the new price? has it changed? maybe no action needed? how long has it been working?
    # if not enough collateral then need to liquidate and kill algo - flip flag
    # if good to order, then create and post order. ADD to LOCAL OPEN ORDERS LIST
    pass
async def extend_remainder_route():
    """Route the outstanding (remainder) order quantity on Extended.

    Not yet implemented — placeholder mirroring aster_remainder_route().
    """
    pass
async def run_algo():
    """Main funding-rate carry loop.

    Every ~5 seconds: pull the latest funding-rate/ticker snapshots for Aster
    and Extended from Valkey, pick the venue with the larger absolute funding
    rate as the "alpha" leg, derive opposite target notionals for a
    long/short pair, and hand any remaining orderable quantity to the
    per-exchange routers. State (collateral, positions, min sizes) is shared
    via module-level globals maintained by the get_* helpers.

    Runs until interrupted or an unhandled exception occurs (logged critical).
    """
    try:
        while True:
            loop_start = time.time()
            print('__________Start___________')
            # Snapshots are JSON strings published to Valkey by the ws_* feed handlers.
            ASTER_FUND_RATE_DICT = json.loads(VAL_KEY.get('fund_rate_aster'))
            ASTER_TICKER_DICT = json.loads(VAL_KEY.get('fut_ticker_aster'))
            # print(f'ASTER FUND RATE: {ASTER_FUND_RATE}')
            # print(f'ASTER TICKER: {ASTER_TICKER}')
            EXTENDED_FUND_RATE_DICT = json.loads(VAL_KEY.get('fund_rate_extended'))
            EXTENDED_TICKER_DICT = json.loads(VAL_KEY.get('fut_ticker_extended'))
            # print(f'EXTENDED FUND RATE: {EXTENDED_FUND_RATE}')
            # print(f'EXTENDED TICKER: {EXTENDED_TICKER}')
            ASTER_FUND_RATE = float(ASTER_FUND_RATE_DICT.get('funding_rate', 0))
            EXTEND_FUND_RATE = float(EXTENDED_FUND_RATE_DICT.get('funding_rate', 0))
            ASTER_FUND_RATE_TIME = float(ASTER_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0))
            EXTEND_FUND_RATE_TIME = float(EXTENDED_FUND_RATE_DICT.get('next_funding_time_ts_ms', 0))
            # Positive funding: longs pay shorts; negative: shorts pay longs.
            ASTER_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if ASTER_FUND_RATE > 0 else 'SHORT PAYS LONG'
            EXTEND_PAYOUT_DIRECTION_STR = 'LONG PAYS SHORT' if EXTEND_FUND_RATE > 0 else 'SHORT PAYS LONG'
            # BUG FIX: the second operand previously compared EXTEND_FUND_RATE
            # (a funding rate) against a millisecond duration; it must use
            # EXTEND_FUND_RATE_TIME.
            # NOTE(review): both *_FUND_RATE_TIME values come from
            # 'next_funding_time_ts_ms' and look like ABSOLUTE epoch-ms
            # timestamps, so `< 1 hour` would always be False — this probably
            # wants (next_funding_time_ms - now_ms) < 60*60*1000. Confirm intent.
            FUNDINGS_AT_SAME_TIME_NEXT_HR = (
                (ASTER_FUND_RATE_TIME < 60 * 60 * 1000)
                and (EXTEND_FUND_RATE_TIME < 60 * 60 * 1000)
            )
            # Alpha leg = venue whose funding we harvest; the other venue hedges.
            if (abs(ASTER_FUND_RATE) > abs(EXTEND_FUND_RATE)) and FUNDINGS_AT_SAME_TIME_NEXT_HR:
                ALPHA_EXCH = 'ASTER'
                ALPHA_FUND_RATE = ASTER_FUND_RATE
            else:
                ALPHA_EXCH = 'EXTEND'
                ALPHA_FUND_RATE = EXTEND_FUND_RATE
            # Negative funding pays longs -> carry side is BUY (positive notional);
            # positive funding pays shorts -> SELL (negative notional).
            if ALPHA_FUND_RATE < 0:
                ALPHA_CARRY_SIDE = 'BUY'
                ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL
            else:
                ALPHA_CARRY_SIDE = 'SELL'
                ALPHA_TGT_NOTIONAL = MAX_TARGET_NOTIONAL * -1

            def calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR: bool) -> float:
                """Net funding expected next hour: both legs count only when they fund together."""
                if FUNDINGS_AT_SAME_TIME_NEXT_HR:
                    return ASTER_FUND_RATE + EXTEND_FUND_RATE
                else:
                    return EXTEND_FUND_RATE

            NEXT_NET_FUNDING_RATE = calc_next_net_fund_rate(FUNDINGS_AT_SAME_TIME_NEXT_HR)
            # Hedge leg takes the opposite notional of the alpha leg.
            if ALPHA_EXCH == 'EXTEND':
                ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL * -1
                EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL
            else:
                ASTER_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL
                EXTEND_TGT_NOTIONAL = ALPHA_TGT_NOTIONAL * -1
            # Remaining quantity to work on each venue, and whether it clears the min order size.
            ASTER_TGT_TAIL = ASTER_TGT_NOTIONAL - ASTER_NOTIONAL_POSITION
            EXTEND_TGT_TAIL = EXTEND_TGT_NOTIONAL - EXTEND_NOTIONAL_POSITION
            ASTER_TGT_TAIL_ORDERABLE = abs(ASTER_TGT_TAIL) >= ASTER_MIN_ORDER_QTY
            EXTEND_TGT_TAIL_ORDERABLE = abs(EXTEND_TGT_TAIL) >= EXTEND_MIN_ORDER_QTY
            print(f'''
{pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(ASTER_FUND_RATE_TIME, unit='ms')-datetime.now()):}) | {pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')} ({(pd.to_datetime(EXTEND_FUND_RATE_TIME, unit='ms')-datetime.now()):})
ASTER: {ASTER_FUND_RATE:.6%} [{ASTER_FUND_RATE*10_000:.2f}bps] [{ASTER_FUND_RATE*1_000_000:.0f}pips] | EXTEND: {EXTEND_FUND_RATE:.6%} [{EXTEND_FUND_RATE*10_000:.2f}bps] [{EXTEND_FUND_RATE*1_000_000:.0f}pips]
ASTER: {ASTER_PAYOUT_DIRECTION_STR} | EXTEND: {EXTEND_PAYOUT_DIRECTION_STR}
ASTER: [ Available Collateral: {ASTER_AVAIL_COLLATERAL:.4f} ] | EXTEND: [ Available Collateral: {EXTEND_AVAIL_COLLATERAL:.4f} ]
ASTER: [ Notional Position $ : {ASTER_NOTIONAL_POSITION:.4f} ] | EXTEND: [ Notional Position $ : {EXTEND_NOTIONAL_POSITION:.4f} ]
SAME TIME? : {FUNDINGS_AT_SAME_TIME_NEXT_HR}
NET FUNDING : {NEXT_NET_FUNDING_RATE:.6%} [{NEXT_NET_FUNDING_RATE*10_000:.2f}bps] [{NEXT_NET_FUNDING_RATE*1_000_000:.0f}pips]
ALPHA SIDE : {ALPHA_EXCH} [{ALPHA_CARRY_SIDE}]
TGT NOTIONAL: $ {MAX_TARGET_NOTIONAL}
ASTER: {ASTER_NOTIONAL_POSITION:.4f} -> {ASTER_TGT_NOTIONAL:.2f} [ Remain: {ASTER_TGT_TAIL:.4f} ] | EXTEND: {EXTEND_NOTIONAL_POSITION:.4f} -> {EXTEND_TGT_NOTIONAL:.2f} [ Remain: {EXTEND_TGT_TAIL} ]
ASTER: {ASTER_TGT_TAIL:.4f} > {ASTER_MIN_ORDER_QTY:.4f} min [ Order: {ASTER_TGT_TAIL_ORDERABLE} ] | EXTEND: {EXTEND_TGT_TAIL:.4f} > {EXTEND_MIN_ORDER_QTY:.4f} min [ Order: {EXTEND_TGT_TAIL_ORDERABLE} ]
''')
            ### SCAN VALKEY USER FEEDS FOR BALANCE UPDATES ###
            # or just to begin hit the rest API before ordering and update bals then
            ### ROUTES ###
            if ASTER_TGT_TAIL_ORDERABLE:
                await aster_remainder_route()
            if EXTEND_TGT_TAIL_ORDERABLE:
                await extend_remainder_route()
            print(f'__________ End ___________ (Algo Engine ms: {(time.time() - loop_start)*1000})')
            # BUG FIX: time.sleep() blocks the event loop; asyncio.sleep lets
            # other tasks (feeds, routers) run during the pause.
            await asyncio.sleep(5)
    except KeyboardInterrupt:
        print('...algo stopped')
        # await cancel_all_orders(CLIENT=CLIENT)
    except Exception as e:
        logging.critical(f'*** ALGO ENGINE CRASHED: {e}')
        logging.error(traceback.format_exc())
        # await cancel_all_orders(CLIENT=CLIENT)
### WALLET ###
async def get_aster_collateral():
    """Refresh ASTER_AVAIL_COLLATERAL from the Aster futures balance endpoint.

    Picks the entry whose 'asset' matches ASTER_RH_ASSET and stores its
    'availableBalance' as a float. Raises IndexError if the asset is absent.
    """
    global ASTER_AVAIL_COLLATERAL
    balance_request = {
        "url": "/fapi/v3/balance",
        "method": "GET",
        "params": {}
    }
    balances = aster_auth.post_authenticated_url(balance_request)
    quote_entry = [b for b in balances if b.get('asset') == ASTER_RH_ASSET][0]
    ASTER_AVAIL_COLLATERAL = float(quote_entry.get('availableBalance'))
async def get_aster_notional_position():
    """Refresh ASTER_NOTIONAL_POSITION and ASTER_MULT from Aster positionRisk.

    Reads the position matching ASTER_TICKER; 'notional' defaults to 0 and
    'leverage' falls back to the current ASTER_MULT if the field is missing.
    Raises IndexError if the ticker has no positionRisk entry.
    """
    global ASTER_NOTIONAL_POSITION
    global ASTER_MULT
    position_risk_request = {
        "url": "/fapi/v3/positionRisk",
        "method": "GET",
        "params": {}
    }
    positions = aster_auth.post_authenticated_url(position_risk_request)
    ticker_position = [p for p in positions if p.get('symbol', None) == ASTER_TICKER][0]
    ASTER_NOTIONAL_POSITION = float(ticker_position.get('notional', 0))
    ASTER_MULT = float(ticker_position.get('leverage', ASTER_MULT))
async def get_extend_collateral():
    """Refresh EXTEND_AVAIL_COLLATERAL from the Extended account balance.

    Stores 'available_for_trade' when the balance's collateral is
    EXTEND_RH_ASSET; otherwise resets the collateral to 0.
    """
    global EXTEND_AVAIL_COLLATERAL
    response = await EXTEND_CLIENT.account.get_balance()
    balance = dict(dict(response).get('data', {}))
    if balance.get('collateral_name', None) == EXTEND_RH_ASSET:
        EXTEND_AVAIL_COLLATERAL = balance.get('available_for_trade', 0)
    else:
        EXTEND_AVAIL_COLLATERAL = 0
async def get_extend_notional():
    """Refresh EXTEND_NOTIONAL_POSITION (and EXTEND_MULT) from Extended positions.

    When a position exists for EXTEND_TICKER, stores its 'value' and
    'leverage'; with no open position, the notional resets to 0 and the
    leverage setting is intentionally left unchanged.
    """
    global EXTEND_NOTIONAL_POSITION
    global EXTEND_MULT
    positions = dict(await EXTEND_CLIENT.account.get_positions()).get('data', {})
    matching = [p for p in positions if p.get('market') == EXTEND_TICKER]
    if matching:
        ticker_position = matching[0]
        EXTEND_NOTIONAL_POSITION = ticker_position.get('value', 0)
        EXTEND_MULT = ticker_position.get('leverage', EXTEND_MULT)
    else:
        # No open position; keep the last-known leverage multiplier.
        EXTEND_NOTIONAL_POSITION = 0
### EXCHANGE INFO ###
async def get_aster_exch_info():
    """Refresh ASTER_MIN_ORDER_QTY from Aster exchangeInfo.

    Reads the LOT_SIZE filter's 'minQty' for the configured ticker.
    Raises IndexError if the symbol or filter is absent.
    """
    global ASTER_MIN_ORDER_QTY
    exchange_info_request = {
        "url": "/fapi/v3/exchangeInfo",
        "method": "GET",
        "params": {}
    }
    r = aster_auth.post_authenticated_url(exchange_info_request)
    symbols = r['symbols']
    # CONSISTENCY FIX: use the ASTER_TICKER constant (same value as the
    # previously hardcoded 'ETHUSDT') so the traded symbol is defined once.
    symbol_info = [d for d in symbols if d.get('symbol', None) == ASTER_TICKER][0]
    lot_size = [f for f in symbol_info['filters'] if f.get('filterType', None) == 'LOT_SIZE'][0]
    ASTER_MIN_ORDER_QTY = float(lot_size['minQty'])
async def get_extend_exch_info():
    """Refresh EXTEND_MIN_ORDER_QTY from the Extended markets info.

    Reads the trading_config.min_order_size for the configured market.
    """
    global EXTEND_MIN_ORDER_QTY
    r = await EXTEND_CLIENT.markets_info.get_markets_dict()
    # CONSISTENCY FIX: use the EXTEND_TICKER constant (same value as the
    # previously hardcoded 'ETH-USD') so the traded market is defined once.
    EXTEND_MIN_ORDER_QTY = float(r[EXTEND_TICKER].trading_config.min_order_size)
async def main():
    """Wire up clients and stores, seed balance/position state, then run the algo loop."""
    global EXTEND_CLIENT
    global VAL_KEY
    global CON
    # Extended trading client (the account object itself is discarded here).
    _, EXTEND_CLIENT = await extend_auth.create_auth_account_and_trading_client()
    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    # NOTE(review): hardcoded DB credentials ('root:pwd') — move to environment
    # variables / a secrets store before deploying.
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    async with engine.connect() as CON:
        # await create_executions_orders_table(CON=CON)
        # Seed the module-level state before the trading loop starts.
        await get_aster_collateral()
        await get_aster_notional_position()
        await get_extend_collateral()
        await get_extend_notional()
        await run_algo()
if __name__ == '__main__':
    # Epoch-ms start timestamp, recorded once at process launch.
    START_TIME = round(datetime.now().timestamp() * 1000)
    # BUG FIX: configure logging BEFORE the first log call. Previously
    # logging.info() ran first, so that message went to the root logger's
    # implicit default config (WARNING level) and was dropped; basicConfig
    # with force=True then replaced the handlers afterwards.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    asyncio.run(main())

View File

@@ -2,39 +2,52 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 2, "execution_count": 15,
"id": "7a3f41bd", "id": "7a3f41bd",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"import pandas as pd\n", "import pandas as pd\n",
"import requests" "import requests\n",
"from datetime import datetime"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 3, "execution_count": 44,
"id": "3b48e1ce", "id": "3b48e1ce",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"url_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'" "mexc_all_tickers = 'https://api.mexc.com/api/v1/contract/ticker'\n",
"edgex_all_tickers = 'https://pro.edgex.exchange/api/v1/public/quote/getTicker/?contractId=10000001'"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 10, "execution_count": 34,
"id": "ab38d984", "id": "ab38d984",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"r = requests.get(url_all_tickers)\n", "r = requests.get(mexc_all_tickers)\n",
"data = r.json()['data']" "data = r.json()['data']"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 21, "execution_count": 49,
"id": "2976b377",
"metadata": {},
"outputs": [],
"source": [
"r = requests.get(edgex_all_tickers)\n",
"data = r.json()['data']"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "1139b1a3", "id": "1139b1a3",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -45,19 +58,19 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 22, "execution_count": null,
"id": "b00512dc", "id": "b00512dc",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"df_trim = df[['symbol','fundingRate_pct','volume24']].copy()\n", "df_trim = df[['symbol','fundingRate_pct','volume24']].copy()\n",
"df_trim = df_trim.loc[df_trim['volume24'] > 10_000]\n" "df_trim = df_trim.loc[df_trim['volume24'] > 10_000]"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 23, "execution_count": 9,
"id": "43b053d0", "id": "f7b44068",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@@ -88,34 +101,34 @@
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>0</th>\n", " <th>55</th>\n",
" <td>BTC_USDT</td>\n", " <td>DRIFT_USDT</td>\n",
" <td>-0.0007</td>\n", " <td>-1.1360</td>\n",
" <td>208341522</td>\n", " <td>3308466</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>1</th>\n", " <th>10</th>\n",
" <td>ETH_USDT</td>\n", " <td>RED_USDT</td>\n",
" <td>0.0012</td>\n", " <td>-1.1138</td>\n",
" <td>27134917</td>\n", " <td>105826673</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>2</th>\n", " <th>157</th>\n",
" <td>XAUT_USDT</td>\n", " <td>PIXEL_USDT</td>\n",
" <td>0.0050</td>\n", " <td>-0.4059</td>\n",
" <td>429520428</td>\n", " <td>12415472</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>3</th>\n", " <th>105</th>\n",
" <td>SOL_USDT</td>\n", " <td>NIL_USDT</td>\n",
" <td>-0.0016</td>\n", " <td>-0.3846</td>\n",
" <td>188591783</td>\n", " <td>31438005</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>4</th>\n", " <th>33</th>\n",
" <td>SILVER_USDT</td>\n", " <td>SUPER_USDT</td>\n",
" <td>0.0000</td>\n", " <td>-0.3718</td>\n",
" <td>634682239</td>\n", " <td>2469502</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>...</th>\n", " <th>...</th>\n",
@@ -124,34 +137,34 @@
" <td>...</td>\n", " <td>...</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>837</th>\n", " <th>12</th>\n",
" <td>ENS_USDC</td>\n", " <td>PLAY_USDT</td>\n",
" <td>0.0100</td>\n", " <td>0.0838</td>\n",
" <td>196467</td>\n", " <td>22168649</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>838</th>\n", " <th>414</th>\n",
" <td>KAITO_USDC</td>\n", " <td>PUMPBTC_USDT</td>\n",
" <td>-0.0106</td>\n", " <td>0.0913</td>\n",
" <td>245467</td>\n", " <td>745129</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>839</th>\n", " <th>398</th>\n",
" <td>BIO_USDC</td>\n", " <td>QQQSTOCK_USDT</td>\n",
" <td>0.0050</td>\n", " <td>0.0969</td>\n",
" <td>586982</td>\n", " <td>59485</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>840</th>\n", " <th>222</th>\n",
" <td>ETC_USDC</td>\n", " <td>GUA_USDT</td>\n",
" <td>0.0100</td>\n", " <td>0.0996</td>\n",
" <td>117338</td>\n", " <td>45623</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>841</th>\n", " <th>265</th>\n",
" <td>MNT_USDC</td>\n", " <td>BROCCOLIF3B_USDT</td>\n",
" <td>0.0100</td>\n", " <td>0.1647</td>\n",
" <td>150912</td>\n", " <td>3936939</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
@@ -159,35 +172,88 @@
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" symbol fundingRate_pct volume24\n", " symbol fundingRate_pct volume24\n",
"0 BTC_USDT -0.0007 208341522\n", "55 DRIFT_USDT -1.1360 3308466\n",
"1 ETH_USDT 0.0012 27134917\n", "10 RED_USDT -1.1138 105826673\n",
"2 XAUT_USDT 0.0050 429520428\n", "157 PIXEL_USDT -0.4059 12415472\n",
"3 SOL_USDT -0.0016 188591783\n", "105 NIL_USDT -0.3846 31438005\n",
"4 SILVER_USDT 0.0000 634682239\n", "33 SUPER_USDT -0.3718 2469502\n",
".. ... ... ...\n", ".. ... ... ...\n",
"837 ENS_USDC 0.0100 196467\n", "12 PLAY_USDT 0.0838 22168649\n",
"838 KAITO_USDC -0.0106 245467\n", "414 PUMPBTC_USDT 0.0913 745129\n",
"839 BIO_USDC 0.0050 586982\n", "398 QQQSTOCK_USDT 0.0969 59485\n",
"840 ETC_USDC 0.0100 117338\n", "222 GUA_USDT 0.0996 45623\n",
"841 MNT_USDC 0.0100 150912\n", "265 BROCCOLIF3B_USDT 0.1647 3936939\n",
"\n", "\n",
"[837 rows x 3 columns]" "[837 rows x 3 columns]"
] ]
}, },
"execution_count": 23, "execution_count": 9,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
], ],
"source": [ "source": [
"df_trim" "df_trim.sort_values('fundingRate_pct', ascending=True)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "6775a03f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1775583908"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"round(datetime.now().timestamp())"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"id": "f7b44068", "id": "edca3b9f",
"metadata": {},
"outputs": [],
"source": [
"### PRIVATE API ###\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "51e50e0d",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "21aabf30",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [] "source": []

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

37
modules/apex_api.py Normal file
View File

@@ -0,0 +1,37 @@
from apexomni.http_private_v3 import HttpPrivate_v3
from apexomni.http_private_sign import HttpPrivateSign
from apexomni.constants import APEX_OMNI_HTTP_MAIN, NETWORKID_MAIN
from dotenv import load_dotenv
import os
def apex_create_client() -> HttpPrivateSign:
    """Authenticate against ApeX Omni mainnet and return a signing client.

    Reads RABBY_PRIVATE_KEY and the APEX_API_* credentials from the
    environment (via .env). A throwaway HttpPrivate_v3 client is used only to
    derive the zk seeds/l2Key; the returned HttpPrivateSign client is the one
    callers should use.

    Raises:
        ConnectionError: if the post-construction smoke calls
            (configs_v3 / get_account_v3) fail.
    """
    load_dotenv()
    print("Authenticating...")
    eth_private_key = os.getenv("RABBY_PRIVATE_KEY")
    api_credentials = {
        'key': os.getenv("APEX_API_KEY"),
        'secret': os.getenv("APEX_API_SECRET"),
        'passphrase': os.getenv("APEX_API_PASSPHRASE"),
    }
    # Bootstrap client: only needed to derive the zk key material.
    bootstrap = HttpPrivate_v3(APEX_OMNI_HTTP_MAIN, network_id=NETWORKID_MAIN, eth_private_key=eth_private_key)
    zk_keys = bootstrap.derive_zk_key(bootstrap.default_address)
    client = HttpPrivateSign(
        APEX_OMNI_HTTP_MAIN,
        network_id=NETWORKID_MAIN,
        zk_seeds=zk_keys['seeds'],
        zk_l2Key=zk_keys['l2Key'],
        api_key_credentials=api_credentials,
    )
    # Smoke-test the credentials before handing the client back.
    try:
        client.configs_v3()
        client.get_account_v3()
    except Exception as e:
        raise ConnectionError(f'Failed to authenticate with APEX OMNI: {e}')
    print("...Authenticated")
    return client

108
modules/aster_auth.py Normal file
View File

@@ -0,0 +1,108 @@
import requests
from dotenv import load_dotenv
import os
import time
import threading
import urllib
from eth_account.messages import encode_typed_data
from eth_account import Account
load_dotenv()
user = os.getenv("RABBY_WALLET")
signer = os.getenv("ASTER_API_WALLET_ADDRESS")
private_key = os.getenv("ASTER_API_PRIVATE_KEY")
# Shared nonce state: (epoch seconds, counter within that second).
_last_ms = 0
_i = 0
# BUG FIX: the lock must live at module scope. The original created a fresh
# threading.Lock() inside get_nonce on every call, so concurrent callers never
# contended on the same lock and the counters above were unprotected.
_nonce_lock = threading.Lock()


def post_authenticated_url(req: dict) -> dict:
    """Sign an Aster REST request with EIP-712 and send it.

    req: {'url': API path, 'method': 'GET'|'POST'|'PUT'|'DELETE',
          'params': dict of query parameters}.

    Returns the decoded JSON response, or None for an unrecognized HTTP
    method (preserving the original fall-through behavior).
    """
    typed_data = {
        "types": {
            "EIP712Domain": [
                {"name": "name", "type": "string"},
                {"name": "version", "type": "string"},
                {"name": "chainId", "type": "uint256"},
                {"name": "verifyingContract", "type": "address"}
            ],
            "Message": [
                { "name": "msg", "type": "string" }
            ]
        },
        "primaryType": "Message",
        "domain": {
            "name": "AsterSignTransaction",
            "version": "1",
            "chainId": 1666,
            "verifyingContract": "0x0000000000000000000000000000000000000000"
        },
        "message": {
            # Placeholder; replaced with the url-encoded params before signing.
            "msg": "$msg"
        }
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': 'PythonApp/1.0'
    }
    host = 'https://fapi.asterdex.com'

    def get_nonce():
        # Nonce = epoch-seconds * 1e6 + per-second counter. NOTE: despite the
        # historical `_last_ms` name, int(time.time()) yields SECONDS.
        global _last_ms, _i
        with _nonce_lock:
            now_s = int(time.time())
            if now_s == _last_ms:
                _i += 1
            else:
                _last_ms = now_s
                _i = 0
            return now_s * 1_000_000 + _i

    def sign_typed_data(data: dict, private_key: str):
        """Sign EIP-712 typed data using encode_typed_data."""
        message = encode_typed_data(
            domain_data=data["domain"],
            message_types={"Message": data["types"]["Message"]},
            message_data=data["message"],
        )
        return Account.sign_message(message, private_key=private_key)

    def send_by_url(req):
        my_dict = req['params'].copy()
        url = host + req['url']
        method = req['method']
        my_dict['nonce'] = str(get_nonce())
        my_dict['user'] = user
        my_dict['signer'] = signer
        # NOTE(review): module only does `import urllib`; urllib.parse resolves
        # because requests imports it as a side effect — consider importing
        # urllib.parse explicitly at the top of the file.
        param = urllib.parse.urlencode(my_dict)
        typed_data['message']['msg'] = param
        signed = sign_typed_data(typed_data, private_key)
        full_url = url + '?' + param + '&signature=' + signed.signature.hex()
        requesters = {
            'GET': requests.get,
            'POST': requests.post,
            'PUT': requests.put,
            'DELETE': requests.delete,
        }
        send = requesters.get(method)
        if send is None:
            # Unknown verb: the original code fell through and returned None.
            return None
        res = send(full_url, headers=headers)
        return res.json()

    return send_by_url(req=req)

147
modules/aster_db.py Normal file
View File

@@ -0,0 +1,147 @@
import logging
from typing import AsyncContextManager
from sqlalchemy import text
### Orders and Trades Tables ####
async def create_fr_aster_user_order_trade_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_aster_user_order_trade table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_aster_user_order_trade')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_aster_user_order_trade (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_value BIGINT,
            symbol VARCHAR(20),
            client_order_id VARCHAR(100),
            side VARCHAR(20),
            order_type VARCHAR(100),
            time_in_force VARCHAR(20),
            original_qty DOUBLE,
            original_price DOUBLE,
            avg_price DOUBLE,
            stop_price DOUBLE,
            execution_type VARCHAR(100),
            order_status VARCHAR(100),
            order_id BIGINT,
            last_filled_qty DOUBLE,
            filled_accumulated_qty DOUBLE,
            last_filled_price DOUBLE,
            commission_asset VARCHAR(20),
            commission DOUBLE,
            order_trade_time_ts BIGINT,
            trade_id VARCHAR(100),
            bid_notional DOUBLE,
            ask_notional DOUBLE,
            trade_is_maker BOOL,
            trade_is_reduce_only BOOL,
            stop_px_working_type VARCHAR(100),
            original_order_type VARCHAR(100),
            position_side VARCHAR(100),
            pushed_w_conditional_order BOOL,
            activation_price DOUBLE,
            callback_rate DOUBLE,
            realized_profit DOUBLE
        );
    """))
    await CON.commit()
### Margin Calls Table ####
async def create_fr_aster_user_margin_table(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_aster_user_margin table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_aster_user_margin')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_aster_user_margin (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            cross_wallet_balance DOUBLE,
            symbol VARCHAR(20),
            position_side VARCHAR(20),
            position_amount DOUBLE,
            margin_type VARCHAR(20),
            isolated_wallet DOUBLE,
            mark_price DOUBLE,
            unrealized_pnl DOUBLE,
            maint_margin_required DOUBLE
        );
    """))
    await CON.commit()
### Account Balance Table ####
async def create_fr_aster_user_account_bal(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_aster_user_account_bal table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_aster_user_account_bal')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_aster_user_account_bal (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_transaction BIGINT,
            event_reason_type VARCHAR(20),
            asset VARCHAR(20),
            wallet_balance DOUBLE,
            cross_wallet_balance DOUBLE,
            balance_change_excl_pnl_comms DOUBLE
        );
    """))
    await CON.commit()
### Account Positions Table ####
async def create_fr_aster_user_account_pos(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_aster_user_account_pos table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_aster_user_account_pos')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_aster_user_account_pos (
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            timestamp_transaction BIGINT,
            event_reason_type VARCHAR(20),
            symbol VARCHAR(20),
            position_amount DOUBLE,
            entry_price DOUBLE,
            accumulated_realized_pre_fees DOUBLE,
            unrealized_pnl DOUBLE,
            margin_type VARCHAR(20),
            isolated_wallet DOUBLE,
            position_side VARCHAR(20)
        );
    """))
    await CON.commit()

27
modules/db.py Normal file
View File

@@ -0,0 +1,27 @@
### Database Funcs ###
import logging
from typing import AsyncContextManager
import pandas as pd
async def insert_df_to_mysql(
table_name: str,
params: dict | list | pd.DataFrame,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
if not isinstance(params, pd.DataFrame):
if isinstance(params, dict):
params = [params]
df = pd.DataFrame(params)
await CON.run_sync(
lambda sync_conn: df.to_sql(name=table_name, con=sync_conn, if_exists='append', index=False)
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')

30
modules/extended_auth.py Normal file
View File

@@ -0,0 +1,30 @@
import os
from dotenv import load_dotenv
from x10.config import MAINNET_CONFIG
from x10.core.stark_account import StarkPerpetualAccount
from x10.perpetual.trading_client import PerpetualTradingClient
import logging
async def create_auth_account_and_trading_client() -> tuple[StarkPerpetualAccount, PerpetualTradingClient]:
    """Build a Stark account from env vars and a mainnet trading client.

    Reads EXTENDED_API_KEY, EXTENDED_STARK_KEY_PUBLIC/PRIVATE and
    EXTENDED_VAULT_NUMBER from the environment (via .env).

    Returns:
        (stark_account, trading_client) tuple.
    """
    load_dotenv()
    stark_account = StarkPerpetualAccount(
        vault=int(os.getenv('EXTENDED_VAULT_NUMBER')),
        private_key=os.getenv('EXTENDED_STARK_KEY_PRIVATE'),
        public_key=os.getenv('EXTENDED_STARK_KEY_PUBLIC'),
        api_key=os.getenv('EXTENDED_API_KEY'),
    )
    trading_client = PerpetualTradingClient(MAINNET_CONFIG, stark_account)
    # Smoke-test the credentials; a ValueError is logged but NOT re-raised,
    # so the caller still receives the (possibly unusable) client.
    try:
        await trading_client.account.get_balance()
    except ValueError as e:
        logging.critical(f'Failed to get balance after creation of trading account: {e}')
    return stark_account, trading_client

152
modules/extended_db.py Normal file
View File

@@ -0,0 +1,152 @@
import logging
from typing import AsyncContextManager
from sqlalchemy import text
### Orders Table ####
async def create_fr_extended_user_order(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_extended_user_order table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_extended_user_order')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_extended_user_order (
            sequence_id INT,
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            order_id VARCHAR(100),
            account_id VARCHAR(100),
            external_id VARCHAR(100),
            market VARCHAR(20),
            type VARCHAR(20),
            side VARCHAR(20),
            status VARCHAR(20),
            status_reason VARCHAR(100),
            price DOUBLE,
            averagePrice DOUBLE,
            qty DOUBLE,
            filled_qty DOUBLE,
            payed_fee DOUBLE,
            tp_sl_type VARCHAR(20),
            reduce_only BOOL,
            post_only BOOL,
            created_time_ts BIGINT,
            updated_time_ts BIGINT,
            expire_time_ts BIGINT
        );
    """))
    await CON.commit()
### Orders Table ####
async def create_fr_extended_user_trade(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_extended_user_trade table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_extended_user_trade')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_extended_user_trade (
            sequence_id INT,
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            trade_id VARCHAR(100),
            account_id VARCHAR(100),
            market VARCHAR(20),
            order_id VARCHAR(100),
            external_order_id VARCHAR(100),
            side VARCHAR(20),
            price DOUBLE,
            qty DOUBLE,
            value DOUBLE,
            fee DOUBLE,
            trade_type VARCHAR(20),
            created_time_ts BIGINT,
            is_taker BOOL
        );
    """))
    await CON.commit()
### Balance Table ####
async def create_fr_extended_user_balance(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_extended_user_balance table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_extended_user_balance')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_extended_user_balance (
            sequence_id INT,
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            collateral_name VARCHAR(20),
            balance DOUBLE,
            equity DOUBLE,
            available_for_trade DOUBLE,
            available_for_withdrawal DOUBLE,
            unrealised_pnl DOUBLE,
            initial_margin DOUBLE,
            margin_ratio DOUBLE,
            updated_time_ts BIGINT,
            exposure DOUBLE,
            leverage DOUBLE
        );
    """))
    await CON.commit()
### Balance Table ####
async def create_fr_extended_user_position(
    CON: AsyncContextManager,
    engine: str = 'mysql',  # mysql | duckdb
) -> None:
    """Create the fr_extended_user_position table if it does not exist.

    Logs and returns without touching the DB when CON is None; raises
    ValueError for any engine other than 'mysql'.
    """
    if CON is None:
        logging.info("NO DB CONNECTION, SKIPPING Create Statements")
        return
    if engine != 'mysql':
        raise ValueError('Only MySQL engine is implemented')
    logging.info('Creating Table if Does Not Exist: fr_extended_user_position')
    await CON.execute(text("""
        CREATE TABLE IF NOT EXISTS fr_extended_user_position (
            sequence_id INT,
            timestamp_arrival BIGINT,
            timestamp_msg BIGINT,
            position_id VARCHAR(100),
            account_id VARCHAR(100),
            market VARCHAR(20),
            side VARCHAR(20),
            leverage DOUBLE,
            size DOUBLE,
            value DOUBLE,
            open_price DOUBLE,
            mark_price DOUBLE,
            liquidation_price DOUBLE,
            margin DOUBLE,
            unrealised_pnl DOUBLE,
            realised_pnl DOUBLE,
            tp_trigger_price DOUBLE,
            tp_limit_price DOUBLE,
            sl_trigger_price DOUBLE,
            sl_limit_price DOUBLE,
            adl_percentile INT,
            created_at_ts BIGINT,
            updated_at_ts BIGINT
        );
    """))
    await CON.commit()

25
requirements.txt Normal file
View File

@@ -0,0 +1,25 @@
pandas
rel
websockets
pyarrow
# plotly
mysql-connector-python
sqlalchemy
requests
pymysql
scipy
asyncmy
cryptography
# TA-Lib
valkey
nicegui
# py_clob_client
# google
# google-api-core==2.30.0
# google-api-python-client==2.190.0
# googleapis-common-protos==1.72.0
# grpcio==1.76.0
# grpcio-tools==1.76.0
x10-python-trading-starknet
eth-keys
eth-account

248
ws_apex.py Normal file
View File

@@ -0,0 +1,248 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = False
USE_VK: bool = True
# VK_FUND_RATE = 'fund_rate_apex'
VK_TICKER = 'fut_ticker_apex'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Apex.log'
### CONSTANTS ###
PING_INTERVAL_SEC = 15
### Globals ###
WSS_URL = "wss://quote.omni.apex.exchange/realtime_public?v=2&timestamp="
TICKER_SNAPSHOT_DATA_LAST: dict = {}
# HIST_TRADES = np.empty((0, 3))
# HIST_TRADES_LOOKBACK_SEC = 6
# ### Database Funcs ###
# async def create_rtds_btcusd_table(
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
# else:
# if engine == 'mysql':
# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
# await CON.execute(text("""
# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
# timestamp_arrival BIGINT,
# timestamp_msg BIGINT,
# timestamp_value BIGINT,
# value DOUBLE,
# qty DOUBLE
# );
# """))
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
# async def insert_rtds_btcusd_table(
# timestamp_arrival: int,
# timestamp_msg: int,
# timestamp_value: int,
# value: float,
# qty: float,
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# params={
# 'timestamp_arrival': timestamp_arrival,
# 'timestamp_msg': timestamp_msg,
# 'timestamp_value': timestamp_value,
# 'value': value,
# 'qty': qty,
# }
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
# else:
# if engine == 'mysql':
# await CON.execute(text("""
# INSERT INTO binance_btcusd_trades
# (
# timestamp_arrival,
# timestamp_msg,
# timestamp_value,
# value,
# qty
# )
# VALUES
# (
# :timestamp_arrival,
# :timestamp_msg,
# :timestamp_value,
# :value,
# :qty
# )
# """),
# parameters=params
# )
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def heartbeat(ws):
    """Send an ApeX-format ping over `ws` every PING_INTERVAL_SEC seconds, forever."""
    while True:
        await asyncio.sleep(PING_INTERVAL_SEC)
        logging.info("SENDING PING...")
        now_ms = str(round(datetime.now().timestamp() * 1000))
        await ws.send(json.dumps({"op": "ping", "args": [now_ms]}))
async def ws_stream():
global TICKER_SNAPSHOT_DATA_LAST
async for websocket in websockets.connect(f'{WSS_URL}{round(datetime.now().timestamp())}'):
logging.info(f"Connected to {WSS_URL}")
asyncio.create_task(heartbeat(ws=websocket))
subscribe_msg = {
"op": "subscribe",
"args": ["instrumentInfo.H.ETHUSDT"]
}
await websocket.send(json.dumps(subscribe_msg))
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
try:
data = json.loads(message)
if data.get('op', None) == 'ping':
pong_msg = {"op":"pong","args":[ str(round(datetime.now().timestamp()*1000)) ]}
logging.info(f'RECEIVED PING: {data}; SENDING PONG: {pong_msg}')
await websocket.send(json.dumps(pong_msg))
continue
elif data.get('success', None):
# logging.info('CONNECTION SUCCESFUL RESP MSG')
continue
msg_type = data.get('type', None)
if msg_type is not None:
match msg_type:
case 'snapshot':
TICKER_SNAPSHOT_DATA_LAST = data['data']
nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
'nextFundingTime_ts_ms': nextFundingTime_ts,
})
VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
continue
case 'delta':
TICKER_SNAPSHOT_DATA_LAST.update(data['data'])
nextFundingTime_ts = round(datetime.strptime(TICKER_SNAPSHOT_DATA_LAST['nextFundingTime'], "%Y-%m-%dT%H:%M:%SZ").timestamp()*1000)
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'symbol': TICKER_SNAPSHOT_DATA_LAST['symbol'],
'lastPrice': float(TICKER_SNAPSHOT_DATA_LAST['lastPrice']),
'markPrice': float(TICKER_SNAPSHOT_DATA_LAST['markPrice']),
'indexPrice': float(TICKER_SNAPSHOT_DATA_LAST['indexPrice']),
'volume24h': float(TICKER_SNAPSHOT_DATA_LAST['volume24h']),
'fundingRate': float(TICKER_SNAPSHOT_DATA_LAST['fundingRate']),
'predictedFundingRate': float(TICKER_SNAPSHOT_DATA_LAST['predictedFundingRate']),
'nextFundingTime_ts_ms': nextFundingTime_ts,
})
VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
continue
case _:
logging.warning(f'UNMATCHED OTHER MSG: {data}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
    """Wire up valkey (and optionally MySQL), then run the websocket stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
        return
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    async with engine.connect() as CON:
        # await create_rtds_btcusd_table(CON=CON)
        await ws_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # basicConfig must run BEFORE the first logging call: with no handler
    # configured, logging's last-resort handler only emits WARNING and above,
    # so the original 'Log FilePath' INFO record was silently dropped.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")

216
ws_aster.py Normal file
View File

@@ -0,0 +1,216 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = False
USE_VK: bool = True
VK_FUND_RATE = 'fund_rate_aster'
VK_TICKER = 'fut_ticker_aster'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster.log'
### CONSTANTS ###
SYMBOL: str = 'ETHUSDT'
STREAM_MARKPRICE: str = f'{SYMBOL.lower()}@markPrice@1s'
STREAM_BOOKTICKER: str = f'{SYMBOL.lower()}@bookTicker'
### Globals ###
WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}/{STREAM_BOOKTICKER}"
# WSS_URL = f"wss://fstream.asterdex.com/stream?streams={STREAM_MARKPRICE}"
# HIST_TRADES = np.empty((0, 3))
# HIST_TRADES_LOOKBACK_SEC = 6
# ### Database Funcs ###
# async def create_rtds_btcusd_table(
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
# else:
# if engine == 'mysql':
# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
# await CON.execute(text("""
# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
# timestamp_arrival BIGINT,
# timestamp_msg BIGINT,
# timestamp_value BIGINT,
# value DOUBLE,
# qty DOUBLE
# );
# """))
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
# async def insert_rtds_btcusd_table(
# timestamp_arrival: int,
# timestamp_msg: int,
# timestamp_value: int,
# value: float,
# qty: float,
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# params={
# 'timestamp_arrival': timestamp_arrival,
# 'timestamp_msg': timestamp_msg,
# 'timestamp_value': timestamp_value,
# 'value': value,
# 'qty': qty,
# }
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
# else:
# if engine == 'mysql':
# await CON.execute(text("""
# INSERT INTO binance_btcusd_trades
# (
# timestamp_arrival,
# timestamp_msg,
# timestamp_value,
# value,
# qty
# )
# VALUES
# (
# :timestamp_arrival,
# :timestamp_msg,
# :timestamp_value,
# :value,
# :qty
# )
# """),
# parameters=params
# )
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def ws_stream():
    """Consume the Aster markPrice and bookTicker streams and publish the
    latest message of each to valkey (VK_FUND_RATE / VK_TICKER). Reconnects
    forever via the websockets.connect async iterator."""
    async for websocket in websockets.connect(WSS_URL):
        logging.info(f"Connected to {WSS_URL}")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                if not isinstance(message, str):
                    # BUG FIX: the original formatted type(data) here, but
                    # `data` is undefined on this path (NameError).
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, ValueError):
                    logging.warning(f'Message not in JSON format, skipping: {message}')
                    continue
                channel = data.get('stream', None)
                if channel is None:
                    logging.info(f'Initial or unexpected data struct, skipping: {data}')
                    continue
                d = data['data']
                if channel == STREAM_MARKPRICE:
                    # NOTE: numeric fields are forwarded as strings, exactly
                    # as received from the exchange.
                    VAL_KEY_OBJ = json.dumps({
                        'timestamp_arrival': ts_arrival,
                        'timestamp_msg': d['E'],
                        'symbol': d['s'],
                        'mark_price': d['p'],
                        'index_price': d['i'],
                        'estimated_settle_price': d['P'],
                        'funding_rate': d['r'],
                        'next_funding_time_ts_ms': d['T'],
                    })
                    VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ)
                elif channel == STREAM_BOOKTICKER:
                    VAL_KEY_OBJ = json.dumps({
                        'timestamp_arrival': ts_arrival,
                        'timestamp_msg': d['E'],
                        'timestamp_transaction': d['T'],
                        'orderbook_update_id': d['u'],
                        'symbol': d['s'],
                        'best_bid_px': d['b'],
                        'best_bid_qty': d['B'],
                        'best_ask_px': d['a'],
                        'best_ask_qty': d['A'],
                    })
                    VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
                else:
                    logging.warning(f'UNMATCHED OTHER MSG: {data}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Wire up valkey (and optionally MySQL), then run the websocket stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
        return
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    async with engine.connect() as CON:
        # await create_rtds_btcusd_table(CON=CON)
        await ws_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # basicConfig must run BEFORE the first logging call: with no handler
    # configured, logging's last-resort handler only emits WARNING and above,
    # so the original 'Log FilePath' INFO record was silently dropped.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")

19
ws_aster/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim

# Install build tools and clean the apt cache in the SAME layer: a separate
# `RUN rm -rf /var/lib/apt/lists/*` (as in the original) does not shrink the
# image, because the lists are already baked into the earlier layer.
RUN apt-get update && \
    apt-get install -y build-essential && \
    gcc --version && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy requirements first so the pip layer is cached independently of
# source-code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run the Aster feed handler (the gunicorn entrypoint below is kept for reference).
CMD [ "python", "ws_aster.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

301
ws_aster_user.py Normal file
View File

@@ -0,0 +1,301 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
import modules.aster_auth as aster_auth
import modules.aster_db as aster_db
import modules.db as db
### Allow only ipv4 ###
def allowed_gai_family():
    """Restrict urllib3's getaddrinfo to IPv4 (AF_INET) only."""
    family = socket.AF_INET
    return family
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_ORDERS_TRADES = 'fr_aster_user_orders'
VK_MARGIN_CALLS = 'fr_aster_user_margin_calls'
VK_BALANCES = 'fr_aster_user_balances'
VK_POSITIONS = 'fr_aster_user_positions'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster_User.log'
### CONSTANTS ###
WSS_URL = "wss://fstream.asterdex.com/ws/"
LOCAL_RECENT_UPDATES_LOOKBACK_SEC = 30
### Globals ###
LISTEN_KEY: str | None = None
LISTEN_KEY_LAST_UPDATE_TS_S: int = 0
LISTEN_KEY_PUT_INTERVAL_SEC = 1800
LOCAL_RECENT_ORDERS: list = []
LOCAL_RECENT_MARGIN_CALLS: list = []
LOCAL_RECENT_BALANCES: list = []
LOCAL_RECENT_POSITIONS: list = []
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id'):
    """Insert-or-replace new_dict in list_of_dicts, matched on the `id` key.

    The first entry whose `id` value equals new_dict's is overwritten in place;
    if none matches, new_dict is appended. Mutates and returns the same list.
    """
    target = new_dict.get(id)
    for pos in range(len(list_of_dicts)):
        if list_of_dicts[pos].get(id) == target:
            list_of_dicts[pos] = new_dict
            return list_of_dicts
    list_of_dicts.append(new_dict)
    return list_of_dicts
def get_new_listen_key() -> str:
    """Request a fresh user-data-stream listen key from the Aster REST API.

    Side effect: on success, records the refresh time (seconds since epoch) in
    LISTEN_KEY_LAST_UPDATE_TS_S.

    Returns:
        The new listen key string.

    Raises:
        ValueError: if the authenticated response carries no 'listenKey'.
    """
    global LISTEN_KEY_LAST_UPDATE_TS_S
    listen_key_request = {
        "url": "/fapi/v3/listenKey",
        "method": "POST",
        "params": {}
    }
    r = aster_auth.post_authenticated_url(listen_key_request)
    listen_key = r.get('listenKey', None)
    # Fix: use the configured logger instead of print() so the refresh is
    # captured in the service log file rather than lost on stdout.
    logging.info(f'LISTEN KEY: {listen_key}')
    if listen_key is not None:
        LISTEN_KEY_LAST_UPDATE_TS_S = round(datetime.now().timestamp())
        return listen_key
    else:
        raise ValueError(f'Listen Key is None; Failed to Update. response: {r}')
async def listen_key_interval():
    """Keep the Aster user-stream listen key alive by refreshing it periodically."""
    global LISTEN_KEY
    while True:
        # Sleep first: a fresh key was just created by ws_stream() before this task starts.
        await asyncio.sleep(LISTEN_KEY_PUT_INTERVAL_SEC)
        LISTEN_KEY = get_new_listen_key()
### Websocket ###
async def ws_stream():
global LISTEN_KEY
global LOCAL_RECENT_ORDERS
global LOCAL_RECENT_MARGIN_CALLS
global LOCAL_RECENT_BALANCES
global LOCAL_RECENT_POSITIONS
LISTEN_KEY = get_new_listen_key()
async for websocket in websockets.connect(WSS_URL+LISTEN_KEY):
logging.info(f"Connected to {WSS_URL}")
asyncio.create_task(listen_key_interval())
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
try:
data = json.loads(message)
channel = data.get('e', None)
if channel is not None:
LOOKBACK_MIN_TS_MS = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000)
match channel:
case 'ORDER_TRADE_UPDATE':
logging.info(f'ORDER_TRADE_UPDATE: {data}')
new_order_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'symbol': data['o']["s"], # "BTCUSDT", // Symbol
'client_order_id': data['o']["c"], # "TEST", // Client Order Id
'side': data['o']["S"], # "SELL", // Side
'order_type': data['o']["o"], # "TRAILING_STOP_MARKET", // Order Type
'time_in_force': data['o']["f"], # "GTC", // Time in Force
'original_qty': float(data['o']["q"]), # "0.001", // Original Quantity
'original_price': float(data['o']["p"]), # "0", // Original Price
'avg_price': float(data['o']["ap"]), # :"0", // Average Price
'stop_price': float(data['o'].get("sp", 0)), # :"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order
'execution_type': data['o']["x"], # "NEW", // Execution Type
'order_status': data['o']["X"], # "NEW", // Order Status
'order_id': data['o']["i"], # 8886774, // Order Id
'last_filled_qty': float(data['o']["l"]), # "0", // Order Last Filled Quantity
'filled_accumulated_qty': float(data['o']["z"]), # "0", // Order Filled Accumulated Quantity
'last_filled_price': float(data['o']["L"]), # "0", // Last Filled Price
'commission_asset': data['o'].get("N", None), # "USDT", // Commission Asset, will not push if no commission
'commission': float(data['o'].get("n",0)), # "0", // Commission, will not push if no commission
'order_trade_time_ts': data['o']["T"], # 1568879465651, // Order Trade Time
'trade_id': data['o']["t"], # 0, // Trade Id
'bid_notional': float(data['o']["b"]), # "0", // Bids Notional
'ask_notional': float(data['o']["a"]), # "9.91", // Ask Notional
'trade_is_maker': data['o']["m"], # false, // Is this trade the maker side?
'trade_is_reduce_only': data['o']["R"], # false, // Is this reduce only
'stop_px_working_type': data['o']["wt"], # :"CONTRACT_PRICE", // Stop Price Working Type
'original_order_type': data['o']["ot"], # :"TRAILING_STOP_MARKET", // Original Order Type
'position_side': data['o']["ps"], # :"LONG", // Position Side
'pushed_w_conditional_order': bool(data['o'].get("cp", False)), # :false, // If Close-All, pushed with conditional order
'activation_price': float(data['o'].get("AP", 0)), # :"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order
'callback_rate': float(data['o'].get("cr", 0)), # :"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order
'realized_profit': float(data['o']["rp"]), # :"0" // Realized Profit of the trade
}
LOCAL_RECENT_ORDERS = upsert_list_of_dicts_by_id(LOCAL_RECENT_ORDERS, new_order_update, id='client_order_id')
LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS)
VAL_KEY.set(VK_ORDERS_TRADES, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON)
continue
case 'MARGIN_CALL':
logging.info(f'MARGIN_CALL: {data}')
list_for_df = []
for p in list(data['p']):
margin_call_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'cross_wallet_balance': float(data.get('cw', 0)),
'symbol': p["s"], # "ETHUSDT", // Symbol
'position_side': p["ps"], # :"LONG", // Position Side
'position_amount': float(p["pa"]), # :"1.327", // Position Amount
'margin_type': p["mt"], # :"CROSSED", // Margin Type
'isolated_wallet': float(p.get("iw", 0)), # :"0", // Isolated Wallet (if isolated position)
'mark_price': float(p["mp"]), # :"187.17127", // Mark Price
'unrealized_pnl': float(p["up"]), # :"-1.166074", // Unrealized PnL
'maint_margin_required': float(p["mm"]), # :"1.614445" // Maintenance Margin Required
}
list_for_df.append(margin_call_update)
LOCAL_RECENT_MARGIN_CALLS = upsert_list_of_dicts_by_id(LOCAL_RECENT_MARGIN_CALLS, margin_call_update, id='symbol')
LOCAL_RECENT_MARGIN_CALLS = [t for t in LOCAL_RECENT_MARGIN_CALLS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_MARGIN_CALLS)
VAL_KEY.set(VK_MARGIN_CALLS, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_aster_user_margin', params=list_for_df, CON=CON)
continue
case 'ACCOUNT_UPDATE':
logging.info(f'ACCOUNT_UPDATE: {data}')
list_for_df_bal = []
list_for_df_pos = []
### Balance Updates ###
if len(list(data['a']['B'])) > 0:
for b in list(data['a']['B']):
balance_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'event_reason_type': data['a']["m"],
'asset': b['a'],
'wallet_balance': float(b['wb']),
'cross_wallet_balance': float(b.get('cw', 0)),
'balance_change_excl_pnl_comms': float(b['bc']),
}
list_for_df_bal.append(balance_update)
LOCAL_RECENT_BALANCES = upsert_list_of_dicts_by_id(LOCAL_RECENT_BALANCES, balance_update, id='asset')
LOCAL_RECENT_BALANCES = [t for t in LOCAL_RECENT_BALANCES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY.set(VK_BALANCES, json.dumps(LOCAL_RECENT_BALANCES))
### Position Updates ###
if len(list(data['a']['P'])) > 0:
for p in list(data['a']['P']):
position_update = {
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['E'],
'timestamp_transaction': data['T'],
'event_reason_type': data['a']["m"],
'symbol': p['s'],
'position_amount': float(p['pa']),
'entry_price': float(p['ep']),
'accumulated_realized_pre_fees': float(p['cr']),
'unrealized_pnl': float(p['up']),
'margin_type': p['mt'],
'isolated_wallet': float(p.get('iw', 0)),
'position_side': p['ps'],
}
list_for_df_pos.append(position_update)
LOCAL_RECENT_POSITIONS = upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='symbol')
LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY.set(VK_POSITIONS, json.dumps(LOCAL_RECENT_POSITIONS))
if balance_update:
await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON)
if position_update:
await db.insert_df_to_mysql(table_name='fr_aster_user_account_pos', params=list_for_df_bal, CON=CON)
continue
case 'listenKeyExpired':
raise('Listen Key Has Expired; Failed to Update Properly. Restarting.')
case _:
logging.warning(f'UNMATCHED OTHER MSG: {data}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
    """Set up optional Valkey/MySQL connections, ensure tables exist, then stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        # Local Valkey (Redis-compatible) instance used to publish recent updates.
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        # NOTE(review): DB credentials are hardcoded — consider environment variables.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        # Keep the async connection open for the whole lifetime of the stream.
        async with engine.connect() as CON:
            # Create destination tables if they do not already exist.
            await aster_db.create_fr_aster_user_order_trade_table(CON=CON)
            await aster_db.create_fr_aster_user_margin_table(CON=CON)
            await aster_db.create_fr_aster_user_account_bal(CON=CON)
            await aster_db.create_fr_aster_user_account_pos(CON=CON)
            await ws_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
if __name__ == '__main__':
    # Script entry point: start time in ms since epoch, used for log correlation.
    START_TIME = round(datetime.now().timestamp()*1000)
    # NOTE(review): this info line runs before basicConfig below, so it is likely
    # dropped by the default handler — confirm intended ordering.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C.
        logging.info("Stream stopped")

19
ws_aster_user/Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
RUN apt-get update && \
apt-get install -y build-essential
RUN gcc --version
RUN rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Finally, run gunicorn.
CMD [ "python", "ws_aster_user.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

203
ws_extended_fund_rate.py Normal file
View File

@@ -0,0 +1,203 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime, timezone
from typing import AsyncContextManager
import math
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Restrict urllib3's getaddrinfo to IPv4 (AF_INET) only."""
    return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = False
USE_VK: bool = True
VK_FUND_RATE = 'fund_rate_extended'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_FR.log'
### CONSTANTS ###
WS_SYMBOL: str = 'ETH-USD'
FUNDING_RATE_INTERVAL_MIN = 60
### Globals ###
WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/funding/{WS_SYMBOL}"
# HIST_TRADES = np.empty((0, 3))
# HIST_TRADES_LOOKBACK_SEC = 6
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
    """Round dt DOWN to the nearest interval_mins boundary.

    Returns the floored epoch timestamp in whole seconds.
    """
    bucket = interval_mins * 60
    epoch_s = dt.timestamp()
    return int(epoch_s // bucket) * bucket
# ### Database Funcs ###
# async def create_rtds_btcusd_table(
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
# else:
# if engine == 'mysql':
# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
# await CON.execute(text("""
# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
# timestamp_arrival BIGINT,
# timestamp_msg BIGINT,
# timestamp_value BIGINT,
# value DOUBLE,
# qty DOUBLE
# );
# """))
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
# async def insert_rtds_btcusd_table(
# timestamp_arrival: int,
# timestamp_msg: int,
# timestamp_value: int,
# value: float,
# qty: float,
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# params={
# 'timestamp_arrival': timestamp_arrival,
# 'timestamp_msg': timestamp_msg,
# 'timestamp_value': timestamp_value,
# 'value': value,
# 'qty': qty,
# }
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
# else:
# if engine == 'mysql':
# await CON.execute(text("""
# INSERT INTO binance_btcusd_trades
# (
# timestamp_arrival,
# timestamp_msg,
# timestamp_value,
# value,
# qty
# )
# VALUES
# (
# :timestamp_arrival,
# :timestamp_msg,
# :timestamp_value,
# :value,
# :qty
# )
# """),
# parameters=params
# )
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def ws_stream():
    """Stream Extended exchange funding-rate updates and publish the latest to Valkey."""
    async for websocket in websockets.connect(WSS_URL):
        logging.info(f"Connected to {WSS_URL}")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                if isinstance(message, str):
                    try:
                        data = json.loads(message)
                        if data.get('data', None) is not None:
                            # print(f'FR: {data}')
                            # Next settlement = top of the next hour (UTC), in ms.
                            fr_next_update_ts = (time_round_down(dt=datetime.now(timezone.utc), interval_mins=60)+(60*60))*1000
                            VAL_KEY_OBJ = json.dumps({
                                'sequence_id': data['seq'],
                                'timestamp_arrival': ts_arrival,
                                'timestamp_msg': data['ts'],
                                'symbol': data['data']['m'],
                                'funding_rate': float(data['data']['f']),
                                'funding_rate_updated_ts_ms': data['data']['T'],
                                'next_funding_time_ts_ms': fr_next_update_ts,
                            })
                            VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ)
                            continue
                        else:
                            logging.info(f'Initial or unexpected data struct, skipping: {data}')
                            continue
                    except (json.JSONDecodeError, ValueError):
                        logging.warning(f'Message not in JSON format, skipping: {message}')
                        continue
                else:
                    # Fix: report the non-str message's own type; `data` could be
                    # unbound here (NameError in the old code).
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Initialise Valkey/MySQL handles per the USE_* flags, then run the stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        # Local Valkey (Redis-compatible) instance used to publish the latest update.
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        # NOTE(review): DB credentials are hardcoded — consider environment variables.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        async with engine.connect() as CON:
            # await create_rtds_btcusd_table(CON=CON)
            await ws_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
if __name__ == '__main__':
    # Script entry point: start time in ms since epoch, used for log correlation.
    START_TIME = round(datetime.now().timestamp()*1000)
    # NOTE(review): this info line runs before basicConfig below, so it is likely
    # dropped by the default handler — confirm intended ordering.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C.
        logging.info("Stream stopped")

View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
RUN apt-get update && \
apt-get install -y build-essential
RUN gcc --version
RUN rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Finally, run gunicorn.
CMD [ "python", "ws_extended_fund_rate.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

122
ws_extended_orderbook.py Normal file
View File

@@ -0,0 +1,122 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Restrict urllib3's getaddrinfo to IPv4 (AF_INET) only."""
    return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = False
USE_VK: bool = True
VK_TICKER = 'fut_ticker_extended'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_OB.log'
### CONSTANTS ###
WS_SYMBOL: str = 'ETH-USD'
### Globals ###
WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/orderbooks/{WS_SYMBOL}?depth=1"
### Websocket ###
async def ws_stream():
    """Stream Extended exchange top-of-book (depth=1) and publish it to Valkey."""
    async for websocket in websockets.connect(WSS_URL):
        logging.info(f"Connected to {WSS_URL}")
        try:
            async for message in websocket:
                ts_arrival = round(datetime.now().timestamp()*1000)
                if isinstance(message, str):
                    try:
                        data = json.loads(message)
                        if data.get('type', None) is not None:
                            # print(f'OB: {data}')
                            # depth=1 subscription: only the best bid/ask levels are present.
                            VAL_KEY_OBJ = json.dumps({
                                'sequence_id': data['seq'],
                                'timestamp_arrival': ts_arrival,
                                'timestamp_msg': data['ts'],
                                'symbol': data['data']['m'],
                                'best_bid_px': float(data['data']['b'][0]['p']),
                                'best_bid_qty': float(data['data']['b'][0]['q']),
                                'best_ask_px': float(data['data']['a'][0]['p']),
                                'best_ask_qty': float(data['data']['a'][0]['q']),
                            })
                            VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
                            continue
                        else:
                            logging.info(f'Initial or unexpected data struct, skipping: {data}')
                            continue
                    except (json.JSONDecodeError, ValueError):
                        logging.warning(f'Message not in JSON format, skipping: {message}')
                        continue
                else:
                    # Fix: report the non-str message's own type; `data` could be
                    # unbound here (NameError in the old code).
                    raise ValueError(f'Type: {type(message)} not expected: {message}')
        except websockets.ConnectionClosed as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
            continue
        except Exception as e:
            logging.error(f'Connection closed: {e}')
            logging.error(traceback.format_exc())
async def main():
    """Initialise Valkey/MySQL handles per the USE_* flags, then run the stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        # Local Valkey (Redis-compatible) instance used to publish the latest ticker.
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        # NOTE(review): DB credentials are hardcoded — consider environment variables.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        async with engine.connect() as CON:
            # await create_rtds_btcusd_table(CON=CON)
            await ws_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
if __name__ == '__main__':
    # Script entry point: start time in ms since epoch, used for log correlation.
    START_TIME = round(datetime.now().timestamp()*1000)
    # NOTE(review): this info line runs before basicConfig below, so it is likely
    # dropped by the default handler — confirm intended ordering.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C.
        logging.info("Stream stopped")

View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
RUN apt-get update && \
apt-get install -y build-essential
RUN gcc --version
RUN rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Finally, run gunicorn.
CMD [ "python", "ws_extended_orderbook.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

273
ws_extended_user.py Normal file
View File

@@ -0,0 +1,273 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime, timezone
from typing import AsyncContextManager
import math
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
import modules.extended_db as extended_db
import modules.db as db
### Allow only ipv4 ###
def allowed_gai_family():
    """Restrict urllib3's getaddrinfo to IPv4 (AF_INET) only."""
    return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_ORDERS = 'fr_extended_user_orders'
VK_TRADES = 'fr_extended_user_trades'
VK_BALANCES = 'fr_extended_user_balances'
VK_POSITIONS = 'fr_extended_user_positions'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_User.log'
### CONSTANTS ###
WSS_URL = "wss://api.starknet.extended.exchange/stream.extended.exchange/v1/account"
API_KEY = os.getenv('EXTENDED_API_KEY')
LOCAL_RECENT_UPDATES_LOOKBACK_SEC = 30
### Globals ###
LOCAL_RECENT_ORDERS: list = []
LOCAL_RECENT_TRADES: list = []
LOCAL_RECENT_BALANCES: list = []
LOCAL_RECENT_POSITIONS: list = []
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id', seq_check_field: str | None = None):
for index, item in enumerate(list_of_dicts):
if item.get(id) == new_dict.get(id):
if seq_check_field is not None:
if item.get(seq_check_field) < new_dict.get(seq_check_field):
logging.info('Skipping out of sequence msg')
return list_of_dicts
list_of_dicts[index] = new_dict
return list_of_dicts
list_of_dicts.append(new_dict)
return list_of_dicts
### Websocket ###
async def ws_stream():
global LOCAL_RECENT_ORDERS
global LOCAL_RECENT_TRADES
global LOCAL_RECENT_BALANCES
global LOCAL_RECENT_POSITIONS
async for websocket in websockets.connect(WSS_URL, extra_headers={'X-Api-Key': API_KEY}):
logging.info(f"Connected to {WSS_URL}")
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
try:
data = json.loads(message)
channel = data.get('type', None)
if channel is not None:
LOOKBACK_MIN_TS_MS = ts_arrival - (LOCAL_RECENT_UPDATES_LOOKBACK_SEC*1000)
match channel:
case 'ORDER':
list_for_df = []
for o in data['data']['orders']:
order_update = {
'sequence_id': data['seq'],
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'order_id': o['id'],
'account_id': o['accountId'],
'external_id': o.get('externalId', None),
'market': o['market'],
'type': o['type'],
'side': o['side'],
'status': o['status'],
'status_reason': o.get('statusReason', None),
'price': float(o.get('price', 0)),
'averagePrice': float(o.get('averagePrice', 0)),
'qty': float(o['qty']),
'filled_qty': float(o.get('filledQty', 0)),
'payed_fee': float(o.get('payedFee', 0)),
# 'trigger_dict': o.get('trigger', None),
'tp_sl_type': o.get('tpSlType', None),
# 'take_profit_dict': o.get('takeProfit', None),
# 'stop_loss_dict': o.get('stopLoss', None),
'reduce_only': o.get('reduceOnly', False),
'post_only': o.get('postOnly', False),
'created_time_ts': o['createdTime'],
'updated_time_ts': o['updatedTime'],
'expire_time_ts': o['expireTime'],
}
list_for_df.append(order_update)
LOCAL_RECENT_ORDERS = upsert_list_of_dicts_by_id(LOCAL_RECENT_ORDERS, order_update, id='order_id', seq_check_field='sequence_id')
LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS)
VAL_KEY.set(VK_ORDERS, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_extended_user_order', params=list_for_df, CON=CON)
continue
case 'TRADE':
list_for_df = []
for t in data['data']['trades']:
trade_update = {
'sequence_id': data['seq'],
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'trade_id': t['id'],
'account_id': t['accountId'],
'market': t['market'],
'order_id': t['orderId'],
'external_order_id': t.get('externalOrderId', None),
'side': t['side'],
'price': float(t['price']),
'qty': float(t['qty']),
'value': float(t['value']),
'fee': float(t['fee']),
'trade_type': t['tradeType'],
'created_time_ts': t['createdTime'],
'is_taker': t['isTaker'],
}
LOCAL_RECENT_TRADES = upsert_list_of_dicts_by_id(LOCAL_RECENT_TRADES, trade_update, id='trade_id', seq_check_field='sequence_id')
LOCAL_RECENT_TRADES = [t for t in LOCAL_RECENT_TRADES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_TRADES)
VAL_KEY.set(VK_TRADES, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_extended_user_trade', params=list_for_df, CON=CON)
continue
case 'BALANCE':
balance_update = {
'sequence_id': data['seq'],
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'collateral_name': data['data']['balance']['collateralName'],
'balance': float(data['data']['balance']['balance']),
'equity': float(data['data']['balance']['equity']),
'available_for_trade': float(data['data']['balance']['availableForTrade']),
'available_for_withdrawal': float(data['data']['balance']['availableForWithdrawal']),
'unrealised_pnl': float(data['data']['balance']['unrealisedPnl']),
'initial_margin': float(data['data']['balance']['initialMargin']),
'margin_ratio': float(data['data']['balance']['marginRatio']),
'updated_time_ts': data['data']['balance']['updatedTime'],
'exposure': float(data['data']['balance']['exposure']),
'leverage': float(data['data']['balance']['leverage']),
}
LOCAL_RECENT_BALANCES = upsert_list_of_dicts_by_id(LOCAL_RECENT_BALANCES, balance_update, id='collateral_name', seq_check_field='sequence_id')
LOCAL_RECENT_BALANCES = [t for t in LOCAL_RECENT_BALANCES if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_BALANCES)
VAL_KEY.set(VK_BALANCES, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_extended_user_balance', params=balance_update, CON=CON)
continue
case 'POSITION':
list_for_df = []
for p in data['data']['positions']:
position_update = {
'sequence_id': data['seq'],
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'position_id': p['id'],
'account_id': p['accountId'],
'market': p['market'],
'side': p['side'],
'leverage': float(p['leverage']),
'size': float(p['size']),
'value': float(p['value']),
'open_price': float(p['openPrice']),
'mark_price': float(p['markPrice']),
'liquidation_price': float(p['liquidationPrice']),
'margin': float(p['margin']),
'unrealised_pnl': float(p['unrealisedPnl']),
'realised_pnl': float(p['realisedPnl']),
'tp_trigger_price': float(p.get('tpTriggerPrice', 0)),
'tp_limit_price': float(p.get('tpLimitPrice', 0)),
'sl_trigger_price': float(p.get('slTriggerPrice', 0)),
'sl_limit_price': float(p.get('slLimitPrice', 0)),
'adl_percentile': p['adl'], # closer to 100 means higher chance of auto-deleveraging
'created_at_ts': p['createdAt'],
'updated_at_ts': p['updatedAt'],
}
LOCAL_RECENT_POSITIONS = upsert_list_of_dicts_by_id(LOCAL_RECENT_POSITIONS, position_update, id='position_id', seq_check_field='sequence_id')
LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS)
VAL_KEY.set(VK_POSITIONS, VAL_KEY_OBJ)
await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON)
continue
case _:
logging.warning(f'UNMATCHED OTHER MSG: {data}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
    """Set up optional Valkey/MySQL connections, ensure tables exist, then stream."""
    global VAL_KEY
    global CON
    if USE_VK:
        # Local Valkey (Redis-compatible) instance used to publish recent updates.
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    else:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    if USE_DB:
        # NOTE(review): DB credentials are hardcoded — consider environment variables.
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
        # Keep the async connection open for the whole lifetime of the stream.
        async with engine.connect() as CON:
            # Create destination tables if they do not already exist.
            await extended_db.create_fr_extended_user_balance(CON=CON)
            await extended_db.create_fr_extended_user_order(CON=CON)
            await extended_db.create_fr_extended_user_position(CON=CON)
            await extended_db.create_fr_extended_user_trade(CON=CON)
            await ws_stream()
    else:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
if __name__ == '__main__':
    # Script entry point: start time in ms since epoch, used for log correlation.
    START_TIME = round(datetime.now().timestamp()*1000)
    # NOTE(review): this info line runs before basicConfig below, so it is likely
    # dropped by the default handler — confirm intended ordering.
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl-C.
        logging.info("Stream stopped")

View File

@@ -0,0 +1,19 @@
FROM python:3.13-slim
RUN apt-get update && \
apt-get install -y build-essential
RUN gcc --version
RUN rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Finally, run gunicorn.
CMD [ "python", "ws_extended_user.py"]
# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"]

View File

@@ -0,0 +1,244 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
import os
from dotenv import load_dotenv
### Allow only ipv4 ###
def allowed_gai_family():
    """Restrict urllib3's getaddrinfo to IPv4 (AF_INET) only."""
    return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = False
USE_VK: bool = True
VK_FUND_RATE = 'fund_rate_mexc'
VK_TICKER = 'fut_ticker_mexc'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
load_dotenv()
LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_MEXC.log'
### CONSTANTS ###
PING_INTERVAL_SEC = 20
### Globals ###
WSS_URL = "wss://contract.mexc.com/edge"
# HIST_TRADES = np.empty((0, 3))
# HIST_TRADES_LOOKBACK_SEC = 6
# ### Database Funcs ###
# async def create_rtds_btcusd_table(
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Create Statements")
# else:
# if engine == 'mysql':
# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
# await CON.execute(text("""
# CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
# timestamp_arrival BIGINT,
# timestamp_msg BIGINT,
# timestamp_value BIGINT,
# value DOUBLE,
# qty DOUBLE
# );
# """))
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
# async def insert_rtds_btcusd_table(
# timestamp_arrival: int,
# timestamp_msg: int,
# timestamp_value: int,
# value: float,
# qty: float,
# CON: AsyncContextManager,
# engine: str = 'mysql', # mysql | duckdb
# ) -> None:
# params={
# 'timestamp_arrival': timestamp_arrival,
# 'timestamp_msg': timestamp_msg,
# 'timestamp_value': timestamp_value,
# 'value': value,
# 'qty': qty,
# }
# if CON is None:
# logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
# else:
# if engine == 'mysql':
# await CON.execute(text("""
# INSERT INTO binance_btcusd_trades
# (
# timestamp_arrival,
# timestamp_msg,
# timestamp_value,
# value,
# qty
# )
# VALUES
# (
# :timestamp_arrival,
# :timestamp_msg,
# :timestamp_value,
# :value,
# :qty
# )
# """),
# parameters=params
# )
# await CON.commit()
# else:
# raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def heartbeat(ws):
    """Send MEXC's application-level ping every PING_INTERVAL_SEC to keep the socket open."""
    while True:
        await asyncio.sleep(PING_INTERVAL_SEC)
        logging.info("SENDING PING...")
        ping_msg = {"method": "ping"}
        payload = json.dumps(ping_msg)
        await ws.send(payload)
async def ws_stream():
# global HIST_TRADES
async for websocket in websockets.connect(WSS_URL):
logging.info(f"Connected to {WSS_URL}")
asyncio.create_task(heartbeat(ws=websocket))
subscribe_msg = {
"method": "sub.ticker",
"param": {
"symbol": "ETH_USDT"
},
# "gzip": false
}
await websocket.send(json.dumps(subscribe_msg))
subscribe_msg = {
"method": "sub.funding.rate",
"param": {
"symbol": "ETH_USDT"
},
# "gzip": false
}
await websocket.send(json.dumps(subscribe_msg))
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
if isinstance(message, str):
try:
data = json.loads(message)
channel = data.get('channel', None)
if channel is not None:
match channel:
case 'push.ticker':
# logging.info(f'TICKER: {data}')
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'timestamp_data': data['data']['timestamp'],
'symbol': data['data']['symbol'],
'lastPrice': float(data['data']['lastPrice']),
'fairPrice': float(data['data']['fairPrice']),
'indexPrice': float(data['data']['indexPrice']),
'volume24': float(data['data']['volume24']),
'bid1': float(data['data']['bid1']),
'ask1': float(data['data']['ask1']),
'fundingRate': float(data['data']['fundingRate']),
})
VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ)
case 'push.funding.rate':
# logging.info(f'FUNDING RATE: {data}')
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['ts'],
'symbol': data['data']['symbol'],
'rate': float(data['data']['rate']),
'nextSettleTime_ts_ms': data['data']['nextSettleTime'],
})
VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ)
case 'pong':
logging.info(f'PING RESPONSE: {data}')
case _:
logging.info(f'OTHER: {data}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
    """Initialise the optional Valkey and database handles, then stream forever.

    VAL_KEY and CON are module globals consumed elsewhere; each is left as None
    (with a warning) when its corresponding USE_* flag is off.
    """
    global VAL_KEY
    global CON
    if not USE_VK:
        VAL_KEY = None
        logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
    else:
        VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
    if not USE_DB:
        CON = None
        logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
        await ws_stream()
    else:
        engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
        # `as CON` rebinds the module global (declared above), so the connection is
        # visible to any DB helper called from the stream.
        async with engine.connect() as CON:
            # await create_rtds_btcusd_table(CON=CON)
            await ws_stream()
if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging FIRST: before basicConfig the root logger has no handler
    # and an effective level of WARNING, so the INFO lines below were silently
    # dropped when logged ahead of configuration.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # truncate the previous run's log on startup
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Stream stopped")