# Session Management and Reconnection

{% hint style="info" %}
Sessions enable reliable WebSocket connections with automatic replay of missed data on reconnect.
{% endhint %}

## Overview

When you connect to the WebSocket API, you receive a `sessionId` that persists your subscriptions and tracks your position in the data stream. On reconnect, provide this session ID to:

1. **Restore subscriptions** - No need to re-subscribe
2. **Receive replay** - Automatically get missed data (up to 30 seconds)
3. **Resume seamlessly** - Continue from where you left off

## Key Concepts

### Sequence Numbers (`seq`)

Every live data message includes a `seq` field - a monotonically increasing integer **for client use only**:

```json
{
  "type": "allFills",
  "seq": 42,
  "fills": [...]
}
```

**Purpose:** Detect gaps within a single connection. If you receive `seq: 5` then `seq: 7`, you know `seq: 6` was lost.

**Key behaviors:**

* Resets to 1 on every new connection (including reconnects)
* Per-subscription channel (each subscription has its own sequence)
* Client-side only - server doesn't track your last received seq

### Cursor

The **cursor** tracks your position in the data stream, tied to Hyperliquid's block production. The server supports two cursor formats:

#### Cursor Format

Cursors use a colon-separated format with up to 3 components:

| Format                     | Example                 | Description                                                                   |
| -------------------------- | ----------------------- | ----------------------------------------------------------------------------- |
| `block:timestamp:tx_index` | `"500:1706123456789:3"` | Full precision — most subscription types                                      |
| `block:timestamp`          | `"500:1706123456789"`   | Block-level — used by `l4BookUpdates`, `fundingRates`, and `setOracleUpdates` |
| `timestamp` (number)       | `1706123456789`         | Numeric — used by leverage and isolated margin update subscriptions           |

The `tx_index` component enables precise sub-block positioning. Within a single block (\~70ms), there can be hundreds of events. Without `tx_index`, resuming from a cursor could miss events or produce duplicates when boundaries fall mid-block.

**Treat cursors as opaque strings** — always store and return the full cursor exactly as received. The server handles all parsing.

{% hint style="info" %}
**Cursor format:** The server always sends cursors in `block:timestamp:tx_index` format (or `block:timestamp` for block-level subscriptions). Clients should treat cursors as opaque strings.
{% endhint %}

**Client-side tracking (recommended):**

* Every live event message includes a `cursor` field
* **Save this cursor as a string** after successfully processing each message
* Provide your cursor on reconnect for accurate replay

```json
{
  "type": "allFills",
  "seq": 42,
  "cursor": "500:1706123456789:3",
  "fills": [...]
}
```

**Server-side tracking (fallback):**

* The server also tracks a cursor for your session
* Persists across disconnects
* Used as fallback if you don't provide your own cursor

{% hint style="warning" %}
**Important:** The server cursor is based on messages **sent**, not messages **received**. Due to network issues, OS socket buffering, or client-side delays, you may not have actually received all messages the server sent. For best accuracy, **always track and provide your own cursor** based on messages you successfully processed.
{% endhint %}

**Manual cursor provision:** When subscribing, you can optionally provide a `cursor` to request replay from a specific point:

```json
{
  "method": "subscribe",
  "subscription": {
    "type": "allFills",
    "cursor": "500:1706123400000:0"
  }
}
```

This is useful for:

* Resuming from a known checkpoint after application restart
* Requesting specific historical data within the cache window (30 seconds)

## Message Types

### Control Messages

| Type                 | Direction     | Description                     |
| -------------------- | ------------- | ------------------------------- |
| `connected`          | Server→Client | New session created             |
| `reconnected`        | Server→Client | Existing session resumed        |
| `subscriptionUpdate` | Server→Client | Subscription confirmed          |
| `ping`               | Server→Client | Heartbeat (respond with `pong`) |
| `pong`               | Client→Server | Heartbeat response              |
| `error`              | Server→Client | Error occurred                  |

### Data Messages (Live Events)

Live event messages have the subscription type as their `type` field:

```json
{
  "type": "allFills",
  "seq": 42,
  "cursor": "500:1706123456789:3",
  "fills": [
    ["0x742d35...", { "coin": "ETH", "px": "2150.50", ... }]
  ]
}
```

