ionos_last_commit

This commit is contained in:
2026-03-27 17:57:12 +00:00
parent adf3e592eb
commit c051130867
2 changed files with 195 additions and 16 deletions

77
database.ipynb Normal file
View File

@@ -0,0 +1,77 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "4cae6bf1",
"metadata": {},
"outputs": [],
"source": [
"from sqlalchemy import create_engine, text\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f5040527",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connection failed: (pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n",
"(Background on this error at: https://sqlalche.me/e/20/e3q8)\n"
]
}
],
"source": [
"### MYSQL ###\n",
"engine = create_engine('mysql+pymysql://root:pwd@localhost/mkt_maker')\n",
"try:\n",
" with engine.connect() as conn:\n",
" print(\"Connection successful\")\n",
"except Exception as e:\n",
" print(f\"Connection failed: {e}\") "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a866e9ca",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5ba7be5f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "py_313",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -1,24 +1,98 @@
import asyncio import asyncio
import json import json
import math import logging
import pandas as pd import socket
import os import traceback
from datetime import datetime, timezone from datetime import datetime
import websockets from typing import AsyncContextManager
import numpy as np import numpy as np
import talib import pandas as pd
import requests import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore
from sqlalchemy import text
import websockets
from sqlalchemy.ext.asyncio import create_async_engine
### Allow only ipv4 ###
def allowed_gai_family():
return socket.AF_INET
urllib3_cn.allowed_gai_family = allowed_gai_family
### Database ###
USE_DB: bool = True
CON: AsyncContextManager | None = None
### Logging ###
LOG_FILEPATH: str = '/root/logs/Polymarket_RTDS.log'
### Globals ###
WSS_URL = "wss://ws-live-data.polymarket.com" WSS_URL = "wss://ws-live-data.polymarket.com"
# HIST_TRADES = np.empty((0, 2))
HIST_TRADES = np.empty((0, 2)) ### Database Funcs ###
async def create_rtds_btcusd_table(
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Create Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
CREATE TABLE IF NOT EXISTS poly_rtds_btcusd_cl (
timestamp_msg BIGINT,
timestamp_value BIGINT,
value DOUBLE,
);
"""))
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
async def insert_rtds_btcusd_table(
timestamp_msg: int,
timestamp_value: int,
value: int,
CON: AsyncContextManager,
engine: str = 'mysql', # mysql | duckdb
) -> None:
params={
'timestamp_msg': timestamp_msg,
'timestamp_value': timestamp_value,
'value': value,
}
if CON is None:
logging.info("NO DB CONNECTION, SKIPPING Insert Statements")
else:
if engine == 'mysql':
await CON.execute(text("""
INSERT INTO poly_rtds_btcusd_cl
(
timestamp_msg,
timestamp_value,
value
)
VALUES
(
:timestamp_msg,
:timestamp_value,
:value
)
"""),
parameters=params
)
await CON.commit()
else:
raise ValueError('Only MySQL engine is implemented')
### Websocket ###
async def rtds_stream(): async def rtds_stream():
global HIST_TRADES global HIST_TRADES
async with websockets.connect(WSS_URL) as websocket: async with websockets.connect(WSS_URL) as websocket:
print(f"Connected to {WSS_URL}") logging.info(f"Connected to {WSS_URL}")
subscribe_msg = { subscribe_msg = {
"action": "subscribe", "action": "subscribe",
@@ -41,21 +115,49 @@ async def rtds_stream():
if data['payload'].get('value', None) is not None: if data['payload'].get('value', None) is not None:
print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}') print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}')
else: else:
print(f'Initial or unexpected data struct, skipping: {data}') logging.info(f'Initial or unexpected data struct, skipping: {data}')
continue continue
except (json.JSONDecodeError, ValueError): except (json.JSONDecodeError, ValueError):
print(f'Message not in JSON format, skipping: {message}') logging.warning(f'Message not in JSON format, skipping: {message}')
continue continue
else: else:
raise ValueError(f'Type: {type(data)} not expected: {message}') raise ValueError(f'Type: {type(data)} not expected: {message}')
except websockets.ConnectionClosed as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except Exception as e:
logging.error(f'Connection closed: {e}')
logging.error(traceback.format_exc())
except websockets.ConnectionClosed:
print("Connection closed by server.") async def main():
global CON
if USE_DB:
engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/mkt_maker')
async with engine.connect() as CON:
await rtds_stream()
else:
CON = None
logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED")
await rtds_stream()
if __name__ == '__main__': if __name__ == '__main__':
START_TIME = round(datetime.now().timestamp()*1000)
logging.info(f'Log FilePath: {LOG_FILEPATH}')
logging.basicConfig(
force=True,
filename=LOG_FILEPATH,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
logging.info(f"STARTED: {START_TIME}")
try: try:
asyncio.run(rtds_stream()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:
print("Stream stopped.") logging.info("Stream stopped")