Erstellen eines Collection Options Creators

Erstellen eines Collection Options Creators

Ein Collection Options Creator ist eine Factory-Funktion, die Konfigurationsoptionen für TanStack DB Collections generiert. Sie bietet eine standardisierte Möglichkeit, verschiedene Sync-Engines und Datenquellen mit der reaktiven, synchronisationsbasierten Architektur von TanStack DB zu integrieren.

Übersicht

Collection Options Creators folgen einem konsistenten Muster

  1. Akzeptieren von Konfigurationen, die für die Sync-Engine spezifisch sind
  2. Gibt ein Objekt zurück, das die Schnittstelle CollectionConfig erfüllt
  3. Verwalten der Sync-Initialisierung, des Daten-Parsings und des Transaktionsmanagements
  4. Bietet optional Dienstprogramme, die für die Sync-Engine spezifisch sind

Wann eine benutzerdefinierte Collection erstellt werden sollte

Sie sollten eine benutzerdefinierte Collection erstellen, wenn

  • Sie eine dedizierte Sync-Engine haben (wie ElectricSQL, Trailbase, Firebase oder eine benutzerdefinierte WebSocket-Lösung)
  • Sie spezifische Sync-Verhaltensweisen benötigen, die nicht von der Query Collection abgedeckt werden
  • Sie eine Verbindung zu einem Backend herstellen möchten, das sein eigenes Sync-Protokoll hat

Hinweis: Wenn Sie nur eine API aufrufen und Daten zurückgeben, verwenden Sie stattdessen die Query Collection.

Kernanforderungen

Jeder Collection Options Creator muss diese Kernaufgaben erfüllen

1. Konfigurationsschnittstelle

Definieren Sie eine Konfigurationsschnittstelle, die Standard-Collection-Eigenschaften erweitert oder einschließt

typescript
// Pattern A: User provides handlers (Query / ElectricSQL style)
interface MyCollectionConfig<TItem extends object> {
  // Your sync engine specific options
  connectionUrl: string
  apiKey?: string
  
  // Standard collection properties
  id?: string
  schema?: StandardSchemaV1
  getKey: (item: TItem) => string | number
  sync?: SyncConfig<TItem>
  
  rowUpdateMode?: 'partial' | 'full'
  
  // User provides mutation handlers
  onInsert?: InsertMutationFn<TItem>
  onUpdate?: UpdateMutationFn<TItem>
  onDelete?: DeleteMutationFn<TItem>
}

// Pattern B: Built-in handlers (Trailbase style)
interface MyCollectionConfig<TItem extends object> 
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
  // Your sync engine specific options
  recordApi: MyRecordApi<TItem>
  connectionUrl: string
  
  rowUpdateMode?: 'partial' | 'full'
  
  // Note: onInsert/onUpdate/onDelete are implemented by your collection creator
}
// Pattern A: User provides handlers (Query / ElectricSQL style)
interface MyCollectionConfig<TItem extends object> {
  // Your sync engine specific options
  connectionUrl: string
  apiKey?: string
  
  // Standard collection properties
  id?: string
  schema?: StandardSchemaV1
  getKey: (item: TItem) => string | number
  sync?: SyncConfig<TItem>
  
  rowUpdateMode?: 'partial' | 'full'
  
  // User provides mutation handlers
  onInsert?: InsertMutationFn<TItem>
  onUpdate?: UpdateMutationFn<TItem>
  onDelete?: DeleteMutationFn<TItem>
}

// Pattern B: Built-in handlers (Trailbase style)
interface MyCollectionConfig<TItem extends object> 
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
  // Your sync engine specific options
  recordApi: MyRecordApi<TItem>
  connectionUrl: string
  
  rowUpdateMode?: 'partial' | 'full'
  
  // Note: onInsert/onUpdate/onDelete are implemented by your collection creator
}

2. Sync-Implementierung

Die Sync-Funktion ist das Herzstück Ihrer Collection. Sie muss

Die Sync-Funktion muss eine Bereinigungsfunktion für die ordnungsgemäße Speicherbereinigung zurückgeben

