Framework
Version
Debouncer API Referenz
Throttler API Referenz
Rate Limiter API Referenz
Queue API Referenz
Batcher API Referenz

Asynchroner Warteschlangen-Leitfaden

Hinweis: Alle Kernkonzepte der Warteschlangenverwaltung aus dem Warteschlangen-Leitfaden gelten auch für AsyncQueuer. AsyncQueuer erweitert diese Konzepte um erweiterte Funktionen wie Nebenläufigkeit (mehrere Aufgaben gleichzeitig) und robuste Fehlerbehandlung. Wenn Sie neu in der Warteschlangenverwaltung sind, beginnen Sie mit dem Warteschlangen-Leitfaden, um FIFO/LIFO, Priorität, Ablauf und Ablehnung sowie Warteschlangenverwaltung zu erlernen. Dieser Leitfaden konzentriert sich darauf, was AsyncQueuer einzigartig und leistungsfähig für die asynchrone und gleichzeitige Aufgabenverarbeitung macht.

Während der Queuer synchrone Warteschlangen mit Zeitsteuerungsoptionen bereitstellt, ist der AsyncQueuer speziell für die Verarbeitung gleichzeitiger asynchroner Operationen konzipiert. Er implementiert das traditionell als "Task Pool" oder "Worker Pool" bekannte Muster, das die gleichzeitige Verarbeitung mehrerer Operationen ermöglicht und gleichzeitig die Kontrolle über Nebenläufigkeit und Zeitsteuerung beibehält. Die Implementierung ist größtenteils von Swimmer kopiert, Tanner's ursprüngliches Task-Pooling-Dienstprogramm, das die JavaScript-Community seit 2017 bedient.

Async Queuing Concept

Asynchrone Warteschlangen erweitern das grundlegende Warteschlangenkonzept durch Hinzufügen von parallelen Verarbeitungsfunktionen. Anstatt ein Element nach dem anderen zu verarbeiten, kann ein asynchroner Warteschlangen-Manager mehrere Elemente gleichzeitig verarbeiten und dabei Reihenfolge und Kontrolle über die Ausführung beibehalten. Dies ist besonders nützlich bei I/O-Operationen, Netzwerkanfragen oder anderen Aufgaben, die die meiste Zeit mit Warten verbringen und wenig CPU-Ressourcen verbrauchen.

Async Queuing Visualization

text
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️
Queue:       [ABC]   [C]    [CDE]    [E]    []
Active:      [A,B]   [B,C]  [C,D]    [D,E]  [E]
Completed:    -       A      B        C      D,E
             [=================================================================]
             ^ Unlike regular queuing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy         with wait between      all items
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:        ⬇️  ⬇️  ⬇️  ⬇️     ⬇️  ⬇️     ⬇️
Queue:       [ABC]   [C]    [CDE]    [E]    []
Active:      [A,B]   [B,C]  [C,D]    [D,E]  [E]
Completed:    -       A      B        C      D,E
             [=================================================================]
             ^ Unlike regular queuing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy         with wait between      all items

Wann Async Queuing verwenden

Asynchrone Warteschlangen sind besonders effektiv, wenn Sie Folgendes tun müssen:

  • Verarbeitung mehrerer asynchroner Operationen gleichzeitig
  • Steuerung der Anzahl gleichzeitiger Operationen
  • Verarbeitung von Promise-basierten Aufgaben mit ordnungsgemäßer Fehlerbehandlung
  • Aufrechterhaltung der Reihenfolge bei gleichzeitiger Maximierung des Durchsatzes
  • Verarbeitung von Hintergrundaufgaben, die parallel ausgeführt werden können

Wann Async Queuing NICHT verwenden

Der AsyncQueuer ist sehr vielseitig und kann in vielen Situationen eingesetzt werden. Wenn Sie keine gleichzeitige Verarbeitung benötigen, verwenden Sie stattdessen Queuing. Wenn Sie nicht möchten, dass alle aufgerufenen Ausführungen durchlaufen werden, verwenden Sie stattdessen Throttling.

Wenn Sie Operationen gruppieren möchten, verwenden Sie stattdessen Batching.

Async Queuing in TanStack Pacer

TanStack Pacer bietet asynchrone Warteschlangen über die einfache Funktion asyncQueue und die leistungsfähigere Klasse AsyncQueuer. Alle Warteschlangentypen und Ordnungsstrategien (FIFO, LIFO, Priorität usw.) werden wie im Kernleitfaden für Warteschlangen unterstützt.

Grundlegende Verwendung mit asyncQueue

Die Funktion asyncQueue bietet eine einfache Möglichkeit, eine immer laufende asynchrone Warteschlange zu erstellen

