"""Orchestrator entry point.

Subscribes to the ``VK_IN`` Valkey pub/sub channel and prints every message
it receives, while an async SQLAlchemy connection is held open in ``CON``.
"""
import asyncio
import json
import logging
import os
import time
import traceback
from datetime import datetime
from typing import AsyncContextManager

import valkey
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

### Database ###
EXTEND_CLIENT = None
# Bound by main() to the connection yielded by ``engine.connect()``.
CON: AsyncContextManager | None = None
VAL_KEY = None
VK_IN = 'fr_orchestrator_input'
VK_OUT = 'fr_orchestrator_output'

### Logging ###
load_dotenv()
# Fall back to the current directory when LOGS_PATH is unset; concatenating
# None with a str would otherwise raise TypeError at import time.
LOG_FILEPATH: str = os.getenv("LOGS_PATH", ".") + '/Fund_Rate_Algo_Orchestrator.log'

### ALGO GLOBALS ###
ASTER_ALLOW_ORDERING: bool = True
EXTEND_ALLOW_ORDERING: bool = True
LOOP_SLEEP_SEC = 1


async def orchestrator() -> None:
    """Consume messages from the ``VK_IN`` channel until stopped.

    The Valkey client in ``VAL_KEY`` is synchronous, so every blocking call
    is pushed to a worker thread with ``asyncio.to_thread``; this keeps the
    event loop responsive and lets task cancellation take effect between
    polls.

    Connection errors and unexpected exceptions are reported and swallowed
    (the coroutine returns normally). Cancellation is logged and re-raised.
    """
    pubsub = None
    try:
        pubsub = VAL_KEY.pubsub()
        await asyncio.to_thread(pubsub.subscribe, VK_IN)
        print(f"Subscribed to '{VK_IN}'. Waiting for messages...")

        while True:
            # Poll with a timeout instead of iterating the blocking
            # ``listen()`` generator, so the worker thread returns regularly.
            # LOOP_SLEEP_SEC is used as the poll interval.
            message = await asyncio.to_thread(
                pubsub.get_message,
                ignore_subscribe_messages=True,
                timeout=LOOP_SLEEP_SEC,
            )
            if message is None:
                continue  # nothing arrived within the timeout

            # Subscribe confirmations are already filtered out above; the
            # type check guards against any other non-payload message kinds.
            if message['type'] == 'message':
                data = message['data']
                channel = message['channel']
                print(f"[{channel}] Received: {data}")

    except valkey.exceptions.ConnectionError as e:
        print(f"Could not connect to Valkey. Please check the publish server is up; {e}")
        logging.error(f"Could not connect to Valkey. Please check the publish server is up; {e}")
    except KeyboardInterrupt:
        logging.info('ORCHESTRATOR SHUTTING DOWN...')
    except asyncio.CancelledError:
        # On Python 3.11+ asyncio.run() turns Ctrl-C into cancellation of the
        # main task, so this is the path normally taken on shutdown.
        logging.info('ORCHESTRATOR SHUTTING DOWN...')
        raise
    except Exception as e:
        logging.error(traceback.format_exc())
        logging.critical(f'*** ORCHESTRATOR CRASHED: {e}')
    finally:
        # Release the pub/sub connection on every exit path.
        if pubsub is not None:
            pubsub.close()


### MAIN STARTUP ###
async def main() -> None:
    """Create the Valkey client and DB connection, then run the orchestrator.

    Populates the module globals ``VAL_KEY`` and ``CON``. The Valkey client
    is closed and the SQLAlchemy engine disposed when the orchestrator
    returns or raises.
    """
    global EXTEND_CLIENT
    global VAL_KEY
    global CON

    VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
    # NOTE(review): credentials are hard-coded in the URL below; consider
    # loading them from the environment instead.
    engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate')
    try:
        async with engine.connect() as CON:
            await orchestrator()
    finally:
        # Dispose the pool while the event loop is still running, and drop
        # the Valkey client's connections.
        await engine.dispose()
        VAL_KEY.close()


if __name__ == '__main__':
    START_TIME = round(datetime.now().timestamp()*1000)

    # Configure logging first: an INFO record emitted before basicConfig()
    # would be dropped by the implicit WARNING-level default configuration.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(f'Log FilePath: {LOG_FILEPATH}')
    logging.info(f"STARTED: {START_TIME}")

    asyncio.run(main())