上海品茶

您的当前位置:上海品茶 > 报告分类 > PDF报告下载

4、周伊莎、马越-Flink SQL上的状态迁移和查询 .pdf

编号:101793 PDF 40页 12.96MB 下载积分:VIP专享
下载报告请您先登录!

4、周伊莎、马越-Flink SQL上的状态迁移和查询 .pdf

1、周伊莎/马越 字节跳动基础架构工程师State Migration and Query on State Migration and Query on FlinkFlink SQLSQL背景背景SQLSQL 作业的作业的状态迁移状态迁移StateState 的离线的离线查询查询未来展望未来展望#1#1#2#2#3#3#4#4#1#1背景背景状态的作用状态在流式作业的生命周期中扮演了十分重要的角色计算状态在版本间迁移运行时容错计算逻辑正确性校验一些定义:一些定义:SnapshotSnapshot-a global,consistent image of the state of a Flink

2、jobCheckpointCheckpoint-a snapshot taken automatically by FlinkSavepointSavepointa snapshot triggered manually by a userSQL 与状态DataStream 作业SQL 作业可以通过 API 完成对状态的控制:1.注册和使用状态2.查询和修改状态3.设置 UID/UIDHash 以进行状态迁移4.自定义 State Serializer5.RelNodeTransformationSQLJobGraphStreamGraphSQLTableDataStream APIRunti

3、meSQL作业的状态对用户来说是完全黑盒的?SQL 作业使用状态的痛点l状态查询困难状态查询困难n 校验逻辑正确性n 快速定位异常输出来源l状态状态难以难以在迭代版本间迁移在迭代版本间迁移n 简单的字段增减不影响状态恢复n 简单的逻辑变更不影响状态恢复#2#2SQLSQL 作业的状态迁移作业的状态迁移现状状态无法迁移时:1.丢弃旧状态2.回拨 Offset,重跑任务资源浪费产生 LAG长周期任务输入缺失为什么需要迁移旧状态?现状状态迁移的充分必要条件:1.Operator ID 保持不变2.State Serializer 前后兼容问题1:DAG 极易发生变更SourceCalcGroupAg

4、gSink1.隐式修改:修改计算逻辑、打开 minibatch 等2.显式修改:增删维表、source、sink 等为什么 SQL 作业状态难以迁移?现状状态迁移的充分必要条件:1.Operator ID 保持不变2.State Serializer 前后兼容为什么为什么 SQLSQL 作业状态难以迁移?作业状态难以迁移?问题问题2 2:State Serializer 不可兼容1.增删字段2.修改字段的类型AccumulatorAccumulators:ValueStateMAXMAXCOUNTCOUNTSUMSUMSUMSUMMAXMAXCOUNTCOUNTSUMSUMSUMSUMLAST

5、_VALUELAST_VALUEData Talks线上最近30日状态恢复失败 Case 分析由拓扑图发生变更导致的 Case 占比75%由 State Serializer不可兼容导致的Case 占比25%解决思路RelNodeTransformationSQLJobGraphStreamGraphSQLTableDataStream APIRuntimeFlink 在 DataStream API 层为状态迁移提供的能力:设置设置 UID/UID/UIDHashUIDHashSQL 作业如何使用这种能力?1.为 SQL 作业提供的 DAG 预览2.允许用户对 DAG 进行编辑3.将用户编辑

6、的 UID/UIDHash 传递到运行时DAG 预览#1 为什么不直接存储StreamGraph?#2 如何与算子形成稳定映射?复用 StreamGraphHasherV2 来为每个节点生成确定性的 id保持隔离,不被 Flink 迭代影响PlanGraph 抽象FieldExplainid确定性idgeneratedOperatorID与 JobGraph 中算子的OperatorID 对应userProvidedHash用户提供的 UIDHashhasState此算子是否使用状态其他 StreamNode 属性DAG 预览初始可视化效果初始可视化效果DAG 预览任务修改产生 Diff 后整

