import asyncio import json import logging import os import traceback from datetime import datetime from typing import AsyncContextManager import valkey from dotenv import load_dotenv # from sqlalchemy.ext.asyncio import create_async_engine ''' TO DO: - Insert config changes into database for analysis later / general tracking ''' ### Database ### CON: AsyncContextManager | None = None VAL_KEY = None VK_IN = 'fr_orchestrator_input' VK_OUT = 'fr_orchestrator_output' ### Logging ### load_dotenv() LOG_FILEPATH: str = os.getenv("LOGS_PATH") + '/Fund_Rate_Algo_Orchestrator.log' ALGO_CONFIG: None | dict async def orchestrator() -> None: global ALGO_CONFIG try: VK_PUBSUB = VAL_KEY.pubsub() VK_PUBSUB.subscribe(VK_IN) logging.info(f"Subscribed to '{VK_IN}'. Waiting for messages...") for message in VK_PUBSUB.listen(): if message['type'] == 'message': timestamp = round(datetime.now().timestamp()*1000) data = json.loads(message['data']) # channel = message['channel'] with open('/algo_local_drive/algo_config.json', 'r', encoding='utf-8') as f: # ALGO_CONFIG = json.load(f, object_hook=lambda d: Algo_Config(**d)) ALGO_CONFIG = json.load(f) ALGO_CONFIG['Updated_Timestamp'] = timestamp for k, v in data.items(): if ALGO_CONFIG.get(k, None) is not None: ALGO_CONFIG[k] = v VAL_KEY.set(VK_OUT, json.dumps(ALGO_CONFIG)) with open('/algo_local_drive/algo_config.json', 'w', encoding='utf-8') as f: json.dump(ALGO_CONFIG, f, indent=4) logging.info(f"Algo Config Updated @ {timestamp}; {data}") except valkey.exceptions.ConnectionError as e: logging.info(f"Could not connect to Valkey. Please check the publish server is up; {e}") except KeyboardInterrupt: logging.info('ORCHESTRATOR SHUTTING DOWN...') except Exception as e: logging.error(traceback.format_exc()) logging.critical(f'*** ORCHESTRATOR CRASHED: {e}') ### MAIN STARTUP ### async def main() -> None: global VAL_KEY global CON global ALGO_CONFIG VAL_KEY = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) # engine = create_async_engine('mysql+asyncmy://root:pwd@localhost/fund_rate') with open('/algo_local_drive/algo_config.json', 'r', encoding='utf-8') as f: # ALGO_CONFIG = json.load(f, object_hook=lambda d: Algo_Config(**d)) ALGO_CONFIG = json.load(f) ALGO_CONFIG['Updated_Timestamp'] = round(datetime.now().timestamp()*1000) # async with engine.connect() as CON: await orchestrator() if __name__ == '__main__': START_TIME = round(datetime.now().timestamp()*1000) logging.info(f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(f"STARTED: {START_TIME}") asyncio.run(main())