上海品茶

您的当前位置:上海品茶 > 报告分类 > PDF报告下载

任庆盛、蒋晓峰-Flink Connector 社区新动向与 Hybrid Source 原理实践.pdf

编号:101800 PDF 28页 7.41MB 下载积分:VIP专享
下载报告请您先登录!

任庆盛、蒋晓峰-Flink Connector 社区新动向与 Hybrid Source 原理实践.pdf

1、任庆盛/蒋晓峰Flink Connector Flink Connector 社区新动向与社区新动向与 Hybrid Source Hybrid Source 原理实践原理实践New Trends in Flink Connector and Principles&Practice of Hybrid Source Source API新特性Sink API新特性ConnectorTesting Framework发布与贡献ConnectorHybrid Source#1#2#3#4#5#1#1Source API Source API 新特性新特性Source API测试 Testing开发

2、 Developing发布 PublishingSplit EnumeratorJob ManagerSource ReaderTask ManagerSource ReaderTask ManagerSource ReaderTask ManagerSplit分片(Split)外部系统中的一个分区分片枚举器(Split Enumerator)发现与分配分片,Source 的控制中心Source 读取器(Source Reader)读取数据,Source 的工作者线程模型与容错机制在 SourceReaderBase 中提供了基本抽象Source API 新特性测试 Testing开发 Dev

3、eloping发布 Publishing Flink 1.14 标记为 Public 使用新 API 开发新的 Source 尽快将旧 Source 迁移至新 API 添加了 FLIP-33 中定义的大部分指标 对延迟、数据量、水印进度的监控 正确汇报事件时间和水印 水印对齐 Watermark Alignment FLIP-182,正在讨论中 有助于减少下游算子的状态大小 混合数据源 Hybrid Source 在不同数据源间无缝切换 衔接历史数据与线上数据#1#2#3#4#2 2Sink API Sink API 新特性新特性Sink API测试 Testing开发 Developing发

4、布 PublishingSink WriterSink CommitterCommittableGlobal Committer可提交数据(Committable)需要在 Checkpoint 时 commit 的信息Sink 写入器(Sink Writer)将数据写出至外部系统生成 CommittableSink 提交器(Sink Committer)将 Committable 提交至外部系统全局提交器(Global Committer)并行度为 1收集 Committable 进行全局提交Sink API 的新特性测试 Testing开发 Developing发布 Publishing F

5、LIP-171 AsyncSink 为具有异步功能 Client 的外部系统提供更易于实现的接口 Buffering 模式,在 Buffer 满足条件时(大小、超时)异步提交 仅支持 At-least once FLIP-191 小文件合并 针对列存格式、Hive、Iceberg 等 Sink 的优化 单独的 Aggregator 算子/Sink Coordinator/自定义拓扑#1#2SourceFunction/SinkFunction测试 Testing开发 Developing发布 Publishing在 Flink 1.0 前引入只针对于流模式功能限制复杂的线程模型复杂的分片发现、

6、分配,数据的序列化,状态管理与 Checkpoint 的联动(Checkpoint Lock)实现精确一次(Exactly-once)的语义即将被标注为 Deprecated,不再提供支持Deprecatedpublic interface SourceFunctionDeprecatedpublic interface SinkFunction请尽快迁移至新 API!#3 3Connector Testing FrameworkConnector Testing FrameworkConnector Testing Framework测试 Testing开发 Developing发布 Pub

7、lishingSingle SplitMultiple SplitsTrigger FailoverCheckpoint/Savepoint外部系统上下文 External Context外部系统 External System测试环境 Test Environment测试 Testing开发 Developing发布 Publishing测试环境 Test Environment获取 Source/Sink 实例 或 DDL 参数获取 Connector JAR获取测试数据写入或读取验证数据的接口外部系统上下文 External Context测试作业运行的 Flink 集群提供了内置实现M

8、ini Cluster(集成测试)Flink on Container(端到端)Remote Cluster(基于 REST API)外部系统 External System测试框架控制资源生命周期提供了基于 Test Containers 的实现Connector Testing Framework测试 Testing开发 Developing发布 Publishing提供了标准测试用例DataStream API Source/Sink 基本读写 JM/TM/网络异常场景 FLIP-33 Metrics Savepoint 与修改并行度 精确一次与至少一次语义Table/SQL API创建

9、基本 Source/Sink 表和基本读写Table/SQL 数据类型的支持ExtendedWith(ConnectorTestingExtension.class)public class TestSuiteBase TestTemplatepublic void testBasicRead(TestEnvironment env,ExternalContext context)TestTemplatepublic void testTaskManagerFailover(TestEnvironment env,ExternalContext context)TestTemplatepubl

