129 lines
4.1 KiB
Python
129 lines
4.1 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import traceback
|
|
from datetime import datetime
|
|
import time
|
|
import valkey
|
|
from dotenv import load_dotenv
|
|
import modules.utils as utils
|
|
import modules.structs as structs
|
|
from pydantic import BaseModel
|
|
import docker
|
|
|
|
### Database ###
|
|
VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
|
DOCKER = docker.from_env()
|
|
|
|
### Logging ###
|
|
load_dotenv()
|
|
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Health.log'
|
|
|
|
### CONSTANTS ###
|
|
MAX_TIME_SINCE_LAST_UPDATE_MS: int = 1000 * 60 * 3 # 1000 x 60 sec x [minutes]
|
|
LOOP_SLEEP_SEC: int = 5
|
|
|
|
### Globals ###
|
|
|
|
### Structs ###
|
|
class Health_Status(BaseModel):
|
|
status: str # ENUM: 'HEALTHY' | 'UNHEALTHY' | 'DEAD'
|
|
timestamp: int
|
|
vk_objs: list[structs.VK_Obj]
|
|
|
|
async def get_algo_working_symbol() -> str:
|
|
vk_get: str = VAL_KEY.get(name='fr_algo_working_symbol') # ty:ignore[invalid-assignment]
|
|
d = json.loads(vk_get)
|
|
algo_symbol: str = d.get('EXTEND', {}).get('symbol', '')
|
|
|
|
return algo_symbol
|
|
|
|
async def main() -> None:
|
|
vk_objs = [
|
|
structs.VK_Orchestrator_Output(),
|
|
structs.VK_Working_Symbol(),
|
|
structs.VK_User_Orders_Extend(),
|
|
structs.VK_User_Trades_Extend(),
|
|
structs.VK_User_Balances_Aster(),
|
|
structs.VK_User_Balances_Extend(),
|
|
structs.VK_User_Positions_Aster(),
|
|
structs.VK_User_Positions_Extend(),
|
|
structs.VK_FR_Aster(),
|
|
structs.VK_FR_All_Aster(),
|
|
structs.VK_FR_Extend(),
|
|
structs.VK_FR_All_Extend(),
|
|
structs.VK_Ticker_Aster(),
|
|
structs.VK_Ticker_Extend(),
|
|
structs.VK_Trade_Aster(),
|
|
structs.VK_Trade_Extend(),
|
|
]
|
|
|
|
health_status = Health_Status(
|
|
status = 'HEALTHY',
|
|
timestamp = round(number=datetime.now().timestamp()*1000),
|
|
vk_objs = vk_objs, # ty:ignore[invalid-argument-type]
|
|
)
|
|
|
|
try:
|
|
while True:
|
|
algo_symbol = await get_algo_working_symbol()
|
|
health_status.timestamp = round(number=datetime.now().timestamp()*1000)
|
|
|
|
for o in health_status.vk_objs:
|
|
vk_symbol = o.data.get('symbol') if isinstance(o.data, dict) else None
|
|
await o.checks.run_checks(args={
|
|
'timestamp': health_status.timestamp,
|
|
'algo_symbol': algo_symbol,
|
|
'vk_symbol': vk_symbol,
|
|
})
|
|
|
|
vk_statuses = [o.status for o in health_status.vk_objs]
|
|
if 'DEAD' in vk_statuses:
|
|
health_status.status = 'DEAD'
|
|
elif 'UNHEALTHY' in vk_statuses:
|
|
health_status.status = 'UNHEALTHY'
|
|
else:
|
|
health_status.status = 'HEALTHY'
|
|
|
|
if health_status.status != 'HEALTHY':
|
|
all_containers = DOCKER.containers.list(all=True)
|
|
|
|
for c in all_containers:
|
|
if c.status == 'running':
|
|
logging.warning(f"stopping: ID: {c.id}, Name: {c.name}, Status: {c.status}")
|
|
container = DOCKER.containers.get(c.id)
|
|
container.stop(timeout=10)
|
|
|
|
logging.info('Stopped all containers')
|
|
|
|
# VAL_KEY.set(name='health_status', value=json.dumps(obj=(health_status)))
|
|
|
|
logging.info(vk_statuses)
|
|
|
|
if LOOP_SLEEP_SEC > 0:
|
|
time.sleep(LOOP_SLEEP_SEC)
|
|
|
|
continue
|
|
except KeyboardInterrupt:
|
|
logging.info(msg='ORCHESTRATOR SHUTTING DOWN...')
|
|
except Exception as e:
|
|
logging.error(msg=traceback.format_exc())
|
|
logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
START_TIME: int = round(number=datetime.now().timestamp()*1000)
|
|
|
|
logging.info(msg=f'Log FilePath: {LOG_FILEPATH}')
|
|
|
|
logging.basicConfig(
|
|
force=True,
|
|
filename=LOG_FILEPATH,
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
filemode='w'
|
|
)
|
|
logging.info(msg=f"STARTED: {START_TIME}")
|
|
|
|
asyncio.run(main()) |