Code

Understanding Delta Lake's Change Data Feed

What it's good for, how it tracks changes, and how to view them

November 1, 2022

The release of Delta Lake 2.0.0 came with an exciting new feature called Change data feed, which tracks row-level changes to a Delta table. This ability unlocks new use cases.

To get started with Change data feed, there are three things to know: how to enable it, how to view changes, and how the changes are stored.

Enable Change data feed

I’m going to demonstrate the Change data feed feature using spark-shell. Here’s my launch command:

spark-shell --packages io.delta:delta-core_2.12:2.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

I have Spark 3.2 installed on my machine so I’m using Delta Lake 2.0 (there is also a 2.1 version for Spark 3.3).

Let’s create a table to store programming language popularity rankings by year. Here are the top 3 from 2004:

scala> val data = Seq(
     | (2004,"Java",1),
     | (2004,"PHP",2),
     | (2004,"C/C++",3))
scala> spark.createDataFrame(data).
     | toDF("year","lang","rank").
     | write.
     | format("delta").
     | mode("append").
     | save("/tmp/spark-shell/langpop")

For an existing table like this one, Change data feed is enabled with a SQL statement:

scala> spark.sql("""ALTER TABLE delta.`/tmp/spark-shell/langpop`
     | SET TBLPROPERTIES (delta.enableChangeDataFeed = true)""")

This adds a commit to the table’s delta log which enables the Change data feed feature and bumps the protocol’s minWriterVersion to 4, so old Delta Lake clients¹ will no longer be able to write to the table.
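To confirm that the commit took effect, the table's properties and protocol versions can be inspected. This is a minimal sketch, assuming the same spark-shell session (`spark`) and table path as above; the column names are those I expect from Delta Lake's DESCRIBE DETAIL output.

```scala
// Assumes the spark-shell session (`spark`) and the table path used above.
val path = "/tmp/spark-shell/langpop"

// DESCRIBE DETAIL returns one row describing the table, including its
// properties map and the protocol versions it requires.
val detail = spark.sql(s"DESCRIBE DETAIL delta.`$path`")

// After the ALTER TABLE, properties should contain
// delta.enableChangeDataFeed and minWriterVersion should be 4.
detail.select("properties", "minReaderVersion", "minWriterVersion").show(false)
```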

View Row Level Changes

Now that the table is tracking changes, let’s add more data to it; the top 3 programming languages from 2014:

scala> val data = Seq(
     | (2014,"Java",1),
     | (2014,"Pascal",2),
     | (2014,"Python",3))
scala> spark.createDataset(data).
     | toDF("year","lang","rank").
     | write.
     | format("delta").
     | mode("append").
     | save("/tmp/spark-shell/langpop")

Finally I’m ready to view changes to the table:

scala> spark.read.format("delta").
     | option("readChangeFeed", "true").
     | option("startingVersion", 2).
     | load("/tmp/spark-shell/langpop").
     | show(false)
+----+------+----+------------+---------------+---------------------+
|year|lang  |rank|_change_type|_commit_version|_commit_timestamp    |
+----+------+----+------------+---------------+---------------------+
|2014|Python|3   |insert      |2              |2022-11-01 21:45:52.7|
|2014|Java  |1   |insert      |2              |2022-11-01 21:45:52.7|
|2014|Pascal|2   |insert      |2              |2022-11-01 21:45:52.7|
+----+------+----+------------+---------------+---------------------+

Delta returns the table data, plus the change data event columns. Note that the first 3 rows I inserted (the programming language popularity rankings for 2004) are not returned: the startingVersion option of 2 excludes them. I can query the table history to see why:

scala> import io.delta.tables.DeltaTable
scala> val dt = DeltaTable.forPath(spark, "/tmp/spark-shell/langpop")
scala> dt.history.select("version","timestamp","operation").show
+-------+--------------------+-----------------+
|version|           timestamp|        operation|
+-------+--------------------+-----------------+
|      2|2022-11-01 21:45:...|            WRITE|
|      1|2022-11-01 21:40:...|SET TBLPROPERTIES|
|      0|2022-11-01 21:39:...|            WRITE|
+-------+--------------------+-----------------+

In commit 0 the table was created and the initial rows written to it. Commit 1 enabled Change data feed. So commit 2 is the first commit to track changes.
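The read can also be bounded at the other end. The sketch below assumes the `endingVersion` reader option described in the Delta Lake docs (both bounds inclusive) and the same session and table as above; it summarises the change events per commit rather than listing every row.

```scala
// Assumes the spark-shell session (`spark`) and the table used above.
// Read only the changes recorded between two commits, both inclusive.
val changes = spark.read.format("delta").
  option("readChangeFeed", "true").
  option("startingVersion", 2).
  option("endingVersion", 2).
  load("/tmp/spark-shell/langpop")

// Count change events per commit and change type.
changes.groupBy("_commit_version", "_change_type").count().show()
```

With only commit 2 in range, this should report the three inserts shown earlier.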

What if I say “screw it” and try to read changes earlier than commit 2?

scala> spark.read.format("delta").
     | option("readChangeFeed", "true").
     | option("startingVersion", 0).
     | load("/tmp/spark-shell/langpop").
     | show(false)
org.apache.spark.sql.AnalysisException: Error getting change data for range
[0 , 2] as change data was not recorded for version [0]

Where Delta Lake stores change data

Delta Lake should save change data in the _change_data directory of the Delta table. However, I see no such directory:

ls -1 /tmp/spark-shell/langpop
_delta_log
part-00000-a6f68dca-06ef-4280-b5b7-edb38b7f3a70-c000.snappy.parquet
part-00000-e1b18e95-ded0-4ee8-b011-961e609dc8a4-c000.snappy.parquet
part-00001-3efe7e39-4354-4e20-ad20-8771ab46ac19-c000.snappy.parquet
part-00001-b1dcb6c9-2151-4818-ae1c-ba23de955249-c000.snappy.parquet
part-00002-95dc0d80-1403-43c6-aa14-002841226baf-c000.snappy.parquet
part-00002-aee452dc-92f0-4b16-9243-c9b8ca34a8bf-c000.snappy.parquet

The Delta Lake docs explain:

insert-only operations and full partition deletes will not generate data in the _change_data directory.

In other words, non-destructive table changes do not trigger writes to the _change_data directory. Here’s a handy table of operations, actions and their destructivity:

+---------+------+------------+
|Operation|Action|Destructive?|
+---------+------+------------+
|   append|insert|       false|
|overwrite|insert|       false|
|overwrite|delete|        true|
|    merge|update|        true|
|    merge|insert|       false|
|   delete|delete|        true|
|   update|update|        true|
+---------+------+------------+

Now, as it happens, I inserted some bad data earlier. Did you spot it? Come on, you didn’t think Pascal was the #2 most popular programming language in 2014, did you? (✊ #WirthSquad). Sorry, it was PHP again. This calls for a (destructive) update!

scala> import org.apache.spark.sql.functions._
scala> dt.update(col("lang") === "Pascal", Map("lang"->lit("PHP")))
scala> dt.toDF.sort("year","rank").show
+----+------+----+
|year|  lang|rank|
+----+------+----+
|2004|  Java|   1|
|2004|   PHP|   2|
|2004| C/C++|   3|
|2014|  Java|   1|
|2014|   PHP|   2|
|2014|Python|   3|
+----+------+----+

And now the _change_data directory is there:

ls -1 /tmp/spark-shell/langpop/_change_data/
cdc-00000-892bcc16-fa7e-4e87-9ccc-d47fdd8d5e92.c000.snappy.parquet

But what’s in that parquet file? Even though it’s a binary format, I can get the gist using hexdump:

hexdump -c /tmp/spark-shell/langpop/_change_data/*
0000000   P   A   R   1 025 004 025  \b 025  \f 025             005
0000010   < 025 002 025 004  \0  \0 004  \f     \a  \0  \0 025  \0 025
0000020 020 025 024 025          227 016 034 025 004 025 004 025 006
0000030 025  \b  \0  \0  \b 034 002  \0  \0  \0 003 003  \0 003 025  \0
0000040 025   . 025   2 025       215    005 034 025 004 025  \0 025
0000050 006 025  \b  \0  \0 027   X 002  \0  \0  \0 003 003 006  \0  \0
0000060  \0   P   a   s   c   a   l 003  \0  \0  \0   P   H   P 025 004
0000070 025  \b 025  \f 025    230        \b   < 025 002 025 004  \0
0000080  \0 004  \f 002  \0  \0  \0 025  \0 025 020 025 024 025
0000090    227 016 034 025 004 025 004 025 006 025  \b  \0  \0  \b 034
00000a0 002  \0  \0  \0 003 003  \0 003 025  \0 025   Z 025   P 025
00000b0 220      f 034 025 004 025  \0 025 006 025  \b  \0  \0   -   h
00000c0 002  \0  \0  \0 003 003 017  \0  \0  \0   u   p   d   a   t   e
00000d0   _   p   r   e   i   m   a   g   e 020  \0 031 023 034   o   s
00000e0   t   i   m   a   g   e 031 021 002 031 030 004     \a  \0  \0
00000f0 031 030 004     \a  \0  \0 025 002 031 026  \0  \0 031 021 002
0000100 031 030 003   P   H   P 031 030 006   P   a   s   c   a   l 025
0000110 002 031 026  \0  \0 031 021 002 031 030 004 002  \0  \0  \0 031
0000120 030 004 002  \0  \0  \0 025 002 031 026  \0  \0 031 021 002 031
0000130 030 020   u   p   d   a   t   e   _   p   o   s   t   i   m   a
0000140   g   e 031 030 017   u   p   d   a   t   e   _   p   r   e   i
0000150   m   a   g   e 025 002 031 026  \0  \0 031 034 026   : 025   B
0000160 026  \0  \0  \0 031 034 026   | 025   ` 026  \0  \0  \0 031 034
0000170 026 216 002 025   B 026  \0  \0  \0 031 034 026    002 025   |
0000180 026  \0  \0  \0 025 002 031   \   H  \f   s   p   a   r   k   _
0000190   s   c   h   e   m   a 025  \b  \0 025 002   % 002 030 004   y
00001a0   e   a   r  \0 025  \f   % 002 030 004   l   a   n   g   %  \0
00001b0   L 034  \0  \0  \0 025 002   % 002 030 004   r   a   n   k  \0
00001c0 025  \f   % 002 030  \f   _   c   h   a   n   g   e   _   t   y
00001d0   p   e   %  \0   L 034  \0  \0  \0 026 004 031 034 031   L   &
00001e0   : 034 025 002 031   5 004  \b 006 031 030 004   y   e   a   r
00001f0 025 002 026 004 026   l 026   t   &   :   &  \b 034 030 004
0000200  \a  \0  \0 030 004     \a  \0  \0 026  \0   ( 004     \a  \0
0000210  \0 030 004     \a  \0  \0  \0 031   , 025 004 025 004 025 002
0000220  \0 025  \0 025 004 025 002  \0  \0 026    005 025 024 026
0000230 003 025   .  \0   &   | 034 025  \f 031   5  \0  \b 006 031 030
0000240 004   l   a   n   g 025 002 026 004 026   \ 026   `   &   |   <
0000250   6  \0   ( 006   P   a   s   c   a   l 030 003   P   H   P  \0
0000260 031 034 025  \0 025  \0 025 002  \0  \0 026    005 025 024 026
0000270    003 025   0  \0   & 216 002 034 025 002 031   5 004  \b 006
0000280 031 030 004   r   a   n   k 025 002 026 004 026   l 026   t   &
0000290 216 002   &    001 034 030 004 002  \0  \0  \0 030 004 002  \0
00002a0  \0  \0 026  \0   ( 004 002  \0  \0  \0 030 004 002  \0  \0  \0
00002b0  \0 031   , 025 004 025 004 025 002  \0 025  \0 025 004 025 002
00002c0  \0  \0 026    005 025 026 026    004 025   .  \0   &    002
00002d0 034 025  \f 031   5  \0  \b 006 031 030  \f   _   c   h   a   n
00002e0   g   e   _   t   y   p   e 025 002 026 004 026 206 001 026   |
00002f0   &    002   <   6  \0   ( 017   u   p   d   a   t   e   _   p
0000300   r   e   i   m   a   g   e 030 020   u   p   d   a   t   e   _
0000310   p   o   s   t   i   m   a   g   e  \0 031 034 025  \0 025  \0
0000320 025 002  \0  \0 026    005 025 026 026    004 025   \  \0 026
0000330    003 026 004   &  \b 026    003 024  \0  \0 031   , 030 030
0000340   o   r   g   .   a   p   a   c   h   e   .   s   p   a   r   k
0000350   .   v   e   r   s   i   o   n 030 005   3   .   2   .   1  \0
0000360 030   )   o   r   g   .   a   p   a   c   h   e   .   s   p   a
0000370   r   k   .   s   q   l   .   p   a   r   q   u   e   t   .   r
0000380   o   w   .   m   e   t   a   d   a   t   a 030 236 002   {   "
0000390   t   y   p   e   "   :   "   s   t   r   u   c   t   "   ,   "
00003a0   f   i   e   l   d   s   "   :   [   {   "   n   a   m   e   "
00003b0   :   "   y   e   a   r   "   ,   "   t   y   p   e   "   :   "
00003c0   i   n   t   e   g   e   r   "   ,   "   n   u   l   l   a   b
00003d0   l   e   "   :   t   r   u   e   ,   "   m   e   t   a   d   a
00003e0   t   a   "   :   {   }   }   ,   {   "   n   a   m   e   "   :
00003f0   "   l   a   n   g   "   ,   "   t   y   p   e   "   :   "   s
0000400   t   r   i   n   g   "   ,   "   n   u   l   l   a   b   l   e
0000410   "   :   t   r   u   e   ,   "   m   e   t   a   d   a   t   a
0000420   "   :   {   }   }   ,   {   "   n   a   m   e   "   :   "   r
0000430   a   n   k   "   ,   "   t   y   p   e   "   :   "   i   n   t
0000440   e   g   e   r   "   ,   "   n   u   l   l   a   b   l   e   "
0000450   :   t   r   u   e   ,   "   m   e   t   a   d   a   t   a   "
0000460   :   {   }   }   ,   {   "   n   a   m   e   "   :   "   _   c
0000470   h   a   n   g   e   _   t   y   p   e   "   ,   "   t   y   p
0000480   e   "   :   "   s   t   r   i   n   g   "   ,   "   n   u   l
0000490   l   a   b   l   e   "   :   t   r   u   e   ,   "   m   e   t
00004a0   a   d   a   t   a   "   :   {   }   }   ]   }  \0 030   J   p
00004b0   a   r   q   u   e   t   -   m   r       v   e   r   s   i   o
00004c0   n       1   .   1   2   .   2       (   b   u   i   l   d
00004d0   7   7   e   3   0   c   8   0   9   3   3   8   6   e   c   5
00004e0   2   c   3   c   f   a   6   c   3   4   b   7   e   f   3   3
00004f0   2   1   3   2   2   c   9   4   ) 031   L 034  \0  \0 034  \0
0000500  \0 034  \0  \0 034  \0  \0  \0 204 003  \0  \0   P   A   R   1
0000510

The official docs explain the layout more precisely, but roughly: it begins with the 4-byte magic number PAR1 to identify the file type. Next comes each column chunk; a careful reading shows that it stores both the before and after values (“Pascal”, “PHP”) and the action (“update_(pre|post)image”). Every parquet file ends with the file metadata (which includes the schema), the metadata length and the 4-byte magic number.
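That framing is simple enough to check by hand. The sketch below uses a hypothetical helper, `parquetFraming`, which pulls out the leading magic number, the trailing magic number and the little-endian metadata length that sits just before it. The sample bytes are the last 8 bytes of the hexdump above (204 003 \0 \0 P A R 1 in octal) with a PAR1 header prepended; it is an illustration, not a Parquet reader.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets.US_ASCII

// Summary of the framing bytes of a Parquet file.
final case class ParquetFraming(header: String, footer: String, metadataLength: Int)

// Hypothetical helper: reads the 4-byte magic at each end of the file and
// the 4-byte little-endian metadata length preceding the trailing magic.
def parquetFraming(bytes: Array[Byte]): ParquetFraming = {
  require(bytes.length >= 12, "too short to be a Parquet file")
  val n = bytes.length
  val header = new String(bytes, 0, 4, US_ASCII)
  val footer = new String(bytes, n - 4, 4, US_ASCII)
  val metadataLength =
    ByteBuffer.wrap(bytes, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
  ParquetFraming(header, footer, metadataLength)
}

// Tail of the hexdump: octal 204 003 000 000 is 0x84 0x03 0x00 0x00.
val tail = Array[Byte](0x84.toByte, 0x03, 0x00, 0x00) ++ "PAR1".getBytes(US_ASCII)
val sample = "PAR1".getBytes(US_ASCII) ++ tail

println(parquetFraming(sample)) // prints ParquetFraming(PAR1,PAR1,900)
```

So the file metadata in this file is 900 bytes long. To run the helper against the real file, pass it the result of `java.nio.file.Files.readAllBytes` for the path listed above.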

But there’s no need to squint over hexdumps until your eyes bleed. We can read the change data feed instead:

scala> spark.read.format("delta").
     | option("readChangeFeed", "true").
     | option("startingVersion", 3).
     | load("/tmp/spark-shell/langpop").
     | show(false)
+----+------+----+----------------+---------------+-----------------------+
|year|lang  |rank|_change_type    |_commit_version|_commit_timestamp      |
+----+------+----+----------------+---------------+-----------------------+
|2014|Pascal|2   |update_preimage |3              |2022-11-01 22:59:13.273|
|2014|PHP   |2   |update_postimage|3              |2022-11-01 22:59:13.273|
+----+------+----+----------------+---------------+-----------------------+
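Because the feed is an ordinary DataFrame, it can be filtered like any other. Here is a minimal sketch, assuming the same session and table, that keeps only the new state of each changed row (inserts and update postimages) and drops the preimages. The name `newState` is mine, not part of any API.

```scala
import org.apache.spark.sql.functions.col

// Assumes the spark-shell session (`spark`) and the table used above.
val changes = spark.read.format("delta").
  option("readChangeFeed", "true").
  option("startingVersion", 2).
  load("/tmp/spark-shell/langpop")

// Keep the new state of each changed row: inserts and update postimages.
val newState = changes.
  where(col("_change_type").isin("insert", "update_postimage")).
  select("year", "lang", "rank", "_commit_version")

newState.sort("_commit_version", "rank").show()
```

Note that the Pascal row inserted in commit 2 still shows up alongside the PHP postimage from commit 3: the feed is a log of events, so collapsing it to the latest state per row needs a further deduplication step.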

Conclusion

At $work we’re trialing use of Change data feed to get row changes to union with an output partition which is sorted and deduped. By selecting changed partitions in batch, we’re able to process one partition at a time, stabilizing Spark’s resource demands and making them more predictable.

Whilst Change data feed is powerful, it’s listed as experimental and I think the interface could be friendlier.

Notes

  1. Older than 1.2.1.

Tags: spark parquet scala delta-table