import asyncio
import json
import logging
import os
import traceback
from datetime import datetime
from pathlib import Path

import valkey
from dotenv import load_dotenv

import modules.structs as structs
import modules.utils as utils

'''
TO DO:
    - Insert config changes into database for analysis later / general tracking
'''

### Database ###
VK_IN: str = 'fr_orchestrator_input'
VK_OUT: str = 'fr_orchestrator_output'

### Logging ###
load_dotenv()
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Algo_Orchestrator.log'


def _now_ms() -> int:
    """Return the current local wall-clock time as integer epoch milliseconds."""
    return round(number=datetime.now().timestamp()*1000)


def _load_config(filepath: str) -> dict:
    """Read and return the JSON document stored at *filepath*."""
    with open(file=filepath, mode='r', encoding='utf-8') as f:
        return json.load(fp=f)


async def main() -> None:
    """Run the orchestrator: publish the config, then apply pub/sub updates.

    On start-up the config file is loaded and the orchestrator output object
    is set in Valkey. The function then subscribes to ``VK_IN`` and, for each
    message received, re-reads the config file, merges the message payload
    into it via ``utils.rec_set_dict``, sets the output object again and
    writes the merged config back to disk.

    Connection errors, ``KeyboardInterrupt`` and any other exception raised
    inside the subscribe loop are logged and end the function; they are not
    re-raised.

    NOTE: ``VK_PUBSUB.listen()`` is a blocking iterator, so the event loop
    does not run any other task while this function waits for a message.
    """
    VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)

    # Prefer the config on the shared drive; fall back to the working directory.
    CONFIG_FILEPATH: str = '/algo_local_drive/algo_config.json'
    if not Path(CONFIG_FILEPATH).exists():
        CONFIG_FILEPATH = 'algo_config.json'

    # Init Load Config File
    Algo_Config: dict = _load_config(CONFIG_FILEPATH)
    Algo_Config['Updated_Timestamp'] = _now_ms()

    # NOTE(review): Algo_Config is not handed to VK_Orchestrator_Out here (an
    # earlier variant used structs.VK_Obj(vk_name='fr_orchestrator_output',
    # data=Algo_Config)). Presumably the struct obtains the config itself --
    # confirm against modules.structs.
    vk = structs.VK_Orchestrator_Out()
    await vk.set(VK_CON=VAL_KEY)

    VK_PUBSUB: valkey.client.PubSub | None = None
    try:
        VK_PUBSUB = VAL_KEY.pubsub()
        VK_PUBSUB.subscribe(VK_IN)
        logging.info(msg=f"Subscribed to '{VK_IN}'. Waiting for messages...")

        for message in VK_PUBSUB.listen():
            # Skip subscribe confirmations and other non-payload events.
            if message['type'] != 'message':
                continue

            timestamp: int = _now_ms()

            # Receive Update Msg from PubSub
            data: dict = json.loads(s=message['data'])

            # Load Config File
            Algo_Config = _load_config(CONFIG_FILEPATH)

            # Validate BEFORE stamping: once 'Updated_Timestamp' is written the
            # dict can never be empty, which would make this guard dead code.
            if not Algo_Config:
                raise ValueError(f'Algo Orchestrator, config is none: {Algo_Config}')
            Algo_Config['Updated_Timestamp'] = timestamp

            # Update Config w Update Data
            Algo_Config = utils.rec_set_dict(orig_dict=Algo_Config, new_dict=data)

            # Set VK KV w Updated Config
            # NOTE(review): as above, the merged Algo_Config is not passed in,
            # and this runs before the file is rewritten below -- confirm the
            # struct does not depend on the on-disk config being current.
            vk = structs.VK_Orchestrator_Out()
            await vk.set(VK_CON=VAL_KEY)

            # Save Updated Config to File
            with open(file=CONFIG_FILEPATH, mode='w', encoding='utf-8') as f:
                json.dump(obj=Algo_Config, fp=f, indent=4)

            logging.info(msg=f"Algo Config Updated @ {timestamp}; {data}")

    except valkey.exceptions.ConnectionError as e:
        logging.info(msg=f"Could not connect to Valkey. Please check the publish server is up; {e}")
    except KeyboardInterrupt:
        logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
    except Exception as e:
        logging.error(msg=traceback.format_exc())
        logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}')
    finally:
        # Release the subscription's connection however the loop ended.
        if VK_PUBSUB is not None:
            VK_PUBSUB.close()


if __name__ == '__main__':
    START_TIME: int = _now_ms()

    # Configure logging before the first log call: a logging.info() issued
    # earlier would go to an implicit WARNING-level stderr handler and be lost.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(msg=f'Log FilePath: {LOG_FILEPATH}')
    logging.info(msg=f"STARTED: {START_TIME}")

    asyncio.run(main())