| Field                                                     | Type   | Description                                                   |
| --------------------------------------------------------- | ------ | ------------------------------------------------------------- |
| `type`                                                    | string | Subscription type (e.g., `"allFills"`)                        |
| `seq`                                                     | number | Sequence number for gap detection (resets on reconnect)       |
| `cursor`                                                  | string | Position cursor (opaque string, e.g. `"500:1706123456789:3"`) |
| `fills`/`updates`/`events`/`trades`/`liquidations`/`data` | array  | Event data (field name varies by subscription type)           |

Common data message types: `allFills`, `userFills`, `userOrderUpdates`, `liquidationFills`, `l4BookUpdates`, etc.

### Replay Messages

Replay messages are **distinct from live events** - they have `"type": "replay"` and contain historical data:

```json
{
  "type": "replay",
  "channel": "allFills",
  "replayTimeMs": "500:1706123456789:3",
  "cursor": "500:1706123456789:3",
  "count": 150,
  "chunk": 1,
  "totalChunks": 2,
  "data": [
    ["0x742d35...", { "coin": "ETH", "px": "2150.50", ... }]
  ]
}
```

| Field          | Type    | Description                                                                                                                                    |
| -------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| `type`         | string  | Always `"replay"`                                                                                                                              |
| `channel`      | string  | The subscription type (e.g., `"allFills"`)                                                                                                     |
| `cursor`       | string  | **Per-chunk cursor** derived from the last event in this chunk (same format as live `cursor`). Use this for reconnection.                      |
| `replayTimeMs` | string  | Same as `cursor`. Deprecated — use `cursor` instead.                                                                                           |
| `count`        | number  | Number of items in this chunk                                                                                                                  |
| `chunk`        | number  | Current chunk number (1-indexed)                                                                                                               |
| `totalChunks`  | number  | Total chunks in this replay                                                                                                                    |
| `hasGap`       | boolean | Only present when `true`. Indicates cache didn't cover entire disconnect period (only meaningful on first chunk). Absent when there is no gap. |
| `data`         | array   | Array of missed events (same format as live data)                                                                                              |

#### Chunking

Large replays are split into multiple chunks to keep message sizes manageable. Chunking occurs when either limit is reached:

* **Max items:** 2,000 items per chunk
* **Max size:** 1 MB per chunk

Use `chunk` and `totalChunks` to track replay progress:

* **Update your cursor from each chunk's `cursor`** as it arrives — this ensures progress is saved even if the connection drops mid-replay
* `hasGap` is only set on the first chunk (`chunk: 1`)
* Process chunks in order as they arrive

{% hint style="warning" %}
**Important:** Replay messages do NOT have a `seq` field. Only live event messages have sequences.
{% endhint %}

## Connection Flow

### Initial Connection

```
wss://api.hydromancer.xyz/ws?token=YOUR_API_KEY
```

Server responds with `connected`:

```json
{
  "type": "connected",
  "clientId": "abc123-def456-...",
  "sessionId": "789xyz-..."
}
```

**Save the `sessionId`** - you'll need it for reconnection.

### Subscribing

```json
{
  "method": "subscribe",
  "subscription": { "type": "allFills" }
}
```

Server confirms with `subscriptionUpdate`:

```json
{
  "type": "subscriptionUpdate",
  "subscribed": ["allFills"],
  "failed": []
}
```

Then live data begins flowing with `seq` starting at 1.

## Reconnection Flow

### Step-by-Step Process

1. **Client disconnects** (network issue, restart, etc.)
2. **Server marks session disconnected** - starts 30-second grace period
3. **Client reconnects with sessionId and cursor:**

   ```
   wss://api.hydromancer.xyz/ws?token=YOUR_API_KEY&sessionId=YOUR_SESSION_ID&cursor=500:1706123456789:3
   ```
4. **Server validates session** - checks if still within grace period
5. **Server automatically restores subscriptions** - no need to re-subscribe
6. **Server sends `reconnected`:**

   ```json
   {
     "type": "reconnected",
     "clientId": "new-client-id-...",
     "sessionId": "789xyz-...",
     "subscriptions": ["allFills"],
     "lastSeq": 0
   }
   ```
7. **Server sends replay** - events since your cursor (up to 30 seconds of data)
8. **Live events resume** - with `seq` starting at 1