10、ic void testTableSource(TestEnvironment env,ExternalContext context)Connector Testing Framework充分利用 JUnit 5 特性Extension、Test template、Annotation轻松上手继承测试基类标注测试环境 TestEnvironment标注上下文 ExternalContext标注外部系统 ExternalSystempublic class FooSourceE2ETest extends SourceTestSuiteBase TestEnvpublic TestEnviro

11、nment env=new FlinkContainerTestEnvironment()ExternalContextpublic ExternalContext context=new FooSourceContext();ExternalSystempublic FooSystem system=new FooSystem 测试 Testing开发 Developing发布 PublishingConnector Testing Framework#4 4发布与贡献发布与贡献 ConnectorConnector发布与贡献 Connector贡献新的 Connector关注 user 和

12、 dev 邮件列表关注 JIRAflink-packages.orgConnector、工具、指标、示例等生态项目独立的 flink-connectors 仓库更灵活的版本管理更丰富的功能更易于贡献与维护更好地支持 Flink 多版本测试 Testing开发 Developing发布 Publishing#5 5HybridHybrid SourceSourceHybrid Source变更数据捕获(CDCCDC)用户可能有一个存储在 HDFS/S3 中的快照和数据库 Binlog 或 Kafka 中的活动变更日志。#1Flink 作业需要按顺序从多个源读取数据。变更数据捕获(CDC)和机器学

13、习特征回溯是此种消费模式的两个具体场景。DebeziumKafka ConnectTiDBHologresClickHouseIcebergHudi(upsert-kafka)(changelog-json)Hybrid SourceApplications(SearchRecommendationAdvertisement)Historical User BehaviorsReal-time user behaviors(PageView、Click、Add to cart,etc)Kafka(Queue)HDFS(Offline Storage)Apache Flink(Static FG

14、)Apache Flink(Real-time FG)HBase/Redis(Feature Store)Kafka for Online TrainingHDFS For Offline Training(Sample Store)Alink(Flink ML)TensorFlow(Online Training)Alink(Flink ML)TensorFlow(Offline Training)Apache Flink(Inference Service)HDFS(Model Center)(Model Validation)Apache Flink(Real-time Training

15、 Sample Assembly)KafkaTensorFlowFeaturesFeaturesApache Flink(Real-time FG)机器学习特征回溯当一个新特征被添加到模型中时,该特征需要从几个月前到现在的原始数据中计算出来。在大多数情况下,历史数据和实时数据存储在两个不同的存储系统中,例如分别是HDFS 和 Kafka。#2Hybrid SourceSource 之间切换需要考虑控制上游 Source 的具体状态以及下游 Source 如何将上游状态转换为初始状态。Source 之间的自动切换实现较为复杂。大多数情况下用户添加自定义 Source,Flink 按照指定的添加顺

16、序自动切换这些 Source。目前没有有效机制支持历史和实时数据之间的平滑 Source 迁移,其需要定义Source 切换的规则和时间以及用于切换的凭据以确保数据的完整性和一致性。用户必须运行两个不同的 Flink 作业或在 SourceFunction 中进行一些 hack。但是大多数用户发现这不像听起来那么容易:Hybrid Source无需任何更改即可重用基于 FLIP-27 2构建的现有 Connector 的 Source。#1FLIPFLIP-150:Introduce Hybrid Source150:Introduce Hybrid Source1设计目标:#2支持任意 So

17、urce 组合,形成混合 Source。1 https:/cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source2 https:/cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+InterfaceHybrid Source 设计要点:HybridSourceSourceASourceASourceBSourceSwitchContextStop PositionStart PositionHybrid So

18、urceFileSource 和 KafkaSource 组成的 HybridSource 示例:HybridSourceHDFSSourceHDFSSourceKafkaSourceKafkaSourceHybridSplitEnumeratorHDFSSpitEnumeratorHDFSSpitEnumeratorKafkaSpitEnumeratorKafkaSpitEnumeratorcreateEnumeratorcreateEnumeratorSwitchSourceSwitchContextStartPositionEndPositionStartPositionEndPosit

19、ionHDFS record max timestampKafka start message timestampHybrid SourceHybrid Source 原型实现:https:/ HybridSource 基于配置的 Source 链切换底层 Source 的混合 Source,实现 Source 接口。l HybridSourceSplitEnumerator 包装实际的 SplitEnumerator 并且促进 Source 切换。当 Source 切换发生时,会延迟创建SplitEnumerator 以支持运行时位置转换。l HybridSourceReader 委托给实际

