Iceberg Tables POC
Introduction
Apache Iceberg (https://iceberg.apache.org/) is an open table format for (big) analytic datasets. It is especially well suited for cloud storage (e.g. S3) and provides a familiar SQL table interface supporting ACID transactions.
In particular it supports update, delete and upsert operations, all while integrating with compute engines such as Presto, Spark, etc. Other features include hidden partitioning and time-travel queries.
While there are other technologies with similar features, Iceberg is well integrated with the AWS ecosystem, in particular with both Glue (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html) and Athena (https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html), which support not only reading but also writing data.
Within the Synapse data warehouse, we make use of append-only tables defined in the Glue catalog and stored in S3 in the Parquet format for efficient querying by Athena and other query engines. This works well for log-like operations (e.g. events such as downloads), and we use the same approach for tracking changes in objects (e.g. snapshots of entities).
The purpose of this POC is to experiment with Iceberg tables and see how they work.
We have a nodesnapshots table in Glue that stores copies of entities taken at various points in time, including additional denormalized information for efficient querying (e.g. whether the entity is public, the project it belongs to, etc.).
For the purpose of this POC, I pushed a dedicated set of events that emulates the evolution of two nodes over 3 separate days. The data in nodesnapshots looks something like the following (some columns are omitted for brevity):
id | snapshot_date | parent_id | created_on | modified_on | name | is_public | change_type |
---|---|---|---|---|---|---|---|
987654321 | 2023-04-08 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-08 18:15:12.289 | nodeOne | false | CREATE |
987654321 | 2023-04-09 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-09 18:15:12.289 | nodeOne | true | UPDATE |
987654321 | 2023-04-10 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-10 18:15:12.289 | nodeOne | true | DELETE |
987654322 | 2023-04-08 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-08 18:15:12.289 | nodeTwo | true | CREATE |
987654322 | 2023-04-09 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-09 18:15:12.289 | nodeTwo | false | UPDATE |
987654322 | 2023-04-10 | 123456789 | 2023-04-08 18:15:12.289 | 2023-04-10 18:15:12.289 | nodeTwo updated | true | UPDATE |
We can see that nodeOne was created on 2023-04-08, updated on 2023-04-09 and deleted on 2023-04-10, while nodeTwo was created on 2023-04-08, updated on 2023-04-09 and updated again on 2023-04-10. The current state of the data should therefore only include nodeTwo.
Creating the initial table
We can use Apache Iceberg tables directly from Athena, starting by defining the table:
CREATE TABLE iceberg_nodes (
id bigint,
parent_id bigint,
name string,
created_on timestamp,
modified_on timestamp,
is_public boolean
) LOCATION 's3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes'
TBLPROPERTIES ( 'table_type' = 'ICEBERG' )
This creates an empty table at the given S3 location. For the purpose of the POC we want to start with the initial state of the DB and then emulate potential incremental updates. We can run a query that inserts into the table the first snapshot of each node:
MERGE INTO iceberg_nodes n USING (
    WITH first_snapshot AS (
        SELECT row_number() OVER (PARTITION BY id ORDER BY snapshot_timestamp) AS snapshot_number, s.*
        FROM nodesnapshots s
    )
    SELECT * FROM first_snapshot WHERE snapshot_number = 1
) s ON (n.id = s.id)
WHEN MATCHED
    THEN UPDATE SET parent_id = s.parent_id, name = s.name, modified_on = s.modified_on, is_public = s.is_public
WHEN NOT MATCHED
    THEN INSERT (id, parent_id, name, created_on, modified_on, is_public) VALUES (s.id, s.parent_id, s.name, s.created_on, s.modified_on, s.is_public)
Note that we use a window function to partition over each node and take only the first row ordered by the timestamp of the snapshot (i.e. the first snapshot). We use that data to MERGE into the Iceberg table; since the table is empty the merge is not strictly necessary, but it shows how an upsert could be done. In this case the only condition that applies is the last one (WHEN NOT MATCHED), since the table is empty. After executing the merge we can query the table:
SELECT * FROM iceberg_nodes
id | parent_id | name | created_on | modified_on | is_public |
---|---|---|---|---|---|
987654321 | 123456789 | nodeOne | 2023-04-08 18:15:12.289000 | 2023-04-08 18:15:12.289000 | false |
987654322 | 123456789 | nodeTwo | 2023-04-08 18:15:12.289000 | 2023-04-08 18:15:12.289000 | true |
Updating the table
Now we can emulate an update to the table by taking a specific timeframe (e.g. just the day following the create) and considering only the last snapshot of each node for that day:
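The exact query run in the POC is not reproduced here; the following is a sketch of what it could look like, reusing the column names from the initial-load query (snapshot_timestamp, snapshot_date and change_type are assumed to exist in nodesnapshots) and adding a delete branch:

```sql
MERGE INTO iceberg_nodes n USING (
    -- Take only the last snapshot of the day for each node (DESC ordering)
    WITH last_snapshot AS (
        SELECT row_number() OVER (PARTITION BY id ORDER BY snapshot_timestamp DESC) AS snapshot_number, s.*
        FROM nodesnapshots s
        WHERE snapshot_date = DATE '2023-04-09'
    )
    SELECT * FROM last_snapshot WHERE snapshot_number = 1
) s ON (n.id = s.id)
-- If the node exists in the target and its latest change is a DELETE, remove it
WHEN MATCHED AND s.change_type = 'DELETE'
    THEN DELETE
WHEN MATCHED
    THEN UPDATE SET parent_id = s.parent_id, name = s.name, modified_on = s.modified_on, is_public = s.is_public
WHEN NOT MATCHED
    THEN INSERT (id, parent_id, name, created_on, modified_on, is_public) VALUES (s.id, s.parent_id, s.name, s.created_on, s.modified_on, s.is_public)
```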
Note that we have now integrated a delete operation: if the id exists in the target and the change type in the source is “DELETE”, we delete the record in the target. On 2023-04-09 both nodes were updated, so we can query the table again:
id | parent_id | name | created_on | modified_on | is_public |
---|---|---|---|---|---|
987654321 | 123456789 | nodeOne | 2023-04-08 18:15:12.289000 | 2023-04-09 18:15:12.289000 | true |
987654322 | 123456789 | nodeTwo | 2023-04-08 18:15:12.289000 | 2023-04-09 18:15:12.289000 | false |
We can see that, as expected, the table now reflects the data as seen in the nodesnapshots table after the first update to both nodes (note the is_public flip and the new modified_on).
Finally, we update the table after “processing” the third day (the same query as before, but with the snapshot_date changed):
If we now query the table we only have one record (on the 10th the first node was deleted and the second node was updated again):
id | parent_id | name | created_on | modified_on | is_public |
---|---|---|---|---|---|
987654322 | 123456789 | nodeTwo updated | 2023-04-08 18:15:12.289000 | 2023-04-10 18:15:12.289000 | true |
Time-Travel Queries
We can even query the metadata of the table itself; for example, we can see the commits (snapshots) on the table:
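The metadata query itself is not shown above; based on the columns in the result, it is presumably a query against the Iceberg snapshots metadata table, which Athena exposes via the `"table$suffix"` naming convention (the exact metadata table name here is an assumption inferred from the result columns):

```sql
-- The database may need to be qualified, e.g. "mydb"."iceberg_nodes$snapshots"
SELECT * FROM "iceberg_nodes$snapshots"
```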
committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
---|---|---|---|---|---|
2023-04-13 19:27:55.068 UTC | 8789226363139948963 | | overwrite | s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-8789226363139948963-1-f08adb60-914a-4900-8d7f-0561c596cf24.avro | {changed-partition-count=1, ... |
2023-04-13 19:33:40.041 UTC | 1576607425771918736 | 8789226363139948963 | overwrite | s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-1576607425771918736-1-6aa44df5-cf83-4663-bc18-522db04c072c.avro | {added-data-files=1, added-position-deletes=2, total-equality-deletes=0, added-records=2, ... |
2023-04-13 19:35:16.900 UTC | 2343194305532193090 | 1576607425771918736 | overwrite | s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-2343194305532193090-1-4d5a401b-340a-4582-9735-4a87430126a3.avro | {added-data-files=1, added-position-deletes=2, total-equality-deletes=0, added-records=1, ... |
And we can perform time-travel queries using the special FOR TIMESTAMP AS OF TIMESTAMP syntax:
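The query that produced the result below is not included here; a reconstruction consistent with the labeled output would be a union of time-travel queries, with the timestamps chosen between the commit times listed in the snapshot metadata above (the specific timestamps and labels are illustrative):

```sql
-- Table state after the initial load (between the first and second commit)
SELECT 'initial', * FROM iceberg_nodes FOR TIMESTAMP AS OF TIMESTAMP '2023-04-13 19:30:00 UTC'
UNION ALL
-- Table state after the day-2 update (between the second and third commit)
SELECT 'update', * FROM iceberg_nodes FOR TIMESTAMP AS OF TIMESTAMP '2023-04-13 19:34:00 UTC'
UNION ALL
-- Current table state
SELECT 'current', * FROM iceberg_nodes
```

The unlabeled first column of the result (_col0) holds the literal tag selected in each branch.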
_col0 | id | parent_id | name | created_on | modified_on | is_public |
---|---|---|---|---|---|---|
initial | 987654322 | 123456789 | nodeTwo | 2023-04-08 18:15:12.289000 | 2023-04-08 18:15:12.289000 | true |
initial | 987654321 | 123456789 | nodeOne | 2023-04-08 18:15:12.289000 | 2023-04-08 18:15:12.289000 | false |
update | 987654322 | 123456789 | nodeTwo | 2023-04-08 18:15:12.289000 | 2023-04-09 18:15:12.289000 | false |
update | 987654321 | 123456789 | nodeOne | 2023-04-08 18:15:12.289000 | 2023-04-09 18:15:12.289000 | true |
current | 987654322 | 123456789 | nodeTwo updated | 2023-04-08 18:15:12.289000 | 2023-04-10 18:15:12.289000 | true |