消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。
甚至现在部分NoSQL也可做消息队列,如Redis。
上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?
图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。
上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。
首先安装brew(mac为例)
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
安装RabbitMQ
brew install rabbitmq
运行RabbitMQ
进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行
sbin/rabbitmq-server
启动插件
进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin
./rabbitmq-plugins enable rabbitmq_management
登陆管理界面
打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ
 
网上可以找到好几个相应的Node SDK,这里推荐amqplib
1.生产者
/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');
class RabbitMQ {
 constructor() {
 this.hosts = [];
 this.index = 0;
 this.length = this.hosts.length;
 this.open = amqp.connect(this.hosts[this.index]);
 }
 sendQueueMsg(queueName, msg, errCallBack) {
 let self = this;
 self.open
 .then(function (conn) {
 return conn.createChannel();
 })
 .then(function (channel) {
 return channel.assertQueue(queueName).then(function (ok) {
 return channel.sendToQueue(queueName, new Buffer(msg), {
 persistent: true
 });
 })
 .then(function (data) {
 if (data) {
 errCallBack && errCallBack("success");
 channel.close();
 }
 })
 .catch(function () {
 setTimeout(() => {
 if (channel) {
 channel.close();
 }
 }, 500)
 });
 })
 .catch(function () {
 let num = self.index++;
 if (num <= self.length - 1) {
 self.open = amqp.connect(self.hosts[num]);
 } else {
 self.index == 0;
 }
 });
 }
}2. 消费者
/**
 * 对RabbitMQ的封装
 */
let amqp = require('amqplib');
class RabbitMQ {
 constructor() {
 this.open = amqp.connect(this.hosts[this.index]);
 }
 receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
 let self = this;
 self.open
 .then(function (conn) {
 return conn.createChannel();
 })
 .then(function (channel) {
 return channel.assertQueue(queueName)
 .then(function (ok) {
 return channel.consume(queueName, function (msg) {
 if (msg !== null) {
 let data = msg.content.toString();
 channel.ack(msg);
 receiveCallBack && receiveCallBack(data);
 }
 })
 .finally(function () {
 setTimeout(() => {
 if (channel) {
 channel.close();
 }
 }, 500)
 });
 })
 })
 .catch(function () {
 let num = self.index++;
 if (num <= self.length - 1) {
 self.open = amqp.connect(self.hosts[num]);
 } else {
 self.index = 0;
 self.open = amqp.connect(self.hosts[0]);
 }
 });
 }3. 通过生产者向MQ发送一个消息,并创建队列
let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
 console.log(error)
})执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:
并且RabbbitMQ新增了一个队列testQueue
4. 获取指定队列的消息
let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => { 
 console.log(msg)
})// 输出结果:my first message此时打开RabbitMQ管理平台,消息数量已经变为0
综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息。
下载本文