Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
flume与kafka整合_flume采集数据到kafka,希望能够帮助你!!!。
引言
flume为什么要与kafka对接?
我们都知道flume可以跨节点进行数据的传输,那么flume与spark streaming对接不好吗?主要是flume对接到kafka的topic,可以给多个consumer group去生成多条业务线。虽然flume中的channel selector中的副本策略也可以给多个sink传输数据,但是每个channel selector都是很消耗资源的。其次,kafka也可以起到一个消峰的作用
这里为了方便测试,我采用的是netcat source、memory channel、kafka sink,当然你也可以采用你自己想要的方式配置flume,只需要根据官方文档修改对应的source和channel即可。
necat-flume-kafka.conf的配置文件如下:
#Name a1.sources = r1 a1.sinks = k1 a1.channels = c1 #Source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = wjt a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
其中你只需要修改sink中的topic和brokerList即可,当然你也可以增加其他的配置
1、启动kafka消费者
2、启动flume
3、启动netcat的客户端并发送几条数据
4、观察到kafka consumer很快就消费到了数据
如果数据有多种类型,比如点赞数据、评论数据、喜欢数据等等,是不是就要发往不同的topic去分析数据,这时候就需要用到flume的拦截器来做分类。
flume可以给event加上头信息,结合channel selector来发往不同的sink。
在flume官方文档可以看到:
意思是:如果你的event的头信息(k-v类型)包含一个topic字段,那么这个event将会被发送到对应的topic,并覆盖你配置的kafka.topic
拦截器的代码:
package wjt.demo; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @description: * @author: wanjintao * @time: 2020/8/29 11:45 */ public class myInterceptor implements Interceptor {
//声明一个存放事件的集合 private List<Event> addHeaderEvents; @Override public void initialize() {
//初始化存放事件的集合 addHeaderEvents = new ArrayList<>(); } //单个事件拦截 @Override public Event intercept(Event event) {
//1. 获取事件中的头信息 Map<String, String> headers = event.getHeaders(); //2. 获取事件中的body信息 String body = new String(event.getBody()); //3. 根据body中是否有“Hello”来决定是否添加头信息 if (body.contains("hello")) {
//4. 有hello添加“wan”头信息 headers.put("topic", "www1"); } else {
//4. 没有hello添加“tao”头信息 headers.put("topic", "www2"); } return event; } //批量事件拦截 @Override public List<Event> intercept(List<Event> events) {
//1. 清空集合 addHeaderEvents.clear(); //2. 遍历events for (Event event : events) {
//3. 给每一个事件添加头信息 addHeaderEvents.add(intercept(event)); } //4. 返回结果 return addHeaderEvents; } @Override public void close() {
} public static class Builder implements Interceptor.Builder {
@Override public Interceptor build() {
return new myInterceptor(); } @Override public void configure(Context context) {
} } }
你只需要修改单个事件拦截的代码即可,我这里是如果数据包含hello,将会给事件加上header(topic,www1),反之则给事件加上header(topic,www2),打包上传至flume/lib目录下
netcat-flume-typekafka.conf的配置文件:
#Name a1.sources = r1 a1.sinks = k1 a1.channels = c1 #Source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #Interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = wjt.demo.myInterceptor$Builder #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = wjt a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
你只需要将a1.sources.r1.interceptors.i1.type的值改为你上面的拦截器的全类名$Builder即可
1、先启动consumer1和consumer2(flume启动顺序都是先启动服务端在启动客户端)
2、启动flume
3、启动netcat客户端
4、观察consumer消费的topic可以看到,www1只接受到了包含hello的数据,www2只接受到了没有包含hello的数据
很多时候flume官方文档可以帮助我们解决很多自己想要的业务场景,我们要更多地去查看官方文档
今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
上一篇
已是最后文章
下一篇
已是最新文章