If your original connection used `liveFormat=chunked-v1`, you must include `liveFormat=chunked-v1` again on reconnect. `liveFormat` is connection-level and is not persisted on the session.

### Providing Your Cursor on Reconnect

When reconnecting, you can provide your last successfully processed cursor as a query parameter:

```
wss://api.hydromancer.xyz/ws?token=YOUR_API_KEY&sessionId=YOUR_SESSION_ID&cursor=500:1706123456789:3
```

If you are using live chunking, include `liveFormat=chunked-v1` as well:

```
wss://api.hydromancer.xyz/ws?token=YOUR_API_KEY&sessionId=YOUR_SESSION_ID&cursor=500:1706123456789:3&liveFormat=chunked-v1
```

| Parameter    | Required | Description                                                                   |
| ------------ | -------- | ----------------------------------------------------------------------------- |
| `token`      | Yes      | Your API key                                                                  |
| `sessionId`  | No       | Session ID to resume (omit for new session)                                   |
| `cursor`     | No       | Last cursor you received (opaque string, e.g. `"500:1706123456789:3"`)        |
| `liveFormat` | No       | Required on every reconnect if you are using `chunked-v1` live message format |

**Cursor priority:**

1. **Client-provided cursor** (recommended) - from the `cursor` query parameter
2. **Server-stored cursor** (fallback) - used if you don't provide one

{% hint style="success" %}
**Best Practice:** Always track the `cursor` field from each message you process successfully, and provide it on reconnect. This ensures you receive exactly the data you missed, even if some messages were lost in transit before disconnect.
{% endhint %}

{% hint style="success" %}
**Automatic Subscription Restore:** You do **not** need to re-subscribe after reconnecting. The server automatically restores all your previous subscriptions and begins sending data immediately. The `subscriptions` array in the `reconnected` message confirms which feeds were restored.
{% endhint %}

{% hint style="info" %}
**Note:** `lastSeq` is always 0 because sequences reset on each connection.
{% endhint %}

### What Triggers Replay

Replay is sent when ALL of these conditions are met:

* You reconnect with a valid, non-expired sessionId
* The session had active subscriptions before disconnect
* There is cached data newer than your cursor
* The cache TTL (30 seconds) hasn't fully expired

### When Replay is NOT Sent

* New connection (no sessionId provided)
* Expired session (disconnected > 30 seconds ago)
* No cached data for your subscriptions (quiet market)
* Your cursor is already current (instant reconnect)

## Gap Detection

The `hasGap` field in replay messages indicates data completeness:

| `hasGap` | Meaning                        | Action                          |
| -------- | ------------------------------ | ------------------------------- |
| absent   | Complete replay - no data lost | Process normally                |
| `true`   | Some data may be missing       | Consider fetching from REST API |

A gap occurs when your cursor is older than the oldest cached event - some events aged out before you reconnected.

### Replay Response Scenarios

When you reconnect with a session, you'll receive one of these replay response types depending on market activity and your disconnect duration:

| Scenario              | `count` | `hasGap` | `data`       | Meaning                                               |
| --------------------- | ------- | -------- | ------------ | ----------------------------------------------------- |
| **Data, no gap**      | > 0     | absent   | Events array | Normal replay - you have all missed events            |
| **Data, with gap**    | > 0     | `true`   | Events array | Partial replay - some older events expired from cache |
| **No data, with gap** | 0       | `true`   | `[]`         | Cache empty and cursor is old - all events expired    |
| **No data, no gap**   | 0       | absent   | `[]`         | Quiet market - no events occurred during disconnect   |

#### Example Responses

**Data with no gap** (complete replay):

```json
{
  "type": "replay",
  "channel": "allFills",
  "replayTimeMs": "500:1706123456789:3",
  "cursor": "500:1706123456789:3",
  "count": 42,
  "chunk": 1,
  "totalChunks": 1,
  "data": [/* 42 fill events */]
}
```

Note: `hasGap` is absent here because there is no gap. The field is only included when `true`.

**Data with gap** (partial replay - some events expired):

```json
{
  "type": "replay",
  "channel": "allFills",
  "replayTimeMs": "500:1706123456789:3",
  "cursor": "500:1706123456789:3",
  "count": 15,
  "chunk": 1,
  "totalChunks": 1,
  "hasGap": true,
  "data": [/* 15 fill events - older events expired */]
}
```

