视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
PHP 消息队列 Kafka 使用
2020-11-03 12:31:01 责编:小采
文档

安装 Kafka 服务

直接到 kafka 官网 , 下载最新的

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

解压,进入目录

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0

启动 Kafka 服务

使用安装包中的脚本启动单节点 Zookeeper 实例

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

使用 kafka-server-start.sh 启动 kafka 服务

bin/kafka-server-start.sh config/server.properties

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看 topic 列表,检查是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181
$ test

生产者,发送消息

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

服务方面到这里就差不多了,接下来就是 php 的事了。

安装 PHP 扩展

rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install

安装 php-rdkafka 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config ## 这里根据自己的情况填写路径
make && make install

在 php-ini 加上

extension=rdkafka.so

重启,php-fpm,就应该可以看到该扩展。

使用Kafka

创建一个生产者类

<?php
class KafkaProducer
{
 public static $brokerList = '127.0.0.1:9092';
 public static function send($message, $topic)
 {
 self::producer($message, $topic);
 }
 public static function producer($message, $topic = 'test')
 {
 $conf = new RdKafkaConf();
 $conf->set('metadata.broker.list', self::$brokerList);
 $producer = new RdKafkaProducer($conf);
 $topic = $producer->newTopic($topic);
 $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
 $producer->poll(0);
 $result = $producer->flush(10000);
 if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
 throw new RuntimeException('Was unable to flush, messages might be lost!');
 }
 }
}

创建一个消费类

<?php
class KafkaConsumer
{
 public static $brokerList = '127.0.0.1:9092';
 public static function consumer()
 {
 $conf = new RdKafkaConf();
 $conf->set('group.id', 'test');
 $rk = new RdKafkaConsumer($conf);
 $rk->addBrokers("127.0.0.1");
 $topicConf = new RdKafkaTopicConf();
 $topicConf->set('auto.commit.interval.ms', 100);
 $topicConf->set('offset.store.method', 'broker');
 $topicConf->set('auto.offset.reset', 'smallest');
 $topic = $rk->newTopic('test', $topicConf);
 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 while (true) {
 $message = $topic->consume(0, 120*10000);
 switch ($message->err) {
 case RD_KAFKA_RESP_ERR_NO_ERROR:
 var_dump($message);
 break;
 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
 echo "No more messages; will wait for more
";
 break;
 case RD_KAFKA_RESP_ERR__TIMED_OUT:
 echo "Timed out
";
 break;
 default:
 throw new Exception($message->errstr(), $message->err);
 break;
 }
 }
 }
}

问题汇总

1、 No Java runtime present, requesting install

因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装

2、创建 topic 出现:Replication factor: 1 larger than available brokers: 0

意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了

推荐教程:《PHP教程》

下载本文
显示全文
专题