From c05113086709ba58f20607360ae934ccfc33758f Mon Sep 17 00:00:00 2001 From: stevekeyharvey Date: Fri, 27 Mar 2026 17:57:12 +0000 Subject: [PATCH] ionos_last_commit --- database.ipynb | 77 ++++++++++++++++++++++++++++ ws_rtds.py | 134 +++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 195 insertions(+), 16 deletions(-) create mode 100644 database.ipynb diff --git a/database.ipynb b/database.ipynb new file mode 100644 index 0000000..e048b6a --- /dev/null +++ b/database.ipynb @@ -0,0 +1,77 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "4cae6bf1", + "metadata": {}, + "outputs": [], + "source": [ + "from sqlalchemy import create_engine, text\n", + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5040527", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Connection failed: (pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n", + "(Background on this error at: https://sqlalche.me/e/20/e3q8)\n" + ] + } + ], + "source": [ + "### MYSQL ###\n", + "engine = create_engine('mysql+pymysql://root:pwd@localhost/mkt_maker')\n", + "try:\n", + " with engine.connect() as conn:\n", + " print(\"Connection successful\")\n", + "except Exception as e:\n", + " print(f\"Connection failed: {e}\") " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a866e9ca", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5ba7be5f", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "py_313", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ws_rtds.py b/ws_rtds.py index e860944..705138b 100644 --- a/ws_rtds.py +++ b/ws_rtds.py @@ -1,24 +1,98 @@ import asyncio import json -import math -import pandas as pd -import os -from datetime import datetime, timezone -import websockets +import logging +import socket +import traceback +from datetime import datetime +from typing import AsyncContextManager + import numpy as np -import talib -import requests +import pandas as pd +import requests.packages.urllib3.util.connection as urllib3_cn # type: ignore +from sqlalchemy import text +import websockets +from sqlalchemy.ext.asyncio import create_async_engine + +### Allow only ipv4 ### +def allowed_gai_family(): + return socket.AF_INET +urllib3_cn.allowed_gai_family = allowed_gai_family + +### Database ### +USE_DB: bool = True +CON: AsyncContextManager | None = None + +### Logging ### +LOG_FILEPATH: str = '/root/logs/Polymarket_RTDS.log' + +### Globals ### WSS_URL = "wss://ws-live-data.polymarket.com" +# HIST_TRADES = np.empty((0, 2)) -HIST_TRADES = np.empty((0, 2)) +### Database Funcs ### +async def create_rtds_btcusd_table( + CON: AsyncContextManager, + engine: str = 'mysql', # mysql | duckdb + ) -> None: + if CON is None: + logging.info("NO DB CONNECTION, SKIPPING Create Statements") + else: + if engine == 'mysql': + await CON.execute(text(""" + CREATE TABLE IF NOT EXISTS poly_rtds_btcusd_cl ( + timestamp_msg BIGINT, + timestamp_value BIGINT, + value DOUBLE, + ); + """)) + await CON.commit() + else: + raise ValueError('Only MySQL engine is implemented') + +async def insert_rtds_btcusd_table( + timestamp_msg: int, + timestamp_value: int, + value: int, + CON: AsyncContextManager, + engine: str = 'mysql', # mysql | duckdb + ) -> None: + params={ + 'timestamp_msg': timestamp_msg, + 'timestamp_value': timestamp_value, + 'value': value, + } + if CON is None: + logging.info("NO DB CONNECTION, SKIPPING Insert Statements") + else: + if engine == 'mysql': + await CON.execute(text(""" + INSERT INTO poly_rtds_btcusd_cl + ( + timestamp_msg, + timestamp_value, + value + ) + VALUES + ( + :timestamp_msg, + :timestamp_value, + :value + ) + """), + parameters=params + ) + await CON.commit() + else: + raise ValueError('Only MySQL engine is implemented') +### Websocket ### async def rtds_stream(): global HIST_TRADES async with websockets.connect(WSS_URL) as websocket: - print(f"Connected to {WSS_URL}") + logging.info(f"Connected to {WSS_URL}") subscribe_msg = { "action": "subscribe", @@ -41,21 +115,49 @@ async def rtds_stream(): if data['payload'].get('value', None) is not None: print(f'🤑 BTC Chainlink Last Px: {data['payload']['value']:_.4f}; TS: {pd.to_datetime(data['timestamp'], unit='ms')}') else: - print(f'Initial or unexpected data struct, skipping: {data}') + logging.info(f'Initial or unexpected data struct, skipping: {data}') continue except (json.JSONDecodeError, ValueError): - print(f'Message not in JSON format, skipping: {message}') + logging.warning(f'Message not in JSON format, skipping: {message}') continue else: raise ValueError(f'Type: {type(data)} not expected: {message}') - + except websockets.ConnectionClosed as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) + except Exception as e: + logging.error(f'Connection closed: {e}') + logging.error(traceback.format_exc()) - except websockets.ConnectionClosed: - print("Connection closed by server.") + +async def main(): + global CON + + if USE_DB: + engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/mkt_maker') + async with engine.connect() as CON: + await rtds_stream() + else: + CON = None + logging.warning("DATABASE NOT BEING USED, NO DATA WILL BE RECORDED") + await rtds_stream() if __name__ == '__main__': + START_TIME = round(datetime.now().timestamp()*1000) + + logging.info(f'Log FilePath: {LOG_FILEPATH}') + + logging.basicConfig( + force=True, + filename=LOG_FILEPATH, + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filemode='w' + ) + logging.info(f"STARTED: {START_TIME}") + try: - asyncio.run(rtds_stream()) + asyncio.run(main()) except KeyboardInterrupt: - print("Stream stopped.") \ No newline at end of file + logging.info("Stream stopped") \ No newline at end of file