
Building Iceberg Tables from Snapshots in Data Warehouse

Introduction

 

Currently, when a user makes a change to an Entity in Synapse, an automatic process creates a “snapshot” of the updated entity. We have extended the object snapshot process to include other object types, such as Team, File, and ACL. We also have plans to snapshot governance objects such as AccessRequirement and AccessSubmissions.

In addition to taking object snapshots when a user changes an object, we also periodically snapshot all objects.

One major use case for the object snapshot data is to recreate what the system looked like at a specific point in time. While the most common use case might be to recreate the system in its current state, we often need to recreate it as it was at some point in the past. For example, when investigating a possible data breach we might need to answer the following questions:

  1. What ACL was applied to the File at the time of the breach?

  2. Who changed the ACL?

  3. Was the file accidentally moved to a public folder?

In order to answer these questions, we would need to recreate the node, ACL, and team data as they were at the time of the breach. It is challenging to do this with the raw snapshot data. In fact, we often restore the entire database from a database snapshot to help with such an investigation. It would be better if we could simply query node, ACL, and team tables as of the time of the breach from within the data warehouse.

 

Apache Iceberg is a technology that supports building extremely large tables. An Iceberg table automatically maintains the full history of all table changes as snapshots. This means it is possible to query both the current state of an Iceberg table and any previous snapshot. A query against a previous snapshot is referred to as time traveling.

The primary purpose of this proof-of-concept was to test whether we could build an example Iceberg table from one of our snapshots. Secondarily, could we run a time traveling query against the resulting table?

 

Building an Iceberg Table

 

While there are many technologies that support Iceberg tables, we will focus on using AWS Athena for this POC. The main benefit of Athena is that it is serverless, which means we do not need to set up and maintain a cluster to query or build our tables. Note: the entire Iceberg table exists as data in S3, so any technology capable of reading an Iceberg table from S3 should be able to utilize what we build with Athena.

 

For this POC, we chose to build an Iceberg table using snapshot data from datawarehouse.nodesnapshots. We created our Iceberg table using the following DDL:

DROP TABLE `node_iceberg`;

CREATE TABLE jhill.node_iceberg (
    id bigint,
    version_number bigint,
    change_type string,
    change_timestamp timestamp,
    change_user_id bigint,
    snapshot_timestamp timestamp,
    benefactor_id bigint,
    project_id bigint,
    parent_id bigint,
    node_type string,
    created_on timestamp,
    created_by bigint,
    modified_on timestamp,
    modified_by bigint,
    file_handle_id bigint,
    name string)
PARTITIONED BY (year(snapshot_timestamp))
LOCATION 's3://dev.datawarehouse.sagebase.org/jhill'
TBLPROPERTIES (
    'table_type'='iceberg',
    'format'='parquet'
);

At this point we have an empty Iceberg table with its contents stored in S3 at the location shown on line 19 of the CREATE TABLE statement.

Note: the table_type is 'iceberg' and the format is 'parquet'. We selected a partition of year for this example. It is important to note that partitioning is not as critical for Iceberg tables as it is for other table types such as Hive. In fact, users can query an Iceberg table without using partition filters, or even knowing how the table is partitioned. This is one of the major benefits of Iceberg.
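For example, a query like the following (the id value is hypothetical) includes no partition predicate at all; Iceberg prunes data files using its own metadata rather than relying on the user to filter by partition:

-- Look up a node with no partition filter on snapshot_timestamp.
SELECT id, version_number, name, parent_id
FROM jhill.node_iceberg
WHERE id = 123456;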

 

Now that we have our empty Iceberg table, we are ready to populate it with the node snapshot data. The following statement is Iceberg-specific DML designed to merge new data into an existing table:

MERGE INTO jhill.node_iceberg t
USING (
    WITH M AS (
        SELECT id, version_number, max(snapshot_timestamp) as s_max
        from datawarehouse.nodesnapshots
        where snapshot_date
            between date('2023-05-01') and date('2023-05-31')
        group by id, version_number
    )
    SELECT s.id, s.version_number, s.change_type, s.change_timestamp,
        s.change_user_id, s.snapshot_timestamp, s.benefactor_id,
        s.project_id, s.parent_id, s.node_type, s.created_on,
        s.created_by, s.modified_on, s.modified_by,
        s.file_handle_id, s.name
    FROM datawarehouse.nodesnapshots s
    JOIN m on (s.id=m.id and s.version_number=m.version_number and m.s_max = s.snapshot_timestamp)
) s ON t.id = s.id and t.version_number = s.version_number
WHEN NOT MATCHED AND s.change_type <> 'DELETE' THEN
    INSERT VALUES (s.id, s.version_number, s.change_type, s.change_timestamp,
        s.change_user_id, s.snapshot_timestamp, s.benefactor_id, s.project_id,
        s.parent_id, s.node_type, s.created_on, s.created_by, s.modified_on,
        s.modified_by, s.file_handle_id, 'not matched') -- literal marker, apparently written in place of s.name for this POC
WHEN MATCHED AND s.change_timestamp > t.change_timestamp and s.change_type = 'UPDATE' THEN
    UPDATE SET
        change_timestamp = s.change_timestamp,
        change_type = s.change_type,
        change_user_id = s.change_user_id,
        benefactor_id = s.benefactor_id,
        project_id = s.project_id,
        parent_id = s.parent_id,
        modified_by = s.modified_by,
        modified_on = s.modified_on,
        file_handle_id = s.file_handle_id,
        name = 'matched' -- literal marker, apparently written in place of s.name for this POC
WHEN MATCHED AND s.change_timestamp > t.change_timestamp and s.change_type = 'DELETE' THEN
    DELETE;

There is a lot going on in this statement, so we will go through it in parts. The first line states that we want to merge data into our new Iceberg table. Specifically, we merge the results of a query against the datawarehouse.nodesnapshots table (lines 3-16). The query first identifies the latest snapshot of each node, where a node is identified by both its id and version. This is the first step in de-duplicating the snapshot data. Notice that line 7 limits the results to snapshots that occurred in the month of May only. This means that alias s contains the latest snapshot of each node (identified by id and version) for the month of May.

Line 17 defines how each row from s is to be matched with rows that are already in the Iceberg table t. We are effectively treating the id and version pair as the primary key of our Iceberg table.

The three WHEN conditions at lines 18, 23, and 35 define how data is to be merged into the Iceberg table based on the match condition:

  1. The first WHEN, at line 18, covers the case where the row from s does not exist in t. In this case the data is inserted into the table (unless it is a delete).

  2. The second WHEN, at line 23, covers the case where the row from s exists in t, with the additional constraint that s.change_timestamp > t.change_timestamp. This means that if the row exists and represents an actual change (s.change_timestamp > t.change_timestamp), then the existing row is updated.

  3. The third WHEN, at line 35, covers the case where the row from s exists in t but s.change_type = 'DELETE'. If the change is also newer than the existing data, then the row is deleted.

 

Note: the first time we run this statement, the Iceberg table is empty, so only inserts will occur. The other two conditions come into play when we add more months of data to the Iceberg table later.

After running the above statement for the month of May, Iceberg inserts the full month of data as a single transaction, which results in a new Iceberg snapshot of our node_iceberg table. The snapshot ID is the key to executing time traveling queries, so we need to capture it. We can get the latest snapshot ID of our table by running the following metadata query:
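A sketch of such a query, using Athena's $snapshots metadata table, which exposes the snapshot history of an Iceberg table:

-- Return the most recent Iceberg snapshot of the table.
SELECT snapshot_id, committed_at
FROM "jhill"."node_iceberg$snapshots"
ORDER BY committed_at DESC
LIMIT 1;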

The above query returned a snapshot ID of 4872619882033119834. The Apache version of Iceberg supports creating a named tag for a snapshot. For example, we could tag 4872619882033119834 as “2023-05”, since that snapshot contains all data from May. Unfortunately, tagging is not currently supported in Athena.
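For reference, a sketch of what that tag would look like in an engine that supports Iceberg's tagging DDL, such as Spark SQL with the Iceberg extensions (the my_catalog catalog name is hypothetical):

-- Create a named tag pointing at the May snapshot (Iceberg Spark SQL extension syntax).
ALTER TABLE my_catalog.jhill.node_iceberg CREATE TAG `2023-05`
AS OF VERSION 4872619882033119834;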

However, Athena does support creating a view based on a time traveling query:
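A sketch of such a view definition, using Athena's FOR VERSION AS OF time travel clause with the snapshot ID captured above:

CREATE OR REPLACE VIEW node_iceberg_2023_05 AS
SELECT * FROM node_iceberg FOR VERSION AS OF 4872619882033119834;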

The query at line 2 is a time traveling query that reads the table as of a specific snapshot. By using this query to define our view, we have effectively named snapshot 4872619882033119834 “2023_05”. Therefore, any query against node_iceberg_2023_05 will only show results up to May 2023. This becomes more useful as we add more data.

To add data for June, July, August, and September, we executed a new MERGE INTO statement using each month’s date range. After each MERGE INTO, the latest snapshot ID was captured in a new view. The end result was five views:

node_iceberg_2023_05
node_iceberg_2023_06
node_iceberg_2023_07
node_iceberg_2023_08
node_iceberg_2023_09

To query the latest data from the Iceberg table, one can simply query node_iceberg. To time travel, one only needs to query the appropriate view.
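For example, a sketch of the two styles of query against the objects created above:

-- Latest state of the table.
SELECT count(*) FROM jhill.node_iceberg;

-- State of the table as captured at the end of May 2023.
SELECT count(*) FROM node_iceberg_2023_05;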