import asyncio
import aiohttp
import struct
from typing import Optional, Dict, List, Any
import time
import json
import zstandard as zstd
import msgpack
import os
from dotenv import load_dotenv
# Load environment variables from .env file at import time, before
# SpotSnapshotClient reads HYDROMANCER_API_URL / HYDROMANCER_API_KEY
# through os.getenv in its constructor.
load_dotenv()
class SpotSnapshotClient:
    """Polling client for spot snapshots exposed by the Hydromancer ``/info`` endpoint.

    The client asks the API for the current snapshot timestamp and only
    downloads the (zstd-compressed, msgpack-encoded) snapshot payload when
    that timestamp differs from the one it saw last.  Downloaded snapshots
    are cached on the instance and written to ``output_file`` as JSON.

    Use the client as an async context manager so the ``aiohttp`` session is
    created on entry and closed on exit.
    """

    # Tokens requested when ``poll_snapshots`` is called without an explicit list.
    _DEFAULT_TOKENS = ("UBTC", "PUP")

    def __init__(self):
        """Read connection settings from the environment and reset the caches.

        Raises:
            ValueError: If ``HYDROMANCER_API_KEY`` is missing or empty.
        """
        # Use environment variables with fallbacks
        self.base_url = os.getenv('HYDROMANCER_API_URL', 'https://api.hydromancer.xyz')
        self.api_key = os.getenv('HYDROMANCER_API_KEY')
        if not self.api_key:
            raise ValueError("API key not found. Please set HYDROMANCER_API_KEY in .env file")
        # Timestamp of the most recent snapshot reported by the API.
        self.cached_timestamp: Optional[str] = None
        # Result of the most recent successful download.
        self.cached_snapshots: Dict[str, Any] = {}
        # Created in __aenter__, closed and cleared in __aexit__.
        self.session: Optional[aiohttp.ClientSession] = None
        self.output_file = "spot_snapshot.json"

    async def __aenter__(self):
        """Open the HTTP session and return the client itself."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session, if one was opened."""
        if self.session:
            await self.session.close()
            # Drop the closed session so later calls fail with the explicit
            # "Client session not initialized" error instead of using it.
            self.session = None

    def _require_session(self) -> "aiohttp.ClientSession":
        """Return the open session or raise if the client was not entered."""
        if not self.session:
            raise RuntimeError("Client session not initialized")
        return self.session

    def _auth_headers(self) -> Dict[str, str]:
        """Return the JSON content-type and bearer-token headers used by every request."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }

    @staticmethod
    def _format_bytes(bytes_size: float) -> str:
        """Format a byte count in human readable form (B, KB, MB, GB, TB)."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.2f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.2f} TB"

    @staticmethod
    def _is_spot_record(data: Any) -> bool:
        """Return True when ``data`` looks like ``[identifier, token, balances, addresses]``."""
        return isinstance(data, list) and len(data) >= 4

    def _snapshot_entry(self, record: List[Any]) -> Dict[str, Any]:
        """Turn a decoded spot record into the structured dict stored per token."""
        identifier, token, balances, addresses = record[:4]
        return {
            'identifier': identifier,
            'token': token,
            'balances': balances,
            'addresses': addresses,
            'timestamp': self.cached_timestamp,
        }

    async def check_for_updates(self) -> bool:
        """Check if snapshots have been updated since last poll.

        Posts a ``spotSnapshotTimestamp`` request and compares the returned
        ``timestamp`` (falling back to ``snapshot_id``) with
        ``cached_timestamp``.  When they differ, ``cached_timestamp`` is
        overwritten with the new value.

        Returns:
            True if the reported timestamp differs from the cached one.

        Raises:
            RuntimeError: If the client session has not been initialized.
            Exception: If the API responds with a non-OK status.
        """
        session = self._require_session()
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start_time = time.perf_counter()
        async with session.post(
            f'{self.base_url}/info',
            json={'type': 'spotSnapshotTimestamp'},
            headers=self._auth_headers(),
        ) as response:
            fetch_time = time.perf_counter() - start_time
            if not response.ok:
                text = await response.text()
                raise Exception(f'Timestamp request failed: {response.status} - {text}')
            data = await response.json()

        current_timestamp = data.get('timestamp', data.get('snapshot_id'))
        # Check if this is a new snapshot
        has_updates = self.cached_timestamp != current_timestamp
        if has_updates:
            print(f'📈 Spot snapshot updated: {self.cached_timestamp} -> {current_timestamp} (fetched in {fetch_time:.3f}s)')
            self.cached_timestamp = current_timestamp
        else:
            print(f'⏳ No spot snapshot updates (timestamp: {current_timestamp}, checked in {fetch_time:.3f}s)')
        return has_updates

    async def poll_snapshots(self, tokens: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Efficiently poll for spot snapshots
        Only downloads data when snapshots have been updated

        Args:
            tokens: Token symbols to request.  Defaults to ``["UBTC", "PUP"]``.

        Returns:
            The freshly downloaded snapshots, or the cached ones when the
            timestamp has not changed.

        Raises:
            Exception: Any error from the timestamp check, the download or
                the file write is printed and re-raised.
        """
        if tokens is None:
            # Build a fresh list per call instead of sharing a mutable default.
            tokens = list(self._DEFAULT_TOKENS)

        previous_timestamp = self.cached_timestamp
        try:
            # Step 1: Check if snapshots have been updated
            has_updates = await self.check_for_updates()
            if not has_updates:
                print('Waiting for next update, using cached spot data')
                return self.cached_snapshots

            # Step 2: Download new snapshots
            print('New spot snapshots available, downloading...')
            try:
                snapshots = await self.download_snapshots(tokens)
            except BaseException:
                # check_for_updates already recorded the new timestamp.  If the
                # download did not complete (error or cancellation), restore
                # the old one so the next poll retries instead of reporting
                # "no updates" and serving stale data forever.
                self.cached_timestamp = previous_timestamp
                raise

            # Update cache
            self.cached_snapshots = snapshots
            # Save to file
            self.save_to_file(snapshots)
            return snapshots
        except Exception as error:
            print(f'Error polling spot snapshots: {error}')
            raise

    async def download_snapshots(self, tokens: List[str]) -> Dict[str, Any]:
        """Download spot market snapshots.

        Posts a ``spotSnapshots`` request for ``tokens`` and parses the binary
        body.  A response whose ``x-payload-format`` header equals
        ``multi-zstd`` is parsed with ``parse_multiple_tokens``; anything else
        is parsed with ``parse_single_snapshot``.

        Raises:
            RuntimeError: If the client session has not been initialized.
            Exception: If the API responds with a non-OK status.
        """
        session = self._require_session()
        start_time = time.perf_counter()
        async with session.post(
            f'{self.base_url}/info',
            json={'type': 'spotSnapshots', 'tokens': tokens},
            headers=self._auth_headers(),
        ) as response:
            # Measured when the response headers arrive, before the body is read.
            fetch_time = time.perf_counter() - start_time
            if not response.ok:
                text = await response.text()
                raise Exception(f'Spot snapshots request failed: {response.status} - {text}')
            payload_format = response.headers.get('x-payload-format')
            binary_data = await response.read()

        readable_size = self._format_bytes(len(binary_data))
        print(f"📦 Downloaded {readable_size} ({len(binary_data):,} bytes) in {fetch_time:.3f}s")
        print(f"🔧 Payload format: {payload_format}")
        if payload_format == 'multi-zstd':
            return self.parse_multiple_tokens(binary_data)
        return self.parse_single_snapshot(binary_data)

    def parse_single_snapshot(self, binary_data: bytes) -> Dict[str, Any]:
        """Parse single compressed msgpack response.

        A decoded list with at least four items is treated as
        ``[identifier, token, balances, addresses]`` and returned as
        ``{token: entry}``.  Any other decoded value is returned unchanged.
        """
        data = self.decode_msgpack(self.decompress_zstd(binary_data))
        if self._is_spot_record(data):
            entry = self._snapshot_entry(data)
            return {entry['token']: entry}
        return data

    def parse_multiple_tokens(self, binary_data: bytes) -> Dict[str, Any]:
        """Parse multiple tokens response (multi-zstd format).

        Layout read by this method: a 4-byte little-endian snapshot count,
        then for each snapshot a 4-byte little-endian length followed by that
        many bytes of zstd-compressed msgpack.

        A snapshot that fails to decompress or decode is reported and
        skipped; the remaining snapshots are still parsed.

        Raises:
            ValueError: If the buffer is shorter than its headers declare.
        """
        total = len(binary_data)
        # Read number of snapshots (4 bytes, little-endian)
        if total < 4:
            raise ValueError(f"Data too short: {len(binary_data)} bytes")
        (count,) = struct.unpack_from('<I', binary_data, 0)
        offset = 4
        print(f"Number of spot snapshots: {count}")

        tokens: Dict[str, Any] = {}
        for i in range(count):
            # Read length of this snapshot (4 bytes, little-endian)
            if offset + 4 > total:
                raise ValueError(f"Unexpected end of data at offset {offset}")
            (length,) = struct.unpack_from('<I', binary_data, offset)
            offset += 4
            print(f"Spot snapshot {i}: {length} bytes")

            # Extract and decompress snapshot
            if offset + length > total:
                raise ValueError(f"Data too short for snapshot {i}")
            zstd_data = binary_data[offset:offset + length]
            # Step past this frame up front so a parse failure below cannot
            # leave the cursor pointing into the middle of it.
            offset += length

            try:
                snapshot = self.decode_msgpack(self.decompress_zstd(zstd_data))
                if self._is_spot_record(snapshot):
                    # Spot format: [identifier, token, balances, addresses]
                    entry = self._snapshot_entry(snapshot)
                    token_name = entry['token']
                    balances = entry['balances']
                    tokens[token_name] = entry
                    print(f"Parsed spot token: {token_name} with {len(balances) if isinstance(balances, list) else 0} balances")
                else:
                    # Fallback for unexpected format
                    tokens[f'token_{i}'] = snapshot
                    print(f"Unexpected format for token {i}: {type(snapshot)}")
            except Exception as e:
                # Best effort: report the bad snapshot and continue with the next one.
                print(f"Error parsing spot snapshot {i}: {e}")
        return tokens

    def decompress_zstd(self, data: bytes) -> bytes:
        """Decompress zstd data, printing and re-raising any failure."""
        try:
            dctx = zstd.ZstdDecompressor()
            return dctx.decompress(data)
        except Exception as e:
            print(f"Decompression error: {e}")
            raise

    def decode_msgpack(self, data: bytes) -> Any:
        """Decode MessagePack data"""
        return msgpack.unpackb(data, raw=False)

    def save_to_file(self, snapshots: Dict[str, Any]):
        """Save snapshots to JSON file.

        Writes ``{'timestamp': ..., 'tokens': {...}}`` to ``output_file``.
        For each token whose value is a dict, balances and addresses are
        paired positionally; balance entries that are not a list of at least
        two items are left out of the per-address list but still counted in
        ``total_balances``.  Token values that are not dicts are skipped.
        """
        output_data = {
            'timestamp': self.cached_timestamp,
            'tokens': {}
        }
        for token, data in snapshots.items():
            if not isinstance(data, dict):
                continue
            balances = data.get('balances', [])
            addresses = data.get('addresses', [])
            # Create list of balance objects
            balance_list = []
            for balance_entry, address in zip(balances, addresses):
                parsed = self.parse_spot_balance(balance_entry)
                if parsed:
                    balance_list.append({'address': address, **parsed})
            output_data['tokens'][token] = {
                'identifier': data.get('identifier'),
                'total_balances': len(balances),
                'balances': balance_list
            }
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2)
        print(f"Saved spot snapshot to {self.output_file}")

    def parse_spot_balance(self, balance: List[float]) -> Dict[str, float]:
        """Parse a spot balance array into a dictionary.

        Returns ``{'balance': balance[0], 'entry_value': balance[1]}``, or an
        empty dict when ``balance`` is not a list with at least two items.
        """
        if not isinstance(balance, list) or len(balance) < 2:
            return {}
        return {
            'balance': balance[0],
            'entry_value': balance[1]
        }
async def main(tokens: Optional[List[str]] = None, poll_interval: float = 5.0):
    """Poll spot snapshots every 5 seconds, only download when updated.

    Runs until cancelled.  A failed poll is reported and retried after the
    same delay, so a transient error does not stop the loop.

    Args:
        tokens: Token symbols to poll.  Defaults to ``["UBTC", "PUP"]``.
        poll_interval: Seconds to wait between polls (default 5).
    """
    if tokens is None:
        tokens = ["UBTC", "PUP"]
    async with SpotSnapshotClient() as client:
        while True:
            try:
                # Poll for updates; the client prints its own progress and
                # writes new snapshots to disk.
                await client.poll_snapshots(tokens)
            except Exception as error:
                print(f'Polling failed: {error}')
            # Same delay whether the poll succeeded or failed.
            await asyncio.sleep(poll_interval)
if __name__ == "__main__":
    # Executed as a script: run the polling coroutine on a new event loop.
    asyncio.run(main())