写在前面
最近一段时间都没有更新博客了,原因有点离谱,在实现flink的两阶段提交的时候,每次执行自定义的notifyCheckpointComplete时候,好像就会停止消费数据,完成notifyComplete后再消费数据;基于上述原因,开始了一些奇奇怪怪的探索…
存在问题:
1、JobMaster的提交过程,在ResourceManager上,但是为什么会用到zookeeper;–》高可用
2、执行图是否会放到Checkpoint里面,–》目前没有分析,背景是SQL添加字段后,ck就无法使用了
3、本地快照和远程快照的区别,回顾一下Checkpoint的过程—》flink本身支持本地快照和远程快照,其目的自然不言而喻;
总结:
- 之前一直不理解flink为什么好像和kafka总是有一种莫名其妙的联系,其实flink=通过mailbox处理数数据的过程本身就利用了消息队列的思想
- 源码剖析,通过对整个集群启动,执行图的转换,Task的启动,Checkpoint过程了解,这部分大量运用到了并发编程的知识,真的强;
- 下一阶段,先把优秀的源码Copy一遍,java基础搞起来;flink历史版本的重大变更也需要看一下;
1.Flink集群启动
1.1.Flink RPC详解
Flink的RPC实现:基于Scala的网络编程库:Akka
- ActorSystem是管理Actor生命周期的组件,Actor是负责进行通信的组
- 每个Actor都有一个Mailbox,别的Actor发送给它的消息都首先存储在Mailbox中,通过这种方式可以实现异步通信;
- 每个Actor是单线程的处理方式,不断的从Mailbox拉取消息执行处理,所以对于Actor的消息处理,不适合调用会阻塞的处理方法。
- Actor可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的Actor
- 每一个ActorSystem和Actor都在启动的时候会给定一个name,如果要从ActorSystem中,获取一个Actor,则通过以下的方式来进行Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname
- 如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然
后通过该对象发送消息即可。 - 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到
返回处理结果。
- RpcGateway:路由,RPC的老祖宗,其他个RPC组件,都是RpcGateWay的子类
- RpcServer:RpcService 和 RpcEndpoint 之间的粘合层
- RpcEndpoint: 业务逻辑载体,对应的 Actor 的封装
- RpcService:对应ActorSystem的封装
RpcEndpoint下面有四个比较重要的子类:TaskExecutor、Dispatcher、JobMaster、ResourceManager
1.2.Flink集群启动脚本分析
Flink 集群的启动脚本在:flink-dist 子项目中,位于 flink-bin 下的 bin 目录:启动脚本为:startcluster.sh
该脚本会首先调用 config.sh 来获取 masters 和 workers,masters 的信息,是从 conf/masters 配置
文件中获取的, workers 是从 conf/workers 配置文件中获取的。然后分别:
- 通过 jobmanager.sh 来启动 JobManager
- 通过 taskmanager.sh 来启动 TaskManager
他们的内部,都通过 flink-daemon.sh 脚本来启动 JVM 进程,分析 flink-daemon.sh 脚本发现:
- JobManager 的启动参数:standalonesession,实现类是:StandaloneSessionClusterEntrypoint
- TaskManager 的启动参数:taskexecutor,实现类是:TaskManagerRunner
1.3.Flink主节点JobManager启动分析
JobManager是Flink集群的主节点,它包含三大重要的组件:
- ResourceManager:Flink的集群资源管理器,只有一个,关于Slot的管理和申请等工作,都由他负责
- Dispatcher:负责接收用户提交的JobGraph,然后启动一个JobMaster
- JobMaster:负责一个具体的Job的执行,在一个集群中,可能会有多个JobMaster同时执行,类似于 YARN集群中的 AppMaster 角色,类似于 Spark Job 中的 Driver 角色
- WebMonitorEndpoint:里面维护了很多的handler,如果客户端通过flink run的方式提交一个Job到Flink集群,最终,是由WebMonitorEndpoint来接收,并且决定使用哪一个handler来执行处理,如:
根据以上的启动脚本分析:JobManager的启动主类:StandaloneSessionClusterEntrypoint
第一步,initializeServices()中做了很多服务组件的初始化:
第二步, createDispatcherResourceManagerComponentFactory(configuration)中负责初始化了很多组件的工厂实例;
第三步,dispatcherResourceManagerComponentFactory.create(…)中主要去创建三个重要的组件:
1.4.Flink从节点TaskManager启动分析
TaskManager:This class is the executable entry point(入口点) for the task manager in yarn or standalone mode.
TaskManager上的基本资源单位是Slot,一个作业的Task最终会部署在一个TaskManager的Slot上运行,TaskManager会维护本地的Slot资源列表,并与JobMaster和JobManager通信
根据以上的加班启动分析:TaskManager的启动主类:TaskManagerRunner
2. Flink 应用程序的提交
2.1.Flink Program编程流程总结
Flink底层提供了一个功能完善且复杂的分布式流式计算引擎,但是上层的应用API却很简单,简单来说,把整个Flink应用程序的编写,抽象成三个方面:
- 执行环境 ExecutionEnvironment
- 数据抽象 DataSet DataStream
- 逻辑操作 Source Transformation Sink
所以Flink的应用程序在编写的时候,基本是一个简单的统一套路:
在Flink应用程序中,其实所有的操作,都是StreamOperator,分为SourceOperator,SinkOperator,StreamOperator,然后能被优化的Operator就会Chain在一起,形成一个OperatorChain。
基本路数,和Spark一致,并且,在Flink-1.13版本后,将会完全统一批处理的API。
三个类似的概念:
- Function:函数
- Operator:对Function的封装
- Transformation:等价于Operator的概念,Flink中,process函数底层调用的依旧是transform
2.2.Flink Job提交脚本解析-Session模式
2.3.CliFronted提交分析
- 根据Flink后面的执行命令来确定执行方法(run===>run(params))
- 解析main参数,构建PackageProgram,然后执行PackageProgram
- 通过反射获取应用程序的main方法的实例,通过反射调用执行起来
总得来说,就是准备执行Program所需要的配置,jar包,运行主类等的必要信息,然后提交执行。
2.4.ExecutionEnvironment源码解析
Flink应用程序的执行,首先就是创建运行环境StreamExecutionEnvironment,一般在企业环境中,都是通过getExecutionEnvironment()来获取ExecutionEnvironment,如果是本地运行的话,则会获取到:LocalStreamEnvironment,如果是提交到Flink集群运行,则获取到:StreamExecutionEnvironment。
StreamExecutionEnvironment是Flink应用程序的执行入口,提供了一些重要的操作机制(包括但不限于):
- 提供了readTextFile(),socketTextStream(),createInput(),addSource()等方法去对接数据源
- 提供了setParallelism()设置程序的并行度
- StreamExecutionEnvironment管理了ExecutionConfig对象,该对象负责Job执行的一些行为配置管理
- StreamExecutionEnvironment管理了一个List<Transformation<?>> transformations 成员变量,该成员变了,主要用于保持Job的各种算子转化得到的Transformation,把这些Transformation按照逻辑拼接起来,就能得到StreamGraph(Transformation–>StreamOperator–>StreamNode)
- StreamExecutionEnvironment提供了execute()方法用于提交Job执行,该方法接收的参数就是:StreamGraph
2.5.Job提交流程源码分析
核心流程如下:
第一步:getStreamGraph(jobName)生成StreamGraph解析
transform(transformation)的内部实现:
第二步:execute(StreamGraph)解析
继续提交:
最终通过Channel把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。
2.6.Flink Graph演变
Flink的一个job,本质还是构建一个高效率的能用于分布式执行的DAG执行图。
- 帮我们把上下游两个相邻算作如果能Chain到一起,则Chain到一起做优化
- Chain到一起的多个Operator就会组成一个OperatorChain,当OperatorChain执行的时候,到底要执行多少个Task,则需要把DAG进行并行化变成实实在在的Task来调度执行。
一个Flink流式作业,从Client提交到Flink集群,到最后执行,总共会经历四种不同的状态。总得来说:
- Client 首先根据用户编写的代码生成StreamGraph,然后把StreamGraph构建成JobGraph交给Flink集群主节点
- 然后启动的JobMaster在接收到JobGraph后,会对其进行并行化生成ExecutionGraph后调度启动StreamTask执行。
- StreamTask并行化的运行在Flink集群中,就是最终的物理执行图状态结构。
没有java基础能学flink
- JobGraph中,数据从上一个Operator(JobVertex)流到下一个Operator(JobVertex)的过程中,上游作为生产者提供了IntermediateDataSet,而下游作为消费者需要JobEdge。事实上,JobEdge是一个通信管道,连接了上游生产的dataset和下游的JobVertex节点。【注:优化算子链以提高效率】
- 在JobGraph转换到ExecutionGraph的过程中,主要发生了以下转变:
1.加入了并行度的概念,成为真正可调度的图结构;
2.生成了与JobVertex对应的ExecutionJobVertex,ExecutionVertex;与IntermediateDataSet对应的IntermediateResult和IntermediateResultPartition;并行将通过这些类实现 - ExecutionGraph已经可以用于调度任务。Flink根据该图生成了一一对应的Task,每个Task对应一个ExecutionGraph的一个Execution。Task用InputGate、InputChannel和ResultPartition对对应了上面图中IntermediateResult和ExecutionEdge。
- StreamGraph是对用户逻辑的映射
- JobGraph在StreamGraph基础上进行了一些优化,比如吧一部分操作串成Chain以提高效率
- ExecutionGraph是为了调度存在的,加入了并行处理的概念
- 物理执行结构,真正执行的是Task及其相关结构。
2.6.1.StreamGraph构建和提交源码解析
- StreamNode:用来代表Operator的类,并具有所有相关的配置,如并发度、入边和出边等。
- StreamEdge:表示连接两个StreamNode的边。
源码核心代码入口:
StreamGraph生成过程中,生成StreamNode的代码入口:
StreamGraph生成过程中,生成StreamEdge的代码入口:
2.6.2.JobGraph构建和提交源码解析
- JobVertex:经过优化后符合条件的多个StreamNode可能会Chain子啊一起生成一个JobVertex,即一个JobVertex包含一个或多个Operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
- IntermediateDataSet:表示JobVertex的输出,即经过Operator处理产生的数据集。
producer是JobVertex,consumer是JobEdge。 - JobEdge:代表了Job Graph中的一条数据传输通道。Source是IntermediateDataSet,target事故JobVertex,即数据通过JobEdge由InterMediateDataSet传递给目标JobVertex。
源码核心代码入口:
注:这里的Pipeline其实本质上就是StreamGraph
经过层层递进:
在StreamGraph构建JobGraph的过程中,最重要的事情就是Operator的Chain优化,那么到底什么情况下的Operator能Chain在一起呢?
2.6.3.ExecutionGraph构建和提交源码解析
- ExecutionJobVertex:和JobGraph中的JobVertex一一对应;每一个ExecutionJobVertex都有和并发度一样的ExecutionVertex。
- ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
- IntermediateResult:和JobGraph中IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该Operator的并发度。
- IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。
- ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。
- Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下
ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过ExecutionAttemptID 来确定消息接受者。
源码核心代码入口:
在SchedulerBase这个类的内部,有两个成员变量:一个是JobGraph,一个是ExecutionGraph;
在创建SchedulerBase这个类的子类:DefaultSchedule的实例对象的时候,会再SchedulerBase的构造方法中去生成ExecutionGraph。
源码核心流程:
2.6.4.物理执行图
- Task:Execution被调度后再分配的TaskManager中启动对应的Task,Task包裹了具有用户执行逻辑的Operator。
- ResultPartition:A result partition for data produced by a single task.
- ResultSubpartition:A single subpartition of a {@link ResultPartition} instance.
- InputGate:An input gate consumes one or more partitions of a single produced intermediate result.
- InputChannel:An input channel consumes a single {@link ResultSubpartitionView}.
2.7.WebMonitorEndpoint处理RetClient的JobSubmit请求
Dispatcher的提交执行逻辑:
接着上面的过程继续:
接着继续申请Slot然后部署:
3.1.Slot管理(申请和释放)源码解析
大体上,分为四个大步骤:
- JobManager发送请求申请Slot
- ResourceManager接收到请求,执行Slot请求处理
- TaskManager处理ResourceManager发送过来的Slot请求
- JobMaster接收到TaskManager发送过来的Slot申请处理结果
4.1.StreamTask初始化和执行
4.1.1.TaskExecutor执行一个Task
TaskExecutor接收提交Task执行的请求,则调用:
在该方法的内部,会封装一个Task对象,在Task的构造方法中,也做了一些相应的初始化动作:
4.1.2.SourceStreamTask和StreamTask初始化
前提知识:在最开始一个Job提交到Flink standalone集群运行的时候,在client构建StreamGraph(顶点是StreamNode,边是StreamEdge)的时候,会根据用户调用的算子生成Transformation为StreamGraph生成StreamNode,在生成StreamNode的时候,会通过OperatorFactory执行判断,如果该StreamOperator是StreamSource的时候,就会指定该StreamTask的invokableClass为SourceStreamTask,否则为(OneInputStreamTask,TwoInputStreamTask,StreamTask),核心代码如下:
因此,当ExecutionVertex真正被提交到TaskExecutor中运行的时候,被封装的Execution对应的Task类的启动类AbstractInvokeable就是在构建StreamGraph的时候指定的对应的invokableClass。即:
- 如果启动SourceStreamTask,则启动类是:SourceStreamTask
- 如果启动非SourceStreamTask,则启动类是StreamTask
拉起ResultPartitionWriter和InputGate的时候到底是怎么做的?
然后跳转到重载构造:
其中在SourceStreamTask的processInput()方法中,主要是启动接收数据的线程LegacySourceFunctionThread;在执行构造方法完毕后,LegacySourceFunctionThread已经初始化好了,但是 headOperator 还是null,所以,LegacySourceFunctionThread 还未真正启动。
4.1.3.SourceStreamTask和StreamTask执行
接下来要进入到StreamTask.invoke()方法执行,核心分为四个步骤:
在beforeInvoke()中,主要初始化OperatorChain,然后调用init()执行初始化,然后恢复状态,更改Task的状态isRuning=true;
在runMailboxLoop()中,主要是不停的处理mail,是Flink-1.10的改进,使用了mailbox模型来处理任务;
参考链接:http://matt33.com/2020/03/20/flink-task-mailbox/
在afterInvoke()中,主要是完成Task要结束之前需要完成的一些细节,比如,把buffer中的数据flush;
最后,在cleanUpInvoke()主要做一些资源的释放,执行各自关闭动作:set false,interrupt,shutdown,close,cleanup,dispose等;
首先看ChainOperator的初始化,首先会为每个Operator创建一个RecordWriterOutput,再为每个Operator创建一个OutputCollector;然后把每一个Operator都包装成OperatorWrapper放入List< StreamOperatorWrapper > allOpWrappers集合中。最后调用linkOperatorWrappers(allOpWrappers)方法以逻辑正序的方式来构建StreamOperator的链式关系。
然后就是init()方法,对于SourceStreamTask来说,就是看Source是不是ExternallyInduceSource,如果是,则注册一个savepoint钩子。对于OneInputStreamTask来说,主要就是创建CheckpointedInputGate,StreamTaskNetworkOutput,StreamTaskNetworkInput,StreamOneInputProcessor用来进行shuffle相关的数据传输。
到此为止,Task初始化和预执行相关的,都基本到位了,然后就开始我们的SourceStreamTask的HeadOperator的数据接收线程,开始流式处理。
核心代码入口:
4.2~4.3.State和Checkpoint的过程剖析
首先明确一个观点,Statebackend核心的工作是管理State(生命周期),State是用于状态数据,Checkpoint是实现快照的过程
- 假定现在一个Task在正常的处理数据,即从mailBox中不断的取出数据,处理数据;更新该Task的State,如:KeyedState通过CopyOnWriteMap实现状态更新,但是本身受到HashMapStateBackend管理,
- TaskManager接收到JobMaster发送的checkpoint请求,向SourceTask注入barrier,barrier首先会进入mailbox,处理完成后开始准成Checkpoint,完成的工作:
- 广播当前barrier到下游StreamTask
- 启动一个异步线程AsyncCheckpointRunnable完成快照,这个过程Task会继续处理数据;
- Task完成Checkpoint后,汇报JobManager完成了Checkpoint,JobManager通知Task完成Checkpoint(不是等所有Task完成后再通知),会在StreamTask的notifyCheckpointOperation方法中,产生一个command(可以理解为mailbox的Task,优先级为MAX_PRIORITY)完成notifyCheckpointComplete
其实Checkpoint过程本身说复杂不负责,但是对于每一个步骤的细节如何,似乎单纯死记硬背不是一个好的建议,下面给出网上一些大佬的帖子,真的再次感谢各位大佬对源码的分析:
Flink 源码阅读笔记(10)- State 管理
AsyncCheckpointRunnable
怎么理解flink的异步检查点机制
Flink 基于 MailBox 实现的 StreamTask 线程模型
Flink Mailbox模型
Apache Flink and the input data reading
分布式数据流的轻量级异步快照
论文阅读-Lightweight Asynchronous Snapshots for Distributed Dataflows
4.4.Checkpoint的触发
Checkpoint是Flink Default Tolerance机制的重要组成部分,Flink Checkpoint的核心类名为
4.4.1.Client端生成Checkpoint配置
Flink的应用程序,都是通过StreamExecutionEnvironment的execute()方法提交执行的,在StreamExecutionEnvironment初始化的时候,会调用:
方法来执行配置参数的初始化,其中就会涉及到:
将CheckpointConfig中的各自参数,封装成JobCheckpointingSettings对象,然后设置到JobGraph中由成员变量snapshotSettings来进行保存。
4.4.2.Checkpoint CheckpointCoordinator启动源码详解
然后再ExecutionGraphBuilder构建ExecutionGraph的时候,会生成CheckpointCoordinatorConfiguration对象,来保存成JobGraph中的snapshotSettings参数,最终该交给:
- 解析ExecutionGraph中的各种ExecutionVertex,设置到tasksToTrigger,tasksToWaitFor,tasksToCommitTo数组中
- 注册了CheckpointFailureManager组件,用来汇总Checkpoint的统计信息。
- 创建CheckpointFailureManager,管理Checkpoint失败后的策略
- 创建定时器CheckpointCoordinatorTimer(ScheduledExecutorService),用于定时触发Checkpoint
- 创建CheckpointCoordinator,并注册CheckpointCoordinatorDeActivator
首先是CheckCoordinator的触发机制,核心入口是:
checkpointCoordinator.createActivatorDeactivator()方法返回的是一个JobStatusListener,具体实现是:CheckpointCoordinatorDeActivator,它的作用是:当监听到Job的状态为JobStatus.RUNNING的时候,就开始执行CheckpointCoordinatorDeActivator.jobStatusChanges()的回调处理。而具体的间隔时间,一般都由用户自己设置。
总结一下:
4.4.3.CheckpointCoordinator Checkpoint执行源码详解
所以,真正开始执行Checkpoint的入口是:
内部具体通过scheduleTriggerWithDelay(getRandomInitDelay())来实现调度!
其中:getRandomInitDelay()存在意义是:ScheduledExecutor timer不要一上来就执行Checkpoint,而是等一段随机事件(在minPauseBetweenCheckpoints和baseInterval + 1L之间)。ScheduledTrigger就是定时调度的一个Checkpoint触发器。
具体的minPauseBetweenCheckpoints和baseInterval是多少,就看用户的设置是多少:
接下来看调度逻辑:
- coordinator处于shutdown状态
- 当前有排队的Checkpoint请求
- 当前pendingCheckpoints数量达到设定上限
- 与上一次Checkpoint间隔小于设定的最小值,如果间隔太小,会取消并重新设定调度器
- 如果Job的所有Source ExecutionVertex没有全处于RUNNING的状态的时候
4.4.4.Checkpoint TaskManager端处理state保存
当TaskManager接收到Checkpoint请求的时候,TaskManager端的Checkpoint分为两种情况:
- SourceStreamTask
- 其他StreamTask
当SourceStreamTask所在的TaskExecutor收到trigger Checkpoint消息,继续进行Checkpoint,核心入口是:
下一步调用:
下一步调用:获取Task的AbstractInvokeable。并生成CheckpointMetaData,然后执行SourceStreamTask的状态判断,继续调用Checkpoint还是取消:
经过一些跳转,最终跳转到:StreamTask
内部通过MailBox模型来调度执行,内部调用:
来触发执行Checkpoint。从这里开始,就进入到Mailbox的主线程来执行Checkpoint了。在该方法中,核心逻辑为:
在performCheckpoint()方法中,会调用:SubtaskCheckpointCoordinatorImpl的checkpointState() 执行 state 的快照!内部分为这么几个步骤:
事实上一个Job的所有Task的state的Checkpoint是由takeSnapshotSync来真正完成的。最底层会调用StreamOperatorStateHandler的snapshotState()方法来完成具体的工作,它的内部主要做三件事情:
4.4.5.Checkpoint CheckCoordinator端反馈处理
当上述,第四步完成的时候,第五步就可以对JobMaster进行Checkpoint状态汇报了。然后当TaskExecutor执行完Checkpoint之后,发送回反馈CheckCoordinator执行处理。
核心入口是:JobMaster.acknowledgeCheckpoint()方法
下面看详细流程:
4.5.Checkpoint State恢复源码剖析
JobMaster实例创建时,通过调用链:
版权声明:
本文来源网络,所有图片文章版权属于原作者,如有侵权,联系删除。
本文网址:https://www.bianchenghao6.com/h6javajc/20491.html