Implementing a job queue in Nodejs

Rajkumar Gaur

What is a job queue and when will you need one?

A job queue holds a list of jobs waiting to be executed, either once some condition is met or once a worker/processor is free to pick them up.
This is useful when you want to run time-consuming work at a controlled pace.

For example, your server receives 10 requests in a minute, but it can process only 5 requests per minute because of limited computing power. If you try to process all 10 requests in that minute, your system might not handle the load and could crash. To solve this, you can process 5 of the requests right away and add the other 5 to a queue. These will be processed once a worker is idle or has spare computing power.

The Naive approach

The first approach that came to mind was to store the job payloads in an array and take the jobs off the front of the array one by one.
The code:

// array for storing and simulating a queue
const queue = []
// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement

app.post("/", (req, res) => {
  try {
    // pick the data you need
    const { name, id } = req.body
    // add the data to the end of the queue
    queue.push({name, id})
  } catch (e) {
    console.error(e)
  }
  res.status(200).end()
})

const execute = async () => {
  // execute only if currently running jobs are fewer than MAX_JOBS
  if (running < MAX_JOBS && queue.length !== 0) {
    // increment currently running jobs
    running += 1
    // get the first element/job from the queue
    const { name, id } = queue.shift()
    try {
      // replace the below function with your actual code
      await timeConsumingFunction(name, id)
    } catch (e) {
      console.error(e)
    } finally {
      // always free the slot, even if the job failed
      running -= 1
    }
  }
}

// poll every minute to call execute function
setInterval(() => {
  execute()
}, 60 * 1000)

The above code gets the job done in an ideal world, but it will fail in the real world.

  • What happens when you restart the server? You lose track of the currently running processes and of the jobs waiting in the queue.
  • What happens when a job fails during processing? You lose that job forever because it has already been removed from the queue.
  • What about multiple workers/instances? You can't share the queue with other instances because it lives in your server's memory and is only accessible from that server. So there is no chance of a distributed system.
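
To make the second problem concrete, here is a minimal in-memory sketch. The `flakyJob` and `runNext` names are hypothetical and not part of the code above. It shows that a job taken off the array is gone for good unless the failure path explicitly puts it back.

```javascript
// Hypothetical stand-in for real work: fails for the job with id 1 only.
const flakyJob = async (job) => {
  if (job.id === 1) throw new Error("job failed");
  return job.id;
};

// Takes the oldest job off the queue and runs it.
// On failure, the job is pushed to the back of the queue only if asked to.
const runNext = async (queue, requeueOnFailure) => {
  const job = queue.shift();
  if (job === undefined) return null;
  try {
    return await flakyJob(job);
  } catch (err) {
    if (requeueOnFailure) queue.push(job);
    return null;
  }
};
```

Re-queuing in memory helps with a failed job, but it still does nothing for a server restart or for sharing work between instances.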

To solve these problems, we need persistent storage that lives outside the server process. Let's use Redis.

What is Redis?

Redis is an in-memory key-value database. We are using Redis because it provides data structures (lists, sets, hashes, etc.) and TTL (time to live) out of the box.
We will be using the list data structure of Redis to simulate a queue. To learn more about Redis, head to https://redis.io/docs/
And for interacting with Redis we will be using the ioredis library for Nodejs https://github.com/luin/ioredis

Here, we will still use the same pattern as before; only the code that creates and handles the queue will change. We will also now have two queues, todo and processing. The todo queue will contain jobs waiting to be executed, and processing will contain jobs that are currently being processed. We do this to have a fail-safe mechanism.

When a job is ready to get executed we move that job from todo to processing and only remove the job from processing if the execution was successful. If the execution fails, the job will be moved back to the todo list at the back of the queue. This way we never really lose the failed jobs and can execute them again.
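
The two-list flow described above can be sketched with plain arrays before bringing Redis into it. This is only an illustration under the assumption that arrays stand in for the Redis lists; the `claim`, `complete`, and `fail` helpers are hypothetical names.

```javascript
// In-memory stand-ins for the two Redis lists (assumption: no persistence here).
const todo = [];
const processing = [];

// Move the oldest job from `todo` to the back of `processing` and return it.
const claim = () => {
  const job = todo.shift();
  if (job === undefined) return null;
  processing.push(job);
  return job;
};

// Remove a job from `processing` once it has succeeded.
const complete = (job) => {
  const index = processing.indexOf(job);
  if (index !== -1) processing.splice(index, 1);
};

// Put a failed job back at the end of `todo` so it can be tried again.
const fail = (job) => {
  complete(job);
  todo.push(job);
};
```

A job is only ever removed for good by `complete`, so a failure between `claim` and `complete` leaves it recoverable in `processing`.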

Let's revisit the three questions again:

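  • What happens when you restart the server? No data is lost, because the job data is stored in Redis rather than in your server's memory. After a restart, you can still access the jobs waiting to be processed. (For the data to survive a restart of Redis itself, Redis persistence must be enabled.)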
  • What happens when a job fails during processing? If the system crashes or job execution fails, the failed job is again moved to the todo list from the processing list and it gets processed again after a while.
  • What about multiple workers/instances? Multiple workers and instances can connect to Redis and execute the jobs without relying on your main server.

We will add two extra parameters to each job to handle failures.
We will use tries to keep track of how many times the job has been sent to the processing list. We will simply delete the job if it hasn't succeeded after 3 tries.
The other parameter is timestamp, which lets us check whether a job has been in the processing queue for a long time (most probably because the system crashed during execution). If the difference between the current time and the timestamp is more than 10 minutes, we move the job back to the todo queue.

// redis.js
// import the library after installing it (npm install ioredis)
const { default: Redis } = require("ioredis");

// pass the connection parameters or pass nothing if connecting to local host
// for localhost, change to `const redis = new Redis()`
const redis = new Redis({
  port: process.env.REDIS_PORT,
  host: process.env.REDIS_HOST,
  password: process.env.REDIS_PASSWORD
});

// export the connection to import in index file
module.exports = { redis };

Let's also change the index.js file to use Redis instead of an array.

First, we update the code to use a Redis list instead of an array. We import the file above into our main file to use the Redis connection and push new jobs to the back of the Redis todo queue.

// other imports
// importing the redis client connection instance that we created above
const { redis } = require("./redis");

// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement

app.post("/", async (req, res) => { // async is required because we await inside
  try {
    // pick the data you need
    const { name, id } = req.body
    
    // push the data to the back of the list/queue
    await redis.rpush(
      'todo', // list name in redis, this will store jobs to be processed
      JSON.stringify({ name, id })
    )
  } catch (e) {
    console.error(e)
  }
  res.status(200).end()
})

A worker always picks the first (oldest) job in the queue. But suppose two workers run at the same time and both access the same job, i.e. the first element of the queue.

We don't want this to happen, otherwise the same job could be executed multiple times or cause other unexpected behavior.

For this reason, we are going to use a locking system that locks the queue for the current worker, while other workers wait until this worker has finished accessing the job (note that I said accessing the job, not executing it; we release the lock as soon as the job has been moved out of the queue).

Don't worry if you are seeing this concept for the first time. The implementation is quite simple using Redis.
We use a simple SET command in Redis to keep a flag that says whether a queue is locked. Every 5 seconds we try to set the flag; if it was not already set, we have acquired the lock (the stored value is just a string and can be anything you want!).
Also, we set the lock to expire after 10 seconds, which is more than enough time to access the job and means a crashed worker cannot hold the lock forever.

// function for acquiring the lock for this worker
// param `key` is the name of the queue
// so `key` can either take value of `todo` or `processing` as these are our lists
const acquireLock = async (key) => {
  return new Promise((resolve, reject) => {
    // check every 5 seconds if lock can be acquired
    const interval = setInterval(() => {
      // set `todo-lock` or `processing-lock` to "locked" if it doesn't already exist
      // the "NX" option means: only set the value if the key does not exist and return OK, otherwise return null
      // "EX", 10 is the expiry time of the lock; the key is deleted after 10 seconds
      redis
        .set(`${key}-lock`, "locked", "NX", "EX", 10)
        .then((res) => {
          if (res) { // if res is not null, then we set the lock successfully and can clear the interval
            resolve(res);
            clearInterval(interval);
          }
        })
        .catch((err) => {
          reject(err);
          clearInterval(interval);
        });
    }, 5000);
  });
};

// for deleting the lock programmatically
// (if we don't, it will expire on its own after 10 seconds)
const releaseLock = async (key) => {
  // delete the lock for `todo` or `processing` queue
  await redis.del(`${key}-lock`);
};
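
One caveat with the release function above: it deletes the lock key unconditionally, so a worker whose lock has already expired could delete a lock that another worker now holds. A common remedy is to store a random token as the lock value and delete the key only if it still holds that token. The sketch below assumes that approach; `tryAcquire`, `releaseOwned`, and `RELEASE_SCRIPT` are hypothetical names, and `redis` is expected to be an ioredis client passed in by the caller.

```javascript
const { randomUUID } = require("crypto");

// Lua script run inside Redis: delete the lock only if it still holds our token.
const RELEASE_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end`;

// Try once to take the lock. Resolves to the token on success, null otherwise.
const tryAcquire = async (redis, key, ttlSeconds = 10) => {
  const token = randomUUID();
  const res = await redis.set(`${key}-lock`, token, "EX", ttlSeconds, "NX");
  return res === "OK" ? token : null;
};

// Release the lock only if we still own it. Resolves to true if it was deleted.
const releaseOwned = async (redis, key, token) => {
  const deleted = await redis.eval(RELEASE_SCRIPT, 1, `${key}-lock`, token);
  return deleted === 1;
};
```

With this variant, the caller keeps the token returned by `tryAcquire("todo")` and passes it back to `releaseOwned` when it is done with the queue.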

Let's also update the execute function. It first tries to acquire the lock for the todo queue, then reads the first job and moves it to the processing queue, then releases the lock for the todo queue and executes the job.
Once the job has finished, it acquires the lock for the processing queue and deletes the job from it.

const execute = async () => {
  // execute only if currently running jobs are fewer than MAX_JOBS
  if (running < MAX_JOBS) {
    running += 1 // claim a slot; it is released at the end of this block
    try {
      // try to acquire the lock for the `todo` queue if another process hasn't acquired it already
      await acquireLock("todo");
      // get the first job
      const newProcess = await redis.lindex("todo", 0);
      if (newProcess) {
        const copy = JSON.parse(newProcess);
        copy.timestamp = Date.now(); // add a unix timestamp to the job, we will use this later 
        copy.tries = copy.tries ? copy.tries + 1 : 1; // update the number of tries for this job
        await redis
          .multi() // multi is for chaining redis commands in one transaction
          .lset("todo", 0, JSON.stringify(copy)) // set the first job with `timestamp` and `tries`
          .lmove("todo", "processing", "LEFT", "RIGHT") // move the first job to the back of `processing`
          .exec(); // run the queued commands as one atomic unit (note: Redis does not roll back if a command fails)
        // release lock on `todo` queue as we have accessed the job and moved to `processing` queue
        await releaseLock("todo");
        
        // replace this function with your actual code
        await timeConsumingFunction(copy.name, copy.id);
        // job has been executed at this point

        await acquireLock("processing"); // acquire lock for `processing` queue
        await redis.lrem("processing", 1, JSON.stringify(copy)); // delete the current job from processing as its been executed
        await releaseLock("processing"); // release lock for `processing` queue
      } else {
        await releaseLock("todo"); // release lock for `todo` queue if there was no job in the queue
      }
    } catch (err) {
      console.error(err);
    }
    running -= 1
  }
}

// check for new jobs every five minutes
setInterval(() => {
  execute();
}, 5 * 60 * 1000); // tweak this as you need

At this point, we have a system that will check for new jobs and execute them. But what about the jobs that failed because the system crashed? We've got that covered!
Every 10 minutes (change this according to your needs), we check whether the job at the front of the processing queue has been there for more than 10 minutes (again, change this according to your needs) using the timestamp we stored.
We also delete the job altogether once it has used up its 3 tries (this is optional, depending on your use case).
This doesn't have to run on the same server as the code above, but for simplicity, I have deployed it on the same server.

const retry = async () => {
  try {
    await acquireLock("processing"); // acquire lock for `processing` to avoid conflicts
    const res = await redis.lindex("processing", 0); // get the first processing job
    if (res) {
      const processing = JSON.parse(res);
      if (Date.now() - Number(processing.timestamp) >= 10 * 60 * 1000) { // check if its been more than 10 minutes
        if (Number(processing.tries) < 3) { // if `tries` are less than 3, move back to `todo` queue
          await redis
            .multi()
            .lset("processing", 0, JSON.stringify(processing))
            .lmove("processing", "todo", "LEFT", "RIGHT")
            .exec();
        } else {
          await redis.lpop("processing"); // otherwise, if tries are exceeded, delete the job
        }
      }
      await releaseLock("processing"); // release the lock
    } else {
      await releaseLock("processing"); // release the lock if there is no job
    }
  } catch (err) {
    console.error(err);
  }
};

// check every 10 minutes for retrying
setInterval(() => {
  retry();
}, 10 * 60 * 1000);
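
The rule that retry applies to a job can be isolated as a small pure function, which makes it easy to test without Redis. This is a sketch only; `decide`, `MAX_TRIES`, and `STALE_AFTER_MS` are hypothetical names, with the values taken from the text above.

```javascript
const MAX_TRIES = 3;                   // give up once a job has used 3 tries
const STALE_AFTER_MS = 10 * 60 * 1000; // 10 minutes in `processing` counts as stuck

// Returns "wait", "requeue", or "drop" for a job sitting in `processing`.
// `job` is the parsed payload with the `timestamp` and `tries` fields.
const decide = (job, now = Date.now()) => {
  if (now - Number(job.timestamp) < STALE_AFTER_MS) return "wait";
  return Number(job.tries) < MAX_TRIES ? "requeue" : "drop";
};
```

For example, a job with `tries: 1` whose timestamp is 10 minutes old yields "requeue", while the same job with `tries: 3` yields "drop".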

That was all for the code. We now have a working queue with a fail-safe mechanism.
The whole code:

// index.js

// other imports
// importing the redis client connection instance that we created above
const { redis } = require("./redis");

// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement

app.post("/", async (req, res) => { // async is required because we await inside
  try {
    // pick the data you need
    const { name, id } = req.body
    
    // push the data to the back of the list/queue
    await redis.rpush(
      'todo', // list name in redis, this will store jobs to be processed
      JSON.stringify({ name, id })
    )
  } catch (e) {
    console.error(e)
  }
  res.status(200).end()
})

// function for acquiring the lock for this worker
// param `key` is the name of the queue
// so `key` can either take value of `todo` or `processing` as these are our lists
const acquireLock = async (key) => {
  return new Promise((resolve, reject) => {
    // check every 5 seconds if lock can be acquired
    const interval = setInterval(() => {
      // set `todo-lock` or `processing-lock` to "locked" if it doesn't already exist
      // the "NX" option means: only set the value if the key does not exist and return OK, otherwise return null
      // "EX", 10 is the expiry time of the lock; the key is deleted after 10 seconds
      redis
        .set(`${key}-lock`, "locked", "NX", "EX", 10)
        .then((res) => {
          if (res) { // if res is not null, then we set the lock successfully and can clear the interval
            resolve(res);
            clearInterval(interval);
          }
        })
        .catch((err) => {
          reject(err);
          clearInterval(interval);
        });
    }, 5000);
  });
};

// for deleting the lock programmatically
// (if we don't, it will expire on its own after 10 seconds)
const releaseLock = async (key) => {
  // delete the lock for `todo` or `processing` queue
  await redis.del(`${key}-lock`);
};

const execute = async () => {
  // execute only if currently running jobs are fewer than MAX_JOBS
  if (running < MAX_JOBS) {
    running += 1 // claim a slot; it is released at the end of this block
    try {
      // try to acquire the lock for the `todo` queue if another process hasn't acquired it already
      await acquireLock("todo");
      // get the first job
      const newProcess = await redis.lindex("todo", 0);
      if (newProcess) {
        const copy = JSON.parse(newProcess);
        copy.timestamp = Date.now(); // add a unix timestamp to the job, we will use this later 
        copy.tries = copy.tries ? copy.tries + 1 : 1; // update the number of tries for this job
        await redis
          .multi() // multi is for chaining redis commands in one transaction
          .lset("todo", 0, JSON.stringify(copy)) // set the first job with `timestamp` and `tries`
          .lmove("todo", "processing", "LEFT", "RIGHT") // move the first job to the back of `processing`
          .exec(); // run the queued commands as one atomic unit (note: Redis does not roll back if a command fails)
        // release lock on `todo` queue as we have accessed the job and moved to `processing` queue
        await releaseLock("todo");
        
        // replace this function with your actual code
        await timeConsumingFunction(copy.name, copy.id);
        // job has been executed at this point

        await acquireLock("processing"); // acquire lock for `processing` queue
        await redis.lrem("processing", 1, JSON.stringify(copy)); // delete the current job from processing as its been executed
        await releaseLock("processing"); // release lock for `processing` queue
      } else {
        await releaseLock("todo"); // release lock for `todo` queue if there was no job in the queue
      }
    } catch (err) {
      console.error(err);
    }
    running -= 1
  }
}

// check for new jobs every five minutes
setInterval(() => {
  execute();
}, 5 * 60 * 1000); // tweak this as you need

const retry = async () => {
  try {
    await acquireLock("processing"); // acquire lock for `processing` to avoid conflicts
    const res = await redis.lindex("processing", 0); // get the first processing job
    if (res) {
      const processing = JSON.parse(res);
      if (Date.now() - Number(processing.timestamp) >= 10 * 60 * 1000) { // check if its been more than 10 minutes
        if (Number(processing.tries) < 3) { // if `tries` are less than 3, move back to `todo` queue
          await redis
            .multi()
            .lset("processing", 0, JSON.stringify(processing))
            .lmove("processing", "todo", "LEFT", "RIGHT")
            .exec();
        } else {
          await redis.lpop("processing"); // otherwise, if tries are exceeded, delete the job
        }
      }
      await releaseLock("processing"); // release the lock
    } else {
      await releaseLock("processing"); // release the lock if there is no job
    }
  } catch (err) {
    console.error(err);
  }
};

// check every 10 minutes for retrying
setInterval(() => {
  retry();
}, 10 * 60 * 1000);

// redis.js

// import the library after installing it
const { default: Redis } = require("ioredis");

// pass the connection parameters or pass nothing if connecting to local host
// for localhost, change to `const redis = new Redis()`
const redis = new Redis({
  port: process.env.REDIS_PORT,
  host: process.env.REDIS_HOST,
  password: process.env.REDIS_PASSWORD
});

// export the connection to import in index file
module.exports = { redis };

We have successfully implemented a job queue from scratch. But if this was too much for you, there are many Node.js libraries that provide this feature. One of them is BullMQ https://docs.bullmq.io/
This library also uses Redis behind the scenes to implement tons of features. Let's look at an example of how we can use it.

import { Queue, Worker, QueueScheduler } from 'bullmq'

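// necessary for retrying failed jobs on BullMQ 1.x
// (QueueScheduler was deprecated in BullMQ 2.0, where this line and its import can be dropped)
const myQueueScheduler = new QueueScheduler('todo');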

const MAX_JOBS = 3

// Create a queue, we dont need two queues here
// because BullMQ handles problems like locking resources, fail safes, etc
const queue = new Queue('todo', { 
  connection: {
    port: process.env.REDIS_PORT, // pass in the redis credentials
    host: process.env.REDIS_HOST,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    attempts: 3, // no of attempts for failed jobs
    backoff: { // exponential backoff: the wait before each retry grows, starting from `delay`
      type: 'exponential',
      delay: 1000, // 1 sec
    },
  },
});

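// the worker is like the execute function we wrote above
// it executes the jobs in the queue as soon as they are available
// the first argument must match the queue name ('todo'), otherwise the worker never sees the jobs
const worker = new Worker('todo', async (job) => {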
  const { name, id } = job.data;
  // your actual time consuming process
  await yourTimeConsumingFunction(name, id);
}, {
  // max number of jobs that can run concurrently
  // another way is removing this option and creating multiple workers like worker1, worker2, etc.
  concurrency: MAX_JOBS,
  connection: {
    port: process.env.REDIS_PORT, // pass in the redis credentials
    host: process.env.REDIS_HOST,
    password: process.env.REDIS_PASSWORD
  }
});

app.post("/", async (req, res) => { // async is required because we await inside
  try {
    // pick the data you need
    const { name, id } = req.body
    
    // push the data to the back of the above queue
    await queue.add(
      'job', // job name, doesn't have to be unique
      { name, id } // actual data to be used during execution
    )
  } catch (e) {
    console.error(e)
  }
  res.status(200).end()
})
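
To get a feel for what an exponential backoff with a 1 second base delay means in practice, here is a small sketch of a doubling schedule. The `backoffDelay` helper is hypothetical and the formula is an assumption used for illustration; check the BullMQ documentation for the exact delays it applies.

```javascript
// Assumed doubling schedule: baseMs * 2^(attempt - 1).
// `attempt` is the number of the retry, starting at 1.
const backoffDelay = (attempt, baseMs = 1000) => baseMs * 2 ** (attempt - 1);

// Delays in milliseconds for the first three retries under this assumption.
const delays = [1, 2, 3].map((attempt) => backoffDelay(attempt));
```

Under this assumption, each retry waits twice as long as the previous one, which gives a struggling downstream service more time to recover.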

That's all folks, thanks for reading! See you at the next one!