ts
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2,
    onItemsChange: (queuer) => {
      console.log('Active tasks:', queuer.peekActiveItems().length)
    }
  }
)

// Add items to be processed
processItems(1)
processItems(2)
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2,
    onItemsChange: (queuer) => {
      console.log('Active tasks:', queuer.peekActiveItems().length)
    }
  }
)

// Add items to be processed
processItems(1)
processItems(2)

Für mehr Kontrolle über die Warteschlange verwenden Sie direkt die Klasse AsyncQueuer.

Erweiterte Verwendung mit der Klasse AsyncQueuer

Die Klasse AsyncQueuer bietet vollständige Kontrolle über das Verhalten von asynchronen Warteschlangen, einschließlich aller Kernfunktionen für Warteschlangen plus

  • Nebenläufigkeit: Verarbeitung mehrerer Elemente gleichzeitig (konfigurierbar mit concurrency)
  • Asynchrone Fehlerbehandlung: Globale und pro Aufgabe verfügbare Rückruffunktionen, mit Kontrolle über die Fehlerweitergabe
  • Verfolgung aktiver und anstehender Aufgaben: Überwachung, welche Aufgaben ausgeführt werden und welche in der Warteschlange stehen
  • Async-spezifische Rückruffunktionen: onSuccess, onError, onSettled, etc.
ts
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2, // Process 2 items at once
    wait: 1000,     // Wait 1 second between starting new items
    started: true,  // Start processing immediately
    key: 'data-processor' // Identify this queuer in devtools
  }
)

// Add error and success handlers via options
queue.setOptions({
  onError: (error, item, queuer) => {
    console.error('Task failed:', error)
    console.log('Failed item:', item)
    // You can access queue state here
    console.log('Error count:', queuer.store.state.errorCount)
  },
  onSuccess: (result, item, queuer) => {
    console.log('Task completed:', result)
    console.log('Completed item:', item)
    // You can access queue state here
    console.log('Success count:', queuer.store.state.successCount)
  },
  onSettled: (item, queuer) => {
    // Called after each execution (success or failure)
    console.log('Task settled:', item)
    console.log('Total settled:', queuer.store.state.settledCount)
  }
})

// Add items to be processed
queue.addItem(1)
queue.addItem(2)
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2, // Process 2 items at once
    wait: 1000,     // Wait 1 second between starting new items
    started: true,  // Start processing immediately
    key: 'data-processor' // Identify this queuer in devtools
  }
)

// Add error and success handlers via options
queue.setOptions({
  onError: (error, item, queuer) => {
    console.error('Task failed:', error)
    console.log('Failed item:', item)
    // You can access queue state here
    console.log('Error count:', queuer.store.state.errorCount)
  },
  onSuccess: (result, item, queuer) => {
    console.log('Task completed:', result)
    console.log('Completed item:', item)
    // You can access queue state here
    console.log('Success count:', queuer.store.state.successCount)
  },
  onSettled: (item, queuer) => {
    // Called after each execution (success or failure)
    console.log('Task settled:', item)
    console.log('Total settled:', queuer.store.state.settledCount)
  }
})

// Add items to be processed
queue.addItem(1)
queue.addItem(2)

Async-spezifische Features

Alle Warteschlangentypen und Ordnungsstrategien (FIFO, LIFO, Priorität usw.) werden unterstützt – Details finden Sie im Warteschlangen-Leitfaden. AsyncQueuer fügt hinzu

  • Nebenläufigkeit: Mehrere Elemente können gleichzeitig verarbeitet werden, gesteuert durch die Option concurrency (kann dynamisch sein).
  • Asynchrone Fehlerbehandlung: Verwenden Sie onError, onSuccess und onSettled für eine robuste Fehler- und Ergebnisverfolgung.
  • Verfolgung aktiver und anstehender Aufgaben: Verwenden Sie peekActiveItems() und peekPendingItems(), um den Warteschlangenstatus zu überwachen.
  • Asynchroner Ablauf und Ablehnung: Elemente können ablaufen oder abgelehnt werden, genau wie im Kernleitfaden für Warteschlangen, jedoch mit asynchronen Rückruffunktionen.

Beispiel: Prioritäts-Async-Warteschlange

ts
const priorityQueue = new AsyncQueuer(
  async (item: { value: string; priority: number }) => {
    // Process each item asynchronously
    return await processTask(item.value)
  },
  {
    concurrency: 2,
    getPriority: (item) => item.priority // Higher numbers have priority
  }
)

priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const priorityQueue = new AsyncQueuer(
  async (item: { value: string; priority: number }) => {
    // Process each item asynchronously
    return await processTask(item.value)
  },
  {
    concurrency: 2,
    getPriority: (item) => item.priority // Higher numbers have priority
  }
)

priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low

Beispiel: Fehlerbehandlung

ts
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    if (item < 0) throw new Error('Negative item')
    return await processTask(item)
  },
  {
    onError: (error, item, queuer) => {
      console.error('Task failed:', error)
      console.log('Failed item:', item)
      // You can access queue state here
      console.log('Error count:', queuer.store.state.errorCount)
    },
    throwOnError: true, // Will throw errors even with onError handler
    onSuccess: (result, item, queuer) => {
      console.log('Task succeeded:', result)
      console.log('Succeeded item:', item)
      // You can access queue state here
      console.log('Success count:', queuer.store.state.successCount)
    },
    onSettled: (item, queuer) => {
      // Called after each execution (success or failure)
      console.log('Task settled:', item)
      console.log('Total settled:', queuer.store.state.settledCount)
    }
  }
)

queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    if (item < 0) throw new Error('Negative item')
    return await processTask(item)
  },
  {
    onError: (error, item, queuer) => {
      console.error('Task failed:', error)
      console.log('Failed item:', item)
      // You can access queue state here
      console.log('Error count:', queuer.store.state.errorCount)
    },
    throwOnError: true, // Will throw errors even with onError handler
    onSuccess: (result, item, queuer) => {
      console.log('Task succeeded:', result)
      console.log('Succeeded item:', item)
      // You can access queue state here
      console.log('Success count:', queuer.store.state.successCount)
    },
    onSettled: (item, queuer) => {
      // Called after each execution (success or failure)
      console.log('Task settled:', item)
      console.log('Total settled:', queuer.store.state.settledCount)
    }
  }
)

queue.addItem(-1) // Will trigger error handling
queue.addItem(2)

Beispiel: Dynamische Nebenläufigkeit

ts
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    return await processTask(item)
  },
  {
    // Dynamic concurrency based on system load
    concurrency: (queuer) => {
      return Math.max(1, 4 - queuer.store.state.activeItems.length)
    },
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    return await processTask(item)
  },
  {
    // Dynamic concurrency based on system load
    concurrency: (queuer) => {
      return Math.max(1, 4 - queuer.store.state.activeItems.length)
    },
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)

Warteschlangenverwaltung und -überwachung

AsyncQueuer bietet alle Methoden zur Verwaltung und Überwachung von Warteschlangen aus dem Kernleitfaden für Warteschlangen sowie asynchron-spezifische Methoden

  • peekActiveItems() – Aktuell verarbeitete Elemente
  • peekPendingItems() – Anstehende Elemente zur Verarbeitung
  • queuer.store.state.successCount, queuer.store.state.errorCount, queuer.store.state.settledCount – Ausführungsstatistiken
  • queuer.store.state.activeItems – Array der aktuell verarbeiteten Elemente
  • queuer.store.state.size – Aktuelle Warteschlangengröße
  • start(), stop(), clear(), reset(), flush(), etc.

Siehe den Warteschlangen-Leitfaden für weitere Informationen zu Konzepten der Warteschlangenverwaltung.

Aufgabenablauf und Ablehnung

AsyncQueuer unterstützt Ablauf und Ablehnung genau wie der Kern-Queuer

  • Verwenden Sie expirationDuration, getIsExpired und onExpire für ablaufende Aufgaben
  • Verwenden Sie maxSize und onReject für die Behandlung von Warteschlangenüberläufen

Siehe den Warteschlangen-Leitfaden für Details und Beispiele.

Leeren von Warteschlangenelementen

Der asynchrone Queuer unterstützt das Leeren von Elementen, um sie sofort zu verarbeiten

ts
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)

// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })

queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3

// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)

// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)

Zustandsverwaltung

Die Klasse AsyncQueuer verwendet TanStack Store für reaktives State-Management und bietet Echtzeit-Zugriff auf den Warteschlangenstatus, Verarbeitungsstatistiken und die Überwachung gleichzeitiger Aufgaben. Der gesamte Status wird in einem TanStack Store gespeichert und kann über asyncQueuer.store.state abgerufen werden. Wenn Sie jedoch einen Framework-Adapter wie React oder Solid verwenden, sollten Sie den Status nicht von hier abrufen. Stattdessen rufen Sie den Status aus asyncQueuer.state ab und stellen eine Selektorfunktion als 3. Argument für den Hook useAsyncQueuer bereit, um die Statusverfolgung zu aktivieren, wie unten gezeigt.

Zustandsselektor (Framework-Adapter)

Framework-Adapter unterstützen ein selector-Argument, das es Ihnen ermöglicht, anzugeben, welche Zustandsänderungen Neu-Renderings auslösen. Dies optimiert die Leistung, indem unnötige Neu-Renderings bei irrelevanten Zustandsänderungen verhindert werden.

