Apache Kafka 是一个分布式且容错的流处理系统。
在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。
Spring Kafka 通过 和使用 注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。
要下载和安装 Kafka,请参阅 此处 的官方指南。
我们需要在 中添加 依赖:
然后按如下方法配置 :
我们的示例应用程序是 Spring Boot。
本文假定服务器使用默认配置启动,并且没有更改服务端口。
之前,我们使用命令行工具在 Kafka 中创建主题:
但随着 Kafka 引入 ,我们现在可以以编程式创建 topic。
我们需要添加 Spring Bean,它将自动为所有 类型的 Bean 添加 topic:
要创建消息,我们首先需要配置 。这将设定创建 Kafka Producer 实例的策略。
然后,我们需要一个 ,它封装了一个 实例,并提供向 Kafka topic 发送消息的便捷方法。
实例是线程安全的。在整个 application context 中使用单例会带来更高的性能。 实例也是线程安全的,因此,也建议只维护一个实例。
我们可以使用 类发送消息:
API 会返回一个 对象。如果我们想阻塞发送线程并获取发送信息的结果,可以调用 对象的 API。线程将等待结果,但这会减慢生产者的速度。
Kafka 是一个快速流处理平台。因此,最好以异步方式处理结果,这样后续消息就不必等待前一条消息的结果。
我们可以通过回调来实现:
为了消费消息,我们需要配置一个 和一个 。一旦 Spring Bean Factory 中的这些 Bean 可用,就可以使用 注解配置基于 POJO 的消费者。
配置类上需要使用 注解,以便在 Spring 管理的 Bean 上检测 注解:
我们可以为一个 topic 实现多个 listener,每个 listener 都有不同的 group Id.。此外,一个 consumer 可以监听来自不同 topic 的消息:
Spring 还支持在 listener 中使用 @Header 注解检索一个或多个 message header:
注意,我们创建的 topic 只有一个分区。
对于有多个分区的 topic, 可以通过初始偏移量(offset)显式地订阅 topic 的特定分区:
由于该 listener 中的 已设置为 0,因此每次初始化该 listener 时,都会重新消费之前从分区 0 和 3 中消费的所有消息。
如果不需要设置偏移量,我们可以使用 注解的 属性,只设置没有偏移量的分区:
我们可以通过添加自定义 filter 来配置 listener,以消费指定的消息内容。这可以通过向 设置 来实现:
然后,我们就可以在 listener 中配置来使用这个容器工厂:
在此 listener 中,所有与 filter 匹配的消息都将被丢弃。
到目前为止,我们只介绍了生产、消费字符串消息。我们也可以发送和接收自定义 Java 对象。这需要在 中配置适当的序列化器,并在 中配置反序列化器。
让我们来看看一个简单的 bean 类, 我们将把它作为信息发送:
在本例中,我们将使用 JsonSerializer。
让我们看看 和 的代码:
我们可以使用这个新的 发送 信息:
同样,让我们修改 和 ,以正确反序列化 信息:
spring-kafka JSON 序列化器和反序列化器使用的是 Jackson 库,它也是 项目的可选 Maven 依赖项。
把它添加到 中:
建议使用 所兼容的版本(定义在 中),而不是使用最新版本的 Jackson。
最后,我们需要编写一个 listener 来接收 信息:
现在让我们看看如何配置应用程序,将各种对象发送到同一个 topic,然后再消费它们。
首先,我们将添加一个新类 :
我们需要进行一些额外的配置,才能向同一个 topic 发送 和 对象。
在 producer 中,我们必须配置 JSON 类型映射:
这样,库就会在 type header 中填写相应的类名。
因此, 和 看起来就像这样:
我们可以使用该 向 topic 发送 、 或任何对象:
为了能够反序列化传入的消息,我们需要为 提供一个自定义的消息转换器()。
依赖于一个 。默认情况下,映射器会推断出接收对象的类型:相反,我们需要明确告诉它使用 type header 来确定反序列化的目标类:
我们还需要提供反向映射信息。在 type header 中找到 “greeting” 将被识别为一个 对象,而 “farewell” 对应于一个 对象。
最后,我们需要配置 typeMapper 信任的包。我们必须确保它包含目标类的位置:
下面是该 的最终定义:
现在,我们需要告诉 使用 和一个相当基本的 :
最后但同样重要的是,在我们的 中,我们要创建 handler method 来消费每个类型的对象。每个处理方法都需要用 进行注解。
最后要注意的是,我们还可以为无法绑定到 或 类的对象定义一个默认的 handler::
在本文中,我们介绍了 Spring 支持 Apache Kafka 的基础知识。我们简要介绍了用于生产和消费消息的类。
在运行代码之前,请确保 Kafka 服务已经运行,并且已经手动创建了 Topic。
参考:
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.bianchenghao6.com/java-jiao-cheng/14320.html