one day of code

Create a task queue using RxJS

October 13, 2022

I recently needed to execute an unknown number of tasks sequentially but asynchronously. There is a solution based on [].reduce() that also involves Promise.all(), but I wanted a dynamic array. What this means is that I don’t want to create a new array every time a batch of tasks “wildly-appears.mp3”. Instead, I wanted to queue tasks as they arrive and execute them in the order they came in, each one starting only after the previous one has finished.
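For comparison, here is a minimal sketch of one common form of the reduce() approach. It is an assumption about what that solution looks like, since the post does not show it, and this variant chains promises without needing Promise.all(). The names Task, runSequentially and delay are hypothetical. Note that the array has to be complete before the chain is built, which is exactly the limitation described above.

```typescript
// Hypothetical task type: a function that starts async work when called
type Task<T> = () => Promise<T>;

// Runs a FIXED array of tasks one after another by chaining promises with reduce()
function runSequentially<T>(tasks: Task<T>[]): Promise<T[]> {
  return tasks.reduce<Promise<T[]>>(
    // Wait for the chain so far, then start the next task and collect its result
    (chain, task) => chain.then(results => task().then(result => [...results, result])),
    Promise.resolve<T[]>([]),
  );
}

// Hypothetical helper: a task that resolves with `label` after `ms` milliseconds
const delay = (label: string, ms: number): Task<string> => () =>
  new Promise<string>(resolve => setTimeout(() => resolve(label), ms));

// 'second' is faster, but it only starts once 'first' has finished
const done = runSequentially([delay('first', 30), delay('second', 10)]);
done.then(results => console.log(results)); // ['first', 'second']
```

Tasks that show up after runSequentially() has been called cannot join this chain; they need a new array and a new call.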

There are a lot of tutorials on the internet teaching you what each operator does. Here, I’ll try to explain the “business logic”, or the principle of “building blocks” that, when put together, create this functionality.

Before we start, make sure you are comfortable with the terms we are going to use in this tutorial, such as observable, subject and operator.

Despite the length of this post, the final code is only a snippet, and you can implement it in your own class. Just make sure you unsubscribe when needed to avoid memory leaks. In this tutorial we’ll use Angular’s component lifecycle so that the focus stays on the core functionality. So, let’s start:

Prerequisites

  • Angular/ES6 basics (for this tutorial)
  • RxJS basics

User story

The user needs to make an async request to a server. The response can take some time to process and return a result, but the user needs to keep using the app in the meantime. We don’t want to let the user run tasks in parallel, as they may be costly.

Random example:

  • The user selects and starts uploading a few videos that the platform needs to process.
  • The files are queued and start uploading one by one.
  • In the meantime, the user adds more images and videos.
  • The new files are queued.
  • The user changes their mind and wants to cancel.

Analysis

I’ll use the metaphor of underground mines:

[Image: three mining carts on a rail over rocks, on a sunny day near a forest]

  1. We first need rails for transport
  2. Then, we need carts to transport the “data” in
  3. After that, we need a “control station” to stop the transport of carts

How do we do that?

Solution (business logic of the task queue)

What do we need to do? We need to split the functionality into multiple, smaller parts.

In layman’s terms, we need a “function” that we can pass “arguments” to. Then we need to “listen” for the moment that function receives an argument. After that, we need to wait for that argument to produce its result and notify our function before it starts processing the next one.
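To make that principle concrete before moving to RxJS, here is a minimal sketch that uses plain promises instead. It is not the solution this post builds, only an illustration of "accept a task, wait for it, then start the next one". The names PromiseQueue, enqueue and work are hypothetical.

```typescript
// Hypothetical task type: a function that starts async work when called
type Task<T> = () => Promise<T>;

class PromiseQueue {
  // Tail of the chain: settles when the most recently queued task has settled
  private tail: Promise<unknown> = Promise.resolve();

  enqueue<T>(task: Task<T>): Promise<T> {
    // Start the task only after everything queued before it has settled
    const result = this.tail.then(() => task());
    // Swallow the rejection on the tail so one failed task does not block later ones
    this.tail = result.catch(() => undefined);
    return result;
  }
}

const order: string[] = [];
const queue = new PromiseQueue();

// Hypothetical task factory: records `name` after `ms` milliseconds
const work = (name: string, ms: number): Task<void> => () =>
  new Promise<void>(resolve =>
    setTimeout(() => {
      order.push(name);
      resolve();
    }, ms),
  );

queue.enqueue(work('slow', 30));
// Queued later, so it starts (and finishes) after 'slow' despite being faster
const lastTask = queue.enqueue(work('fast', 5));
lastTask.then(() => console.log(order)); // ['slow', 'fast']
```

This sketch has no way to cancel what is already queued, which is where the "control station" in the RxJS version comes in.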

In technical terms:

  1. Create the “stream” that data needs to pass through (observable) - rails
  2. Create the entity that we can send the “tasks” in (subject) - cart
  3. Create another entity to control the termination of “transport” (subject) - control-station

Core Logic

example.component.ts

import { Subject, Observable, from } from 'rxjs'
import { concatAll, takeUntil } from 'rxjs/operators'

// The fields below go inside the component class.
// Cart: every value pushed into it is itself an observable (one task)
readonly addToQueue$: Subject<Observable<any>> = new Subject<Observable<any>>();
// Control station for when to MANUALLY stop the queue
readonly cancelQueue$: Subject<any> = new Subject<any>();
// Rails (see the last snippet for automatic unsubscription)
readonly queue$: Observable<any> = from(this.addToQueue$).pipe(
  concatAll(), // Serialize the "carts" so they run one by one
  takeUntil(this.cancelQueue$), // MANUALLY stop the queue when user clicks
);
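To see the ordering guarantee outside Angular, here is a minimal standalone sketch of the same three building blocks. It assumes RxJS is installed and pipes the subject directly, which behaves the same as wrapping it in from(). The tasks taskA and taskB are hypothetical and are modelled as subjects so that we control by hand when each one emits and completes.

```typescript
import { Observable, Subject } from 'rxjs';
import { concatAll, takeUntil } from 'rxjs/operators';

const addToQueue$ = new Subject<Observable<string>>(); // cart
const cancelQueue$ = new Subject<void>();              // control station
const queue$ = addToQueue$.pipe(                       // rails
  concatAll(),
  takeUntil(cancelQueue$),
);

const log: string[] = [];
queue$.subscribe({
  next: value => log.push(value),
  complete: () => log.push('queue stopped'),
});

// Hypothetical tasks, driven manually
const taskA = new Subject<string>();
const taskB = new Subject<string>();
addToQueue$.next(taskA);
addToQueue$.next(taskB);

taskB.next('B early'); // dropped: concatAll has not subscribed to B yet, A is still running
taskA.next('A done');
taskA.complete();      // only now does concatAll subscribe to B
taskB.next('B done');
taskB.complete();

cancelQueue$.next();     // completes queue$
addToQueue$.next(taskA); // no effect: the queue has already completed

console.log(log); // ['A done', 'B done', 'queue stopped']
```

Two things follow from this. A task has to complete before the next one can start, so a task that never completes blocks the queue. And once the cancel signal fires, queue$ is finished for good; to run tasks again you need a new subscription.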

Usage

Note that this is the same component as above, so just append each snippet to the corresponding file.

example.component.html

<button (click)="handleAddToQueue()">Add to queue</button>
<button (click)="handleCancelQueue()">Stop the queue</button>

example.component.ts

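import { interval } from 'rxjs'
import { take, tap } from 'rxjs/operators'

// Test task: emits once after a random delay (up to 10 s), then completes.
// It must complete, otherwise concatAll() never moves on to the next task.
// The delay is picked once, when the component is created.
readonly testObservable$ = interval(Math.random() * 10000).pipe(
  take(1),
  tap(value => console.log('task finished', value)),
)
ngOnInit() {
  // Start the queue (subscribe) and log every value as soon as it's out of the stream
  this.queue$.subscribe(value => console.log(value));
}
handleAddToQueue() {
  this.addToQueue$.next(this.testObservable$)
}
handleCancelQueue() {
  this.cancelQueue$.next(true)
}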

Additional part to stop the queue when the component is destroyed

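readonly #unsubscribe$: Subject<void> = new Subject<void>();
// Rails: this replaces the earlier queue$ field. Declare it after the subjects it reads.
readonly queue$: Observable<any> = from(this.addToQueue$).pipe(
  concatAll(), // Serialize the "carts" so they run one by one
  takeUntil(this.cancelQueue$), // MANUALLY stop the queue when user clicks
  takeUntil(this.#unsubscribe$), // AUTOMATICALLY stop the queue
);
ngOnInit() {
  // Start the queue (subscribe), same as in the usage snippet
  this.queue$.subscribe(value => console.log(value));
}
ngOnDestroy() {
  // Emitting here makes takeUntil() complete the queue and unsubscribe from the running task
  this.#unsubscribe$.next();
  this.#unsubscribe$.complete();
}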
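The following standalone sketch (assuming RxJS is installed) illustrates why takeUntil() sits after concatAll(): the stop signal then also unsubscribes from the task that is currently running, and tasks still waiting never start. The names makeTask and destroy$ are hypothetical; destroy$ stands in for #unsubscribe$, and the tasks never finish on their own so that the effect is visible.

```typescript
import { Observable, Subject } from 'rxjs';
import { concatAll, takeUntil } from 'rxjs/operators';

const events: string[] = [];

// Hypothetical task: never completes by itself and records when it is torn down
const makeTask = (name: string) =>
  new Observable<string>(() => {
    events.push(`${name} started`);
    return () => {
      events.push(`${name} torn down`);
    };
  });

const addToQueue$ = new Subject<Observable<string>>();
const cancelQueue$ = new Subject<void>();
const destroy$ = new Subject<void>(); // stands in for #unsubscribe$

addToQueue$
  .pipe(
    concatAll(),
    takeUntil(cancelQueue$), // manual stop
    takeUntil(destroy$),     // automatic stop
  )
  .subscribe({ complete: () => events.push('queue completed') });

addToQueue$.next(makeTask('upload-1'));
addToQueue$.next(makeTask('upload-2')); // waits, because upload-1 is still running

// What ngOnDestroy() would do
destroy$.next();
destroy$.complete();

console.log(events);
```

After the destroy signal, the log shows that upload-1 was started and torn down, that the queue completed, and that upload-2 was never started.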

That’s it. Now you have a fully functional sequential asynchronous task queue. You can implement it alongside your state management library, or anywhere else you need jobs done in order but don’t know when each one is going to finish.

PS: Feedback, critiques and suggestions are welcome.


Written by Milan Miljkovic — a tech enthusiast and design system practitioner.