1、盛宇帆/StreamNative打造批打造批流融合:流融合:Pulsar Connector Pulsar Connector 的的设计、开发和使用设计、开发和使用批流融合的批流融合的前世今生前世今生#1FlinkFlink 批流批流融合融合 APIAPI#2Pulsar ConPulsar Con-nectornector 设计设计#3使用使用 PulsarPulsarConnectorConnector#4#1#1批流融合的前世今生批流融合的前世今生批流分离:Lambda 架构NoSQLIOTLogsERPDBMS原始数据流查询流处理增量视图批次预计算实时结果批量结果数据集批量结果流批So
2、urce(1)Source(2)map(1)map(2)window(1)window(2)334291414W(33)W(17)W(17)A|30B|73C|33D|18E|31F|15G|91H|94K|77生成水位线L|35N|39O|97M|89I|18Q|23T|99S|9733881795批流融合:更精确的流批流融合:更精确的流算子状态检查点SourceSourceKeyByKeyByMapMapSink批流融合:批为有界的流Streaming RuntimeDataStream APIDataSet APIBatch LibrariesTable API/S
3、QLStreaming LibrariesFlink API 一览(1.9 之前)Streaming Operators/DAGDataStream APIStateful FunctionsTable API/SQLLibraries(e.g.,ML)Streaming RuntimeLibraries(e.g.,CEP)Flink API 一览o 存储 Pulsar 和 BK 的元数据o 服务发现o Brokerso 处理消息分发和连接o 无状态但有缓存o 自动负载均衡o Topic 由 Segment 组成o Bookieso 存储消息和游标o 消息按照 Segment 和Ledgers
4、 分组o 一个 Ledger 由一组Bookie 实例构成元数据、服务发现存储消息元数据、服务发现卸载至层级存储Pulsar-批流融合的存储架构#2 2Flink Flink 批流融合批流融合 APIAPIFLIP-27 简介发现并分配 Split 给 Reader批:执行一次,流:周期执行EnumeratorEnumerator基于 Split 去读取数据ReaderReader定义了需要消费的数据的元信息例:文件信息,Topic 信息SplitSplitFLIP-27 基本模型Source(类似工厂)EnumeratorReaderReader创建创建FLIP-27 执行流程Source(
5、类似工厂)EnumeratorReaderReader创建创建发现并分配 Split,发送全局水位线申请新的 Split下游算子FLIP-27 执行流程Source(类似工厂)EnumeratorReaderReader创建创建发现并分配 Split,发送全局水位线申请新的 Split下游算子发送特定的 SourceEvent回应特定的 SourceEventFLIP-27 执行流程Source(类似工厂)EnumeratorReaderReader创建创建发现并分配 Split,发送全局水位线申请新的 Split下游算子发送特定的 SourceEvent回应特定的 SourceEvent触发
6、检查点检查点屏障FLIP-27 执行流程#3 3Pulsar ConnectorPulsar Connector 设计设计Topic PartitionTopic名称分区IDTopic Range开始结束Pulsar Split 设计Topic PartitionTopic名称分区IDTopic Range开始结束StartCursorStopCursorPulsar Split 设计Topic PartitionTopic名称分区IDTopic Range开始结束StartCursorStopCursorMessageIDTxnIDPulsar Partition SplitPulsar S
7、plit 设计ReaderEnumeratorAssignerSubscriberPartitionSplitPulsar Enumerator 设计SubscriberTopicListSubscriberTopicPatternSubscriber批批:只在启动时执行一次查询流流:周期性地查询 Topic 信息Pulsar Enumerator 设计Producer 1Producer 2Pulsar TopicSubscription DConsumer D-1Consumer D-2Key共享Subscription CConsumer C-1Consumer C-2共享Subscri
8、ption AConsumer A独占m0m1m2m3m4Pulsar 订阅模式Subscription BConsumer B-1Consumer B-2当 Consumer B-1 宕机时轮询ReaderReaderAssignerSplit 3Split 1Split 2Split 4Pulsar Enumerator 设计Subscription DConsumer D-1Consumer D-2Key共享Subscription AConsumer A独占Subscription BConsumer B-1Consumer B-2当 Consumer B-1 宕机时轮询ReaderR
9、eaderAssignerSplit 2Split 1Split 1Split 2Pulsar Enumerator 设计Subscription CConsumer C-1Consumer C-2共享Ordered ReaderUnordered ReaderPulsar Reader 分类Subscription DConsumer D-1Consumer D-2Key共享Subscription CConsumer C-1Consumer C-2共享Subscription AConsumer A独占Subscription BConsumer B-1Consumer B-2当 Cons
10、umer B-1 宕机时轮询Pulsar Reader 分类Topic PartitionTopic名称分区IDStartCursorStopCursorMessageIDPulsar Partition SplitOrdered ReaderUnordered ReaderTopic PartitionTopic名称分区IDTopic Range开始结束StartCursorStopCursorTxnIDPulsar Partition SplitJob CoordinatorEnumeratorOrdered ReaderOrderedSplit Reader#1 发起检查点#2 快照 S
11、plit分配状态#4 快照消费的位置消费的位置#3 发起检查点#5 结合快照 ID 暂存消费的位置消费的位置#6 通知点执行完毕#7 提交对应的消息消息 IDID,标记消费状态Pulsar Ordered Reader 检查点Job CoordinatorEnumerator#1 发起检查点#2 快照 Split分配状态#4 创建新的事务新的事务#3 发起检查点#5 结合快照 ID 暂存之前的事务事务 IDID#6 通知点执行完毕#7 提交对应的事务事务 IDID,标记消费状态Pulsar Unordered Reader 检查点Unordered ReaderUnorderedSplit R
12、eaderReaderDownstreamSchema.BYTESDeserializationSchemaTypeInformationReaderDownstreamSchema.BYTESSchema消息序列化设计#4 4使用使用 Pulsar ConnectorPulsar Connector获取 Pulsar connectorFlink 1.14 Flink 1.14 之后版本之后版本Flink 1.1Flink 1.13 3 之前版本之前版本快速上手 Pulsar Source必须设置必须设置的选项有:setServiceUrlsetServiceUrl:Pulsar 的消费地址
13、,使用 Pulsar 自有的协议通信setAdminUrlsetAdminUrl:基于 RESTful API 的 Pulsar 管理地址setTopicssetTopics/setTopicPatternsetTopicPattern:定义需要消费的 Topic 或相关分区setDeserializationSchemasetDeserializationSchema:定义如何解析 Pulsar 的消息setSubscriptionNamesetSubscriptionName:Pulsar 的订阅名称使用 PulsarSourceBuilderTopic 名称有无分区persistent:
14、/sample/flink/simple-string有persistent:/sample/flink/simple-string-partition-0无persistent:/sample/flink/simple-string-partition-1无persistent:/sample/flink/simple-string-partition-2无Pulsar Topic 由分区构成。每个分区等价于一个无分区的 Topic,可直接被用于消费。Pulsar Topic 命名规范Pulsar source 提供两种 Topic 消费定义:#1 基于给定的TopicTopic或者是分区分
15、区列表进行消费#2基于正则表达式进行匹配,选择命中的 Topic 进行消费定义想要消费的 Topic如果只基于 Pulsar Message 的消息体进行解析,我们可以使用如下三种预定义的PulsarDeserializationSchema。#1 基于 Pulsar 的SchemaSchema进行解析#2 基于 Flink 的DeserializationSchemaDeserializationSchema进行解析#3 基于 Flink 的TypeInformationTypeInformation进行解析创建 PulsarDeserializationSchema使用 Builder 的
16、setStartCursorsetStartCursor方法来设定消费位置#2 从 Topic 里最新的那条消息开始消费#3 从指定的消息 ID 开始消费#4 从指定的消息时间开始消费#1 从 Topic 里最早的那条消息开始消费设定开始消费的位置-StartCursor使用 Builder 的setUnboundedStopCursorsetUnboundedStopCursor/setBoundedStopCursorsetBoundedStopCursor方法来设定停止位置#2 停止于启动消费时,Topic 里存在的最新一条消息#3 消费至指定消息后(不包含)停止消费#4 消费完指定消息后(包含)停止消费#5 消费至指定的消息时间#1 永不停止(默认)设定停止消费的位置-StopCursor当前 Source 支持的配置有 2 种:#1PulsarOptionsPulsarOptionsPulsarClient 和 PulsarAdmin 相关的配置#2PulsarSourceOptionsPulsarSourceOptions与 Consumer 和 Connector 有关的配置设定额外配置