1 Commits

Author SHA1 Message Date
c051130867 ionos_last_commit 2026-03-27 17:57:12 +00:00
2 changed files with 195 additions and 16 deletions

77
database.ipynb Normal file
View File

@@ -0,0 +1,77 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "4cae6bf1",
"metadata": {},
"outputs": [],
"source": [
"from sqlalchemy import create_engine, text\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f5040527",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connection failed: (pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n",
"(Background on this error at: https://sqlalche.me/e/20/e3q8)\n"
]
}
],
"source": [
"### MYSQL ###\n",
"engine = create_engine('mysql+pymysql://root:pwd@localhost/mkt_maker')\n",
"try:\n",
" with engine.connect() as conn:\n",
" print(\"Connection successful\")\n",
"except Exception as e:\n",
" print(f\"Connection failed: {e}\") "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a866e9ca",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5ba7be5f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -1,24 +1,98 @@
import asyncio
import json
import math
import pandas as pd
import os
from datetime import datetime, timezone
import websockets
import logging
import socket
import traceback
from datetime import datetime
from typing import AsyncContextManager
import numpy as np
import talib
import requests
import pandas as pd
import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
CON: AsyncContextManager | None = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_RTDS.log'
### Globals ###
WSS_URL = "wss://ws-live-data.polymarket.com"
# HIST_TRADES = np.empty((0, 2))
HIST_TRADES = np.empty((0, 2))
### Database Funcs ###
async def create_rtds_btcusd_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS poly_rtds_btcusd_cl (
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_msg: int,
timestamp_value: int,
value: int,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO poly_rtds_btcusd_cl
(
timestamp_msg,
timestamp_value,
value
)
VALUES
(
:timestamp_msg,
:timestamp_value,
:value
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def rtds_stream():
global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket:
print(f"Connected to {WSS_URL}")
logging.info(f"Connected to {WSS_URL}")
subscribe_msg = {
"action": "subscribe",
@@ -41,21 +115,49 @@ async def rtds_stream():
if data['payload'].get('value', None) is not None:
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
else:
print(f'Initial or unexpected data struct, skipping: {data}')
logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue
except (json.JSONDecodeError, ValueError):
print(f'Message not in JSON format, skipping: {message}')
logging.warning(f'Message not in JSON format, skipping: {message}')
continue
else:
raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except websockets.ConnectionClosed:
print("Connection closed by server.")
async def main():
global CON
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/mkt_maker')
async with engine.connect() as CON:
await rtds_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await rtds_stream()
if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try:
asyncio.run(rtds_stream())
asyncio.run(main())
except KeyboardInterrupt:
print("Stream stopped.")
logging.info("Stream stopped")