Compare commits
2 Commits
main
...
inter_feed
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d7d99d749 | |||
| c051130867 |
BIN
Screenshot 2026-03-29 at 12.35.51 AM.png
Normal file
BIN
Screenshot 2026-03-29 at 12.35.51 AM.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 337 KiB |
277
database.ipynb
Normal file
277
database.ipynb
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 2,
|
||||||
|
"id": "4cae6bf1",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from sqlalchemy import create_engine, text\n",
|
||||||
|
"import pandas as pd\n",
|
||||||
|
"from datetime import datetime"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 6,
|
||||||
|
"id": "f5040527",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
|
"text": [
|
||||||
|
"Connection successful\n"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"### MYSQL ###\n",
|
||||||
|
"engine = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')\n",
|
||||||
|
"try:\n",
|
||||||
|
" with engine.connect() as conn:\n",
|
||||||
|
" print(\"Connection successful\")\n",
|
||||||
|
"except Exception as e:\n",
|
||||||
|
" print(f\"Connection failed: {e}\") "
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 7,
|
||||||
|
"id": "5c23110d",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"query = '''\n",
|
||||||
|
"SELECT timestamp_msg,timestamp_value,`value` FROM poly_rtds_cl_btcusd;\n",
|
||||||
|
"'''"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 8,
|
||||||
|
"id": "a866e9ca",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"df = pd.read_sql(query, con=engine)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 10,
|
||||||
|
"id": "954a3c3c",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"df['ts'] = pd.to_datetime(df['timestamp_value'], unit='ms')"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 6,
|
||||||
|
"id": "f11fd680",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"1774752413939"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 6,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"round(datetime.now().timestamp()*1000)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 11,
|
||||||
|
"id": "eadb0364",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"Timestamp('2026-03-29 03:22:03.145000')"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 11,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"pd.to_datetime(1774754523145, unit='ms')"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 14,
|
||||||
|
"id": "3fbf525c",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"d = {'id': '1753563', 'question': 'Bitcoin Up or Down - March 29, 12:40AM-12:45AM ET', 'conditionId': '0x4cf6815a61939a9a0ee308772e727bbaf8e95803577aad7b9a9d3e028e37f13b', 'slug': 'btc-updown-5m-1774759200', 'resolutionSource': 'https://data.chain.link/streams/btc-usd', 'endDate': '2026-03-29T04:45:00Z', 'liquidity': '40412.2984', 'startDate': '2026-03-28T04:48:26.823506Z', 'image': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png', 'icon': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png', 'description': 'This market will resolve to \"Up\" if the Bitcoin price at the end of the time range specified in the title is greater than or equal to the price at the beginning of that range. Otherwise, it will resolve to \"Down\".\\nThe resolution source for this market is information from Chainlink, specifically the BTC/USD data stream available at https://data.chain.link/streams/btc-usd.\\nPlease note that this market is about the price according to Chainlink data stream BTC/USD, not according to other sources or spot markets.', 'outcomes': '[\"Up\", \"Down\"]', 'outcomePrices': '[\"0.505\", \"0.495\"]', 'volume': '1154.594883', 'active': True, 'closed': False, 'marketMakerAddress': '', 'createdAt': '2026-03-28T04:47:13.404208Z', 'updatedAt': '2026-03-29T04:38:45.64501Z', 'new': False, 'featured': False, 'archived': False, 'restricted': True, 'groupItemThreshold': '0', 'questionID': '0x7e86ce5e7ba0eb24758756db7c1443bf48041d08ac10364cf6771ef3f9c26733', 'enableOrderBook': True, 'orderPriceMinTickSize': 0.01, 'orderMinSize': 5, 'volumeNum': 1154.594883, 'liquidityNum': 40412.2984, 'endDateIso': '2026-03-29', 'startDateIso': '2026-03-28', 'hasReviewedDates': True, 'volume24hr': 1154.594883, 'volume1wk': 1154.594883, 'volume1mo': 1154.594883, 'volume1yr': 1154.594883, 'clobTokenIds': '[\"3132608671382208432230794800974499111421928258370863811882545679011841068490\", \"6273455409880805876304408793123549040642604317110995252288748628688958052125\"]', 'volume24hrClob': 1154.594883, 'volume1wkClob': 1154.594883, 'volume1moClob': 1154.594883, 'volume1yrClob': 1154.594883, 'volumeClob': 1154.594883, 'liquidityClob': 40412.2984, 'makerBaseFee': 1000, 'takerBaseFee': 1000, 'acceptingOrders': True, 'negRisk': False, 'ready': False, 'funded': False, 'acceptingOrdersTimestamp': '2026-03-28T04:47:21Z', 'cyom': False, 'competitive': 0.9999750006249843, 'pagerDutyNotificationEnabled': False, 'approved': True, 'rewardsMinSize': 50, 'rewardsMaxSpread': 4.5, 'spread': 0.01, 'lastTradePrice': 0.51, 'bestBid': 0.5, 'bestAsk': 0.51, 'automaticallyActive': True, 'clearBookOnStart': False, 'showGmpSeries': False, 'showGmpOutcome': False, 'manualActivation': False, 'negRiskOther': False, 'umaResolutionStatuses': '[]', 'pendingDeployment': False, 'deploying': False, 'rfqEnabled': False, 'eventStartTime': '2026-03-29T04:40:00Z', 'holdingRewardsEnabled': False, 'feesEnabled': True, 'requiresTranslation': False, 'makerRebatesFeeShareBps': 10000, 'feeType': 'crypto_fees', 'feeSchedule': {'exponent': 2, 'rate': 0.25, 'takerOnly': True, 'rebateRate': 0.2}}"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 15,
|
||||||
|
"id": "699031ee",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"{'id': '1753563',\n",
|
||||||
|
" 'question': 'Bitcoin Up or Down - March 29, 12:40AM-12:45AM ET',\n",
|
||||||
|
" 'conditionId': '0x4cf6815a61939a9a0ee308772e727bbaf8e95803577aad7b9a9d3e028e37f13b',\n",
|
||||||
|
" 'slug': 'btc-updown-5m-1774759200',\n",
|
||||||
|
" 'resolutionSource': 'https://data.chain.link/streams/btc-usd',\n",
|
||||||
|
" 'endDate': '2026-03-29T04:45:00Z',\n",
|
||||||
|
" 'liquidity': '40412.2984',\n",
|
||||||
|
" 'startDate': '2026-03-28T04:48:26.823506Z',\n",
|
||||||
|
" 'image': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png',\n",
|
||||||
|
" 'icon': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png',\n",
|
||||||
|
" 'description': 'This market will resolve to \"Up\" if the Bitcoin price at the end of the time range specified in the title is greater than or equal to the price at the beginning of that range. Otherwise, it will resolve to \"Down\".\\nThe resolution source for this market is information from Chainlink, specifically the BTC/USD data stream available at https://data.chain.link/streams/btc-usd.\\nPlease note that this market is about the price according to Chainlink data stream BTC/USD, not according to other sources or spot markets.',\n",
|
||||||
|
" 'outcomes': '[\"Up\", \"Down\"]',\n",
|
||||||
|
" 'outcomePrices': '[\"0.505\", \"0.495\"]',\n",
|
||||||
|
" 'volume': '1154.594883',\n",
|
||||||
|
" 'active': True,\n",
|
||||||
|
" 'closed': False,\n",
|
||||||
|
" 'marketMakerAddress': '',\n",
|
||||||
|
" 'createdAt': '2026-03-28T04:47:13.404208Z',\n",
|
||||||
|
" 'updatedAt': '2026-03-29T04:38:45.64501Z',\n",
|
||||||
|
" 'new': False,\n",
|
||||||
|
" 'featured': False,\n",
|
||||||
|
" 'archived': False,\n",
|
||||||
|
" 'restricted': True,\n",
|
||||||
|
" 'groupItemThreshold': '0',\n",
|
||||||
|
" 'questionID': '0x7e86ce5e7ba0eb24758756db7c1443bf48041d08ac10364cf6771ef3f9c26733',\n",
|
||||||
|
" 'enableOrderBook': True,\n",
|
||||||
|
" 'orderPriceMinTickSize': 0.01,\n",
|
||||||
|
" 'orderMinSize': 5,\n",
|
||||||
|
" 'volumeNum': 1154.594883,\n",
|
||||||
|
" 'liquidityNum': 40412.2984,\n",
|
||||||
|
" 'endDateIso': '2026-03-29',\n",
|
||||||
|
" 'startDateIso': '2026-03-28',\n",
|
||||||
|
" 'hasReviewedDates': True,\n",
|
||||||
|
" 'volume24hr': 1154.594883,\n",
|
||||||
|
" 'volume1wk': 1154.594883,\n",
|
||||||
|
" 'volume1mo': 1154.594883,\n",
|
||||||
|
" 'volume1yr': 1154.594883,\n",
|
||||||
|
" 'clobTokenIds': '[\"3132608671382208432230794800974499111421928258370863811882545679011841068490\", \"6273455409880805876304408793123549040642604317110995252288748628688958052125\"]',\n",
|
||||||
|
" 'volume24hrClob': 1154.594883,\n",
|
||||||
|
" 'volume1wkClob': 1154.594883,\n",
|
||||||
|
" 'volume1moClob': 1154.594883,\n",
|
||||||
|
" 'volume1yrClob': 1154.594883,\n",
|
||||||
|
" 'volumeClob': 1154.594883,\n",
|
||||||
|
" 'liquidityClob': 40412.2984,\n",
|
||||||
|
" 'makerBaseFee': 1000,\n",
|
||||||
|
" 'takerBaseFee': 1000,\n",
|
||||||
|
" 'acceptingOrders': True,\n",
|
||||||
|
" 'negRisk': False,\n",
|
||||||
|
" 'ready': False,\n",
|
||||||
|
" 'funded': False,\n",
|
||||||
|
" 'acceptingOrdersTimestamp': '2026-03-28T04:47:21Z',\n",
|
||||||
|
" 'cyom': False,\n",
|
||||||
|
" 'competitive': 0.9999750006249843,\n",
|
||||||
|
" 'pagerDutyNotificationEnabled': False,\n",
|
||||||
|
" 'approved': True,\n",
|
||||||
|
" 'rewardsMinSize': 50,\n",
|
||||||
|
" 'rewardsMaxSpread': 4.5,\n",
|
||||||
|
" 'spread': 0.01,\n",
|
||||||
|
" 'lastTradePrice': 0.51,\n",
|
||||||
|
" 'bestBid': 0.5,\n",
|
||||||
|
" 'bestAsk': 0.51,\n",
|
||||||
|
" 'automaticallyActive': True,\n",
|
||||||
|
" 'clearBookOnStart': False,\n",
|
||||||
|
" 'showGmpSeries': False,\n",
|
||||||
|
" 'showGmpOutcome': False,\n",
|
||||||
|
" 'manualActivation': False,\n",
|
||||||
|
" 'negRiskOther': False,\n",
|
||||||
|
" 'umaResolutionStatuses': '[]',\n",
|
||||||
|
" 'pendingDeployment': False,\n",
|
||||||
|
" 'deploying': False,\n",
|
||||||
|
" 'rfqEnabled': False,\n",
|
||||||
|
" 'eventStartTime': '2026-03-29T04:40:00Z',\n",
|
||||||
|
" 'holdingRewardsEnabled': False,\n",
|
||||||
|
" 'feesEnabled': True,\n",
|
||||||
|
" 'requiresTranslation': False,\n",
|
||||||
|
" 'makerRebatesFeeShareBps': 10000,\n",
|
||||||
|
" 'feeType': 'crypto_fees',\n",
|
||||||
|
" 'feeSchedule': {'exponent': 2,\n",
|
||||||
|
" 'rate': 0.25,\n",
|
||||||
|
" 'takerOnly': True,\n",
|
||||||
|
" 'rebateRate': 0.2}}"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 15,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"d"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"id": "a620fa17",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": []
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"id": "2071f014",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": []
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": []
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"id": "5ba7be5f",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": []
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": "py_313",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.13.12"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 5
|
||||||
|
}
|
||||||
Binary file not shown.
117
ng.py
Normal file
117
ng.py
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
import os
|
||||||
|
from nicegui import ui, app
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
# import requests
|
||||||
|
import json
|
||||||
|
# import time
|
||||||
|
# import re
|
||||||
|
import valkey
|
||||||
|
# import asyncio
|
||||||
|
# import datetime as dt
|
||||||
|
# from random import random
|
||||||
|
# from nicegui_modules import data
|
||||||
|
# from nicegui_modules import ui_components
|
||||||
|
# from glide import GlideClient, NodeAddress, GlideClientConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
LISTENING_CLIENT = None
|
||||||
|
LH_PAIR = 'BTC'
|
||||||
|
RH_PAIR = 'USD'
|
||||||
|
|
||||||
|
DEFAULT_TO_DARKMODE: bool = True
|
||||||
|
ALLOW_BODY_SCROLL: bool = True
|
||||||
|
LOOKBACK: int = 60
|
||||||
|
LOOKBACK_RT_TV_MAX_POINTS: int = 300
|
||||||
|
REFRESH_INTERVAL_SEC: int = 10
|
||||||
|
REFRESH_INTERVAL_RT_SEC: int = 0.1
|
||||||
|
|
||||||
|
ENGINE = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')
|
||||||
|
VALKEY_R = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
||||||
|
# VALKEY_P = VALKEY_R.pubsub()
|
||||||
|
# VALKEY_P.subscribe('mexc_mkt_bookTicker')
|
||||||
|
|
||||||
|
|
||||||
|
def root():
|
||||||
|
app.add_static_files(max_cache_age=0, url_path='/static', local_directory=os.path.join(os.path.dirname(__file__), 'nicegui_modules/static'))
|
||||||
|
ui.add_head_html('''
|
||||||
|
<meta name="darkreader-lock">
|
||||||
|
<link rel="stylesheet" type="text/css" href="/static/styles.css">
|
||||||
|
<script type="text/javascript" src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>
|
||||||
|
<script src="/static/script.js"></script>
|
||||||
|
'''
|
||||||
|
)
|
||||||
|
|
||||||
|
# ui.add_head_html('<meta name="darkreader-lock">')
|
||||||
|
update_body_scroll(bool_override=ALLOW_BODY_SCROLL)
|
||||||
|
|
||||||
|
ui.sub_pages({
|
||||||
|
'/': rt_chart_page,
|
||||||
|
}).classes('w-full')
|
||||||
|
|
||||||
|
|
||||||
|
async def update_tv():
|
||||||
|
series_update = json.loads(VALKEY_R.get('poly_rtds_cl_btcusd'))
|
||||||
|
series_update_b = json.loads(VALKEY_R.get('poly_coinbase_btcusd'))
|
||||||
|
series_update_c = json.loads(VALKEY_R.get('poly_5min_btcusd'))
|
||||||
|
timestamp = round( ( series_update['timestamp_arrival'] / 1000 ) , 2)
|
||||||
|
timestamp_b = round( ( series_update_b['timestamp_arrival'] / 1000 ) , 2)
|
||||||
|
timestamp_c = round( ( series_update_c['timestamp_arrival'] / 1000 ) , 2)
|
||||||
|
value = float(series_update['value'])
|
||||||
|
value_b = float(series_update_b['value'])
|
||||||
|
value_c = float(series_update_c['price'])
|
||||||
|
|
||||||
|
data_dict = {
|
||||||
|
'timestamp': timestamp,
|
||||||
|
'timestamp_b': timestamp_b,
|
||||||
|
'timestamp_c': timestamp_c,
|
||||||
|
'value': value,
|
||||||
|
'value_b': value_b,
|
||||||
|
'value_c': value_c,
|
||||||
|
'target': series_update_c['target_price'],
|
||||||
|
'LOOKBACK_RT_TV_MAX_POINTS': LOOKBACK_RT_TV_MAX_POINTS,
|
||||||
|
}
|
||||||
|
|
||||||
|
ui.run_javascript(f'await update_tv(data_dict={data_dict});')
|
||||||
|
|
||||||
|
|
||||||
|
def update_body_scroll(e=None, bool_override=False):
|
||||||
|
if e is None:
|
||||||
|
if bool_override:
|
||||||
|
ui.query('body').style('height: 100%; overflow-y: auto;')
|
||||||
|
else:
|
||||||
|
ui.query('body').style('height: 100%; overflow-y: hidden;')
|
||||||
|
else:
|
||||||
|
if e.value:
|
||||||
|
ui.query('body').style('height: 100%; overflow-y: auto;')
|
||||||
|
else:
|
||||||
|
ui.query('body').style('height: 100%; overflow-y: hidden;')
|
||||||
|
|
||||||
|
# async def refresh_lookback_funcs(lookback: int = LOOKBACK):
|
||||||
|
# lookback = app.storage.user.get('lookback', lookback)
|
||||||
|
|
||||||
|
# await data.trades_pnl_graph.refresh(ENGINE=ENGINE, LH_PAIR=LH_PAIR, RH_PAIR=RH_PAIR, lookback=lookback)
|
||||||
|
# await ui_components.er_table.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||||
|
# await ui_components.trades_table.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||||
|
# await ui_components.er_stats.refresh(ENGINE=ENGINE, lookback=lookback)
|
||||||
|
|
||||||
|
async def rt_chart_page():
|
||||||
|
global LOOKBACK
|
||||||
|
|
||||||
|
LOOKBACK = app.storage.user.get('lookback', LOOKBACK)
|
||||||
|
timer = ui.timer(REFRESH_INTERVAL_RT_SEC, update_tv)
|
||||||
|
|
||||||
|
with ui.row():
|
||||||
|
with ui.column():
|
||||||
|
ui.switch('☸︎', value=ALLOW_BODY_SCROLL, on_change=lambda e: update_body_scroll(e))
|
||||||
|
with ui.column():
|
||||||
|
ui.switch('▶️', value=True).bind_value_to(timer, 'active')
|
||||||
|
with ui.column().style('position: absolute; right: 20px; font-family: monospace; align-self: center;'):
|
||||||
|
ui.label('Atwater Trading: Orderbook')
|
||||||
|
|
||||||
|
with ui.grid(columns=16).classes('w-full gap-0 auto-fit'):
|
||||||
|
with ui.card().tight().classes('w-full col-span-full no-shadow border border-black-200').style('overflow: auto;'):
|
||||||
|
ui.html('<div id="tv" style="width:100%; height:800px;"></div>', sanitize=False).classes('w-full')
|
||||||
|
ui.run_javascript('await create_tv();')
|
||||||
|
|
||||||
|
|
||||||
|
ui.run(root, storage_secret="123ABC", reload=True, dark=True, title='Atwater Trading')
|
||||||
220
nicegui_modules/static/script.js
Normal file
220
nicegui_modules/static/script.js
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
async function waitForVariable(variableName, timeout = 5000) {
|
||||||
|
const startTime = Date.now();
|
||||||
|
while (typeof window[variableName] === 'undefined') {
|
||||||
|
if (Date.now() - startTime > timeout) {
|
||||||
|
throw new Error(`Variable '${variableName}' not defined within ${timeout}ms`);
|
||||||
|
}
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
|
}
|
||||||
|
console.log(`Variable '${variableName}' is now defined.`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function update_tv(data_dict) {
|
||||||
|
|
||||||
|
window.data.push({ time: data_dict.timestamp, value: data_dict.value });
|
||||||
|
window.data_b.push({ time: data_dict.timestamp_b, value: data_dict.value_b });
|
||||||
|
window.data_c.push({ time: data_dict.timestamp_c, value: data_dict.value_c });
|
||||||
|
window.data_tgt.push({ time: data_dict.timestamp_c, value: data_dict.target });
|
||||||
|
window.lineSeries.update({ time: data_dict.timestamp, value: data_dict.value });
|
||||||
|
window.lineSeries_b.update({ time: data_dict.timestamp_b, value: data_dict.value_b });
|
||||||
|
window.lineSeries_c.update({ time: data_dict.timestamp_c, value: data_dict.value_c });
|
||||||
|
window.lineSeries_tgt.update({ time: data_dict.timestamp_c, value: data_dict.target });
|
||||||
|
|
||||||
|
// midPriceLine.applyOptions({
|
||||||
|
// price: data_dict.mid_px,
|
||||||
|
// color: '#c78228',
|
||||||
|
// lineWidth: 3,
|
||||||
|
// lineStyle: LightweightCharts.LineStyle.Dashed,
|
||||||
|
// axisLabelVisible: true,
|
||||||
|
// });
|
||||||
|
|
||||||
|
window.chart.timeScale().scrollToRealTime();
|
||||||
|
// const currentRange = window.chart.timeScale().getVisibleLogicalRange();
|
||||||
|
// window.chart.timeScale().fitContent();
|
||||||
|
// window.chart.timeScale().setVisibleLogicalRange(currentRange);
|
||||||
|
|
||||||
|
const MAX_DATA_POINTS = data_dict.LOOKBACK_RT_TV_MAX_POINTS;
|
||||||
|
if (window.lineSeries.data().length > MAX_DATA_POINTS) {
|
||||||
|
window.lineSeries.setData(lineSeries.data().slice(-MAX_DATA_POINTS));
|
||||||
|
}
|
||||||
|
if (window.lineSeries_b.data().length > MAX_DATA_POINTS) {
|
||||||
|
window.lineSeries_b.setData(lineSeries_b.data().slice(-MAX_DATA_POINTS));
|
||||||
|
}
|
||||||
|
if (window.lineSeries_c.data().length > MAX_DATA_POINTS) {
|
||||||
|
window.lineSeries_c.setData(lineSeries_c.data().slice(-MAX_DATA_POINTS));
|
||||||
|
}
|
||||||
|
if (window.lineSeries_tgt.data().length > MAX_DATA_POINTS) {
|
||||||
|
window.lineSeries_tgt.setData(lineSeries_tgt.data().slice(-MAX_DATA_POINTS));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
async function create_tv() {
|
||||||
|
window.chart = LightweightCharts.createChart(document.getElementById('tv'),
|
||||||
|
{
|
||||||
|
autoSize: true,
|
||||||
|
toolbox: true,
|
||||||
|
timeScale: {
|
||||||
|
timeVisible: true, // Shows HH:mm on x-axis
|
||||||
|
secondsVisible: true // Optional: show seconds
|
||||||
|
},
|
||||||
|
rightPriceScale: {
|
||||||
|
visible: true
|
||||||
|
},
|
||||||
|
leftPriceScale: {
|
||||||
|
visible: true
|
||||||
|
},
|
||||||
|
|
||||||
|
layout: {
|
||||||
|
background: { type: 'solid', color: '#222' },
|
||||||
|
textColor: '#DDD',
|
||||||
|
},
|
||||||
|
grid: {
|
||||||
|
vertLines: {
|
||||||
|
color: '#e1e1e1', // Set vertical line color
|
||||||
|
visible: true,
|
||||||
|
style: 2, // 0: Solid, 1: Dashed, 2: Dotted, 3: LargeDashed, 4: SparseDotted
|
||||||
|
},
|
||||||
|
horzLines: {
|
||||||
|
color: '#e1e1e1', // Set horizontal line color
|
||||||
|
visible: true,
|
||||||
|
style: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
crosshair: { mode: LightweightCharts.CrosshairMode.Normal },
|
||||||
|
}
|
||||||
|
);
|
||||||
|
window.lineSeries = chart.addSeries(LightweightCharts.LineSeries, {
|
||||||
|
color: '#94fcdf',
|
||||||
|
priceScaleId: 'right'
|
||||||
|
// topColor: '#94fcdf',
|
||||||
|
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||||
|
// invertFilledArea: false
|
||||||
|
});
|
||||||
|
window.lineSeries_b = chart.addSeries(LightweightCharts.LineSeries, {
|
||||||
|
color: '#dd7525',
|
||||||
|
priceScaleId: 'right'
|
||||||
|
// topColor: '#94fcdf',
|
||||||
|
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||||
|
// invertFilledArea: false
|
||||||
|
});
|
||||||
|
window.lineSeries_c = chart.addSeries(LightweightCharts.LineSeries, {
|
||||||
|
color: '#ea0707',
|
||||||
|
priceScaleId: 'left',
|
||||||
|
priceRange: {
|
||||||
|
minValue: 0,
|
||||||
|
maxValue: 1
|
||||||
|
},
|
||||||
|
// topColor: '#94fcdf',
|
||||||
|
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||||
|
// invertFilledArea: false
|
||||||
|
});
|
||||||
|
window.lineSeries_tgt = chart.addSeries(LightweightCharts.LineSeries, {
|
||||||
|
color: '#ffffff',
|
||||||
|
priceScaleId: 'right',
|
||||||
|
lineStyle: LightweightCharts.LineStyle.Dashed
|
||||||
|
// topColor: '#94fcdf',
|
||||||
|
// bottomColor: 'rgba(112, 171, 249, 0.28)',
|
||||||
|
// invertFilledArea: false
|
||||||
|
});
|
||||||
|
// window.midPriceLine_Config = {
|
||||||
|
// price: 0,
|
||||||
|
// color: '#c78228',
|
||||||
|
// lineWidth: 3,
|
||||||
|
// lineStyle: LightweightCharts.LineStyle.Dashed,
|
||||||
|
// axisLabelVisible: false,
|
||||||
|
// };
|
||||||
|
// window.midPriceLine = window.lineSeries.createPriceLine(midPriceLine_Config);
|
||||||
|
window.data = [];
|
||||||
|
window.data_b = [];
|
||||||
|
window.data_c = [];
|
||||||
|
window.data_tgt = [];
|
||||||
|
window.lineSeries.setData(window.data);
|
||||||
|
window.lineSeries_b.setData(window.data_b);
|
||||||
|
window.lineSeries_c.setData(window.data_c);
|
||||||
|
window.lineSeries_tgt.setData(window.data_tgt);
|
||||||
|
|
||||||
|
// Create and style the tooltip html element
|
||||||
|
const container = document.getElementById('tv');
|
||||||
|
|
||||||
|
window.toolTipWidth = 200;
|
||||||
|
|
||||||
|
const toolTip = document.createElement('div');
|
||||||
|
toolTip.style = `width: ${window.toolTipWidth}px; height: 100%; position: absolute; display: none; padding: 8px; box-sizing: border-box; font-size: 12px; text-align: left; z-index: 1000; top: 12px; left: 12px; pointer-events: none; border-radius: 4px 4px 0px 0px; border-bottom: none; box-shadow: 0 2px 5px 0 rgba(117, 134, 150, 0.45);font-family: -apple-system, BlinkMacSystemFont, 'Trebuchet MS', Roboto, Ubuntu, sans-serif; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale;`;
|
||||||
|
toolTip.style.background = `rgba(${'0, 0, 0'}, 0.25)`;
|
||||||
|
toolTip.style.color = 'white';
|
||||||
|
toolTip.style.borderColor = 'rgba( 239, 83, 80, 1)';
|
||||||
|
container.appendChild(toolTip);
|
||||||
|
|
||||||
|
// update tooltip
|
||||||
|
window.chart.subscribeCrosshairMove(async param => {
|
||||||
|
|
||||||
|
if (
|
||||||
|
param.point === undefined ||
|
||||||
|
!param.time ||
|
||||||
|
param.point.x < 0 ||
|
||||||
|
param.point.x > container.clientWidth ||
|
||||||
|
param.point.y < 0 ||
|
||||||
|
param.point.y > container.clientHeight
|
||||||
|
) {
|
||||||
|
toolTip.style.display = 'none';
|
||||||
|
} else {
|
||||||
|
|
||||||
|
// toolTip.style.height = '100%';
|
||||||
|
toolTip.style.alignContent = 'center';
|
||||||
|
|
||||||
|
const dateStr = new Date(param.time*1000).toISOString();
|
||||||
|
|
||||||
|
let data = await param.seriesData.get(window.lineSeries);
|
||||||
|
if (data === undefined) {
|
||||||
|
data = {}
|
||||||
|
data.value = 0
|
||||||
|
console.log('data is UNDEFINED, SETTING TO 0')
|
||||||
|
};
|
||||||
|
|
||||||
|
let data_b = await param.seriesData.get(window.lineSeries_b);
|
||||||
|
if (data_b === undefined) {
|
||||||
|
data_b = {}
|
||||||
|
data_b.value = 0
|
||||||
|
console.log('data is UNDEFINED, SETTING TO 0')
|
||||||
|
};
|
||||||
|
|
||||||
|
const value_px = data.value
|
||||||
|
const value_px_b = window.data_b.value
|
||||||
|
const value_px_c = window.data_c.value
|
||||||
|
const value_px_tgt = window.data_tgt.value
|
||||||
|
|
||||||
|
toolTip.style.display = 'block';
|
||||||
|
// <div style="color: ${'rgba( 239, 83, 80, 1)'}">
|
||||||
|
// Atwater Trading
|
||||||
|
// </div>
|
||||||
|
toolTip.innerHTML = `
|
||||||
|
<div style="font-size: 24px; margin: 4px 0px; color: ${'white'}">
|
||||||
|
Chainlink: ${Math.round(100 * value_px) / 100}
|
||||||
|
Binance: ${Math.round(100 * value_px_b) / 100}
|
||||||
|
</div>
|
||||||
|
<div style="color: ${'white'}">
|
||||||
|
${dateStr}
|
||||||
|
</div>
|
||||||
|
`;
|
||||||
|
|
||||||
|
let left = param.point.x; // relative to timeScale
|
||||||
|
const timeScaleWidth = chart.timeScale().width();
|
||||||
|
const priceScaleWidth = chart.priceScale('left').width();
|
||||||
|
const halfTooltipWidth = toolTipWidth / 2;
|
||||||
|
left += priceScaleWidth - halfTooltipWidth;
|
||||||
|
left = Math.min(left, priceScaleWidth + timeScaleWidth - toolTipWidth);
|
||||||
|
left = Math.max(left, priceScaleWidth);
|
||||||
|
|
||||||
|
toolTip.style.left = left + 'px';
|
||||||
|
toolTip.style.top = 0 + 'px';
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
window.chart.timeScale().fitContent();
|
||||||
|
|
||||||
|
console.log("TV Created!")
|
||||||
|
};
|
||||||
33
nicegui_modules/static/styles.css
Normal file
33
nicegui_modules/static/styles.css
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
/* Sticky Quasar Table for Dark Mode */
|
||||||
|
.table-sticky-dark .q-table__top,
|
||||||
|
.table-sticky-dark .q-table__bottom,
|
||||||
|
.table-sticky-dark thead tr:first-child th {
|
||||||
|
background-color: black;
|
||||||
|
}
|
||||||
|
.table-sticky-dark thead tr th {
|
||||||
|
position: sticky;
|
||||||
|
z-index: 1;
|
||||||
|
}
|
||||||
|
.table-sticky-dark thead tr:first-child th {
|
||||||
|
top: 0;
|
||||||
|
}
|
||||||
|
.table-sticky-dark tbody {
|
||||||
|
scroll-margin-top: 48px;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Sticky Quasar Table for Light Mode */
|
||||||
|
/* .table-sticky-light .q-table__top,
|
||||||
|
.table-sticky-light .q-table__bottom,
|
||||||
|
.table-sticky-light thead tr:first-child th {
|
||||||
|
background-color: rgb(229, 223, 223);
|
||||||
|
}
|
||||||
|
.table-sticky-light thead tr th {
|
||||||
|
position: sticky;
|
||||||
|
z-index: 1;
|
||||||
|
}
|
||||||
|
.table-sticky-light thead tr:first-child th {
|
||||||
|
top: 0;
|
||||||
|
}
|
||||||
|
.table-sticky-light tbody {
|
||||||
|
scroll-margin-top: 48px;
|
||||||
|
} */
|
||||||
File diff suppressed because one or more lines are too long
150
ws.py
150
ws.py
@@ -1,150 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import math
|
|
||||||
import pandas as pd
|
|
||||||
import os
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
import websockets
|
|
||||||
import numpy as np
|
|
||||||
import talib
|
|
||||||
import requests
|
|
||||||
|
|
||||||
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
|
|
||||||
SLUG_END_TIME = 0
|
|
||||||
|
|
||||||
HIST_TRADES = np.empty((0, 2))
|
|
||||||
|
|
||||||
def format_timestamp(total_seconds) -> str:
|
|
||||||
minutes, seconds = divmod(total_seconds, 60)
|
|
||||||
return f"{minutes} minutes and {seconds} seconds"
|
|
||||||
|
|
||||||
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
|
|
||||||
interval_secs = interval_mins * 60
|
|
||||||
seconds = dt.timestamp()
|
|
||||||
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
|
|
||||||
|
|
||||||
return rounded_seconds
|
|
||||||
|
|
||||||
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
|
|
||||||
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
|
|
||||||
market = r.json()['markets'][0]
|
|
||||||
token_ids = json.loads(market.get("clobTokenIds", "[]"))
|
|
||||||
outcomes = json.loads(market.get("outcomes", "[]"))
|
|
||||||
d = dict(zip(outcomes, token_ids))
|
|
||||||
d['isActive'] = market['negRisk']
|
|
||||||
d['MinTickSize'] = market['orderPriceMinTickSize']
|
|
||||||
d['isNegRisk'] = market['negRisk']
|
|
||||||
d['ConditionId'] = market['conditionId']
|
|
||||||
d['EndDateTime'] = market['endDate']
|
|
||||||
|
|
||||||
return d, market
|
|
||||||
|
|
||||||
def gen_slug():
|
|
||||||
slug_prefix = 'btc-updown-5m-'
|
|
||||||
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
|
|
||||||
return slug_prefix + str(slug_time_id)
|
|
||||||
|
|
||||||
|
|
||||||
async def polymarket_stream():
|
|
||||||
global SLUG_END_TIME
|
|
||||||
global HIST_TRADES
|
|
||||||
|
|
||||||
slug_full = gen_slug()
|
|
||||||
market_details, market = get_mkt_details_by_slug(slug_full)
|
|
||||||
TARGET_ASSET_ID = market_details['Up']
|
|
||||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
|
||||||
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
|
|
||||||
|
|
||||||
async with websockets.connect(WSS_URL) as websocket:
|
|
||||||
print(f"Connected to {WSS_URL}")
|
|
||||||
|
|
||||||
subscribe_msg = {
|
|
||||||
"assets_ids": [TARGET_ASSET_ID],
|
|
||||||
"type": "market",
|
|
||||||
"custom_feature_enabled": True
|
|
||||||
}
|
|
||||||
|
|
||||||
await websocket.send(json.dumps(subscribe_msg))
|
|
||||||
print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
async for message in websocket:
|
|
||||||
current_ts = round(datetime.now().timestamp())
|
|
||||||
sec_remaining = SLUG_END_TIME - current_ts
|
|
||||||
|
|
||||||
if sec_remaining <= 0:
|
|
||||||
HIST_TRADES = np.empty((0, 2))
|
|
||||||
|
|
||||||
print('*** Attempting to unsub from past 5min')
|
|
||||||
update_unsub_msg = {
|
|
||||||
"operation": 'unsubscribe',
|
|
||||||
"assets_ids": [TARGET_ASSET_ID],
|
|
||||||
"custom_feature_enabled": True
|
|
||||||
}
|
|
||||||
await websocket.send(json.dumps(update_unsub_msg))
|
|
||||||
|
|
||||||
print('*** Attempting to SUB to new 5min')
|
|
||||||
slug_full = gen_slug()
|
|
||||||
market_details, market = get_mkt_details_by_slug(slug_full)
|
|
||||||
TARGET_ASSET_ID = market_details['Up']
|
|
||||||
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
|
||||||
|
|
||||||
update_sub_msg = {
|
|
||||||
"operation": 'subscribe',
|
|
||||||
"assets_ids": [TARGET_ASSET_ID],
|
|
||||||
"custom_feature_enabled": True
|
|
||||||
}
|
|
||||||
await websocket.send(json.dumps(update_sub_msg))
|
|
||||||
|
|
||||||
|
|
||||||
if isinstance(message, str):
|
|
||||||
data = json.loads(message)
|
|
||||||
|
|
||||||
if isinstance(data, dict):
|
|
||||||
# print(data.get("event_type", None))
|
|
||||||
pass
|
|
||||||
elif isinstance(data, list):
|
|
||||||
print('initial book: ')
|
|
||||||
print(data)
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
|
||||||
|
|
||||||
event_type = data.get("event_type", None)
|
|
||||||
|
|
||||||
if event_type == "price_change":
|
|
||||||
# print("📈 Price Change")
|
|
||||||
# print(pd.DataFrame(data['price_changes']))
|
|
||||||
pass
|
|
||||||
elif event_type == "best_bid_ask":
|
|
||||||
# print(pd.DataFrame([data]))
|
|
||||||
pass
|
|
||||||
elif event_type == "last_trade_price":
|
|
||||||
px = float(data['price'])
|
|
||||||
qty = float(data['size'])
|
|
||||||
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
|
|
||||||
SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
|
|
||||||
print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
|
||||||
elif event_type == "book":
|
|
||||||
pass
|
|
||||||
elif event_type == "new_market":
|
|
||||||
print('Received new_market')
|
|
||||||
elif event_type == "market_resolved":
|
|
||||||
print(f"Received: {event_type}")
|
|
||||||
print(data)
|
|
||||||
elif event_type == "tick_size_change": # may want for CLOB order routing
|
|
||||||
print(f"Received: {event_type}")
|
|
||||||
print(data)
|
|
||||||
else:
|
|
||||||
print(f"Received: {event_type}")
|
|
||||||
print(data)
|
|
||||||
|
|
||||||
except websockets.ConnectionClosed:
|
|
||||||
print("Connection closed by server.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
try:
|
|
||||||
asyncio.run(polymarket_stream())
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("Stream stopped.")
|
|
||||||
199
ws_binance.py
Normal file
199
ws_binance.py
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import traceback
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import AsyncContextManager
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||||
|
from sqlalchemy import text
|
||||||
|
import websockets
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
import valkey
|
||||||
|
|
||||||
|
### Allow only ipv4 ###
|
||||||
|
def allowed_gai_family():
|
||||||
|
return socket.AF_INET
|
||||||
|
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||||
|
|
||||||
|
### Database ###
|
||||||
|
USE_DB: bool = True
|
||||||
|
USE_VK: bool = True
|
||||||
|
VK_CHANNEL = 'poly_binance_btcusd'
|
||||||
|
CON: AsyncContextManager | None = None
|
||||||
|
VAL_KEY = None
|
||||||
|
|
||||||
|
|
||||||
|
### Logging ###
|
||||||
|
LOG_FILEPATH: str = '/root/logs/Polymarket_Binance_Trades.log'
|
||||||
|
|
||||||
|
### Globals ###
|
||||||
|
WSS_URL = "wss://stream.binance.com:9443/ws/BTCUSDT@trade"
|
||||||
|
# HIST_TRADES = np.empty((0, 2))
|
||||||
|
|
||||||
|
### Database Funcs ###
|
||||||
|
async def create_rtds_btcusd_table(
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
|
||||||
|
await CON.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
|
||||||
|
timestamp_msg BIGINT,
|
||||||
|
timestamp_value BIGINT,
|
||||||
|
value DOUBLE,
|
||||||
|
qty DOUBLE
|
||||||
|
);
|
||||||
|
"""))
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
async def insert_rtds_btcusd_table(
|
||||||
|
timestamp_msg: int,
|
||||||
|
timestamp_value: int,
|
||||||
|
value: float,
|
||||||
|
qty: float,
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
params={
|
||||||
|
'timestamp_msg': timestamp_msg,
|
||||||
|
'timestamp_value': timestamp_value,
|
||||||
|
'value': value,
|
||||||
|
'qty': qty,
|
||||||
|
}
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
await CON.execute(text("""
|
||||||
|
INSERT INTO binance_btcusd_trades
|
||||||
|
(
|
||||||
|
timestamp_msg,
|
||||||
|
timestamp_value,
|
||||||
|
value,
|
||||||
|
qty
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
|
(
|
||||||
|
:timestamp_msg,
|
||||||
|
:timestamp_value,
|
||||||
|
:value,
|
||||||
|
:qty
|
||||||
|
)
|
||||||
|
"""),
|
||||||
|
parameters=params
|
||||||
|
)
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
|
||||||
|
### Websocket ###
|
||||||
|
async def binance_trades_stream():
|
||||||
|
global HIST_TRADES
|
||||||
|
|
||||||
|
async for websocket in websockets.connect(WSS_URL):
|
||||||
|
logging.info(f"Connected to {WSS_URL}")
|
||||||
|
|
||||||
|
subscribe_msg = {
|
||||||
|
"method": "SUBSCRIBE",
|
||||||
|
"params": ["btcusdt@trade"],
|
||||||
|
"id": 1
|
||||||
|
}
|
||||||
|
|
||||||
|
await websocket.send(json.dumps(subscribe_msg))
|
||||||
|
|
||||||
|
try:
|
||||||
|
async for message in websocket:
|
||||||
|
if isinstance(message, str):
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
if data.get('t', None) is not None:
|
||||||
|
last_px = float(data['p'])
|
||||||
|
print(f'🤑 BTC Binance Last Px: {last_px:_.4f}; TS: {pd.to_datetime(data['T'], unit='ms')}')
|
||||||
|
VAL_KEY.publish(VK_CHANNEL, json.dumps({
|
||||||
|
'timestamp_msg': data['E'],
|
||||||
|
'timestamp_value': data['T'],
|
||||||
|
'value': last_px,
|
||||||
|
'qty': data['q'],
|
||||||
|
}))
|
||||||
|
VAL_KEY.set(VK_CHANNEL, json.dumps({
|
||||||
|
'timestamp_msg': data['E'],
|
||||||
|
'timestamp_value': data['T'],
|
||||||
|
'value': last_px,
|
||||||
|
'qty': data['q'],
|
||||||
|
}))
|
||||||
|
await insert_rtds_btcusd_table(
|
||||||
|
CON=CON,
|
||||||
|
timestamp_msg=data['E'],
|
||||||
|
timestamp_value=data['T'],
|
||||||
|
value=last_px,
|
||||||
|
qty=data['q'],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||||
|
continue
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
logging.warning(f'Message not in JSON format, skipping: {message}')
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||||
|
except websockets.ConnectionClosed as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
global VAL_KEY
|
||||||
|
global CON
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||||
|
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||||
|
else:
|
||||||
|
VAL_KEY = None
|
||||||
|
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||||
|
|
||||||
|
if USE_DB:
|
||||||
|
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||||
|
async with engine.connect() as CON:
|
||||||
|
await create_rtds_btcusd_table(CON=CON)
|
||||||
|
await binance_trades_stream()
|
||||||
|
else:
|
||||||
|
CON = None
|
||||||
|
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||||
|
await binance_trades_stream()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
START_TIME = round(datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
force=True,
|
||||||
|
filename=LOG_FILEPATH,
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
filemode='w'
|
||||||
|
)
|
||||||
|
logging.info(f"STARTED: {START_TIME}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("Stream stopped")
|
||||||
312
ws_clob.py
Normal file
312
ws_clob.py
Normal file
@@ -0,0 +1,312 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import logging
|
||||||
|
import pandas as pd
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
import websockets
|
||||||
|
import numpy as np
|
||||||
|
import talib
|
||||||
|
import requests
|
||||||
|
from typing import AsyncContextManager
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
from sqlalchemy import text
|
||||||
|
import valkey
|
||||||
|
|
||||||
|
### Database ###
|
||||||
|
USE_DB: bool = True
|
||||||
|
USE_VK: bool = True
|
||||||
|
VK_CHANNEL = 'poly_5min_btcusd'
|
||||||
|
CON: AsyncContextManager | None = None
|
||||||
|
VAL_KEY = None
|
||||||
|
|
||||||
|
### Logging ###
|
||||||
|
LOG_FILEPATH: str = '/root/logs/Polymarket_5min.log'
|
||||||
|
|
||||||
|
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
|
||||||
|
SLUG_END_TIME = 0
|
||||||
|
|
||||||
|
HIST_TRADES = np.empty((0, 2))
|
||||||
|
TARGET_PX = 0
|
||||||
|
|
||||||
|
def format_timestamp(total_seconds) -> str:
|
||||||
|
minutes, seconds = divmod(total_seconds, 60)
|
||||||
|
|
||||||
|
return f"{minutes} minutes and {seconds} seconds"
|
||||||
|
|
||||||
|
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
|
||||||
|
interval_secs = interval_mins * 60
|
||||||
|
seconds = dt.timestamp()
|
||||||
|
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
|
||||||
|
|
||||||
|
return rounded_seconds
|
||||||
|
|
||||||
|
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
|
||||||
|
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
|
||||||
|
market = r.json()['markets'][0]
|
||||||
|
token_ids = json.loads(market.get("clobTokenIds", "[]"))
|
||||||
|
outcomes = json.loads(market.get("outcomes", "[]"))
|
||||||
|
d = dict(zip(outcomes, token_ids))
|
||||||
|
d['isActive'] = market['negRisk']
|
||||||
|
d['MinTickSize'] = market['orderPriceMinTickSize']
|
||||||
|
d['OrderMinSize'] = market['orderMinSize']
|
||||||
|
d['isNegRisk'] = market['negRisk']
|
||||||
|
d['ConditionId'] = market['conditionId']
|
||||||
|
d['EndDateTime'] = market['endDate']
|
||||||
|
# d['Liquidity'] = market['liquidity']
|
||||||
|
d['LiquidityClob'] = market['liquidityClob']
|
||||||
|
d['VolumeNum'] = market['volumeNum']
|
||||||
|
d['Volume24hr'] = market['volume24hr']
|
||||||
|
print(market)
|
||||||
|
|
||||||
|
return d, market
|
||||||
|
|
||||||
|
def gen_slug():
|
||||||
|
slug_prefix = 'btc-updown-5m-'
|
||||||
|
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
return slug_prefix + str(slug_time_id)
|
||||||
|
|
||||||
|
|
||||||
|
### Database Funcs ###
|
||||||
|
async def create_poly_btcusd_trades_table(
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
logging.info('Creating Table if Does Not Exist: poly_btcusd_trades')
|
||||||
|
await CON.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS poly_btcusd_trades (
|
||||||
|
timestamp_arrival BIGINT,
|
||||||
|
timestamp_msg BIGINT,
|
||||||
|
timestamp_value BIGINT,
|
||||||
|
price DOUBLE,
|
||||||
|
qty DOUBLE,
|
||||||
|
side_taker VARCHAR(8)
|
||||||
|
);
|
||||||
|
"""))
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
async def insert_poly_btcusd_trades_table(
|
||||||
|
timestamp_arrival: int,
|
||||||
|
timestamp_msg: int,
|
||||||
|
timestamp_value: int,
|
||||||
|
price: float,
|
||||||
|
qty: float,
|
||||||
|
side_taker: str,
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
params={
|
||||||
|
'timestamp_arrival': timestamp_arrival,
|
||||||
|
'timestamp_msg': timestamp_msg,
|
||||||
|
'timestamp_value': timestamp_value,
|
||||||
|
'price': price,
|
||||||
|
'qty': qty,
|
||||||
|
'side_taker': side_taker,
|
||||||
|
}
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
await CON.execute(text("""
|
||||||
|
INSERT INTO poly_btcusd_trades
|
||||||
|
(
|
||||||
|
timestamp_arrival,
|
||||||
|
timestamp_msg,
|
||||||
|
timestamp_value,
|
||||||
|
price,
|
||||||
|
qty,
|
||||||
|
side_taker
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
|
(
|
||||||
|
:timestamp_arrival,
|
||||||
|
:timestamp_msg,
|
||||||
|
:timestamp_value,
|
||||||
|
:price,
|
||||||
|
:qty,
|
||||||
|
:side_taker
|
||||||
|
)
|
||||||
|
"""),
|
||||||
|
parameters=params
|
||||||
|
)
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
|
||||||
|
async def polymarket_stream():
|
||||||
|
global SLUG_END_TIME
|
||||||
|
global TARGET_PX
|
||||||
|
global HIST_TRADES
|
||||||
|
|
||||||
|
slug_full = gen_slug()
|
||||||
|
market_details, market = get_mkt_details_by_slug(slug_full)
|
||||||
|
TARGET_ASSET_ID = market_details['Up']
|
||||||
|
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||||
|
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
|
||||||
|
|
||||||
|
async with websockets.connect(WSS_URL) as websocket:
|
||||||
|
print(f"Connected to {WSS_URL}")
|
||||||
|
|
||||||
|
subscribe_msg = {
|
||||||
|
"assets_ids": [TARGET_ASSET_ID],
|
||||||
|
"type": "market",
|
||||||
|
"custom_feature_enabled": True
|
||||||
|
}
|
||||||
|
|
||||||
|
await websocket.send(json.dumps(subscribe_msg))
|
||||||
|
print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
async for message in websocket:
|
||||||
|
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||||
|
sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())
|
||||||
|
|
||||||
|
if sec_remaining <= 0:
|
||||||
|
ref_data = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
|
||||||
|
TARGET_PX = float(ref_data['value'])
|
||||||
|
HIST_TRADES = np.empty((0, 2))
|
||||||
|
|
||||||
|
print('*** Attempting to unsub from past 5min')
|
||||||
|
update_unsub_msg = {
|
||||||
|
"operation": 'unsubscribe',
|
||||||
|
"assets_ids": [TARGET_ASSET_ID],
|
||||||
|
"custom_feature_enabled": True
|
||||||
|
}
|
||||||
|
await websocket.send(json.dumps(update_unsub_msg))
|
||||||
|
|
||||||
|
print('*** Attempting to SUB to new 5min')
|
||||||
|
slug_full = gen_slug()
|
||||||
|
market_details, market = get_mkt_details_by_slug(slug_full)
|
||||||
|
TARGET_ASSET_ID = market_details['Up']
|
||||||
|
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
|
||||||
|
|
||||||
|
update_sub_msg = {
|
||||||
|
"operation": 'subscribe',
|
||||||
|
"assets_ids": [TARGET_ASSET_ID],
|
||||||
|
"custom_feature_enabled": True
|
||||||
|
}
|
||||||
|
await websocket.send(json.dumps(update_sub_msg))
|
||||||
|
|
||||||
|
if isinstance(message, str):
|
||||||
|
data = json.loads(message)
|
||||||
|
if isinstance(data, list):
|
||||||
|
print('initial book:')
|
||||||
|
print(data)
|
||||||
|
continue
|
||||||
|
|
||||||
|
event_type = data.get("event_type", None)
|
||||||
|
|
||||||
|
if event_type == "price_change":
|
||||||
|
# print("📈 Price Change")
|
||||||
|
# print(pd.DataFrame(data['price_changes']))
|
||||||
|
pass
|
||||||
|
elif event_type == "best_bid_ask":
|
||||||
|
# print(pd.DataFrame([data]))
|
||||||
|
pass
|
||||||
|
elif event_type == "last_trade_price":
|
||||||
|
ts_msg = int(data['timestamp'])
|
||||||
|
ts_value = int(ts_msg)
|
||||||
|
px = float(data['price'])
|
||||||
|
qty = float(data['size'])
|
||||||
|
side_taker = data['side']
|
||||||
|
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
|
||||||
|
# SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
|
||||||
|
# print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
||||||
|
print(f"✨ Last Px: {px:.2f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY_OBJ = json.dumps({
|
||||||
|
'timestamp_arrival': ts_arrival,
|
||||||
|
'timestamp_msg': ts_msg,
|
||||||
|
'timestamp_value': ts_value,
|
||||||
|
'price': px,
|
||||||
|
'qty': qty,
|
||||||
|
'side_taker': side_taker,
|
||||||
|
'sec_remaining': sec_remaining,
|
||||||
|
'target_price': TARGET_PX
|
||||||
|
})
|
||||||
|
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
if USE_DB:
|
||||||
|
await insert_poly_btcusd_trades_table(
|
||||||
|
CON=CON,
|
||||||
|
timestamp_arrival=ts_arrival,
|
||||||
|
timestamp_msg=ts_msg,
|
||||||
|
timestamp_value=ts_value,
|
||||||
|
price=px,
|
||||||
|
qty=qty,
|
||||||
|
side_taker=side_taker,
|
||||||
|
)
|
||||||
|
|
||||||
|
elif event_type == "book":
|
||||||
|
pass
|
||||||
|
elif event_type == "new_market":
|
||||||
|
print('Received new_market')
|
||||||
|
elif event_type == "market_resolved":
|
||||||
|
print(f"Received: {event_type}")
|
||||||
|
print(data)
|
||||||
|
elif event_type == "tick_size_change": # may want for CLOB order routing
|
||||||
|
print(f"Received: {event_type}")
|
||||||
|
print(data)
|
||||||
|
else:
|
||||||
|
print(f"*********** REC UNMAPPED EVENT: {event_type}")
|
||||||
|
print(data)
|
||||||
|
elif isinstance(data, dict):
|
||||||
|
# print(data.get("event_type", None))
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||||
|
|
||||||
|
except websockets.ConnectionClosed as e:
|
||||||
|
print(f"Connection closed by server. Exception: {e}")
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
global VAL_KEY
|
||||||
|
global CON
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||||
|
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||||
|
else:
|
||||||
|
VAL_KEY = None
|
||||||
|
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||||
|
|
||||||
|
if USE_DB:
|
||||||
|
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||||
|
async with engine.connect() as CON:
|
||||||
|
await create_poly_btcusd_trades_table(CON=CON)
|
||||||
|
await polymarket_stream()
|
||||||
|
else:
|
||||||
|
CON = None
|
||||||
|
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||||
|
await polymarket_stream()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
START_TIME = round(datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
force=True,
|
||||||
|
filename=LOG_FILEPATH,
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
filemode='w'
|
||||||
|
)
|
||||||
|
logging.info(f"STARTED: {START_TIME}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt as e:
|
||||||
|
print(f"Stream stopped: {e}")
|
||||||
224
ws_coinbase.py
Normal file
224
ws_coinbase.py
Normal file
@@ -0,0 +1,224 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import traceback
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import AsyncContextManager
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||||
|
from sqlalchemy import text
|
||||||
|
import websockets
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
import valkey
|
||||||
|
|
||||||
|
### Allow only ipv4 ###
|
||||||
|
def allowed_gai_family():
|
||||||
|
return socket.AF_INET
|
||||||
|
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||||
|
|
||||||
|
### Database ###
|
||||||
|
USE_DB: bool = True
|
||||||
|
USE_VK: bool = True
|
||||||
|
VK_CHANNEL = 'poly_coinbase_btcusd'
|
||||||
|
CON: AsyncContextManager | None = None
|
||||||
|
VAL_KEY = None
|
||||||
|
|
||||||
|
### Logging ###
|
||||||
|
LOG_FILEPATH: str = '/root/logs/Polymarket_coinbase_Trades.log'
|
||||||
|
|
||||||
|
### Globals ###
|
||||||
|
WSS_URL = "wss://ws-feed.exchange.coinbase.com"
|
||||||
|
# HIST_TRADES = np.empty((0, 2))
|
||||||
|
|
||||||
|
### Database Funcs ###
|
||||||
|
async def create_rtds_btcusd_table(
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
logging.info('Creating Table if Does Not Exist: coinbase_btcusd_trades')
|
||||||
|
await CON.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS coinbase_btcusd_trades (
|
||||||
|
timestamp_arrival BIGINT,
|
||||||
|
timestamp_msg BIGINT,
|
||||||
|
timestamp_value BIGINT,
|
||||||
|
value DOUBLE,
|
||||||
|
qty DOUBLE,
|
||||||
|
side VARCHAR(8)
|
||||||
|
);
|
||||||
|
"""))
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
async def insert_rtds_btcusd_table(
|
||||||
|
timestamp_arrival: int,
|
||||||
|
timestamp_msg: int,
|
||||||
|
timestamp_value: int,
|
||||||
|
value: float,
|
||||||
|
qty: float,
|
||||||
|
side: str,
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
params={
|
||||||
|
'timestamp_arrival': timestamp_arrival,
|
||||||
|
'timestamp_msg': timestamp_msg,
|
||||||
|
'timestamp_value': timestamp_value,
|
||||||
|
'value': value,
|
||||||
|
'qty': qty,
|
||||||
|
'side': side,
|
||||||
|
}
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
await CON.execute(text("""
|
||||||
|
INSERT INTO coinbase_btcusd_trades
|
||||||
|
(
|
||||||
|
timestamp_arrival,
|
||||||
|
timestamp_msg,
|
||||||
|
timestamp_value,
|
||||||
|
value,
|
||||||
|
qty,
|
||||||
|
side
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
|
(
|
||||||
|
:timestamp_arrival,
|
||||||
|
:timestamp_msg,
|
||||||
|
:timestamp_value,
|
||||||
|
:value,
|
||||||
|
:qty,
|
||||||
|
:side
|
||||||
|
)
|
||||||
|
"""),
|
||||||
|
parameters=params
|
||||||
|
)
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
|
||||||
|
### Websocket ###
|
||||||
|
async def coinbase_trades_stream():
|
||||||
|
global HIST_TRADES
|
||||||
|
|
||||||
|
async with websockets.connect(WSS_URL) as websocket:
|
||||||
|
logging.info(f"Connected to {WSS_URL}")
|
||||||
|
|
||||||
|
subscribe_msg = {
|
||||||
|
"type": "subscribe",
|
||||||
|
"product_ids": ["BTC-USD"],
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"name": "ticker",
|
||||||
|
"product_ids": ["BTC-USD"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
await websocket.send(json.dumps(subscribe_msg))
|
||||||
|
|
||||||
|
try:
|
||||||
|
async for message in websocket:
|
||||||
|
if isinstance(message, str) or isinstance(message, bytes):
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
if data.get('price', None) is not None:
|
||||||
|
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||||
|
ts_msg = round(datetime.strptime(data['time'], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()*1000)
|
||||||
|
ts_value = ts_msg
|
||||||
|
last_px = float(data['price'])
|
||||||
|
qty = float(data['last_size'])
|
||||||
|
side = data['side']
|
||||||
|
print(f'🤑 BTC Coinbase Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY_OBJ = json.dumps({
|
||||||
|
'timestamp_arrival': ts_arrival,
|
||||||
|
'timestamp_msg': ts_msg,
|
||||||
|
'timestamp_value': ts_value,
|
||||||
|
'value': last_px,
|
||||||
|
'qty': qty,
|
||||||
|
'side': side,
|
||||||
|
})
|
||||||
|
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
if USE_DB:
|
||||||
|
await insert_rtds_btcusd_table(
|
||||||
|
CON=CON,
|
||||||
|
timestamp_arrival=ts_arrival,
|
||||||
|
timestamp_msg=ts_msg,
|
||||||
|
timestamp_value=ts_value,
|
||||||
|
value=last_px,
|
||||||
|
qty=qty,
|
||||||
|
side=side,
|
||||||
|
)
|
||||||
|
# elif data.get('op'):
|
||||||
|
# if data['op'] == 'PING':
|
||||||
|
# pong = {"op": "PONG", "timestamp": ts_arrival}
|
||||||
|
# await websocket.send(json.dumps(pong))
|
||||||
|
# logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
|
||||||
|
else:
|
||||||
|
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||||
|
continue
|
||||||
|
except (json.JSONDecodeError, ValueError) as e:
|
||||||
|
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Type: {type(message)} not expected: {message}')
|
||||||
|
except websockets.ConnectionClosed as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
global VAL_KEY
|
||||||
|
global CON
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||||
|
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||||
|
else:
|
||||||
|
VAL_KEY = None
|
||||||
|
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||||
|
|
||||||
|
if USE_DB:
|
||||||
|
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||||
|
async with engine.connect() as CON:
|
||||||
|
await create_rtds_btcusd_table(CON=CON)
|
||||||
|
await coinbase_trades_stream()
|
||||||
|
else:
|
||||||
|
CON = None
|
||||||
|
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||||
|
await coinbase_trades_stream()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
START_TIME = round(datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
force=True,
|
||||||
|
filename=LOG_FILEPATH,
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
filemode='w'
|
||||||
|
)
|
||||||
|
logging.info(f"STARTED: {START_TIME}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("Stream stopped")
|
||||||
219
ws_pionex.py
Normal file
219
ws_pionex.py
Normal file
@@ -0,0 +1,219 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import traceback
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import AsyncContextManager
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||||
|
from sqlalchemy import text
|
||||||
|
import websockets
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
import valkey
|
||||||
|
|
||||||
|
### Allow only ipv4 ###
|
||||||
|
def allowed_gai_family():
|
||||||
|
return socket.AF_INET
|
||||||
|
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||||
|
|
||||||
|
### Database ###
|
||||||
|
USE_DB: bool = True
|
||||||
|
USE_VK: bool = True
|
||||||
|
VK_CHANNEL = 'poly_pionex_btcusd'
|
||||||
|
CON: AsyncContextManager | None = None
|
||||||
|
VAL_KEY = None
|
||||||
|
|
||||||
|
|
||||||
|
### Logging ###
|
||||||
|
LOG_FILEPATH: str = '/root/logs/Polymarket_Pionex_Trades.log'
|
||||||
|
|
||||||
|
### Globals ###
|
||||||
|
WSS_URL = "wss://ws.pionex.com/wsPub"
|
||||||
|
# HIST_TRADES = np.empty((0, 2))
|
||||||
|
|
||||||
|
### Database Funcs ###
|
||||||
|
async def create_rtds_btcusd_table(
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
logging.info('Creating Table if Does Not Exist: pionex_btcusd_trades')
|
||||||
|
await CON.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS pionex_btcusd_trades (
|
||||||
|
timestamp_msg BIGINT,
|
||||||
|
timestamp_value BIGINT,
|
||||||
|
value DOUBLE,
|
||||||
|
qty DOUBLE,
|
||||||
|
side VARCHAR(8)
|
||||||
|
);
|
||||||
|
"""))
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
async def insert_rtds_btcusd_table(
|
||||||
|
timestamp_msg: int,
|
||||||
|
timestamp_value: int,
|
||||||
|
value: float,
|
||||||
|
qty: float,
|
||||||
|
side: str,
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
params={
|
||||||
|
'timestamp_msg': timestamp_msg,
|
||||||
|
'timestamp_value': timestamp_value,
|
||||||
|
'value': value,
|
||||||
|
'qty': qty,
|
||||||
|
'side': side,
|
||||||
|
}
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
await CON.execute(text("""
|
||||||
|
INSERT INTO pionex_btcusd_trades
|
||||||
|
(
|
||||||
|
timestamp_msg,
|
||||||
|
timestamp_value,
|
||||||
|
value,
|
||||||
|
qty,
|
||||||
|
side
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
|
(
|
||||||
|
:timestamp_msg,
|
||||||
|
:timestamp_value,
|
||||||
|
:value,
|
||||||
|
:qty,
|
||||||
|
:side
|
||||||
|
)
|
||||||
|
"""),
|
||||||
|
parameters=params
|
||||||
|
)
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
|
||||||
|
### Websocket ###
|
||||||
|
async def pionex_trades_stream():
|
||||||
|
global HIST_TRADES
|
||||||
|
|
||||||
|
async with websockets.connect(WSS_URL) as websocket:
|
||||||
|
logging.info(f"Connected to {WSS_URL}")
|
||||||
|
|
||||||
|
subscribe_msg = {
|
||||||
|
"op": "SUBSCRIBE",
|
||||||
|
"topic": "TRADE",
|
||||||
|
"symbol": "BTC_USDT"
|
||||||
|
}
|
||||||
|
|
||||||
|
await websocket.send(json.dumps(subscribe_msg))
|
||||||
|
|
||||||
|
try:
|
||||||
|
async for message in websocket:
|
||||||
|
if isinstance(message, str) or isinstance(message, bytes):
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
if data.get('data', None) is not None:
|
||||||
|
ts_msg = data['timestamp']
|
||||||
|
data = data['data']
|
||||||
|
ts_value = data[0]['timestamp']
|
||||||
|
last_px = float(data[0]['price'])
|
||||||
|
qty = float(data[0]['size'])
|
||||||
|
side = data[0]['side']
|
||||||
|
print(f'🤑 BTC Pionex Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
|
||||||
|
print(ts_value)
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY.publish(VK_CHANNEL, json.dumps({
|
||||||
|
'timestamp_msg': ts_msg,
|
||||||
|
'timestamp_value': ts_value,
|
||||||
|
'value': last_px,
|
||||||
|
'qty': qty,
|
||||||
|
'side': side,
|
||||||
|
}))
|
||||||
|
VAL_KEY.set(VK_CHANNEL, json.dumps({
|
||||||
|
'timestamp_msg': ts_msg,
|
||||||
|
'timestamp_value': ts_value,
|
||||||
|
'value': last_px,
|
||||||
|
'qty': qty,
|
||||||
|
'side': side,
|
||||||
|
}))
|
||||||
|
if USE_DB:
|
||||||
|
await insert_rtds_btcusd_table(
|
||||||
|
CON=CON,
|
||||||
|
timestamp_msg=ts_msg,
|
||||||
|
timestamp_value=ts_value,
|
||||||
|
value=last_px,
|
||||||
|
qty=qty,
|
||||||
|
side=side,
|
||||||
|
)
|
||||||
|
elif data.get('op'):
|
||||||
|
if data['op'] == 'PING':
|
||||||
|
pong = {"op": "PONG", "timestamp": round(datetime.now().timestamp()*1000)}
|
||||||
|
await websocket.send(json.dumps(pong))
|
||||||
|
logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
|
||||||
|
else:
|
||||||
|
logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||||
|
continue
|
||||||
|
except (json.JSONDecodeError, ValueError) as e:
|
||||||
|
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Type: {type(message)} not expected: {message}')
|
||||||
|
except websockets.ConnectionClosed as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
global VAL_KEY
|
||||||
|
global CON
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||||
|
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||||
|
else:
|
||||||
|
VAL_KEY = None
|
||||||
|
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||||
|
|
||||||
|
if USE_DB:
|
||||||
|
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||||
|
async with engine.connect() as CON:
|
||||||
|
await create_rtds_btcusd_table(CON=CON)
|
||||||
|
await pionex_trades_stream()
|
||||||
|
else:
|
||||||
|
CON = None
|
||||||
|
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||||
|
await pionex_trades_stream()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
START_TIME = round(datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
force=True,
|
||||||
|
filename=LOG_FILEPATH,
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
filemode='w'
|
||||||
|
)
|
||||||
|
logging.info(f"STARTED: {START_TIME}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("Stream stopped")
|
||||||
176
ws_rtds.py
176
ws_rtds.py
@@ -1,24 +1,108 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import math
|
import logging
|
||||||
import pandas as pd
|
import socket
|
||||||
import os
|
import traceback
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime
|
||||||
import websockets
|
from typing import AsyncContextManager
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import talib
|
import pandas as pd
|
||||||
import requests
|
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
|
||||||
|
from sqlalchemy import text
|
||||||
|
import websockets
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
import valkey
|
||||||
|
|
||||||
|
### Allow only ipv4 ###
|
||||||
|
def allowed_gai_family():
|
||||||
|
return socket.AF_INET
|
||||||
|
urllib3_cn.allowed_gai_family = allowed_gai_family
|
||||||
|
|
||||||
|
### Database ###
|
||||||
|
USE_DB: bool = True
|
||||||
|
USE_VK: bool = True
|
||||||
|
VK_CHANNEL = 'poly_rtds_cl_btcusd'
|
||||||
|
CON: AsyncContextManager | None = None
|
||||||
|
VAL_KEY = None
|
||||||
|
|
||||||
|
|
||||||
|
### Logging ###
|
||||||
|
LOG_FILEPATH: str = '/root/logs/Polymarket_RTDS.log'
|
||||||
|
|
||||||
|
### Globals ###
|
||||||
WSS_URL = "wss://ws-live-data.polymarket.com"
|
WSS_URL = "wss://ws-live-data.polymarket.com"
|
||||||
|
# HIST_TRADES = np.empty((0, 2))
|
||||||
|
|
||||||
HIST_TRADES = np.empty((0, 2))
|
### Database Funcs ###
|
||||||
|
async def create_rtds_btcusd_table(
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
logging.info('Creating Table if Does Not Exist: poly_rtds_cl_btcusd')
|
||||||
|
await CON.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS poly_rtds_cl_btcusd (
|
||||||
|
timestamp_arrival BIGINT,
|
||||||
|
timestamp_msg BIGINT,
|
||||||
|
timestamp_value BIGINT,
|
||||||
|
value DOUBLE
|
||||||
|
);
|
||||||
|
"""))
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
async def insert_rtds_btcusd_table(
|
||||||
|
timestamp_arrival: int,
|
||||||
|
timestamp_msg: int,
|
||||||
|
timestamp_value: int,
|
||||||
|
value: int,
|
||||||
|
CON: AsyncContextManager,
|
||||||
|
engine: str = 'mysql', # mysql | duckdb
|
||||||
|
) -> None:
|
||||||
|
params={
|
||||||
|
'timestamp_arrival': timestamp_arrival,
|
||||||
|
'timestamp_msg': timestamp_msg,
|
||||||
|
'timestamp_value': timestamp_value,
|
||||||
|
'value': value,
|
||||||
|
}
|
||||||
|
if CON is None:
|
||||||
|
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
|
||||||
|
else:
|
||||||
|
if engine == 'mysql':
|
||||||
|
await CON.execute(text("""
|
||||||
|
INSERT INTO poly_rtds_cl_btcusd
|
||||||
|
(
|
||||||
|
timestamp_arrival,
|
||||||
|
timestamp_msg,
|
||||||
|
timestamp_value,
|
||||||
|
value
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
|
(
|
||||||
|
:timestamp_arrival,
|
||||||
|
:timestamp_msg,
|
||||||
|
:timestamp_value,
|
||||||
|
:value
|
||||||
|
)
|
||||||
|
"""),
|
||||||
|
parameters=params
|
||||||
|
)
|
||||||
|
await CON.commit()
|
||||||
|
else:
|
||||||
|
raise ValueError('Only MySQL engine is implemented')
|
||||||
|
|
||||||
|
|
||||||
|
### Websocket ###
|
||||||
async def rtds_stream():
|
async def rtds_stream():
|
||||||
global HIST_TRADES
|
global HIST_TRADES
|
||||||
|
|
||||||
async with websockets.connect(WSS_URL) as websocket:
|
async for websocket in websockets.connect(WSS_URL):
|
||||||
print(f"Connected to {WSS_URL}")
|
logging.info(f"Connected to {WSS_URL}")
|
||||||
|
|
||||||
subscribe_msg = {
|
subscribe_msg = {
|
||||||
"action": "subscribe",
|
"action": "subscribe",
|
||||||
@@ -39,23 +123,79 @@ async def rtds_stream():
|
|||||||
try:
|
try:
|
||||||
data = json.loads(message)
|
data = json.loads(message)
|
||||||
if data['payload'].get('value', None) is not None:
|
if data['payload'].get('value', None) is not None:
|
||||||
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
|
ts_arrival = round(datetime.now().timestamp()*1000)
|
||||||
|
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['payload']['timestamp'], unit='ms')}')
|
||||||
|
VAL_KEY_OBJ = json.dumps({
|
||||||
|
'timestamp_arrival': ts_arrival,
|
||||||
|
'timestamp_msg': data['timestamp'],
|
||||||
|
'timestamp_value': data['payload']['timestamp'],
|
||||||
|
'value': data['payload']['value'],
|
||||||
|
})
|
||||||
|
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
|
||||||
|
await insert_rtds_btcusd_table(
|
||||||
|
CON=CON,
|
||||||
|
timestamp_arrival=ts_arrival,
|
||||||
|
timestamp_msg=data['timestamp'],
|
||||||
|
timestamp_value=data['payload']['timestamp'],
|
||||||
|
value=data['payload']['value'],
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
print(f'Initial or unexpected data struct, skipping: {data}')
|
# logging.info(f'Initial or unexpected data struct, skipping: {data}')
|
||||||
|
logging.info('Initial or unexpected data struct, skipping')
|
||||||
continue
|
continue
|
||||||
except (json.JSONDecodeError, ValueError):
|
except (json.JSONDecodeError, ValueError):
|
||||||
print(f'Message not in JSON format, skipping: {message}')
|
logging.warning(f'Message not in JSON format, skipping: {message}')
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
raise ValueError(f'Type: {type(data)} not expected: {message}')
|
||||||
|
except websockets.ConnectionClosed as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f'Connection closed: {e}')
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
except websockets.ConnectionClosed:
|
|
||||||
print("Connection closed by server.")
|
async def main():
|
||||||
|
global VAL_KEY
|
||||||
|
global CON
|
||||||
|
|
||||||
|
if USE_VK:
|
||||||
|
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
|
||||||
|
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
|
||||||
|
else:
|
||||||
|
VAL_KEY = None
|
||||||
|
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
|
||||||
|
|
||||||
|
if USE_DB:
|
||||||
|
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
|
||||||
|
async with engine.connect() as CON:
|
||||||
|
await create_rtds_btcusd_table(CON=CON)
|
||||||
|
await rtds_stream()
|
||||||
|
else:
|
||||||
|
CON = None
|
||||||
|
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
|
||||||
|
await rtds_stream()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
START_TIME = round(datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
logging.info(f'Log FilePath: {LOG_FILEPATH}')
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
force=True,
|
||||||
|
filename=LOG_FILEPATH,
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
filemode='w'
|
||||||
|
)
|
||||||
|
logging.info(f"STARTED: {START_TIME}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(rtds_stream())
|
asyncio.run(main())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("Stream stopped.")
|
logging.info("Stream stopped")
|
||||||
Reference in New Issue
Block a user