transfer to aws

This commit is contained in:
2026-03-29 16:27:58 +00:00
parent c051130867
commit 8d7d99d749
13 changed files with 1607 additions and 188 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 337 KiB

View File

@@ -2,18 +2,19 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 2,
"id": "4cae6bf1",
"metadata": {},
"outputs": [],
"source": [
"from sqlalchemy import create_engine, text\n",
"import pandas as pd"
"import pandas as pd\n",
"from datetime import datetime"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 6,
"id": "f5040527",
"metadata": {},
"outputs": [
@@ -21,14 +22,13 @@
"name": "stdout",
"output_type": "stream",
"text": [
"Connection failed: (pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n",
"(Background on this error at: https://sqlalche.me/e/20/e3q8)\n"
"Connection successful\n"
]
}
],
"source": [
"### MYSQL ###\n",
"engine = create_engine('mysql+pymysql://root:pwd@localhost/mkt_maker')\n",
"engine = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')\n",
"try:\n",
" with engine.connect() as conn:\n",
" print(\"Connection successful\")\n",
@@ -38,10 +38,210 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 7,
"id": "5c23110d",
"metadata": {},
"outputs": [],
"source": [
"query = '''\n",
"SELECT timestamp_msg,timestamp_value,`value` FROM poly_rtds_cl_btcusd;\n",
"'''"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "a866e9ca",
"metadata": {},
"outputs": [],
"source": [
"df = pd.read_sql(query, con=engine)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "954a3c3c",
"metadata": {},
"outputs": [],
"source": [
"df['ts'] = pd.to_datetime(df['timestamp_value'], unit='ms')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "f11fd680",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1774752413939"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"round(datetime.now().timestamp()*1000)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "eadb0364",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Timestamp('2026-03-29 03:22:03.145000')"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pd.to_datetime(1774754523145, unit='ms')"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "3fbf525c",
"metadata": {},
"outputs": [],
"source": [
"d = {'id': '1753563', 'question': 'Bitcoin Up or Down - March 29, 12:40AM-12:45AM ET', 'conditionId': '0x4cf6815a61939a9a0ee308772e727bbaf8e95803577aad7b9a9d3e028e37f13b', 'slug': 'btc-updown-5m-1774759200', 'resolutionSource': 'https://data.chain.link/streams/btc-usd', 'endDate': '2026-03-29T04:45:00Z', 'liquidity': '40412.2984', 'startDate': '2026-03-28T04:48:26.823506Z', 'image': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png', 'icon': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png', 'description': 'This market will resolve to \"Up\" if the Bitcoin price at the end of the time range specified in the title is greater than or equal to the price at the beginning of that range. Otherwise, it will resolve to \"Down\".\\nThe resolution source for this market is information from Chainlink, specifically the BTC/USD data stream available at https://data.chain.link/streams/btc-usd.\\nPlease note that this market is about the price according to Chainlink data stream BTC/USD, not according to other sources or spot markets.', 'outcomes': '[\"Up\", \"Down\"]', 'outcomePrices': '[\"0.505\", \"0.495\"]', 'volume': '1154.594883', 'active': True, 'closed': False, 'marketMakerAddress': '', 'createdAt': '2026-03-28T04:47:13.404208Z', 'updatedAt': '2026-03-29T04:38:45.64501Z', 'new': False, 'featured': False, 'archived': False, 'restricted': True, 'groupItemThreshold': '0', 'questionID': '0x7e86ce5e7ba0eb24758756db7c1443bf48041d08ac10364cf6771ef3f9c26733', 'enableOrderBook': True, 'orderPriceMinTickSize': 0.01, 'orderMinSize': 5, 'volumeNum': 1154.594883, 'liquidityNum': 40412.2984, 'endDateIso': '2026-03-29', 'startDateIso': '2026-03-28', 'hasReviewedDates': True, 'volume24hr': 1154.594883, 'volume1wk': 1154.594883, 'volume1mo': 1154.594883, 'volume1yr': 1154.594883, 'clobTokenIds': '[\"3132608671382208432230794800974499111421928258370863811882545679011841068490\", \"6273455409880805876304408793123549040642604317110995252288748628688958052125\"]', 'volume24hrClob': 1154.594883, 'volume1wkClob': 1154.594883, 'volume1moClob': 1154.594883, 'volume1yrClob': 1154.594883, 'volumeClob': 1154.594883, 'liquidityClob': 40412.2984, 'makerBaseFee': 1000, 'takerBaseFee': 1000, 'acceptingOrders': True, 'negRisk': False, 'ready': False, 'funded': False, 'acceptingOrdersTimestamp': '2026-03-28T04:47:21Z', 'cyom': False, 'competitive': 0.9999750006249843, 'pagerDutyNotificationEnabled': False, 'approved': True, 'rewardsMinSize': 50, 'rewardsMaxSpread': 4.5, 'spread': 0.01, 'lastTradePrice': 0.51, 'bestBid': 0.5, 'bestAsk': 0.51, 'automaticallyActive': True, 'clearBookOnStart': False, 'showGmpSeries': False, 'showGmpOutcome': False, 'manualActivation': False, 'negRiskOther': False, 'umaResolutionStatuses': '[]', 'pendingDeployment': False, 'deploying': False, 'rfqEnabled': False, 'eventStartTime': '2026-03-29T04:40:00Z', 'holdingRewardsEnabled': False, 'feesEnabled': True, 'requiresTranslation': False, 'makerRebatesFeeShareBps': 10000, 'feeType': 'crypto_fees', 'feeSchedule': {'exponent': 2, 'rate': 0.25, 'takerOnly': True, 'rebateRate': 0.2}}"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "699031ee",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': '1753563',\n",
" 'question': 'Bitcoin Up or Down - March 29, 12:40AM-12:45AM ET',\n",
" 'conditionId': '0x4cf6815a61939a9a0ee308772e727bbaf8e95803577aad7b9a9d3e028e37f13b',\n",
" 'slug': 'btc-updown-5m-1774759200',\n",
" 'resolutionSource': 'https://data.chain.link/streams/btc-usd',\n",
" 'endDate': '2026-03-29T04:45:00Z',\n",
" 'liquidity': '40412.2984',\n",
" 'startDate': '2026-03-28T04:48:26.823506Z',\n",
" 'image': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png',\n",
" 'icon': 'https://polymarket-upload.s3.us-east-2.amazonaws.com/BTC+fullsize.png',\n",
" 'description': 'This market will resolve to \"Up\" if the Bitcoin price at the end of the time range specified in the title is greater than or equal to the price at the beginning of that range. Otherwise, it will resolve to \"Down\".\\nThe resolution source for this market is information from Chainlink, specifically the BTC/USD data stream available at https://data.chain.link/streams/btc-usd.\\nPlease note that this market is about the price according to Chainlink data stream BTC/USD, not according to other sources or spot markets.',\n",
" 'outcomes': '[\"Up\", \"Down\"]',\n",
" 'outcomePrices': '[\"0.505\", \"0.495\"]',\n",
" 'volume': '1154.594883',\n",
" 'active': True,\n",
" 'closed': False,\n",
" 'marketMakerAddress': '',\n",
" 'createdAt': '2026-03-28T04:47:13.404208Z',\n",
" 'updatedAt': '2026-03-29T04:38:45.64501Z',\n",
" 'new': False,\n",
" 'featured': False,\n",
" 'archived': False,\n",
" 'restricted': True,\n",
" 'groupItemThreshold': '0',\n",
" 'questionID': '0x7e86ce5e7ba0eb24758756db7c1443bf48041d08ac10364cf6771ef3f9c26733',\n",
" 'enableOrderBook': True,\n",
" 'orderPriceMinTickSize': 0.01,\n",
" 'orderMinSize': 5,\n",
" 'volumeNum': 1154.594883,\n",
" 'liquidityNum': 40412.2984,\n",
" 'endDateIso': '2026-03-29',\n",
" 'startDateIso': '2026-03-28',\n",
" 'hasReviewedDates': True,\n",
" 'volume24hr': 1154.594883,\n",
" 'volume1wk': 1154.594883,\n",
" 'volume1mo': 1154.594883,\n",
" 'volume1yr': 1154.594883,\n",
" 'clobTokenIds': '[\"3132608671382208432230794800974499111421928258370863811882545679011841068490\", \"6273455409880805876304408793123549040642604317110995252288748628688958052125\"]',\n",
" 'volume24hrClob': 1154.594883,\n",
" 'volume1wkClob': 1154.594883,\n",
" 'volume1moClob': 1154.594883,\n",
" 'volume1yrClob': 1154.594883,\n",
" 'volumeClob': 1154.594883,\n",
" 'liquidityClob': 40412.2984,\n",
" 'makerBaseFee': 1000,\n",
" 'takerBaseFee': 1000,\n",
" 'acceptingOrders': True,\n",
" 'negRisk': False,\n",
" 'ready': False,\n",
" 'funded': False,\n",
" 'acceptingOrdersTimestamp': '2026-03-28T04:47:21Z',\n",
" 'cyom': False,\n",
" 'competitive': 0.9999750006249843,\n",
" 'pagerDutyNotificationEnabled': False,\n",
" 'approved': True,\n",
" 'rewardsMinSize': 50,\n",
" 'rewardsMaxSpread': 4.5,\n",
" 'spread': 0.01,\n",
" 'lastTradePrice': 0.51,\n",
" 'bestBid': 0.5,\n",
" 'bestAsk': 0.51,\n",
" 'automaticallyActive': True,\n",
" 'clearBookOnStart': False,\n",
" 'showGmpSeries': False,\n",
" 'showGmpOutcome': False,\n",
" 'manualActivation': False,\n",
" 'negRiskOther': False,\n",
" 'umaResolutionStatuses': '[]',\n",
" 'pendingDeployment': False,\n",
" 'deploying': False,\n",
" 'rfqEnabled': False,\n",
" 'eventStartTime': '2026-03-29T04:40:00Z',\n",
" 'holdingRewardsEnabled': False,\n",
" 'feesEnabled': True,\n",
" 'requiresTranslation': False,\n",
" 'makerRebatesFeeShareBps': 10000,\n",
" 'feeType': 'crypto_fees',\n",
" 'feeSchedule': {'exponent': 2,\n",
" 'rate': 0.25,\n",
" 'takerOnly': True,\n",
" 'rebateRate': 0.2}}"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"d"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a620fa17",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "2071f014",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
@@ -69,7 +269,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.5"
"version": "3.13.12"
}
},
"nbformat": 4,

117
ng.py Normal file
View File

@@ -0,0 +1,117 @@
import os
from nicegui import ui, app
from sqlalchemy import create_engine
# import requests
import json
# import time
# import re
import valkey
# import asyncio
# import datetime as dt
# from random import random
# from nicegui_modules import data
# from nicegui_modules import ui_components
# from glide import GlideClient, NodeAddress, GlideClientConfiguration
LISTENING_CLIENT = None
LH_PAIR = 'BTC'
RH_PAIR = 'USD'
DEFAULT_TO_DARKMODE: bool = True
ALLOW_BODY_SCROLL: bool = True
LOOKBACK: int = 60
LOOKBACK_RT_TV_MAX_POINTS: int = 300
REFRESH_INTERVAL_SEC: int = 10
REFRESH_INTERVAL_RT_SEC: int = 0.1
ENGINE = create_engine('mysql+pymysql://root:pwd@localhost/polymarket')
VALKEY_R = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
# VALKEY_P = VALKEY_R.pubsub()
# VALKEY_P.subscribe('mexc_mkt_bookTicker')
def root():
app.add_static_files(max_cache_age=0, url_path='/static', local_directory=os.path.join(os.path.dirname(__file__), 'nicegui_modules/static'))
ui.add_head_html('''
<meta name="darkreader-lock">
<link rel="stylesheet" type="text/css" href="/static/styles.css">
<script type="text/javascript" src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script>
<script src="/static/script.js"></script>
'''
)
# ui.add_head_html('<meta name="darkreader-lock">')
update_body_scroll(bool_override=ALLOW_BODY_SCROLL)
ui.sub_pages({
'/': rt_chart_page,
}).classes('w-full')
async def update_tv():
series_update = json.loads(VALKEY_R.get('poly_rtds_cl_btcusd'))
series_update_b = json.loads(VALKEY_R.get('poly_coinbase_btcusd'))
series_update_c = json.loads(VALKEY_R.get('poly_5min_btcusd'))
timestamp = round( ( series_update['timestamp_arrival'] / 1000 ) , 2)
timestamp_b = round( ( series_update_b['timestamp_arrival'] / 1000 ) , 2)
timestamp_c = round( ( series_update_c['timestamp_arrival'] / 1000 ) , 2)
value = float(series_update['value'])
value_b = float(series_update_b['value'])
value_c = float(series_update_c['price'])
data_dict = {
'timestamp': timestamp,
'timestamp_b': timestamp_b,
'timestamp_c': timestamp_c,
'value': value,
'value_b': value_b,
'value_c': value_c,
'target': series_update_c['target_price'],
'LOOKBACK_RT_TV_MAX_POINTS': LOOKBACK_RT_TV_MAX_POINTS,
}
ui.run_javascript(f'await update_tv(data_dict={data_dict});')
def update_body_scroll(e=None, bool_override=False):
if e is None:
if bool_override:
ui.query('body').style('height: 100%; overflow-y: auto;')
else:
ui.query('body').style('height: 100%; overflow-y: hidden;')
else:
if e.value:
ui.query('body').style('height: 100%; overflow-y: auto;')
else:
ui.query('body').style('height: 100%; overflow-y: hidden;')
# async def refresh_lookback_funcs(lookback: int = LOOKBACK):
# lookback = app.storage.user.get('lookback', lookback)
# await data.trades_pnl_graph.refresh(ENGINE=ENGINE, LH_PAIR=LH_PAIR, RH_PAIR=RH_PAIR, lookback=lookback)
# await ui_components.er_table.refresh(ENGINE=ENGINE, lookback=lookback)
# await ui_components.trades_table.refresh(ENGINE=ENGINE, lookback=lookback)
# await ui_components.er_stats.refresh(ENGINE=ENGINE, lookback=lookback)
async def rt_chart_page():
global LOOKBACK
LOOKBACK = app.storage.user.get('lookback', LOOKBACK)
timer = ui.timer(REFRESH_INTERVAL_RT_SEC, update_tv)
with ui.row():
with ui.column():
ui.switch('☸︎', value=ALLOW_BODY_SCROLL, on_change=lambda e: update_body_scroll(e))
with ui.column():
ui.switch('▶️', value=True).bind_value_to(timer, 'active')
with ui.column().style('position: absolute; right: 20px; font-family: monospace; align-self: center;'):
ui.label('Atwater Trading: Orderbook')
with ui.grid(columns=16).classes('w-full gap-0 auto-fit'):
with ui.card().tight().classes('w-full col-span-full no-shadow border border-black-200').style('overflow: auto;'):
ui.html('<div id="tv" style="width:100%; height:800px;"></div>', sanitize=False).classes('w-full')
ui.run_javascript('await create_tv();')
ui.run(root, storage_secret="123ABC", reload=True, dark=True, title='Atwater Trading')

View File

@@ -0,0 +1,220 @@
async function waitForVariable(variableName, timeout = 5000) {
const startTime = Date.now();
while (typeof window[variableName] === 'undefined') {
if (Date.now() - startTime > timeout) {
throw new Error(`Variable '${variableName}' not defined within ${timeout}ms`);
}
await new Promise(resolve => setTimeout(resolve, 100));
}
console.log(`Variable '${variableName}' is now defined.`);
}
async function update_tv(data_dict) {
window.data.push({ time: data_dict.timestamp, value: data_dict.value });
window.data_b.push({ time: data_dict.timestamp_b, value: data_dict.value_b });
window.data_c.push({ time: data_dict.timestamp_c, value: data_dict.value_c });
window.data_tgt.push({ time: data_dict.timestamp_c, value: data_dict.target });
window.lineSeries.update({ time: data_dict.timestamp, value: data_dict.value });
window.lineSeries_b.update({ time: data_dict.timestamp_b, value: data_dict.value_b });
window.lineSeries_c.update({ time: data_dict.timestamp_c, value: data_dict.value_c });
window.lineSeries_tgt.update({ time: data_dict.timestamp_c, value: data_dict.target });
// midPriceLine.applyOptions({
// price: data_dict.mid_px,
// color: '#c78228',
// lineWidth: 3,
// lineStyle: LightweightCharts.LineStyle.Dashed,
// axisLabelVisible: true,
// });
window.chart.timeScale().scrollToRealTime();
// const currentRange = window.chart.timeScale().getVisibleLogicalRange();
// window.chart.timeScale().fitContent();
// window.chart.timeScale().setVisibleLogicalRange(currentRange);
const MAX_DATA_POINTS = data_dict.LOOKBACK_RT_TV_MAX_POINTS;
if (window.lineSeries.data().length > MAX_DATA_POINTS) {
window.lineSeries.setData(lineSeries.data().slice(-MAX_DATA_POINTS));
}
if (window.lineSeries_b.data().length > MAX_DATA_POINTS) {
window.lineSeries_b.setData(lineSeries_b.data().slice(-MAX_DATA_POINTS));
}
if (window.lineSeries_c.data().length > MAX_DATA_POINTS) {
window.lineSeries_c.setData(lineSeries_c.data().slice(-MAX_DATA_POINTS));
}
if (window.lineSeries_tgt.data().length > MAX_DATA_POINTS) {
window.lineSeries_tgt.setData(lineSeries_tgt.data().slice(-MAX_DATA_POINTS));
}
};
async function create_tv() {
window.chart = LightweightCharts.createChart(document.getElementById('tv'),
{
autoSize: true,
toolbox: true,
timeScale: {
timeVisible: true, // Shows HH:mm on x-axis
secondsVisible: true // Optional: show seconds
},
rightPriceScale: {
visible: true
},
leftPriceScale: {
visible: true
},
layout: {
background: { type: 'solid', color: '#222' },
textColor: '#DDD',
},
grid: {
vertLines: {
color: '#e1e1e1', // Set vertical line color
visible: true,
style: 2, // 0: Solid, 1: Dashed, 2: Dotted, 3: LargeDashed, 4: SparseDotted
},
horzLines: {
color: '#e1e1e1', // Set horizontal line color
visible: true,
style: 2,
},
},
crosshair: { mode: LightweightCharts.CrosshairMode.Normal },
}
);
window.lineSeries = chart.addSeries(LightweightCharts.LineSeries, {
color: '#94fcdf',
priceScaleId: 'right'
// topColor: '#94fcdf',
// bottomColor: 'rgba(112, 171, 249, 0.28)',
// invertFilledArea: false
});
window.lineSeries_b = chart.addSeries(LightweightCharts.LineSeries, {
color: '#dd7525',
priceScaleId: 'right'
// topColor: '#94fcdf',
// bottomColor: 'rgba(112, 171, 249, 0.28)',
// invertFilledArea: false
});
window.lineSeries_c = chart.addSeries(LightweightCharts.LineSeries, {
color: '#ea0707',
priceScaleId: 'left',
priceRange: {
minValue: 0,
maxValue: 1
},
// topColor: '#94fcdf',
// bottomColor: 'rgba(112, 171, 249, 0.28)',
// invertFilledArea: false
});
window.lineSeries_tgt = chart.addSeries(LightweightCharts.LineSeries, {
color: '#ffffff',
priceScaleId: 'right',
lineStyle: LightweightCharts.LineStyle.Dashed
// topColor: '#94fcdf',
// bottomColor: 'rgba(112, 171, 249, 0.28)',
// invertFilledArea: false
});
// window.midPriceLine_Config = {
// price: 0,
// color: '#c78228',
// lineWidth: 3,
// lineStyle: LightweightCharts.LineStyle.Dashed,
// axisLabelVisible: false,
// };
// window.midPriceLine = window.lineSeries.createPriceLine(midPriceLine_Config);
window.data = [];
window.data_b = [];
window.data_c = [];
window.data_tgt = [];
window.lineSeries.setData(window.data);
window.lineSeries_b.setData(window.data_b);
window.lineSeries_c.setData(window.data_c);
window.lineSeries_tgt.setData(window.data_tgt);
// Create and style the tooltip html element
const container = document.getElementById('tv');
window.toolTipWidth = 200;
const toolTip = document.createElement('div');
toolTip.style = `width: ${window.toolTipWidth}px; height: 100%; position: absolute; display: none; padding: 8px; box-sizing: border-box; font-size: 12px; text-align: left; z-index: 1000; top: 12px; left: 12px; pointer-events: none; border-radius: 4px 4px 0px 0px; border-bottom: none; box-shadow: 0 2px 5px 0 rgba(117, 134, 150, 0.45);font-family: -apple-system, BlinkMacSystemFont, 'Trebuchet MS', Roboto, Ubuntu, sans-serif; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale;`;
toolTip.style.background = `rgba(${'0, 0, 0'}, 0.25)`;
toolTip.style.color = 'white';
toolTip.style.borderColor = 'rgba( 239, 83, 80, 1)';
container.appendChild(toolTip);
// update tooltip
window.chart.subscribeCrosshairMove(async param => {
if (
param.point === undefined ||
!param.time ||
param.point.x < 0 ||
param.point.x > container.clientWidth ||
param.point.y < 0 ||
param.point.y > container.clientHeight
) {
toolTip.style.display = 'none';
} else {
// toolTip.style.height = '100%';
toolTip.style.alignContent = 'center';
const dateStr = new Date(param.time*1000).toISOString();
let data = await param.seriesData.get(window.lineSeries);
if (data === undefined) {
data = {}
data.value = 0
console.log('data is UNDEFINED, SETTING TO 0')
};
let data_b = await param.seriesData.get(window.lineSeries_b);
if (data_b === undefined) {
data_b = {}
data_b.value = 0
console.log('data is UNDEFINED, SETTING TO 0')
};
const value_px = data.value
const value_px_b = window.data_b.value
const value_px_c = window.data_c.value
const value_px_tgt = window.data_tgt.value
toolTip.style.display = 'block';
// <div style="color: ${'rgba( 239, 83, 80, 1)'}">
// Atwater Trading
// </div>
toolTip.innerHTML = `
<div style="font-size: 24px; margin: 4px 0px; color: ${'white'}">
Chainlink: ${Math.round(100 * value_px) / 100}
Binance: ${Math.round(100 * value_px_b) / 100}
</div>
<div style="color: ${'white'}">
${dateStr}
</div>
`;
let left = param.point.x; // relative to timeScale
const timeScaleWidth = chart.timeScale().width();
const priceScaleWidth = chart.priceScale('left').width();
const halfTooltipWidth = toolTipWidth / 2;
left += priceScaleWidth - halfTooltipWidth;
left = Math.min(left, priceScaleWidth + timeScaleWidth - toolTipWidth);
left = Math.max(left, priceScaleWidth);
toolTip.style.left = left + 'px';
toolTip.style.top = 0 + 'px';
}
});
window.chart.timeScale().fitContent();
console.log("TV Created!")
};

View File

@@ -0,0 +1,33 @@
/* Sticky Quasar Table for Dark Mode */
.table-sticky-dark .q-table__top,
.table-sticky-dark .q-table__bottom,
.table-sticky-dark thead tr:first-child th {
background-color: black;
}
.table-sticky-dark thead tr th {
position: sticky;
z-index: 1;
}
.table-sticky-dark thead tr:first-child th {
top: 0;
}
.table-sticky-dark tbody {
scroll-margin-top: 48px;
}
/* Sticky Quasar Table for Light Mode */
/* .table-sticky-light .q-table__top,
.table-sticky-light .q-table__bottom,
.table-sticky-light thead tr:first-child th {
background-color: rgb(229, 223, 223);
}
.table-sticky-light thead tr th {
position: sticky;
z-index: 1;
}
.table-sticky-light thead tr:first-child th {
top: 0;
}
.table-sticky-light tbody {
scroll-margin-top: 48px;
} */

File diff suppressed because one or more lines are too long

150
ws.py
View File

@@ -1,150 +0,0 @@
import asyncio
import json
import math
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
SLUG_END_TIME = 0
HIST_TRADES = np.empty((0, 2))
def format_timestamp(total_seconds) -> str:
minutes, seconds = divmod(total_seconds, 60)
return f"{minutes} minutes and {seconds} seconds"
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
interval_secs = interval_mins * 60
seconds = dt.timestamp()
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
return rounded_seconds
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
market = r.json()['markets'][0]
token_ids = json.loads(market.get("clobTokenIds", "[]"))
outcomes = json.loads(market.get("outcomes", "[]"))
d = dict(zip(outcomes, token_ids))
d['isActive'] = market['negRisk']
d['MinTickSize'] = market['orderPriceMinTickSize']
d['isNegRisk'] = market['negRisk']
d['ConditionId'] = market['conditionId']
d['EndDateTime'] = market['endDate']
return d, market
def gen_slug():
slug_prefix = 'btc-updown-5m-'
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
return slug_prefix + str(slug_time_id)
async def polymarket_stream():
global SLUG_END_TIME
global HIST_TRADES
slug_full = gen_slug()
market_details, market = get_mkt_details_by_slug(slug_full)
TARGET_ASSET_ID = market_details['Up']
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
async with websockets.connect(WSS_URL) as websocket:
print(f"Connected to {WSS_URL}")
subscribe_msg = {
"assets_ids": [TARGET_ASSET_ID],
"type": "market",
"custom_feature_enabled": True
}
await websocket.send(json.dumps(subscribe_msg))
print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
try:
async for message in websocket:
current_ts = round(datetime.now().timestamp())
sec_remaining = SLUG_END_TIME - current_ts
if sec_remaining <= 0:
HIST_TRADES = np.empty((0, 2))
print('*** Attempting to unsub from past 5min')
update_unsub_msg = {
"operation": 'unsubscribe',
"assets_ids": [TARGET_ASSET_ID],
"custom_feature_enabled": True
}
await websocket.send(json.dumps(update_unsub_msg))
print('*** Attempting to SUB to new 5min')
slug_full = gen_slug()
market_details, market = get_mkt_details_by_slug(slug_full)
TARGET_ASSET_ID = market_details['Up']
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
update_sub_msg = {
"operation": 'subscribe',
"assets_ids": [TARGET_ASSET_ID],
"custom_feature_enabled": True
}
await websocket.send(json.dumps(update_sub_msg))
if isinstance(message, str):
data = json.loads(message)
if isinstance(data, dict):
# print(data.get("event_type", None))
pass
elif isinstance(data, list):
print('initial book: ')
print(data)
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
event_type = data.get("event_type", None)
if event_type == "price_change":
# print("📈 Price Change")
# print(pd.DataFrame(data['price_changes']))
pass
elif event_type == "best_bid_ask":
# print(pd.DataFrame([data]))
pass
elif event_type == "last_trade_price":
px = float(data['price'])
qty = float(data['size'])
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
elif event_type == "book":
pass
elif event_type == "new_market":
print('Received new_market')
elif event_type == "market_resolved":
print(f"Received: {event_type}")
print(data)
elif event_type == "tick_size_change": # may want for CLOB order routing
print(f"Received: {event_type}")
print(data)
else:
print(f"Received: {event_type}")
print(data)
except websockets.ConnectionClosed:
print("Connection closed by server.")
if __name__ == '__main__':
try:
asyncio.run(polymarket_stream())
except KeyboardInterrupt:
print("Stream stopped.")

199
ws_binance.py Normal file
View File

@@ -0,0 +1,199 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_binance_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_Binance_Trades.log'
### Globals ###
WSS_URL = "wss://stream.binance.com:9443/ws/BTCUSDT@trade"
# HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: binance_btcusd_trades')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS binance_btcusd_trades (
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
qty DOUBLE
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_msg: int,
timestamp_value: int,
value: float,
qty: float,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
'qty': qty,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO binance_btcusd_trades
(
timestamp_msg,
timestamp_value,
value,
qty
)
VALUES
(
:timestamp_msg,
:timestamp_value,
:value,
:qty
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def binance_trades_stream():
global HIST_TRADES
async for websocket in websockets.connect(WSS_URL):
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
"method": "SUBSCRIBE",
"params": ["btcusdt@trade"],
"id": 1
}
await websocket.send(json.dumps(subscribe_msg))
try:
async for message in websocket:
if isinstance(message, str):
try:
data = json.loads(message)
if data.get('t', None) is not None:
last_px = float(data['p'])
print(f'🤑 BTC Binance Last Px: {last_px:_.4f}; TS: {pd.to_datetime(data['T'], unit='ms')}')
VAL_KEY.publish(VK_CHANNEL, json.dumps({
'timestamp_msg': data['E'],
'timestamp_value': data['T'],
'value': last_px,
'qty': data['q'],
}))
VAL_KEY.set(VK_CHANNEL, json.dumps({
'timestamp_msg': data['E'],
'timestamp_value': data['T'],
'value': last_px,
'qty': data['q'],
}))
await insert_rtds_btcusd_table(
CON=CON,
timestamp_msg=data['E'],
timestamp_value=data['T'],
value=last_px,
qty=data['q'],
)
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
global VAL_KEY
global CON
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_rtds_btcusd_table(CON=CON)
await binance_trades_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await binance_trades_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Stream stopped")

312
ws_clob.py Normal file
View File

@@ -0,0 +1,312 @@
import asyncio
import json
import math
import logging
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import numpy as np
import talib
import requests
from typing import AsyncContextManager
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
import valkey
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_5min_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_5min.log'
WSS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
SLUG_END_TIME = 0
HIST_TRADES = np.empty((0, 2))
TARGET_PX = 0
def format_timestamp(total_seconds) -> str:
minutes, seconds = divmod(total_seconds, 60)
return f"{minutes} minutes and {seconds} seconds"
def time_round_down(dt, interval_mins=5) -> int: # returns timestamp in seconds
interval_secs = interval_mins * 60
seconds = dt.timestamp()
rounded_seconds = math.floor(seconds / interval_secs) * interval_secs
return rounded_seconds
def get_mkt_details_by_slug(slug: str) -> dict[str, str, str]: # {'Up' : 123, 'Down': 456, 'isActive': True, 'MinTickSize': 0.01, 'isNegRisk': False}
r = requests.get(f"https://gamma-api.polymarket.com/events/slug/{slug}")
market = r.json()['markets'][0]
token_ids = json.loads(market.get("clobTokenIds", "[]"))
outcomes = json.loads(market.get("outcomes", "[]"))
d = dict(zip(outcomes, token_ids))
d['isActive'] = market['negRisk']
d['MinTickSize'] = market['orderPriceMinTickSize']
d['OrderMinSize'] = market['orderMinSize']
d['isNegRisk'] = market['negRisk']
d['ConditionId'] = market['conditionId']
d['EndDateTime'] = market['endDate']
# d['Liquidity'] = market['liquidity']
d['LiquidityClob'] = market['liquidityClob']
d['VolumeNum'] = market['volumeNum']
d['Volume24hr'] = market['volume24hr']
print(market)
return d, market
def gen_slug():
slug_prefix = 'btc-updown-5m-'
slug_time_id = time_round_down(dt=datetime.now(timezone.utc))
return slug_prefix + str(slug_time_id)
### Database Funcs ###
async def create_poly_btcusd_trades_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: poly_btcusd_trades')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS poly_btcusd_trades (
timestamp_arrival BIGINT,
timestamp_msg BIGINT,
timestamp_value BIGINT,
price DOUBLE,
qty DOUBLE,
side_taker VARCHAR(8)
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_poly_btcusd_trades_table(
timestamp_arrival: int,
timestamp_msg: int,
timestamp_value: int,
price: float,
qty: float,
side_taker: str,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_arrival': timestamp_arrival,
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'price': price,
'qty': qty,
'side_taker': side_taker,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO poly_btcusd_trades
(
timestamp_arrival,
timestamp_msg,
timestamp_value,
price,
qty,
side_taker
)
VALUES
(
:timestamp_arrival,
:timestamp_msg,
:timestamp_value,
:price,
:qty,
:side_taker
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def polymarket_stream():
global SLUG_END_TIME
global TARGET_PX
global HIST_TRADES
slug_full = gen_slug()
market_details, market = get_mkt_details_by_slug(slug_full)
TARGET_ASSET_ID = market_details['Up']
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
print(f'********* NEW MKT - END DATETIME: {pd.to_datetime(SLUG_END_TIME, unit='s')} *********')
async with websockets.connect(WSS_URL) as websocket:
print(f"Connected to {WSS_URL}")
subscribe_msg = {
"assets_ids": [TARGET_ASSET_ID],
"type": "market",
"custom_feature_enabled": True
}
await websocket.send(json.dumps(subscribe_msg))
print(f"Subscribed to Asset: {TARGET_ASSET_ID}")
try:
async for message in websocket:
ts_arrival = round(datetime.now().timestamp()*1000)
sec_remaining = SLUG_END_TIME - round(datetime.now().timestamp())
if sec_remaining <= 0:
ref_data = json.loads(VAL_KEY.get('poly_rtds_cl_btcusd'))
TARGET_PX = float(ref_data['value'])
HIST_TRADES = np.empty((0, 2))
print('*** Attempting to unsub from past 5min')
update_unsub_msg = {
"operation": 'unsubscribe',
"assets_ids": [TARGET_ASSET_ID],
"custom_feature_enabled": True
}
await websocket.send(json.dumps(update_unsub_msg))
print('*** Attempting to SUB to new 5min')
slug_full = gen_slug()
market_details, market = get_mkt_details_by_slug(slug_full)
TARGET_ASSET_ID = market_details['Up']
SLUG_END_TIME = round(datetime.strptime(market_details['EndDateTime'], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc).timestamp())
update_sub_msg = {
"operation": 'subscribe',
"assets_ids": [TARGET_ASSET_ID],
"custom_feature_enabled": True
}
await websocket.send(json.dumps(update_sub_msg))
if isinstance(message, str):
data = json.loads(message)
if isinstance(data, list):
print('initial book:')
print(data)
continue
event_type = data.get("event_type", None)
if event_type == "price_change":
# print("📈 Price Change")
# print(pd.DataFrame(data['price_changes']))
pass
elif event_type == "best_bid_ask":
# print(pd.DataFrame([data]))
pass
elif event_type == "last_trade_price":
ts_msg = int(data['timestamp'])
ts_value = int(ts_msg)
px = float(data['price'])
qty = float(data['size'])
side_taker = data['side']
HIST_TRADES = np.append(HIST_TRADES, np.array([[px, qty]]), axis=0)
# SMA = talib.ROC(HIST_TRADES[:,0], timeperiod=10)[-1]
# print(f"✨ Last Px: {px:.2f}; ROC: {SMA:.4f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
print(f"✨ Last Px: {px:.2f}; Qty: {qty:6.2f}; Sec Left: {sec_remaining}")
if USE_VK:
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': ts_msg,
'timestamp_value': ts_value,
'price': px,
'qty': qty,
'side_taker': side_taker,
'sec_remaining': sec_remaining,
'target_price': TARGET_PX
})
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
if USE_DB:
await insert_poly_btcusd_trades_table(
CON=CON,
timestamp_arrival=ts_arrival,
timestamp_msg=ts_msg,
timestamp_value=ts_value,
price=px,
qty=qty,
side_taker=side_taker,
)
elif event_type == "book":
pass
elif event_type == "new_market":
print('Received new_market')
elif event_type == "market_resolved":
print(f"Received: {event_type}")
print(data)
elif event_type == "tick_size_change": # may want for CLOB order routing
print(f"Received: {event_type}")
print(data)
else:
print(f"*********** REC UNMAPPED EVENT: {event_type}")
print(data)
elif isinstance(data, dict):
# print(data.get("event_type", None))
pass
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
print(f"Connection closed by server. Exception: {e}")
async def main():
global VAL_KEY
global CON
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_poly_btcusd_trades_table(CON=CON)
await polymarket_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await polymarket_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(main())
except KeyboardInterrupt as e:
print(f"Stream stopped: {e}")

224
ws_coinbase.py Normal file
View File

@@ -0,0 +1,224 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_coinbase_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_coinbase_Trades.log'
### Globals ###
WSS_URL = "wss://ws-feed.exchange.coinbase.com"
# HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: coinbase_btcusd_trades')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS coinbase_btcusd_trades (
timestamp_arrival BIGINT,
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
qty DOUBLE,
side VARCHAR(8)
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_arrival: int,
timestamp_msg: int,
timestamp_value: int,
value: float,
qty: float,
side: str,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_arrival': timestamp_arrival,
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
'qty': qty,
'side': side,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO coinbase_btcusd_trades
(
timestamp_arrival,
timestamp_msg,
timestamp_value,
value,
qty,
side
)
VALUES
(
:timestamp_arrival,
:timestamp_msg,
:timestamp_value,
:value,
:qty,
:side
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def coinbase_trades_stream():
global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket:
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
"type": "subscribe",
"product_ids": ["BTC-USD"],
"channels": [
{
"name": "ticker",
"product_ids": ["BTC-USD"]
}
]
}
await websocket.send(json.dumps(subscribe_msg))
try:
async for message in websocket:
if isinstance(message, str) or isinstance(message, bytes):
try:
data = json.loads(message)
if data.get('price', None) is not None:
ts_arrival = round(datetime.now().timestamp()*1000)
ts_msg = round(datetime.strptime(data['time'], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()*1000)
ts_value = ts_msg
last_px = float(data['price'])
qty = float(data['last_size'])
side = data['side']
print(f'🤑 BTC Coinbase Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
if USE_VK:
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': ts_msg,
'timestamp_value': ts_value,
'value': last_px,
'qty': qty,
'side': side,
})
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
if USE_DB:
await insert_rtds_btcusd_table(
CON=CON,
timestamp_arrival=ts_arrival,
timestamp_msg=ts_msg,
timestamp_value=ts_value,
value=last_px,
qty=qty,
side=side,
)
# elif data.get('op'):
# if data['op'] == 'PING':
# pong = {"op": "PONG", "timestamp": ts_arrival}
# await websocket.send(json.dumps(pong))
# logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError) as e:
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
continue
else:
raise ValueError(f'Type: {type(message)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
global VAL_KEY
global CON
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_rtds_btcusd_table(CON=CON)
await coinbase_trades_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await coinbase_trades_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Stream stopped")

219
ws_pionex.py Normal file
View File

@@ -0,0 +1,219 @@
import asyncio
import json
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_pionex_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_Pionex_Trades.log'
### Globals ###
WSS_URL = "wss://ws.pionex.com/wsPub"
# HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: pionex_btcusd_trades')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS pionex_btcusd_trades (
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
qty DOUBLE,
side VARCHAR(8)
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_msg: int,
timestamp_value: int,
value: float,
qty: float,
side: str,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
'qty': qty,
'side': side,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO pionex_btcusd_trades
(
timestamp_msg,
timestamp_value,
value,
qty,
side
)
VALUES
(
:timestamp_msg,
:timestamp_value,
:value,
:qty,
:side
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def pionex_trades_stream():
global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket:
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
"op": "SUBSCRIBE",
"topic": "TRADE",
"symbol": "BTC_USDT"
}
await websocket.send(json.dumps(subscribe_msg))
try:
async for message in websocket:
if isinstance(message, str) or isinstance(message, bytes):
try:
data = json.loads(message)
if data.get('data', None) is not None:
ts_msg = data['timestamp']
data = data['data']
ts_value = data[0]['timestamp']
last_px = float(data[0]['price'])
qty = float(data[0]['size'])
side = data[0]['side']
print(f'🤑 BTC Pionex Last Px: {last_px:_.4f}; TS: {pd.to_datetime(ts_value, unit='ms')}; Side: {side};')
print(ts_value)
if USE_VK:
VAL_KEY.publish(VK_CHANNEL, json.dumps({
'timestamp_msg': ts_msg,
'timestamp_value': ts_value,
'value': last_px,
'qty': qty,
'side': side,
}))
VAL_KEY.set(VK_CHANNEL, json.dumps({
'timestamp_msg': ts_msg,
'timestamp_value': ts_value,
'value': last_px,
'qty': qty,
'side': side,
}))
if USE_DB:
await insert_rtds_btcusd_table(
CON=CON,
timestamp_msg=ts_msg,
timestamp_value=ts_value,
value=last_px,
qty=qty,
side=side,
)
elif data.get('op'):
if data['op'] == 'PING':
pong = {"op": "PONG", "timestamp": round(datetime.now().timestamp()*1000)}
await websocket.send(json.dumps(pong))
logging.info(f'PING RECEIVED: {data}; PONG SENT: {pong}')
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError) as e:
logging.warning(f'Message not in JSON format, skipping: {message}; excepion: {e}')
continue
else:
raise ValueError(f'Type: {type(message)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
global VAL_KEY
global CON
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_rtds_btcusd_table(CON=CON)
await pionex_trades_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await pionex_trades_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Stream stopped")

View File

@@ -12,7 +12,7 @@ import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
import valkey
### Allow only ipv4 ###
def allowed_gai_family():
@@ -21,7 +21,11 @@ urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
USE_VK: bool = True
VK_CHANNEL = 'poly_rtds_cl_btcusd'
CON: AsyncContextManager | None = None
VAL_KEY = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_RTDS.log'
@@ -39,11 +43,13 @@ async def create_rtds_btcusd_table(
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
logging.info('Creating Table if Does Not Exist: poly_rtds_cl_btcusd')
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS poly_rtds_btcusd_cl (
CREATE TABLE IF NOT EXISTS poly_rtds_cl_btcusd (
timestamp_arrival BIGINT,
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
value DOUBLE
);
"""))
await CON.commit()
@@ -51,6 +57,7 @@ async def create_rtds_btcusd_table(
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_arrival: int,
timestamp_msg: int,
timestamp_value: int,
value: int,
@@ -58,6 +65,7 @@ async def insert_rtds_btcusd_table(
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_arrival': timestamp_arrival,
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
@@ -67,14 +75,16 @@ async def insert_rtds_btcusd_table(
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO poly_rtds_btcusd_cl
INSERT INTO poly_rtds_cl_btcusd
(
timestamp_arrival,
timestamp_msg,
timestamp_value,
value
)
VALUES
(
:timestamp_arrival,
:timestamp_msg,
:timestamp_value,
:value
@@ -91,7 +101,7 @@ async def insert_rtds_btcusd_table(
async def rtds_stream():
global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket:
async for websocket in websockets.connect(WSS_URL):
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
@@ -113,9 +123,26 @@ async def rtds_stream():
try:
data = json.loads(message)
if data['payload'].get('value', None) is not None:
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
ts_arrival = round(datetime.now().timestamp()*1000)
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['payload']['timestamp'], unit='ms')}')
VAL_KEY_OBJ = json.dumps({
'timestamp_arrival': ts_arrival,
'timestamp_msg': data['timestamp'],
'timestamp_value': data['payload']['timestamp'],
'value': data['payload']['value'],
})
VAL_KEY.publish(VK_CHANNEL, VAL_KEY_OBJ)
VAL_KEY.set(VK_CHANNEL, VAL_KEY_OBJ)
await insert_rtds_btcusd_table(
CON=CON,
timestamp_arrival=ts_arrival,
timestamp_msg=data['timestamp'],
timestamp_value=data['payload']['timestamp'],
value=data['payload']['value'],
)
else:
logging.info(f'Initial or unexpected data struct, skipping: {data}')
# logging.info(f'Initial or unexpected data struct, skipping: {data}')
logging.info('Initial or unexpected data struct, skipping')
continue
except (json.JSONDecodeError, ValueError):
logging.warning(f'Message not in JSON format, skipping: {message}')
@@ -125,17 +152,28 @@ async def rtds_stream():
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
continue
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
async def main():
global VAL_KEY
global CON
if USE_VK:
VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0)
published_count = VAL_KEY.publish(VK_CHANNEL,f"Hola, starting to publish to valkey: {VK_CHANNEL} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Valkey message published to {published_count} subscribers of {VK_CHANNEL}")
else:
VAL_KEY = None
logging.warning("VALKEY NOT BEING USED, NO DATA WILL BE PUBLISHED")
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/mkt_maker')
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/polymarket')
async with engine.connect() as CON:
await create_rtds_btcusd_table(CON=CON)
await rtds_stream()
else:
CON = None