7、体使用流程SQLNormal Configs生成1 12 23 3PlanGraphSQLNormal Configs修改生成2 23 34 4PlanGraph1 1自动映射/手工修改2 2333 3PlanGraph1 12 23 34 4JobGraph1 1共同提交旧版本易用性问题对于较为复杂的 DAG 来说1.为所有节点手动配置成本较高2.难以快速地定位需要配置的节点易用性问题的解决方案对于较为复杂的 DAG 来说1.为所有节点手动配置成本较高2.难以快速地定位需要配置的节点提供 Best Effort 的自动映射高亮使用状态的节点提供 JSON 代码比较Best Effort 的自

8、动映射Note:算子的 Description 是描述算子强有力信息A1A2A1A2A3A1A2#1#1分别在旧图和新图里收集具有相同Description 的算子#2#2为每一对节点计算相似度,并放入最大堆A1A2Maximum Heap#3#3轮询最大堆,直到新图或旧图中的节点都已完成匹配#4 4为匹配的节点,从旧节点中取出 GeneratedOperatorID 填入到新节点的 UserProvidedHash中A1A2相似度计算相似度计算 TipsTips:1.比较所有入节点的属性2.比较所有出节点的属性JSON 代码比较JSON 中所有的节点已按拓扑排序的顺序展示通过文本模式的对比,

9、使用户能够更快速地定位需要的节点其他优化有状态节点使用特殊标识提示用户重点关注支持按节点属性进行搜索总结SQL 作业 DAG 极易发生变更状态迁移困难SourceCalcGroupAggSink我们怎么解决的?我们遇到了什么问题?提供 DAG 预览,允许用户为算子设置UID/UIDHash 等提供 Best Effort 的自动映射功能高亮有状态节点、提供 JSON 代码对比等,提升手动编辑易用性#1#2#3#3 3StateState 的离线查询的离线查询State Processor API特性介绍Flink 1.9.0 推出的新功能使用 DataSet API用于 读取/修改/生成 Sa

10、vepoint状态读取状态读取状态修正状态修正状态初始化状态初始化State Processor APIState Processor APISavepointSavepointState Processor API使用方式Step 1:创建 Existing SavepointExistingSavepoint savepoint=Savepoint.load(env,savepointPath,StateBackend);Step 2:定义 ReaderFunctionclass ReaderFunction extends KeyedStateReaderFunction/重新注册 st