20、 SourceReader 的混合 SourceReader。l SourceFactory HybridSource 的基础 Source 工厂。l SourceSwitchContext 提供给 SourceFactory 的上下文。l SourceReaderFinishedEvent HybridSourceReader 发送到 SplitEnumerator 的 Source 事件,以表示当前 SourceReader 消费SourceSplit 已完成并且可以发送下一个 SourceReader 的 SourceSplit。l SwitchSourceEvent 事件从 Hybri

21、dSourceSplitEnumerator 发送到 HybridSourceReader 以切换到指定的 SourceReader。Hybrid SourceHybridSourceHDFSSourceHDFSSourceKafkaSourceKafkaSourceSourceSourceKafkaSourceKafkaSourceaddSourceHybridSplitEnumeratorHDFSSplitEnumeratorHDFSSplitEnumeratorKafkaSplitEnumeratorKafkaSplitEnumeratorSplitEnumeratorSplitEnum

22、eratorHybridSourceReaderHDFSSourceReaderHDFSSourceReaderKafkaSourceReaderKafkaSourceReaderSplitEnumeratorSplitEnumeratorSend SourceReaderFinishedEventSend SwitchSourceEventHandle SwitchSourceEvent:setCurrentReaderHandle SourceReaderFinishedEvent:switchEnumeratorcreateEnumeratorcreateReaderswitch sou

23、rceswitch enumeratorswitch readerFileSource fileSource=FileSource.forRecordStreamFormat(new TextLineFormat(),Path.fromLocalFile(testDir).build();KafkaSource kafkaSource=KafkaSource.builder().setBootstrapServers(localhost:9092).setGroupId(MyGroup).setTopics(Arrays.asList(quickstart-events).setDeseria

24、lizer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class).setStartingOffsets(OffsetsInitializer.earliest().build();HybridSource hybridSource=HybridSource.builder(fileSource).addSource(kafkaSource).build();Hybrid SourceFileSource 和带有固定 Kafka 起始位置的 KafkaSource 组成 HybridSource 的简单示例:SwitchStart

25、 Position:earliest-offsetFileSourceFileSourceKafkaSourceKafkaSourceKafkaSourceKafkaSourceHybridSourceaddSourceHybridSource hybridSource=HybridSource.builder(fileSource).addSource(switchContext-StaticFileSplitEnumerator previousEnumerator=switchContext.getPreviousEnumerator();/how to get timestamp de

26、pends on specific enumeratorlong timestamp=previousEnumerator.getEndTimestamp();OffsetsInitializer offsets=OffsetsInitializer.timestamp(timestamp);KafkaSource kafkaSource=KafkaSource.builder().setBootstrapServers(localhost:9092).setGroupId(MyGroup).setTopics(Arrays.asList(quickstart-events).setDeser

27、ializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class).setStartingOffsets(offsets).build();return kafkaSource;,Boundedness.CONTINUOUS_UNBOUNDED).build();Hybrid SourceFileSource 和 Kafka 起始位置来自先前 Source 的 KafkaSource 组成 HybridSource 的复杂示例:FileSourceFileSourceKafkaSourceKafkaSourceKafkaSou

28、rceKafkaSourceHybridSourceaddSourceSwitchStart Position:SwitchContext-FileSplitEnumerator#getEndTimestampHybrid SourceHybridSourceHybridSource Table Table 实现https:/issues.apache.org/jira/browse/FLINK-22793#1Hybrid SourceHybrid Source未来规划:#2HybridSourceHybridSource支持FileSourceFileSource中的动态停止位置https:/issues.apache.org/jira/browse/FLINK-2363320212021-1212-0505THANKS

友情提示

1、下载报告失败解决办法
2、PDF文件下载后,可能会被浏览器默认打开,此种情况可以点击浏览器菜单,保存网页到桌面,就可以正常下载了。
3、本站不支持迅雷下载,请使用电脑自带的IE浏览器,或者360浏览器、谷歌浏览器下载即可。
4、本站报告下载后的文档和图纸-无水印,预览文档经过压缩,下载后原文更清晰。

本文(任庆盛、蒋晓峰-Flink Connector 社区新动向与 Hybrid Source 原理实践.pdf)为本站 (云闲) 主动上传,三个皮匠报告文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知三个皮匠报告文库(点击联系客服),我们立即给予删除!

温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载不扣分。
会员购买
客服

专属顾问

商务合作

机构入驻、侵权投诉、商务合作

服务号

三个皮匠报告官方公众号

回到顶部