《邸星星-基于Flink的实时计算平台3.0建设实践.pdf》由会员分享,可在线阅读,更多相关《邸星星-基于Flink的实时计算平台3.0建设实践.pdf(47页珍藏版)》请在三个皮匠报告上搜索。
1、邸星星/汽车之家基于基于FlinkFlink的实时计算平台的实时计算平台3.03.0建设实践建设实践应用场景应用场景预算资源管控预算资源管控FlinkFlink伸缩容伸缩容湖仓一体湖仓一体PyFlinkPyFlink实践实践#1#2#3#4#5#1#1应用场景与平台现状应用场景与平台现状流量分析、车展大屏、818大屏实时指标统计145632应用日志监控、视频播放质量监控预警内容中台、资源池实时数据处理智能推荐、智能搜索、实时画像、风控实时用户行为流量、日志、业务数据分钟级入湖实时入湖数据接入/分发平台、实时多维分析实时数据传输应用场景平台建设历程回顾基础的实时计算能力基于Spout、Bolt模
2、型编码开发StormStorm基于Flink1.7扩展DDL,支持SQL开发模式支持多种Connector引入Flink1.9支持原生DDL自助UDF诊断、健康评分高可用AutoStream2.0AutoStream2.0引入Flink1.12湖仓一体Native on k8sPyFlink智能伸缩容AutoStream1.0AutoStream1.0AutoStream3.0AutoStream3.0AutoStream2.0AutoStream2.0回顾回顾痛点预算没有强控,资源浪费资源管控资源管控需要反复调整任务资源数据实时入湖湖仓生态湖仓生态人工智能团队难以上手开发调优成本调优成本仅支
3、持仅支持JavaJavaKubernetesPyFlinkFlinkClient ServiceSQL IDE任务管理健康检查任务诊断预算管控自动伸缩容YARNZookeeperHDFSMeta Server脚本管理/CIIcebergStarRocksTiDBMySQLElasticsearchRedis权限管理监控报警AutoStream 3.0整体介绍整体介绍#2 2FlinkFlink预算资源管控策略预算资源管控策略服务器资源没有按团队划分,先用先得任务资源利用率低,存在浪费用户主动优化资源意识薄弱背景背景初始化团队占用预算团队间预算调拨白名单启用预算强控机制启用预算强控机制CPU使用
4、率内存使用率空闲SLOT识别识别低利用率任务低利用率任务运营机制及收益#流程规范全年没有新增服务器严控低利用率任务占比,规范用户主动优化利用率引入自动伸缩容,降低调优成本#技术升级#收益#3 3FlinkFlink自动伸缩容功能自动伸缩容功能背景介绍#1 提升资源利用率#2 降低优化成本#3 降低资源调优对任务稳定性的影响功能页面展示架构设计2.请求伸缩容RescaleCoordinatorHDFS&ZK0.从 Prometheus拉取metric,计算各项健康评分DispatcherJobMasterResourceManager3.通知某个任务伸缩容4.请求新的TM1.定期检查是否需要伸缩
5、容TaskManagerTaskManager5.创建TaskManager6.注册到JobMaster7.待所有的TM就绪,开始“重启”任务,真正执行伸缩容操作平台服务Prometheus8.将伸缩容后的状态持久化实现思路1.先向ResourceManager申请TM Container,并为其打上标记cpu:1核内存:1Gcpu:2核内存:2Gcpu:1核内存:1G实现思路2.停掉任务,删除ExecutionGraphcpu:1核内存:1Gcpu:2核内存:2Gcpu:1核内存:1Gcpu:2核内存:2G实现思路3.释放掉TM,在标记的TM上从保存点恢复任务cpu:1核内存:1Gcpu:2
6、核内存:2Gcpu:2核内存:2G实现思路4.将伸缩容后的资源信息持久化到Zookeeper和HDFScpu:2核内存:2GHDFS 和Zookeeper“cpu”:“2核”,“内存”:“2048MB”,“Jobgraph”:“伸缩容策略类型扩容/缩容原因并行度扩容存在消费延迟,且CPU综合使用率低于阈值并行度缩容存在空闲slotCPU扩容/缩容根据特定的CPU使用率的阈值内存扩容/缩容根据健康监测评分#4 4FlinkFlink+IcebergIceberg 数据湖实践数据湖实践基于Hive的数据仓库的痛点#时效性写入型Schema,对Schema变更支持不友好T+1或H+1无法支持准实时数
7、据分析不支持Upsert,难以处理业务库数据不支持Row-level delete,数据修正成本高#Upsert#Table EvolutionIceberg原理与架构#1 开放的表格式#2 增量快照机制#4 读取型Schema#5 流批接口的支持#3 支持ACID语义Hive MetastoreHDFS计算平台计算引擎Table Format存储引擎AutoStreamAutoDTS离线平台Iceberg集成问题描述:Upsert场景下,需要确保主键相同的数据写入到同一Bucket下解决方案:建表时支持设置bucket属性/指定bucket字段partition.bucket.source=
8、id,/指定bucket数partition.bucket.num=10支持定义Bucket问题描述:目前Iceberg表本身无法直接反映数据写入的进度,离线调度难以精准触发下游任务解决方案(PR-2109):在commit阶段将flink的watermark记录到Iceberg表的properties中,可直观的反映端到端的延迟情况,同时可以用来判断分区数据完整性,用于调度触发下游任务记录Watermark问题描述:Iceberg需要访问HDFS集群及HiveMetastore,需要和离线权限系统打通解决方案:扩展Hadoop FileSystem及HiveMetaStoreClient,支
9、持代理到指定的团队账号团队账号支持优化实践1问题描述:Iceberg默认基于Hive metastore的分布式锁控制并发commit,Flink 进程意外退出时导致锁无法自动释放解决方案:引入基于ZK的分布式锁,解决锁无法自动释放的问题;同时设置相对较长的session timeout时间,避免因gc 导致意外释放锁基于ZK的分布式锁问题描述:HiveConf的构造方法的误用,导致hive客户端中声明的配置被覆盖,导致访问Hive metastore时异常解决方案(PR-2075):修复HiveConf的构造,显示调用addResource方法,确保配置不会被覆盖:hiveConf.addR
10、esource(conf);访问HiveMetastore异常问题描述:Iceberg表无法访问,报NotFoundException Failed to open input stream for file:xxx.metadata.json解决方案(PR-2328):当调Hive metastore 更新Iceberg 表的metadata_location超时后,增加检查机制,确认元数据未保存成功后再删除元数据文件元数据文件丢失优化实践2Mixed position-delete and equality-deleteTxn1:Insert(1,100),(2,200);Txn2:Ins
11、ert(3,300);update(3,301);Delete(2,200);I(1,100)I(2,200)datafile1I(1,100)I(2,200)datafile1I(3,300)I(3,301)datafile2Position delete fileD(file2,0)equality delete fileD(2,200)SeqNum-2SeqNum-1第一个字段为主键两个事务SeqNum-11、将Delete行在写到EqualityDelete File中2、若Delete行在本次Txn内Insert过,则把Insert行的(file,pos)写到Position Del
12、ete File中写入思路D(3,300)Mixed position-delete and equality-deleteTxn1:Insert(1,100),(2,200);Txn2:Insert(3,300);update(3,301);Delete(2,200);I(1,100)I(2,200)datafile1I(3,300)I(3,301)datafile2Position delete fileD(file2,0)equality delete fileD(2,200)SeqNum-2SeqNum-1I(1,100)I(2,200)datafile1SeqNum-11、Posit
13、ion Delete File和那些SeqNum不大于自己的SeqNum的Data File 做 JOIN2、Equality Delete File和那些SeqNum小于自己SeqNum的Data File 做JOIN读取思路(1,100)(3,301)查询结果D(3,300)I(1,100)I(2,200)datafile1I(3,300)I(3,301)datafile2Position delete fileD(file2,0)equality delete fileD(2,200)SeqNum-2SeqNum-11、和读取思路完全一致,即将两种Delete File Apply 到合
14、适的Data File,合并后删除对旧文件的引用,改为引用新生成的Data File2、SeqNum会一直递增,所以合并后的Data File 对应的SeqNum会变大合并小文件思路Iceberg合并小文件I(1,100)I(3,301)datafile3SeqNum-3Txn 3:合并小文件D(3,300)Iceberg合并小文件I(1,100)I(2,200)datafile1I(3,300)I(3,301)datafile2Position delete fileD(file2,0)equality delete fileD(2,200)SeqNum-2SeqNum-1Txn 3:del
15、ete(3,301);equality delete fileD(3,301)SeqNum-3I(1,100)I(3,301)datafile4SeqNum-4Txn 4:合并小文件1、小文件合并过程修改SeqNum会导致和流写事务冲突,导致示例中Txn3的delete语句失效2、Txn3 将主键为3的数据Delete,但当Txn4 小文件合并成功后,会把这条数据加回来,因为这条数据的SeqNum 变为4,已经比Delete File的SeqNum 3还大了合并小文件的冲突D(3,300)Iceberg合并小文件I(1,100)I(2,200)datafile1I(3,300)I(3,301)
16、datafile2Position delete fileD(file2,0)equality delete fileD(2,200)SeqNum-2SeqNum-1Txn 3:delete(3,301);equality delete fileD(3,301)SeqNum-3I(1,100)I(3,301)datafile4SeqNum-2Txn 4:合并小文件1、合并小文件从本质上说不会对最终数据做修改,仅仅是优化文件存储,复用被合并的文件中最大的SeqNum即可2、示例中Txn4 之后的Data File对应的SeqNum为2,依旧小于Txn3 的SeqNum 3,可以保证Delete
17、File的语义正确性3、社区最近也做了类似修改:PR-3480解决合并小文件的冲突D(3,300)收益收益#1业务库数据可以准实时同步入湖;可以将实时聚合任务的结果入湖,打造准实时的物化视图流量、内容、线索主题数据时效性提升,从小时级提升到10分钟以内;核心任务SLA提前2小时完成特征工程提效,提供准实时的样本数据,提高模型训练时效性#2#3#5 5PyFlinkPyFlink实践实践背景#1 将Flink能力输出给Python用户#2 Python生态功能分布式化#3 完善平台生态原理和架构部署选型python运行环境镜像内置python运行环境资源隔离资源隔离其他机器学习依赖其他机器学习依
18、赖YarnKubernetesPyFlink依赖管理#1第三方依赖第三方依赖:提交任务时自动下载依赖配置,依靠PyFlink的自动安装,在启动Python进程前安装JarJar依赖依赖:依赖平台提供的文件管理服务,将运行时需要的Jar依赖添加到Flink的运行环境中PythonPython文件依赖文件依赖:依赖平台的文件管理服务统一管理,在提交运行时下载到镜像内部#2#3任务提交Web Backend文件服务Upload FileUpload PythonSubmit SQLK8s containerJob Client ServiceLaunch JobSubmitJar依赖Python文件
19、Python依赖清单docker-entrypoint.sh#其他脚本Download FilesCatalog与UDF#1 对接Catalog,和已有的connector打通#2 支持SQL+UDF模式开发案例演示自定义任务在开发PyFlink程序时,依靠PyFlink提供的Gateway调用平台内置的Catalog和UDF注册类,完成Catalog和UDF的注册,避免开发时的重复定义和重复开发案例演示Python UDFUDFUDF开发开发保持原有项目结构不变,按照PyFlink UDF开发规范,在eval方法中调用相应的处理逻辑案例演示Python UDFUDFUDF使用使用在SQL任务中创建Function时指定language为python,并在高级配置中添加python UDF所需的文件。#6 6后续后续规划规划后续规划后续规划实时、离线混部,错峰计算继续优化自动伸缩容策略基于数据湖技术,探索存储层流批一体Yarn细粒度资源调度利用Flink批处理计算资源流批一体FLIP-188