Files
Funding_Rate/algo_orchestrator.py
2026-04-25 23:43:28 +00:00

95 lines
2.9 KiB
Python

import asyncio
import json
import logging
import os
import traceback
from datetime import datetime
from typing import AsyncContextManager
import valkey
from dotenv import load_dotenv
# from sqlalchemy.ext.asyncio import create_async_engine
'''
TO DO:
- Insert config changes into database for analysis later / general tracking
'''
### Database ###
# Placeholder for an async DB connection; nothing in this file assigns it yet
# (the SQLAlchemy engine setup is commented out).
CON: AsyncContextManager | None = None
# Valkey client; created in main() before the orchestrator starts.
VAL_KEY = None
# Pub/sub channel the orchestrator subscribes to for config updates.
VK_IN = 'fr_orchestrator_input'
# Valkey key the orchestrator writes the current config JSON to.
VK_OUT = 'fr_orchestrator_output'
### Logging ###
load_dotenv()
# Fall back to the current directory when LOGS_PATH is unset; concatenating
# None with a str would otherwise raise an opaque TypeError at import time.
LOG_FILEPATH: str = os.getenv("LOGS_PATH", ".") + '/Fund_Rate_Algo_Orchestrator.log'
# Bound to None until main() loads 'algo_config.json', so the name always
# exists at module level instead of raising NameError on early access.
ALGO_CONFIG: dict | None = None
# ALGO_CONFIG: None | Algo_Config = None
async def orchestrator() -> None:
    """Apply config updates received on the ``VK_IN`` pub/sub channel.

    For every published message the payload is parsed as a JSON object, the
    matching keys of the module-level ``ALGO_CONFIG`` are overwritten, the
    update time (epoch milliseconds) is stored under
    ``'Config_Updated_Timestamp'``, and the resulting config is written both
    to the ``VK_OUT`` key in Valkey and to ``'algo_config.json'``.

    A payload that is not valid JSON, or is not a JSON object, is logged and
    skipped so one bad message does not stop the listener. Connection errors,
    ``KeyboardInterrupt`` and any other exception end the loop and are
    reported rather than propagated. The pubsub object is closed on exit.

    Requires ``VAL_KEY`` and ``ALGO_CONFIG`` to have been initialised
    (``main()`` does this) before being awaited.
    """
    global ALGO_CONFIG
    pubsub = None
    try:
        pubsub = VAL_KEY.pubsub()
        pubsub.subscribe(VK_IN)
        print(f"Subscribed to '{VK_IN}'. Waiting for messages...")
        # NOTE(review): listen() is iterated with a plain `for`, so no other
        # asyncio task can run while waiting; main() awaits only this
        # coroutine, so that is presumably acceptable -- confirm if more
        # tasks are ever added.
        for message in pubsub.listen():
            if message['type'] != 'message':
                continue
            timestamp = round(datetime.now().timestamp()*1000)
            try:
                data = json.loads(message['data'])
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError is a ValueError subclass.
                logging.warning(f'Ignoring config message with invalid JSON: {e}')
                continue
            if not isinstance(data, dict):
                logging.warning(
                    f'Ignoring config message; expected a JSON object, got {type(data).__name__}'
                )
                continue
            # channel = message['channel']
            for k, v in data.items():
                # NOTE(review): keys missing from the config, or whose current
                # value is None, are left untouched -- confirm the None case
                # is intended.
                if ALGO_CONFIG.get(k, None) is not None:
                    ALGO_CONFIG[k] = v
            ALGO_CONFIG['Config_Updated_Timestamp'] = timestamp
            VAL_KEY.set(VK_OUT, json.dumps(ALGO_CONFIG))
            with open('algo_config.json', 'w', encoding='utf-8') as f:
                json.dump(ALGO_CONFIG, f, indent=4)
            print(f"Algo Config Updated @ {timestamp}; {data}")
    except valkey.exceptions.ConnectionError as e:
        print(f"Could not connect to Valkey. Please check the publish server is up; {e}")
    except KeyboardInterrupt:
        logging.info('ORCHESTRATOR SHUTTING DOWN...')
    except Exception as e:
        logging.error(traceback.format_exc())
        logging.critical(f'*** ORCHESTRATOR CRASHED: {e}')
    finally:
        # Release the subscription's connection however the loop ended.
        if pubsub is not None:
            pubsub.close()
### MAIN STARTUP ###
async def main() -> None:
    """Initialise module state, then run the orchestrator until it returns.

    Creates the Valkey client on localhost:6379 (db 0, decoded responses),
    loads ``'algo_config.json'`` into ``ALGO_CONFIG``, stamps it with the
    current time in epoch milliseconds, and awaits ``orchestrator()``.
    """
    global VAL_KEY, CON, ALGO_CONFIG

    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    # The async SQLAlchemy engine / connection is not wired up yet, so CON is
    # left as it is.

    with open('algo_config.json', 'r', encoding='utf-8') as config_file:
        loaded_config = json.load(config_file)
    ALGO_CONFIG = loaded_config

    startup_ms = round(datetime.now().timestamp()*1000)
    ALGO_CONFIG['Config_Updated_Timestamp'] = startup_ms

    await orchestrator()
if __name__ == '__main__':
    # Script entry point: configure file logging, record the start time, then
    # run main() on a fresh event loop.
    START_TIME = round(datetime.now().timestamp()*1000)
    # Configure logging before the first record is emitted. An INFO record
    # logged earlier is dropped at the root logger's default WARNING level and
    # never reaches the log file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")
    asyncio.run(main())