Flink:流上的“不确定性”(Non-Determinism)
1. 什么是“确定性”
先明确一下什么叫“确定性”:对于一个“操作”来说,如果每次给它的“输入”不变,操作输出的“结果”也不变,那么这个操作就是“确定性“的。通常,我们认为批处理的操作都是确定的,比如针对一张 clicks 表,假如表中的数据没有变化,无论我们执行多少次 SELECT * FROM clicks 操作,它的结果始终不变。但是,批处理操作并不一定总是“确定性”的,如下的 SQL:
SELECT * FROM clicks
WHERE cTime BETWEEN TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP) AND CURRENT_TIMESTAMP;
会随执行的时间点不同而呈现不同的结果(clicks 表数据没有变化),原因是 SQL 中的时间函数 CURRENT_TIMESTAMP 在每次执行时返回的值都不一样;
另一个示例是 UUID 函数:
SELECT UUID() AS uuid, * FROM clicks LIMIT 3;
该函数是会为每一条记录生成唯一的 UUID,所以每次执行的结果也必然是不同的。
2. 批处理中的“不确定性”
批处理中的“不确定性”都是由函数引起的,上述两个示例都导致了结果的不确定性,但是它们却是有差异的,而且这种差异其实是非常“鲜明”的:
以 CURRENT_TIMESTAMP 为代表的函数是在生成查询计划时执行的,只会执行一次,所以所有记录得到的是同一个值,这类函数叫“动态函数”以 UUID 为代表的函数在每条记录上都会执行一次,生成独立的值,所以所有记录得到的是不同的值,这类函数叫“不确定函数”
官方文档的解释是:
在确定性函数之外存在不确定函数(non-deterministic function)和动态函数(dynamic function, 内置的动态函数以时间函数为主)两类,不确定函数会在运行时(即在集群执行,每条记录单独计算),而动态函数仅在生成查询计划时确定对应的值, 运行时不再执行(不同时间执行得到不同的值,但同一次执行得到的值一致)。
简单总结一下:批处理中的“不确定性”是由不确定函数和动态函数两种函数引起的,前者产生的变化值会作用在每一条记录上,而后者产生的变化值仅作用在一次执行中(每执行一次变化一次,一次执行中记录得到值是一样的)
3.流处理中的“不确定性”
和批处理相比,流处理中的“不确定性”因素明显增多,本质上,还是因为流处理的两大核心特性导致的:1. 流处理抽象出的表是“无边界的”;2. 流处理的查询是“连续的”,以下是流处理中几种典型的“不确定性”:
补充:在官方文档中,讨论流上的“不确定性”时,先介绍了一种情况:原来在批处理中的动态函数,跑到流式场景中,就“降级”成了不确定函数,举的例子是 函数,这里文档只是想说明流处理的特性对不确定性有很大的影响,但这个 Case 不是流处理中的“不确定性”的示例,因为动态函数(这里的CURRENT_TIMESTAMP )在批处理中本来就是“不确定性”的了。
3.1. 外部输入的不确定性
官方文档中的描述叫作 “Source 连接器回溯读取的不确定性”,本人不太喜欢这个称谓,因为它没有描述出“不确定性”的本质原因。实际上,这种情况就是说:流计算并不能对外界的数据(上游数据)进行强有力的控制,导致及时你使用相同的时间参数和配置,以流式重新读取数据时,可能依然会导致结果是不一样的。其实这种情况并不能算是一种很 “Strong” 的“不确定性”,在输出可能会发生变化的情况下,是不会存在任何确定性操作的,所以,这一点不是很值得强调。
3.2. 基于处理时间的不确定性
其实这类不确定性和前面批处理中起到的 CURRENT_TIMESTAMP 情形是很类似的,只是这里问题的只发生在流上,主要是和基于处理时间运作的一些函数和机制有关!因为:区别于事件时间,处理时间是基于机器的本地时间,这种处理不能提供确定性。相关的依赖时间属性的操作作包括:窗口聚合、Interval Join、Temporal Join 等,另一个典型的操作是 Lookup Join,语义上是类似基于处理时间的 Temporal Join,访问的外部表如存在更新,就会产生不确定性。
3.3. 基于 TTL 淘汰内部状态数据的不确定性
这也是典型的流式处理所特有的一种不确定性!由于数据流的“无边界性”,流计算引擎在处理双流 Join 、分组聚合这些场景时必须要在流上维持若干“状态”,随着的时间的推移,状态体积会不断地膨胀,所以必须要设置 TTL 在规定地时间内清理这些状态,这是流计算引擎必须进行的妥协。而清理 TTL 就会导致不确定性的产生,这几乎无法避免。
4. 流上的“不确定性更新”(NDU)
流上的“不确定更新”特指流上的增量更新场景。我们知道,Flink SQL 基于“在动态表上的持续查询”将流式处理“映射”成了关系表操作。对于流上发生的各种更新,Flink SQL 必须要予以捕获并更新到所维持的表中,用官方文档中的话说就是:所有会产生增量消息的操作都必须要在 Flink 内部维护完整的状态数据!在 Flink 中,在整个查询管道(Pipeline,就是从 Source 到 Sink 的完整 DAG)都依赖于更新消息的正确投递(从上一下算子到下一个算子),而“不确定的更新”就会导致错误!
那到底什么是不确定更新呢?我们知道 Flink 使用 changelog 类型的消息来描述和传导更新,它包含这样几种类型:(I) INSERT,(D)DELETE,(-U)UPDATE_BEFORE,(+U)UPDATE_AFTER,对于 Insert-Only 类型的数据(也就是日志型数据)不存在不确定更新问题,这很容易理解,只有在包含 D/-U/+U 这类更新消息的场景下,才有可能会出现“不确定更新”问题,因为,此时,Flink 需要根据主键去更新对应的状态数据,这里涉及 Flink "如何确定主键” 的问题,Flink 的处理方式是:
如果能推导出主键,就根据主键更新状态数据如果不能推导出主键,Flink 只能完整比对现有状态中维护的所有的行才能确定如何更新或删除。此时,消息不能被“不确定的列值”所干扰,否则就会出现“不确定性更新”错误了