php kafaka_kafka php

Php (94) 2023-03-24 21:04

大家好,我是编程小6,很高兴遇见你,有问题可以及时留言哦。

最近项目上有一个需要用到消息队列的功能,从网上找了一些php相关的kafka使用的教程和博客,大抵都是安装php的拓展librdkafka(这里就不讲这个拓展的安装方法了,搜一下还是有很多教程的),然后直接用这个拓展进行开发,但是我直接用这个拓展开发的时候,不知道为啥运行不起来,一直报错(应该是我太菜了,哈哈哈哈哈哈)......我从github上找了一些相关的包想直接用一下,但是发现很多包都是几年前的了,基本上都是kafka 0.x 版本的。

我看了一下最多star的一个包,链接是:github.com/weiboad/kaf…

后面我用了enqueue/rdkafka 这个包,链接:github.com/php-enqueue…

下面展示一下我测试的代码,只是能运行起来,但是还没达到我想要的预期

生产者

$connFactory = new RdKafkaConnectionFactory([
    'global' => [
        'metadata.broker.list' => '127.0.0.1:9092',
        'socket.timeout.ms' => '50'
    ]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);

消费者

$config = [
    'global' => [
        'group.id' => uniqid('', true),
        'metadata.broker.list' => '127.0.0.1:9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        // 设置从最后一个offset开始读取消息,不会读取到之前的消息
        'auto.offset.reset' => 'latest',
    ],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
    $message = $consumer->receive(30 * 1000);
    if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
        var_dump($message);
        continue;
    }
    $consumer->acknowledge($message);
    var_dump($message->getBody());
}

这样是能运行起来的,能正常发送和接受数据,我实际情况是想做一个队列,生产者有多个,往一个topic进行数据生产,然后有多个消费者在消费,但是这样写有个问题,我在启动了多个消费者的时候,每个消费者都会接受到生产者发送过来的消息,更像是群体发布群体订阅的形式,不是我想要的结果,我去网上找了其他的教程,有人说只要设置group_id不同就可以了,但我的group_id全部都是随机的,不太可能一样,按理来说是能实现的,但是就是不行,也试过用Queue去操作,但是还是会所有消费者都收到消息。

目前就只了解到了这个地方,还需要花点时间再看看,如果有大神看到的话,求指点一下!!

发表回复