fileq

    1.4.1 • Public • Published

    fileq

    File based FIFO queue. High-performance queue that stores JSON objects in a file-based FIFO, so the reads and writes are independent, allowing them to each have their own rhythm without increasing the memory usage.

    Features

    • Multiple writers and readers on the same queue
    • Callback and promise modes
    • Can recover previous queue if process is restarted
    • Recover queue position on process restart
    • Persitent or truncate modes on process restart
    • Fault tolerant, and fine-tunning

    Installation

    npm install fileq
    

    Usage

    const FileQueue = require("fileq");
     
    // Each queue stores its files in a folder
    var queue = FileQueue.from("queue");
    var i=0;
     
    // Callback mode
    setInterval(()=>{
        queue.push({key:i, message:"This is the entry for "+i});
        i++;
    },100);
     
    setInterval(()=>{
        queue.peek((err,entry)=>{
            console.log(entry);
        });
    },100);
     
    // Promise mode
    setInterval(async ()=>{
        await queue.push({key:i, message:"This is the entry for "+(i++)});
    },100);
     
    setInterval(async ()=>{
        let item = await queue.peek();
        console.log(item);
    },100);
     

    API

    FileQueue.from([path],[options]) => FileQueue

    Retrieve a queue stored in path folder. If the queue doesn't exist, it is created. The options parameter will be described later. If path is not specified, an anonymous queue will be created in the base path defined in the base options.

    FileQueue.configure(options)

    Sets default options that will be passed to every new created queue

    queue.push(json,callback) => Promise

    Pushes a JSON object to the queue. Callback takes the typical err and result arguments. If no callback is provided, it returns a promise.

    queue.peek(time,callback,commit) => Promise

    Retrieves a JSON object from the queue, in a FIFO manner. Callback takes the usual err and result arguments. If no callback is provided, it returns a promise. The argument time specifies a wait for data timeout. If no data could be read before time, then callback is called with "timeout" error (promise is rejected). The commit parameter specifies a transactional requirement. When commit is true, callback function receives a third argument (function done) that must be called in order to remove the item from the queue. Commit mode only works when callback function is passed:

        // Promise mode
        // Waits forever for an entry. Entry is returned and removed from queue
        let item = await queue.peek();
        // Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
        let item = await queue.peek(10);
     
        // Callback mode
        // Waits forever for an entry. Entry is returned and removed from queue
        queue.peek((err,item)=>{
            console.log(item);
        });
        // Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
        queue.peek((err,item)=>{
            if(err) console.error(err);
            else console.log(entry);
        },100);
        // Waits forever for an entry. Entry is returned but not remove until done is called
        queue.peek((err,item,done)=>{
            let error = doSomething(item);
            if(error) done(error); // If done is called with arguments, item is not removed
            else done(); // done called without arguments remove the item from the queue
        },100,true);

    Important: If commit mode is used, no more reads will be done until done has been called (queue will block further reads to avoid inconsistency):

        queue.peek((err,item,done)=>{
            console.log(item);
            setTimeout(done,1000); // Queue will be locked 1 sec
        },0,true);
     
        // Cannot retrieve next item until previous call ends
        queue.peek((err,item)=>{
            console.log(item);
        });

    queue.poll(time,callback) => Promise

    The same as queue.peek but without the commit feature

    queue.head(time,callback) => Promise

    Retrieves the head of the queue, without removing the element, as oposed to peek and poll

    queue.lock(callback) => Promise

    Locks the queue so no other callers can read from it until queue.unlock is called. Note that this is a soft lock (other readers can ignore the lock). The only time where a lock cannot be ignored if is queue.peek is called with commit feature (It's a different hard lock):

        // Locks the queue for 100 reads
        async function reader1() {
            await queue.lock();
            for(let i=0;i<100;i++) {
                let item = await queue.poll();
            }
            queue.unlock();
        }
     
        // Same as reader1: If reader1 has the lock, reader2 must wait
        async function reader2() {
            await queue.lock();
            for(let i=0;i<10;i++) {
                let item = await queue.poll();
            }
            queue.unlock();
        }
     
        // reader3 doesn't ask for lock, so it can read without waiting
        async function reader3() {
            for(let i=0;i<100;i++) {
                let item = await queue.poll();
            }
        }
     
        // reader4 doesn't ask for lock, but uses commit feature, so nobody
        // can read until commit is applied
        async function reader4() {
            for(let i=0;i<10;i++) {
                queue.peek((err,item,done)=>{
                    setTimeout(done,1000);
                });
            }
        }

    queue.unlock()

    Unlocks queue reads

    queue.close() => Promise

    Closes de queue

    queue.locked => Boolean

    Returns true if queue has a virtual lock; false otherwise.

    queue.closed => Boolean

    Returns true if the queue has been closed

    Options

    When creating a queue, data are stored in several files in a folder.

    The options object allows us to fine-tune the queue files to better match the needs of our process:

    • truncate : If true, previous queue status is reset, and a new empty queue is created. If false, a previously created queue is recovered. By default is set to false.
    • path : Base folder to store anonymous queues when the path is not specified. By default, the base path is the os temporal folder.

    Install

    npm i fileq

    DownloadsWeekly Downloads

    6

    Version

    1.4.1

    License

    MIT

    Unpacked Size

    19.6 kB

    Total Files

    10

    Last publish

    Collaborators

    • solzimer