typescript
const sync: SyncConfig<T>['sync'] = (params) => {
  const { begin, write, commit, markReady, collection } = params
  
  // 1. Initialize connection to your sync engine
  const connection = initializeConnection(config)
  
  // 2. Set up real-time subscription FIRST (prevents race conditions)
  const eventBuffer: Array<any> = []
  let isInitialSyncComplete = false
  
  connection.subscribe((event) => {
    if (!isInitialSyncComplete) {
      // Buffer events during initial sync to prevent race conditions
      eventBuffer.push(event)
      return
    }
    
    // Process real-time events
    begin()
    
    switch (event.type) {
      case 'insert':
        write({ type: 'insert', value: event.data })
        break
      case 'update':
        write({ type: 'update', value: event.data })
        break
      case 'delete':
        write({ type: 'delete', value: event.data })
        break
    }
    
    commit()
  })
  
  // 3. Perform initial data fetch
  async function initialSync() {
    try {
      const data = await fetchInitialData()
      
      begin() // Start a transaction
      
      for (const item of data) {
        write({
          type: 'insert',
          value: item
        })
      }
      
      commit() // Commit the transaction
      
      // 4. Process buffered events
      isInitialSyncComplete = true
      if (eventBuffer.length > 0) {
        begin()
        for (const event of eventBuffer) {
          // Deduplicate if necessary based on your sync engine
          write({ type: event.type, value: event.data })
        }
        commit()
        eventBuffer.splice(0)
      }
      
    } catch (error) {
      console.error('Initial sync failed:', error)
      throw error
    } finally {
      // ALWAYS call markReady, even on error
      markReady()
    }
  }

  initialSync()
  
  // 4. Return cleanup function
  return () => {
    connection.close()
    // Clean up any timers, intervals, or other resources
  }
}
const sync: SyncConfig<T>['sync'] = (params) => {
  const { begin, write, commit, markReady, collection } = params
  
  // 1. Initialize connection to your sync engine
  const connection = initializeConnection(config)
  
  // 2. Set up real-time subscription FIRST (prevents race conditions)
  const eventBuffer: Array<any> = []
  let isInitialSyncComplete = false
  
  connection.subscribe((event) => {
    if (!isInitialSyncComplete) {
      // Buffer events during initial sync to prevent race conditions
      eventBuffer.push(event)
      return
    }
    
    // Process real-time events
    begin()
    
    switch (event.type) {
      case 'insert':
        write({ type: 'insert', value: event.data })
        break
      case 'update':
        write({ type: 'update', value: event.data })
        break
      case 'delete':
        write({ type: 'delete', value: event.data })
        break
    }
    
    commit()
  })
  
  // 3. Perform initial data fetch
  async function initialSync() {
    try {
      const data = await fetchInitialData()
      
      begin() // Start a transaction
      
      for (const item of data) {
        write({
          type: 'insert',
          value: item
        })
      }
      
      commit() // Commit the transaction
      
      // 4. Process buffered events
      isInitialSyncComplete = true
      if (eventBuffer.length > 0) {
        begin()
        for (const event of eventBuffer) {
          // Deduplicate if necessary based on your sync engine
          write({ type: event.type, value: event.data })
        }
        commit()
        eventBuffer.splice(0)
      }
      
    } catch (error) {
      console.error('Initial sync failed:', error)
      throw error
    } finally {
      // ALWAYS call markReady, even on error
      markReady()
    }
  }

  initialSync()
  
  // 4. Return cleanup function
  return () => {
    connection.close()
    // Clean up any timers, intervals, or other resources
  }
}

3. Transaktionslebenszyklus

Das Verständnis des Transaktionslebenszyklus ist für eine korrekte Implementierung wichtig.

Der Sync-Prozess folgt diesem Lebenszyklus

  1. begin() - Beginnt mit dem Erfassen von Änderungen
  2. write() - Fügt Änderungen zur ausstehenden Transaktion hinzu (zwischengespeichert bis zum Commit)
  3. commit() - Wendet alle Änderungen atomar auf den Collection-Zustand an
  4. markReady() - Signalisiert, dass die initiale Synchronisierung abgeschlossen ist

Schutz vor Race Conditions: Viele Sync-Engines starten Echtzeit-Abonnements, bevor die anfängliche Synchronisierung abgeschlossen ist. Ihre Implementierung MUSS Ereignisse deduplizieren, die über das Abonnement eingehen und dieselben Daten wie die anfängliche Synchronisierung darstellen. Erwägen Sie

  • Starten des Listeners VOR dem anfänglichen Abruf und Puffern von Ereignissen
  • Verfolgen von Zeitstempeln, Sequenznummern oder Dokumentversionen
  • Verwenden von Lese-Zeitstempeln oder anderen Ordnungsmechanismen

4. Daten-Parsing und Typkonvertierung

Wenn Ihre Sync-Engine Daten mit unterschiedlichen Typen zurückgibt, stellen Sie Konvertierungsfunktionen für bestimmte Felder bereit

typescript
interface MyCollectionConfig<TItem, TRecord> {
  // ... other config
  
  // Only specify conversions for fields that need type conversion
  parse: {
    created_at: (ts: number) => new Date(ts * 1000),  // timestamp -> Date
    updated_at: (ts: number) => new Date(ts * 1000),  // timestamp -> Date
    metadata?: (str: string) => JSON.parse(str)       // JSON string -> object
  }
  
  serialize: {
    created_at: (date: Date) => Math.floor(date.valueOf() / 1000),  // Date -> timestamp
    updated_at: (date: Date) => Math.floor(date.valueOf() / 1000),  // Date -> timestamp  
    metadata?: (obj: object) => JSON.stringify(obj)                 // object -> JSON string
  }
}
interface MyCollectionConfig<TItem, TRecord> {
  // ... other config
  
  // Only specify conversions for fields that need type conversion
  parse: {
    created_at: (ts: number) => new Date(ts * 1000),  // timestamp -> Date
    updated_at: (ts: number) => new Date(ts * 1000),  // timestamp -> Date
    metadata?: (str: string) => JSON.parse(str)       // JSON string -> object
  }
  