**No data with gap** (all cached events expired):

```json
{
  "type": "replay",
  "channel": "allFills",
  "replayTimeMs": "500:1706123456789:3",
  "cursor": "500:1706123456789:3",
  "count": 0,
  "chunk": 1,
  "totalChunks": 1,
  "hasGap": true,
  "data": []
}
```

**No data, no gap** (quiet market - nothing happened):

```json
{
  "type": "replay",
  "channel": "allFills",
  "replayTimeMs": "500:1706123456789:3",
  "cursor": "500:1706123456789:3",
  "count": 0,
  "chunk": 1,
  "totalChunks": 1,
  "data": []
}
```

{% hint style="info" %}
**Quiet Markets:** A replay with `count: 0` and `hasGap: false` confirms you're fully up-to-date - there simply were no events during your disconnect period. This is common for user-specific subscriptions (e.g., `userFills`) where activity is infrequent.
{% endhint %}

{% hint style="warning" %}
**Handling Gaps:** When `hasGap: true`, consider fetching historical data from the REST API to fill in missing events. The replay still contains all available cached data - it just doesn't cover your entire disconnect period.
{% endhint %}

## Message Ordering

{% hint style="warning" %}
**Important:** Live messages may begin arriving before replay is complete. Your client must handle this correctly.
{% endhint %}

### Why This Happens

For minimal latency, the server:

1. Adds you to live broadcast immediately on reconnect
2. Sends replay data in parallel

In high-throughput scenarios, a live event may arrive before the final replay chunk.

### Recommended Client Pattern

```javascript
class ReconnectionHandler {
  constructor() {
    this.replayComplete = false;
    this.replayChunksReceived = 0;
    this.replayTotalChunks = 0;
    this.bufferedLive = [];
    this.lastSeen = { time: 0, txIndex: 0 };  // For deduplication
  }

  onMessage(msg) {
    if (msg.type === 'replay') {
      // Process replay data (historical)
      this.processReplayData(msg.data);

      // Update cursor from each chunk for incremental progress
      if (msg.cursor && this.onCursorUpdate) {
        this.onCursorUpdate(msg.cursor);
      }

      this.replayTotalChunks = msg.totalChunks;
      this.replayChunksReceived++;

      // Check if all replay chunks received
      if (this.replayChunksReceived === this.replayTotalChunks) {
        this.replayComplete = true;
        // Now process buffered live messages in order
        this.bufferedLive.forEach(m => this.processLiveData(m));
        this.bufferedLive = [];
      }
    } else if (msg.seq !== undefined) {
      // Live data message (has seq field)
      if (!this.replayComplete) {
        // Buffer until replay is done
        this.bufferedLive.push(msg);
      } else {
        this.processLiveData(msg);
      }
    }
  }
}
```

## Deduplication

{% hint style="warning" %}
**Important:** Replay and live events may overlap. The same event can appear in both the replay data and the first live messages. Your client **must deduplicate** events to avoid processing them twice.
{% endhint %}

### Why Overlap Occurs

To ensure no events are lost, the server:

1. Subscribes you to live broadcasts **immediately** on reconnect
2. Generates and sends replay data in parallel

This means events occurring during the brief window between these steps may appear in both streams. This is intentional - it's better to receive a duplicate than to miss an event.

### Deduplication Keys

Each event type has a monotonically increasing key composed of `(time, txIndex)` (or similar). Since these are ordered, you only need to track the **last seen key** and skip any events at or before it.

| Subscription Type                                                     | Dedup Key               | Fields                                   |
| --------------------------------------------------------------------- | ----------------------- | ---------------------------------------- |
| `allFills`, `userFills`, `builderFills`, `liquidationFills`           | `(time, txIndex)`       | `fill.time`, `fill.txIndex`              |
| `userOrderUpdates`, `builderOrderUpdates`                             | `(time, txIndex)`       | `update.time`, `update.txIndex`          |
| `allTwapStatusUpdates`                                                | `(time, txIndex)`       | `update.time`, `update.txIndex`          |
| `builderLiquidations`, `allBuilderLiquidations`                       | `(time, txIndex)`       | `fill.time`, `fill.txIndex`              |
| `allCompletedTrades`, `userCompletedTrades`, `builderCompletedTrades` | `(closeTime, txIndex)`  | `trade.closeTime`, `trade.txIndex`       |
| `userNonFundingLedgerEvents`, `allUserNonFundingLedgerEvents`         | `(time, txIndex, role)` | `event.time`, `event.txIndex`, user role |

