excalidraw flow and basic rust ws
This commit is contained in:
9
.gitignore
vendored
9
.gitignore
vendored
@@ -1,2 +1,9 @@
|
|||||||
|
# General
|
||||||
.env
|
.env
|
||||||
*.pyc
|
|
||||||
|
# Python
|
||||||
|
*.pyc
|
||||||
|
|
||||||
|
# Rust
|
||||||
|
/rust_test/test_world/target/
|
||||||
|
Cargo.lock
|
||||||
@@ -12,6 +12,8 @@ import multiprocessing as mp
|
|||||||
import threading
|
import threading
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
TO DO:
|
TO DO:
|
||||||
- Insert config changes into database for analysis later / general tracking
|
- Insert config changes into database for analysis later / general tracking
|
||||||
@@ -20,7 +22,6 @@ TO DO:
|
|||||||
LOCAL_ORDERS: list = []
|
LOCAL_ORDERS: list = []
|
||||||
|
|
||||||
### Database ###
|
### Database ###
|
||||||
VK_POS_ASTER: str = 'fr_aster_user_positions'
|
|
||||||
VK_IN: str = 'fr_engine_orders_input'
|
VK_IN: str = 'fr_engine_orders_input'
|
||||||
VK_OUT: str = 'fr_engine_orders_output'
|
VK_OUT: str = 'fr_engine_orders_output'
|
||||||
VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode_responses=True)
|
||||||
@@ -29,23 +30,30 @@ VAL_KEY: valkey.Valkey = valkey.Valkey(host='localhost', port=6379, db=0, decode
|
|||||||
load_dotenv()
|
load_dotenv()
|
||||||
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Orders.log'
|
LOG_FILEPATH: str = f'{os.getenv("LOGS_PATH")}/Fund_Rate_Engine_Orders.log'
|
||||||
|
|
||||||
|
### Main Listener for Order Requests (e.g. from Algo Engine) ###
|
||||||
# async def work_order(order: dict) -> dict:
|
|
||||||
# LOCAL_ORDERS.append(order)
|
|
||||||
# while LOCAL_ORDERS:
|
|
||||||
# print(f'working order...{order}')
|
|
||||||
# time.sleep(5)
|
|
||||||
# print(f'{order}...order posted/replaced/etc')
|
|
||||||
|
|
||||||
# VAL_KEY.set(name=VK_OUT, value=json.dumps(order))
|
|
||||||
# return order
|
|
||||||
|
|
||||||
def receive_orders():
|
def receive_orders():
|
||||||
global LOCAL_ORDERS
|
global LOCAL_ORDERS
|
||||||
|
|
||||||
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
||||||
VK_PUBSUB.subscribe(VK_IN)
|
VK_PUBSUB.subscribe(VK_IN)
|
||||||
for message in VK_PUBSUB.listen():
|
for message in VK_PUBSUB.listen():
|
||||||
|
loop_start = time.time()
|
||||||
|
if message['type'] == 'message':
|
||||||
|
ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
# Receive Update Msg from PubSub
|
||||||
|
data: dict = json.loads(s=message['data'])
|
||||||
|
print(data)
|
||||||
|
LOCAL_ORDERS.append(data)
|
||||||
|
|
||||||
|
print(f'__ Rec Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}')
|
||||||
|
|
||||||
|
### Listeners - Aster ###
|
||||||
|
def receive_position_updates_aster():
|
||||||
|
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
||||||
|
VK_PUBSUB.subscribe('fr_aster_user_positions')
|
||||||
|
for message in VK_PUBSUB.listen():
|
||||||
|
loop_start = time.time()
|
||||||
if message['type'] == 'message':
|
if message['type'] == 'message':
|
||||||
ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
@@ -53,26 +61,58 @@ def receive_orders():
|
|||||||
data: dict = json.loads(s=message['data'])
|
data: dict = json.loads(s=message['data'])
|
||||||
print(data)
|
print(data)
|
||||||
|
|
||||||
# def receive_notional_updates():
|
print(f'__ Aster Notional: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}')
|
||||||
# VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
|
||||||
# VK_PUBSUB.subscribe(VK_IN)
|
def receive_order_updates_aster():
|
||||||
# for message in VK_PUBSUB.listen():
|
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
||||||
# if message['type'] == 'message':
|
VK_PUBSUB.subscribe('fr_aster_user_orders')
|
||||||
# ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
for message in VK_PUBSUB.listen():
|
||||||
|
loop_start = time.time()
|
||||||
|
if message['type'] == 'message':
|
||||||
|
# ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
# # Receive Update Msg from PubSub
|
# Receive Update Msg from PubSub
|
||||||
# data: dict = json.loads(s=message['data'])
|
data: dict = json.loads(s=message['data'])
|
||||||
# print(data)
|
print(data)
|
||||||
# LOCAL_ORDERS.append(data)
|
|
||||||
|
print(f'__ Aster Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}')
|
||||||
|
|
||||||
|
### Listeners - Extend ###
|
||||||
|
def receive_position_updates_extend():
|
||||||
|
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
||||||
|
VK_PUBSUB.subscribe('fr_extended_user_positions')
|
||||||
|
for message in VK_PUBSUB.listen():
|
||||||
|
loop_start = time.time()
|
||||||
|
if message['type'] == 'message':
|
||||||
|
ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
# Receive Update Msg from PubSub
|
||||||
|
data: dict = json.loads(s=message['data'])
|
||||||
|
print(data)
|
||||||
|
|
||||||
|
print(f'__ Aster Notional: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}')
|
||||||
|
|
||||||
|
def receive_order_updates_extend():
|
||||||
|
VK_PUBSUB: valkey.client.PubSub = VAL_KEY.pubsub()
|
||||||
|
VK_PUBSUB.subscribe('fr_extended_user_orders')
|
||||||
|
for message in VK_PUBSUB.listen():
|
||||||
|
loop_start = time.time()
|
||||||
|
if message['type'] == 'message':
|
||||||
|
# ts_arrival: int = round(number=datetime.now().timestamp()*1000)
|
||||||
|
|
||||||
|
# Receive Update Msg from PubSub
|
||||||
|
data: dict = json.loads(s=message['data'])
|
||||||
|
print(data)
|
||||||
|
|
||||||
|
print(f'__ Aster Orders: Loop End __ - Algo Engine ms: {(time.time() - loop_start)*1000:.2f}')
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
global LOCAL_ORDERS
|
global LOCAL_ORDERS
|
||||||
|
|
||||||
try:
|
try:
|
||||||
thread = threading.Thread(target=receive_orders)
|
t_rec_orders = threading.Thread(target=receive_orders)
|
||||||
thread.daemon = True
|
t_rec_orders.daemon = True
|
||||||
# thread.start()
|
t_rec_orders.start()
|
||||||
thread.join()
|
|
||||||
|
|
||||||
|
|
||||||
# while True:
|
# while True:
|
||||||
|
|||||||
2696
excalidraw/FR_Flow.excalidraw
Normal file
2696
excalidraw/FR_Flow.excalidraw
Normal file
File diff suppressed because it is too large
Load Diff
13
rust_test/test_world/Cargo.toml
Normal file
13
rust_test/test_world/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
[package]
|
||||||
|
name = "test_world"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
|
||||||
|
futures-util = "0.3"
|
||||||
|
url = "2"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
redis = "0.25.0"
|
||||||
70
rust_test/test_world/src/main.rs
Normal file
70
rust_test/test_world/src/main.rs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use url::Url;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use redis::Commands;
|
||||||
|
|
||||||
|
// Aster Book Ticker (TOB)
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct BookTickerResponse {
|
||||||
|
pub stream: String,
|
||||||
|
pub data: BookTickerData,
|
||||||
|
}
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct BookTickerData {
|
||||||
|
#[serde(rename(deserialize = "e"))]
|
||||||
|
pub event_type: String,
|
||||||
|
#[serde(rename(deserialize = "u"))]
|
||||||
|
pub update_id: u64,
|
||||||
|
#[serde(rename(deserialize = "s"))]
|
||||||
|
pub symbol: String,
|
||||||
|
#[serde(rename(deserialize = "b"))]
|
||||||
|
pub best_bid_price: String,
|
||||||
|
#[serde(rename(deserialize = "B"))]
|
||||||
|
pub best_bid_qty: String,
|
||||||
|
#[serde(rename(deserialize = "a"))]
|
||||||
|
pub best_ask_price: String,
|
||||||
|
#[serde(rename(deserialize = "A"))]
|
||||||
|
pub best_ask_qty: String,
|
||||||
|
#[serde(rename(deserialize = "T"))]
|
||||||
|
pub transaction_time: u64,
|
||||||
|
#[serde(rename(deserialize = "E"))]
|
||||||
|
pub event_time: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
// Connect to a local Valkey instance
|
||||||
|
let vk_client = redis::Client::open("redis://localhost:6379/0").unwrap();
|
||||||
|
let mut con = vk_client.get_connection().unwrap();
|
||||||
|
|
||||||
|
let url = Url::parse("wss://fstream.asterdex.com/stream?streams=btcusdt@bookTicker").unwrap();
|
||||||
|
let (ws_stream, _) = connect_async(url.as_str()).await.expect("Failed to connect");
|
||||||
|
println!("WebSocket client connected");
|
||||||
|
|
||||||
|
let (_, mut read) = ws_stream.split();
|
||||||
|
|
||||||
|
while let Some(msg) = read.next().await {
|
||||||
|
match msg {
|
||||||
|
Ok(Message::Text(text)) => {
|
||||||
|
let parsed: BookTickerResponse = serde_json::from_str(&text).expect("Failed to parse JSON");
|
||||||
|
println!("Symbol: {} - Bid: {}", parsed.data.symbol, parsed.data.best_bid_price);
|
||||||
|
let serialized: String = serde_json::to_string(&parsed).map_err(|_| "Serialization failed").expect("Failed to serialize struct");
|
||||||
|
// println!("{:?}", serialized);
|
||||||
|
let _: () = con.set("test_key", serialized).unwrap();
|
||||||
|
},
|
||||||
|
Ok(Message::Binary(bin)) => println!("[binary] {} bytes", bin.len()),
|
||||||
|
Ok(Message::Ping(_)) => println!("[ping]"),
|
||||||
|
Ok(Message::Pong(_)) => println!("[pong]"),
|
||||||
|
Ok(Message::Close(frame)) => {
|
||||||
|
println!("[close] {:?}", frame);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(Message::Frame(_)) => {}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("[error] {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -144,6 +144,7 @@ async def ws_stream():
|
|||||||
Local_Recent_Orders = [t for t in Local_Recent_Orders if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms]
|
Local_Recent_Orders = [t for t in Local_Recent_Orders if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms]
|
||||||
|
|
||||||
VAL_KEY_OBJ: str = json.dumps(obj=Local_Recent_Orders)
|
VAL_KEY_OBJ: str = json.dumps(obj=Local_Recent_Orders)
|
||||||
|
VAL_KEY.publish(channel=VK_ORDERS_TRADES, message=VAL_KEY_OBJ)
|
||||||
VAL_KEY.set(name=VK_ORDERS_TRADES, value=VAL_KEY_OBJ)
|
VAL_KEY.set(name=VK_ORDERS_TRADES, value=VAL_KEY_OBJ)
|
||||||
|
|
||||||
await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON)
|
await db.insert_df_to_mysql(table_name='fr_aster_user_order_trade', params=new_order_update, CON=CON)
|
||||||
@@ -220,6 +221,7 @@ async def ws_stream():
|
|||||||
list_for_df_pos.append(position_update)
|
list_for_df_pos.append(position_update)
|
||||||
Local_Recent_Positions = utils.upsert_list_of_dicts_by_id(Local_Recent_Positions, position_update, id='symbol', seq_check_field='timestamp_msg')
|
Local_Recent_Positions = utils.upsert_list_of_dicts_by_id(Local_Recent_Positions, position_update, id='symbol', seq_check_field='timestamp_msg')
|
||||||
Local_Recent_Positions = [t for t in Local_Recent_Positions if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms]
|
Local_Recent_Positions = [t for t in Local_Recent_Positions if t.get('timestamp_arrival', 0) >= lookback_min_ts_ms]
|
||||||
|
VAL_KEY.publish(channel=VK_POSITIONS, message=json.dumps(obj=Local_Recent_Positions))
|
||||||
VAL_KEY.set(name=VK_POSITIONS, value=json.dumps(obj=Local_Recent_Positions))
|
VAL_KEY.set(name=VK_POSITIONS, value=json.dumps(obj=Local_Recent_Positions))
|
||||||
if list_for_df_bal:
|
if list_for_df_bal:
|
||||||
await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON)
|
await db.insert_df_to_mysql(table_name='fr_aster_user_account_bal', params=list_for_df_bal, CON=CON)
|
||||||
|
|||||||
@@ -105,7 +105,8 @@ async def ws_stream():
|
|||||||
LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
|
LOCAL_RECENT_ORDERS = [t for t in LOCAL_RECENT_ORDERS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
|
||||||
|
|
||||||
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS)
|
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_ORDERS)
|
||||||
VAL_KEY.set(VK_ORDERS, VAL_KEY_OBJ)
|
VAL_KEY.publish(channel=VK_ORDERS, message=VAL_KEY_OBJ)
|
||||||
|
VAL_KEY.set(name=VK_ORDERS, value=VAL_KEY_OBJ)
|
||||||
await db.insert_df_to_mysql(table_name='fr_extended_user_order', params=list_for_df, CON=CON)
|
await db.insert_df_to_mysql(table_name='fr_extended_user_order', params=list_for_df, CON=CON)
|
||||||
continue
|
continue
|
||||||
case 'TRADE':
|
case 'TRADE':
|
||||||
@@ -197,7 +198,8 @@ async def ws_stream():
|
|||||||
LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
|
LOCAL_RECENT_POSITIONS = [t for t in LOCAL_RECENT_POSITIONS if t.get('timestamp_arrival', 0) >= LOOKBACK_MIN_TS_MS]
|
||||||
|
|
||||||
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS)
|
VAL_KEY_OBJ = json.dumps(LOCAL_RECENT_POSITIONS)
|
||||||
VAL_KEY.set(VK_POSITIONS, VAL_KEY_OBJ)
|
VAL_KEY.publish(channel=VK_POSITIONS, message=VAL_KEY_OBJ)
|
||||||
|
VAL_KEY.set(name=VK_POSITIONS, value=VAL_KEY_OBJ)
|
||||||
await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON)
|
await db.insert_df_to_mysql(table_name='fr_extended_user_position', params=list_for_df, CON=CON)
|
||||||
continue
|
continue
|
||||||
case _:
|
case _:
|
||||||
|
|||||||
Reference in New Issue
Block a user