It was a surprise to hit the following error when I ran a Glue Spark job to load raw data that AWS DMS had downloaded from an AWS RDS MySQL database into AWS S3:
org.apache.spark.sql.AnalysisException: Parquet type not supported: INT64 (UINT_64);
I did not expect it, since I assumed that by staying within AWS services, data type compatibility would be looked after across them. Obviously there is something that needs special attention.
What happened
I was building a data lake solution which uses DMS to ingest the full data of a table in a MySQL database into an S3 raw bucket in parquet format, and later uses a Glue job to load the ingested data into an S3 curated bucket.
And the error happened when the Glue job ran the following code:
spark_df = spark.read.parquet( s3_raw_bucket_path )
After some investigation, I realized that the Spark engine does not support unsigned integer types, and the table from which I extracted the data has a lot of columns of type unsigned bigint(20). Later I found similar errors reported on the internet as well, like the one below.
org.apache.spark.sql.AnalysisException: Parquet type not supported: INT32 (UINT_32);
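Before changing anything, it is worth confirming which columns actually carry unsigned types. One quick way is to inspect the parquet footer with pyarrow; the sketch below assumes a local copy of one of the files DMS wrote, and the file name is a placeholder.
import pyarrow.parquet as pq

# Read only the footer metadata of one DMS output file; no row data is loaded.
schema = pq.read_schema("part-00000.parquet")
for field in schema:
    print(field.name, field.type)  # unsigned columns show up as uint32 / uint64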
There are many options to fix the issue. This article talks about three of them.
Option 1 - Transform data types in the data migration job in DMS
DMS actually has a way to transform one source data type into another, though it is not easy for new DMS users to discover.
Step 1, Switch to the JSON editor in the Table mapping section of the Database migration task page
Step 2, Add the following items to the JSON structure
{
  "rules": [
    ...
    {
      "rule-type": "transformation",
      "rule-id": "5",
      "rule-name": "uint32-to-int32",
      "rule-action": "change-data-type",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "my_schema",
        "table-name": "%",
        "column-name": "%",
        "data-type": "uint4"
      },
      "data-type": {
        "type": "int4"
      }
    },
    {
      "rule-type": "transformation",
      "rule-id": "6",
      "rule-name": "uint64-to-int64",
      "rule-action": "change-data-type",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "my_schema",
        "table-name": "%",
        "column-name": "%",
        "data-type": "uint8"
      },
      "data-type": {
        "type": "int8"
      }
    }
  ]
}
These two new rules transform uint32 columns to int32, and uint64 columns to int64. More rules could be added to handle other unsigned integer types.
AWS DMS supports column data type transformations for the following DMS data types: "bytes", "date", "time", "datetime", "int1", "int2", "int4", "int8", "numeric", "real4", "real8", "string", "uint1", "uint2", "uint4", "uint8", "wstring", "blob", "nclob", "clob", "boolean", "set", "list", "map", "tuple"
Please refer to https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.html for more details. Note that DMS uses its own names for data types; for example, a 64-bit integer is labelled 'int8' in DMS.
After this transformation, the parquet files generated by the DMS job will no longer contain unsigned integers, and Glue Spark will happily load them into a Spark dataframe.
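If you manage the migration task from code rather than the console, the same table mapping can be applied with boto3. This is only a sketch: the task ARN is a placeholder, and the task normally needs to be stopped before it can be modified.
import json
import boto3

dms = boto3.client("dms")

table_mappings = {
    "rules": [
        # ... the selection rule plus the two transformation rules shown above ...
    ]
}

# Replace the whole table mapping document of the task with the new rules.
dms.modify_replication_task(
    ReplicationTaskArn="arn:aws:dms:ap-southeast-2:123456789012:task:EXAMPLE",
    TableMappings=json.dumps(table_mappings),
)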
Option 2 - Use AWS Athena to directly query on the raw data
The second option is to get rid of the Glue job and its Spark engine altogether, and rely on AWS Athena and its Presto engine instead. See the data flow below.
Step 1, Use Glue Crawler to build table schema in data catalog
Glue Crawler can still build a schema for parquet files with unsigned integers, though the data types in the data catalog will be signed.
This is because the Data Catalog, which is essentially a Hive Metastore, supports only signed numeric types (all supported Hive data types are in this link). That is why Glue Crawler interprets unsigned numbers in parquet as signed in the data catalog.
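For completeness, the crawler can also be created and started with boto3 instead of through the console. This is a sketch only; the crawler name, IAM role and S3 path are placeholders.
import boto3

glue = boto3.client("glue")

# Crawl the raw bucket and register the discovered table in the my_schema database.
glue.create_crawler(
    Name="raw-parquet-crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",
    DatabaseName="my_schema",
    Targets={"S3Targets": [{"Path": "s3://raw_bucket/folder/"}]},
)
glue.start_crawler(Name="raw-parquet-crawler")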
Step 2, Use Athena to query directly on the table in data catalog
Once the table schema is available in the data catalog, I can use Athena to query the raw data without any trouble.
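The same query can also be submitted from code with boto3 if you want to automate it. A sketch is below; the database name, query and output location are placeholders.
import boto3

athena = boto3.client("athena")

# Submit the query asynchronously; Athena writes the results to the output location.
response = athena.start_query_execution(
    QueryString="SELECT * FROM my_table LIMIT 10",
    QueryExecutionContext={"Database": "my_schema"},
    ResultConfiguration={"OutputLocation": "s3://athena-results-bucket/output/"},
)
print(response["QueryExecutionId"])  # poll get_query_execution with this id for the status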
Step 3, (optional) Use a CTAS query to write the query results back to the S3 curated bucket
If we still need to write the data back to an S3 curated bucket, we can use 'Creating a Table from Query Results' (CTAS) to do it, and we can perform whatever data type conversions we want in the CTAS query.
CREATE TABLE my_schema.my_new_table
WITH (
    format = 'PARQUET',
    external_location = 's3://curated_bucket/folder'
) AS
SELECT
    ...,
    CAST(col1_uint64 AS double) AS col1
FROM my_schema.my_table
All the magic here is that the Presto engine, which AWS Athena is based on, is more tolerant of unsigned types in parquet files than the Spark engine. However, I could not find any official document describing this advantage.
Please let me know if you know more about it. If you are interested in the difference between the data types supported by the two engines, you can find the Presto data types in link and the Spark data types in link.
Option 3 - Use a specific schema to read the parquet using Glue
The third option, recommended for advanced Glue users, is to convert uint data in the parquet files to int types during the read step of the Glue job.
The following example code reads the unsigned columns inside the parquet file as IntegerType, LongType or DoubleType.
from pyspark.sql.types import *

# Declare the signed type each column should be read as, instead of letting
# Spark infer the unsupported unsigned types from the parquet footer.
customSchema = StructType([
    StructField("a", IntegerType(), True),
    StructField("b", LongType(), True),
    StructField("c", DoubleType(), True)])

df = spark.read.schema(customSchema).parquet("test.parquet")
This option requires no changes on the DMS side, but it always needs an extra type conversion when reading the data, which might not be friendly enough for new Glue users.
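If the table is wide, writing the StructType by hand gets tedious. Assuming the schema-override approach above works for your Spark version and that pyarrow is available to inspect one of the raw files (the file name below is a placeholder), a possible sketch derives the custom schema from the parquet footer and widens the unsigned columns automatically:
import pyarrow.parquet as pq
import pyarrow.types as pat
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               LongType, DoubleType, TimestampType, StringType)

def spark_type_for(arrow_type):
    """Pick a Spark type for one parquet column; widen unsigned ints to signed types."""
    if pat.is_unsigned_integer(arrow_type):
        # uint8/16/32 fit in a signed 64-bit long; uint64 values above the signed
        # range would overflow, so cast such columns to DecimalType(20, 0) if needed.
        return LongType()
    if pat.is_integer(arrow_type):
        return LongType() if arrow_type.bit_width > 32 else IntegerType()
    if pat.is_floating(arrow_type):
        return DoubleType()
    if pat.is_timestamp(arrow_type):
        return TimestampType()
    return StringType()  # crude fallback for this sketch; extend for other types

arrow_schema = pq.read_schema("part-00000.parquet")  # one of the DMS output files
customSchema = StructType(
    [StructField(f.name, spark_type_for(f.type), True) for f in arrow_schema])
df = spark.read.schema(customSchema).parquet(s3_raw_bucket_path)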
At the end
The first two data flows are not meant to be a sophisticated data lake solution. They are only shown to demonstrate this 'unsigned' issue and the difference between the two engines.
I will write a few more articles about the complete AWS data lake solution I am building.
Thanks for reading.