# spotSnapshot

{% hint style="info" %}
💧**New endpoint - this endpoint is not part of the original Hyperliquid API; we added it for builder convenience.**
{% endhint %}

{% hint style="warning" %}

### ⚠️ This is an add-on endpoint - access has to be purchased separately.

{% endhint %}

### Overview

The `spotSnapshot` endpoint gives you access to the balances of all Hypercore users that hold spot assets.

There are two ways to access the data:

1. **Metadata Endpoint (Fast)**: Check for updates without downloading data
2. **Snapshots Endpoint (Heavy)**: Download actual market snapshot data

<details>

<summary>Efficient polling pattern</summary>

To minimize bandwidth and server load, you should:

1. Poll the metadata endpoint to check for updates
2. Only call the snapshots endpoint when data has changed
3. Cache the snapshot ID locally to avoid unnecessary downloads
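
The three steps above can be sketched as a single polling cycle. This is a minimal illustration, not the official client: `SnapshotCache`, `poll_once`, and the two injected callables are hypothetical names, and the sketch assumes the metadata response carries a `snapshot_id` field as shown in the metadata section of this page.

```python
from typing import Any, Callable, Dict, Optional


class SnapshotCache:
    """Holds the last snapshot ID seen and the data downloaded for it."""

    def __init__(self) -> None:
        self.snapshot_id: Optional[str] = None
        self.data: Any = None


def poll_once(
    fetch_metadata: Callable[[], Dict[str, Any]],
    fetch_snapshots: Callable[[], Any],
    cache: SnapshotCache,
) -> bool:
    """Run one polling cycle; return True if new data was downloaded."""
    meta = fetch_metadata()              # step 1: cheap metadata check
    latest_id = meta["snapshot_id"]
    if latest_id == cache.snapshot_id:
        return False                     # unchanged: skip the heavy call
    data = fetch_snapshots()             # step 2: download only on change
    cache.data = data                    # step 3: store the ID only after
    cache.snapshot_id = latest_id        #         the download succeeded
    return True
```

Recording the snapshot ID only after a successful download means a failed download is retried on the next cycle instead of being mistaken for up-to-date data.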

</details>

<details>

<summary>Dependencies and best practices</summary>

#### Dependencies

**JavaScript**

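* **zstd**: For decompression (e.g., `fzstd`, `@gera2ld/zstd`, or `@mongodb-js/zstd`, which the example below uses)
* **msgpack**: For decoding (e.g., `@msgpack/msgpack`, or `msgpack-lite`, which the example below uses)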

**Python**

* **zstd**: For decompression (e.g., `zstandard`)
* **msgpack**: For decoding (e.g., `msgpack`)
* **aiohttp**: For async HTTP requests

***

✅ **Best Practices**

1. **Always poll metadata first** - Don't skip the lightweight metadata check.
2. **Cache snapshots locally** - Avoid unnecessary downloads of large files.
3. **Handle errors gracefully** - Network issues are common; implement retry logic.
4. **Use appropriate poll intervals** - 5-10 seconds is usually sufficient.
5. **Request specific markets** - Don't use `['ALL']` unless you truly need all data.
6. **Monitor bandwidth usage** - Compressed snapshots can be large (1-35 MB or more).

</details>

<details>

<summary>Error handling</summary>

* **404**: No snapshots are available yet. Wait and retry.
* **500**: A server error occurred. Retry with exponential backoff.
* **Network errors**: Implement retry logic with jitter to avoid overwhelming the server.
* **Parse errors**: Log the issue and continue using cached data if available.
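
One possible way to implement the retry advice above is exponential backoff with full jitter. The sketch below is an assumption-laden illustration: `backoff_delay` and `with_retries` are hypothetical helper names, the base delay and cap are arbitrary, and a real client would likely retry only on 404, 500, and network errors rather than on every exception.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


def with_retries(
    call: Callable[[], T],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on any exception with jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise                    # out of attempts: surface the error
            sleep(backoff_delay(attempt))
    raise RuntimeError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the helper can be exercised in tests without actually waiting.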

</details>

### Spot Snapshot Metadata (Fast)

* **Endpoint**: `POST /info`
* **Purpose**: Check if snapshots have been updated
* **Response Time**: \~1ms

#### Request

```json
{
  "type": "spotSnapshotTimestamp"
}
```

#### Response

```json
{
  "snapshot_id": "20240915_state_123456789",
  "timestamp": 1694764800
}
```

### Spot Snapshots (Heavy)

* **Endpoint**: `POST /info`
* **Purpose**: Download market snapshot data
* **Response Time**: \~100ms+

#### Request

```json
{
  "type": "spotSnapshots",
  "tokens": ["HYPE", "UBTC", "USOL"]  // or ["ALL"] for all markets
}
```

#### Response Headers

* **Single Market**: `x-payload-format: msgpack`, `Content-Encoding: zstd`
* **Multiple Markets**: `x-payload-format: multi-zstd`, `x-compression: inner-zstd`
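
A client can use the `x-payload-format` header to decide how to decode the body. The following sketch shows one way to do that; `payload_kind` is a hypothetical helper, and treating any value other than `multi-zstd` as the single-market format is an assumption that mirrors the example clients further down this page.

```python
from typing import Mapping


def payload_kind(headers: Mapping[str, str]) -> str:
    """Return 'multi' for the length-prefixed format, else 'single'."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {k.lower(): v.strip().lower() for k, v in headers.items()}
    if normalized.get("x-payload-format") == "multi-zstd":
        return "multi"
    return "single"
```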

### Data Format

<details>

<summary>Single Market Response</summary>

**Format**: Raw zstd-compressed MessagePack data

**Decompressed Structure:**

{% code overflow="wrap" %}

```json
[
  "20240915_state_123456789",  // Snapshot ID
  "UBTC",                      // Token name
  [                            // Balances array
    [
      20,                      // current balance
      2000000                  // entry value $
    ],
    /* more balances and entries */
  ],
  [                            // Addresses array
    "0x1234567890abcdef...",
    "0xfedcba0987654321...",
    /* more addresses */
  ]
]
```

{% endcode %}

ℹ️ **Balance Tuple Format**: Each entry in the balances array holds two values, in this order (`size` is the current balance, `entry_value` is the entry value in $):

```
size
entry_value
```
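
To make the structure concrete, here is a small sketch that turns an already decompressed and decoded snapshot into per-holder records. It assumes that `balances[i]` belongs to `addresses[i]`, which is how the example clients on this page pair them; `pair_balances` and the placeholder addresses are hypothetical.

```python
from typing import Any, Dict, List, Sequence


def pair_balances(snapshot: Sequence[Any]) -> List[Dict[str, Any]]:
    """Turn [snapshot_id, token, balances, addresses] into holder records."""
    _snapshot_id, _token, balances, addresses = snapshot[:4]
    if len(balances) != len(addresses):
        raise ValueError("balances and addresses differ in length")
    return [
        {"address": address, "balance": entry[0], "entry_value": entry[1]}
        for entry, address in zip(balances, addresses)
    ]


decoded = [
    "20240915_state_123456789",
    "UBTC",
    [[20, 2000000], [5, 450000]],
    ["0xaaa", "0xbbb"],  # placeholder addresses for illustration only
]
records = pair_balances(decoded)
```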

</details>

<details>

<summary>Multiple Markets Response</summary>

**Format**: Custom binary format with length prefixes

**Binary Structure:**

{% code overflow="wrap" %}

```
[market_count: 4 bytes][length1: 4 bytes][zstd_data1][length2: 4 bytes][zstd_data2]...
```

{% endcode %}
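
The framing can be split with the standard library alone, before any decompression. The sketch below assumes both the count and the length prefixes are unsigned 32-bit little-endian integers, as the example clients on this page read them; `split_frames` is a hypothetical helper, and each returned frame would still need zstd decompression and MessagePack decoding.

```python
import struct
from typing import List


def split_frames(payload: bytes) -> List[bytes]:
    """Split a multi-market payload into its still-compressed zstd frames."""
    if len(payload) < 4:
        raise ValueError("payload too short for market_count")
    (count,) = struct.unpack_from("<I", payload, 0)  # assumed little-endian
    offset = 4
    frames = []
    for index in range(count):
        if offset + 4 > len(payload):
            raise ValueError(f"truncated length prefix for frame {index}")
        (length,) = struct.unpack_from("<I", payload, offset)
        offset += 4
        if offset + length > len(payload):
            raise ValueError(f"truncated body for frame {index}")
        frames.append(payload[offset:offset + length])
        offset += length
    return frames


# Synthetic payload with two fake frames, only to exercise the framing logic.
demo = (
    struct.pack("<I", 2)
    + struct.pack("<I", 3) + b"abc"
    + struct.pack("<I", 2) + b"de"
)
```

Checking bounds before every read keeps a truncated download from silently producing short frames.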

</details>

### Implementation examples

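**Note**: Set **HYDROMANCER\_API\_KEY** in your `.env` file.

**Note**: Install the required packages before running the examples (JavaScript: `axios`, `@mongodb-js/zstd`, `msgpack-lite`, `dotenv`; Python: `aiohttp`, `zstandard`, `msgpack`, `python-dotenv`).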

{% tabs %}
{% tab title="JavaScript" %}

```javascript
const axios = require('axios');
const { decompress } = require('@mongodb-js/zstd');
const msgpack = require('msgpack-lite');
const fs = require('fs').promises;
require('dotenv').config();

class SpotSnapshotClient {
    constructor() {
        this.baseUrl = process.env.HYDROMANCER_API_URL || 'https://api.hydromancer.xyz';
        this.apiKey = process.env.HYDROMANCER_API_KEY;
        
        if (!this.apiKey || this.apiKey === 'INSERT_API_KEY_HERE') {
            throw new Error('API key not found. Please set HYDROMANCER_API_KEY in .env file');
        }
        
        this.cachedTimestamp = null;
        this.cachedSnapshots = {};
        this.outputFile = 'spot_snapshot.json';
    }

    async checkForUpdates() {
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${this.apiKey}`
        };

        try {
            const response = await axios.post(
                `${this.baseUrl}/info`,
                { type: 'spotSnapshotTimestamp' },
                { headers }
            );

            const currentTimestamp = response.data.timestamp || response.data.snapshot_id;
            const hasUpdates = this.cachedTimestamp !== currentTimestamp;
            
            if (hasUpdates) {
                console.log(`Spot snapshot updated: ${this.cachedTimestamp} -> ${currentTimestamp}`);
                this.cachedTimestamp = currentTimestamp;
            } else {
                console.log(`No spot snapshot updates (timestamp: ${currentTimestamp})`);
            }

            return hasUpdates;
        } catch (error) {
            throw new Error(`Timestamp request failed: ${error.response?.status} - ${error.response?.data || error.message}`);
        }
    }

    async pollSnapshots(tokens = ['UBTC', 'PUP']) {
        try {
            const hasUpdates = await this.checkForUpdates();
            
            if (!hasUpdates) {
                console.log('No updates available, using cached spot data');
                return this.cachedSnapshots;
            }

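            console.log('New spot snapshots available, downloading...');
            let snapshots;
            try {
                snapshots = await this.downloadSnapshots(tokens);
            } catch (error) {
                // Forget the new timestamp so the next poll retries the download
                this.cachedTimestamp = null;
                throw error;
            }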
            
            this.cachedSnapshots = snapshots;
            await this.saveToFile(snapshots);
            
            return snapshots;
        } catch (error) {
            console.error('Error polling spot snapshots:', error);
            throw error;
        }
    }

    async downloadSnapshots(tokens) {
        const headers = {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${this.apiKey}`
        };

        try {
            const response = await axios.post(
                `${this.baseUrl}/info`,
                { type: 'spotSnapshots', tokens: tokens },
                { 
                    headers,
                    responseType: 'arraybuffer'
                }
            );

            const payloadFormat = response.headers['x-payload-format'];
            const binaryData = Buffer.from(response.data);
            
            console.log(`Payload format: ${payloadFormat}`);
            console.log(`Downloaded ${binaryData.length} bytes`);
            
            if (payloadFormat === 'multi-zstd') {
                return await this.parseMultipleTokens(binaryData);
            } else {
                // Single token - could be msgpack or zstd compressed
                return await this.parseSingleToken(binaryData, tokens[0]);
            }
        } catch (error) {
            throw new Error(`Spot snapshots request failed: ${error.response?.status} - ${error.response?.data || error.message}`);
        }
    }

    async parseSingleToken(binaryData, tokenName) {
        try {
            // Try to decompress first (if it's zstd compressed)
            let decompressed;
            try {
                decompressed = await decompress(binaryData);
                console.log('Successfully decompressed single token data');
            } catch (e) {
                // If decompression fails, assume it's already uncompressed msgpack
                console.log('Data appears to be uncompressed msgpack');
                decompressed = binaryData;
            }
            
            // Decode msgpack
            const data = msgpack.decode(decompressed);
            
            // For spot: [identifier, token, balances, addresses]
            if (Array.isArray(data) && data.length >= 4) {
                const [identifier, token, balances, addresses] = data;
                return {
                    [token]: {
                        identifier: identifier,
                        token: token,
                        balances: balances || [],
                        addresses: addresses || [],
                        timestamp: this.cachedTimestamp
                    }
                };
            }
            
            // Or it could be object format: {i, t, b, a} or similar
            if (data.i !== undefined || data.identifier !== undefined) {
                const token = data.t || data.token || tokenName;
                return {
                    [token]: {
                        identifier: data.i || data.identifier,
                        token: token,
                        balances: data.b || data.balances || [],
                        addresses: data.a || data.addresses || [],
                        timestamp: this.cachedTimestamp
                    }
                };
            }
            
            throw new Error('Unknown single token data format');
        } catch (error) {
            console.error('Error parsing single token:', error);
            throw error;
        }
    }

    async parseMultipleTokens(binaryData) {
        let offset = 0;
        
        // Read number of snapshots (4 bytes, little-endian)
        const count = binaryData.readUInt32LE(offset);
        offset += 4;
        
        console.log(`Number of spot snapshots: ${count}`);
        
        const tokens = {};
        
        for (let i = 0; i < count; i++) {
            // Read length of this snapshot (4 bytes, little-endian)
            const length = binaryData.readUInt32LE(offset);
            offset += 4;
            
            console.log(`Snapshot ${i + 1}/${count}: ${length} bytes`);
            
            // Extract and decompress snapshot
            const zstdData = binaryData.slice(offset, offset + length);
            
            try {
                const decompressed = await decompress(zstdData);
                const data = msgpack.decode(decompressed);
                
                // Handle array format: [identifier, token, balances, addresses]
                if (Array.isArray(data) && data.length >= 4) {
                    const [identifier, token, balances, addresses] = data;
                    tokens[token] = {
                        identifier: identifier,
                        token: token,
                        balances: balances || [],
                        addresses: addresses || [],
                        timestamp: this.cachedTimestamp
                    };
                    console.log(`Parsed token: ${token} with ${balances?.length || 0} balances`);
                }
                // Handle object format
                else if ((data.i !== undefined || data.identifier !== undefined) && 
                         (data.t !== undefined || data.token !== undefined)) {
                    const token = data.t || data.token;
                    tokens[token] = {
                        identifier: data.i || data.identifier,
                        token: token,
                        balances: data.b || data.balances || [],
                        addresses: data.a || data.addresses || [],
                        timestamp: this.cachedTimestamp
                    };
                    console.log(`Parsed token: ${token} with ${(data.b || data.balances)?.length || 0} balances`);
                }
            } catch (error) {
                console.error(`Error parsing snapshot ${i + 1}:`, error);
            }
            
            offset += length;
        }
        
        return tokens;
    }

    async saveToFile(snapshots) {
        const outputData = {
            timestamp: this.cachedTimestamp,
            tokens: {}
        };
        
        for (const [token, data] of Object.entries(snapshots)) {
            const balances = data.balances || [];
            const addresses = data.addresses || [];
            
            // Create readable balance list (first 100 balances)
            const balanceList = [];
            const numBalances = Math.min(100, balances.length, addresses.length);
            
            for (let i = 0; i < numBalances; i++) {
                if (Array.isArray(balances[i]) && balances[i].length >= 2) {
                    balanceList.push({
                        address: addresses[i],
                        balance: balances[i][0],
                        entry_value: balances[i][1]
                    });
                }
            }
            
            outputData.tokens[token] = {
                identifier: data.identifier,
                total_balances: balances.length,
                balances: balanceList
            };
        }
        
        await fs.writeFile(this.outputFile, JSON.stringify(outputData, null, 2));
        console.log(`Saved spot snapshot to ${this.outputFile}`);
    }
}

// Example usage
async function main() {
    const client = new SpotSnapshotClient();
    
    while (true) {
        try {
            const snapshots = await client.pollSnapshots(['UBTC', "PUP"]);
            
            for (const [token, data] of Object.entries(snapshots)) {
                console.log(`Token ${token}: ${data.balances?.length || 0} balances`);
            }
            
            console.log('Waiting 5 seconds...');
            await new Promise(resolve => setTimeout(resolve, 5000));
            
        } catch (error) {
            console.error('Polling error:', error.message);
            await new Promise(resolve => setTimeout(resolve, 5000));
        }
    }
}

if (require.main === module) {
    main().catch(console.error);
}

module.exports = SpotSnapshotClient;
```

{% endtab %}

{% tab title="Python" %}

```python
import asyncio
import aiohttp
import struct
from typing import Optional, Dict, List, Any
import time
import json
import zstandard as zstd
import msgpack
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


class SpotSnapshotClient:
    def __init__(self):
        # Use environment variables with fallbacks
        self.base_url = os.getenv('HYDROMANCER_API_URL', 'https://api.hydromancer.xyz')
        self.api_key = os.getenv('HYDROMANCER_API_KEY')

        if not self.api_key:
            raise ValueError("API key not found. Please set HYDROMANCER_API_KEY in .env file")
        self.cached_timestamp: Optional[str] = None
        self.cached_snapshots: Dict[str, Any] = {}
        self.session: Optional[aiohttp.ClientSession] = None
        self.output_file = "spot_snapshot.json"

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def check_for_updates(self) -> bool:
        """Check if snapshots have been updated since last poll"""
        if not self.session:
            raise RuntimeError("Client session not initialized")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

        start_time = time.time()
        async with self.session.post(
            f'{self.base_url}/info',
            json={'type': 'spotSnapshotTimestamp'},
            headers=headers
        ) as response:
            fetch_time = time.time() - start_time
            
            if not response.ok:
                text = await response.text()
                raise Exception(f'Timestamp request failed: {response.status} - {text}')

            data = await response.json()
            current_timestamp = data.get('timestamp', data.get('snapshot_id'))
            
            # Check if this is a new snapshot
            has_updates = self.cached_timestamp != current_timestamp
            
            if has_updates:
                print(f'📈 Spot snapshot updated: {self.cached_timestamp} -> {current_timestamp} (fetched in {fetch_time:.3f}s)')
                self.cached_timestamp = current_timestamp
            else:
                print(f'⏳ No spot snapshot updates (timestamp: {current_timestamp}, checked in {fetch_time:.3f}s)')

            return has_updates

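    async def poll_snapshots(self, tokens: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Efficiently poll for spot snapshots
        Only downloads data when snapshots have been updated
        """
        # Avoid a mutable default argument; fall back to the example tokens
        if tokens is None:
            tokens = ["UBTC", "PUP"]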
        try:
            # Step 1: Check if snapshots have been updated
            has_updates = await self.check_for_updates()
            
            if not has_updates:
                print('Waiting for next update, using cached spot data')
                return self.cached_snapshots

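            # Step 2: Download new snapshots
            print('New spot snapshots available, downloading...')
            try:
                snapshots = await self.download_snapshots(tokens)
            except Exception:
                # Forget the new timestamp so the next poll retries the download
                self.cached_timestamp = None
                raise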
            
            # Update cache
            self.cached_snapshots = snapshots
            
            # Save to file
            self.save_to_file(snapshots)
            
            return snapshots
        except Exception as error:
            print(f'Error polling spot snapshots: {error}')
            raise

    async def download_snapshots(self, tokens: List[str]) -> Dict[str, Any]:
        """Download spot market snapshots"""
        if not self.session:
            raise RuntimeError("Client session not initialized")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

        start_time = time.time()
        async with self.session.post(
            f'{self.base_url}/info',
            json={'type': 'spotSnapshots', 'tokens': tokens},
            headers=headers
        ) as response:
            fetch_time = time.time() - start_time
            
            if not response.ok:
                text = await response.text()
                raise Exception(f'Spot snapshots request failed: {response.status} - {text}')

            payload_format = response.headers.get('x-payload-format')
            binary_data = await response.read()
            
            # Format file size in human readable format
            def format_bytes(bytes_size):
                for unit in ['B', 'KB', 'MB', 'GB']:
                    if bytes_size < 1024.0:
                        return f"{bytes_size:.2f} {unit}"
                    bytes_size /= 1024.0
                return f"{bytes_size:.2f} TB"
            
            readable_size = format_bytes(len(binary_data))
            print(f"📦 Downloaded {readable_size} ({len(binary_data):,} bytes) in {fetch_time:.3f}s")
            print(f"🔧 Payload format: {payload_format}")
            
            if payload_format == 'multi-zstd':
                return self.parse_multiple_tokens(binary_data)
            else:
                return self.parse_single_snapshot(binary_data)

    def parse_single_snapshot(self, binary_data: bytes) -> Dict[str, Any]:
        """Parse single compressed msgpack response"""
        # Decompress zstd
        decompressed = self.decompress_zstd(binary_data)
        
        # Decode msgpack
        data = self.decode_msgpack(decompressed)
        
        # Handle single snapshot response
        if isinstance(data, list) and len(data) >= 4:
            # Spot format: [identifier, token, balances, addresses]
            token = data[1]
            return {
                token: {
                    'identifier': data[0],
                    'token': token,
                    'balances': data[2],
                    'addresses': data[3],
                    'timestamp': self.cached_timestamp
                }
            }
        
        return data

    def parse_multiple_tokens(self, binary_data: bytes) -> Dict[str, Any]:
        """Parse multiple tokens response (multi-zstd format)"""
        offset = 0
        
        # Read number of snapshots (4 bytes, little-endian)
        if len(binary_data) < 4:
            raise ValueError(f"Data too short: {len(binary_data)} bytes")
            
        count = struct.unpack('<I', binary_data[offset:offset + 4])[0]
        offset += 4
        
        print(f"Number of spot snapshots: {count}")
        
        tokens = {}
        
        for i in range(count):
            # Read length of this snapshot (4 bytes, little-endian)
            if offset + 4 > len(binary_data):
                raise ValueError(f"Unexpected end of data at offset {offset}")
                
            length = struct.unpack('<I', binary_data[offset:offset + 4])[0]
            offset += 4
            
            print(f"Spot snapshot {i}: {length} bytes")
            
            # Extract and decompress snapshot
            if offset + length > len(binary_data):
                raise ValueError(f"Data too short for snapshot {i}")
                
            zstd_data = binary_data[offset:offset + length]
            
            try:
                decompressed = self.decompress_zstd(zstd_data)
                snapshot = self.decode_msgpack(decompressed)
                
                # Handle list format
                if isinstance(snapshot, list) and len(snapshot) >= 4:
                    # Spot format: [identifier, token, balances, addresses]
                    identifier = snapshot[0]
                    token_name = snapshot[1]
                    balances = snapshot[2]  # Array of [balance, entry_value]
                    addresses = snapshot[3]
                    
                    # Store as a structured dict
                    tokens[token_name] = {
                        'identifier': identifier,
                        'token': token_name,
                        'balances': balances,
                        'addresses': addresses,
                        'timestamp': self.cached_timestamp
                    }
                    
                    print(f"Parsed spot token: {token_name} with {len(balances) if isinstance(balances, list) else 0} balances")
                else:
                    # Fallback for unexpected format
                    key = f'token_{i}'
                    tokens[key] = snapshot
                    print(f"Unexpected format for token {i}: {type(snapshot)}")
                
            except Exception as e:
                print(f"Error parsing spot snapshot {i}: {e}")
                # Continue with next snapshot
                
            offset += length
        
        return tokens

    def decompress_zstd(self, data: bytes) -> bytes:
        """Decompress zstd data"""
        try:
            dctx = zstd.ZstdDecompressor()
            return dctx.decompress(data)
        except Exception as e:
            print(f"Decompression error: {e}")
            raise

    def decode_msgpack(self, data: bytes) -> Any:
        """Decode MessagePack data"""
        return msgpack.unpackb(data, raw=False)

    def save_to_file(self, snapshots: Dict[str, Any]):
        """Save snapshots to JSON file"""
        # Convert balances to a more readable format
        output_data = {
            'timestamp': self.cached_timestamp,
            'tokens': {}
        }
        
        for token, data in snapshots.items():
            if isinstance(data, dict):
                balances = data.get('balances', [])
                addresses = data.get('addresses', [])
                
                # Create list of balance objects
                balance_list = []
                for balance_entry, address in zip(balances, addresses):
                    if isinstance(balance_entry, list) and len(balance_entry) >= 2:
                        balance_list.append({
                            'address': address,
                            'balance': balance_entry[0],
                            'entry_value': balance_entry[1]
                        })
                
                output_data['tokens'][token] = {
                    'identifier': data.get('identifier'),
                    'total_balances': len(balances),
                    'balances': balance_list
                }
        
        with open(self.output_file, 'w') as f:
            json.dump(output_data, f, indent=2)
        
        print(f"Saved spot snapshot to {self.output_file}")

    def parse_spot_balance(self, balance: List[float]) -> Dict[str, float]:
        """Parse a spot balance array into a dictionary"""
        if not isinstance(balance, list) or len(balance) < 2:
            return {}
        
        return {
            'balance': balance[0],
            'entry_value': balance[1]
        }


async def main():
    """Poll spot snapshots every 5 seconds, only download when updated"""
    async with SpotSnapshotClient() as client:
        while True:
            try:
                # Poll for updates
                snapshots = await client.poll_snapshots(["UBTC", "PUP"])
                
                # Print summary
                for token, data in snapshots.items():
                    if isinstance(data, dict):
                        balances = data.get('balances', [])
                        print(f"Spot Token {token}: {len(balances)} balances")
                
                # Wait 5 seconds before next check
                # print("Waiting 5 seconds before next timestamp check...")
                await asyncio.sleep(5)
                
            except Exception as error:
                print(f'Polling failed: {error}')
                await asyncio.sleep(5)  # Wait before retrying


if __name__ == "__main__":
    asyncio.run(main())
```

{% endtab %}
{% endtabs %}


