Document toolboxDocument toolbox

Iceberg Tables POC

Introduction

Apache Iceberg (https://iceberg.apache.org/ ) is an open format for (big) analytic datasets. It is especially optimized for cloud storage (e.g. S3) and provides a familiar SQL table interface supporting ACID transactions.

In particular it supports update, delete and upsert operations, all while integrating with compute engines such as presto, spark etc. Other features include hidden partitioning and time-travel queries.

While there are other technologies that have similar features, iceberg is better integrated with the AWS ecosystem and in particular both in Glue (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html ) and Athena (https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html ) supporting not only reading but also writing data.

Within the Synapse data warehouse, we make use of append only tables defined in the glue catalog and stored in S3 using the parquet format for efficient querying by Athena and other query engines. This works well with log like operations (e.g. events such as downloads) and we use the same approach for tracking changes in objects (e.g. snapshots of entities).

The purpose of this POC is to experiment with iceberg tables and see how it works.

We have a nodesnapshots table in glue that stores a copy of entities taken at various points in time and includes additional de-normalized information for efficient querying (e.g. is the entity public, the project it belongs to etc).

For the purpose of this POC, I pushed a dedicated set of events that emulates the evolution of changes of two nodes within 3 separate days, the data in the nodesnapshots looks something like the following (I omitted some of the columns for brevity):

id

snapshot_date

parent_id

created_on

modified_on

name

is_public

change_type

id

snapshot_date

parent_id

created_on

modified_on

name

is_public

change_type

987654321

2023-04-08

123456789

2023-04-08 18:15:12.289

2023-04-08 18:15:12.289

nodeOne

false

CREATE

987654321

2023-04-09

123456789

2023-04-08 18:15:12.289

2023-04-09 18:15:12.289

nodeOne

true

UPDATE

987654321

2023-04-10

123456789

2023-04-08 18:15:12.289

2023-04-10 18:15:12.289

nodeOne

true

DELETE

987654322

2023-04-08

123456789

2023-04-08 18:15:12.289

2023-04-08 18:15:12.289

nodeTwo

true

CREATE

987654322

2023-04-09

123456789

2023-04-08 18:15:12.289

2023-04-09 18:15:12.289

nodeTwo

false

UPDATE

987654322

2023-04-10

123456789

2023-04-08 18:15:12.289

2023-04-10 18:15:12.289

nodeTwo updated

true

UPDATE

We can see that nodeOne was created on 2023-04-08, updated on 2023-04-09 and deleted on 2023-04-10. NodeTwo was created on 2023-04-08, updated on 2023-04-09 and updated again on 2023-04-10. Basically the current state of the data should only include nodeTwo.

Creating the initial table

We can use apache iceberg tables directly from Athena starting from defining the table:

CREATE TABLE iceberg_nodes ( id bigint, parent_id bigint, name string, created_on timestamp, modified_on timestamp, is_public boolean ) LOCATION 's3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes' TBLPROPERTIES ( 'table_type' = 'ICEBERG' )

This creates an empty table at the given S3 location. Now for the purpose of the POC we want to start with the initial state of the DB and then emulate potential incremental updates. We can run a query to insert into the table the first snapshot for all the nodes:

MERGE INTO iceberg_nodes n USING ( WITH first_snapshot AS ( select row_number() OVER (PARTITION BY id ORDER BY snapshot_timestamp) AS snapshot_number, s.* from nodesnapshots s ) SELECT * FROM first_snapshot WHERE snapshot_number = 1 ) s ON (n.id = n.id) WHEN MATCHED THEN UPDATE SET parent_id = s.parent_id, name = s.name, modified_on = s.modified_on, is_public = s.is_public WHEN NOT MATCHED THEN INSERT (id, parent_id, name, created_on, modified_on, is_public) VALUES(s.id, s.parent_id, s.name, s.created_on, s.modified_on, s.is_public)

Note that we use a window function to partition over each node and take only the first row according to the timestamp of the snapshot (e.g. the first snapshot). We use that data to “MERGE” into the iceberg table, since the table is empty the merge is not necessary but we show how an upsert could be done. In this case the only case that “matches” is the last condition (WHEN NOT MATCHED) since the table is empty. After executing the update we can query the table:

SELECT * FROM iceberg_nodes

id

parent_id

name

created_on

modified_on

is_public

id

parent_id

name

created_on

modified_on

is_public

987654321

123456789

nodeOne

2023-04-08 18:15:12.289000

2023-04-08 18:15:12.289000

false

987654322

123456789

nodeTwo

2023-04-08 18:15:12.289000

2023-04-08 18:15:12.289000

true

Updating the table

Now we can emulate an update to the table data taking a specific timeframe (e.g. just the day following the create) and considering only the last snapshot for the day:

Note that we now integrated a delete operation, if the id exists and the change type in the source was a “DELETE” we delete the record in the target. On day 2023-04-09 both nodes were updated, so we can query the table again:

id

parent_id

name

created_on

modified_on

is_public

id

parent_id

name

created_on

modified_on

is_public

987654321

123456789

nodeOne

2023-04-08 18:15:12.289000

2023-04-09 18:15:12.289000

true

987654322

123456789

nodeTwo

2023-04-08 18:15:12.289000

2023-04-09 18:15:12.289000

false

We can see that it (as expected) now reflects the data as seen in the node snapshot table after the first update to both nodes (note the is_public flip and modified_on).

Finally, we update the table after “processing” the 3rd day (same query as before but with the snapshot_date changed):

If we now query the table we only have one record (on the 10th the first node was deleted and the second node was updated again):

id

parent_id

name

created_on

modified_on

is_public

id

parent_id

name

created_on

modified_on

is_public

987654322

123456789

nodeTwo updated

2023-04-08 18:15:12.289000

2023-04-10 18:15:12.289000

true

Time-Travel Queries

Now we can even query the metadata of the table itself, for example we can see the transactions on the table:

committed_at

snapshot_id

parent_id

operation

manifest_list

summary

committed_at

snapshot_id

parent_id

operation

manifest_list

summary

2023-04-13 19:27:55.068 UTC

8789226363139948963

overwrite

s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-8789226363139948963-1-f08adb60-914a-4900-8d7f-0561c596cf24.avro

{changed-partition-count=1, ...

2023-04-13 19:33:40.041 UTC

1576607425771918736

8789226363139948963

overwrite

s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-1576607425771918736-1-6aa44df5-cf83-4663-bc18-522db04c072c.avro

{added-data-files=1, added-position-deletes=2, total-equality-deletes=0, added-records=2, ...

2023-04-13 19:35:16.900 UTC

2343194305532193090

1576607425771918736

overwrite

s3://dev.datawarehouse.sagebase.org/datawarehouse/iceberg/nodes/metadata/snap-2343194305532193090-1-4d5a401b-340a-4582-9735-4a87430126a3.avro

{added-data-files=1, added-position-deletes=2, total-equality-deletes=0, added-records=1, ...

And we can perform time-travel queries using the special syntax FOR TIMESTAMP AS OF TIMESTAMP:

_col0

id

parent_id

name

created_on

modified_on

is_public

_col0

id

parent_id

name

created_on

modified_on

is_public

initial

987654322

123456789

nodeTwo

2023-04-08 18:15:12.289000

2023-04-08 18:15:12.289000

true

initial

987654321

123456789

nodeOne

2023-04-08 18:15:12.289000

2023-04-08 18:15:12.289000

false

update

987654322

123456789

nodeTwo

2023-04-08 18:15:12.289000

2023-04-09 18:15:12.289000

false

update

987654321

123456789

nodeOne

2023-04-08 18:15:12.289000

2023-04-09 18:15:12.289000

true

current

987654322

123456789

nodeTwo updated

2023-04-08 18:15:12.289000

2023-04-10 18:15:12.289000

true