Understanding Delta Lake's Change Data Feed
What it's good for, how it tracks changes, and how to view them
November 1, 2022
The release of Delta Lake 2.0.0 came with an exciting new feature called Change data feed, which tracks row-level changes to a delta table. This ability unlocks new use cases:
- Batch selection of new data; previously new rows could be streamed from a delta table, but streams are opaque. Batch mode enables operations on a whole set of changes.
- Streaming Change Data Capture (CDC) rows from delta tables, similar to MySQL’s binlog (a streaming sketch follows this list).
- Row-level observability to help debug, understand data patterns and collect better table stats.
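To illustrate the streaming case: it uses the same readChangeFeed option as the batch reads demonstrated later in this post. A minimal sketch, assuming a delta table at /tmp/spark-shell/langpop with the feature already enabled:

// Stream CDC rows; each micro-batch carries the _change_type,
// _commit_version and _commit_timestamp columns.
val cdc = spark.readStream.format("delta").
  option("readChangeFeed", "true").
  option("startingVersion", 2).
  load("/tmp/spark-shell/langpop")

cdc.writeStream.format("console").start()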
To get started with Change data feed, there are three things to know: how to enable it, how to view changes and how the changes are stored.
Enable Change data feed
I’m going to demonstrate the Change data feed feature using spark-shell. Here’s my launch command:
spark-shell --packages io.delta:delta-core_2.12:2.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
I have Spark 3.2 installed on my machine so I’m using Delta Lake 2.0 (there is also a 2.1 version for Spark 3.3).
Let’s create a table to store programming language popularity rankings by year. Here are the top 3 from 2004:
scala> val data = Seq(
| (2004,"Java",1),
| (2004,"PHP",2),
| (2004,"C/C++",3))
scala> spark.createDataFrame(data).
| toDF("year","lang","rank").
| write.
| format("delta").
| mode("append").
| save("/tmp/spark-shell/langpop")
Currently the Change data feed feature can only be enabled via a SQL query:
scala> spark.sql("""ALTER TABLE delta.`/tmp/spark-shell/langpop`
| SET TBLPROPERTIES (delta.enableChangeDataFeed = true)""")
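Per the Delta Lake docs, the property can also be set when a catalog table is created, or defaulted on for every new table in the session; I haven’t run either variant here:

// At creation time (catalog tables):
spark.sql("""CREATE TABLE langpop (year INT, lang STRING, rank INT)
  USING delta
  TBLPROPERTIES (delta.enableChangeDataFeed = true)""")

// Session-wide default for all new tables:
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")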
This adds a commit to the table’s delta log which enables the Change data feed feature, and bumps the protocol’s minWriterVersion to 4, so old Delta Lake clients¹ will no longer be able to write to the table.
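To see the protocol bump for yourself, the commit file can be read directly; the delta log is newline-delimited JSON, one action per line. A sketch, assuming commit 1 (the SET TBLPROPERTIES commit) holds both the protocol and metaData actions:

// Commit 1 should carry a protocol action with minWriterVersion = 4
// plus the updated table configuration.
spark.read.json("/tmp/spark-shell/langpop/_delta_log/00000000000000000001.json").
  select("protocol", "metaData.configuration").
  show(false)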
View row-level changes
Now that the table is tracking changes, let’s add more data to it; the top 3 programming languages from 2014:
scala> val data = Seq(
| (2014,"Java",1),
| (2014,"Pascal",2),
| (2014,"Python",3))
scala> spark.createDataset(data).
| toDF("year","lang","rank").
| write.
| format("delta").
| mode("append").
| save("/tmp/spark-shell/langpop")
Finally I’m ready to view changes to the table:
scala> spark.read.format("delta").
| option("readChangeFeed", "true").
| option("startingVersion", 2).
| load("/tmp/spark-shell/langpop").
| show(false)
+----+------+----+------------+---------------+---------------------+
|year|lang |rank|_change_type|_commit_version|_commit_timestamp |
+----+------+----+------------+---------------+---------------------+
|2014|Python|3 |insert |2 |2022-11-01 21:45:52.7|
|2014|Java |1 |insert |2 |2022-11-01 21:45:52.7|
|2014|Pascal|2 |insert |2 |2022-11-01 21:45:52.7|
+----+------+----+------------+---------------+---------------------+
Delta returns the table data plus the change data event columns. Note that the first 3 rows I inserted (the 2004 language rankings) are not returned: the startingVersion option of 2 excludes them. I can query the table history to see why:
scala> import io.delta.tables.DeltaTable
scala> val dt = DeltaTable.forPath(spark, "/tmp/spark-shell/langpop")
scala> dt.history.select("version","timestamp","operation").show
+-------+--------------------+-----------------+
|version| timestamp| operation|
+-------+--------------------+-----------------+
| 2|2022-11-01 21:45:...| WRITE|
| 1|2022-11-01 21:40:...|SET TBLPROPERTIES|
| 0|2022-11-01 21:39:...| WRITE|
+-------+--------------------+-----------------+
In commit 0 the table was created and the initial rows written to it. Commit 1 enabled Change data feed. So commit 2 is the first commit to track changes.
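As an aside, the read can be bounded at both ends, and the Delta docs list timestamp equivalents of the version options. A sketch:

// Inclusive version range:
spark.read.format("delta").
  option("readChangeFeed", "true").
  option("startingVersion", 2).
  option("endingVersion", 2).
  load("/tmp/spark-shell/langpop")

// Or bound by timestamps instead:
spark.read.format("delta").
  option("readChangeFeed", "true").
  option("startingTimestamp", "2022-11-01 21:45:00").
  load("/tmp/spark-shell/langpop")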
What if I say “screw it” and try to read changes earlier than commit 2?
scala> spark.read.format("delta").
| option("readChangeFeed", "true").
| option("startingVersion", 0).
| load("/tmp/spark-shell/langpop").
| show(false)
org.apache.spark.sql.AnalysisException: Error getting change data for range
[0 , 2] as change data was not recorded for version [0]
Where Delta Lake stores change data
Delta Lake should save change data in the _change_data directory of the delta table; however, I see no such directory:
ls -1 /tmp/spark-shell/langpop
_delta_log
part-00000-a6f68dca-06ef-4280-b5b7-edb38b7f3a70-c000.snappy.parquet
part-00000-e1b18e95-ded0-4ee8-b011-961e609dc8a4-c000.snappy.parquet
part-00001-3efe7e39-4354-4e20-ad20-8771ab46ac19-c000.snappy.parquet
part-00001-b1dcb6c9-2151-4818-ae1c-ba23de955249-c000.snappy.parquet
part-00002-95dc0d80-1403-43c6-aa14-002841226baf-c000.snappy.parquet
part-00002-aee452dc-92f0-4b16-9243-c9b8ca34a8bf-c000.snappy.parquet
The Delta Lake docs explain:
insert-only operations and full partition deletes will not generate data in the _change_data directory.
In other words, non-destructive table changes do not trigger writes to the _change_data directory. Here’s a handy table of operations, actions and their destructivity:
+---------+------+------------+
|Operation|Action|Destructive?|
+---------+------+------------+
| append|insert| false|
|overwrite|insert| false|
|overwrite|delete| true|
| merge|update| true|
| merge|insert| false|
| delete|delete| true|
| update|update| true|
+---------+------+------------+
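The update path gets demonstrated next. For completeness, a destructive delete would look like the sketch below; I don’t run it here, since it would wipe out half the demo table:

// Deletes are destructive, so this would write "delete" rows
// to the _change_data directory.
dt.delete("year = 2004")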
Now as it happens, I inserted some bad data earlier. Did you spot it? Come on, you didn’t think Pascal was the #2 most popular programming language in 2014, did you? (✊ #WirthSquad). Sorry, it was PHP again. This calls for a (destructive) update!
scala> import org.apache.spark.sql.functions._
scala> dt.update(col("lang") === "Pascal", Map("lang"->lit("PHP")))
scala> dt.toDF.sort("year","rank").show
+----+------+----+
|year| lang|rank|
+----+------+----+
|2004| Java| 1|
|2004| PHP| 2|
|2004| C/C++| 3|
|2014| Java| 1|
|2014| PHP| 2|
|2014|Python| 3|
+----+------+----+
And now the _change_data directory is there:
ls -1 /tmp/spark-shell/langpop/_change_data/
cdc-00000-892bcc16-fa7e-4e87-9ccc-d47fdd8d5e92.c000.snappy.parquet
But what’s in that parquet file? Even though it’s in binary, I can get the gist using hexdump:
hexdump -c /tmp/spark-shell/langpop/_change_data/*
0000000 P A R 1 025 004 025 \b 025 \f 025 005
0000010 < 025 002 025 004 \0 \0 004 \f \a \0 \0 025 \0 025
0000020 020 025 024 025 227 016 034 025 004 025 004 025 006
0000030 025 \b \0 \0 \b 034 002 \0 \0 \0 003 003 \0 003 025 \0
0000040 025 . 025 2 025 215 005 034 025 004 025 \0 025
0000050 006 025 \b \0 \0 027 X 002 \0 \0 \0 003 003 006 \0 \0
0000060 \0 P a s c a l 003 \0 \0 \0 P H P 025 004
0000070 025 \b 025 \f 025 230 \b < 025 002 025 004 \0
0000080 \0 004 \f 002 \0 \0 \0 025 \0 025 020 025 024 025
0000090 227 016 034 025 004 025 004 025 006 025 \b \0 \0 \b 034
00000a0 002 \0 \0 \0 003 003 \0 003 025 \0 025 Z 025 P 025
00000b0 220 f 034 025 004 025 \0 025 006 025 \b \0 \0 - h
00000c0 002 \0 \0 \0 003 003 017 \0 \0 \0 u p d a t e
00000d0 _ p r e i m a g e 020 \0 031 023 034 o s
00000e0 t i m a g e 031 021 002 031 030 004 \a \0 \0
00000f0 031 030 004 \a \0 \0 025 002 031 026 \0 \0 031 021 002
0000100 031 030 003 P H P 031 030 006 P a s c a l 025
0000110 002 031 026 \0 \0 031 021 002 031 030 004 002 \0 \0 \0 031
0000120 030 004 002 \0 \0 \0 025 002 031 026 \0 \0 031 021 002 031
0000130 030 020 u p d a t e _ p o s t i m a
0000140 g e 031 030 017 u p d a t e _ p r e i
0000150 m a g e 025 002 031 026 \0 \0 031 034 026 : 025 B
0000160 026 \0 \0 \0 031 034 026 | 025 ` 026 \0 \0 \0 031 034
0000170 026 216 002 025 B 026 \0 \0 \0 031 034 026 002 025 |
0000180 026 \0 \0 \0 025 002 031 \ H \f s p a r k _
0000190 s c h e m a 025 \b \0 025 002 % 002 030 004 y
00001a0 e a r \0 025 \f % 002 030 004 l a n g % \0
00001b0 L 034 \0 \0 \0 025 002 % 002 030 004 r a n k \0
00001c0 025 \f % 002 030 \f _ c h a n g e _ t y
00001d0 p e % \0 L 034 \0 \0 \0 026 004 031 034 031 L &
00001e0 : 034 025 002 031 5 004 \b 006 031 030 004 y e a r
00001f0 025 002 026 004 026 l 026 t & : & \b 034 030 004
0000200 \a \0 \0 030 004 \a \0 \0 026 \0 ( 004 \a \0
0000210 \0 030 004 \a \0 \0 \0 031 , 025 004 025 004 025 002
0000220 \0 025 \0 025 004 025 002 \0 \0 026 005 025 024 026
0000230 003 025 . \0 & | 034 025 \f 031 5 \0 \b 006 031 030
0000240 004 l a n g 025 002 026 004 026 \ 026 ` & | <
0000250 6 \0 ( 006 P a s c a l 030 003 P H P \0
0000260 031 034 025 \0 025 \0 025 002 \0 \0 026 005 025 024 026
0000270 003 025 0 \0 & 216 002 034 025 002 031 5 004 \b 006
0000280 031 030 004 r a n k 025 002 026 004 026 l 026 t &
0000290 216 002 & 001 034 030 004 002 \0 \0 \0 030 004 002 \0
00002a0 \0 \0 026 \0 ( 004 002 \0 \0 \0 030 004 002 \0 \0 \0
00002b0 \0 031 , 025 004 025 004 025 002 \0 025 \0 025 004 025 002
00002c0 \0 \0 026 005 025 026 026 004 025 . \0 & 002
00002d0 034 025 \f 031 5 \0 \b 006 031 030 \f _ c h a n
00002e0 g e _ t y p e 025 002 026 004 026 206 001 026 |
00002f0 & 002 < 6 \0 ( 017 u p d a t e _ p
0000300 r e i m a g e 030 020 u p d a t e _
0000310 p o s t i m a g e \0 031 034 025 \0 025 \0
0000320 025 002 \0 \0 026 005 025 026 026 004 025 \ \0 026
0000330 003 026 004 & \b 026 003 024 \0 \0 031 , 030 030
0000340 o r g . a p a c h e . s p a r k
0000350 . v e r s i o n 030 005 3 . 2 . 1 \0
0000360 030 ) o r g . a p a c h e . s p a
0000370 r k . s q l . p a r q u e t . r
0000380 o w . m e t a d a t a 030 236 002 { "
0000390 t y p e " : " s t r u c t " , "
00003a0 f i e l d s " : [ { " n a m e "
00003b0 : " y e a r " , " t y p e " : "
00003c0 i n t e g e r " , " n u l l a b
00003d0 l e " : t r u e , " m e t a d a
00003e0 t a " : { } } , { " n a m e " :
00003f0 " l a n g " , " t y p e " : " s
0000400 t r i n g " , " n u l l a b l e
0000410 " : t r u e , " m e t a d a t a
0000420 " : { } } , { " n a m e " : " r
0000430 a n k " , " t y p e " : " i n t
0000440 e g e r " , " n u l l a b l e "
0000450 : t r u e , " m e t a d a t a "
0000460 : { } } , { " n a m e " : " _ c
0000470 h a n g e _ t y p e " , " t y p
0000480 e " : " s t r i n g " , " n u l
0000490 l a b l e " : t r u e , " m e t
00004a0 a d a t a " : { } } ] } \0 030 J p
00004b0 a r q u e t - m r v e r s i o
00004c0 n 1 . 1 2 . 2 ( b u i l d
00004d0 7 7 e 3 0 c 8 0 9 3 3 8 6 e c 5
00004e0 2 c 3 c f a 6 c 3 4 b 7 e f 3 3
00004f0 2 1 3 2 2 c 9 4 ) 031 L 034 \0 \0 034 \0
0000500 \0 034 \0 \0 034 \0 \0 \0 204 003 \0 \0 P A R 1
0000510
The official docs explain the layout more precisely, but roughly: it begins with the 4-byte magic number PAR1 to identify the file type. Next come the column chunks; a careful reading shows that the file stores both the before and after values (“Pascal”, “PHP”) and the action (“update_(pre|post)image”). Every parquet file ends with the file metadata (which includes the schema), the metadata length and a 4-byte magic number.
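And since it’s just a parquet file, Spark can read it directly. Note it stores only the _change_type column; _commit_version and _commit_timestamp are derived from the delta log at read time:

// Read the change file as plain parquet: year, lang, rank, _change_type.
spark.read.parquet("/tmp/spark-shell/langpop/_change_data").show(false)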
But there’s no need to squint over hexdumps until your eyes bleed; we can read the change data feed instead:
scala> spark.read.format("delta").
| option("readChangeFeed", "true").
| option("startingVersion", 3).
| load("/tmp/spark-shell/langpop").
| show(false)
+----+------+----+----------------+---------------+-----------------------+
|year|lang |rank|_change_type |_commit_version|_commit_timestamp |
+----+------+----+----------------+---------------+-----------------------+
|2014|Pascal|2 |update_preimage |3 |2022-11-01 22:59:13.273|
|2014|PHP |2 |update_postimage|3 |2022-11-01 22:59:13.273|
+----+------+----+----------------+---------------+-----------------------+
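For a CDC consumer that only needs each row’s new state, a filter on _change_type drops the preimage records:

// Keep inserts, deletes and post-update states; drop pre-update states.
spark.read.format("delta").
  option("readChangeFeed", "true").
  option("startingVersion", 3).
  load("/tmp/spark-shell/langpop").
  filter("_change_type != 'update_preimage'").
  show(false)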
Conclusion
At $work we’re trialing Change data feed to fetch row changes, which we union with an output partition before sorting and deduping. By selecting changed partitions in batch, we can process one partition at a time, stabilizing Spark’s resource demands and making them more predictable.
Whilst Change data feed is powerful, it’s listed as experimental and I think the interface could be friendlier:
- Reading changes for a non-existent commit version number always throws an exception. Why? An empty dataframe is a perfectly truthful response, and it would spare consumers from wrapping calls in a try/catch (a workaround is sketched after this list).
- Every update to a row emits a pair of records containing the before and after state. This seems like it would require a self-join to combine them into a single record suitable for CDC. It would be nice to have the option of fetching changes in the combined format instead.
- The feature can only be enabled via a SQL query. Giving the DeltaTable class an “enableChangeDataFeed” method would be convenient. I expect this is on the Delta Lake roadmap, once the feature stabilizes.
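For the first of those points, a thin wrapper is the workaround; a sketch (the helper name is mine):

import scala.util.Try
import org.apache.spark.sql.DataFrame

// Hypothetical helper: None when the requested range isn't readable,
// rather than an AnalysisException.
def changesSince(path: String, version: Long): Option[DataFrame] =
  Try {
    val df = spark.read.format("delta").
      option("readChangeFeed", "true").
      option("startingVersion", version).
      load(path)
    df.head(1) // force evaluation so the range check fires here
    df
  }.toOption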
Notes
- Older than Delta Lake 1.2.1.
Tags: spark parquet scala delta-table