import asyncio import json import logging import os import traceback from datetime import datetime import time import valkey from dotenv import load_dotenv import modules.utils as utils import modules.structs as structs from pydantic import BaseModel import docker ### Database ### VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) DOCKER = docker.from_env() ### Logging ### load_dotenv() LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Health.log' ### CONSTANTS ### MAX_TIME_SINCE_LAST_UPDATE_MS: int = 1000 * 60 * 3 # 1000 x 60 sec x [minutes] LOOP_SLEEP_SEC: int = 5 ### Globals ### ### Structs ### class Health_Status(BaseModel): status: str # ENUM: 'HEALTHY' | 'UNHEALTHY' | 'DEAD' timestamp: int vk_objs: list[structs.VK_Obj] async def get_algo_working_symbol() -> str: vk_get: str = VAL_KEY.get(name='fr_algo_working_symbol') # ty:ignore[invalid-assignment] d = json.loads(vk_get) algo_symbol: str = d.get('EXTEND', {}).get('symbol', '') return algo_symbol async def main() -> None: vk_objs = [ structs.VK_Orchestrator_Output(), structs.VK_Working_Symbol(), structs.VK_User_Orders_Extend(), structs.VK_User_Trades_Extend(), structs.VK_User_Balances_Aster(), structs.VK_User_Balances_Extend(), structs.VK_User_Positions_Aster(), structs.VK_User_Positions_Extend(), structs.VK_FR_Aster(), structs.VK_FR_All_Aster(), structs.VK_FR_Extend(), structs.VK_FR_All_Extend(), structs.VK_Ticker_Aster(), structs.VK_Ticker_Extend(), structs.VK_Trade_Aster(), structs.VK_Trade_Extend(), ] health_status = Health_Status( status = 'HEALTHY', timestamp = round(number=datetime.now().timestamp()*1000), vk_objs = vk_objs, # ty:ignore[invalid-argument-type] ) try: while True: algo_symbol = await get_algo_working_symbol() health_status.timestamp = round(number=datetime.now().timestamp()*1000) for o in health_status.vk_objs: vk_symbol = o.data.get('symbol') if isinstance(o.data, dict) else None await o.checks.run_checks(args={ 'timestamp': health_status.timestamp, 'algo_symbol': algo_symbol, 'vk_symbol': vk_symbol, }) vk_statuses = [o.status for o in health_status.vk_objs] if 'DEAD' in vk_statuses: health_status.status = 'DEAD' elif 'UNHEALTHY' in vk_statuses: health_status.status = 'UNHEALTHY' else: health_status.status = 'HEALTHY' if health_status.status != 'HEALTHY': all_containers = DOCKER.containers.list(all=True) for c in all_containers: if c.status == 'running': logging.warning(f"stopping: ID: {c.id}, Name: {c.name}, Status: {c.status}") container = DOCKER.containers.get(c.id) container.stop(timeout=10) logging.info('Stopped all containers') # VAL_KEY.set(name='health_status', value=json.dumps(obj=(health_status))) logging.info(vk_statuses) if LOOP_SLEEP_SEC > 0: time.sleep(LOOP_SLEEP_SEC) continue except KeyboardInterrupt: logging.info(msg='ORCHESTRATOR SHUTTING DOWN...') except Exception as e: logging.error(msg=traceback.format_exc()) logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}') if __name__ == '__main__': START_TIME: int = round(number=datetime.now().timestamp()*1000) logging.info(msg=f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(msg=f"STARTED: {START_TIME}") asyncio.run(main())