diff --git a/aster.ipynb b/aster.ipynb index e69de29..9f1d669 100644 --- a/aster.ipynb +++ b/aster.ipynb @@ -0,0 +1,154 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "3a269644", + "metadata": {}, + "outputs": [], + "source": [ + "import modules.aster_auth_api as aster_auth_api" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4395fabb", + "metadata": {}, + "outputs": [], + "source": [ + "listen_key_request = {\n", + " \"url\": \"/fapi/v3/listenKey\",\n", + " \"method\": \"POST\",\n", + " \"params\": {}\n", + "}\n", + "cancel_all_open_orders = {\n", + " \"url\": \"/fapi/v3/allOpenOrders\",\n", + " \"method\": \"DELETE\",\n", + " \"params\": {\n", + " 'symbol': 'ETHUSDT',\n", + " }\n", + "}\n", + "fut_acct_balances = {\n", + " \"url\": \"/fapi/v3/balance\",\n", + " \"method\": \"GET\",\n", + " \"params\": {}\n", + "}\n", + "commission_rate = {\n", + " \"url\": \"/fapi/v3/commissionRate\",\n", + " \"method\": \"GET\",\n", + " \"params\": {\n", + " 'symbol': 'ETHUSDT',\n", + " }\n", + "}\n", + "post_order = {\n", + " \"url\": \"/fapi/v3/order\",\n", + " \"method\": \"POST\",\n", + " \"params\": {\n", + " 'symbol': 'ETHUSDT',\n", + " 'side': 'BUY',\n", + " 'type': 'LIMIT',\n", + " 'timeInForce': 'GTC',\n", + " 'quantity': '0.01',\n", + " 'price': '2100',\n", + " }\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "2122885a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'orderId': 17340952438,\n", + " 'symbol': 'ETHUSDT',\n", + " 'status': 'NEW',\n", + " 'clientOrderId': '57UkyWpQE4iRP74eAwt7Hf',\n", + " 'price': '2100',\n", + " 'avgPrice': '0.00000',\n", + " 'origQty': '0.010',\n", + " 'executedQty': '0',\n", + " 'cumQty': '0',\n", + " 'cumQuote': '0',\n", + " 'timeInForce': 'GTC',\n", + " 'type': 'LIMIT',\n", + " 'reduceOnly': False,\n", + " 'closePosition': False,\n", + " 'side': 'BUY',\n", + " 'positionSide': 'BOTH',\n", + " 'stopPrice': '0',\n", + " 'workingType': 'CONTRACT_PRICE',\n", + " 'priceProtect': False,\n", + " 'origType': 'LIMIT',\n", + " 'updateTime': 1776835456300,\n", + " 'newChainData': {'hash': '0x12456fb0e14021975aa03e56adb339f8b906710ec3a8604a73a8b697c7ee1225'}}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "j = aster_auth_api.post_authenticated_url(post_order)\n", + "j" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "45d68bf4", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fd513311", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "940ceba7", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8abfeae --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,53 @@ +services: + ws_aster: + container_name: ws_aster + restart: "unless-stopped" + build: + context: ./ + dockerfile: ./ws_aster/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + network_mode: "host" + ws_extended_fund_rate: + container_name: ws_extended_fund_rate + restart: "unless-stopped" + build: + context: ./ + dockerfile: ./ws_extended_fund_rate/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + network_mode: "host" + ws_extended_orderbook: + container_name: ws_extended_orderbook + restart: "unless-stopped" + build: + context: ./ + dockerfile: ./ws_extended_orderbook/Dockerfile + volumes: + - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + network_mode: "host" + + + # ws_user: + # container_name: ws_user + # restart: "unless-stopped" + # build: + # context: ./ + # dockerfile: ./ws_user/Dockerfile + # volumes: + # - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + # - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + # network_mode: "host" + # ng: + # container_name: ng + # restart: "unless-stopped" + # build: + # context: ./ + # dockerfile: ./ng/Dockerfile + # volumes: + # - /home/ubuntu/data:/home/ubuntu/data:rw # Read-write access to data + # - /home/ubuntu/logs:/home/ubuntu/logs:rw # Read-write access to data + # network_mode: "host" \ No newline at end of file diff --git a/extended.ipynb b/extended.ipynb new file mode 100644 index 0000000..d0c0e03 --- /dev/null +++ b/extended.ipynb @@ -0,0 +1,169 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 15, + "id": "c9606e56", + "metadata": {}, + "outputs": [], + "source": [ + "import math\n", + "from datetime import datetime, timezone" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "2b5d7d21", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'type': 'SNAPSHOT',\n", + " 'data': {'t': 'SNAPSHOT',\n", + " 'm': 'ETH-USD',\n", + " 'b': [{'q': '362.381', 'p': '2318.1'}],\n", + " 'a': [{'q': '70.572', 'p': '2318.2'}],\n", + " 'd': '1'},\n", + " 'ts': 1776822518279,\n", + " 'seq': 14}" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "{\"type\":\"SNAPSHOT\",\"data\":{\"t\":\"SNAPSHOT\",\"m\":\"ETH-USD\",\"b\":[{\"q\":\"362.381\",\"p\":\"2318.1\"}],\"a\":[{\"q\":\"70.572\",\"p\":\"2318.2\"}],\"d\":\"1\"},\"ts\":1776822518279,\"seq\":14}" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "19f51572", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'data': {'m': 'ETH-USD', 'f': '-0.000001', 'T': 1776823319595},\n", + " 'ts': 1776823319595,\n", + " 'seq': 2}" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "{'data': {'m': 'ETH-USD', 'f': '-0.000001', 'T': 1776823319595}, 'ts': 1776823319595, 'seq': 2}" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "68006231", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Timestamp('2026-04-22 03:00:00')" + ] + }, + "execution_count": 41, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "\n", + "pd.to_datetime(1776826800000, unit='ms')" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "5561153b", + "metadata": {}, + "outputs": [], + "source": [ + "def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds\n", + " interval_secs = interval_mins * 60\n", + " seconds = dt.timestamp()\n", + " rounded_seconds = math.floor(seconds / interval_secs) * interval_secs\n", + " \n", + " return rounded_seconds" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "bd83b59f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1776826800000" + ] + }, + "execution_count": 40, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "(time_round_down(dt=datetime.now(timezone.utc), interval_mins=60)+(60*60))*1000" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "665377af", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "af5c751b", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/main.py b/main.py index 6e9004d..7f77fe1 100644 --- a/main.py +++ b/main.py @@ -36,12 +36,15 @@ async def run_algo(): loop_start = time.time() print('__________Start___________') - MEXC_FUND_RATE = json.loads(VAL_KEY.get('fund_rate_mexc')) - MEXC_TICKER = json.loads(VAL_KEY.get('fut_ticker_mexc')) - APEX_TICKER = json.loads(VAL_KEY.get('fut_ticker_apex')) - print(f'MEXC FUND RATE: {MEXC_FUND_RATE}') - print(f'MEXC TICKER: {MEXC_TICKER}') - print(f'APEX TICKER: {APEX_TICKER}') + ASTER_FUND_RATE = json.loads(VAL_KEY.get('fund_rate_aster')) + ASTER_TICKER = json.loads(VAL_KEY.get('fut_ticker_aster')) + print(f'ASTER FUND RATE: {ASTER_FUND_RATE}') + print(f'ASTER TICKER: {ASTER_TICKER}') + + EXTENDED_FUND_RATE = json.loads(VAL_KEY.get('fund_rate_extended')) + EXTENDED_TICKER = json.loads(VAL_KEY.get('fut_ticker_extended')) + print(f'EXTENDED FUND RATE: {EXTENDED_FUND_RATE}') + print(f'EXTENDED TICKER: {EXTENDED_TICKER}') time.sleep(5) diff --git a/modules/__pycache__/aster_auth_api.cpython-313.pyc b/modules/__pycache__/aster_auth_api.cpython-313.pyc new file mode 100644 index 0000000..9b7cb0d Binary files /dev/null and b/modules/__pycache__/aster_auth_api.cpython-313.pyc differ diff --git a/modules/aster_auth_api.py b/modules/aster_auth_api.py new file mode 100644 index 0000000..1880fe0 --- /dev/null +++ b/modules/aster_auth_api.py @@ -0,0 +1,108 @@ +import requests +from dotenv import load_dotenv +import os +import time +import threading +import urllib + +from eth_account.messages import encode_typed_data +from eth_account import Account + +load_dotenv() + +user = os.getenv("RABBY_WALLET") +signer = os.getenv("ASTER_API_WALLET_ADDRESS") +private_key = os.getenv("ASTER_API_PRIVATE_KEY") + +_last_ms = 0 +_i = 0 + +def post_authenticated_url(req: dict) -> dict: + typed_data = { + "types": { + "EIP712Domain": [ + {"name": "name", "type": "string"}, + {"name": "version", "type": "string"}, + {"name": "chainId", "type": "uint256"}, + {"name": "verifyingContract", "type": "address"} + ], + "Message": [ + { "name": "msg", "type": "string" } + ] + }, + "primaryType": "Message", + "domain": { + "name": "AsterSignTransaction", + "version": "1", + "chainId": 1666, + "verifyingContract": "0x0000000000000000000000000000000000000000" + }, + "message": { + "msg": "$msg" + } + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'User-Agent': 'PythonApp/1.0' + } + host = 'https://fapi.asterdex.com' + + + def get_nonce(): + _nonce_lock = threading.Lock() + global _last_ms, _i + with _nonce_lock: + now_ms = int(time.time()) + + if now_ms == _last_ms: + _i += 1 + else: + _last_ms = now_ms + _i = 0 + + return now_ms * 1_000_000 + _i + + def sign_typed_data(data: dict, private_key: str): + """Sign EIP-712 typed data using encode_typed_data.""" + message = encode_typed_data( + domain_data=data["domain"], + message_types={"Message": data["types"]["Message"]}, + message_data=data["message"], + ) + return Account.sign_message(message, private_key=private_key) + + def send_by_url(req): + my_dict = req['params'].copy() + url = host + req['url'] + method = req['method'] + + my_dict['nonce'] = str(get_nonce()) + my_dict['user'] = user + my_dict['signer'] = signer + + param = urllib.parse.urlencode(my_dict) + + typed_data['message']['msg'] = param + signed = sign_typed_data(typed_data, private_key) + + full_url = url + '?' + param + '&signature=' + signed.signature.hex() + # print(full_url) + + if method == 'GET': + res = requests.get(full_url, headers=headers) + # print(res.status_code, res.text) + return res.json() + elif method == 'POST': + res = requests.post(full_url, headers=headers) + # print(res.status_code, res.text) + return res.json() + elif method == 'PUT': + res = requests.put(full_url, headers=headers) + # print(res.status_code, res.text) + return res.json() + elif method == 'DELETE': + res = requests.delete(full_url, headers=headers) + # print(res.status_code, res.text) + return res.json() + + return send_by_url(req=req) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4a4d001 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,25 @@ +pandas +rel +websockets +pyarrow +# plotly +mysql-connector-python +sqlalchemy +requests +pymysql +scipy +asyncmy +cryptography +# TA-Lib +valkey +nicegui +# py_clob_client +# google +# google-api-core==2.30.0 +# google-api-python-client==2.190.0 +# googleapis-common-protos==1.72.0 +# grpcio==1.76.0 +# grpcio-tools==1.76.0 +x10-python-trading-starknet +eth-keys +eth-account \ No newline at end of file diff --git a/ws_aster/Dockerfile b/ws_aster/Dockerfile new file mode 100644 index 0000000..df4528b --- /dev/null +++ b/ws_aster/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.13-slim + +RUN apt-get update && \ + apt-get install -y build-essential + +RUN gcc --version +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Finally, run gunicorn. +CMD [ "python", "ws_aster.py"] +# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/ws_aster_user.py b/ws_aster_user.py new file mode 100644 index 0000000..8b144e1 --- /dev/null +++ b/ws_aster_user.py @@ -0,0 +1,200 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = False +VK_USER = 'fund_rate_aster' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Aster_User.log' + +### CONSTANTS ### + +### Globals ### +WSS_URL = "wss://fstream.asterdex.com" + +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def ws_stream(): + async for websocket in websockets.connect(WSS_URL): + logging.info(f"Connected to {WSS_URL}") + + + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + channel = data.get('stream', None) + if channel is not None: + match channel: + case : + continue + case : + # print(f'BT: {data}') + VAL_KEY_OBJ = json.dumps({ + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['data']['E'], + 'timestamp_transaction': data['data']['T'], + 'orderbook_update_id': data['data']['u'], + 'symbol': data['data']['s'], + 'best_bid_px': data['data']['b'], + 'best_bid_qty': data['data']['B'], + 'best_ask_px': data['data']['a'], + 'best_ask_qty': data['data']['A'], + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + continue + case _: + logging.warning(f'UNMATCHED OTHER MSG: {data}') + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_extended.py b/ws_extended.py deleted file mode 100644 index e69de29..0000000 diff --git a/ws_extended_fund_rate.py b/ws_extended_fund_rate.py new file mode 100644 index 0000000..e3d4992 --- /dev/null +++ b/ws_extended_fund_rate.py @@ -0,0 +1,203 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime, timezone +from typing import AsyncContextManager +import math + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = True +VK_FUND_RATE = 'fund_rate_extended' + +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended_FR.log' + +### CONSTANTS ### +WS_SYMBOL: str = 'ETH-USD' +FUNDING_RATE_INTERVAL_MIN = 60 + +### Globals ### +WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/funding/{WS_SYMBOL}" + + +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds + interval_secs = interval_mins * 60 + seconds = dt.timestamp() + rounded_seconds = math.floor(seconds / interval_secs) * interval_secs + + return rounded_seconds + + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def ws_stream(): + async for websocket in websockets.connect(WSS_URL): + logging.info(f"Connected to {WSS_URL}") + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + if data.get('data', None) is not None: + print(f'FR: {data}') + fr_next_update_ts = (time_round_down(dt=datetime.now(timezone.utc), interval_mins=60)+(60*60))*1000 + VAL_KEY_OBJ = json.dumps({ + 'sequence_id': data['seq'], + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'symbol': data['data']['m'], + 'funding_rate': float(data['data']['f']), + 'funding_rate_updated_ts_ms': data['data']['T'], + 'next_funding_time_ts_ms': fr_next_update_ts, + }) + VAL_KEY.set(VK_FUND_RATE, VAL_KEY_OBJ) + continue + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_extended_fund_rate/Dockerfile b/ws_extended_fund_rate/Dockerfile new file mode 100644 index 0000000..5504f0c --- /dev/null +++ b/ws_extended_fund_rate/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.13-slim + +RUN apt-get update && \ + apt-get install -y build-essential + +RUN gcc --version +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Finally, run gunicorn. +CMD [ "python", "ws_extended_fund_rate.py"] +# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file diff --git a/ws_extended_orderbook.py b/ws_extended_orderbook.py new file mode 100644 index 0000000..8abc140 --- /dev/null +++ b/ws_extended_orderbook.py @@ -0,0 +1,192 @@ +import asyncio +import json +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + +import numpy as np +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine +import valkey +import os +from dotenv import load_dotenv + + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = False +USE_VK: bool = True + +VK_TICKER = 'fut_ticker_extended' +CON: AsyncContextManager | None = None +VAL_KEY = None + +### Logging ### +load_dotenv() +LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Extended.log' + +### CONSTANTS ### +WS_SYMBOL: str = 'ETH-USD' + +### Globals ### +WSS_URL = f"wss://api.starknet.extended.exchange/stream.extended.exchange/v1/orderbooks/{WS_SYMBOL}?depth=1" + +# HIST_TRADES = np.empty((0, 3)) +# HIST_TRADES_LOOKBACK_SEC = 6 + +# ### Database Funcs ### +# async def create_rtds_btcusd_table( +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Create Statements") +# else: +# if engine == 'mysql': +# logging.info('Creating Table if Does Not Exist: binance_btcusd_trades') +# await CON.execute(text(""" +# CREATE TABLE IF NOT EXISTS binance_btcusd_trades ( +# timestamp_arrival BIGINT, +# timestamp_msg BIGINT, +# timestamp_value BIGINT, +# value DOUBLE, +# qty DOUBLE +# ); +# """)) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +# async def insert_rtds_btcusd_table( +# timestamp_arrival: int, +# timestamp_msg: int, +# timestamp_value: int, +# value: float, +# qty: float, +# CON: AsyncContextManager, +# engine: str = 'mysql', # mysql | duckdb +# ) -> None: +# params={ +# 'timestamp_arrival': timestamp_arrival, +# 'timestamp_msg': timestamp_msg, +# 'timestamp_value': timestamp_value, +# 'value': value, +# 'qty': qty, +# } +# if CON is None: +# logging.info("NO DB CONNECTION, SKIPPING Insert Statements") +# else: +# if engine == 'mysql': +# await CON.execute(text(""" +# INSERT INTO binance_btcusd_trades +# ( +# timestamp_arrival, +# timestamp_msg, +# timestamp_value, +# value, +# qty +# ) +# VALUES +# ( +# :timestamp_arrival, +# :timestamp_msg, +# :timestamp_value, +# :value, +# :qty +# ) +# """), +# parameters=params +# ) +# await CON.commit() +# else: +# raise ValueError('Only MySQL engine is implemented') + +### Websocket ### +async def ws_stream(): + async for websocket in websockets.connect(WSS_URL): + logging.info(f"Connected to {WSS_URL}") + try: + async for message in websocket: + ts_arrival = round(datetime.now().timestamp()*1000) + if isinstance(message, str): + try: + data = json.loads(message) + if data.get('type', None) is not None: + # print(f'OB: {data}') + VAL_KEY_OBJ = json.dumps({ + 'sequence_id': data['seq'], + 'timestamp_arrival': ts_arrival, + 'timestamp_msg': data['ts'], + 'symbol': data['data']['m'], + 'best_bid_px': float(data['data']['b'][0]['p']), + 'best_bid_qty': float(data['data']['b'][0]['q']), + 'best_ask_px': float(data['data']['a'][0]['p']), + 'best_ask_qty': float(data['data']['a'][0]['q']), + }) + VAL_KEY.set(VK_TICKER, VAL_KEY_OBJ) + continue + else: + logging.info(f'Initial or unexpected data struct, skipping: {data}') + continue + except (json.JSONDecodeError, ValueError): + logging.warning(f'Message not in JSON format, skipping: {message}') + continue + else: + raise ValueError(f'Type: {type(data)} not expected: {message}') + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + continue + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + + +async def main(): + global VAL_KEY + global CON + + if USE_VK: + VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0) + else: + VAL_KEY = None + logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED") + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket') + async with engine.connect() as CON: + # await create_rtds_btcusd_table(CON=CON) + await ws_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await ws_stream() + + +if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Stream stopped") \ No newline at end of file diff --git a/ws_extended_orderbook/Dockerfile b/ws_extended_orderbook/Dockerfile new file mode 100644 index 0000000..91b9e1c --- /dev/null +++ b/ws_extended_orderbook/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.13-slim + +RUN apt-get update && \ + apt-get install -y build-essential + +RUN gcc --version +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Finally, run gunicorn. +CMD [ "python", "ws_extended_orderbook.py"] +# CMD [ "gunicorn", "--workers=5", "--threads=1", "-b 0.0.0.0:8000", "app:server"] \ No newline at end of file