109 lines
3.0 KiB
Python
109 lines
3.0 KiB
Python
|
|
import requests
|
||
|
|
from dotenv import load_dotenv
|
||
|
|
import os
|
||
|
|
import time
|
||
|
|
import threading
|
||
|
|
import urllib
|
||
|
|
|
||
|
|
from eth_account.messages import encode_typed_data
|
||
|
|
from eth_account import Account
|
||
|
|
|
||
|
|
# Run python-dotenv's loader first so that the environment lookups below can
# see whatever it adds to the process environment.
load_dotenv()

# Wallet identifiers and the signing key, read once at import time.
# Each name is None when its environment variable is not set.
user = os.environ.get("RABBY_WALLET")
signer = os.environ.get("ASTER_API_WALLET_ADDRESS")
private_key = os.environ.get("ASTER_API_PRIVATE_KEY")
|
||
|
|
|
||
|
|
# Shared nonce state, guarded by _nonce_lock.
# NOTE: despite its name, _last_ms stores whole *seconds* (int(time.time())),
# not milliseconds; the name is kept because it is module-level state.
_last_ms = 0
_i = 0
# One lock for the whole process.  Creating the lock inside get_nonce() would
# hand every caller its own private lock and protect nothing.
_nonce_lock = threading.Lock()


def get_nonce() -> int:
    """Return the next request nonce.

    The value is the current Unix time in whole seconds multiplied by
    1_000_000, plus a counter that increments for every call made within the
    same second and resets to 0 when the second changes.  Successive values
    are distinct and increasing as long as fewer than 1_000_000 nonces are
    requested per second and the system clock does not step backwards.

    The update of the module-level ``_last_ms`` / ``_i`` pair happens under
    the shared ``_nonce_lock``.
    """
    global _last_ms, _i
    with _nonce_lock:
        now_s = int(time.time())

        if now_s == _last_ms:
            _i += 1
        else:
            _last_ms = now_s
            _i = 0

        return now_s * 1_000_000 + _i


def post_authenticated_url(req: dict) -> dict:
    """Sign a request with EIP-712 typed data and send it to the API host.

    Args:
        req: Request description with three keys:
            ``'params'`` - mapping of query parameters (copied, not mutated);
            ``'url'``    - path appended to ``https://fapi.asterdex.com``;
            ``'method'`` - one of ``'GET'``, ``'POST'``, ``'PUT'``,
            ``'DELETE'`` (compared case-sensitively).

    Returns:
        The response body decoded with ``Response.json()``.  If ``method`` is
        not one of the four supported values nothing is sent and ``None`` is
        returned (a nonce is still consumed and the signature still computed).

    Raises:
        KeyError: if ``req`` lacks ``'params'``, ``'url'`` or ``'method'``.
        Any exception raised by ``requests`` or by ``Response.json()``
        propagates unchanged.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import urlencode

    host = 'https://fapi.asterdex.com'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': 'PythonApp/1.0'
    }

    params = req['params'].copy()
    url = host + req['url']
    method = req['method']

    # Appended after the caller's parameters, in this order, because the
    # encoded string below is exactly what gets signed.
    params['nonce'] = str(get_nonce())
    params['user'] = user
    params['signer'] = signer

    query = urlencode(params)

    typed_data = {
        "types": {
            "EIP712Domain": [
                {"name": "name", "type": "string"},
                {"name": "version", "type": "string"},
                {"name": "chainId", "type": "uint256"},
                {"name": "verifyingContract", "type": "address"}
            ],
            "Message": [
                {"name": "msg", "type": "string"}
            ]
        },
        "primaryType": "Message",
        "domain": {
            "name": "AsterSignTransaction",
            "version": "1",
            "chainId": 1666,
            "verifyingContract": "0x0000000000000000000000000000000000000000"
        },
        "message": {
            # The url-encoded parameter string is the signed payload.
            "msg": query
        }
    }

    # Only the "Message" type is handed to encode_typed_data; the
    # "EIP712Domain" entry above is not passed along.
    signable = encode_typed_data(
        domain_data=typed_data["domain"],
        message_types={"Message": typed_data["types"]["Message"]},
        message_data=typed_data["message"],
    )
    signed = Account.sign_message(signable, private_key=private_key)

    full_url = url + '?' + query + '&signature=' + signed.signature.hex()

    senders = {
        'GET': requests.get,
        'POST': requests.post,
        'PUT': requests.put,
        'DELETE': requests.delete,
    }
    send = senders.get(method)
    if send is None:
        # Unsupported verb: nothing is sent and None is returned.
        return None

    # NOTE(review): no timeout is passed, so a stalled connection can block
    # the caller indefinitely - consider adding one.
    res = send(full_url, headers=headers)
    return res.json()
|