How to make a column non-nullable in Spark Structured Streaming

Kevin Wallimann

Posted on July 11, 2020

TLDR

Like this:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

Changing column nullability in Batch mode

In batch mode, one way to change a column's nullability is to create a new DataFrame from the existing one, using a new schema that has the desired nullability.

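 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{StructField, StructType}

 def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
   val schema = df.schema
   // modify the [[StructField]] with name `cn`
   val newSchema = StructType(schema.map {
     case StructField(c, t, _, m) if c.equals(cn) =>
       StructField(c, t, nullable = nullable, m)
     case y: StructField => y
   })
   // apply the new schema
   df.sqlContext.createDataFrame(df.rdd, newSchema)
 }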

https://stackoverflow.com/a/33195510/13532243

However, this approach does not work for a Structured Streaming DataFrame; it fails with the following error.

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)

Make a column nullable in structured streaming

In the same Stack Overflow thread, another answer shows how to make a non-nullable column nullable, and this approach also works for Structured Streaming queries.

import org.apache.spark.sql.functions.{col, lit, when}

dataframe.withColumn("col_name",
  when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

https://stackoverflow.com/a/46119565/13532243

This is a neat trick, since Spark has to account for the (hypothetical) fact that a value could be null and mark the column nullable, even though the column doesn't contain any null value in practice.
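
To see the effect on the schema, here is a minimal sketch. It assumes a local Spark session (Spark 2.4 or 3.x) and uses the built-in `rate` source only to obtain a streaming DataFrame; the object name, the helper `makeNullable`, and the column name `flag` are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object NullableStreamingColumn {
  // Hypothetical helper: rewrites the column with when/otherwise so that
  // Spark has to treat it as nullable.
  def makeNullable(df: DataFrame, columnName: String): DataFrame =
    df.withColumn(columnName,
      when(col(columnName).isNotNull, col(columnName)).otherwise(lit(null)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nullable-demo")
      .getOrCreate()

    // A streaming DataFrame with an extra constant column. A non-null
    // literal is reported as non-nullable in the schema.
    val stream = spark.readStream.format("rate").load()
      .withColumn("flag", lit(1))

    val relaxed = makeNullable(stream, "flag")

    // Only the schemas are inspected, so the query never has to be started.
    println(s"before: nullable = ${stream.schema("flag").nullable}")
    println(s"after:  nullable = ${relaxed.schema("flag").nullable}")

    spark.stop()
  }
}
```

Because only the schema is read, no `writeStream.start()` is needed for this check.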

Make a column non-nullable in structured streaming

If you know that a nullable column in fact only contains non-null values, you may want to make that column non-nullable. Here's the trick with AssertNotNull again:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

How does it work? Looking at its implementation (https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628), the key is that AssertNotNull overrides nullable to always return false. That is how Spark determines that this column is non-nullable. Of course, if your column unexpectedly contains null values, the query will fail with a NullPointerException.
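
The schema change can be checked without starting a query. The following is a minimal sketch, assuming a local Spark session on Spark 2.4 or 3.x (where `Column` can be constructed from a Catalyst expression) and the built-in `rate` source, whose `value` column is declared nullable. The object name and the helper `makeNonNullable` are made up for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

object NonNullableStreamingColumn {
  // Hypothetical helper: wraps the column's expression in AssertNotNull,
  // which reports nullable = false and raises an error at runtime if a
  // null value shows up after all.
  def makeNonNullable(df: DataFrame, columnName: String): DataFrame =
    df.withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("non-nullable-demo")
      .getOrCreate()

    // The "rate" source produces a streaming DataFrame with the columns
    // "timestamp" and "value".
    val stream = spark.readStream.format("rate").load()
    val strict = makeNonNullable(stream, "value")

    // Compare the nullability flag before and after the transformation.
    println(s"before: nullable = ${stream.schema("value").nullable}")
    println(s"after:  nullable = ${strict.schema("value").nullable}")
    println(s"still streaming: ${strict.isStreaming}")

    spark.stop()
  }
}
```

Note that `AssertNotNull` lives in Spark's internal Catalyst package, so its location and behaviour may differ between Spark versions.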
