视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
对于NodeJS如何操作消息队列RabbitMQ的分析
2020-11-27 19:33:44 责编:小采
文档
 这篇文章主要介绍了关于对NodeJS如何操作消息队列RabbitMQ的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

一. 什么是消息队列?

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

二. 常用的消息队列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至现在部分NoSQL也可做消息队列,如Redis。

三. 消息队列的使用场景?

  • 异步处理

  • 应用解耦

  • 流量削峰

  • 四. 使用案例

    上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

    图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

    五. 如何安装RabbitMQ?

    上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。

    首先安装brew(mac为例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

    安装RabbitMQ

    brew install rabbitmq

    运行RabbitMQ

    进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行

    sbin/rabbitmq-server

    启动插件

    进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management

    登陆管理界面

    打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ

    网上可以找到好几个相应的Node SDK,这里推荐amqplib

    1.生产者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
     constructor() {
     this.hosts = [];
     this.index = 0;
     this.length = this.hosts.length;
     this.open = amqp.connect(this.hosts[this.index]);
     }
     sendQueueMsg(queueName, msg, errCallBack) {
     let self = this;
    
     self.open
     .then(function (conn) {
     return conn.createChannel();
     })
     .then(function (channel) {
     return channel.assertQueue(queueName).then(function (ok) {
     return channel.sendToQueue(queueName, new Buffer(msg), {
     persistent: true
     });
     })
     .then(function (data) {
     if (data) {
     errCallBack && errCallBack("success");
     channel.close();
     }
     })
     .catch(function () {
     setTimeout(() => {
     if (channel) {
     channel.close();
     }
     }, 500)
     });
     })
     .catch(function () {
     let num = self.index++;
    
     if (num <= self.length - 1) {
     self.open = amqp.connect(self.hosts[num]);
     } else {
     self.index == 0;
     }
     });
     }
    }

    2. 消费者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
     constructor() {
     this.open = amqp.connect(this.hosts[this.index]);
     }
     receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
     let self = this;
    
     self.open
     .then(function (conn) {
     return conn.createChannel();
     })
     .then(function (channel) {
     return channel.assertQueue(queueName)
     .then(function (ok) {
     return channel.consume(queueName, function (msg) {
     if (msg !== null) {
     let data = msg.content.toString();
     channel.ack(msg);
     receiveCallBack && receiveCallBack(data);
     }
     })
     .finally(function () {
     setTimeout(() => {
     if (channel) {
     channel.close();
     }
     }, 500)
     });
     })
     })
     .catch(function () {
     let num = self.index++;
     if (num <= self.length - 1) {
     self.open = amqp.connect(self.hosts[num]);
     } else {
     self.index = 0;
     self.open = amqp.connect(self.hosts[0]);
     }
     });
     }

    3. 通过生产者向MQ发送一个消息,并创建队列

    let mq = new RabbitMQ();
    mq.sendQueueMsg('testQueue', 'my first message', (error) => {
     console.log(error)
    })

    执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:

    并且RabbbitMQ新增了一个队列testQueue

    4. 获取指定队列的消息

    let mq = new RabbitMQ();
    mq.receiveQueueMsg('testQueue',(msg) => { 
     console.log(msg)
    })// 
    输出结果:my first message

    此时打开RabbitMQ管理平台,消息数量已经变为0

    综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息。

    下载本文
    显示全文
    专题