
Use Delta Lake as the Master Data Management (MDM) Source for Downstream Applications | by Manoj Kukreja | Feb, 2023

Image by Satheesh Sankaran from Pixabay

As per the ACID rules, the isolation property states that “the intermediate state of any transaction should not affect other transactions”. Almost every modern database has been built to follow this rule. Unfortunately, until recently the same rule could not be effectively implemented in the big data world. What was the reason?

Modern distributed processing frameworks such as Hadoop MapReduce and Apache Spark perform computations in batches. After the computations are completed, a set of output files is generated, and each file stores a collection of records. Usually, the number of partitions and reducers influences how many output files will be generated. But there are a few problems:

- A minor record-level change or the addition of new records (CDC) forces you to recompute the entire batch every time, a huge waste of compute cycles that impacts both cost and time.
- Downstream consumers may see inconsistent data while the batch is being recomputed.

Image by author

The Delta Lake framework adds the notion of transaction management to Spark computing. By adding support for ACID transactions, schema enforcement, indexing, versioning and data pruning, Delta Lake aims to improve the reliability, quality, and performance of data.

In simple terms, with Delta Lake the entire batch does not need to be recomputed when a few CDC records are added or changed. Instead, Delta Lake provides functionality to INSERT, UPDATE, DELETE or MERGE data. It works by selecting the files containing data that has changed, reading them into memory, and writing the results to new files.

Image by author
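To make this concrete, here is a minimal sketch of a Delta Lake MERGE in PySpark. The table path, the key column hotel_id, and the incoming DataFrame cdc_df are illustrative assumptions, not code from this article.

from delta.tables import DeltaTable

# Target Delta table and an incoming DataFrame of changed records (cdc_df)
target = DeltaTable.forPath(spark, "s3://my-bucket/delta/hotels")

(target.alias("t")
    .merge(cdc_df.alias("s"), "t.hotel_id = s.hotel_id")
    .whenMatchedUpdateAll()      # update rows whose key already exists
    .whenNotMatchedInsertAll()   # insert brand-new rows
    .execute())

Only the files containing matching rows are rewritten; the rest of the table is left untouched.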

Delta Lake is widely used as the foundation for implementing the modern data lakehouse architecture. Its core capabilities make it extremely suitable for merging data sets from diverse sources and varying schemas into a common data layer, often referred to as the “single source of truth”. This “single source of truth”, also known as the MDM layer, serves all analytical workloads including BI, data science, machine learning, and artificial intelligence.

In this article, we will try to extend our understanding of Delta Lake one step further. If Delta Lake can act as a sink for merging data from varying sources, why can’t we use it as a source for capturing change data (CDC) for downstream consumers? Even better, we can use the medallion architecture to achieve this: CDC data from source systems is merged into the bronze, silver, and gold layers of the data lakehouse.

Even better, we can capture changes and publish them to downstream consumers in a streaming fashion. Let us explore the following use case:

- Hotel prices change several times during the day.
- An e-commerce company is in the business of tracking the latest hotel prices around the world and displaying them on its web portal so that customers can book them based on real-time data.

Image by author

In this example, we will read real-time price information from three API sources using a producer program. The producer will send data as events in JSON format to Amazon Kinesis. We will then read these events in a Databricks notebook using structured streaming. Finally, the CDC events are transmitted to a relational database, which the e-commerce portal uses to display the ever-evolving hotel prices.

Performing Prerequisite Steps for Running the CDF Notebook

The code for the example above is available at:

To run this code, you will need operational AWS and Databricks accounts. Before running the notebook in Databricks, there are a few prerequisite steps that need to be performed on AWS:

- Get access to the AWS access key using the link below. The access key (access key ID and secret access key) will be used as the credentials for the Databricks notebook to access AWS services such as Amazon Kinesis.
- Once connected to the AWS portal, click on the AWS CloudShell menu, then run the commands below to create the prerequisite AWS resources required by this article:

$ git clone <LINK>
$ cd blogs/cdc-source
$ sh pre-req

Image by author

- Start the producer that will read events from the APIs and send them to Amazon Kinesis:

$ nohup python3 hotel-producer.py &

Image by author

Keep the AWS CloudShell session running. From here onward, the producer will send events to Amazon Kinesis every 5 minutes.
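For reference, a producer along the lines of hotel-producer.py could look roughly like the sketch below; the stream name, API endpoint, and payload fields are assumptions for illustration only.

import json
import time

import boto3
import requests

kinesis = boto3.client("kinesis", region_name="us-east-1")

while True:
    # Poll a (hypothetical) pricing API and forward each record as a JSON event
    prices = requests.get("https://example.com/hotel-prices").json()
    for record in prices:
        kinesis.put_record(
            StreamName="hotel-prices-stream",
            Data=json.dumps(record),
            PartitionKey=str(record.get("hotel_id", "default")),
        )
    time.sleep(300)  # emit a new batch of events every 5 minutes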

Delta Lake Change Data Feed in Action!

Now that we have the prerequisite resources created on AWS, we are ready to run the CDC-as-a-source notebook. Code in the Databricks notebook will read events from Amazon Kinesis, merge changes into the bronze layer, then perform cleanup and merge the results into the silver layer. All of this will be accomplished in a streaming fashion; finally, the results (the change data feed) will be synced to an external relational database table. At this point, you need to be logged in to your Databricks account.

Preparing the Delta Lake as a Change Data Feed Source Notebook Environment

Import the delta-as-cdc-source-notebook.ipynb notebook into your Databricks workspace. To run the notebook, you will need to replace three variables (awsAccessKeyId, awsSecretKey and rdsConnectString) with values fetched from the previous section.

Image by author

Creating the Delta Tables in the Bronze Layer

We will start by reading events from Amazon Kinesis. By default, the Kinesis connector for structured streaming is included in the Databricks runtime. You may have noticed previously that we are sending JSON events in the payload of the stream. In the example below, we read events using structured streaming, apply a schema to the JSON, extract values from it, and finally save the results as a Delta table in the bronze layer. We have chosen Amazon S3 as the storage location for all Delta tables.

Image by author
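A simplified sketch of that ingestion step is shown below. The stream name, the JSON schema, the S3 paths, and the use of the credential variables awsAccessKeyId and awsSecretKey mirror the article's setup but are otherwise assumptions.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Assumed shape of the JSON payload sent by the producer
payload_schema = StructType([
    StructField("hotel_id", StringType()),
    StructField("hotel_name", StringType()),
    StructField("city_country", StringType()),
    StructField("price", DoubleType()),
    StructField("timestamp", LongType()),   # Unix epoch seconds
])

bronze_stream = (
    spark.readStream.format("kinesis")                   # Databricks Kinesis connector
    .option("streamName", "hotel-prices-stream")
    .option("region", "us-east-1")
    .option("initialPosition", "TRIM_HORIZON")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .load()
    .select(from_json(col("data").cast("string"), payload_schema).alias("event"))
    .select("event.*")
)

(bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/hotels_bronze")
    .outputMode("append")
    .start("s3://my-bucket/delta/hotels_bronze"))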

Notice that data in the bronze layer is the raw representation of the event data; therefore, we adhere to a schema that matches the stream in its original form.

Curating Data in the Change Data Stream

From here onward, the bronze layer table will keep adding new partitions based on data read from the Kinesis stream. It is a good practice to choose the timestamp as the partition column in the bronze layer. This makes it easy to identify the chronology of events as they are read from the source and plays an important role if we need to replay events in the future.

In the next step, we perform a few transformations to curate the data, such as changing Unix epoch time to a date, changing data types, and splitting a field.

Image by author
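A sketch of that curation step, continuing from the bronze stream above (the column names are the same assumptions as before):

from pyspark.sql.functions import col, from_unixtime, split, to_date

curated_stream = (
    bronze_stream
    .withColumn("event_time", from_unixtime(col("timestamp")).cast("timestamp"))  # epoch seconds -> timestamp
    .withColumn("event_date", to_date(col("event_time")))                         # date column for partitioning
    .withColumn("price", col("price").cast("decimal(10,2)"))                      # tighten the data type
    .withColumn("city", split(col("city_country"), ",").getItem(0))               # split the combined field
    .withColumn("country", split(col("city_country"), ",").getItem(1))
    .drop("timestamp", "city_country")
)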

We are ready to merge data into the silver layer now. But before that, we need to understand how CDC works in structured streaming. More importantly, how does the change log stream from the Delta layer get published to downstream consumers?

Understanding the Flow of Delta as a Change Feed

The example below helps us understand the flow of change feed data with Delta Lake as the source. In structured streaming, data is processed in micro-batches. The implementation involves writing the change data feed concurrently to multiple tables, also known as idempotent writes, as follows:

- The silver layer table (hotels_silver), where records from each micro-batch are either inserted as new records or merged into existing ones. Every change creates a new version of the Delta table.
- A change log table (change_log) that stores the key and batchId. Data in this table can be viewed as an immutable log of changes over time.

In the example below, the bronze layer stream shows two records (highlighted in the image below) for the Marriott hotel in New York. Notice the variation in price between the two records over time. Chronologically, when the first record was read from Kinesis at timestamp=2022-02-16T21:06:57, it was assigned to batchId=2. Now, if we join the key from the change_log to the corresponding record in the hotels_silver table, we can reconstruct the full row and send it as a CDC record to downstream consumers. In the example below, the same record was sent twice to the downstream consumer at different time intervals.

Image by author

The second time, when a change record arrived at timestamp=2022-02-16T21:07:41, it was assigned to batchId=3 and sent downstream. Downstream consumers can receive the CDC records and keep their state up to date with the ongoing changes.

Implementing Delta as a Change Data Feed

With an understanding of the flow of data, let us dive into the actual implementation. The function below runs at the micro-batch level. For each micro-batch, it performs idempotent writes to the silver layer as well as to the change log table.

This function is invoked using the foreachBatch() operation, which allows arbitrary operations and custom write logic on the output of a streaming query. In the code below, we perform an idempotent write of the curated data stream to two tables concurrently.

Image by author
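A minimal sketch of what such a micro-batch function and its foreachBatch() invocation might look like; the table names hotels_silver and change_log follow the article's description, while the key column hotel_id and the checkpoint path are assumptions.

from delta.tables import DeltaTable

def upsert_to_silver(micro_batch_df, batch_id):
    # 1) Merge the micro-batch into the silver table on the business key
    silver = DeltaTable.forName(spark, "hotels_silver")
    (silver.alias("t")
        .merge(micro_batch_df.alias("s"), "t.hotel_id = s.hotel_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
    # 2) Append the key and batch id to the immutable change log table
    (micro_batch_df
        .selectExpr("hotel_id", f"CAST({batch_id} AS BIGINT) AS batchId")
        .write.format("delta").mode("append").saveAsTable("change_log"))

(curated_stream.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "s3://my-bucket/checkpoints/hotels_silver")
    .start())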

While the idempotent writes are going on, for every new micro-batch the change data is joined to the silver table to reconstruct the CDC record.

Image by author
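A rough sketch of that join, under the same assumed table and column names:

def reconstruct_cdc(batch_id):
    # Keys captured for this micro-batch, joined back to the current silver state
    changed_keys = (spark.table("change_log")
                    .where(f"batchId = {batch_id}")
                    .select("hotel_id"))
    return changed_keys.join(spark.table("hotels_silver"), "hotel_id")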

The reconstructed CDC records can then be synced downstream. In the example below we are sending the CDC records to a relational data store.

Image by author
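A sketch of that downstream sync over JDBC; rdsConnectString is the variable mentioned earlier, the table name hotelcdcprices matches the next section, and the credential variables are hypothetical.

def sync_downstream(cdc_df):
    (cdc_df.write
        .format("jdbc")
        .option("url", rdsConnectString)     # e.g. jdbc:mysql://<host>:3306/<database>
        .option("dbtable", "hotelcdcprices")
        .option("user", dbUser)              # hypothetical credential variables
        .option("password", dbPassword)
        .mode("append")
        .save())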

The relational data store receives the immutable CDC record stream and applies deduplication logic so that its applications see the latest equivalent of each record. Let’s check how that happens in the section below.

Checking Hotel Prices in Downstream Consumer

Now that we have the CDC stream pushed to a downstream consumer (a relational MySQL database in our case), let’s query a few records to see how the records are evolving. The CDC record stream from the Databricks notebook is being continuously pushed to the hotelcdcprices table. But this table holds all records including changes over time. Therefore, a view is created over the CDC table that ranks the change rows based on the timestamp.

Image by author

This view shows the equivalent of the latest price for any hotel at any given time and can be used by the web application to display the latest prices on the portal.
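The actual view lives on the MySQL side; a PySpark equivalent of its ranking logic, reading the CDC table back over JDBC, might look like this (again a sketch under the same assumptions):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

cdc_table = (spark.read.format("jdbc")
             .option("url", rdsConnectString)
             .option("dbtable", "hotelcdcprices")
             .option("user", dbUser)
             .option("password", dbPassword)
             .load())

# Keep only the most recent change row per hotel
w = Window.partitionBy("hotel_id").orderBy(col("event_time").desc())
latest_prices = (cdc_table
                 .withColumn("rn", row_number().over(w))
                 .where("rn = 1")
                 .drop("rn"))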

Image by author

What are the typical use cases for Change Data Feed?

Here are some common use cases that can benefit from using Delta tables as a sink for merging CDC data from varying sources and as a source for sending it downstream to consumers:

Read Change Data Feed and Merge to Silver Layer in a Streaming Fashion

Capture CDC from streaming data sources and merge micro-batches into the silver layer in a continuous fashion.

Perform Aggregations in Gold Layer without Recomputing the Entire Batch

Using only the change data from the silver layer, aggregate the corresponding rows in the gold layer without recomputing the entire batch.

Transparently Transmit Changes in Delta Tables to Downstream Consumers

Easily transmit changes in Delta tables downstream to consumers such as relational databases and applications.

To conclude, using the change data feed feature of Delta tables, you can not only make the process of CDC data collection and merging easier but also extend its usage to transmit change data downstream to relational databases, NoSQL databases, and other applications. These downstream applications can effectively use this CDC data for any purpose deemed necessary.

I hope this article was helpful. Delta Lake and Change Data Feed are covered as part of the AWS Big Data Analytics course offered by Datafence Cloud Academy. I teach the course online on weekends.