### Deduplication Example (Fills)

Since keys are monotonically increasing, track the last processed `(time, txIndex)` and skip events at or before it:

```javascript
class DeduplicatingHandler {
  constructor() {
    this.lastSeen = { time: 0, txIndex: 0 };
    // ... other fields
  }

  // Compare (time, txIndex) tuples
  isNewEvent(time, txIndex) {
    if (time > this.lastSeen.time) return true;
    if (time === this.lastSeen.time && txIndex > this.lastSeen.txIndex) return true;
    return false;
  }

  processFill(fill) {
    const time = fill.time;
    const txIndex = fill.txIndex ?? 0;

    // Skip if already seen (duplicate from replay/live overlap)
    if (!this.isNewEvent(time, txIndex)) {
      return;
    }

    // Update last seen
    this.lastSeen = { time, txIndex };

    // Process the fill...
    console.log('New fill:', fill.coin, fill.sz, '@', fill.px);
  }
}
```

{% hint style="info" %}
**No Memory Cleanup Needed:** Since you only track the last seen key (not a set of all seen IDs), there's no memory growth over time.
{% endhint %}

## Session Lifetime

| Parameter      | Value      | Description                                           |
| -------------- | ---------- | ----------------------------------------------------- |
| Grace period   | 30 seconds | Time to reconnect before session expires              |
| Redis TTL      | 60 seconds | Total time session persists in Redis (grace + buffer) |
| Replay cache   | 30 seconds | Maximum lookback for replay data                      |
| Cross-instance | Supported  | Sessions persist across server restarts               |

### Timeline Example

```
T+0s:   Client connected, subscribed to allFills
T+10s:  Client disconnects (network issue)
        └─ Session marked disconnected, 30s grace period starts
T+15s:  Client reconnects with sessionId
        └─ Session resumed, replay sent for T+10s to T+15s
T+40s:  Grace period expires (T+10s + 30s)
        └─ Session would be deleted if not reconnected
```

## Examples

{% tabs %}
{% tab title="JavaScript" %}

```javascript
const WebSocket = require('ws');

class HydromancerClient {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.liveFormat = null;  // Set to 'chunked-v1' if you use live chunking
    this.sessionId = null;
    this.lastCursor = null;  // Track last successfully processed cursor
    this.ws = null;
    this.replayHandler = new ReconnectionHandler((cursor) => {
      this.lastCursor = cursor;  // Update cursor after each successfully processed message
    });
  }

  connect() {
    let url = `wss://api.hydromancer.xyz/ws?token=${this.apiKey}`;
    if (this.sessionId) {
      url += `&sessionId=${this.sessionId}`;
      // Always provide cursor for accurate replay
      if (this.lastCursor) {
        url += `&cursor=${this.lastCursor}`;
      }
      console.log(`Reconnecting with session ${this.sessionId.slice(0, 8)}, cursor ${this.lastCursor}...`);
    }
    if (this.liveFormat) {
      url += `&liveFormat=${this.liveFormat}`;
    }

    this.ws = new WebSocket(url);

    this.ws.on('message', (data) => {
      const msg = JSON.parse(data);
      this.handleMessage(msg);
    });

    this.ws.on('close', () => {
      console.log('Disconnected, reconnecting in 1s...');
      setTimeout(() => this.connect(), 1000);
    });

    this.ws.on('error', (err) => {
      console.error('WebSocket error:', err.message);
    });
  }

  handleMessage(msg) {
    switch (msg.type) {
      case 'connected':
        // New session - save ID and subscribe
        this.sessionId = msg.sessionId;
        console.log(`New session: ${this.sessionId}`);
        this.subscribe({ type: 'allFills' });
        break;

      case 'reconnected':
        // Session resumed - subscriptions already restored
        console.log(`Resumed session: ${msg.sessionId}`);
        console.log(`Restored subscriptions: ${msg.subscriptions.join(', ')}`);
        // Reset replay handler for new connection (keep cursor callback)
        this.replayHandler = new ReconnectionHandler((cursor) => {
          this.lastCursor = cursor;
        });
        // Don't subscribe - already restored!
        break;

      case 'replay':
        // Historical data - different from live events
        console.log(`Replay chunk ${msg.chunk}/${msg.totalChunks}: ${msg.data.length} items`);
        if (msg.hasGap) {
          console.warn('Gap detected - some data may be missing');
          // Consider fetching from REST API
        }
        this.replayHandler.onMessage(msg);
        break;

      case 'subscriptionUpdate':
        console.log(`Subscribed: ${msg.subscribed.join(', ')}`);
        break;

      case 'ping':
        this.ws.send(JSON.stringify({ type: 'pong' }));
        break;

      case 'error':
        console.error(`Server error: ${msg.message}`);
        break;

      default:
        // Live data message (has seq field)
        if (msg.seq !== undefined) {
          this.replayHandler.onMessage(msg);
        }
    }
  }

  subscribe(subscription) {
    this.ws.send(JSON.stringify({
      method: 'subscribe',
      subscription
    }));
  }
}

