How to make a column non-nullable in Spark Structured Streaming
Kevin Wallimann
Posted on July 11, 2020
TLDR
Like this:
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
Changing column nullability in Batch mode
For Spark in Batch mode, one way to change column nullability is by creating a new dataframe with a new schema that has the desired nullability.
val schema = dataframe.schema
// modify the StructField with name `cn`
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
  case y: StructField => y
})
// apply the new schema
dataframe.sqlContext.createDataFrame(dataframe.rdd, newSchema)
https://stackoverflow.com/a/33195510/13532243
However, this approach isn't supported for a Structured Streaming dataframe and fails with the following error.
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
Make a column nullable in structured streaming
In the same Stack Overflow thread, another answer shows how to make a non-nullable column nullable, which also works for Structured Streaming queries.
import org.apache.spark.sql.functions.{col, lit, when}

dataframe.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
https://stackoverflow.com/a/46119565/13532243
This is a neat trick: Spark has to account for the (hypothetical) possibility that a value could be null, so it marks the column nullable, even though the column doesn't contain any null values in practice.
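To see the effect of this trick on the schema, here is a minimal batch-mode sketch (the column name `id` and the local SparkSession setup are illustrative; the same withColumn call applies unchanged to a streaming dataframe):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

val spark = SparkSession.builder().master("local[*]").appName("nullable-demo").getOrCreate()
import spark.implicits._

// A column backed by a primitive type starts out non-nullable
val df = Seq(1, 2, 3).toDF("id")
println(df.schema("id").nullable) // false

// The when/otherwise trick forces Spark to consider null a possible value
val relaxed = df.withColumn("id", when(col("id").isNotNull, col("id")).otherwise(lit(null)))
println(relaxed.schema("id").nullable) // true

spark.stop()
```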
Make a column non-nullable in structured streaming
If you know that a nullable column in fact only contains non-null values, you may want to make that column non-nullable. Here's the trick with AssertNotNull again:
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
How does it work? Looking at its implementation (https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628), the key is that AssertNotNull overrides nullable and always returns false. That's how Spark determines this column to be non-nullable. Of course, if your column unexpectedly contains null values, the query will fail with a NullPointerException.
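As a sanity check, the schema change can be observed in batch mode as well (the setup below is illustrative; a column built from Option values starts out nullable):

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("assert-not-null-demo").getOrCreate()
import spark.implicits._

// Option-valued input makes the column nullable in the inferred schema
val df = Seq(Some(1), Some(2)).toDF("id")
println(df.schema("id").nullable) // true

// AssertNotNull always reports nullable = false, so the schema changes
val strict = df.withColumn("id", new Column(AssertNotNull(col("id").expr)))
println(strict.schema("id").nullable) // false

// Note: if the data actually contained a null, evaluating `strict`
// (e.g. strict.collect()) would fail with a NullPointerException.
spark.stop()
```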