Huang Xingbo (断尘), Apache Flink Committer, Senior Development Engineer at Alibaba

An Introduction to the Next-Generation Python Runtime for PyFlink, Based on FFI

Agenda
#1 PyFlink latest features
#2 PyFlink Runtime
#3 JCP, based on FFI
#4 PyFlink Runtime 2.0
#5 Future Work

#1 PyFlink latest features

New features in PyFlink 1.14

#1 Performance
- Operator Fusion
- State serialization/deserialization optimization
- Finish Bundle optimization

#2 Functionality
- State TTL config

#3 Usability
- Support for uploading tar.gz dependency packages
- Profile
- Print
- Local Debug

#2 PyFlink Runtime

PyFlink Architecture Overview
[Diagram, top to bottom]
- Python Table API & SQL, Python DataStream API
- Py4J
- Table API & SQL (Declarative), DataStream API (Imperative)
- Optimizer: Common Rules, Python Rules
- JobGraph: Java Operators, Python Operators
- Runtime: Java Operators and Python Operators, connected through DataService and StateService; the UDF runs on the Python side

PyFlink Runtime
[Diagram] A Java Operator running in the JVM and a Python Worker running in the PVM. Between them: checkpoint handling, watermark handling, state request handling.

PyFlink Runtime WorkFlow

Performance bottlenecks
1. Computation (time spent in the Call UDF step)
2. Serialization/deserialization (of the input data and of the UDF results)
3. Communication (inter-process communication between the JVM and the PVM)

Optimizations shown on the slide: codegen function, Cython, custom serializers, generator.

The remaining problem: how Java and Python call each other.

#3 JCP, based on FFI

Approaches to Java/Python interoperation
#1 IPC-based approaches
#2 Running Python inside the JVM
#3 FFI-based approaches

IPC-based approaches
1. Network communication
   - socket: PySpark Runtime, Alink Runtime
   - py4j: PyFlink & PySpark Client
   - grpc: PyFlink On Beam
2. Shared memory
   - Tensorflow On Flink
   - PyArrow Plasma
Drawback: performance.

Running Python inside the JVM
1. Translating Python into Java: p2j, voc
2. Python interpreters implemented in Java: Jython, GraalVM
Drawback: compatibility.

FFI

#1 What is FFI?

A foreign function interface (FFI) is a mechanism by which a program written in one programming language can call routines or make use of services written in another.

This can be done in several ways:
- Requiring that guest-language functions which are to be host-language callable be specified or implemented in a particular way, often using a compatibility library of some sort.
- Use of a tool to automatically wrap guest-language functions with appropriate glue code, which performs any necessary translation.
- Use of wrapper libraries.

Corresponding solutions
- JNI (Java Native Interface)
- Python/C API (CPython)
- Cython
- Ctypes

#2 The FFI-based approach

Use JNI and the Python/C API to connect Java and Python through C:
Java <-> (JNI) <-> C <-> (Python/C API) <-> Python

Comparison of implementations

JPype
  Description: The PVM starts the JVM, which lets Python call Java.
  Drawbacks: Does not support Java calling Python.

JEP
  Description: The JVM starts the PVM, which lets Java call Python.
  Drawbacks:
  1. Does not support Python calling Java.
  2. Can only be installed from the source package (places requirements on the environment).
  3. Not pluggable (can only be configured at startup).
  4. The framework layer has a relatively high performance overhead.

JCP
  Description: The JVM starts the PVM, which lets Java call Python; Python objects can also call back into Java.

Performance comparison of the approaches
[Chart: Performance of UDF (String Upper). Y axis: Throughput (QPS). Categories: 100 bytes String, 1k bytes String. Series: Jython, Jep, Jcp, Java.]

JCP architecture
[Diagram] A single process hosts both the JVM and the PVM.
- JVM: Daemon Thread, Thread 1, Thread 2; each thread goes through JNI.
- JCP C Library: bridges JNI and the Python/C API.
- PVM: one Python Main Interpreter and two Python Sub Interpreters; each sub interpreter runs a UDF.

#4 PyFlink Runtime 2.0

PyFlink Runtime 2.0
[Diagram, two panels]
- PyFlink Runtime: the Java Operator (JVM, Process 1) and the Python Worker (PVM, Process 2) exchange data through Grpc Services. Covers checkpoint handling, watermark handling, state request handling, and the UDF.
- PyFlink Runtime 2.0: the Java Operator (JVM) and the Python Worker (PVM) run in one process and exchange data through the Jcp Lib. Covers the same checkpoint handling, watermark handling, state request handling, and UDF.

Performance comparison
[Chart: Performance of UDF (String Upper). Y axis: Throughput (QPS). Categories: 100 bytes String, 1k bytes String. Series: Java UDF, Python UDF, Python UDF On Jcp.]

#5 Future Work

#1 JCP
- Open-source JCP; the first version supports Java calling Python.
- Support NumPy native data structures in JCP.
- Support Python calling Java in JCP.
- Get more projects to use JCP.

#2 PyFlink On Jcp
- PyFlink 1.15 will depend on JCP.

2021-12-05
THANKS
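
Supplementary sketch for the "What is FFI?" slide in section #3: the deck lists Ctypes among the FFI solutions, so the following is a minimal illustration of the idea using Python's standard `ctypes` module to call the C library's `toupper`, mirroring the "String Upper" UDF used in the benchmarks. It is not how JCP works (JCP combines JNI with the Python/C API); the helper name `c_upper` is hypothetical, and the sketch assumes a POSIX system where the C library can be loaded and ASCII-only input.

```python
import ctypes
import ctypes.util

# Locate the C standard library. find_library may return None in minimal
# environments; CDLL(None) then falls back to the symbols already loaded
# into the current process (POSIX only, an assumption of this sketch).
_libc = ctypes.CDLL(ctypes.util.find_library("c"))

# Declare the foreign signature: int toupper(int c);
_libc.toupper.argtypes = [ctypes.c_int]
_libc.toupper.restype = ctypes.c_int


def c_upper(text: str) -> str:
    """Upper-case an ASCII string by calling C's toupper once per byte."""
    data = text.encode("ascii")
    return bytes(_libc.toupper(b) for b in data).decode("ascii")


if __name__ == "__main__":
    print(c_upper("hello world"))
```

Each call crosses the language boundary inside one process, with no sockets and no serialization, which is the property the FFI-based approach relies on. Crossing once per byte is deliberately naive here; a real bridge would pass whole buffers per call.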