flink
Flink
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
# Flink是什么?
- 支持高吞吐、低延迟、高性能的分布式处理框架
- Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
- Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
- 2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
# Flink的特点
- 事件驱动型(Event-driven)
- 事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以3. kafka为代表的消息队列几乎都是事件驱动型应用。
# 流处理与批处理
- 批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
- 流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
- 在spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
- 而在flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
# 无界数据流
无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
# 有界数据流
有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。
# 分层api
最底层级的抽象 :仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。 DataStream API : 底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
Table API :是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何 。 尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。 Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
# 支持有状态计算
Flink在1.4版本中实现了状态管理。 状态管理就是在流失计算过程中将算子的中间结果保存在内存或者文件系统中,等下一个事件进入算子后可以让当前事件的值与历史值进行汇总累计。
# 支持exactly-once语义
在分布式系统中,组成系统的各个计算机是独立的。这些计算机有可能fail。 一个sender发送一条message到receiver。根据receiver出现fail时sender如何处理fail,可以将message delivery分为三种语义:
At Most once: 对于一条message,receiver最多收到一次(0次或1次). 可以达成At Most Once的策略: sender把message发送给receiver.无论receiver是否收到message,sender都不再重发message.
At Least once: 对于一条message,receiver最少收到一次(1次及以上). 可以达成At Least Once的策略: sender把message发送给receiver.当receiver在规定时间内没有回复ACK或回复了error信息,那么sender重发这条message给receiver,直到sender收到receiver的ACK.
Exactly once: 对于一条message, 确保只收到一次
# 支持事件时间(EventTime)
Flink支持事件时间。 目前大多数框架时间窗口计算,都是采用当前系统时间,以时间为单位进行的聚合计算只能反应数据到达计算引擎的时间,而并不是实际业务时间。 Processing time:处理时间是指执行相应操作机器的系统时间。当流程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用当前运行机器的系统时间。每小时处理时间窗口包括在系统时间每小时内到达的所有指定操作记录。例如:如果应用程序在上午9:15开始运行,第一个小时处理时间窗口将包括上午9:15到10:00之间处理的事件,下一个窗口将包含上午10:00到11:00之间处理的事件,以此类推。
处理时间是最简单的时间概念,不需要流和物理机之间的协调,它有最好的性能和最低的延迟。然而,在分布式和异步环境中,处理时间具有不确定性,因为它容易受到系统之间操作记录传输速度以及中断(计划或其他)的影响,导致数据延迟。
Event time:事件时间是每个独立事件在其生成设备上发生的时间,通常是在进入到Flink之前就嵌入在记录中的时间。并且可以从每个记录中提取事件时间戳,在事件时间中,决于数据产生的时间,而不是当前系统时间。事件时间程序必须指定如何生成事件Watermarks,用来保证事件时间的有序性。
事件时间处理将产生完全一致和确定的结果,无论事件何时到达或顺序如何。但是,除非事件是按顺序(通过时间戳)到达的,否则在等待无序事件时,事件时间处理会产生延迟。因为等待的时间是有限的,这就限制了事件时间应用程序的确定性。
假设所有数据都已到达,事件时间操作将按照预期进行,即使在处理无序延迟的事件或重新处理历史数据,也会产生正确一致的结果。例如,每小时事件时间窗口将包含所有带有属于该小时的事件时间戳的记录,而不管它们到达的顺序如何,也不管它们是在什么时候处理的。
side ouput支持同一个流中输出不同其他格式的数据。
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
数据写入side output
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
获取side output
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
2
3
4
5
6