  serialize: {
    created_at: (date: Date) => Math.floor(date.valueOf() / 1000),  // Date -> timestamp
    updated_at: (date: Date) => Math.floor(date.valueOf() / 1000),  // Date -> timestamp  
    metadata?: (obj: object) => JSON.stringify(obj)                 // object -> JSON string
  }
}

Beispiele für Typkonvertierung

typescript
// Firebase Timestamp to Date
parse: {
  createdAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
  updatedAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
}

// PostGIS geometry to GeoJSON
parse: {
  location: (wkb: string) => parseWKBToGeoJSON(wkb)
}

// JSON string to object with error handling
parse: {
  metadata: (str: string) => {
    try {
      return JSON.parse(str)
    } catch {
      return {}
    }
  }
}
// Firebase Timestamp to Date
parse: {
  createdAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
  updatedAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
}

// PostGIS geometry to GeoJSON
parse: {
  location: (wkb: string) => parseWKBToGeoJSON(wkb)
}

// JSON string to object with error handling
parse: {
  metadata: (str: string) => {
    try {
      return JSON.parse(str)
    } catch {
      return {}
    }
  }
}

5. Muster für Mutationshandler

Es gibt zwei unterschiedliche Muster für die Behandlung von Mutationen in Collection Options Creators

Muster A: Vom Benutzer bereitgestellte Handler (ElectricSQL, Query)

Der Benutzer stellt Mutationshandler in der Konfiguration bereit. Ihr Collection Creator leitet sie weiter

typescript
interface MyCollectionConfig<TItem extends object> {
  // ... other config
  
  // User provides these handlers
  onInsert?: InsertMutationFn<TItem>
  onUpdate?: UpdateMutationFn<TItem>
  onDelete?: DeleteMutationFn<TItem>
}

export function myCollectionOptions<TItem extends object>(
  config: MyCollectionConfig<TItem>
) {
  return {
    // ... other options
    rowUpdateMode: config.rowUpdateMode || 'partial',
    
    // Pass through user-provided handlers (possibly with additional logic)
    onInsert: config.onInsert ? async (params) => {
      const result = await config.onInsert!(params)
      // Additional sync coordination logic
      return result
    } : undefined
  }
}
interface MyCollectionConfig<TItem extends object> {
  // ... other config
  
  // User provides these handlers
  onInsert?: InsertMutationFn<TItem>
  onUpdate?: UpdateMutationFn<TItem>
  onDelete?: DeleteMutationFn<TItem>
}

export function myCollectionOptions<TItem extends object>(
  config: MyCollectionConfig<TItem>
) {
  return {
    // ... other options
    rowUpdateMode: config.rowUpdateMode || 'partial',
    
    // Pass through user-provided handlers (possibly with additional logic)
    onInsert: config.onInsert ? async (params) => {
      const result = await config.onInsert!(params)
      // Additional sync coordination logic
      return result
    } : undefined
  }
}

Muster B: Integrierte Handler (Trailbase, WebSocket, Firebase)

Ihr Collection Creator implementiert die Handler direkt über die APIs der Sync-Engine

typescript
interface MyCollectionConfig<TItem extends object> 
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
  // ... sync engine specific config
  // Note: onInsert/onUpdate/onDelete are NOT in the config
}

export function myCollectionOptions<TItem extends object>(
  config: MyCollectionConfig<TItem>
) {
  return {
    // ... other options
    rowUpdateMode: config.rowUpdateMode || 'partial',
    
    // Implement handlers using sync engine APIs
    onInsert: async ({ transaction }) => {
      // Handle provider-specific batch limits (e.g., Firestore's 500 limit)
      const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
      
      for (const chunk of chunks) {
        const ids = await config.recordApi.createBulk(
          chunk.map(m => serialize(m.modified))
        )
        await awaitIds(ids)
      }
      
      return transaction.mutations.map(m => m.key)
    },
    
    onUpdate: async ({ transaction }) => {
      const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
      
      for (const chunk of chunks) {
        await Promise.all(
          chunk.map(m => 
            config.recordApi.update(m.key, serialize(m.changes))
          )
        )
      }
      
      await awaitIds(transaction.mutations.map(m => String(m.key)))
    }
  }
}
interface MyCollectionConfig<TItem extends object> 
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
  // ... sync engine specific config
  // Note: onInsert/onUpdate/onDelete are NOT in the config
}

export function myCollectionOptions<TItem extends object>(
  config: MyCollectionConfig<TItem>
) {
  return {
    // ... other options
    rowUpdateMode: config.rowUpdateMode || 'partial',
    
    // Implement handlers using sync engine APIs
    onInsert: async ({ transaction }) => {
      // Handle provider-specific batch limits (e.g., Firestore's 500 limit)
      const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
      
      for (const chunk of chunks) {
        const ids = await config.recordApi.createBulk(
          chunk.map(m => serialize(m.modified))
        )
        await awaitIds(ids)
      }
      
      return transaction.mutations.map(m => m.key)
    },
    
    onUpdate: async ({ transaction }) => {
      const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
      
      for (const chunk of chunks) {
        await Promise.all(
          chunk.map(m => 
            config.recordApi.update(m.key, serialize(m.changes))
          )
        )
      }
      
      await awaitIds(transaction.mutations.map(m => String(m.key)))
    }
  }
}

