flink是干嘛的
Flink是一个框架和分布式处理引擎,用于对无限制和有限制的数据留进行有状态的计算。Flink被设计为可在所有常见的集群环境中运行,以内存速度和任何规模执行计算。任何类型的数据都是作为事件流产生的。信用卡交易,传感器测量,机器日志或网站移动应用程序上的用户交互,所有这些数据均作为流生成。Flink擅长处理无边界和有界的数据集。对事件和状态的精确控制使Flink的运行时能够在无限制的流上运行任何类型的应用程序。有界流由专门为固定大小的数据集设计的算法和数据结构在内部进行处理,从而产生出色的性能。随部署应用程序:Flink是一个分布式系统,需要计算资源才能执行应用程序。Flink与所有常见的集权资源管理器(如Hadoop YARN,Mesos和Kubernetes)集成,但也可以设置为作为独立集群运行。Flink旨在与前面列出的每个资源管理器兼容。这是通过特定于资源管理器的部署模式实现的,该模式允许Flink惯用方式与每个资源管理器进行交互。部署Flink应用程序时,Flink会根据应用程序配置的并行性自动识别所需的资源,并向资源管理器请求它们。如果发生故障,Flink会通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信均通过REST调用进行。简化了Flink在许多环境中的集成。
flink是什么意思
Flink是什么?Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得Flink的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。部署应用到任何地方Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如Hadoop YARN、 Apache Mesos和 Kubernetes,但同时也可以作为独立集群运行。
Flink架构、原理
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。 现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供的SLA(Service-Level-Aggreement)是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理。 Flink从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的; 批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。 Flink流处理特性: Flink以层级式系统形式组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。 1. 流、转换、操作符 Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。 Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。 2. 并行数据流 一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。 One-to-one模式 比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。 Redistribution模式 这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。 3.任务、操作符链 Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。 4. 时间 处理Stream中的记录时,记录中通常会包含各种典型的时间字段: Event Time:表示事件创建时间 Ingestion Time:表示事件进入到Flink Dataflow的时间 Processing Time:表示某个Operator对事件进行处理的本地系统时间 Flink使用WaterMark衡量时间的时间,WaterMark携带时间戳t,并被插入到stream中。 5. 窗口 Flink支持基于时间窗口操作,也支持基于数据的窗口操作: 窗口分类: Tumbling/Sliding Time Window // Stream of (sensorId, carCnt) val vehicleCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts // key stream by sensorId .keyBy(0) // tumbling time window of 1 minute length .timeWindow(Time.minutes(1)) // compute sum over carCnt .sum(1) val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding time window of 1 minute length and 30 secs trigger interval .timeWindow(Time.minutes(1), Time.seconds(30)) .sum(1) Tumbling/Sliding Count Window // Stream of (sensorId, carCnt) val vehicleCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts // key stream by sensorId .keyBy(0) // tumbling count window of 100 elements size .countWindow(100) // compute the carCnt sum .sum(1) val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding count window of 100 elements size and 10 elements trigger interval .countWindow(100, 10) .sum(1) 自定义窗口 基本操作: 6. 容错 Barrier机制: 对齐: 当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐: 基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。 CheckPoint: Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。状态包含两种: 7. 调度 在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是实际JobManager调度一个Job在TaskManager上运行的逻辑视图。 物理上进行调度,基于资源的分配与使用的一个例子: 8. 迭代 机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型。 Iterate Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果。 流程伪代码: IterationState state = getInitialState(); while (!terminationCriterion()) { state = step(state); } setFinalState(state); Delta Iterate Delta Iterate Operator实现了增量迭代。 流程伪代码: IterationState workset = getInitialState(); IterationState solution = getInitialSolution(); while (!terminationCriterion()) { (delta, workset) = step(workset, solution); solution.update(delta) } setFinalState(solution); 最小值传播: 9. Back Pressure监控 流处理系统中,当下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。 Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现。 默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,过计算得到一个比值,例如,radio=0.01,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态: OK: 0 <= Ratio <= 0.10 LOW: 0.10 < Ratio <= 0.5 HIGH: 0.5 < Ratio <= 1 1. Table Flink的Table API实现了使用类SQL进行流和批处理。 详情参考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html 2. CEP Flink的CEP(Complex Event Processing)支持在流中发现复杂的事件模式,快速筛选用户感兴趣的数据。 详情参考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html#next-steps 3. Gelly Gelly是Flink提供的图计算API,提供了简化开发和构建图计算分析应用的接口。 详情参考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html 4. FlinkML FlinkML是Flink提供的机器学习库,提供了可扩展的机器学习算法、简洁的API和工具简化机器学习系统的开发。 详情参考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html 明天更新部署与测试 本文仅代表个人的观点,如果阐述的不好欢迎大家指导纠正,在此感激不尽。
flink流处理特点
flink的流处理特性:支持高吞吐量、低延迟和高性能流处理。支持带事件时间的窗口操作。支持有状态计算的恰好一次语义支持高度灵活的窗口操作,支持基于时间、计数、会话和数据驱动的窗口操作。支持带背压功能的连续流模型基于轻量级分布式快照支持容错运行时支持流处理上的批处理和流处理。Flink在JVM中实现了自己的内存管理。支持迭代计算支持程序自动优化:避免特定情况下的洗牌、排序等代价高昂的操作,需要缓存中间结果。API支持数据流API是为流数据类应用程序提供的。对于批处理应用,提供数据集API(支持Java/Scala)图书馆支持支持机器学习(FlinkML)支持图分析(Gelly)支持关系数据处理(表)支持复杂事件处理(CEP)支持集成在纱线上支撑弗林克HDFS支持支持来自Kafka的输入数据Apache HBase支持Hadoop程序支持支持超光速粒子支持ElasticSearch。支持兔子阿帕奇风暴支援S3支持XtreemFS支持
Flink基础系列28-Flink容错机制
在执行流应用程序期间,Flink 会定期保存状态的一致检查点 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程 (如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断) 遇到故障之后,第一步就是重启应用 (重启后,起初流都是空的) 第二步是从 checkpoint 中读取状态,将状态重置 (读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态) 从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同 第三步:开始消费并处理检查点到发生故障之间的所有数据 这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置 (这里要求source源也能记录状态,回退到读取数据7的状态,kafka有相应的偏移指针能完成该操作) 概述 checkpoint和Watermark一样,都会以广播的形式告诉所有下游。 具体讲解 JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点 (这个带有新检查点ID的东西为barrier,由图中三角型表示,数值2只是ID) 数据源将它们的状态写入检查点,并发出一个检查点barrier 状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成 上图,在Source端接受到barrier后,将自己此身的3 和 4 的数据的状态写入检查点,且向JobManager发送checkpoint成功的消息,然后向下游分别发出一个检查点 barrier 可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断 此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游sink发送了一个数据4,而奇数流已经处理完蓝3变成了8(黄1+蓝1+黄3+蓝3),并向下游sink发送了8 此时检查点barrier都还未到Sum_odd奇数流和Sum_even偶数流 分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达 对于barrier已经达到的分区,继续到达的数据会被缓存 而barrier尚未到达的分区,数据会被正常处理 此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加 这次处理的总结:分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理 当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发 当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了 此时的偶数流和奇数流都为8 向下游转发检查点 barrier 后,任务继续正常的数据处理 Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了 CheckPoint为自动保存,SavePoint为手动保存 有状态的流处理,内部每个算子任务都可以有自己的状态 对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。 一条数据不应该丢失,也不应该重复计算 在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。 Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。 Flink使用了一种轻量级快照机制——检查点(checkpoint)来保证exactly-once语义 有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份备份(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时间。 应用状态的一致检查点,是Flink故障恢复机制的核心 端到端(end-to-end)状态一致性 目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统 端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性 整个端到端的一致性级别取决于所有组件中一致性最弱的组件 端到端 exactly-once 幂等写入 所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 (中间可能会存在不正确的情况,只能保证最后结果正确。比如5=>10=>15=>5=>10=>15,虽然最后是恢复到了15,但是中间有个恢复的过程,如果这个过程能够被读取,就会出问题。) 事务写入 预写日志(Write-Ahead-Log,WAL) 把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定 DataStream API提供了一个模版类:GenericWriteAheadSink,来实现这种事务性sink 两阶段提交(Two-Phase-Commit,2PC) 对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收到的数据添加到事务里 然后将这些数据写入外部sink系统,但不提交它们——这时只是"预提交" 这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口 不同Source和Sink的一致性保证 内部——利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性 source——kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重制偏移量,重新消费数据,保证一致性 sink——kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction Exactly-once 两阶段提交 JobManager 协调各个 TaskManager 进行 checkpoint 存储 checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存 当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流 barrier会在算子间传递下去 每个算子会对当前的状态做个快照,保存到状态后端 checkpoint 机制可以保证内部的状态一致性 每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里 sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务 (barrier之前的数据还是在之前的事务中没关闭事务,遇到barrier后的数据另外新开启一个事务) 当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成 sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认” Exactly-once 两阶段提交步骤总结
关于 Flink checkpoint,都在这里(二)
目前业界主流的实时计算框架包括 Flink、Spark Streaming、Storm。相比于 Batch,Stream 的容错则需要考虑更多。Batch 数据通常基于稳定性较高的分布式存储进行数据的读写(如 HDFS、S3 等),当数据计算出现异常时可以通过重新计算的方式保证最终结果的一致性,Spark 就是基于这样的思路设计的,它通过 lineage 机制来重新计算。并且 Batch 计算往往不需要过多的考虑数据的时效性,而且不需要做到 7×24 小时的运行。但相对于 Stream 而言则会更加复杂。 对于 Stream 而言需要面对不同的流式数据源,可以是 File Stream、队列(如 Kafka),甚至可能是某个服务发来的消息。数据源的多样性就注定了 Stream 的容错需要重新进行考虑。并且 Stream 数据的容错需要在短时间内进行恢复,否则将可能会导致大量的数据积压甚至丢失,因为 Stream 数据链路不会因为下游处理任务的异常而停止数据的产出。 让我们将问题描述的更具体一些,方便更清楚的了解 Stream 的容错思想。对于分布式计算而言,目前主流的思路都是采用 Master-Slave 架构。Master 主要用于进行 Slave 节点的管理(比如检测 Slave 是否存活,状态管理等),Slave 主要是担当数据计算的职责。因此,从容错角度而言分为: Master 容错相对而言较为简单,因为不需要直接参与数据计算。主要分为有状态的 Master 和无状态的 Master 两类。 像 Storm 这类无状态的实时计算框架,Master(即 Storm 的 Nimbus 节点)的异常往往不影响 Slave(即 Storm worker 节点)的数据计算,只需要重新启动一个 Master 即可,这个过程中不需要进行 Master 状态的恢复,也不会影响实时数据的处理。甚至 Slave 节点在无感知的情况下就完成了 Master 的恢复。但是这种方式会牺牲一定的功能,实时计算框架本身无法支持状态流的处理。 像 Flink 、 Spark Streaming 这类包含状态的实时计算框架,需要恢复 Master 节点的同时还需要对其状态进行恢复,Master 状态信息包含一些必要的配置、以及对 Slave 节点状态管理的信息(如“某个 Slave 节点的状态快照所在的 HDFS 路径”)。Spark Streaming、Flink 的做法都是基于 checkpoint 机制对 Master 节点的状态进行备份,异常发生时需要基于上一次的状态备份进行恢复。 Flink 还提供了 HA 机制,即同时运行至少 2 个 JobManager 节点,但是只有其中一个真正的处理管理事务(称为主节点——Leader),其他的仅仅保持状态信息的同步(称为从节点——Standby)。一旦 Leader 发生异常,其中一个 Standby 将会代替异常节点继续进行任务的管理。 更多关于 Flink HA 可以参考官方文档 。这种机制是牺牲更多的资源来换取任务的稳定性,主从切换的成本相比于从状态备份中恢复速度更快。 Stream 数据处理整体而言可以分为 3 部分: 根据不同的保障级别,Stream 数据容错级别又分为 3 种语义: 我们之所以将数据接收和写入单独拿出来,是因为在面对不同的数据源时,实时框架的容错机制与最高语义保障级别是不同的。如 Flink 而言,它的 exactly-once 语义总的来说是针对于数据处理阶段而言,即只有框架内数据的处理可以保障 exactly-once,而数据的接收、写入是否是 exactly-once 语义取决于数据源本身与 Source、Sink 算子的实现逻辑。通常来说,我们将能够保障数据接收、数据处理、数据接入整体数据一致性称为 端到端(end-to-end) 的数据一致性。 端到端的数据一致性保障相对而言是很复杂的,因为数据源的种类众多,这些一般都不是分布式实时框架中的一部分,数据的发送与接收逻辑不受实时框架的控制。 对于 Storm 而言,框架内仅提供了相关的接口用于用户自己实现一致性语义,并没有直接提供各种存储的一致性 Spouts,数据写入也是如此。数据处理过程提供 at-least-once 语义保障(exactly-once 语义由 Storm Trident 提供了保障,本篇暂不做讨论)。Storm 通过 ACK 的机制保证 at-least-once 语义。简单来说,当 Storm 接收到一条数据后将会给这条数据唯一的 id,数据被下游 Bolts 处理但是处理后的 id 不会发生改变,当且仅当该 id 的数据经过的 Bolts 全部 ACK 后才认定该数据被 彻底处理(fully processed) ,否则 Spout 将再次发送该数据直到数据被彻底处理。 Spark Streaming 的数据接收通过预写入日志的机制保障了 at-least-once 语义。简单来说,就是将接收到的数据以日志的形式写入到稳定的存储中(存储位置基于 checkpoint 配置获取),这样一来就与数据源解耦,可以基于预写入日志实现数据重发的能力,从而保障 at-least-once 语义。数据处理过程中基于 RDD 的容错机制进行恢复,提供了精确一次的语义。数据写入需要用户自己实现,Spark Streaming 提供了两种思路:幂等写入和事务性写入。 Flink 全局基于 checkpoint 进行容错,通过向流数据中插入特殊的事件——checkpoint barrier 来触发各个算子制作状态快照,快照会写入到持久化的存储中。Flink Source、Sink 的语义保障需要依赖数据源以及自身的实现逻辑。但是 Flink 提供了多种状态接口,如 ListState、MapState,用于进行算子状态的记录,状态容错可以保障 exactly-once 语义。这也是与 Spark Streaming 的不同之处。 到这里我们大致了解了各个框架的容错机制,我们不禁想回味一下:分布式实时计算框架的容错机制的本质是什么?容错到底在保障什么? 从本质上讲,容错在保障数据可以被正确的处理,即使在发生异常的情况下。实时流处理的正确性又体现在 处理过程的完整性 与 时序的正确性 。即一条数据要被所有的逻辑完整的处理一次(根据语义的不同也可能是多次),且多条数据之间的处理的时序不发生改变。 举个例子,如下图所示的数据流 DAG 图中,流数据序列 [1, 2, 3, …, n] 被输入到 A 中,然后最终流向 D。完整性即每一个事件都被完整的 DAG 路径处理,即 A -> B -> D 或 A -> C -> D ,时序性即事件 1 永远先于事件 2 被处理,即使在发生了异常后恢复的情况下也是如此。 从整体来看,实时分布式计算框架的容错机制核心思想是 确认 与 重试 ,但是不同的框架重试过程中回滚的数据量不同。 Storm 通过 ACK 机制判断数据是否完整处理,否则将重发数据,重新进行计算。这种单条数据粒度的 ACK 与重试机制即可以保障时序性,也可以保障处理过程的完整性。但是这样细的粒度牺牲了一定的性能。Storm 并没有将数据流进行冗余存储来保障容错,从这个角度而言它的容错是轻量级的。 Spark Streaming 通过微批次的方式将数据进行截断,以批次为单位进行容错。这种方式一旦发生了异常,可以从上一个批次中恢复继续执行。这种机制从一定程度上提升了性能,但是对时效性有损。因为微批次的思路对数据流进行了截断,时间语义上的单位时间也只能根据批次的大小来界定。Spark Streaming 提供了数据流的冗余(预写入日志)可以真正做到与数据源解耦,对于所有的数据源均可以保障容错的语义,但是这类的容错是重量级的。 Flink 的思路也是对数据进行截断,从而可以分段治之。相比于 Spark Streaming 而言这种截断并没有改变数据流的连续性,时间语义上的单位时间仍然是以事件粒度来界定。并且 Flink 不会对数据流进行冗余(虽然 unaligned-checkpoint 会产生一部分的数据冗余,但是与 Spark Streaming 这种全部数据冗余的思路是不同的),只关注计算中的状态容错。这种思路较为轻量级,并且能够保障 exactly-once 语义。但是这种思路无法应对所有的数据源场景,需要强依赖数据源的实现与 Source、Sink 算子的逻辑。 总体而言,实时流的容错核心是基于 数据截断 和 重试机制 。Storm 的数据截断粒度是单条数据级别的,通过 ACK 的机制实现的重试机制,此截断粒度不会影响数据的时效性。Spark Streaming 的截断粒度是微批次的,截断会影响数据的时效性,然后通过数据冗余的方式保障了重试机制,这种冗余数据的方式可以面对任何数据源时都能够保证数据一致性。Flink 是基于 checkpoint barrier 将数据流截断,barrier 会随着数据流而流动,避免了流量截断带来的时效性影响,并且 Flink 容错只关注状态,借助状态的回滚来保证数据一致性。 从容错实现来看,三种框架的侧重点有所不同。Storm 作为无状态计算框架,采用的是非常简单有效的机制保障容错。Spark Streaming 更注重数据的可恢复性,希望通过备份原始数据能够在任何情况下、面对任何数据源都能够保证数据一致性。Flink 相对而言更加轻量,更注重数据的时效性,不希望容错机制带来太多的时效性损失(例如 unaligned-checkpoint)。 感谢你读到这里,希望你现在对 Flink 容错机制和其他的实时计算框架的容错机制有了一个基本的了解,对其容错思路和本质有了不同的想法。 下一篇 我们将讨论 Flink checkpoint 的数据结构,探索它究竟是如何存储的?都存储了哪些内容?基于这些备份数据如何在异常中恢复? 可可 @ 欢迎邮件联系我
Flink1.13 Checkpoint原理
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"( checkpoint )的特性,在出现 故障时将系统重置回正确状态 。下面通过简单的类比来解释检查点的作用。 假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。 于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。 Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性. . 每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator ,CheckpointCoordinator全权负责本应用的快照制作。 流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动 (有点类似于Watermark). 这些barrier不会跨越流中的数据 . 每个barrier会把数据流分成两部分: 一部分数据进入 当前的快照 , 另一部分数据进入 下一个快照 . 每个barrier携带着快照的id. barrier 不会暂停数据 的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照. Job Manager 对每一个job都会产生一个 Checkpoint Coordinator 向所有 source 节点 触发 trigger Checkpoint 节点, 并行度是几,就会触发多少个。 source 会向流中触发 Barrier ,接收到 Barrier 的节点就会保存快照(包括source)。 表示两秒钟 source 向流中触发一次 Barrier source先收到 barrier ,然后往后传递,若是多并行度,相当于多组接力赛跑比赛,所以顺序是不一致的,并不是同步。 在多并行度下, 如果要实现 严格一次 , 则要执行 barrier对齐 . 当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。 会 重复消费 , 就是至少一次语义.
Apache Flink快速入门-基本架构、核心概念和运行流程
Flink是一个基于流计算的分布式引擎,以前的名字叫stratosphere,从2010年开始在德国一所大学里发起,也是有好几年的 历史 了,2014年来借鉴了社区其它一些项目的理念,快速发展并且进入了Apache顶级孵化器,后来更名为Flink。 Flink在德语中是快速和灵敏的意思 ,用来体现流式数据处理速度快和灵活性强等特点。 Flink提供了同时支持高吞吐、低延迟和exactly-once 语义的实时计算能力,另外Flink 还提供了基于流式计算引擎处理批量数据的计算能力,真正意义上实现了流批统一。 Flink 独立于Apache Hadoop,且能在没有任何 Hadoop 依赖的情况下运行。 但是,Flink 可以很好的集成很多 Hadoop 组件,例如 HDFS、YARN 或 HBase。 当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。 Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。 Flink具有先进的架构理念、诸多的优秀特性,以及完善的编程接口。 Flink的具体优势有如下几点: (1)同时支持高吞吐、低延迟、高性能; (2)支持事件时间(Event Time)概念; 事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。 (3)支持有状态计算; 所谓状态就是在流计算过程中,将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后,可以从之前的状态中获取中间结果,计算当前的结果,从而无需每次都基于全部的原始数据来统计结果。 (4)支持高度灵活的窗口(Window)操作; (5)基于轻量级分布式快照(Snapshot)实现的容错; (6)基于JVM实现独立的内存管理; (7)Save Points(保存点); 保存点是手动触发的,触发时会将它写入状态后端(State Backends)。Savepoints的实现也是依赖Checkpoint的机制。Flink 程序在执行中会周期性的在worker 节点上进行快照并生成Checkpoint。因为任务恢复的时候只需要最后一个完成的Checkpoint的,所以旧有的Checkpoint会在新的Checkpoint完成时被丢弃。Savepoints和周期性的Checkpoint非常的类似,只是有两个重要的不同。一个是由用户触发,而且不会随着新的Checkpoint生成而被丢弃。 在Flink整个软件架构体系中,统一遵循了分层的架构设计理念,在降低系统耦合度的同时,为上层用户构建Flink应用提供了丰富且友好的接口。 整个Flink的架构体系可以分为三层: Deployment层: 该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN),云(GCE/EC2),Kubernetes等。 Runtime层:Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。 API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。 Libraries层:该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的计算框架,也分别对应于面向流处理和面向批处理两类。 核心概念:Job Managers,Task Managers,Clients Flink也是典型的master-slave分布式架构。Flink的运行时,由两种类型的进程组成: Client: Client不是运行时和程序执行的一部分,它是用来准备和提交数据流到JobManagers。之后,可以断开连接或者保持连接以获取任务的状态信息。 当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager, JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。 每个Worker(Task Manager)是一个JVM进程,通常会在单独的线程里执行一个或者多个子任务。为了控制一个Worker能够接受多少个任务,会在Worker上抽象多个Task Slot (至少一个)。 只有一个slot的TaskManager意味着每个任务组运行在一个单独JVM中。 在拥有多个slot的TaskManager上,subtask共用JVM,可以共用TCP连接和心跳消息,同时可以共用一些数据集和数据结构,从而减小任务的开销。 Flink的任务运行其实是多线程的方式,这和MapReduce多JVM进程的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池对资源进行有效管理。
转载:阿里巴巴为什么选择Apache Flink?
本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。 合抱之木,生于毫末 随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。 因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想,我们能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持, 这就是阿里选择Flink的背景和初衷 。 目前开源大数据计算引擎有很多选择,流计算如Storm,Samza,Flink,Kafka Stream等,批处理如Spark,Hive,Pig,Flink等。而同时支持流处理和批处理的计算引擎,只有两种选择:一个是Apache Spark,一个是Apache Flink。 从技术,生态等各方面的综合考虑。首先,Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。 从技术发展方向看,用批来模拟流有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流来模拟批,在技术上有更好的扩展性。从长远来看,阿里决定用Flink做一个统一的、通用的大数据引擎作为未来的选型。 Flink是一个低延迟、高吞吐、统一的大数据计算引擎。在阿里巴巴的生产环境中,Flink的计算平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时Flink提供了一个Exactly-once的一致性语义。保证了数据的正确性。这样就使得Flink大数据引擎可以提供金融级的数据处理能力。 Flink在阿里的现状 基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。采用Hadoop的YARN做为资源管理调度,以 HDFS作为数据存储。因此,Flink可以和开源大数据软件Hadoop无缝对接。 目前,这套基于Flink搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品API向整个开发者生态提供基于Flink的云产品支持。 Flink在阿里巴巴的大规模应用,表现如何? 规模: 一个系统是否成熟,规模是重要指标,Flink最初上线阿里巴巴只有数百台服务器,目前规模已达上万台,此等规模在全球范围内也是屈指可数; 状态数据: 基于Flink,内部积累起来的状态数据已经是PB级别规模; Events: 如今每天在Flink的计算平台上,处理的数据已经超过万亿条; PS: 在峰值期间可以承担每秒超过4.72亿次的访问,最典型的应用场景是阿里巴巴双11大屏; Flink的发展之路 接下来从开源技术的角度,来谈一谈Apache Flink是如何诞生的,它是如何成长的?以及在成长的这个关键的时间点阿里是如何进入的?并对它做出了那些贡献和支持? Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定位为Streaming,即用流式计算来做所有大数据的计算,这就是Flink技术诞生的背景。 2014年Flink作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理。 Flink核心概念以及基本理念 Flink最区别于其他流计算引擎的,其实就是状态管理。 什么是状态?例如开发一套流计算的系统或者任务做数据处理,可能经常要对数据进行统计,如Sum,Count,Min,Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理解为一种状态。如果数据源是在读取Kafka,RocketMQ,可能要记录读取到什么位置,并记录Offset,这些Offset变量都是要计算的状态。 Flink提供了内置的状态管理,可以把这些状态存储在Flink内部,而不需要把它存储在外部系统。这样做的好处是第一降低了计算引擎对外部系统的依赖以及部署,使运维更加简单;第二,对性能带来了极大的提升:如果通过外部去访问,如Redis,HBase它一定是通过网络及RPC。如果通过Flink内部去访问,它只通过自身的进程去访问这些变量。同时Flink会定期将这些状态做Checkpoint持久化,把Checkpoint存储到一个分布式的持久化系统中,比如HDFS。这样的话,当Flink的任务出现任何故障时,它都会从最近的一次Checkpoint将整个流的状态进行恢复,然后继续运行它的流处理。对用户没有任何数据上的影响。 Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的? 这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。 Flink利用这套经典的算法,保证了强一致性的语义。这也是Flink与其他无状态流计算引擎的核心区别。 下面介绍Flink是如何解决乱序问题的。比如星球大战的播放顺序,如果按照上映的时间观看,可能会发现故事在跳跃。 在流计算中,与这个例子是非常类似的。所有消息到来的时间,和它真正发生在源头,在线系统Log当中的时间是不一致的。在流处理当中,希望是按消息真正发生在源头的顺序进行处理,不希望是真正到达程序里的时间来处理。Flink提供了Event Time和WaterMark的一些先进技术来解决乱序的问题。使得用户可以有序的处理这个消息。这是Flink一个很重要的特点。 接下来要介绍的是Flink启动时的核心理念和核心概念,这是Flink发展的第一个阶段;第二个阶段时间是2015年和2017年,这个阶段也是Flink发展以及阿里巴巴介入的时间。故事源于2015年年中,我们在搜索事业部的一次调研。当时阿里有自己的批处理技术和流计算技术,有自研的,也有开源的。但是,为了思考下一代大数据引擎的方向以及未来趋势,我们做了很多新技术的调研。 结合大量调研结果,我们最后得出的结论是:解决通用大数据计算需求,批流融合的计算引擎,才是大数据技术的发展方向,并且最终我们选择了Flink。 但2015年的Flink还不够成熟,不管是规模还是稳定性尚未经历实践。最后我们决定在阿里内部建立一个Flink分支,对Flink做大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。在这个过程当中,我们团队不仅对Flink在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进,并将其贡献给社区,例如:Flink新的分布式架构,增量Checkpoint机制,基于Credit-based的网络流控机制和Streaming SQL等。 阿里巴巴对Flink社区的贡献 我们举两个设计案例,第一个是阿里巴巴重构了Flink的分布式架构,将Flink的Job调度和资源管理做了一个清晰的分层和解耦。这样做的首要好处是Flink可以原生的跑在各种不同的开源资源管理器上。经过这套分布式架构的改进,Flink可以原生地跑在Hadoop Yarn和Kubernetes这两个最常见的资源管理系统之上。同时将Flink的任务调度从集中式调度改为了分布式调度,这样Flink就可以支持更大规模的集群,以及得到更好的资源隔离。 另一个是实现了增量的Checkpoint机制,因为Flink提供了有状态的计算和定期的Checkpoint机制,如果内部的数据越来越多,不停地做Checkpoint,Checkpoint会越来越大,最后可能导致做不出来。提供了增量的Checkpoint后,Flink会自动地发现哪些数据是增量变化,哪些数据是被修改了。同时只将这些修改的数据进行持久化。这样Checkpoint不会随着时间的运行而越来越难做,整个系统的性能会非常地平稳,这也是我们贡献给社区的一个很重大的特性。 经过2015年到2017年对Flink Streaming的能力完善,Flink社区也逐渐成熟起来。Flink也成为在Streaming领域最主流的计算引擎。因为Flink最早期想做一个流批统一的大数据引擎,2018年已经启动这项工作,为了实现这个目标,阿里巴巴提出了新的统一API架构,统一SQL解决方案,同时流计算的各种功能得到完善后,我们认为批计算也需要各种各样的完善。无论在任务调度层,还是在数据Shuffle层,在容错性,易用性上,都需要完善很多工作。 篇幅原因,下面主要和大家分享两点: ● 统一 API Stack ● 统一 SQL方案 先来看下目前Flink API Stack的一个现状,调研过Flink或者使用过Flink的开发者应该知道。Flink有2套基础的API,一套是DataStream,一套是DataSet。DataStream API是针对流式处理的用户提供,DataSet API是针对批处理用户提供,但是这两套API的执行路径是完全不一样的,甚至需要生成不同的Task去执行。所以这跟得到统一的API是有冲突的,而且这个也是不完善的,不是最终的解法。在Runtime之上首先是要有一个批流统一融合的基础API层,我们希望可以统一API层。 因此,我们在新架构中将采用一个DAG(有限无环图)API,作为一个批流统一的API层。对于这个有限无环图,批计算和流计算不需要泾渭分明的表达出来。只需要让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性。整个拓扑是可以融合批流统一的语义表达,整个计算无需区分是流计算还是批计算,只需要表达自己的需求。有了这套API后,Flink的API Stack将得到统一。 除了统一的基础API层和统一的API Stack外,同样在上层统一SQL的解决方案。流和批的SQL,可以认为流计算有数据源,批计算也有数据源,我们可以将这两种源都模拟成数据表。可以认为流数据的数据源是一张不断更新的数据表,对于批处理的数据源可以认为是一张相对静止的表,没有更新的数据表。整个数据处理可以当做SQL的一个Query,最终产生的结果也可以模拟成一个结果表。 对于流计算而言,它的结果表是一张不断更新的结果表。对于批处理而言,它的结果表是相当于一次更新完成的结果表。从整个SOL语义上表达,流和批是可以统一的。此外,不管是流式SQL,还是批处理SQL,都可以用同一个Query来表达复用。这样以来流批都可以用同一个Query优化或者解析。甚至很多流和批的算子都是可以复用的。 Flink的未来方向 首先,阿里巴巴还是要立足于Flink的本质,去做一个全能的统一大数据计算引擎。将它在生态和场景上进行落地。目前Flink已经是一个主流的流计算引擎,很多互联网公司已经达成了共识:Flink是大数据的未来,是最好的流计算引擎。下一步很重要的工作是让Flink在批计算上有所突破。在更多的场景下落地,成为一种主流的批计算引擎。然后进一步在流和批之间进行无缝的切换,流和批的界限越来越模糊。用Flink,在一个计算中,既可以有流计算,又可以有批计算。 第二个方向就是Flink的生态上有更多语言的支持,不仅仅是Java,Scala语言,甚至是机器学习下用的Python,Go语言。未来我们希望能用更多丰富的语言来开发Flink计算的任务,来描述计算逻辑,并和更多的生态进行对接。 最后不得不说AI,因为现在很多大数据计算的需求和数据量都是在支持很火爆的AI场景,所以在Flink流批生态完善的基础上,将继续往上走,完善上层Flink的Machine Learning算法库,同时Flink往上层也会向成熟的机器学习,深度学习去集成。比如可以做Tensorflow On Flink, 让大数据的ETL数据处理和机器学习的Feature计算和特征计算,训练的计算等进行集成,让开发者能够同时享受到多种生态给大家带来的好处。