# Funding_Rate/algo_orchestrator.py
# (Python source; 101 lines, 3.5 KiB as reported by the repository file viewer)

import asyncio
import json
import logging
import os
import traceback
from datetime import datetime
import valkey
from dotenv import load_dotenv
import modules.utils as utils
import modules.structs as structs
from pathlib import Path
'''
TODO:
- Insert config changes into the database for later analysis / general tracking
'''
### Database ###
# Valkey pub/sub channel that main() subscribes to for config-update messages.
VK_IN: str = 'fr_orchestrator_input'
# Not referenced by the code visible in this file; presumably the key/name the
# orchestrator output is stored under in Valkey - TODO confirm against structs.
VK_OUT: str = 'fr_orchestrator_output'
### Logging ###
# Load variables from a .env file (if any) into the process environment so
# LOGS_PATH can be supplied that way.
load_dotenv()
# Fall back to the current directory when LOGS_PATH is unset or empty;
# otherwise the f-string below would produce the literal path 'None/...'
# (or a file in the filesystem root for an empty value).
_LOGS_DIR: str = os.getenv("LOGS_PATH") or "."
LOG_FILEPATH: str = f'{_LOGS_DIR}/Fund_Rate_Algo_Orchestrator.log'
async def main() -> None:
    """Run the funding-rate algo orchestrator until it is stopped or fails.

    Startup:
      * opens a Valkey client on localhost:6379 (db 0, decoded responses);
      * uses '/algo_local_drive/algo_config.json' when that file exists,
        otherwise 'algo_config.json' relative to the working directory;
      * loads the JSON config once (so a missing or invalid file fails fast)
        and calls ``structs.VK_Orchestrator_Out().set`` with the client.

    Main loop: subscribes to the ``VK_IN`` channel and, for every event of
    type 'message', parses the payload as JSON, re-reads the config file,
    merges the payload into it with ``utils.rec_set_dict``, calls
    ``structs.VK_Orchestrator_Out().set`` again and writes the merged config
    back to the same file.

    Exceptions raised inside the subscribe/listen section are logged and
    swallowed, which ends the function; exceptions during startup propagate.

    Raises:
        OSError / json.JSONDecodeError: if the initial config load fails.
    """
    VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)

    # Prefer the config on the mounted drive; fall back to a local copy.
    CONFIG_FILEPATH: str = '/algo_local_drive/algo_config.json'
    if not Path(CONFIG_FILEPATH).exists():
        CONFIG_FILEPATH = 'algo_config.json'

    # Init Load Config File
    with open(file=CONFIG_FILEPATH, mode='r', encoding='utf-8') as f:
        Algo_Config: dict = json.load(fp=f)
    Algo_Config['Updated_Timestamp'] = round(number=datetime.now().timestamp()*1000)

    # NOTE(review): Algo_Config is not passed to VK_Orchestrator_Out(); whether
    # the object picks the config up by itself cannot be told from this file -
    # confirm against modules.structs.
    # vk = structs.VK_Obj(vk_name = 'fr_orchestrator_output', data=Algo_Config)
    vk = structs.VK_Orchestrator_Out()
    await vk.set(VK_CON=VAL_KEY)

    # Pre-declared so the ``finally`` clause can tell whether it was created.
    VK_PUBSUB: valkey.client.PubSub | None = None
    try:
        VK_PUBSUB = VAL_KEY.pubsub()
        VK_PUBSUB.subscribe(VK_IN)
        logging.info(msg=f"Subscribed to '{VK_IN}'. Waiting for messages...")

        # NOTE(review): listen() is a synchronous generator, so this loop
        # blocks the event loop between messages. That is harmless while
        # main() is the only task, but would starve any concurrent tasks.
        for message in VK_PUBSUB.listen():
            # Skip subscribe confirmations and other non-payload events.
            if message['type'] != 'message':
                continue

            timestamp: int = round(number=datetime.now().timestamp()*1000)

            # Receive Update Msg from PubSub
            data: dict = json.loads(s=message['data'])

            # Load Config File (re-read so on-disk edits are not overwritten
            # with a stale in-memory copy).
            with open(file=CONFIG_FILEPATH, mode='r', encoding='utf-8') as f:
                Algo_Config = json.load(fp=f)

            # Validate BEFORE stamping: once 'Updated_Timestamp' is added the
            # dict can never be empty, which made this guard unreachable.
            if not Algo_Config:
                raise ValueError(f'Algo Orchestrator, config is none: {Algo_Config}')
            Algo_Config['Updated_Timestamp'] = timestamp

            # Update Config w Update Data
            Algo_Config = utils.rec_set_dict(orig_dict=Algo_Config, new_dict=data)

            # Set VK KV w Updated Config
            # vk = structs.VK_Obj(vk_name = 'fr_orchestrator_output', data=Algo_Config)
            vk = structs.VK_Orchestrator_Out()
            await vk.set(VK_CON=VAL_KEY)

            # Save Updated Config to File
            with open(file=CONFIG_FILEPATH, mode='w', encoding='utf-8') as f:
                json.dump(obj=Algo_Config, fp=f, indent=4)

            logging.info(msg=f"Algo Config Updated @ {timestamp}; {data}")
    except valkey.exceptions.ConnectionError as e:
        # A lost/unavailable server is a failure, so log it at ERROR level.
        logging.error(msg=f"Could not connect to Valkey. Please check the publish server is up; {e}")
    except KeyboardInterrupt:
        logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
    except Exception as e:
        logging.error(msg=traceback.format_exc())
        logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}')
    finally:
        # Release the subscription and the client's connections on every exit path.
        if VK_PUBSUB is not None:
            VK_PUBSUB.close()
        VAL_KEY.close()
# Script entry point: configure file logging, then run the orchestrator.
if __name__ == '__main__':
    START_TIME: int = round(number=datetime.now().timestamp()*1000)

    # Configure logging BEFORE the first log call. A module-level logging.info()
    # on an unconfigured root logger implicitly sets up a WARNING-level stderr
    # handler, so an INFO record emitted earlier would be dropped and never
    # reach the log file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # truncate the log file on every start
    )
    logging.info(msg=f'Log FilePath: {LOG_FILEPATH}')
    logging.info(msg=f"STARTED: {START_TIME}")

    asyncio.run(main())