October 13, 2022
I recently needed to execute an unknown number of tasks sequentially but asynchronously.
There is a solution based on [].reduce() that involves Promise.all(), but I wanted a dynamic array.
What this means is that I don’t want to create a new array every time a batch of tasks “wildly-appears.mp3”.
Instead, I wanted to queue tasks as they arrive and execute them in the order they came, each one starting only after the previous one has finished.
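For comparison, here is a minimal sketch of the static, reduce()-style approach, written with plain promises. It is not necessarily the exact solution referred to above, and Task, runBatch and makeTask are hypothetical names introduced only for illustration.

```typescript
// A task is a function that starts some async work when it is called.
type Task<T> = () => Promise<T>;

// Run a FIXED array of tasks one after another by chaining promises with reduce().
function runBatch<T>(tasks: Task<T>[]): Promise<T[]> {
  return tasks.reduce<Promise<T[]>>(
    (chain, task) =>
      // Start the next task only after the whole chain so far has resolved.
      chain.then(results => task().then(result => [...results, result])),
    Promise.resolve([]),
  );
}

// Hypothetical helper: a task that resolves with `label` after `ms` milliseconds.
function makeTask(label: string, ms: number): Task<string> {
  return () => new Promise<string>(resolve => setTimeout(() => resolve(label), ms));
}

// Usage: the whole batch has to be known before the run starts.
runBatch([makeTask('a', 30), makeTask('b', 10)]).then(results => {
  console.log(results); // results arrive in submission order: 'a', then 'b'
});
```

The limitation is visible in the signature: runBatch takes a complete array, so tasks that show up later need a new array and a new run.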
There are a lot of tutorials on the internet teaching you what each operator does. Here, I’ll try to explain the “business logic”, or the principle of “building blocks” that, when merged together, create this functionality.
Before we start, make sure you get comfortable with these terms as we are going to use them in this tutorial:
Despite the length of this post, the final code is only snippet-sized, and you can implement it in your own class. Just make sure you unsubscribe when needed to avoid memory leaks. In this tutorial we’ll rely on Angular’s component lifecycle, just to keep the focus on the core functionality. So, let us start:
A user needs to make an async request to a server. The response can take some time to process and return a result, but the user needs to keep using the app. We don’t want to allow the user to run tasks in parallel, as they may be costly.
Random example:
- The user selects and starts uploading a few videos that the platform needs to process.
- The files are queued and start uploading one by one.
- In the meantime, the user adds more images and videos.
- The new files are queued.
- The user changes their mind and wants to cancel.
I’ll use the metaphor of underground mines:
How do we do that?
What do we need to do? We need to split the functionality into multiple smaller parts.
In layman’s terms, we need a “function” that we can pass “arguments” to. Then we need to “listen” to that function so we know when it receives an argument. After that, we need to wait for the argument to produce its result and notify our function before it starts processing the next one.
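The principle described above can be sketched without RxJS, using a plain promise chain. This is only a rough illustration of the idea, not the implementation this tutorial builds: SequentialQueue, enqueue, cancel and wait are hypothetical names, and cancellation here is an assumption that simply skips tasks which have not started yet.

```typescript
// A task is a function that starts some async work when it is called.
type Task<T> = () => Promise<T>;

class SequentialQueue {
  // Tail of the promise chain; every new task is appended after it.
  private tail: Promise<unknown> = Promise.resolve();
  private cancelled = false;

  // The "function" we pass "arguments" to: tasks can be added at any time.
  enqueue<T>(task: Task<T>): Promise<T | undefined> {
    // Wait for everything queued so far, then start this task (unless cancelled).
    const run = this.tail.then<T | undefined>(() =>
      this.cancelled ? undefined : task(),
    );
    // Keep the chain usable even if this task rejects.
    this.tail = run.catch(() => undefined);
    return run;
  }

  // Tasks that have not started yet are skipped; a running task is not interrupted.
  cancel(): void {
    this.cancelled = true;
  }
}

// Usage: hypothetical tasks that resolve with a label after a delay.
const wait = (label: string, ms: number): Task<string> => () =>
  new Promise<string>(resolve => setTimeout(() => resolve(label), ms));

const queue = new SequentialQueue();
queue.enqueue(wait('first', 30)).then(value => console.log(value));
// Added right away, but it only starts once 'first' has finished.
queue.enqueue(wait('second', 5)).then(value => console.log(value));
```

Unlike a fixed array, the queue accepts new tasks while earlier ones are still running, which is the behaviour the rest of this tutorial builds on.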
In technical terms:
example.component.ts
import { Subject, Observable, from } from 'rxjs';
import { concatAll, takeUntil } from 'rxjs/operators';

// Cart
readonly addToQueue$: Subject<any> = new Subject<any>();

// Control station for when to MANUALLY stop the queue
readonly cancelQueue$: Subject<any> = new Subject<any>();

// Rails (see the last snippet for automatic unsubscription)
readonly queue$: Observable<any> = from(this.addToQueue$).pipe(
  concatAll(), // Serialize the "carts" to come one by one
  takeUntil(this.cancelQueue$), // MANUALLY stop the queue when the user clicks
);
Note that the following snippets belong to the same component as above, so just append them in order.
example.component.html
<button (click)="handleAddToQueue()">Add to queue</button>
<button (click)="handleCancelQueue()">Stop the queue</button>
example.component.ts
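import { interval } from 'rxjs';
import { take, tap } from 'rxjs/operators';

// Sample task: emits once after a random delay, logs the value, then completes.
// concatAll() only moves on to the next task once the current one has completed.
readonly testObservable$ = interval(Math.random() * 10000).pipe(
  take(1),
  tap(value => console.log(value)),
);

ngOnInit() {
  // Start the queue (subscribe) and log every value as soon as it's out of the stream
  this.queue$.subscribe(value => console.log(value));
}

handleAddToQueue() {
  this.addToQueue$.next(this.testObservable$);
}

handleCancelQueue() {
  this.cancelQueue$.next(true);
}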
Additional part to stop the queue when the component is destroyed and removed from the DOM:
readonly #unsubscribe$: Subject<void> = new Subject<void>();

// Rails (this declaration replaces the earlier queue$)
readonly queue$: Observable<any> = from(this.addToQueue$).pipe(
  concatAll(), // Serialize the "carts" to come one by one
  takeUntil(this.cancelQueue$), // MANUALLY stop the queue when the user clicks
  takeUntil(this.#unsubscribe$), // AUTOMATICALLY stop the queue
);

ngOnDestroy() {
  this.#unsubscribe$.next();
  this.#unsubscribe$.complete();
}
That’s it. Now you have fully functional logic for a sequential asynchronous task queue. You can implement it with your state management solution or anywhere else you need jobs done in order but don’t know when each one is going to finish.
PS: Feedback, critiques and suggestions are welcome.
Written by Milan Miljkovic — a tech enthusiast and design system practitioner.