Files
Funding_Rate/modules/utils.py

28 lines
956 B
Python

import logging
from dotenv import load_dotenv
import requests
import os
# Runs once at import time, before any function in this module reads the
# process environment.
# NOTE(review): presumably this loads a local .env file so that TG_TOKEN and
# TG_ALERTS_CHAT_ID are visible to os.getenv in send_tg_alert - confirm.
load_dotenv()
def upsert_list_of_dicts_by_id(list_of_dicts, new_dict, id='id', seq_check_field: str | None = None):
for index, item in enumerate(list_of_dicts):
if item.get(id) == new_dict.get(id):
if seq_check_field is not None:
if item.get(seq_check_field) > new_dict.get(seq_check_field):
logging.info('Skipping out of sequence msg')
return list_of_dicts
list_of_dicts[index] = new_dict
return list_of_dicts
list_of_dicts.append(new_dict)
return list_of_dicts
def send_tg_alert(msg: str):
    """Send ``msg`` to a Telegram chat through the Bot API ``sendMessage`` method.

    The bot token and the target chat are read from the ``TG_TOKEN`` and
    ``TG_ALERTS_CHAT_ID`` environment variables. The text sent is
    ``str(msg)`` cut to its first 250 characters.

    Args:
        msg: Message to send; converted with ``str`` before truncation.

    Returns:
        The decoded JSON body of the HTTP response. If either environment
        variable is unset or empty, no request is made and a dict of the
        form ``{'ok': False, 'description': ...}`` is returned instead.

    Errors raised by ``requests.post`` (for example on timeout) or by
    ``response.json()`` are not caught here and propagate to the caller.
    """
    token = os.getenv("TG_TOKEN")
    chat_id = os.getenv("TG_ALERTS_CHAT_ID")

    # Without this check the request would go to ".../botNone/sendMessage"
    # and the misconfiguration would only show up as a remote error.
    settings = (("TG_TOKEN", token), ("TG_ALERTS_CHAT_ID", chat_id))
    missing = [name for name, value in settings if not value]
    if missing:
        description = (
            "Telegram alert not sent: missing environment variable(s) "
            + ", ".join(missing)
        )
        logging.error(description)
        return {'ok': False, 'description': description}

    url = f'https://api.telegram.org/bot{token}/sendMessage'
    response = requests.post(
        url,
        params={'chat_id': chat_id},  # let requests URL-encode the value
        json={'text': str(msg)[:250]},
        timeout=10,
    )
    return response.json()