Viele Anbieter haben Batch-Größenbeschränkungen (Firestore: 500, DynamoDB: 25 usw.), daher sollten große Transaktionen entsprechend geclustert werden.

Wählen Sie Muster A, wenn Benutzer ihre eigenen APIs bereitstellen müssen, und Muster B, wenn Ihre Sync-Engine Schreibvorgänge direkt verarbeitet.

Zeilenaktualisierungsmodi

Collections unterstützen zwei Aktualisierungsmodi

  • partial (Standard) - Aktualisierungen werden mit vorhandenen Daten zusammengeführt
  • full - Aktualisierungen ersetzen die gesamte Zeile

Konfigurieren Sie dies in Ihrer Sync-Konfiguration

typescript
sync: {
  sync: syncFn,
  rowUpdateMode: 'full' // or 'partial'
}
sync: {
  sync: syncFn,
  rowUpdateMode: 'full' // or 'partial'
}

Produktionsbeispiele

Vollständige, produktionsfertige Beispiele finden Sie in den Collection-Paketen im TanStack DB-Repository

Wichtige Erkenntnisse aus Produktionscollections

Von Query Collection

  • Einfachster Ansatz: Vollständiges Neuladen nach Mutationen
  • Am besten geeignet für: APIs ohne Echtzeit-Synchronisierung
  • Muster: Benutzer stellt onInsert/onUpdate/onDelete-Handler bereit

Von Trailbase Collection

  • Zeigt ID-basiertes optimistisches Zustandsmanagement
  • Verarbeitet Batch-Limits des Anbieters (Chunking großer Operationen)
  • Muster: Collection stellt Mutationshandler mit Record-API bereit

Von Electric Collection

  • Komplexes Transaktions-ID-Tracking für verteilte Synchronisierung
  • Demonstriert fortgeschrittene Deduplizierungstechniken
  • Zeigt, wie Benutzer-Handler mit Sync-Koordination umschlossen werden

Vollständiges Beispiel: WebSocket-Collection

Hier ist ein vollständiges Beispiel für einen WebSocket-basierten Collection Options Creator, der den vollständigen Round-Trip-Fluss demonstriert

  1. Client sendet Transaktion mit allen gebündelten Mutationen
  2. Server verarbeitet die Transaktion und kann die Daten ändern (Validierung, Zeitstempel usw.)
  3. Server sendet eine Bestätigung und die tatsächlich verarbeiteten Daten zurück
  4. Client wartet auf diesen Round-Trip, bevor der optimistische Zustand gelöscht wird
typescript
import type {
  CollectionConfig,
  SyncConfig,
  InsertMutationFnParams,
  UpdateMutationFnParams,
  DeleteMutationFnParams,
  UtilsRecord
} from '@tanstack/db'

interface WebSocketMessage<T> {
  type: 'insert' | 'update' | 'delete' | 'sync' | 'transaction' | 'ack'
  data?: T | T[]
  mutations?: Array<{
    type: 'insert' | 'update' | 'delete'
    data: T
    id?: string
  }>
  transactionId?: string
  id?: string
}

interface WebSocketCollectionConfig<TItem extends object>
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete' | 'sync'> {
  url: string
  reconnectInterval?: number
  
  // Note: onInsert/onUpdate/onDelete are handled by the WebSocket connection
  // Users don't provide these handlers
}

interface WebSocketUtils extends UtilsRecord {
  reconnect: () => void
  getConnectionState: () => 'connected' | 'disconnected' | 'connecting'
}

