Apache Hudi on AWS Glue
Sagar Lakshmipathy
Posted on May 19, 2024
Have you ever wondered how to write Hudi tables with Scala in AWS Glue?
Look no further.
Pre-requisites
- Create a Glue database called hudi_db from the Databases menu under Data Catalog in the Glue console. (If you'd rather do this programmatically, see the sketch below.)
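If you'd rather script this step than click through the console, here's a minimal sketch using the AWS SDK for Java v2 from Scala. Treat it as an assumption on my part: it presumes the SDK's Glue module is on your classpath and your credentials allow glue:CreateDatabase.

// Hypothetical alternative to the console step: create hudi_db programmatically.
// Assumes the AWS SDK v2 Glue module is available and credentials are configured.
import software.amazon.awssdk.services.glue.GlueClient
import software.amazon.awssdk.services.glue.model.{CreateDatabaseRequest, DatabaseInput}

val glue = GlueClient.create()
glue.createDatabase(
  CreateDatabaseRequest.builder()
    .databaseInput(DatabaseInput.builder().name("hudi_db").build())
    .build()
)
glue.close()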
Let's use the Apache Hudi Spark QuickStart guide as the basis for this example.
Configuring the job
- In the Glue console, choose ETL Jobs, then choose Script editor.
- In the tabs above, choose Job details and, under Language, choose Scala.
- Feel free to make any infra changes as required.
- Click on Advanced properties, navigate to Job parameters, and add the parameters below one by one. Of course, change these values as you prefer.
  - --S3_OUTPUT_PATH as s3://hudi-spark-quickstart/write-path/
  - --class as GlueApp
  - --conf as spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  - --datalake-formats as hudi
Note: In this example, I'm using the default Hudi version (0.12.0) that comes with Glue 4.0. If you want to use a different Hudi version, you might have to add the JAR to the classpath by adding one more parameter, --extra-jars, pointing to the S3 path of the Hudi JAR file.
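Before writing any data, it doesn't hurt to confirm those --conf parameters actually took effect. Here's a minimal sanity-check sketch (not part of the original job) that you could drop into main right after the SparkSession and GlueLogger are created:

// Hypothetical sanity check: verify the --conf job parameter was applied.
val serializer = spark.conf.get("spark.serializer", "<not set>")
logger.info(s"spark.serializer = $serializer")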
On to the cool stuff now.
Scripting
Navigate to the Script tab and add the Scala code below.
Let's add the boilerplate imports:
import com.amazonaws.services.glue.{GlueContext, DynamicFrame}
import com.amazonaws.services.glue.util.GlueArgParser
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import com.amazonaws.services.glue.log.GlueLogger
Add the Glue-specific code, i.e., parse the job parameters and create a GlueContext:
object GlueApp {
def main(sysArgs: Array[String]) {
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "S3_OUTPUT_PATH").toArray)
val spark: SparkSession = SparkSession.builder().appName("AWS Glue Hudi Job").getOrCreate()
val glueContext: GlueContext = new GlueContext(spark.sparkContext)
val logger = new GlueLogger()
Prepping the data.
import spark.implicits._
val tableName = "trips"
val recordKeyColumn = "uuid"
val precombineKeyColumn = "ts"
val partitionKeyColumn = "city"
val s3OutputPath = args("S3_OUTPUT_PATH")
val glueDbName = "hudi_db"
val writePath = s"$s3OutputPath/$tableName"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data = Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
  (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
  (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
  (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
  (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))
Add the options required by Hudi to write the table and sync it with the Glue database.
val hudiOptions = Map[String, String](
"hoodie.table.name" -> tableName,
"hoodie.datasource.write.recordkey.field" -> recordKeyColumn,
"hoodie.datasource.write.precombine.field" -> precombineKeyColumn,
"hoodie.datasource.write.partitionpath.field" -> partitionKeyColumn,
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> glueDbName,
"hoodie.datasource.hive_sync.table" -> tableName,
"hoodie.datasource.hive_sync.partition_fields" -> partitionKeyColumn,
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms",
"path" -> writePath
)
Finally, create the DataFrame and write it to S3.
val inserts = spark.createDataFrame(data).toDF(columns: _*)
inserts.write
.format("hudi")
.options(hudiOptions)
.mode("overwrite")
.save()
logger.info("Data successfully written to S3 using Hudi")
}
}
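If you want to mirror the QuickStart's update step too, here's a hedged sketch of an upsert that would sit inside main after the initial write. It reuses the same hudiOptions and switches to append mode so the existing table is kept and the changed row is upserted; the rider-D filter and the fare multiplier are illustrative choices, not part of the original post.

// Hypothetical follow-up: read one ride back, bump its fare, and upsert it into the same table.
val updates = spark.read.format("hudi").load(writePath)
  .filter($"rider" === "rider-D")
  .withColumn("fare", $"fare" * 10)

updates.write
  .format("hudi")
  .options(hudiOptions)
  .mode("append") // append + the upsert operation updates existing records by record key
  .save()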
Querying
Now that we have written the table to S3, we can query this table from Athena.
SELECT * FROM "hudi_db"."trips" limit 10;
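You can also read the table back from Spark itself, for example at the end of the Glue job. A minimal snapshot-read sketch, assuming it runs inside main where spark and writePath are in scope:

// Snapshot query straight from the S3 path the job wrote to.
val tripsDf = spark.read.format("hudi").load(writePath)
tripsDf.select("ts", "uuid", "rider", "driver", "fare", "city").show(10, false)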