AWS Data Lake Solution based on Apache Hudi without requiring Database CDC

Fred Gu
13 min read · Nov 1, 2021

--

Target audience: Solution Architects and Senior AWS Data Engineers

This post describes an incremental load solution based on Apache Hudi (see [0] Apache Hudi Concepts), a storage management layer over Hadoop-compatible storage. The solution does not require Change Data Capture (CDC) on the source database side, which is a big relief in some scenarios.

The solution was originally proposed and prototyped by my friends from AWS; please see the Appreciation section at the end.

This post also covers what I like and dislike about the solution and the related tools, as well as the alternative frameworks.

Traditional Data Lake Solutions

Traditionally, an AWS data lake solution could be designed and implemented with DMS + Glue, as long as the source database has CDC turned on.

Traditional AWS Data Lake

I have another post specifically about this traditional data lake solution: https://fred-gu.medium.com/traditional-aws-data-lake-solution-3abe4b23d08b/

Some Data Scenarios & Challenges

Usually the DMS + Glue solution fits most database scenarios. However, it does NOT fit some.

Scenario 1: Aurora Serverless does not support CDC

If you are using MySQL as the source database, it is straightforward to turn on the binary log by following the steps in [4] Using an AWS-managed MySQL-compatible database as a source for AWS DMS.

But this does not work with Aurora Serverless MySQL. See [5] Limitations on using a MySQL database as a source for AWS DMS and the following quote.

You can use Aurora MySQL-Compatible Edition Serverless for full load, but you can’t use it for CDC. This is because you can’t enable the prerequisites for MySQL. For more information, see Parameter groups and Aurora Serverless v1.

Hence, it is technically infeasible to use DMS to perform incremental load from Aurora Serverless v1.

Scenario 2: Data in some database tables is extremely volatile

Sometimes the data in some database tables is extremely volatile. For example, a price table captures real-time prices, and a sync application updates the values of the price column every 100 milliseconds.

Since CDC is set up at the database server level, we cannot stop the database from writing unnecessary binary logs for this ‘crazy’ price updating. So it is not a good idea to turn on CDC in this scenario either.

Scenario 3: Source Database is out of Data team’s control

It is also common for the source database to be controlled by either the engineering team or a third-party software provider. In that case, it can take ages just to reach a conclusion on whether it is safe to turn on CDC. An alternative solution without CDC should be discussed.

Can we do CDC directly in AWS Glue?

Since DMS can no longer capture CDC from the databases under those scenarios, can we do CDC directly in a Glue job?

  • The answer would be ‘it is difficult’.

Why?

  • When new data arrives, the Glue job is supposed to insert new entries into a Data Catalog table, update old entries, or sometimes delete them.
  • A Data Catalog table does not have a primary key, so it is difficult to determine whether the incoming data is new or not, and whether it should be an insert, an update or a delete.
  • It is still doable by comparing an ‘expected’ key column between the incoming data and the Data Catalog table data, but it is ugly and difficult to implement and maintain.
  • This is the reason why it is difficult to do CDC directly in Glue jobs.
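To make the point concrete, here is a minimal sketch of such a hand-rolled key comparison in plain Python. The function name classify_changes and the sample rows are hypothetical and only serve the illustration; a real Glue job would need the equivalent logic as Spark joins over the whole table.

```python
def classify_changes(existing_rows, incoming_rows, key="id"):
    """Split incoming rows into inserts and updates by comparing key values."""
    existing_by_key = {row[key]: row for row in existing_rows}
    inserts, updates = [], []
    for row in incoming_rows:
        current = existing_by_key.get(row[key])
        if current is None:
            # Key not seen before: treat the row as an insert
            inserts.append(row)
        elif current != row:
            # Key exists but the content differs: treat the row as an update
            updates.append(row)
    return inserts, updates


existing = [{"id": 1, "value": 10.0}, {"id": 2, "value": 20.0}]
incoming = [{"id": 2, "value": 25.0}, {"id": 3, "value": 30.0}]
inserts, updates = classify_changes(existing, incoming)
```

Even this toy version has to load the existing data to compare against, and it cannot detect deletes from a partial extract at all, which hints at why maintaining it by hand is unattractive.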

Neither Data Catalog nor Spark supports Primary Key

Glue Data Catalog, which is essentially a Hive Metastore, is not designed to work as a relational database, but as an index to the location, schema, and runtime metrics of your S3 data. It only supports partition keys, and does not support primary keys or foreign keys. See [6] Defining Tables in the AWS Glue Data Catalog and [7] SQL features missing in Hive.

Similarly, a Spark DataFrame cannot have a primary key either, which makes sense since Spark is a computation engine rather than a database.

So we need a third-party framework that enables both primary keys and insert/update/delete operations.

Yes, it is Apache Hudi

Apache Hudi [8] https://hudi.apache.org/ is an open-source storage management layer on top of Hadoop-compatible storage such as HDFS and Amazon S3.

Apache Hudi Logo

Hudi provides primary keys, full bulk load, upsert (insert + update) load and deletion. Hudi can be integrated into AWS Glue, and it can create, rewrite and append to Data Catalog tables by itself.

Another Data Lake Solution Based on Apache Hudi

First of all, here is the new data lake solution based on Apache Hudi, shown in the diagram below.

AWS Data Lake Solution based on Apache Hudi

This new solution could be described with the following steps:

Step 1, run a DMS replication task to download the full data from the source database. The downloaded data is written to a folder dedicated to DMS.

If the source database has unsigned integers, remember to set the type conversion in the DMS task configuration. Please see my previous post for reference: [9] Glue spark does not like Unsigned Int from MySQL.

Step 2, run a Glue job that sends a SQL query to download the new data from the source database. The downloaded data is written to a folder dedicated to Glue.

2.1 Partially Download from a JDBC connection

There are not many options for sending a SQL query from inside Glue. The easiest one is to take advantage of the ‘dbtable’ parameter of spark.read. Please see the following code as an example.

from datetime import date, timedelta

yesterday_modify_ts = date.today() - timedelta(days=1)
# Wrap the SQL query in a sub-select and pass it as 'dbtable'
yesterday_df = (
    spark.read.format("jdbc")
    .option("url", mysql_url)
    .option("user", mysql_username)
    .option("password", mysql_password)
    .option("dbtable", f"( select id, value, create_ts, modify_ts from table_name where modify_ts > '{yesterday_modify_ts}' ) as temp")
    .load()
)

The example code is trying to load all data modified since yesterday.
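The sub-select string passed to ‘dbtable’ can also be built by a small helper, so the same pattern is reusable across tables. This is only a sketch: build_incremental_dbtable is a hypothetical name, and it assumes the table, column names and cut-off date come from trusted job parameters, because plain string interpolation offers no protection against SQL injection.

```python
from datetime import date, timedelta


def build_incremental_dbtable(table, columns, ts_column, since):
    """Build the '( select ... ) as temp' string for the JDBC 'dbtable' option."""
    column_list = ", ".join(columns)
    return (
        f"( select {column_list} from {table} "
        f"where {ts_column} > '{since}' ) as temp"
    )


# Fixed date instead of date.today() so the result is reproducible
since = date(2021, 11, 1) - timedelta(days=1)
dbtable = build_incremental_dbtable(
    "table_name", ["id", "value", "create_ts", "modify_ts"], "modify_ts", since
)
```

The resulting string can then be passed to .option("dbtable", dbtable) exactly as in the example above.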

It is common to see a Glue job hit errors when accessing a source database in a private subnet. A few things need to be done to enable the access:

  1. Enable the network connection between Glue and the source database. Please refer to [10] Connect to and run ETL jobs across multiple VPCs using a dedicated AWS Glue VPC for detailed actions.
  2. Set up a JDBC connection to the source database in the Glue console. Make sure that the VPC, subnet and security group are given correctly in the JDBC connection.
  3. Assign this JDBC connection to the Glue job. Otherwise, the Glue job runs in a managed network without access to any private subnet.

2.2 Save data to Raw Bucket with Partitions

Eventually, the downloaded data is written to a ‘Glue’ folder inside the raw bucket, with partitions on ‘modify_year’, ‘modify_month’ and ‘modify_day’.

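from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, dayofmonth, month, year

# Derive the partition columns from the modification timestamp
# (dayofmonth is used because day() is not available in older Spark versions)
yesterday_df = (
    yesterday_df
    .withColumn('modify_year', year(col('modify_ts')))
    .withColumn('modify_month', month(col('modify_ts')))
    .withColumn('modify_day', dayofmonth(col('modify_ts')))
)
connection_s3_options = {
    "path": f"s3://{raw_bucket}/Glue/{schema_name}/{table_name}",
    "partitionKeys": ['modify_year', 'modify_month', 'modify_day']
}
# Write the data as partitioned Parquet files to the 'Glue' folder
glueContext.write_dynamic_frame.from_options(
    frame = DynamicFrame.fromDF(yesterday_df, glueContext, "yesterday_df"),
    connection_type = "s3",
    connection_options = connection_s3_options,
    format = "parquet"
)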

DMS data and Glue data are put in different folders because DMS tasks and SQL queries in Glue jobs usually produce different data types, and create_dynamic_frame() requires identical data types within a single execution; otherwise it fails.

It is important to have partitions defined. Partitions make the data loading in Step 3 much more efficient.
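As an illustration of what the partitions look like on S3, the sketch below derives the folder prefix for a given modification timestamp. It assumes the Hive-style key=value folder naming that partitioned writes normally produce; partition_prefix is a hypothetical helper, not part of Glue.

```python
from datetime import datetime


def partition_prefix(modify_ts):
    """Return the Hive-style partition prefix for a modification timestamp."""
    return (
        f"modify_year={modify_ts.year}/"
        f"modify_month={modify_ts.month}/"
        f"modify_day={modify_ts.day}"
    )


prefix = partition_prefix(datetime(2021, 10, 31, 23, 59))
```

Because each day lands under its own prefix, a later job only has to list and read the folders of the days it has not processed yet.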

Step 3, run a Glue job to load files from the DMS folder and the Glue folder sequentially into DynamicFrames with bookmark enabled, and then ‘overwrite’ or ‘append’ the dataframes to the Hudi table.

3.1 Enable Hudi Driver

It actually takes dozens of steps to enable the Hudi driver in an AWS Glue job. This post does not intend to repeat what has been covered in other posts. There are two methods to enable the Hudi driver:

Method 1, use Hudi JAR files in Glue job.

Please refer to [11] Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift for details.

The JAR files mentioned in the blog can be found in a public S3 bucket [12]. Feel free to download them directly; they have been tested to work.

Jar Files in Glue Job

Method 2, use Glue Custom Connector for Hudi

The second method is to use Glue custom connector to build a Hudi environment. Please refer to [13] Writing to Apache Hudi tables using AWS Glue Custom Connector for detailed steps.

Both methods should work. I personally prefer the first method, because the second one feels like it is still ‘on trial’. I might consider developing a connector myself.

3.2 Load new data from DMS and Glue folders, Sequentially

It is easy to load new data from the raw bucket using create_dynamic_frame(), and the bookmark helps skip the processed files and load only the new files. Please find the sample code below.

schema_name = 'schema_name'
table_name = 'table_name'
for tool in ['DMS', 'Glue']:
    table_raw_path = f"s3://{raw_bucket}/{tool}/{schema_name}/{table_name}"
    # transformation_ctx is the key of the bookmark state,
    # so each source folder gets its own key
    bookmark = tool + '_' + schema_name + '_' + table_name
    new_data_dyf = glueContext.create_dynamic_frame_from_options(
        connection_type='s3',
        connection_options={
            'paths': [table_raw_path],
            'groupFiles': 'inPartition',
            'recurse': True
        },
        format='parquet',
        format_options={},
        transformation_ctx=bookmark
    )
    # convert DynamicFrame to Spark DataFrame
    new_data_df = new_data_dyf.toDF()
    if new_data_df.count() > 0:
        # here, standardize data schema
        # other complex transformation
        if tool == 'DMS':
            # here, overwrite the Hudi table
            pass
        else:
            # here, upsert to Hudi table
            pass

Please refer to [14] Process data with varying data ingestion frequencies using AWS Glue job bookmarks for more details about how bookmark works.
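Conceptually, a bookmark remembers which files have already been processed under a given transformation context. The toy model below shows only that idea. It is not how Glue implements bookmarks (as far as I understand, Glue tracks more state, such as file modification times), and the class name ToyBookmark and the file names are made up for the illustration.

```python
class ToyBookmark:
    """Toy model of a job bookmark: remembers processed files per context."""

    def __init__(self):
        self._processed = {}

    def new_files(self, transformation_ctx, listed_files):
        # Return only the files not yet committed for this context
        seen = self._processed.get(transformation_ctx, set())
        return [path for path in listed_files if path not in seen]

    def commit(self, transformation_ctx, processed_files):
        # Persist the processed files, like committing a job run
        self._processed.setdefault(transformation_ctx, set()).update(processed_files)


bookmark = ToyBookmark()
first_run = bookmark.new_files("schema_table", ["day1.parquet"])
bookmark.commit("schema_table", first_run)
second_run = bookmark.new_files("schema_table", ["day1.parquet", "day2.parquet"])
```

In the second run only the new file is returned, which is the behaviour the Glue job relies on when it reads the raw folders again.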

After the data is loaded to a dynamic frame, it is better to convert it to a spark dataframe for schema standardization and other complex transformation processes.

3.3 Standardize the Data Schema

There are many ways to standardize the data schema. Here is my example and it works well.

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType, TimestampType

# Target schema: column name -> Spark data type
new_schema = {
    'id': IntegerType(),
    'value': DoubleType(),
    'create_ts': TimestampType(),
    'modify_ts': TimestampType()
}

def convert_spark_df_schema(old_df):
    # Keep only the expected columns and cast each one to its target type
    selected_columns = list(new_schema.keys())
    new_df = old_df.select(selected_columns)
    for key in selected_columns:
        new_df = new_df.withColumn(key, col(key).cast(new_schema[key]))
    return new_df

...
# here, standardize data schema
new_data_df = convert_spark_df_schema(new_data_df)

3.4 Overwrite to Hudi Table

Please refer to [11] for the detailed example of ‘overwriting’. Sample code is given below for a quick read.

targetPath = f"s3://{cleaned_bucket}/{schema_name}/{table_name}"
# Settings shared by the initial load and the incremental load
commonConfig = {
    'className' : 'org.apache.hudi',
    'hoodie.datasource.write.recordkey.field' : primaryKey,
    'hoodie.table.name' : tableName,
    'hoodie.consistency.check.enabled' : 'true',
    'hoodie.datasource.hive_sync.enable' : 'true',
    'hoodie.datasource.hive_sync.table' : tableName,
    'hoodie.datasource.hive_sync.database' : dbName,
    'hoodie.datasource.hive_sync.use_jdbc' : 'false',
    'hive.metastore.schema.verification' : 'true',
    'path' : targetPath
}
partitionDataConfig = {
    'hoodie.datasource.write.partitionpath.field' : partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class' : 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields' : partitionKey
}
# Initial load: bulk insert the full data set
initLoadConfig = {
    'hoodie.bulkinsert.shuffle.parallelism' : 3,
    'hoodie.datasource.write.operation' : 'bulk_insert'
}
combinedConf = {
    **commonConfig,
    **partitionDataConfig,
    **initLoadConfig
}
new_data_df.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)

After the execution, a table is created or re-created in Glue Data Catalog (Hive Metastore) automatically, without involving Glue Crawlers. The new Data Catalog table is mapped to the data inside the S3 path ‘targetPath’. This S3 target path written by Hudi is not suitable for being crawled.

A few important parameters need to be given inside combinedConf:

  • hoodie.datasource.write.storage.type, which determines the storage type of the Hudi table, either COPY_ON_WRITE or MERGE_ON_READ. It is not set in the sample code, so the Hudi default (COPY_ON_WRITE) applies.
  • hoodie.datasource.write.recordkey.field, which nominates the primary key of the Hudi table.
  • hoodie.table.name, the table name to register in Glue Data Catalog/Hive Metastore.
  • hoodie.datasource.hive_sync.database, the database name under which the new table is registered in Glue Data Catalog/Hive Metastore.
  • hoodie.datasource.write.partitionpath.field, the partition path field, which enables partitions in the S3 path.
  • hoodie.datasource.write.operation, the important field that decides whether it is a ‘bulk_insert’ operation or an ‘upsert’ operation.
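Since most of these parameters stay the same between the initial load and the incremental load, they can be assembled by one helper that only switches the write operation. The sketch below is one possible way to do it: build_hudi_options is a hypothetical function, it uses only option keys that appear in this post, and it leaves out the tuning options for brevity.

```python
def build_hudi_options(table_name, db_name, primary_key, partition_key,
                       target_path, operation):
    """Assemble the core Hudi write options for 'bulk_insert' or 'upsert'."""
    if operation not in ("bulk_insert", "upsert"):
        raise ValueError(f"unsupported operation: {operation}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": primary_key,
        "hoodie.datasource.write.partitionpath.field": partition_key,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": db_name,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_fields": partition_key,
        "path": target_path,
    }


# Placeholder values, for illustration only
options = build_hudi_options(
    "table_name", "schema_name", "id", "modify_year",
    "s3://cleaned-bucket/schema_name/table_name", "upsert",
)
```

The returned dictionary can be passed to the writer with .options(**options), and keeping the operation as an explicit argument makes it harder to overwrite a table by accident.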

3.5 Upsert to Hudi Table

Please refer to [11] for the detailed example of ‘Upserting/Appending’. Sample code is given below for a quick read.

targetPath = f"s3://{cleaned_bucket}/{schema_name}/{table_name}"
# Settings shared by the initial load and the incremental load
commonConfig = {
    'className' : 'org.apache.hudi',
    'hoodie.datasource.write.recordkey.field' : primaryKey,
    'hoodie.table.name' : tableName,
    'hoodie.consistency.check.enabled' : 'true',
    'hoodie.datasource.hive_sync.enable' : 'true',
    'hoodie.datasource.hive_sync.table' : tableName,
    'hoodie.datasource.hive_sync.database' : dbName,
    'hoodie.datasource.hive_sync.use_jdbc' : 'false',
    'hive.metastore.schema.verification' : 'true',
    'path' : targetPath
}
partitionDataConfig = {
    'hoodie.datasource.write.partitionpath.field' : partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class' : 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields' : partitionKey
}
# Incremental load: upsert by primary key and clean up old commits
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism' : 20,
    'hoodie.datasource.write.operation' : 'upsert',
    'hoodie.cleaner.policy' : 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained' : 10
}
combinedConf = {
    **commonConfig,
    **partitionDataConfig,
    **incrementalConfig
}
new_data_df.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
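What the ‘upsert’ operation does to the table can be pictured with a toy model in plain Python: rows are matched on the primary key, matching rows are replaced and new rows are added. This is only a sketch of the semantics and says nothing about how Hudi implements it; the function name upsert and the sample rows are made up for the illustration.

```python
def upsert(table_rows, incoming_rows, key="id"):
    """Merge incoming rows into the table, matching on the primary key."""
    merged = {row[key]: row for row in table_rows}
    for row in incoming_rows:
        # Existing key: the row is replaced. New key: the row is added.
        merged[row[key]] = row
    return [merged[k] for k in sorted(merged)]


table = [{"id": 1, "value": 10.0}, {"id": 2, "value": 20.0}]
incoming = [{"id": 2, "value": 25.0}, {"id": 3, "value": 30.0}]
result = upsert(table, incoming)
```

Row 2 is updated in place and row 3 is appended, while row 1 stays untouched, so the table keeps exactly one row per key.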

Step 4, check whether the Hudi table appears in Glue Data Catalog after ‘overwrite’

There can be many reasons why Hudi fails to create the table in Glue Data Catalog. Please leave your errors in the comments if you want to get some help.

Step 5, run a Glue job to perform complex transformations on data from the Hudi tables, and save the data to S3 in the normal way

By now, the Hudi tables should appear in Glue Data Catalog. How can we consume their data and build more complex transformation logic?

Can we use create_dynamic_frame() to load data from a Hudi table directly, just like we load from a normal Data Catalog table?

allDataDyf = glueContext.create_dynamic_frame.from_catalog(
    database=schema_name,
    table_name=table_name)

The answer is NO. The default driver of the function create_dynamic_frame() is for S3, and the Hudi driver is required to load dynamic frame from Hudi table.

The simplest approach is to use the following code to query the snapshot of the Hudi table.

cleanedPath = f"s3://{cleaned_bucket}/{schema_name}/{table_name}"
snapshot_df = spark.read.format('org.apache.hudi').load(cleanedPath)

There are other ways to read data from a Hudi table. Please refer to [15] Hudi Spark Datasource for the Snapshot query, the Incremental query and the SQL query in Spark SQL.
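For example, an incremental query only returns the records written after a given commit time. The sketch below builds the read options for it. The option keys are the ones I know from the Hudi Spark datasource documentation and may differ between Hudi versions, so please treat them as an assumption and check [15] for the version in use. The helper name and the commit time value are hypothetical.

```python
def incremental_read_options(begin_instant_time):
    """Build Hudi read options for an incremental query (keys may vary by version)."""
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant_time,
    }


# Commit time in Hudi's yyyyMMddHHmmss style; the value is a placeholder
read_options = incremental_read_options("20211101000000")
```

Inside a Glue job with the Hudi driver enabled, these options would be passed as spark.read.format('org.apache.hudi').options(**read_options).load(cleanedPath).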

After all the transformations are done, the final reporting data can be written to the Curated bucket.

Step 6, crawl the S3 data in the Curated bucket, then build Athena views and QuickSight dashboards on top

This step and the following ones are just the usual data lake routine.

What I like

1, Full Load by AWS DMS is unbelievably efficient

Don’t try to beat it by coding it yourself, because it is a waste of time, and I know it from experience :-)

2, Glue Job Bookmark is a big relief to file management

Years ago, data lake solutions usually archived the processed files somewhere else, and moved them back if re-processing was needed. This kind of file management was very much a pain.

With Glue Job Bookmark, all the landed files can safely stay in the landing bucket, and only the newly added or modified files are picked up and processed.

The rewinding functionality of a job bookmark, released in late 2017, made it possible to start over easily after any change happens in the job logic.

See [16] Load data incrementally and optimized Parquet writer with AWS Glue.

3, Hudi massively eases the upsert process

As addressed in the previous chapter, the biggest challenge in our data scenarios is that neither Data Catalog nor Spark has primary keys, which makes upsert operations very difficult. Apache Hudi fills exactly this gap.

With Hudi, we can append any incoming data to the Hudi table, and Hudi ensures the data integrity of the single source of truth.

Where I see potential improvements

During the implementation, I did see some potential improvements for this solution and for the AWS products themselves.

1, No official interface to send SQL queries to JDBC connection inside a Glue job

The official Glue documentation mentions that DynamicFrameWriter has a from_jdbc_conf() function to point to a specific JDBC connection. Surprisingly, DynamicFrameReader does not. See [17] Glue DynamicFrameWriter.

It is just annoying to switch back to spark.read to send SQL queries instead of sticking to the official DynamicFrame interface.

2, Why does Data Catalog not support primary keys natively?

It is a good wish.

The reason is that this Hudi solution is still a workaround under the current constraints. An official offering from AWS that supports ‘upsert’ operations would be welcome.

3, Glue supports concurrency, badly

You can execute a Glue job concurrently after you set the ‘max concurrency’ higher.

Max concurrency in Glue Job Edit

It usually works, until you pass the table name as a parameter to a Glue job and use Glue Workflow to execute the job against multiple tables concurrently. This kind of concurrency messes up the bookmark.

4, Bad user experience in Glue Studio and Glue Workflow

The Glue console now has a classic version and a Studio version, and their user experiences are different and broken. For example,

  • how to stop a running job in Glue Studio is still a mystery to me
  • the ‘drag and drop’ experience in Glue Studio could be improved, compared with Azure Data Factory
  • how to rename, group into folders or categorize the jobs in both classic Glue and Glue Studio is not clear
  • the button layout varies a lot across the Glue pages

Similar problems happen in Glue Workflow as well. It is also not a pleasant user experience to have to click more than 4 times to view the status of the currently running workflow.

Having said that, Glue is still a very reliable and powerful ETL tool for data lake for sure, and actually one of my favourites.

Delta Lake & Iceberg, alternatives to Apache Hudi

One alternative framework that deserves extra attention is Delta Lake. There are already some early posts about how to enable Delta Lake in Glue jobs. Check [18] UPSERTS and DELETES using AWS Glue and Delta Lake.

Moreover, there is a third framework, Apache Iceberg. Please check the comparison between the three in [19] Hudi, Iceberg and Delta Lake: Data Lake Table Formats Compared.

At the end

This post describes a DMS + Glue + Hudi solution that tries to answer the challenge of missing CDC on the source database side. I hope it helps somebody who is troubled by similar data scenarios.

Please feel free to leave your comments and I will try to help if I can.

Appreciation

This solution was proposed and prototyped by my friends Ai-Linh Le and Rachel Mui from AWS, and they helped and supported the whole design and implementation process. A big thank-you to Ai-Linh and Rachel for their efforts and sharing.

Appendix

[0] Apache Hudi Concepts

[1] Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena

[2] Load ongoing data lake changes with AWS DMS and AWS Glue

[3] AWS serverless data analytics pipeline reference architecture

[4] Using an AWS-managed MySQL-compatible database as a source for AWS DMS

[5] Limitations on using a MySQL database as a source for AWS DMS

[6] Defining Tables in the AWS Glue Data Catalog

[7] SQL features missing in Hive

[8] https://hudi.apache.org/

[9] Glue spark does not like Unsigned Int from MySQL

[10] Connect to and run ETL jobs across multiple VPCs using a dedicated AWS Glue VPC

[11] Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift

[12] Hudi Jar Files available for download

[13] Writing to Apache Hudi tables using AWS Glue Custom Connector

[14] Process data with varying data ingestion frequencies using AWS Glue job bookmarks

[15] Hudi Spark Datasource

[16] Load data incrementally and optimized Parquet writer with AWS Glue

[17] Glue DynamicFrameWriter

[18] UPSERTS and DELETES using AWS Glue and Delta Lake

[19] Hudi, Iceberg and Delta Lake: Data Lake Table Formats Compared
