# Funding_Rate/engine_health.py

import asyncio
import json
import logging
import os
import traceback
from datetime import datetime
import time
import valkey
from dotenv import load_dotenv
import modules.utils as utils
import modules.structs as structs
from pydantic import BaseModel
import docker
### Database ###
# Valkey client for the local instance (localhost:6379, db 0).
# decode_responses=True asks the client to hand back decoded text instead of raw bytes.
VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
# Docker client configured from the process environment; main() uses it to list and stop containers.
DOCKER = docker.from_env()
### Logging ###
# Load the .env file before LOGS_PATH is read on the next statement.
load_dotenv()
# NOTE(review): if LOGS_PATH is unset, os.getenv returns None and this path becomes
# 'None/Fund_Rate_Engine_Health.log' - confirm LOGS_PATH is always provided.
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Health.log'
### CONSTANTS ###
# Three minutes expressed in milliseconds. Not referenced in the visible part of this file.
MAX_TIME_SINCE_LAST_UPDATE_MS: int = 1000 * 60 * 3 # 1000 ms x 60 sec x 3 min
# Pause between main() loop iterations, in seconds; main() skips the pause when this is not > 0.
LOOP_SLEEP_SEC: int = 5
### Globals ###
### Structs ###
# Snapshot of the overall engine health that main() refreshes on every loop iteration.
class Health_Status(BaseModel):
    # Overall state; main() assigns one of 'HEALTHY' | 'UNHEALTHY' | 'DEAD'.
    status: str
    # Time of the latest evaluation, in milliseconds since the Unix epoch.
    timestamp: int
    # Monitored Valkey-backed objects whose individual statuses are aggregated.
    vk_objs: list[structs.VK_Obj]
async def get_algo_working_symbol() -> str:
    """Return the symbol stored under ``EXTEND.symbol`` in the working-symbol key.

    Reads the ``fr_algo_working_symbol`` key from Valkey, parses it as JSON and
    extracts ``['EXTEND']['symbol']``.

    Returns:
        The symbol string, or ``''`` when the key is absent or empty, the
        payload is not a JSON object, ``EXTEND`` is missing or not an object,
        or ``symbol`` is missing or null.

    Raises:
        json.JSONDecodeError: If the stored value is not valid JSON. This is
            left to propagate so corrupt data is not silently masked.
    """
    vk_get: str | None = VAL_KEY.get(name='fr_algo_working_symbol') # ty:ignore[invalid-assignment]
    # A missing key comes back as None; json.loads(None) would raise TypeError.
    if not vk_get:
        return ''

    payload = json.loads(vk_get)
    if not isinstance(payload, dict):
        return ''

    # 'EXTEND' may be absent or explicitly null; neither should raise.
    extend = payload.get('EXTEND')
    if not isinstance(extend, dict):
        return ''

    # Keep the declared str return type even if 'symbol' is stored as null.
    return extend.get('symbol') or ''
def _overall_status(vk_statuses: list) -> str:
    """Collapse per-object statuses into one: 'DEAD' outranks 'UNHEALTHY' outranks 'HEALTHY'."""
    if 'DEAD' in vk_statuses:
        return 'DEAD'
    if 'UNHEALTHY' in vk_statuses:
        return 'UNHEALTHY'
    return 'HEALTHY'


def _stop_running_containers() -> None:
    """Stop every Docker container whose status is 'running' (10 s stop timeout each)."""
    for c in DOCKER.containers.list(all=True):
        if c.status == 'running':
            logging.warning(f"stopping: ID: {c.id}, Name: {c.name}, Status: {c.status}")
            # `c` is already a container handle; no need to fetch it again by id.
            c.stop(timeout=10)
    logging.info('Stopped all containers')


async def main() -> None:
    """Run the health-monitor loop until cancelled or an error occurs.

    Each iteration:
      1. reads the algo working symbol from Valkey,
      2. stamps the current time (epoch milliseconds) on the health status,
      3. awaits ``checks.run_checks`` on every monitored object, passing the
         timestamp, the algo symbol and the object's own ``data['symbol']``
         (``None`` when ``data`` is not a dict),
      4. aggregates the objects' ``status`` values into one overall status,
      5. stops all running Docker containers when that status is not
         ``'HEALTHY'``,
      6. logs the per-object statuses and sleeps ``LOOP_SLEEP_SEC`` seconds.

    Any ``Exception`` is logged with its traceback and ends the loop. A
    cancellation (which is how ``asyncio.run`` delivers Ctrl-C to the
    coroutine) is logged and re-raised.
    """
    vk_objs = [
        structs.VK_Orchestrator_Output(),
        structs.VK_Working_Symbol(),
        structs.VK_User_Orders_Extend(),
        structs.VK_User_Trades_Extend(),
        structs.VK_User_Balances_Aster(),
        structs.VK_User_Balances_Extend(),
        structs.VK_User_Positions_Aster(),
        structs.VK_User_Positions_Extend(),
        structs.VK_FR_Aster(),
        structs.VK_FR_All_Aster(),
        structs.VK_FR_Extend(),
        structs.VK_FR_All_Extend(),
        structs.VK_Ticker_Aster(),
        structs.VK_Ticker_Extend(),
        structs.VK_Trade_Aster(),
        structs.VK_Trade_Extend(),
    ]
    health_status = Health_Status(
        status='HEALTHY',
        timestamp=round(number=datetime.now().timestamp()*1000),
        vk_objs=vk_objs, # ty:ignore[invalid-argument-type]
    )

    # NOTE(review): the log messages below say 'ORCHESTRATOR' although this is the
    # health engine - looks copy-pasted; left unchanged in case log consumers match on it.
    try:
        while True:
            algo_symbol = await get_algo_working_symbol()
            health_status.timestamp = round(number=datetime.now().timestamp()*1000)

            for o in health_status.vk_objs:
                vk_symbol = o.data.get('symbol') if isinstance(o.data, dict) else None
                # Presumably run_checks refreshes o.status - verify in modules.structs.
                await o.checks.run_checks(args={
                    'timestamp': health_status.timestamp,
                    'algo_symbol': algo_symbol,
                    'vk_symbol': vk_symbol,
                })

            vk_statuses = [o.status for o in health_status.vk_objs]
            health_status.status = _overall_status(vk_statuses)

            if health_status.status != 'HEALTHY':
                _stop_running_containers()

            # VAL_KEY.set(name='health_status', value=json.dumps(obj=(health_status)))
            logging.info(vk_statuses)

            if LOOP_SLEEP_SEC > 0:
                # asyncio.sleep yields to the event loop; time.sleep would block it.
                await asyncio.sleep(LOOP_SLEEP_SEC)
    except asyncio.CancelledError:
        # Ctrl-C under asyncio.run reaches the coroutine as a cancellation;
        # log the shutdown and re-raise so the cancellation is not swallowed.
        logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
        raise
    except KeyboardInterrupt:
        logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
    except Exception as e:
        logging.error(msg=traceback.format_exc())
        logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}')
if __name__ == '__main__':
    # Script entry point: configure file logging, record the start time, run the monitor loop.
    START_TIME: int = round(number=datetime.now().timestamp()*1000)

    # Configure logging BEFORE the first log call. A logging.info() issued earlier is
    # dropped: the root logger still has its default WARNING level and no file handler.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'  # truncate the log file on every start
    )
    logging.info(msg=f'Log FilePath: {LOG_FILEPATH}')
    logging.info(msg=f"STARTED: {START_TIME}")

    asyncio.run(main())