Standardmäßig ist asyncQueuer.state leer ({}), da der Selektor standardmäßig leer ist. Hier wird der reaktive Status aus einem TanStack Store useStore gespeichert. Sie müssen die Statusverfolgung aktivieren, indem Sie eine Selektorfunktion angeben.

ts
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}

// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
  processFn, 
  { concurrency: 2, wait: 1000 },
  (state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value

// Multiple state properties
const queue = useAsyncQueuer(
  processFn,
  { concurrency: 2, wait: 1000 },
  (state) => ({
    activeItems: state.activeItems,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}

// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
  processFn, 
  { concurrency: 2, wait: 1000 },
  (state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value

// Multiple state properties
const queue = useAsyncQueuer(
  processFn,
  { concurrency: 2, wait: 1000 },
  (state) => ({
    activeItems: state.activeItems,
    successCount: state.successCount,
    errorCount: state.errorCount
  })
)

Anfangszustand

Sie können Anfangswerte für den Status beim Erstellen eines asynchronen Queuers angeben

ts
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new AsyncQueuer(processFn, {
  concurrency: 2,
  wait: 1000,
  initialState
})
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}

const queue = new AsyncQueuer(processFn, {
  concurrency: 2,
  wait: 1000,
  initialState
})

Zustandsänderungen abonnieren

Der Store ist reaktiv und unterstützt Abonnements

ts
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  // do something with the state like persist it to localStorage
})

// Unsubscribe when done
unsubscribe()

Hinweis: Dies ist bei der Verwendung eines Framework-Adapters nicht notwendig, da der zugrunde liegende Hook useStore dies bereits tut. Sie können auch useStore von TanStack Store importieren und verwenden, um queuer.store.state bei Bedarf mit einem benutzerdefinierten Selektor zu reaktivem Status zu machen.

Verfügbare Zustandseigenschaften

Die AsyncQueuerState enthält alle Eigenschaften aus dem Kernleitfaden für Warteschlangen plus

  • activeItems: Array der aktuell verarbeiteten Elemente
  • addItemCount: Anzahl der Aufrufe von addItem (für Reduktionsberechnungen)
  • errorCount: Anzahl der Funktionsausführungen, die zu Fehlern geführt haben
  • expirationCount: Anzahl der Elemente, die aufgrund des Ablaufs aus der Warteschlange entfernt wurden
  • isEmpty: Gibt an, ob der Queuer keine zu verarbeitenden Elemente hat (Array ist leer)
  • isFull: Ob der Queuer seine maximale Kapazität erreicht hat
  • isIdle: Ob der Queuer gerade keine Elemente verarbeitet
  • isRunning: Ob der Queuer aktiv ist und Elemente automatisch verarbeitet
  • items: Array von Elementen, die derzeit auf die Verarbeitung warten
  • itemTimestamps: Zeitstempel, wann Elemente zur Warteschlange hinzugefügt wurden, zur Verfolgung von Abläufen
  • lastResult: Das Ergebnis der letzten erfolgreichen Funktionsausführung
  • pendingTick: Ob der Queuer einen ausstehenden Timeout für die Verarbeitung des nächsten Elements hat
  • rejectionCount: Anzahl der Elemente, die beim Hinzufügen zur Warteschlange abgelehnt wurden
  • settledCount: Anzahl der abgeschlossenen Funktionsaufrufe (entweder erfolgreich oder mit Fehlern)
  • size: Anzahl der Elemente, die sich derzeit in der Warteschlange befinden
  • status: Aktueller Verarbeitungsstatus ('idle' | 'running' | 'stopped')
  • successCount: Anzahl der Funktionsausführungen, die erfolgreich abgeschlossen wurden

Framework-Adapter

Jeder Framework-Adapter baut praktische Hooks und Funktionen um die asynchronen Queuer-Klassen herum. Hooks wie useAsyncQueuer oder useAsyncQueuedState sind kleine Wrapper, die den Boilerplate-Code für häufige Anwendungsfälle reduzieren können.


Für Kernkonzepte der Warteschlangenverwaltung und synchrone Warteschlangen siehe den Warteschlangen-Leitfaden.

Unsere Partner
Code Rabbit
Unkey
Bytes abonnieren

Ihre wöchentliche Dosis JavaScript-Nachrichten. Jeden Montag kostenlos an über 100.000 Entwickler geliefert.

Bytes

Kein Spam. Jederzeit kündbar.

Bytes abonnieren

Ihre wöchentliche Dosis JavaScript-Nachrichten. Jeden Montag kostenlos an über 100.000 Entwickler geliefert.

Bytes

Kein Spam. Jederzeit kündbar.