当前位置:网站首页 > Java教程 > 正文

java对接kafka教程



Apache Kafka 是一个分布式且容错的流处理系统。

在本教程中,我们将介绍 Spring 对 Kafka 的支持以及它在原生 Kafka Java 客户端 API 之上提供的抽象层。

Spring Kafka 通过 和使用 注解的消息驱动的POJO,提供了简单且典型的 Spring template 编程模型。

要下载和安装 Kafka,请参阅 此处 的官方指南。

我们需要在 中添加 依赖:

然后按如下方法配置 :

我们的示例应用程序是 Spring Boot。

本文假定服务器使用默认配置启动,并且没有更改服务端口。

之前,我们使用命令行工具在 Kafka 中创建主题:

但随着 Kafka 引入 ,我们现在可以以编程式创建 topic。

我们需要添加 Spring Bean,它将自动为所有 类型的 Bean 添加 topic:

要创建消息,我们首先需要配置 。这将设定创建 Kafka Producer 实例的策略。

然后,我们需要一个 ,它封装了一个 实例,并提供向 Kafka topic 发送消息的便捷方法。

实例是线程安全的。在整个 application context 中使用单例会带来更高的性能。 实例也是线程安全的,因此,也建议只维护一个实例。

我们可以使用 类发送消息:

API 会返回一个 对象。如果我们想阻塞发送线程并获取发送信息的结果,可以调用 对象的 API。线程将等待结果,但这会减慢生产者的速度。

Kafka 是一个快速流处理平台。因此,最好以异步方式处理结果,这样后续消息就不必等待前一条消息的结果。

我们可以通过回调来实现:

为了消费消息,我们需要配置一个 和一个 。一旦 Spring Bean Factory 中的这些 Bean 可用,就可以使用 注解配置基于 POJO 的消费者。

配置类上需要使用 注解,以便在 Spring 管理的 Bean 上检测 注解:

我们可以为一个 topic 实现多个 listener,每个 listener 都有不同的 group Id.。此外,一个 consumer 可以监听来自不同 topic 的消息:

Spring 还支持在 listener 中使用 @Header 注解检索一个或多个 message header:

注意,我们创建的 topic 只有一个分区。

对于有多个分区的 topic, 可以通过初始偏移量(offset)显式地订阅 topic 的特定分区:

由于该 listener 中的 已设置为 0,因此每次初始化该 listener 时,都会重新消费之前从分区 0 和 3 中消费的所有消息。

如果不需要设置偏移量,我们可以使用 注解的 属性,只设置没有偏移量的分区:

我们可以通过添加自定义 filter 来配置 listener,以消费指定的消息内容。这可以通过向 设置 来实现:

然后,我们就可以在 listener 中配置来使用这个容器工厂:

在此 listener 中,所有与 filter 匹配的消息都将被丢弃。

到目前为止,我们只介绍了生产、消费字符串消息。我们也可以发送和接收自定义 Java 对象。这需要在 中配置适当的序列化器,并在 中配置反序列化器。

让我们来看看一个简单的 bean 类, 我们将把它作为信息发送:

在本例中,我们将使用 JsonSerializer。

让我们看看 和 的代码:

我们可以使用这个新的 发送 信息:

同样,让我们修改 和 ,以正确反序列化 信息:

spring-kafka JSON 序列化器和反序列化器使用的是 Jackson 库,它也是 项目的可选 Maven 依赖项。

把它添加到 中:

建议使用 所兼容的版本(定义在 中),而不是使用最新版本的 Jackson。

最后,我们需要编写一个 listener 来接收 信息:

现在让我们看看如何配置应用程序,将各种对象发送到同一个 topic,然后再消费它们。

首先,我们将添加一个新类 :

我们需要进行一些额外的配置,才能向同一个 topic 发送 和 对象。

在 producer 中,我们必须配置 JSON 类型映射:

这样,库就会在 type header 中填写相应的类名。

因此, 和 看起来就像这样:

我们可以使用该 向 topic 发送 、 或任何对象:

为了能够反序列化传入的消息,我们需要为 提供一个自定义的消息转换器()。

依赖于一个 。默认情况下,映射器会推断出接收对象的类型:相反,我们需要明确告诉它使用 type header 来确定反序列化的目标类:

我们还需要提供反向映射信息。在 type header 中找到 “greeting” 将被识别为一个 对象,而 “farewell” 对应于一个 对象。

最后,我们需要配置 typeMapper 信任的包。我们必须确保它包含目标类的位置:

下面是该 的最终定义:

现在,我们需要告诉 使用 和一个相当基本的 :

最后但同样重要的是,在我们的 中,我们要创建 handler method 来消费每个类型的对象。每个处理方法都需要用 进行注解。

最后要注意的是,我们还可以为无法绑定到 或 类的对象定义一个默认的 handler::

在本文中,我们介绍了 Spring 支持 Apache Kafka 的基础知识。我们简要介绍了用于生产和消费消息的类。

在运行代码之前,请确保 Kafka 服务已经运行,并且已经手动创建了 Topic。


参考:

  • 上一篇: java车识别教程
  • 下一篇: java路线教程
  • 版权声明


    相关文章:

  • java车识别教程2024-12-18 08:42:04
  • java13安装教程2024-12-18 08:42:04
  • java反向教程2024-12-18 08:42:04
  • java 网页应用教程2024-12-18 08:42:04
  • java项目视频教程2024-12-18 08:42:04
  • java路线教程2024-12-18 08:42:04
  • java教程1602024-12-18 08:42:04
  • java大牛编写教程2024-12-18 08:42:04
  • 最新java速成教程2024-12-18 08:42:04
  • java自学教程四2024-12-18 08:42:04