11、ate/遍历所有 key,访问 StateStep 3:读取状态savepoint.readKeyedState(my-uid,new ReaderFunction();State Processor API状态查询原理ClientKeyedStateInputFormatSavepoint_metadata_metadataopAopA-1 1-statestateopAopA-2 2-statestateopBopB-1 1-statestateopBopB-2 2-statestateOperatorIDSavepointPathStateBackendparse metadatacre

12、ate inputSplitrestore statereaderFunc.open()register stateKeys IteratorreaderFunc.readKey(key)traversekey开发成本开发成本需要开发 Java 代码使用门槛使用门槛需要了解状态定义细节查询限制查询限制无法查询多个算子无法查询状态元信息内容匮乏内容匮乏主要问题成本降低成本降低功能增强功能增强使用 Flink Batch SQL 查询状态不需要知道任何信息支持查询多个算子任务支持查询状态元信息RuntimeRuntimeStorageStoragephysicalphysicalsavepoint

13、savepoint-1 1savepointsavepoint-n nlogiclogicDB&TablesDB&TablesDB&TablesDB&TablesmappingmappingStateState Query on Query on FlinkFlink SQLSQL解决方案如何用 SQL 表达 State如何实现 0 门槛查询如何表示单个 State如何表示一个算子/任务所有 State如何表示任务有哪些 State查询状态需要哪些信息?如何对用户屏蔽 State 细节?实现难点查询查询 StateState 需要哪些信息?需要哪些信息?BackendTypeKeyedType

14、StateNameOperatorIDSavepointPathStateSerializerReaderFunctionExistingSavepointToo much to KnowToo much to KnowSavepointPathStateMetaInformationStateMetaInformationBackendTypeKeyedTypeStateNameOperatorIDStateSerializerStateMetaStateMeta SnapshotSnapshot将 StateMeta 信息添加到 Savepoint 中SavepointSavepointS

15、tateMeta SnapshotTMTM 上报上报 StateMetaStateMetaOPCheckpointCoordinatorsavepointstate1stateNStateMetaInfooperatorNameoperatorIDKeySerializerStateDescriptorsDistrisbuteTypesnamespaceSerializersTaskTask 制作制作 SavepointSavepoint 时,对时,对 StateMetaStateMeta 进行快照进行快照快照完成之后,将快照完成之后,将 StateMetaStateMeta 信息上报至信息上

16、报至 JMJMState State 注册时,注册时,Task Task 保存保存 StateMetaStateMetaJMJM 持久化持久化 StateMetaStateMetasnapshotStateHandlesStateMetaInformationSnapshot StateMeta_metadata_stateInfoPersist汇总写入汇总写入 SavepointSavepoint 目录中目录中对相同对相同 operator operator 的的 StateMetaStateMeta 进行合并进行合并StateMeta Snapshot如何用 SQL 表达 State?Ta

17、ble1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNTable1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNTable1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNCatalogTable1Table1DatabaseTable1DatabaseSP1Savepointstate_met

18、aop1_state1op1_all_stateall_stateCatalog.Database.TableSavepoint.SP_ID.StateTableState 元信息单个 State算子/任务所有 StateState As DatabasenamespaceAggregate(op2)Sink(op3)ValueState sumListState offsetkeyBykeyBy partition:1,offset:63 partition:2,offset:88 partition:3,offset:75 valuekeyKafkaSource(op1)valuevoid

19、10void8k1k2namespacevoid6k3如何表示单个 Stateop2_sumop2_sumop1_offsetop1_offsetTableName=OperatorID+StateNameOperatorStateTableOperatorStateTableKeyedStateTableKeyedStateTableState As Database如何表示一个算子所有的 state?namespacekeyvaluevoid1void2k1k2namespacekeyvaluevoid3void4k1k2value1086namespacekeyvaluevoid1voi

20、d2k1k2void3k1state_namekeyedState1keyedState1keyedState2void4null10k2nullnull8nullkeyedState2op_stateop_statenull6nullop_stateTableName=OperatorID+all_statesop1_all_statesop1_all_statesop1_keyedState2op1_keyedState2op1_keyedState1op1_keyedState1op1_op_statop1_op_state eAllStatesTableAllStatesTableSt

21、ate As Database如何表示任务有哪些 state?Aggregate(OP2)Sink(OP3)ValueState sumListState offsetkeyBykeyByKafkaSource(OP1)state_metastate_metaop_idop_namestate_nameOP1kafkaSourceoffsetAggregateOP2sumstate_typeListStateValueStateTableName=state_metaStateMetaTableStateMetaTableState As Database2、启动作业3、制作 Savepoin

22、t,生成 SavepointID1、编辑 SQL 任务准备工作准备工作使用 Flink SQL 查询状态场景一:查询任务有哪些状态场景二:查询 Source 算子的 Offset场景三:查询聚合算子某个 Key 的中间结果使用介绍使用介绍使用 Flink SQL 查询状态#4 4未来展望未来展望未来展望丰富 State 功能加强 State 可用性使用 Flink SQL 实现 State 的修改和初始化优化 State Serializer 持续增强 State 的恢复能力提供完善的 State 不可恢复事前检查能力支持 State 到异步数据源的导入和导出THANKSContact usContact us:

友情提示

1、下载报告失败解决办法
2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
4、本站报告下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。

本文(4、周伊莎、马越-Flink SQL上的状态迁移和查询 .pdf)为本站 (云闲) 主动上传,三个皮匠报告文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知三个皮匠报告文库(点击联系客服),我们立即给予删除!

温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。
会员购买
客服

专属顾问

商务合作

机构入驻、侵权投诉、商务合作

服务号

三个皮匠报告官方公众号

回到顶部