A collection options creator is a factory function that generates configuration options for TanStack DB collections. It provides a standardized way to integrate different sync engines and data sources with TanStack DB's reactive, sync-based architecture.
Collection options creators follow a consistent pattern:
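In outline, a creator is a factory that takes engine-specific options and returns a collection configuration (a sketch with hypothetical names; MyCollectionConfig is defined just below):

import type { CollectionConfig } from '@tanstack/db'

export function myCollectionOptions<TItem extends object>(
  config: MyCollectionConfig<TItem>
): CollectionConfig<TItem> {
  return {
    getKey: config.getKey,
    sync: {
      // Subscribe to the engine, perform the initial fetch,
      // and return a cleanup function (all covered below)
      sync: (params) => {
        return () => {}
      },
    },
    // plus onInsert / onUpdate / onDelete (Pattern A or B, see below)
  }
}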
You should create a custom collection options creator when:
Note: if you're only calling an API and returning data, use the Query Collection instead.
Every collection options creator must handle these core responsibilities:
Define a configuration interface that extends or includes the standard collection properties:
// Pattern A: User provides handlers (Query / ElectricSQL style)
interface MyCollectionConfig<TItem extends object> {
// Your sync engine specific options
connectionUrl: string
apiKey?: string
// Standard collection properties
id?: string
schema?: StandardSchemaV1
getKey: (item: TItem) => string | number
sync?: SyncConfig<TItem>
rowUpdateMode?: 'partial' | 'full'
// User provides mutation handlers
onInsert?: InsertMutationFn<TItem>
onUpdate?: UpdateMutationFn<TItem>
onDelete?: DeleteMutationFn<TItem>
}
// Pattern B: Built-in handlers (Trailbase style)
interface MyCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
// Your sync engine specific options
recordApi: MyRecordApi<TItem>
connectionUrl: string
rowUpdateMode?: 'partial' | 'full'
// Note: onInsert/onUpdate/onDelete are implemented by your collection creator
}
The sync function is the heart of your collection. It must:
The sync function must return a cleanup function so resources can be disposed of properly:
const sync: SyncConfig<T>['sync'] = (params) => {
const { begin, write, commit, markReady, collection } = params
// 1. Initialize connection to your sync engine
const connection = initializeConnection(config)
// 2. Set up real-time subscription FIRST (prevents race conditions)
const eventBuffer: Array<any> = []
let isInitialSyncComplete = false
connection.subscribe((event) => {
if (!isInitialSyncComplete) {
// Buffer events during initial sync to prevent race conditions
eventBuffer.push(event)
return
}
// Process real-time events
begin()
switch (event.type) {
case 'insert':
write({ type: 'insert', value: event.data })
break
case 'update':
write({ type: 'update', value: event.data })
break
case 'delete':
write({ type: 'delete', value: event.data })
break
}
commit()
})
// 3. Perform initial data fetch
async function initialSync() {
try {
const data = await fetchInitialData()
begin() // Start a transaction
for (const item of data) {
write({
type: 'insert',
value: item
})
}
commit() // Commit the transaction
// 4. Process buffered events
isInitialSyncComplete = true
if (eventBuffer.length > 0) {
begin()
for (const event of eventBuffer) {
// Deduplicate if necessary based on your sync engine
write({ type: event.type, value: event.data })
}
commit()
eventBuffer.splice(0)
}
} catch (error) {
console.error('Initial sync failed:', error)
throw error
} finally {
// ALWAYS call markReady, even on error
markReady()
}
}
initialSync()
// 5. Return cleanup function
return () => {
connection.close()
// Clean up any timers, intervals, or other resources
}
}
Understanding the transaction lifecycle is important for a correct implementation.
The sync process follows this lifecycle:
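In outline, every batch of changes goes through the same four callbacks (an illustrative sketch using the names from the code in this guide; item stands for any synced record):

begin()                                // open a transaction
write({ type: 'insert', value: item }) // stage one or more operations
commit()                               // apply the staged operations atomically
markReady()                            // called once, after the initial sync, to mark the collection usable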
Race condition protection: many sync engines start real-time subscriptions before the initial sync completes. Your implementation MUST deduplicate events that arrive through the subscription but represent the same data as the initial sync.
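One simple strategy is key-based deduplication during buffer replay (a sketch; it assumes each event carries the affected item and that getKey is your collection's key extractor):

// During the initial fetch, record the keys that were written
const syncedKeys = new Set<string | number>()
begin()
for (const item of data) {
  write({ type: 'insert', value: item })
  syncedKeys.add(getKey(item))
}
commit()

// When replaying the buffer, skip inserts the initial fetch already covered
begin()
for (const event of eventBuffer) {
  if (event.type === 'insert' && syncedKeys.has(getKey(event.data))) {
    continue // already present from the initial sync
  }
  write({ type: event.type, value: event.data })
}
commit()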
If your sync engine returns data with different types, provide conversion functions for the specific fields:
interface MyCollectionConfig<TItem, TRecord> {
  // ... other config
  // Only specify conversions for fields that need type conversion
  parse: {
    created_at: (ts: number) => Date     // timestamp -> Date
    updated_at: (ts: number) => Date     // timestamp -> Date
    metadata?: (str: string) => object   // JSON string -> object
  }
  serialize: {
    created_at: (date: Date) => number   // Date -> timestamp
    updated_at: (date: Date) => number   // Date -> timestamp
    metadata?: (obj: object) => string   // object -> JSON string
  }
}
Examples of type conversions:
// Firebase Timestamp to Date
parse: {
createdAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
updatedAt: (timestamp) => timestamp?.toDate?.() || new Date(timestamp),
}
// PostGIS geometry to GeoJSON
parse: {
location: (wkb: string) => parseWKBToGeoJSON(wkb)
}
// JSON string to object with error handling
parse: {
metadata: (str: string) => {
try {
return JSON.parse(str)
} catch {
return {}
}
}
}
There are two distinct patterns for handling mutations in collection options creators:
The user provides mutation handlers in the config, and your collection creator passes them through:
interface MyCollectionConfig<TItem extends object> {
// ... other config
// User provides these handlers
onInsert?: InsertMutationFn<TItem>
onUpdate?: UpdateMutationFn<TItem>
onDelete?: DeleteMutationFn<TItem>
}
export function myCollectionOptions<TItem extends object>(
config: MyCollectionConfig<TItem>
) {
return {
// ... other options
rowUpdateMode: config.rowUpdateMode || 'partial',
// Pass through user-provided handlers (possibly with additional logic)
onInsert: config.onInsert ? async (params) => {
const result = await config.onInsert!(params)
// Additional sync coordination logic
return result
} : undefined
}
}
Your collection creator implements the handlers directly using the sync engine's APIs:
interface MyCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete'> {
// ... sync engine specific config
// Note: onInsert/onUpdate/onDelete are NOT in the config
}
export function myCollectionOptions<TItem extends object>(
config: MyCollectionConfig<TItem>
) {
return {
// ... other options
rowUpdateMode: config.rowUpdateMode || 'partial',
// Implement handlers using sync engine APIs
onInsert: async ({ transaction }) => {
// Handle provider-specific batch limits (e.g., Firestore's 500 limit)
const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
for (const chunk of chunks) {
const ids = await config.recordApi.createBulk(
chunk.map(m => serialize(m.modified))
)
await awaitIds(ids)
}
return transaction.mutations.map(m => m.key)
},
onUpdate: async ({ transaction }) => {
const chunks = chunkArray(transaction.mutations, PROVIDER_BATCH_LIMIT)
for (const chunk of chunks) {
await Promise.all(
chunk.map(m =>
config.recordApi.update(m.key, serialize(m.changes))
)
)
}
await awaitIds(transaction.mutations.map(m => String(m.key)))
}
}
}
Many providers have batch size limits (Firestore: 500, DynamoDB: 25, and so on), so large transactions should be chunked accordingly.
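The chunkArray helper used in the Pattern B example above is not part of TanStack DB; a minimal implementation might look like this (awaitIds is the sync-coordination helper shown later in the Trailbase-style example):

// Split an array into chunks of at most `size` elements
function chunkArray<T>(items: Array<T>, size: number): Array<Array<T>> {
  const chunks: Array<Array<T>> = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Example: chunkArray([1, 2, 3, 4, 5], 2) -> [[1, 2], [3, 4], [5]]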
Choose Pattern A when users need to supply their own persistence APIs, and Pattern B when your sync engine handles writes directly.
Collections support two update modes: 'partial', where update messages contain only the changed fields and are merged into the existing row, and 'full', where each update message replaces the entire row.
Configure this in your sync config:
sync: {
sync: syncFn,
rowUpdateMode: 'full' // or 'partial'
}
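To make the difference concrete (assumed semantics, shown with a hypothetical row):

// Existing row:   { id: 1, title: 'Buy milk', completed: false }
// Update message: { id: 1, completed: true }
//
// rowUpdateMode: 'partial' -> merge:   { id: 1, title: 'Buy milk', completed: true }
// rowUpdateMode: 'full'    -> replace: { id: 1, completed: true }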
Complete, production-ready examples can be found in the collection packages in the TanStack DB repository:
From the Query Collection
From the Trailbase Collection
From the Electric Collection
Here is a complete example of a WebSocket-based collection options creator that demonstrates the full round-trip flow:
import type {
CollectionConfig,
SyncConfig,
InsertMutationFnParams,
UpdateMutationFnParams,
DeleteMutationFnParams,
UtilsRecord
} from '@tanstack/db'
interface WebSocketMessage<T> {
type: 'insert' | 'update' | 'delete' | 'sync' | 'transaction' | 'ack'
data?: T | T[]
mutations?: Array<{
type: 'insert' | 'update' | 'delete'
data: T
id?: string
}>
transactionId?: string
id?: string
}
interface WebSocketCollectionConfig<TItem extends object>
extends Omit<CollectionConfig<TItem>, 'onInsert' | 'onUpdate' | 'onDelete' | 'sync'> {
url: string
reconnectInterval?: number
// Note: onInsert/onUpdate/onDelete are handled by the WebSocket connection
// Users don't provide these handlers
}
interface WebSocketUtils extends UtilsRecord {
reconnect: () => void
getConnectionState: () => 'connected' | 'disconnected' | 'connecting'
}
export function webSocketCollectionOptions<TItem extends object>(
config: WebSocketCollectionConfig<TItem>
): CollectionConfig<TItem> & { utils: WebSocketUtils } {
let ws: WebSocket | null = null
let reconnectTimer: NodeJS.Timeout | null = null
let connectionState: 'connected' | 'disconnected' | 'connecting' = 'disconnected'
// Hold a reference to connect() (defined inside sync) so utils.reconnect can call it
let connectFn: (() => void) | null = null
// Track pending transactions awaiting acknowledgment
const pendingTransactions = new Map<string, {
resolve: () => void
reject: (error: Error) => void
timeout: NodeJS.Timeout
}>()
const sync: SyncConfig<TItem>['sync'] = (params) => {
const { begin, write, commit, markReady } = params
function connect() {
connectionState = 'connecting'
ws = new WebSocket(config.url)
ws.onopen = () => {
connectionState = 'connected'
// Request initial sync
ws.send(JSON.stringify({ type: 'sync' }))
}
ws.onmessage = (event) => {
const message: WebSocketMessage<TItem> = JSON.parse(event.data)
switch (message.type) {
case 'sync':
// Initial sync with array of items
begin()
if (Array.isArray(message.data)) {
for (const item of message.data) {
write({ type: 'insert', value: item })
}
}
commit()
markReady()
break
case 'insert':
case 'update':
case 'delete':
// Real-time updates from other clients
begin()
write({
type: message.type,
value: message.data as TItem
})
commit()
break
case 'ack':
// Server acknowledged our transaction
if (message.transactionId) {
const pending = pendingTransactions.get(message.transactionId)
if (pending) {
clearTimeout(pending.timeout)
pendingTransactions.delete(message.transactionId)
pending.resolve()
}
}
break
case 'transaction':
// Server sending back the actual data after processing our transaction
if (message.mutations) {
begin()
for (const mutation of message.mutations) {
write({
type: mutation.type,
value: mutation.data
})
}
commit()
}
break
}
}
ws.onerror = (error) => {
console.error('WebSocket error:', error)
connectionState = 'disconnected'
}
ws.onclose = () => {
connectionState = 'disconnected'
// Auto-reconnect
if (!reconnectTimer) {
reconnectTimer = setTimeout(() => {
reconnectTimer = null
connect()
}, config.reconnectInterval || 5000)
}
}
}
// Start connection (and expose it for utils.reconnect)
connectFn = connect
connect()
// Return cleanup function
return () => {
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
if (ws) {
ws.close()
ws = null
}
}
}
// Helper function to send transaction and wait for server acknowledgment
const sendTransaction = async (
params: InsertMutationFnParams<TItem> | UpdateMutationFnParams<TItem> | DeleteMutationFnParams<TItem>
): Promise<void> => {
if (ws?.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket not connected')
}
const transactionId = crypto.randomUUID()
// Convert all mutations in the transaction to the wire format
const mutations = params.transaction.mutations.map(mutation => ({
type: mutation.type,
id: mutation.key,
data: mutation.type === 'delete' ? undefined :
mutation.type === 'update' ? mutation.changes :
mutation.modified
}))
// Send the entire transaction at once
ws.send(JSON.stringify({
type: 'transaction',
transactionId,
mutations
}))
// Wait for server acknowledgment
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
pendingTransactions.delete(transactionId)
reject(new Error(`Transaction ${transactionId} timed out`))
}, 10000) // 10 second timeout
pendingTransactions.set(transactionId, {
resolve,
reject,
timeout
})
})
}
// All mutation handlers use the same transaction sender
const onInsert = async (params: InsertMutationFnParams<TItem>) => {
await sendTransaction(params)
}
const onUpdate = async (params: UpdateMutationFnParams<TItem>) => {
await sendTransaction(params)
}
const onDelete = async (params: DeleteMutationFnParams<TItem>) => {
await sendTransaction(params)
}
return {
id: config.id,
schema: config.schema,
getKey: config.getKey,
sync: { sync },
onInsert,
onUpdate,
onDelete,
utils: {
reconnect: () => {
if (ws) ws.close()
connectFn?.()
},
getConnectionState: () => connectionState
}
}
}
import { createCollection } from '@tanstack/react-db'
import { webSocketCollectionOptions } from './websocket-collection'
const todos = createCollection(
webSocketCollectionOptions({
url: 'ws://localhost:8080/todos',
getKey: (todo) => todo.id,
schema: todoSchema
// Note: No onInsert/onUpdate/onDelete - handled by WebSocket automatically
})
)
// Use the collection
todos.insert({ id: '1', text: 'Buy milk', completed: false })
// Access utilities
todos.utils.getConnectionState() // 'connected'
todos.utils.reconnect() // Force reconnect
A crucial challenge in sync-based apps is knowing when to drop optimistic state. When a user makes a change:
1. The change is applied to the collection immediately as optimistic state.
2. The mutation handler persists the change to your backend.
3. The backend syncs the confirmed change back into the collection.
4. The optimistic state is dropped in favor of the synced data.
The key question is: how do you know when step 4 is complete?
Many providers offer built-in ways to wait for sync completion:
// Firebase
await waitForPendingWrites(firestore)
// Custom WebSocket
await websocket.waitForAck(transactionId)
ElectricSQL returns transaction IDs that you can track:
import { Store } from '@tanstack/store'

// Track seen transaction IDs
const seenTxids = new Store<Set<number>>(new Set())
// In sync, track txids from incoming messages
if (message.headers.txids) {
message.headers.txids.forEach(txid => {
seenTxids.setState(prev => new Set([...prev, txid]))
})
}
// Mutation handlers return txids and wait for them
const wrappedOnInsert = async (params) => {
const result = await config.onInsert!(params)
// Wait for the txid to appear in synced data
if (result.txid) {
await awaitTxId(result.txid)
}
return result
}
// Utility function to wait for a txid
const awaitTxId = (txId: number): Promise<boolean> => {
if (seenTxids.state.has(txId)) return Promise.resolve(true)
return new Promise((resolve) => {
const unsubscribe = seenTxids.subscribe(() => {
if (seenTxids.state.has(txId)) {
unsubscribe()
resolve(true)
}
})
})
}
Trailbase tracks when specific record IDs have been synced:
import { Store } from '@tanstack/store'

// Track synced IDs with timestamps
const seenIds = new Store(new Map<string, number>())
// In sync, mark IDs as seen
write({ type: 'insert', value: item })
seenIds.setState(prev => new Map(prev).set(item.id, Date.now()))
// Wait for specific IDs after mutations
const wrappedOnInsert = async (params) => {
const ids = await config.recordApi.createBulk(items)
// Wait for all IDs to be synced back
await awaitIds(ids)
}
const awaitIds = (ids: string[]): Promise<void> => {
const allSynced = ids.every(id => seenIds.state.has(id))
if (allSynced) return Promise.resolve()
return new Promise((resolve) => {
const unsubscribe = seenIds.subscribe((state) => {
if (ids.every(id => state.has(id))) {
unsubscribe()
resolve()
}
})
})
}
Track version numbers or timestamps to detect when data is current:
// Track latest sync timestamp
let lastSyncTime = 0
// In mutations, record when the operation was sent
const wrappedOnUpdate = async (params) => {
const mutationTime = Date.now()
await config.onUpdate(params)
// Wait for sync to catch up
await waitForSync(mutationTime)
}
const waitForSync = (afterTime: number): Promise<void> => {
if (lastSyncTime > afterTime) return Promise.resolve()
return new Promise((resolve) => {
const check = setInterval(() => {
if (lastSyncTime > afterTime) {
clearInterval(check)
resolve()
}
}, 100)
})
}
The Query Collection simply refetches all the data after mutations:
const wrappedOnInsert = async (params) => {
// Perform the mutation
await config.onInsert(params)
// Refetch the entire collection
await refetch()
// The refetch will trigger sync with fresh data,
// automatically dropping optimistic state
}
Test your collection options creator with:
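For example, a minimal unit test can drive the sync function directly with stubbed callbacks and assert on the writes it produces (a Vitest-style sketch; myCollectionOptions and its config are the hypothetical creator from this guide):

import { expect, it } from 'vitest'

it('writes initial data and marks the collection ready', async () => {
  const writes: Array<unknown> = []
  let ready = false

  const options = myCollectionOptions({
    connectionUrl: 'memory://test', // hypothetical in-memory engine
    getKey: (item: { id: string }) => item.id,
  })

  // Drive the sync function with stubbed transaction callbacks
  const cleanup = options.sync.sync({
    begin: () => {},
    write: (op: unknown) => { writes.push(op) },
    commit: () => {},
    markReady: () => { ready = true },
  } as any)

  // Let the asynchronous initial fetch settle
  await new Promise((resolve) => setTimeout(resolve, 0))

  expect(ready).toBe(true)
  expect(writes.length).toBeGreaterThan(0)
  if (typeof cleanup === 'function') cleanup()
})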
Building a collection options creator lets you integrate any sync engine with TanStack DB's powerful sync-based architecture. Follow the patterns shown here and you'll end up with a robust, type-safe integration that offers an excellent developer experience.