export function webSocketCollectionOptions<TItem extends object>(
  config: WebSocketCollectionConfig<TItem>
): CollectionConfig<TItem> & { utils: WebSocketUtils } {
  let ws: WebSocket | null = null
  let reconnectTimer: NodeJS.Timeout | null = null
  let connectionState: 'connected' | 'disconnected' | 'connecting' = 'disconnected'
  
  // Track pending transactions awaiting acknowledgment
  const pendingTransactions = new Map<string, {
    resolve: () => void
    reject: (error: Error) => void
    timeout: NodeJS.Timeout
  }>()
  
  const sync: SyncConfig<TItem>['sync'] = (params) => {
    const { begin, write, commit, markReady } = params
    
    function connect() {
      connectionState = 'connecting'
      ws = new WebSocket(config.url)
      
      ws.onopen = () => {
        connectionState = 'connected'
        // Request initial sync
        ws.send(JSON.stringify({ type: 'sync' }))
      }
      
      ws.onmessage = (event) => {
        const message: WebSocketMessage<TItem> = JSON.parse(event.data)
        
        switch (message.type) {
          case 'sync':
            // Initial sync with array of items
            begin()
            if (Array.isArray(message.data)) {
              for (const item of message.data) {
                write({ type: 'insert', value: item })
              }
            }
            commit()
            markReady()
            break
            
          case 'insert':
          case 'update':
          case 'delete':
            // Real-time updates from other clients
            begin()
            write({ 
              type: message.type, 
              value: message.data as TItem 
            })
            commit()
            break
            
          case 'ack':
            // Server acknowledged our transaction
            if (message.transactionId) {
              const pending = pendingTransactions.get(message.transactionId)
              if (pending) {
                clearTimeout(pending.timeout)
                pendingTransactions.delete(message.transactionId)
                pending.resolve()
              }
            }
            break
            
          case 'transaction':
            // Server sending back the actual data after processing our transaction
            if (message.mutations) {
              begin()
              for (const mutation of message.mutations) {
                write({
                  type: mutation.type,
                  value: mutation.data
                })
              }
              commit()
            }
            break
        }
      }
      
      ws.onerror = (error) => {
        console.error('WebSocket error:', error)
        connectionState = 'disconnected'
      }
      
      ws.onclose = () => {
        connectionState = 'disconnected'
        // Auto-reconnect
        if (!reconnectTimer) {
          reconnectTimer = setTimeout(() => {
            reconnectTimer = null
            connect()
          }, config.reconnectInterval || 5000)
        }
      }
    }
    
    // Start connection
    connect()
    
    // Return cleanup function
    return () => {
      if (reconnectTimer) {
        clearTimeout(reconnectTimer)
        reconnectTimer = null
      }
      if (ws) {
        ws.close()
        ws = null
      }
    }
  }
  
  // Helper function to send transaction and wait for server acknowledgment
  const sendTransaction = async (
    params: InsertMutationFnParams<TItem> | UpdateMutationFnParams<TItem> | DeleteMutationFnParams<TItem>
  ): Promise<void> => {
    if (ws?.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected')
    }
    
    const transactionId = crypto.randomUUID()
    
    // Convert all mutations in the transaction to the wire format
    const mutations = params.transaction.mutations.map(mutation => ({
      type: mutation.type,
      id: mutation.key,
      data: mutation.type === 'delete' ? undefined : 
           mutation.type === 'update' ? mutation.changes : 
           mutation.modified
    }))
    
    // Send the entire transaction at once
    ws.send(JSON.stringify({
      type: 'transaction',
      transactionId,
      mutations
    }))
    
    // Wait for server acknowledgment
    return new Promise<void>((resolve, reject) => {
      const timeout = setTimeout(() => {
        pendingTransactions.delete(transactionId)
        reject(new Error(`Transaction ${transactionId} timed out`))
      }, 10000) // 10 second timeout
      
      pendingTransactions.set(transactionId, {
        resolve,
        reject,
        timeout
      })
    })
  }
  
  // All mutation handlers use the same transaction sender
  const onInsert = async (params: InsertMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  const onUpdate = async (params: UpdateMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  const onDelete = async (params: DeleteMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  return {
    id: config.id,
    schema: config.schema,
    getKey: config.getKey,
    sync: { sync },
    onInsert,
    onUpdate,
    onDelete,
    utils: {
      reconnect: () => {
        if (ws) ws.close()
        connect()
      },
      getConnectionState: () => connectionState
    }
  }
}
import type {
  CollectionConfig,
  SyncConfig,
  InsertMutationFnParams,
  UpdateMutationFnParams,
  DeleteMutationFnParams,
  UtilsRecord
} from '@tanstack/db'

interface WebSocketMessage<T> {
  type: 'insert' | 'update' | 'delete' | 'sync' | 'transaction' | 'ack'
  data?: T | T[]
  mutations?: Array<{
    type: 'insert' | 'update' | 'delete'
    data: T
    id?: string
  }>
  transactionId?: string
  id?: string
}

interface WebSocketCollectionConfig<TItem extends object>
  extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete' | 'sync'> {
  url: string
  reconnectInterval?: number
  
  // Note: onInsert/onUpdate/onDelete are handled by the WebSocket connection
  // Users don't provide these handlers
}

interface WebSocketUtils extends UtilsRecord {
  reconnect: () => void
  getConnectionState: () => 'connected' | 'disconnected' | 'connecting'
}

export function webSocketCollectionOptions<TItem extends object>(
  config: WebSocketCollectionConfig<TItem>
): CollectionConfig<TItem> & { utils: WebSocketUtils } {
  let ws: WebSocket | null = null
  let reconnectTimer: NodeJS.Timeout | null = null
  let connectionState: 'connected' | 'disconnected' | 'connecting' = 'disconnected'
  
  // Track pending transactions awaiting acknowledgment
  const pendingTransactions = new Map<string, {
    resolve: () => void
    reject: (error: Error) => void
    timeout: NodeJS.Timeout
  }>()
  
  const sync: SyncConfig<TItem>['sync'] = (params) => {
    const { begin, write, commit, markReady } = params
    
    function connect() {
      connectionState = 'connecting'
      ws = new WebSocket(config.url)
      
      ws.onopen = () => {
        connectionState = 'connected'
        // Request initial sync
        ws.send(JSON.stringify({ type: 'sync' }))
      }
      
      ws.onmessage = (event) => {
        const message: WebSocketMessage<TItem> = JSON.parse(event.data)
        
        switch (message.type) {
          case 'sync':
            // Initial sync with array of items
            begin()
            if (Array.isArray(message.data)) {
              for (const item of message.data) {
                write({ type: 'insert', value: item })
              }
            }
            commit()
            markReady()
            break
            
          case 'insert':
          case 'update':
          case 'delete':
            // Real-time updates from other clients
            begin()
            write({ 
              type: message.type, 
              value: message.data as TItem 
            })
            commit()
            break
            
          case 'ack':
            // Server acknowledged our transaction
            if (message.transactionId) {
              const pending = pendingTransactions.get(message.transactionId)
              if (pending) {
                clearTimeout(pending.timeout)
                pendingTransactions.delete(message.transactionId)
                pending.resolve()
              }
            }
            break
            
          case 'transaction':
            // Server sending back the actual data after processing our transaction
            if (message.mutations) {
              begin()
              for (const mutation of message.mutations) {
                write({
                  type: mutation.type,
                  value: mutation.data
                })
              }
              commit()
            }
            break
        }
      }
      
      ws.onerror = (error) => {
        console.error('WebSocket error:', error)
        connectionState = 'disconnected'
      }
      
      ws.onclose = () => {
        connectionState = 'disconnected'
        // Auto-reconnect
        if (!reconnectTimer) {
          reconnectTimer = setTimeout(() => {
            reconnectTimer = null
            connect()
          }, config.reconnectInterval || 5000)
        }
      }
    }
    
    // Start connection
    connect()
    
    // Return cleanup function
    return () => {
      if (reconnectTimer) {
        clearTimeout(reconnectTimer)
        reconnectTimer = null
      }
      if (ws) {
        ws.close()
        ws = null
      }
    }
  }
  
  // Helper function to send transaction and wait for server acknowledgment
  const sendTransaction = async (
    params: InsertMutationFnParams<TItem> | UpdateMutationFnParams<TItem> | DeleteMutationFnParams<TItem>
  ): Promise<void> => {
    if (ws?.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected')
    }
    
    const transactionId = crypto.randomUUID()
    
    // Convert all mutations in the transaction to the wire format
    const mutations = params.transaction.mutations.map(mutation => ({
      type: mutation.type,
      id: mutation.key,
      data: mutation.type === 'delete' ? undefined : 
           mutation.type === 'update' ? mutation.changes : 
           mutation.modified
    }))
    
    // Send the entire transaction at once
    ws.send(JSON.stringify({
      type: 'transaction',
      transactionId,
      mutations
    }))
    
    // Wait for server acknowledgment
    return new Promise<void>((resolve, reject) => {
      const timeout = setTimeout(() => {
        pendingTransactions.delete(transactionId)
        reject(new Error(`Transaction ${transactionId} timed out`))
      }, 10000) // 10 second timeout
      
      pendingTransactions.set(transactionId, {
        resolve,
        reject,
        timeout
      })
    })
  }
  
  // All mutation handlers use the same transaction sender
  const onInsert = async (params: InsertMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  const onUpdate = async (params: UpdateMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  const onDelete = async (params: DeleteMutationFnParams<TItem>) => {
    await sendTransaction(params)
  }
  
  return {
    id: config.id,
    schema: config.schema,
    getKey: config.getKey,
    sync: { sync },
    onInsert,
    onUpdate,
    onDelete,
    utils: {
      reconnect: () => {
        if (ws) ws.close()
        connect()
      },
      getConnectionState: () => connectionState
    }
  }
}

Anwendungsbeispiel

typescript
import { createCollection } from '@tanstack/react-db'
import { webSocketCollectionOptions } from './websocket-collection'

const todos = createCollection(
  webSocketCollectionOptions({
    url: 'ws://:8080/todos',
    getKey: (todo) => todo.id,
    schema: todoSchema
    // Note: No onInsert/onUpdate/onDelete - handled by WebSocket automatically
  })
)

// Use the collection
todos.insert({ id: '1', text: 'Buy milk', completed: false })

// Access utilities
todos.utils.getConnectionState() // 'connected'
todos.utils.reconnect() // Force reconnect
import { createCollection } from '@tanstack/react-db'
import { webSocketCollectionOptions } from './websocket-collection'

const todos = createCollection(
  webSocketCollectionOptions({
    url: 'ws://:8080/todos',
    getKey: (todo) => todo.id,
    schema: todoSchema
    // Note: No onInsert/onUpdate/onDelete - handled by WebSocket automatically
  })
)

// Use the collection
todos.insert({ id: '1', text: 'Buy milk', completed: false })

// Access utilities
todos.utils.getConnectionState() // 'connected'
todos.utils.reconnect() // Force reconnect

Fortgeschritten: Verwaltung des optimistischen Zustands

Eine entscheidende Herausforderung bei synchronisationsbasierten Apps ist zu wissen, wann der optimistische Zustand gelöscht werden soll. Wenn ein Benutzer eine Änderung vornimmt

  1. Die Benutzeroberfläche wird sofort aktualisiert (optimistische Aktualisierung)
  2. Eine Mutation wird an das Backend gesendet
  3. Das Backend verarbeitet und speichert die Änderung
  4. Die Änderung wird zum Client synchronisiert
  5. Der optimistische Zustand sollte zugunsten der synchronisierten Daten gelöscht werden

Die Schlüsselfrage ist: Wie wissen Sie, wann Schritt 4 abgeschlossen ist?

Viele Anbieter bieten integrierte Methoden, um auf den Abschluss der Synchronisierung zu warten

typescript
// Firebase
await waitForPendingWrites(firestore)

// Custom WebSocket
await websocket.waitForAck(transactionId)
// Firebase
await waitForPendingWrites(firestore)

// Custom WebSocket
await websocket.waitForAck(transactionId)

Strategie 2: Transaktions-ID-Tracking (ElectricSQL)

ElectricSQL gibt Transaktions-IDs zurück, die Sie verfolgen können

typescript
// Track seen transaction IDs
const seenTxids = new Store<Set<number>>(new Set())

// In sync, track txids from incoming messages
if (message.headers.txids) {
  message.headers.txids.forEach(txid => {
    seenTxids.setState(prev => new Set([...prev, txid]))
  })
}

// Mutation handlers return txids and wait for them
const wrappedOnInsert = async (params) => {
  const result = await config.onInsert!(params)
  
  // Wait for the txid to appear in synced data
  if (result.txid) {
    await awaitTxId(result.txid)
  }
  
  return result
}

// Utility function to wait for a txid
const awaitTxId = (txId: number): Promise<boolean> => {
  if (seenTxids.state.has(txId)) return Promise.resolve(true)
  
  return new Promise((resolve) => {
    const unsubscribe = seenTxids.subscribe(() => {
      if (seenTxids.state.has(txId)) {
        unsubscribe()
        resolve(true)
      }
    })
  })
}
// Track seen transaction IDs
const seenTxids = new Store<Set<number>>(new Set())

// In sync, track txids from incoming messages
if (message.headers.txids) {
  message.headers.txids.forEach(txid => {
    seenTxids.setState(prev => new Set([...prev, txid]))
  })
}

// Mutation handlers return txids and wait for them
const wrappedOnInsert = async (params) => {
  const result = await config.onInsert!(params)
  
  // Wait for the txid to appear in synced data
  if (result.txid) {
    await awaitTxId(result.txid)
  }
  
  return result
}

// Utility function to wait for a txid
const awaitTxId = (txId: number): Promise<boolean> => {
  if (seenTxids.state.has(txId)) return Promise.resolve(true)
  
  return new Promise((resolve) => {
    const unsubscribe = seenTxids.subscribe(() => {
      if (seenTxids.state.has(txId)) {
        unsubscribe()
        resolve(true)
      }
    })
  })
}

Strategie 3: ID-basiertes Tracking (Trailbase)

Trailbase verfolgt, wann bestimmte Record-IDs synchronisiert wurden

typescript
// Track synced IDs with timestamps
const seenIds = new Store(new Map<string, number>())

// In sync, mark IDs as seen
write({ type: 'insert', value: item })
seenIds.setState(prev => new Map(prev).set(item.id, Date.now()))

// Wait for specific IDs after mutations
const wrappedOnInsert = async (params) => {
  const ids = await config.recordApi.createBulk(items)
  
  // Wait for all IDs to be synced back
  await awaitIds(ids)
}

const awaitIds = (ids: string[]): Promise<void> => {
  const allSynced = ids.every(id => seenIds.state.has(id))
  if (allSynced) return Promise.resolve()
  
  return new Promise((resolve) => {
    const unsubscribe = seenIds.subscribe((state) => {
      if (ids.every(id => state.has(id))) {
        unsubscribe()
        resolve()
      }
    })
  })
}
// Track synced IDs with timestamps
const seenIds = new Store(new Map<string, number>())

// In sync, mark IDs as seen
write({ type: 'insert', value: item })
seenIds.setState(prev => new Map(prev).set(item.id, Date.now()))

// Wait for specific IDs after mutations
const wrappedOnInsert = async (params) => {
  const ids = await config.recordApi.createBulk(items)
  
  // Wait for all IDs to be synced back
  await awaitIds(ids)
}

const awaitIds = (ids: string[]): Promise<void> => {
  const allSynced = ids.every(id => seenIds.state.has(id))
  if (allSynced) return Promise.resolve()
  
  return new Promise((resolve) => {
    const unsubscribe = seenIds.subscribe((state) => {
      if (ids.every(id => state.has(id))) {
        unsubscribe()
        resolve()
      }
    })
  })
}

Strategie 4: Versions-/Zeitstempel-Tracking

Verfolgen Sie Versionsnummern oder Zeitstempel, um zu erkennen, wann Daten aktuell sind

typescript
// Track latest sync timestamp
let lastSyncTime = 0

// In mutations, record when the operation was sent
const wrappedOnUpdate = async (params) => {
  const mutationTime = Date.now()
  await config.onUpdate(params)
  
  // Wait for sync to catch up
  await waitForSync(mutationTime)
}

const waitForSync = (afterTime: number): Promise<void> => {
  if (lastSyncTime > afterTime) return Promise.resolve()
  
  return new Promise((resolve) => {
    const check = setInterval(() => {
      if (lastSyncTime > afterTime) {
        clearInterval(check)
        resolve()
      }
    }, 100)
  })
}
// Track latest sync timestamp
let lastSyncTime = 0

// In mutations, record when the operation was sent
const wrappedOnUpdate = async (params) => {
  const mutationTime = Date.now()
  await config.onUpdate(params)
  
  // Wait for sync to catch up
  await waitForSync(mutationTime)
}

const waitForSync = (afterTime: number): Promise<void> => {
  if (lastSyncTime > afterTime) return Promise.resolve()
  
  return new Promise((resolve) => {
    const check = setInterval(() => {
      if (lastSyncTime > afterTime) {
        clearInterval(check)
        resolve()
      }
    }, 100)
  })
}

Strategie 5: Vollständiges Neuladen (Query Collection)

Die Query Collection lädt einfach alle Daten nach Mutationen neu

typescript
const wrappedOnInsert = async (params) => {
  // Perform the mutation
  await config.onInsert(params)
  
  // Refetch the entire collection
  await refetch()
  
  // The refetch will trigger sync with fresh data,
  // automatically dropping optimistic state
}
const wrappedOnInsert = async (params) => {
  // Perform the mutation
  await config.onInsert(params)
  
  // Refetch the entire collection
  await refetch()
  
  // The refetch will trigger sync with fresh data,
  // automatically dropping optimistic state
}

Auswahl einer Strategie

  • Integrierte Methoden: Am besten geeignet, wenn Ihr Anbieter APIs für den Abschluss der Synchronisierung anbietet
  • Transaktions-IDs: Am besten geeignet, wenn Ihr Backend eine zuverlässige Transaktionsverfolgung bietet
  • ID-basiert: Gut für Systeme, bei denen jede Mutation die betroffenen IDs zurückgibt
  • Vollständiges Neuladen: Am einfachsten, aber am wenigsten effizient; gut für kleine Datensätze
  • Version/Zeitstempel: Funktioniert, wenn Ihre Synchronisierung zuverlässige Ordnungsinformationen enthält

Implementierungstipps

  1. Warten Sie immer auf die Synchronisierung in Ihren Mutationshandlern, um sicherzustellen, dass der optimistische Zustand ordnungsgemäß verwaltet wird
  2. Behandeln Sie Timeouts - Warten Sie nicht ewig auf die Bestätigung der Synchronisierung
  3. Bereinigen Sie Tracking-Daten - Entfernen Sie alte Txids/IDs, um Speicherlecks zu vermeiden
  4. Stellen Sie Dienstprogramme bereit - Exportieren Sie Funktionen wie awaitTxId oder awaitSync für fortgeschrittene Anwendungsfälle

Bewährte Praktiken

  1. Rufen Sie immer markReady() auf - Dies signalisiert, dass die Collection anfängliche Daten hat und zur Verwendung bereit ist
  2. Fehler ordnungsgemäß behandeln - Rufen Sie markReady() auch im Fehlerfall auf, um eine Blockierung der App zu vermeiden
  3. Ressourcen bereinigen - Geben Sie eine Bereinigungsfunktion von sync zurück, um Speicherlecks zu vermeiden
  4. Operationen bündeln - Verwenden Sie begin/commit, um mehrere Änderungen für eine bessere Leistung zu bündeln
  5. Race Conditions - Starten Sie Listener vor dem anfänglichen Abruf und puffern Sie Ereignisse
  6. Typsicherheit - Verwenden Sie TypeScript-Generics, um die Typsicherheit durchgängig aufrechtzuerhalten
  7. Stellen Sie Dienstprogramme bereit - Exportieren Sie für fortgeschrittene Anwendungsfälle Sync-Engine-spezifische Dienstprogramme

Testen Ihrer Collection

Testen Sie Ihren Collection Options Creator mit

  1. Unit-Tests - Testen Sie Sync-Logik, Datentransformationen
  2. Integrationstests - Testen Sie mit einer echten Sync-Engine
  3. Fehlerszenarien - Verbindungsfehler, ungültige Daten
  4. Leistung - Große Datensätze, häufige Aktualisierungen

Fazit

Das Erstellen eines Collection Options Creators ermöglicht es Ihnen, jede Sync-Engine in die leistungsstarke synchronisationsbasierte Architektur von TanStack DB zu integrieren. Befolgen Sie die hier gezeigten Muster, und Sie erhalten eine robuste, typsichere Integration, die eine hervorragende Entwicklererfahrung bietet.

Unsere Partner
Code Rabbit
Electric
Prisma
Bytes abonnieren

Ihre wöchentliche Dosis JavaScript-Nachrichten. Jeden Montag kostenlos an über 100.000 Entwickler geliefert.

Bytes

Kein Spam. Jederzeit kündbar.

Bytes abonnieren

Ihre wöchentliche Dosis JavaScript-Nachrichten. Jeden Montag kostenlos an über 100.000 Entwickler geliefert.

Bytes

Kein Spam. Jederzeit kündbar.