Files
Funding_Rate/engine_health.py

129 lines
4.1 KiB
Python
Raw Permalink Normal View History

import asyncio
import json
import logging
import os
import traceback
from datetime import datetime
import time
import valkey
from dotenv import load_dotenv
import modules.utils as utils
import modules.structs as structs
from pydantic import BaseModel
import docker
### Database ###
# Module-level Valkey client bound to localhost:6379, db 0.
# decode_responses=True asks the client to hand back str values rather than bytes.
VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
# Docker client configured from the process environment; main() uses it to
# list and stop containers when the aggregated status is not 'HEALTHY'.
DOCKER = docker.from_env()
### Logging ###
# Load the .env file first so LOGS_PATH is visible to os.getenv() below.
load_dotenv()
# NOTE(review): if LOGS_PATH is unset, os.getenv() returns None and this path
# becomes the literal 'None/Fund_Rate_Engine_Health.log' - confirm LOGS_PATH is
# always provided in deployment.
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Health.log'
### CONSTANTS ###
# Three minutes expressed in milliseconds. Not referenced in the visible part
# of this file; presumably a staleness threshold used by the checks - TODO confirm.
MAX_TIME_SINCE_LAST_UPDATE_MS: int = 1000 * 60 * 3 # 1000 x 60 sec x [minutes]
# Pause, in seconds, between iterations of the health-check loop in main().
LOOP_SLEEP_SEC: int = 5
### Globals ###
### Structs ###
class Health_Status(BaseModel):
    """Snapshot of the engine's overall health plus the objects it was derived from."""

    # Aggregated state. ENUM: 'HEALTHY' | 'UNHEALTHY' | 'DEAD'
    status: str
    # Time of the latest evaluation, as epoch milliseconds.
    timestamp: int
    # The monitored Valkey-backed objects whose individual statuses feed `status`.
    vk_objs: list[structs.VK_Obj]
async def get_algo_working_symbol() -> str:
    """Return the symbol the algo is currently working on for EXTEND.

    Reads the JSON document stored under the Valkey key
    'fr_algo_working_symbol' and returns its ['EXTEND']['symbol'] entry.

    Returns:
        The symbol string, or '' when the key does not exist, or when the
        document has no 'EXTEND' entry or no 'symbol' inside it.

    Raises:
        json.JSONDecodeError: if the stored value is not valid JSON.
    """
    vk_get: str | None = VAL_KEY.get(name='fr_algo_working_symbol') # ty:ignore[invalid-assignment]
    if not vk_get:
        # GET yields None for a missing key; json.loads(None) would raise
        # TypeError and take the whole monitor down. Fall back to the same ''
        # sentinel that is already used when the symbol is absent.
        return ''
    d = json.loads(vk_get)
    algo_symbol: str = d.get('EXTEND', {}).get('symbol', '')
    return algo_symbol
def _overall_status(vk_statuses: list[str]) -> str:
    """Collapse per-object statuses into one; 'DEAD' outranks 'UNHEALTHY' outranks 'HEALTHY'."""
    if 'DEAD' in vk_statuses:
        return 'DEAD'
    if 'UNHEALTHY' in vk_statuses:
        return 'UNHEALTHY'
    return 'HEALTHY'


def _stop_running_containers() -> None:
    """Stop every container the DOCKER client reports as running."""
    for c in DOCKER.containers.list(all=True):
        if c.status != 'running':
            continue
        logging.warning(f"stopping: ID: {c.id}, Name: {c.name}, Status: {c.status}")
        # The listed object is already a container handle, so it is stopped
        # directly instead of being fetched again by id.
        # NOTE(review): an error raised here aborts the loop and leaves the
        # remaining containers running - confirm that is acceptable.
        c.stop(timeout=10)
    logging.info('Stopped all containers')


async def main() -> None:
    """Run the health-check loop until interrupted or until an error occurs.

    Every iteration:
      1. reads the algo's working symbol from Valkey,
      2. refreshes the snapshot timestamp (epoch milliseconds),
      3. awaits each monitored object's ``checks.run_checks`` with the
         timestamp, the algo symbol and the object's own 'symbol' (when its
         ``data`` is a dict),
      4. aggregates the objects' ``status`` values into one overall status,
      5. stops all running Docker containers if that status is not 'HEALTHY',
      6. logs the per-object statuses and sleeps LOOP_SLEEP_SEC seconds.

    Interrupts and unexpected exceptions are logged and end the coroutine;
    nothing is re-raised to the caller.
    """
    vk_objs = [
        structs.VK_Orchestrator_Output(),
        structs.VK_Working_Symbol(),
        structs.VK_User_Orders_Extend(),
        structs.VK_User_Trades_Extend(),
        structs.VK_User_Balances_Aster(),
        structs.VK_User_Balances_Extend(),
        structs.VK_User_Positions_Aster(),
        structs.VK_User_Positions_Extend(),
        structs.VK_FR_Aster(),
        structs.VK_FR_All_Aster(),
        structs.VK_FR_Extend(),
        structs.VK_FR_All_Extend(),
        structs.VK_Ticker_Aster(),
        structs.VK_Ticker_Extend(),
        structs.VK_Trade_Aster(),
        structs.VK_Trade_Extend(),
    ]
    health_status = Health_Status(
        status='HEALTHY',
        timestamp=round(number=datetime.now().timestamp()*1000),
        vk_objs=vk_objs, # ty:ignore[invalid-argument-type]
    )
    try:
        while True:
            algo_symbol = await get_algo_working_symbol()
            health_status.timestamp = round(number=datetime.now().timestamp()*1000)

            for o in health_status.vk_objs:
                vk_symbol = o.data.get('symbol') if isinstance(o.data, dict) else None
                await o.checks.run_checks(args={
                    'timestamp': health_status.timestamp,
                    'algo_symbol': algo_symbol,
                    'vk_symbol': vk_symbol,
                })

            # Statuses are read after the checks ran; presumably run_checks
            # updates each object's status - verify in modules.structs.
            vk_statuses = [o.status for o in health_status.vk_objs]
            health_status.status = _overall_status(vk_statuses)

            if health_status.status != 'HEALTHY':
                _stop_running_containers()

            # TODO: publishing health_status to Valkey under 'health_status'
            # was disabled in the original code; re-enable once serialisation
            # of the model is settled.
            logging.info(vk_statuses)

            if LOOP_SLEEP_SEC > 0:
                # Non-blocking sleep: it gives the event loop a suspension
                # point, which is where a Ctrl-C cancellation issued by
                # asyncio.run() is actually delivered. A blocking time.sleep()
                # would stall the loop instead.
                await asyncio.sleep(LOOP_SLEEP_SEC)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() the first Ctrl-C arrives as task cancellation
        # rather than KeyboardInterrupt, so both are treated as a shutdown.
        # NOTE(review): message says ORCHESTRATOR although this is the health
        # engine; left unchanged in case log consumers match on it.
        logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
    except Exception as e:
        logging.error(msg=traceback.format_exc())
        logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}')
if __name__ == '__main__':
    # Script entry point: configure file logging, record start-up details,
    # then run the health-check loop.
    START_TIME: int = round(number=datetime.now().timestamp()*1000)
    # Logging must be configured before the first record is emitted. A
    # logging.info() issued earlier implicitly sets up the root logger at
    # WARNING level, so the INFO record is dropped and never reaches the file.
    logging.basicConfig(
        force=True,
        filename=LOG_FILEPATH,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filemode='w'
    )
    logging.info(msg=f'Log FilePath: {LOG_FILEPATH}')
    logging.info(msg=f"STARTED: {START_TIME}")
    asyncio.run(main())