// Replay handler from earlier (with deduplication)
class ReconnectionHandler {
  constructor(onCursorUpdate) {
    this.replayComplete = false;
    this.replayChunksReceived = 0;
    this.replayTotalChunks = 0;
    this.bufferedLive = [];
    this.lastSeq = 0;
    this.onCursorUpdate = onCursorUpdate;  // Callback to update client's cursor
    // Deduplication: track last seen (time, txIndex) per channel
    this.lastSeen = { time: 0, txIndex: 0 };
  }

  // Check if event is new (for deduplication)
  isNewEvent(time, txIndex) {
    if (time > this.lastSeen.time) return true;
    if (time === this.lastSeen.time && txIndex > this.lastSeen.txIndex) return true;
    return false;
  }

  onMessage(msg) {
    if (msg.type === 'replay') {
      this.replayTotalChunks = msg.totalChunks;
      this.replayChunksReceived++;

      // Process replay data
      msg.data.forEach(item => this.processEvent(item, 'replay'));

      // Update cursor from each chunk for incremental progress
      // If disconnected mid-replay, reconnect resumes from the last completed chunk
      if (msg.cursor && this.onCursorUpdate) {
        this.onCursorUpdate(msg.cursor);
      }

      if (this.replayChunksReceived === this.replayTotalChunks) {
        this.replayComplete = true;
        console.log('Replay complete, processing buffered live events...');
        this.bufferedLive.forEach(m => this.processLiveMessage(m));
        this.bufferedLive = [];
      }
    } else if (msg.seq !== undefined) {
      if (!this.replayComplete) {
        this.bufferedLive.push(msg);
      } else {
        this.processLiveMessage(msg);
      }
    }
  }

  processLiveMessage(msg) {
    // Check for sequence gaps
    if (this.lastSeq > 0 && msg.seq !== this.lastSeq + 1) {
      console.warn(`Sequence gap: expected ${this.lastSeq + 1}, got ${msg.seq}`);
    }
    this.lastSeq = msg.seq;

    // Process the data
    const items = msg.fills || msg.updates || msg.events || msg.trades || msg.liquidations || msg.data || [];
    items.forEach(item => this.processEvent(item, 'live'));

    // Update cursor after successful processing
    if (msg.cursor && this.onCursorUpdate) {
      this.onCursorUpdate(msg.cursor);
    }
  }

  processEvent(item, source) {
    // Extract event data (fills are [address, fillData], others may differ)
    const eventData = Array.isArray(item) ? item[1] : item;
    const time = eventData.time || eventData.closeTime || 0;
    const txIndex = eventData.txIndex ?? 0;

    // Deduplicate: skip if already seen
    if (!this.isNewEvent(time, txIndex)) {
      return;  // Duplicate from replay/live overlap
    }
    this.lastSeen = { time, txIndex };

    console.log(`[${source}] Event:`, JSON.stringify(item).slice(0, 100));
  }
}

const client = new HydromancerClient(process.env.HYDROMANCER_API_KEY);
client.connect();
```

{% endtab %}

{% tab title="Python" %}

```python
import websocket
import json
import os
import time

