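Flink State-Backend Improvements and Evolution in 2021

Li Yu (李钰), Senior Technical Expert at Alibaba, ASF Member, Apache Flink & HBase PMC
Tang Yun (唐云), Technical Expert at Alibaba, Apache Flink committer

Agenda
#1 State Backend Improvement
#2 Snapshot Improvement
#3 Future Work

#1 State Backend Improvement

State access latency monitoring: state latency tracking
Each operation against the state store is timed: start = System.nanoTime before the access, end = System.nanoTime after it, latency = end - start. The latency is recorded once every N operation accesses rather than on every access.

State access latency monitoring: configuration options
state.backend.latency-track.keyed-state-enabled (default: false): whether to enable performance monitoring of state access.
state.backend.latency-track.sample-interval (default: 100): number of operations between two latency samples. A smaller value is more precise but slows down regular state access.
state.backend.latency-track.history-size (default: 128): number of samples retained. A larger value is more precise.
Related configuration: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backends-latency-tracking-options

FRocksDB version upgrade (5.17.2 to 6.20.3)
Support for the ARM platform.
WIP: use deleteRange to speed up scaling out the parallelism; finer-grained WriteBuffer memory control.

RocksDB backend memory control: recap
Each state declared in Flink maps to one RocksDB column family, which has its own memory.
With slot sharing, a single slot can hold several operators that contain keyed state.
Flink does not limit the number of states a user declares within one operator.

RocksDB backend memory control: recap
[Figure: RocksDB write buffer manager and block cache. Labels: Block Cache (reserve, DataBlock, IndexBlock, FilterBlock), Write buffer manager (arena blocks).]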
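The sampling scheme from the state latency tracking slide above (take System.nanoTime before and after an access, record the difference every N accesses, keep a bounded history) can be sketched roughly as follows. This is a minimal illustration only, not Flink's implementation: the class `SampledLatencyTracker` and its methods are hypothetical names, and the two constructor arguments merely mirror the roles of `sample-interval` and `history-size`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink class): samples access latency every N operations. */
final class SampledLatencyTracker {
    private final int sampleInterval; // plays the role of sample-interval
    private final int historySize;    // plays the role of history-size
    private final Deque<Long> history = new ArrayDeque<>();
    private long accessCount = 0;

    SampledLatencyTracker(int sampleInterval, int historySize) {
        if (sampleInterval <= 0 || historySize <= 0) {
            throw new IllegalArgumentException("interval and history size must be positive");
        }
        this.sampleInterval = sampleInterval;
        this.historySize = historySize;
    }

    /** Runs the state access; only every sampleInterval-th call is timed. */
    <T> T track(Supplier<T> stateAccess) {
        accessCount++;
        if (accessCount % sampleInterval != 0) {
            return stateAccess.get(); // untimed fast path
        }
        long start = System.nanoTime();
        T result = stateAccess.get();
        long end = System.nanoTime();
        record(end - start);
        return result;
    }

    /** Keeps at most historySize samples, dropping the oldest first. */
    private void record(long latencyNanos) {
        if (history.size() == historySize) {
            history.removeFirst();
        }
        history.addLast(latencyNanos);
    }

    int sampleCount() {
        return history.size();
    }
}
```

With an interval of 100, only one access in a hundred pays for the two System.nanoTime calls, which is the trade-off the option descriptions point at: a smaller interval yields more samples at a higher cost per access.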
RocksDB backend memory control: recap
RocksDB Write buffer Manager + Block Cache
[Figure: the write buffer manager reserves memory in the RocksDB block cache. Labels: arena block (8MB), Write buffer manager, reserve, RocksDB Block Cache, 256KB, cache shards.]
* After the RocksDB upgrade, the reservation in the block cache is made with 256KB dummy entries.

RocksDB backend memory control
RocksDB Write buffer Manager + Block Cache
FLINK-19238: sanity check for RocksDB arena block size
[Figure: write buffer manager with manager size 32MB and mutable limit 28MB. Labels: existing arena blocks (8MB each), a newly arriving arena block (8MB), immutable write buffer.]
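The numbers in the figure above can be checked with a little arithmetic. The sketch below is only an illustration: `ArenaBlockSanityCheck` is a hypothetical helper, and the 7/8 ratio between manager size and mutable limit is an assumption inferred from the 32MB and 28MB values on the slide. The exact check introduced by FLINK-19238 may differ.

```java
/** Hypothetical helper (not Flink or RocksDB code) for the slide's numbers. */
final class ArenaBlockSanityCheck {
    static final long MB = 1024L * 1024L;

    /** Assumption: the mutable limit is 7/8 of the manager size (32MB -> 28MB). */
    static long mutableLimit(long managerSizeBytes) {
        return managerSizeBytes * 7 / 8;
    }

    /** Number of whole arena blocks that fit under the mutable limit. */
    static long blocksBelowLimit(long managerSizeBytes, long arenaBlockBytes) {
        return mutableLimit(managerSizeBytes) / arenaBlockBytes;
    }

    /** A single arena block larger than the mutable limit is flagged as not sane. */
    static boolean arenaBlockSizeIsSane(long managerSizeBytes, long arenaBlockBytes) {
        return arenaBlockBytes <= mutableLimit(managerSizeBytes);
    }
}
```

Under these assumptions, a 32MB manager with 8MB arena blocks has room for three blocks (24MB) below the 28MB limit, and a fourth block brings the total to 32MB, above the limit. If the manager were only 8MB, a single 8MB arena block would already exceed its 7MB limit, which is the kind of configuration a sanity check is meant to catch.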
RocksDB backend memory control
How to leave a memory buffer for RocksDB's extra usage beyond its budget.
Related options and their defaults:
taskmanager.memory.jvm-overhead.fraction: 0.1
taskmanager.memory.jvm-overhead.min: 192MB
taskmanager.memory.jvm-overhead.max: 1GB

RocksDB backend memory control: partitioned index & filter
[Figure: Block Cache (reserve, DataBlock, IndexBlock, FilterBlock) and Write buffer manager (arena blocks).]
Data blocks and index/filter blocks compete with each other in the block cache.

RocksDB backend memory control: partitioned index & filter
[Figure: Block Cache holding several FilterBlock and IndexBlock entries alongside DataBlock and the reserve region; Write buffer manager (arena blocks).]
state.backend.rocksdb.memory.partitioned-index-filters: true

Clearer state and fault-tolerance APIs
Before: env.setStateBackend(StateBackend)
[Figure: state backend (Heap/RocksDB) and durable checkpoint storage. Labels: Job Manager, persist checkpoint to memory, persist checkpoint to file storage, persist checkpoint meta to file storage. All of these are configured through StateBackend.]
Kinds of StateBackend:
MemoryStateBackend()
MemoryStateBackend(file:/path)
FsStateBackend()
RocksDBStateBackend(new MemoryStateBackend())
RocksDBStateBackend(new FsStateBackend())

Clearer state and fault-tolerance APIs
After:
[Figure: state backend (Heap/RocksDB) and durable checkpoint storage. Labels: Job Manager, persist checkpoint to memory, persist checkpoint to file storage, persist checkpoint meta to file storage.]
StateBackend: env.setStateBackend(StateBackend)
CheckpointStorage: env.getCheckpointConfig().setCheckpointStorage(CheckpointStorage)

Clearer state and fault-tolerance APIs: FLINK-19463
Legacy StateBackend -> new StateBackend + CheckpointStorage
MemoryStateBackend() -> HashMapStateBackend() + JobManagerCheckpointStorage()
MemoryStateBackend(file:/path) -> HashMapStateBackend() + JobManagerCheckpointStorage(file:/path)
FsStateBackend() -> HashMapStateBackend() + FileSystemCheckpointStorage()
RocksDBStateBackend(new MemoryStateBackend()) -> EmbeddedRocksDBStateBackend() + JobManagerCheckpointStorage()
RocksDBStateBackend(new FsStateBackend()) -> EmbeddedRocksDBStateBackend() + FileSystemCheckpointStorage()

#2 Snapshot Improvement

Unified savepoint format
[Figure: HashMap state backend and RocksDB exchanging state through a savepoint.]
Using a savepoint as the intermediate step, users can switch freely between the Embedded RocksDB and HashMap state backends.

Snapshot stability: unaligned checkpoint
Buffers in the network channels are treated as in-flight data and as part of the operator state. They are persisted ahead of time, which avoids the time spent on barrier alignment.

Snapshot stability: unaligned checkpoint
CheckpointConfig#setAlignedCheckpointTimeout(timeout)
Once alignment on the task side exceeds the preset threshold, the checkpoint switches automatically from aligned to unaligned.

#3 Future Work

More state monitoring metrics
Report RocksDB performance metrics such as memtable and cache hit/miss. With these internal metrics, users can tune the memtable and block cache sizes and judge how effective the bloom filter is from its false-positive rate.
Redirect the RocksDB log. The RocksDB log file contains compaction and flush information, which helps in assessing write amplification. It can be redirected to the TaskManager log directory or into the Flink TaskManager log itself.

FLIP-193: snapshot management
Claim mode and no-claim mode.
Clarify the lifecycle of savepoints and checkpoints.
[Figure: checkpoint timelines. Job1: CHK-1, CHK-2, CHK-3 (label: deleted). Job2: CHK-3, CHK-4, CHK-5 (label: delete). Other labels: retained checkpoint, incremental checkpoint.]
[Figure: checkpoint timelines. Job1: CHK-1, CHK-2, CHK-3 (label: deleted). Job2: CHK-3, CHK-4, CHK-5. Other labels: retained checkpoint, full checkpoint.]

Faster fault tolerance: introducing a WAL mechanism
Changelog based state backend (FLIP-158)

Appendix
FLINK-20496: Introduce RocksDB partitioned index filter option
FLINK-19463: Disentangle StateBackends from Checkpointing
FLINK-21736: Introduce latency tracking state
FLINK-20976: Unify binary format for Keyed State savepoints
FLINK-23041: Change local alignment timeout back to the global time out
FLINK-24783: Improve monitoring experience and usability of state backend

2021-12-05
THANKS