CDC with DLT and AWS DMS
Neil Patel, Lead SSA
Ganesh Chand, Lead SSA

Contents
- Change Data Capture
- AWS DMS (CDC Tool)
- Databricks Delta Live Tables
- Bringing it all Together

Change Data Capture

What is Change Data Capture?
Capture changes from a set of data sources.

OP | EMPLOYEE_ID | SALARY | CDC_TIMESTAMP
I  | 1 | 10000 | 2018-01-01 16:02:00
U  | 1 | 11000 | 2019-01-01 16:02:01
D  | 1 | 11000 | 2019-01-01 16:02:01
I  | 2 | 20000 | 2019-01-10 16:02:00
I  | 3 | 30000 | 2020-01-01 16:02:00

Why CDC?
- Data replication for BI and AI
- Data integration
- Event-driven architectures
- Regulatory compliance and security audits

How not to do it
- JDBC: Reaching into a production database to grab large swathes of data is generally not allowed just for an ETL process. You will need to keep track of what was last read, and reason about updates and deletes.
- Snapshots: Dealing with daily snapshots of a database is costly and time consuming. As above, you will need to reason about updates and deletes. (Delta Live Tables does provide a way to handle snapshots now; more on that later.)

How do we do CDC? With the transaction log
- The transaction log (aka binlog for MySQL, WAL for Postgres, etc.) is written to a log file and contains some of the following: the SQL statements, the data that changed (CDC), and who ran the query.
- Parse and read the transaction log for the change-data records; CDC tools are often used for this.

CDC Tools
- Cloud vendors' native services: AWS DMS, Azure Data Factory, GCP Datastream
- SaaS vendors: Fivetran, Arcion, Informatica, Oracle GoldenGate, Talend, Qlik Replicate, StreamSets, IBM Infosphere
- Open source: Debezium
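To make the I/U/D semantics concrete, here is a minimal plain-Python sketch (not DMS or DLT code) that replays change records in sequence order to rebuild the current state of the table, using the employee rows from the table above:

```python
# Minimal sketch: applying CDC events in sequence order to materialize
# the current state of a table, SCD Type 1 style.
events = [
    ("I", 1, 10000, "2018-01-01 16:02:00"),
    ("U", 1, 11000, "2019-01-01 16:02:01"),
    ("D", 1, 11000, "2019-01-01 16:02:01"),
    ("I", 2, 20000, "2019-01-10 16:02:00"),
    ("I", 3, 30000, "2020-01-01 16:02:00"),
]

state = {}
# Stable sort by the sequence column keeps arrival order for ties (U before D).
for op, emp_id, salary, ts in sorted(events, key=lambda e: e[3]):
    if op in ("I", "U"):
        state[emp_id] = salary      # insert or overwrite with the latest value
    elif op == "D":
        state.pop(emp_id, None)     # a delete removes the key entirely

print(state)  # employee 1 was deleted; 2 and 3 survive: {2: 20000, 3: 30000}
```

Note that a consumer has to honor both the operation code and the ordering column; this is exactly the bookkeeping that CDC tools and the Apply Changes API (later) take off your hands.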
AWS Database Migration Service (DMS)

What is a CDC tool?
Since the transaction log contains a variety of information, it needs to be parsed and filtered for the data that will ultimately give us this output:

OP | EMPLOYEE_ID | SALARY | CDC_TIMESTAMP
I  | 1 | 10000 | 2018-01-01 16:02:00
U  | 1 | 11000 | 2019-01-01 16:02:01
D  | 1 | 11000 | 2019-01-01 16:02:01
I  | 2 | 20000 | 2019-01-10 16:02:00
I  | 3 | 30000 | 2020-01-01 16:02:00

How does it work? What is needed for DMS to work as a CDC tool?
- The database needs to have its respective transaction log enabled.
- MySQL settings:

    binlog_format=row
    binlog_row_image=Full
    log_slave_updates=True

S3 Endpoint Configs
- cdcMaxBatchInterval / cdcMinFileSize: useful for limiting how many CDC files get created.
- dataFormat: CSV or Parquet output.
- timestampColumnName: adds a timestamp column to the S3 data. For a full load, the timestamp is when the data was sent from the source to the target by DMS; for CDC data, it is the timestamp of the commit on the source database.
- addColumnName: for CSV format, provides column names (a header row).
- useTaskStartTimeForFullLoadTimestamp: instead of using the time a row arrived at the target endpoint (S3), uses the start time of the full load. This keeps full-load rows sortable alongside the CDC data (useful if you are seeing timestamps that overlap).
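As an illustration of where these configs live, they map onto the settings object of a DMS S3 target endpoint. A hedged sketch using the AWS CLI; the endpoint name, bucket, role ARN, and values are placeholders, not prescriptions:

```shell
# Sketch: creating a DMS S3 target endpoint carrying the settings above.
# Bucket, role ARN, and tuning values are placeholders for your environment.
aws dms create-endpoint \
  --endpoint-identifier cdc-s3-target \
  --endpoint-type target \
  --engine-name s3 \
  --s3-settings '{
    "BucketName": "my-cdc-bucket",
    "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-s3-role",
    "DataFormat": "parquet",
    "TimestampColumnName": "dmsTimestamp",
    "CdcMaxBatchInterval": 60,
    "CdcMinFileSize": 32000,
    "UseTaskStartTimeForFullLoadTimestamp": true
  }'
```

With Parquet output the addColumnName setting is unnecessary, since column names travel in the file schema.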
How does it work? Example binlog

OP | EMPLOYEE_ID | SALARY | CDC_TIMESTAMP
I  | 1 | 10000 | 2018-01-01 16:02:00
U  | 1 | 11000 | 2019-01-01 16:02:01
D  | 1 | 11000 | 2019-01-01 16:02:01
I  | 2 | 20000 | 2019-01-10 16:02:00
I  | 3 | 30000 | 2020-01-01 16:02:00

Databricks Delta Live Tables (DLT)

What is Delta Live Tables?
Delta Live Tables (DLT) is the first ETL framework that uses a simple, declarative approach to building reliable data pipelines. DLT automatically manages your infrastructure at scale, so data analysts and engineers can spend less time on tooling and focus on getting value from data.
- Accelerate ETL development
- Automatically manage your infrastructure
- Have confidence in your data
- Simplify batch and streaming

CDC with Apply Changes API
A declarative API to handle CDC data from various sources (input: the employee change records from the earlier table).

    dlt.create_streaming_table("employees")

    dlt.apply_changes(
      target = "employees",
      source = "employees_raw",
      keys = ["employee_id"],
      sequence_by = col("cdc_timestamp"),
      apply_as_deletes = expr("op = 'D'"),
      except_column_list = ["op", "cdc_timestamp"],
      stored_as_scd_type = 1
    )

Full signature:

    apply_changes(
      target = "<target-table>",
      source = "<data-source>",
      keys = ["key1", "key2", "keyN"],
      sequence_by = "<sequence-column>",
      ignore_null_updates = False,
      apply_as_deletes = None,
      apply_as_truncates = None,
      column_list = None,
      except_column_list = None,
      stored_as_scd_type = "<type>",
      track_history_column_list = None,
      track_history_except_column_list = None
    )
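One subtlety worth spelling out is what sequence_by buys you: when several changes arrive for the same key, only the change with the highest sequence value determines the row's final state, even if it did not arrive last. A plain-Python sketch of that rule (illustrative only, not DLT code):

```python
# Sketch: latest-wins resolution by sequence column, which is what
# sequence_by does conceptually inside apply_changes.
changes = [
    {"employee_id": 1, "salary": 10000, "cdc_timestamp": "2018-01-01 16:02:00"},
    {"employee_id": 1, "salary": 11000, "cdc_timestamp": "2019-01-01 16:02:01"},
    # A late-arriving, out-of-order event with an older timestamp:
    {"employee_id": 1, "salary": 99999, "cdc_timestamp": "2018-06-01 00:00:00"},
]

latest = {}
for c in changes:
    key = c["employee_id"]
    # Keep whichever change has the higher sequence value for this key.
    if key not in latest or c["cdc_timestamp"] > latest[key]["cdc_timestamp"]:
        latest[key] = c

print(latest[1]["salary"])  # 11000 -- the out-of-order 99999 update loses
```

This is why the choice of sequence column matters: it must strictly order the changes, not merely record arrival time.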
CDC with Apply Changes API (SQL)
A declarative API to handle CDC data from various sources (input: the employee change records from the earlier table).

    CREATE OR REFRESH STREAMING TABLE employees;

    APPLY CHANGES INTO live.employees
    FROM stream(cdc_data.employees_raw)
    KEYS (employee_id)
    APPLY AS DELETE WHEN op = "D"
    SEQUENCE BY cdc_timestamp
    COLUMNS * EXCEPT (op, cdc_timestamp)
    STORED AS SCD TYPE 1;

Full syntax:

    CREATE OR REFRESH STREAMING TABLE table_name;

    APPLY CHANGES INTO LIVE.table_name
    FROM source
    KEYS (keys)
    SEQUENCE BY orderByColumn
    WHERE condition
    IGNORE NULL UPDATES
    APPLY AS DELETE WHEN condition
    APPLY AS TRUNCATE WHEN condition
    COLUMNS columnList | * EXCEPT (exceptColumnList)
    STORED AS SCD TYPE 1 | SCD TYPE 2
    TRACK HISTORY ON columnList | * EXCEPT (exceptColumnList)
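To show what STORED AS SCD TYPE 2 produces, here is a plain-Python sketch of history tracking. The __START_AT / __END_AT names mirror the columns DLT adds to an SCD Type 2 target, but the logic here is only illustrative:

```python
# Sketch: SCD Type 2 keeps history -- each change closes the previous
# version of a row and opens a new one, instead of overwriting it.
events = [
    ("I", 1, 10000, "2018-01-01 16:02:00"),
    ("U", 1, 11000, "2019-01-01 16:02:01"),
]

history = []
for op, emp_id, salary, ts in sorted(events, key=lambda e: e[3]):
    # Close the currently open version of this key, if any.
    for row in history:
        if row["employee_id"] == emp_id and row["__END_AT"] is None:
            row["__END_AT"] = ts
    if op in ("I", "U"):
        history.append({"employee_id": emp_id, "salary": salary,
                        "__START_AT": ts, "__END_AT": None})

# Two versions survive: the closed 10000 row and the open 11000 row.
print(history)
```

Contrast with SCD Type 1 (earlier sketch), where the 10000 version would simply be gone.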
Sample DLT Code

    import dlt
    from pyspark.sql.functions import col, expr

    @dlt.table(
      name = "customers_cdc_raw",
      table_properties = {"quality": "bronze"},
      comment = "CDC feed for the customers table provided by DMS"
    )
    def customers_cdc_feed():
      return (
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.inferSchema", "true")
          .option("cloudFiles.inferColumnTypes", "true")
          .load("s3://my-s3-path")
      )

    dlt.create_streaming_live_table(
      name = "customers",
      comment = "Customers table with SCD Type 2"
    )

    dlt.apply_changes(
      target = "customers",
      source = "customers_cdc_raw",
      keys = ["id"],
      sequence_by = col("dmsTimestamp"),
      apply_as_deletes = expr("Op = 'D'"),
      except_column_list = ["Op", "dmsTimestamp", "_rescued_data"],
      stored_as_scd_type = 2
    )

Demo

Bringing it All Together: Key Takeaways
- Real-time CDC is no longer a hard problem to solve.
- Leverage a CDC tool to provide a continuous feed of database changes.
- Use Delta Live Tables to process the CDC records.

Q&A

Appendix

Appendix A: APPLY CHANGES FROM SNAPSHOT

    def apply_changes_from_snapshot(
      target,
      snapshot_and_version,
      keys,
      stored_as_scd_type,
      track_history_column_list = None,
      track_history_except_column_list = None
    ) -> None

A declarative API to determine source-data changes by comparing a series of ordered snapshots. Supports both SCD Type 1 and Type 2.
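Conceptually, deriving changes from snapshots is a keyed diff. A plain-Python sketch of the comparison (illustrative only, not the actual implementation):

```python
# Sketch: deriving I/U/D change events by diffing two ordered snapshots,
# which is what APPLY CHANGES FROM SNAPSHOT does conceptually.
old = {1: 10000, 2: 20000}   # employee_id -> salary, snapshot v1
new = {2: 25000, 3: 30000}   # employee_id -> salary, snapshot v2

changes = []
for key in new.keys() - old.keys():
    changes.append(("I", key, new[key]))       # only in new -> insert
for key in new.keys() & old.keys():
    if new[key] != old[key]:
        changes.append(("U", key, new[key]))   # value changed -> update
for key in old.keys() - new.keys():
    changes.append(("D", key, old[key]))       # missing from new -> delete

print(sorted(changes))  # [('D', 1, 10000), ('I', 3, 30000), ('U', 2, 25000)]
```

Note the limitation this implies: a diff of snapshots cannot see intermediate changes that happened between the two snapshot times, which is why log-based CDC remains the higher-fidelity option.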
Appendix B: Dealing with a complex "sequence key"
The Apply Changes API only takes one sequence column. To handle this scenario, create a struct column from the columns that uniquely order a row, e.g. struct(id, timestamp).

Appendix C: Dealing with no primary keys
If the source isn't providing a primary key (e.g. only First, Last, and Email columns), use all three columns together as the primary columns.

Appendix D: How is the Apply Changes API different from the Delta MERGE API?
- SCD Type 2 implementation is complex with a regular merge.
- Handling out-of-order CDC events is hard to get correct with the Delta MERGE API.

Appendix E: Fan-out-from-bronze approach
- Each node (that's a table) in a DLT pipeline is a Spark stream (be it batch or continuous).
- Having a lot of Spark streams adds additional resource contention on the driver and executors.
- Landing bronze into one table, then reading and filtering based on the silver target table, lowers the number of active streams, since bronze will only be one.
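A closing note on Appendix B's composite sequence key: packing the ordering columns into a struct works because struct values compare field by field, just as Python tuples do. A tiny sketch of that comparison rule:

```python
# Sketch: composite sequence keys compare field by field, so a tie on the
# first field is broken by the second (e.g. a log sequence number).
e1 = ("2019-01-01 16:02:01", 7)   # (commit timestamp, log sequence number)
e2 = ("2019-01-01 16:02:01", 9)   # same timestamp, later position in the log

winner = max(e1, e2)
print(winner)  # ('2019-01-01 16:02:01', 9)
```

The second field here is hypothetical; pick whatever column from your CDC feed breaks ties deterministically.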