asyncQueueAsyncQueuerHinweis: Alle Kernkonzepte der Warteschlangenverwaltung aus dem Warteschlangen-Leitfaden gelten auch für AsyncQueuer. AsyncQueuer erweitert diese Konzepte um erweiterte Funktionen wie Nebenläufigkeit (mehrere Aufgaben gleichzeitig) und robuste Fehlerbehandlung. Wenn Sie neu in der Warteschlangenverwaltung sind, beginnen Sie mit dem Warteschlangen-Leitfaden, um FIFO/LIFO, Priorität, Ablauf und Ablehnung sowie Warteschlangenverwaltung zu erlernen. Dieser Leitfaden konzentriert sich darauf, was AsyncQueuer einzigartig und leistungsfähig für die asynchrone und gleichzeitige Aufgabenverarbeitung macht.
Während der Queuer synchrone Warteschlangen mit Zeitsteuerungsoptionen bereitstellt, ist der AsyncQueuer speziell für die Verarbeitung gleichzeitiger asynchroner Operationen konzipiert. Er implementiert das traditionell als "Task Pool" oder "Worker Pool" bekannte Muster, das die gleichzeitige Verarbeitung mehrerer Operationen ermöglicht und gleichzeitig die Kontrolle über Nebenläufigkeit und Zeitsteuerung beibehält. Die Implementierung ist größtenteils von Swimmer kopiert, Tanner's ursprüngliches Task-Pooling-Dienstprogramm, das die JavaScript-Community seit 2017 bedient.
Asynchrone Warteschlangen erweitern das grundlegende Warteschlangenkonzept durch Hinzufügen von parallelen Verarbeitungsfunktionen. Anstatt ein Element nach dem anderen zu verarbeiten, kann ein asynchroner Warteschlangen-Manager mehrere Elemente gleichzeitig verarbeiten und dabei Reihenfolge und Kontrolle über die Ausführung beibehalten. Dies ist besonders nützlich bei I/O-Operationen, Netzwerkanfragen oder anderen Aufgaben, die die meiste Zeit mit Warten verbringen und wenig CPU-Ressourcen verbrauchen.
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ Unlike regular queuing, multiple items
can be processed concurrently
[Items queue up] [Process 2 at once] [Complete]
when busy with wait between all items
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ Unlike regular queuing, multiple items
can be processed concurrently
[Items queue up] [Process 2 at once] [Complete]
when busy with wait between all items
Asynchrone Warteschlangen sind besonders effektiv, wenn Sie Folgendes tun müssen:
Der AsyncQueuer ist sehr vielseitig und kann in vielen Situationen eingesetzt werden. Wenn Sie keine gleichzeitige Verarbeitung benötigen, verwenden Sie stattdessen Queuing. Wenn Sie nicht möchten, dass alle aufgerufenen Ausführungen durchlaufen werden, verwenden Sie stattdessen Throttling.
Wenn Sie Operationen gruppieren möchten, verwenden Sie stattdessen Batching.
TanStack Pacer bietet asynchrone Warteschlangen über die einfache Funktion asyncQueue und die leistungsfähigere Klasse AsyncQueuer. Alle Warteschlangentypen und Ordnungsstrategien (FIFO, LIFO, Priorität usw.) werden wie im Kernleitfaden für Warteschlangen unterstützt.
Die Funktion asyncQueue bietet eine einfache Möglichkeit, eine immer laufende asynchrone Warteschlange zu erstellen
import { asyncQueue } from '@tanstack/pacer'
// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.peekActiveItems().length)
}
}
)
// Add items to be processed
processItems(1)
processItems(2)
import { asyncQueue } from '@tanstack/pacer'
// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.peekActiveItems().length)
}
}
)
// Add items to be processed
processItems(1)
processItems(2)
Für mehr Kontrolle über die Warteschlange verwenden Sie direkt die Klasse AsyncQueuer.
Die Klasse AsyncQueuer bietet vollständige Kontrolle über das Verhalten von asynchronen Warteschlangen, einschließlich aller Kernfunktionen für Warteschlangen plus
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2, // Process 2 items at once
wait: 1000, // Wait 1 second between starting new items
started: true, // Start processing immediately
key: 'data-processor' // Identify this queuer in devtools
}
)
// Add error and success handlers via options
queue.setOptions({
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
onSuccess: (result, item, queuer) => {
console.log('Task completed:', result)
console.log('Completed item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
})
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
const result = await fetchData(item)
return result
},
{
concurrency: 2, // Process 2 items at once
wait: 1000, // Wait 1 second between starting new items
started: true, // Start processing immediately
key: 'data-processor' // Identify this queuer in devtools
}
)
// Add error and success handlers via options
queue.setOptions({
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
onSuccess: (result, item, queuer) => {
console.log('Task completed:', result)
console.log('Completed item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
})
// Add items to be processed
queue.addItem(1)
queue.addItem(2)
Alle Warteschlangentypen und Ordnungsstrategien (FIFO, LIFO, Priorität usw.) werden unterstützt – Details finden Sie im Warteschlangen-Leitfaden. AsyncQueuer fügt hinzu
const priorityQueue = new AsyncQueuer(
async (item: { value: string; priority: number }) => {
// Process each item asynchronously
return await processTask(item.value)
},
{
concurrency: 2,
getPriority: (item) => item.priority // Higher numbers have priority
}
)
priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const priorityQueue = new AsyncQueuer(
async (item: { value: string; priority: number }) => {
// Process each item asynchronously
return await processTask(item.value)
},
{
concurrency: 2,
getPriority: (item) => item.priority // Higher numbers have priority
}
)
priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
if (item < 0) throw new Error('Negative item')
return await processTask(item)
},
{
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
throwOnError: true, // Will throw errors even with onError handler
onSuccess: (result, item, queuer) => {
console.log('Task succeeded:', result)
console.log('Succeeded item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
}
)
queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
if (item < 0) throw new Error('Negative item')
return await processTask(item)
},
{
onError: (error, item, queuer) => {
console.error('Task failed:', error)
console.log('Failed item:', item)
// You can access queue state here
console.log('Error count:', queuer.store.state.errorCount)
},
throwOnError: true, // Will throw errors even with onError handler
onSuccess: (result, item, queuer) => {
console.log('Task succeeded:', result)
console.log('Succeeded item:', item)
// You can access queue state here
console.log('Success count:', queuer.store.state.successCount)
},
onSettled: (item, queuer) => {
// Called after each execution (success or failure)
console.log('Task settled:', item)
console.log('Total settled:', queuer.store.state.settledCount)
}
}
)
queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
return await processTask(item)
},
{
// Dynamic concurrency based on system load
concurrency: (queuer) => {
return Math.max(1, 4 - queuer.store.state.activeItems.length)
},
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
const queue = new AsyncQueuer(
async (item: number) => {
// Process each item asynchronously
return await processTask(item)
},
{
// Dynamic concurrency based on system load
concurrency: (queuer) => {
return Math.max(1, 4 - queuer.store.state.activeItems.length)
},
// Dynamic wait time based on queue size
wait: (queuer) => {
return queuer.store.state.size > 10 ? 2000 : 1000
}
}
)
AsyncQueuer bietet alle Methoden zur Verwaltung und Überwachung von Warteschlangen aus dem Kernleitfaden für Warteschlangen sowie asynchron-spezifische Methoden
Siehe den Warteschlangen-Leitfaden für weitere Informationen zu Konzepten der Warteschlangenverwaltung.
AsyncQueuer unterstützt Ablauf und Ablehnung genau wie der Kern-Queuer
Siehe den Warteschlangen-Leitfaden für Details und Beispiele.
Der asynchrone Queuer unterstützt das Leeren von Elementen, um sie sofort zu verarbeiten
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)
// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)
// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
Die Klasse AsyncQueuer verwendet TanStack Store für reaktives State-Management und bietet Echtzeit-Zugriff auf den Warteschlangenstatus, Verarbeitungsstatistiken und die Überwachung gleichzeitiger Aufgaben. Der gesamte Status wird in einem TanStack Store gespeichert und kann über asyncQueuer.store.state abgerufen werden. Wenn Sie jedoch einen Framework-Adapter wie React oder Solid verwenden, sollten Sie den Status nicht von hier abrufen. Stattdessen rufen Sie den Status aus asyncQueuer.state ab und stellen eine Selektorfunktion als 3. Argument für den Hook useAsyncQueuer bereit, um die Statusverfolgung zu aktivieren, wie unten gezeigt.
Framework-Adapter unterstützen ein selector-Argument, das es Ihnen ermöglicht, anzugeben, welche Zustandsänderungen Neu-Renderings auslösen. Dies optimiert die Leistung, indem unnötige Neu-Renderings bei irrelevanten Zustandsänderungen verhindert werden.
Standardmäßig ist asyncQueuer.state leer ({}), da der Selektor standardmäßig leer ist. Hier wird der reaktive Status aus einem TanStack Store useStore gespeichert. Sie müssen die Statusverfolgung aktivieren, indem Sie eine Selektorfunktion angeben.
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}
// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value
// Multiple state properties
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({
activeItems: state.activeItems,
successCount: state.successCount,
errorCount: state.errorCount
})
)
// Default behavior - no reactive state subscriptions
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
console.log(queue.state) // {}
// Opt-in to re-render when activeItems changes
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({ activeItems: state.activeItems })
)
console.log(queue.state.activeItems.length) // Reactive value
// Multiple state properties
const queue = useAsyncQueuer(
processFn,
{ concurrency: 2, wait: 1000 },
(state) => ({
activeItems: state.activeItems,
successCount: state.successCount,
errorCount: state.errorCount
})
)
Sie können Anfangswerte für den Status beim Erstellen eines asynchronen Queuers angeben
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new AsyncQueuer(processFn, {
concurrency: 2,
wait: 1000,
initialState
})
const savedState = localStorage.getItem('async-queuer-state')
const initialState = savedState ? JSON.parse(savedState) : {}
const queue = new AsyncQueuer(processFn, {
concurrency: 2,
wait: 1000,
initialState
})
Der Store ist reaktiv und unterstützt Abonnements
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
// do something with the state like persist it to localStorage
})
// Unsubscribe when done
unsubscribe()
Hinweis: Dies ist bei der Verwendung eines Framework-Adapters nicht notwendig, da der zugrunde liegende Hook useStore dies bereits tut. Sie können auch useStore von TanStack Store importieren und verwenden, um queuer.store.state bei Bedarf mit einem benutzerdefinierten Selektor zu reaktivem Status zu machen.
Die AsyncQueuerState enthält alle Eigenschaften aus dem Kernleitfaden für Warteschlangen plus
Jeder Framework-Adapter baut praktische Hooks und Funktionen um die asynchronen Queuer-Klassen herum. Hooks wie useAsyncQueuer oder useAsyncQueuedState sind kleine Wrapper, die den Boilerplate-Code für häufige Anwendungsfälle reduzieren können.
Für Kernkonzepte der Warteschlangenverwaltung und synchrone Warteschlangen siehe den Warteschlangen-Leitfaden.
Ihre wöchentliche Dosis JavaScript-Nachrichten. Jeden Montag kostenlos an über 100.000 Entwickler geliefert.