import asyncio import json import logging import os import traceback from datetime import datetime import time import valkey from dotenv import load_dotenv import modules.utils as utils import multiprocessing as mp import threading from typing import Any ''' TO DO: - Insert config changes into database for analysis later / general tracking ''' LOCAL_ORDERS: list = [] ### Database ### VK_IN: str = 'fr_engine_orders_input' VK_OUT: str = 'fr_engine_orders_output' VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True) ### Logging ### load_dotenv() LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Orders.log' ### Main Listener for Order Requests (e.g. from Algo Engine) ### def receive_orders(): global LOCAL_ORDERS VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub() VK_PUBSUB.subscribe(VK_IN) for message in VK_PUBSUB.listen(): loop_start = time.time() if message['type'] == 'message': ts_arrival: int = round(number=datetime.now().timestamp()*1000) # Receive Update Msg from PubSub data: dict = json.loads(s=message['data']) print(data) LOCAL_ORDERS.append(data) print(f'__ Rec Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}') ### Listeners - Aster ### def receive_position_updates_aster(): VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub() VK_PUBSUB.subscribe('fr_aster_user_positions') for message in VK_PUBSUB.listen(): loop_start = time.time() if message['type'] == 'message': ts_arrival: int = round(number=datetime.now().timestamp()*1000) # Receive Update Msg from PubSub data: dict = json.loads(s=message['data']) print(data) print(f'__ Aster Notional: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}') def receive_order_updates_aster(): VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub() VK_PUBSUB.subscribe('fr_aster_user_orders') for message in VK_PUBSUB.listen(): loop_start = time.time() if message['type'] == 'message': # ts_arrival: int = round(number=datetime.now().timestamp()*1000) # Receive Update Msg from PubSub data: dict = json.loads(s=message['data']) print(data) print(f'__ Aster Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}') ### Listeners - Extend ### def receive_position_updates_extend(): VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub() VK_PUBSUB.subscribe('fr_extended_user_positions') for message in VK_PUBSUB.listen(): loop_start = time.time() if message['type'] == 'message': ts_arrival: int = round(number=datetime.now().timestamp()*1000) # Receive Update Msg from PubSub data: dict = json.loads(s=message['data']) print(data) print(f'__ Aster Notional: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}') def receive_order_updates_extend(): VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub() VK_PUBSUB.subscribe('fr_extended_user_orders') for message in VK_PUBSUB.listen(): loop_start = time.time() if message['type'] == 'message': # ts_arrival: int = round(number=datetime.now().timestamp()*1000) # Receive Update Msg from PubSub data: dict = json.loads(s=message['data']) print(data) print(f'__ Aster Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}') async def main() -> None: global LOCAL_ORDERS try: t_rec_orders = threading.Thread(target=receive_orders) t_rec_orders.daemon = True t_rec_orders.start() # while True: # print(f"Subscribed to '{VK_IN}'. Waiting for messages...") # print(f'LOCAL_ORDERS: {LOCAL_ORDERS}') # aster_position_updates: Any = VAL_KEY.get(name=VK_POS_ASTER) # aster_position_updates: list = json.loads(s=aster_position_updates) if aster_position_updates is not None else [] # print(f'Aster Pos Updates: {aster_position_updates}') # time.sleep(5) except valkey.exceptions.ConnectionError as e: logging.info(msg=f"Could not connect to Valkey. Please check the publish server is up; {e}") except KeyboardInterrupt: logging.info(msg='ORCHESTRATOR SHUTTING DOWN...') except Exception as e: logging.error(msg=traceback.format_exc()) logging.critical(msg=f'*** ORCHESTRATOR CRASHED: {e}') if __name__ == '__main__': START_TIME: int = round(number=datetime.now().timestamp()*1000) logging.info(msg=f'Log FilePath: {LOG_FILEPATH}') logging.basicConfig( force=True, filename=LOG_FILEPATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filemode='w' ) logging.info(msg=f"STARTED: {START_TIME}") asyncio.run(main())