class ReconnectionHandler:
    def __init__(self, on_cursor_update=None):
        self.replay_complete = False
        self.replay_chunks_received = 0
        self.replay_total_chunks = 0
        self.buffered_live = []
        self.last_seq = 0
        self.on_cursor_update = on_cursor_update  # Callback to update client's cursor
        # Deduplication: track last seen (time, tx_index)
        self.last_seen = {'time': 0, 'tx_index': 0}

    def reset(self):
        """Reset state for new connection."""
        self.replay_complete = False
        self.replay_chunks_received = 0
        self.replay_total_chunks = 0
        self.buffered_live = []
        self.last_seq = 0
        self.last_seen = {'time': 0, 'tx_index': 0}

    def is_new_event(self, time, tx_index):
        """Check if event is new (for deduplication)."""
        if time > self.last_seen['time']:
            return True
        if time == self.last_seen['time'] and tx_index > self.last_seen['tx_index']:
            return True
        return False

    def on_message(self, msg):
        if msg.get('type') == 'replay':
            self.replay_total_chunks = msg['totalChunks']
            self.replay_chunks_received += 1

            # Process replay data
            for item in msg.get('data', []):
                self.process_event(item, 'replay')

            # Update cursor from each chunk for incremental progress
            if msg.get('cursor') and self.on_cursor_update:
                self.on_cursor_update(msg['cursor'])

            if self.replay_chunks_received == self.replay_total_chunks:
                self.replay_complete = True
                print('Replay complete, processing buffered live events...')
                for buffered in self.buffered_live:
                    self.process_live_message(buffered)
                self.buffered_live = []

        elif 'seq' in msg:
            if not self.replay_complete:
                self.buffered_live.append(msg)
            else:
                self.process_live_message(msg)

    def process_live_message(self, msg):
        seq = msg['seq']
        # Check for sequence gaps
        if self.last_seq > 0 and seq != self.last_seq + 1:
            print(f'Sequence gap: expected {self.last_seq + 1}, got {seq}')
        self.last_seq = seq

        # Process the data
        items = msg.get('fills') or msg.get('updates') or msg.get('events') or msg.get('trades') or msg.get('liquidations') or msg.get('data') or []
        for item in items:
            self.process_event(item, 'live')

        # Update cursor after successful processing
        if msg.get('cursor') and self.on_cursor_update:
            self.on_cursor_update(msg['cursor'])

    def process_event(self, item, source):
        # Extract event data (fills are [address, fill_data], others may differ)
        event_data = item[1] if isinstance(item, list) else item
        time = event_data.get('time') or event_data.get('closeTime') or 0
        tx_index = event_data.get('txIndex') or 0

        # Deduplicate: skip if already seen
        if not self.is_new_event(time, tx_index):
            return  # Duplicate from replay/live overlap
        self.last_seen = {'time': time, 'tx_index': tx_index}

        print(f'[{source}] Event: {str(item)[:100]}')


class HydromancerClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.live_format = None  # Set to "chunked-v1" if you use live chunking
        self.session_id = None
        self.last_cursor = None  # Track last successfully processed cursor
        self.ws = None
        self.replay_handler = ReconnectionHandler(self.on_cursor_update)

    def on_cursor_update(self, cursor):
        """Called after each successfully processed message."""
        self.last_cursor = cursor

    def connect(self):
        url = f"wss://api.hydromancer.xyz/ws?token={self.api_key}"
        if self.session_id:
            url += f"&sessionId={self.session_id}"
            # Always provide cursor for accurate replay
            if self.last_cursor:
                url += f"&cursor={self.last_cursor}"
            print(f"Reconnecting with session {self.session_id[:8]}, cursor {self.last_cursor}...")
        if self.live_format:
            url += f"&liveFormat={self.live_format}"

        self.ws = websocket.WebSocketApp(
            url,
            on_message=self.on_message,
            on_close=self.on_close,
            on_open=self.on_open,
            on_error=self.on_error
        )
        self.ws.run_forever()

    def on_open(self, ws):
        print("WebSocket connected")

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("Disconnected, reconnecting in 1s...")
        time.sleep(1)
        self.connect()

    def on_message(self, ws, message):
        msg = json.loads(message)
        msg_type = msg.get('type')

        if msg_type == 'connected':
            # New session
            self.session_id = msg['sessionId']
            print(f"New session: {self.session_id}")
            self.subscribe({'type': 'allFills'})

        elif msg_type == 'reconnected':
            # Session resumed - subscriptions restored
            print(f"Resumed session: {msg['sessionId']}")
            print(f"Restored subscriptions: {msg.get('subscriptions', [])}")
            # Reset replay handler for new connection (keeps cursor callback)
            self.replay_handler.reset()
            # Don't subscribe - already restored!

        elif msg_type == 'replay':
            # Historical data
            chunk = msg['chunk']
            total = msg['totalChunks']
            items = len(msg.get('data', []))
            print(f"Replay chunk {chunk}/{total}: {items} items")
            if msg.get('hasGap'):
                print("WARNING: Gap detected - some data may be missing")
            self.replay_handler.on_message(msg)

        elif msg_type == 'subscriptionUpdate':
            print(f"Subscribed: {msg.get('subscribed', [])}")

        elif msg_type == 'ping':
            ws.send(json.dumps({'type': 'pong'}))

        elif msg_type == 'error':
            print(f"Server error: {msg.get('message')}")

        elif 'seq' in msg:
            # Live data message
            self.replay_handler.on_message(msg)

    def subscribe(self, subscription):
        self.ws.send(json.dumps({
            'method': 'subscribe',
            'subscription': subscription
        }))


