Wondering what’s next for npm?Check out our public roadmap! »

    bhb-amqp-connection-manager

    0.0.8 • Public • Published

    bhb-amqp-connection-manager

    安装

    npm install --save amqplib bhb-amqp-connection-manager@0.0.7
    注意:该组件在不断完善中,可能引入比较多不兼容的内容,请锁死版本,按需升级
    

    简介

    大头兄弟amqp链接管理 基于amqp-connection-manager 包的功能,做部分业务扩展

    需要了解的使用教程 包含

    发送信息示例

    const rabbitMQ = require('bhb-amqp-connection-manager');
     
    const connection = rabbitMQ.connect(process.env.APP_MQ_URL)
        .on('connect', () => console.log('Connected!'))
        .on('disconnect', params => console.log('Disconnected.', params.err.stack));
     
    const channel = connection.createChannel()
        .addSetup(async function(channel) {
            // `channel` here is a regular amqplib `ConfirmChannel`.
            const prefetch = parseInt(process.env.prefetch) || 1;
     
            await channel.assertQueue('hello', {durable: false});
     
            await channel.prefetch(prefetch);
        });
    (async function() {
        await channel.sendToQueue('hello', {num: 1});
        await channel.publish('', 'hello', {num: 10001});
        await connection.close();
    })();
     

    消费消息示例

    if (require('cluster').isMaster) {
        return require('../util/master');
    }
    const bluebird = require('bluebird');
    const commonUtil = require('../util/common');
     
    const rabbitMQ = require('bhb-amqp-connection-manager');
     
    const connection = rabbitMQ.connect(process.env.APP_MQ_URL);
     
    const channel = connection.createChannel()
        .addSetup(async function(channel) {
            // `channel` here is a regular amqplib `ConfirmChannel`.
            const prefetch = parseInt(process.env.prefetch) || 5;
     
            await channel.assertQueue('hello', {durable: false});
     
            await channel.prefetch(prefetch);
     
            await channel.consumerQueue('hello', async function({num}) {
                console.log(`start ${num}`);
                await bluebird.delay(2000);
                console.log(`end ${num}`);
            });
        });
     
    commonUtil.bindGraceExit(async function() {
        await channel.cancelConsumers();
        await channel.waitMessageEmpty();
        await connection.close();
    });

    版本变更

    • v0.0.3 支持node 4.0版本(babel编译)
    • v0.0.4 移除babel编译,只支持node 6>0
    • v0.0.5 修复json解析错误未捕获的bug。默认忽略推入数据json格式有误的数据,并通过日志输出提醒.
    • v0.0.6 修复当连接后未执行完setup的情况下,channel已经失去连接导致忽略处理,并返回null的bug。改为直接抛出一个错误,中断后续没必要的执行。 引入一个不兼容的默认值处理:当queue声明为durable的时候,如果没设置x-queue-mode的情况下,将默认设置x-queue-mode=lazy, 如果需要自定义,需要明确声明该配置,否则assertQueue时导致配置不一致会抛出错误。
    • v0.0.7 添加 consumerQueueUseRetry 便捷方法 相比 consumerQueue增加一个option参数 option.count (可选) 设置重试次数 option.failureQueue (可选) 设置失败后推入的队列,如果没设置,则自动创建一个队列,名字为:failure.${queueName} ,持久缓存 option.delay (可选) 设置重试频率的函数 ,设置该值时 count 不生效。 当count和delay都不设置的时候,使用amqplib-retry 的默认行为
           (attempts) => {
             const delay = Math.pow(2, attempts)
             if (delay > 60 * 60 * 24) {
               // the delay for the message is longer than 24 hours.  Fail the message and never retry again.
               return -1
             }
             return delay * 1000
           }
    
    • v0.0.8 改写mq默认的失败重试机制

      新增setConfig方法,目前可配置项:

          {
              DINGDING_HOST:'',//失败钉钉通知
              DEAD_LETTER_TTLs:[4,20,100]//失败重试间隔 单位 秒
          }
      

    Install

    npm i bhb-amqp-connection-manager

    DownloadsWeekly Downloads

    13

    Version

    0.0.8

    License

    none

    Unpacked Size

    52.1 kB

    Total Files

    26

    Last publish

    Collaborators

    • avatar