node-persistent-queue

1.0.3 • Public • Published

Overview

A simple SQLite-backed queue for running many short tasks in Node.js using setImmediate().

If you have a large batch of short-running tasks, this library lets them execute in sequence on the main Node.js event loop thread without blocking or starving other Node.js events.

Description

The purpose of this library is to provide a simple means of:

  • executing a queue of tasks serially, one at a time, in FIFO order
  • maintaining an on-disk queue (using SQLite) that persists through crashes/restarts
  • ensuring the Node.js event loop can return to the poll phase between invocations of each job by using setImmediate(), thus preventing blocking
  • supporting asynchronous operation via Promises
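
The setImmediate() behaviour described in the list above can be illustrated without the library. The following is a minimal sketch under stated assumptions, not the module's actual implementation: `runSerially` and its parameters are hypothetical names, and the queue is a plain in-memory array rather than an SQLite table.

```javascript
// Minimal sketch (NOT the library's implementation): run tasks one at a
// time in FIFO order, yielding to the event loop between tasks.
function runSerially(tasks, handler) {
  const pending = tasks.slice(); // copy so the caller's array is untouched
  return new Promise((resolve, reject) => {
    function next() {
      if (pending.length === 0) {
        resolve();
        return;
      }
      const task = pending.shift(); // FIFO: oldest task first
      try {
        handler(task);
      } catch (err) {
        reject(err);
        return;
      }
      // Defer the next task so timers and I/O callbacks can run in between.
      setImmediate(next);
    }
    next();
  });
}

const processed = [];
const finished = runSerially(['a', 'b', 'c'], (task) => processed.push(task));
finished.then(() => console.log(processed.join(','))); // prints "a,b,c"
```

Scheduling each task with setImmediate(), rather than looping over the whole batch synchronously, is what gives other callbacks a chance to run between tasks.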

A unit of work, or task, is stored in the queue as a simple JSON object. Each task should complete quickly enough that it does not block your Node.js event loop.
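
Because tasks are stored as JSON, it can help to check that a task survives a serialization round trip before queuing it. The sketch below assumes the task is serialized with something equivalent to JSON.stringify(), which this README does not confirm. `isJsonSafe` is a hypothetical helper and is not part of the module.

```javascript
const { isDeepStrictEqual } = require('util');

// Hypothetical helper: returns true if `task` is unchanged by a JSON
// round trip, i.e. nothing would be lost or altered when it is stored.
function isJsonSafe(task) {
  try {
    const copy = JSON.parse(JSON.stringify(task));
    return isDeepStrictEqual(task, copy);
  } catch (err) {
    return false; // e.g. circular references or BigInt values
  }
}

console.log(isJsonSafe({ data: 'Data1', retries: 3 }));        // true
console.log(isJsonSafe({ data: 'Data1', when: new Date(0) })); // false: the Date becomes a string
console.log(isJsonSafe({ data: 'Data1', run: () => {} }));     // false: the function is dropped
```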

If you cannot break your tasks down sufficiently, you should consider a multi-threaded Worker implementation instead.
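
For tasks that cannot be broken down, Node's built-in worker_threads module is one possible route. The following is a minimal sketch only: `runInWorker` and the summing loop are illustrative placeholders for a real CPU-heavy job, and nothing here is part of node-persistent-queue.

```javascript
const { Worker } = require('worker_threads');

// Illustrative placeholder: sum the integers 1..n on a separate thread so
// the main event loop stays free while the loop runs.
function runInWorker(n) {
  const source = `
    const { parentPort, workerData } = require('worker_threads');
    let sum = 0;
    for (let i = 1; i <= workerData; i++) sum += i;
    parentPort.postMessage(sum);
  `;
  return new Promise((resolve, reject) => {
    // eval: true tells Worker to treat `source` as code, not a file path.
    const worker = new Worker(source, { eval: true, workerData: n });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

runInWorker(1000000).then((sum) => console.log(sum)); // prints 500000500000
```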

If, however, you have a large batch of short-running tasks, this library will allow them to execute on the main Node.js event loop thread without blocking or starving other Node.js events.

Refer to the Event Loop, Timers, and process.nextTick() guide at https://nodejs.org for a great explanation of the Node.js event loop.

Installation

$ npm install --save node-persistent-queue

Usage

The module is quite simple to use through its EventEmitter API.

Instantiation

The following illustrates how to specify the location of the SQLite database for your instance.

var Queue = require('node-persistent-queue') ;
 
/*
Provide path to your sqlite database.  If file doesn't exist, it will be created
 */
var q = new Queue('./path/to/db.sqlite') ;
 
/*
You can use an in-memory sqlite database (although it would no longer be a persistent queue)
 */
var q = new Queue(':memory:') ;
// or
var q = new Queue('') ;

The second optional parameter specifies the number of tasks to retrieve from the DB at a time.

/*
By default, the module will retrieve up to 10 'tasks' from the sqlite database at a time. 
If the data for your tasks is quite large, you can reduce this to conserve more memory
or you can increase this limit to improve throughput.
 */
var q = new Queue('./path/to/db.sqlite',1) ; // Retrieve each job from DB one at a time
 
var q = new Queue('./path/to/db.sqlite',1000) ; // Grab 1000 at a time

Events

node-persistent-queue emits the following events. Each entry gives the event name, when the event is emitted, and the event handler with its parameters.

  • start: Emitted when the queue starts processing tasks (after calling the .start() method).

    q.on('start',function(){
    }) ;

  • stop: Emitted when the queue stops processing tasks (after calling the .stop() method).

    q.on('stop',function(){
    }) ;

  • next: Emitted when the next task is to be executed. This occurs when there are items in the queue and .start() has been called, or after .add() has been called to add a task to an empty queue that is already started (see isStarted()).

    q.on('next',function(job) {
      // job.id  : id of the task in the queue
      // job.job : the task object that was passed to .add()
    }) ;

  • empty: Emitted when the last task is completed and removed from the db.

    q.on('empty',function() {
    }) ;

  • add: Emitted when a task has been added to the queue (after calling the .add() method).

    q.on('add',function(job) {
      // job.id  : id of the task in the queue
      // job.job : the task object that was passed to .add()
    }) ;

  • open: Emitted when the sqlite database has been opened successfully (after calling the .open() method).

    q.on('open',function(sqlite) {
      sqlite //instance of sqlite3.Database
    }) ;

  • close: Emitted when the sqlite database has been closed successfully (after calling the .close() method).

    q.on('close',function() {
    }) ;

Contrived Example

This example illustrates the use of the events emitted from node-persistent-queue.

The empty event handler below automatically stops the queue when it becomes empty. It then closes the SQLite DB and terminates the script.

Note that the next event handler must call the .done() method once it has finished processing the task. This schedules another next event to be emitted, using setImmediate().

The .add() method returns a promise that resolves when the job has been saved in the sqlite database.

var Queue = require('node-persistent-queue') ;
var q = new Queue(':memory:') ;
 
var task1 = {
    data: "Data1"
} ;
var task2 = {
    data: "Data2"
} ;
var task3 = {
    data: "Data3"
} ;
var task4 = {
    data: "Data4"
} ;
 
q.on('open',() => {
    console.log('Opening SQLite DB') ;
    console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;
 
q.on('add',task => {
    console.log('Adding task: '+JSON.stringify(task)) ;
    console.log('Queue contains '+q.getLength()+' job/s') ;
}) ;
 
q.on('start',() => {
    console.log('Starting queue') ;
}) ;
 
q.on('next',task => {
    console.log('Queue contains '+q.getLength()+' job/s') ;
    console.log('Process task: ') ;
    console.log(JSON.stringify(task)) ;
 
    // Must tell Queue that we have finished this task
    // This call will schedule the next task (if there is one)
    q.done() ;
}) ;
 
// Stop the queue when it gets empty
q.on('empty',() => {
    console.log('Queue contains '+q.getLength()+' job/') ;
    q.stop() ;
    q.close()
    .then(() => {
        process.exit(0) ;
    })
}) ;
 
q.on('stop',() => {
    console.log('Stopping queue') ;
}) ;
 
q.on('close',() => {
    console.log('Closing SQLite DB') ;
}) ;
 
q.open()
.then(() => {
    q.add(task1) ;
    q.add(task2) ;
    q.add(task3) ;
    q.add(task4) ;
    q.start() ;
})
.catch(err => {
    console.log('Error occurred:') ;
    console.log(err) ;
    process.exit(1) ;
}) ;
 

The above script produces the following output:

Opening SQLite DB
Queue contains 0 job/s
Starting queue
Adding task: {"id":1,"job":{"data":"Data1"}}
Queue contains 1 job/s
Adding task: {"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Adding task: {"id":3,"job":{"data":"Data3"}}
Queue contains 3 job/s
Adding task: {"id":4,"job":{"data":"Data4"}}
Queue contains 4 job/s
Queue contains 4 job/s
Process task: 
{"id":1,"job":{"data":"Data1"}}
Queue contains 3 job/s
Process task: 
{"id":2,"job":{"data":"Data2"}}
Queue contains 2 job/s
Process task: 
{"id":3,"job":{"data":"Data3"}}
Queue contains 1 job/s
Process task: 
{"id":4,"job":{"data":"Data4"}}
Queue contains 0 job/
Stopping queue
Closing SQLite DB

Take a look at the test script for further examples of the API.

Notes

You may be wondering why the start event is emitted before the add events, based on the output above.

Each .add() call performs an asynchronous write to the SQLite database. The callback that emits the add event runs only once that write has finished, which cannot happen before the current block of code completes. The .start() method is therefore called first, and the start event is emitted before any add event.

Whether the tasks are added before starting the queue or after doesn't matter because the next event cannot occur until both the add and start events have fired.
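
The ordering described in these notes can be reproduced with a small stand-in. The sketch below is illustrative only: `FakeQueue` and `record` are hypothetical names, the class is not the module's implementation, and setImmediate() merely simulates the asynchronous SQLite write performed by .add().

```javascript
const { EventEmitter } = require('events');

// Hypothetical in-memory stand-in, for illustration only.
class FakeQueue extends EventEmitter {
  constructor() {
    super();
    this.jobs = [];
    this.started = false;
    this.busy = false;
    this.nextId = 1;
  }
  add(job) {
    return new Promise((resolve) => {
      setImmediate(() => { // the simulated write completes later
        const entry = { id: this.nextId++, job };
        this.jobs.push(entry);
        this.emit('add', entry);
        this.trigger();
        resolve(entry.id);
      });
    });
  }
  start() {
    this.started = true;
    this.emit('start'); // emitted synchronously
    this.trigger();
  }
  done() {
    this.jobs.shift();
    this.busy = false;
    setImmediate(() => this.trigger());
  }
  trigger() {
    // 'next' needs a started queue, a stored job, and no job in progress
    if (this.started && !this.busy && this.jobs.length > 0) {
      this.busy = true;
      this.emit('next', this.jobs[0]);
    }
  }
}

// Records the event order for one task, calling start() before or after add().
function record(startFirst) {
  const events = [];
  const fq = new FakeQueue();
  fq.on('start', () => events.push('start'));
  fq.on('add', (entry) => events.push('add ' + entry.id));
  fq.on('next', (entry) => { events.push('next ' + entry.id); fq.done(); });
  let added;
  if (startFirst) {
    fq.start();
    added = fq.add({ data: 'Data1' });
  } else {
    added = fq.add({ data: 'Data1' });
    fq.start();
  }
  return added.then(() => events);
}

record(false).then((events) => console.log(events.join(' > '))); // start > add 1 > next 1
record(true).then((events) => console.log(events.join(' > ')));  // start > add 1 > next 1
```

Both call orders produce the same sequence in this stand-in, because the next event is held back until the queue is started and a task has been stored.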

The queue can be left in the started state: when the queue becomes empty, you don't have to .stop() it. As tasks are .add()ed, the next event will be emitted after the current code block finishes.

TODO

A TODO List of possible future features is included. Contributions welcome.

Licence

Copyright (C) 2019 Damien Clark, Damo's World

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Acknowledgements / Attribution

Thanks to the SQLite team for an awesome "in-process library that implements a self-contained, serverless, zero-configuration, transactional SQL database engine."