if __name__ == '__main__':
    api_key = os.environ.get('HYDROMANCER_API_KEY')
    if not api_key:
        print("Set HYDROMANCER_API_KEY environment variable")
        exit(1)

    client = HydromancerClient(api_key)
    client.connect()
```

{% endtab %}
{% endtabs %}

## Best Practices

1. **Always save the sessionId** from `connected` messages
2. **Always pass sessionId** when reconnecting
3. **Track the cursor** from each successfully processed message
4. **Always pass your cursor** when reconnecting for accurate replay
5. **Distinguish replay from live** - check `msg.type === 'replay'`
6. **Buffer live messages** during replay processing
7. **Deduplicate events** - replay and live may overlap (see [Deduplication](#deduplication))
8. **Handle `hasGap: true`** by fetching missing data from REST API if critical
9. **Track sequences** within a connection to detect gaps
10. **Implement exponential backoff** for reconnection attempts
11. **Respond to pings** within 150 seconds to keep connection alive

## Error Handling

### Session Expired

If your session expired, you'll receive `connected` instead of `reconnected`:

```json
{
  "type": "connected",
  "clientId": "new-client-...",
  "sessionId": "new-session-..."
}
```

This means:

* A new session was created
* You must re-subscribe to your feeds
* No replay will be sent

**Detection:** Check `msg.type === 'reconnected'` to confirm successful resume.

### Common Errors

| Error                 | Cause                         | Solution                         |
| --------------------- | ----------------------------- | -------------------------------- |
| "Session not found"   | Invalid or expired sessionId  | Create new session, re-subscribe |
| "Invalid API key"     | Bad token                     | Check API key                    |
| "Rate limit exceeded" | Too many connections/messages | Implement backoff                |
| "Connection timeout"  | Missed ping/pong              | Respond to pings within 150s     |

## Summary

| Concept       | Purpose                                      | Client Responsibility                                             |
| ------------- | -------------------------------------------- | ----------------------------------------------------------------- |
| `sessionId`   | Persist subscriptions across reconnects      | Save and reuse                                                    |
| `seq`         | Detect gaps within a connection              | Track and validate                                                |
| `cursor`      | Stream position (`block:ts:tx_index`)        | **Save from each message and replay chunk, provide on reconnect** |
| Replay        | Historical data on reconnect                 | Buffer live until complete                                        |
| Deduplication | Replay/live events may overlap               | **Track (time, txIndex), skip events at or before it**            |
| `hasGap`      | Only present when `true` - incomplete replay | Fetch from REST if critical                                       |
| Auto-restore  | Subscriptions restored automatically         | None - just reconnect with sessionId                              |

{% hint style="info" %}
**Remember:** The server's cursor tracks what was *sent*, not what you *received*. For guaranteed data continuity, always track and provide your own cursor based on messages you successfully processed.
{% endhint %}


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.hydromancer.xyz/readme/websocket/session-management-and-reconnection.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
