Writing Spark: Scala Vs Java


Ryan

Posted on February 27, 2020


Background

I joined a team in early April of 2019. They were writing Spark jobs in Scala to do a variety of different things. At the time, I only knew Java, very little Scala, and barely anything about Spark. Fast forward to today, and I now have a few months of experience with Scala and Spark.

Very recently, I was put on a project that scans an HBase table, does a few things to the data, and writes it to another HBase table. Easy peasy in Scala, right? A few weeks later, I delivered my first pass of the job. Red flags went flying: it was written in Scala, not Java! Scala hasn't been accepted as a language the company wants to "invest" in yet. Barely anyone knows the language, so no one could support the job once it was handed off. I needed to rewrite the job in Java, which leads me to this very blog post.

The point of this blog post is to record my trials and tribulations writing the Spark job in Java. While, yes, you can absolutely write a Spark job in Java, you should also look at how much LESS you have to write if you can use Scala instead. This post can also be used as ammunition in your argument for why your company should invest in Scala if you are using Spark in any capacity. Also, full disclosure: there may absolutely be BETTER ways to write the following Java code. So, if you see something poorly done, feel free to leave a comment and educate me on a better way.

Now, let's get into the meat of the post...

Why You Should Use Scala for Your Spark Jobs

SPARK

Right off the bat, if your company is using Spark, it should be able to support using Scala. Spark itself is written in Scala. If you are looking forward to a new feature in Spark, there is a good chance you are going to get that feature first in Scala and only later in other languages.

The Spark Shell is another big reason. The Spark Shell is basically a Scala REPL that lets you interact with the Spark API. Know you have to scan an HBase table but can't quite remember the correct syntax? Open up the Spark Shell, type in some Scala, and you will find out pretty quickly whether or not you are on the right track.

With Java, there isn't an easy equivalent. In my experience, it was: poke around on the Internet for an answer, write the code, package it up in a JAR, transfer it somewhere I could upload it to our cluster, and run it (hoping for the best).

Coding

Alright, enough of the overall details. Let's dig into some code and where I think you'll run into the most pain while writing Spark jobs in Java.

DataFrames & DataSets

DataFrames are data organized into named columns; they are immutable, distributed collections of data.

DataSets are an extension of the DataFrame API that provides a type-safe, object-oriented interface.

More about the two can be found here.
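To make that concrete in Java terms, here's a small sketch of my own (not from the job in question): a DataFrame is just a Dataset<Row>, while a typed DataSet binds each row to your own class. The Person bean and the people.json path are hypothetical; spark is a SparkSession.

// A DataFrame in the Java API is simply Dataset<Row>: untyped rows, schema checked at runtime.
Dataset<Row> peopleDF = spark.read().json("people.json");

// A typed Dataset binds each row to a user-defined bean (Person is hypothetical here),
// giving you compile-time types when you map over it.
Dataset<Person> peopleDS = peopleDF.as(Encoders.bean(Person.class));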

So, let's say you just scanned an HBase table and you have an object holding the table rows called rows. You're trying to get specific data out of that table (based on a column family and qualifier) and put it in a DataSet.
How would you do that?

Scala

// Note: .toDS() needs the implicit encoders from "import spark.implicits._".
val rowsDS =
  rows.map(kv => (Bytes.toString(kv._1.get()),
    Bytes.toString(kv._2.getValue(Bytes.toBytes("family"), Bytes.toBytes("qualifier"))))
  ).toDS()

Like magic, the .toDS() method is all you need.

What about Java?

In Java, assume you have a JavaPairRDD called rowRDD already set up with your scan results. The spark variable here is the SparkSession needed to set up your job.
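If you're wondering where that rowRDD could come from in the first place, the post doesn't show it, but a common pattern (my assumption, with a hypothetical table name) is the Hadoop input format bridge:

// One common way to scan an HBase table into a JavaPairRDD (a sketch; not the original job's setup).
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table"); // hypothetical table name

JavaPairRDD<ImmutableBytesWritable, Result> rowRDD =
        new JavaSparkContext(spark.sparkContext()).newAPIHadoopRDD(
                hbaseConf,
                TableInputFormat.class,
                ImmutableBytesWritable.class,
                Result.class);

With a scan RDD like that in hand, the conversion looks deceptively simple: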

Dataset<Row> rowsDS = spark.createDataFrame(rowRDD, MyRowSchema.class);

Cool! One line as well! Wait, what's that second parameter? Why do you need a class? Well, that is needed to apply a specific schema to your RDD in order to convert it into a DataSet. Remember above when I said that DataSets are type-safe? So there's an extra step: mapping your rowRDD results into objects of type MyRowSchema.
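MyRowSchema itself never appears in this post, so here's a minimal sketch of what such a bean might look like; the single field name is my assumption, since all we know is that it carries the string pulled from that family/qualifier.

// Hypothetical schema bean (not the original class): createDataFrame reflects over a public
// JavaBean's getters/setters to derive the column names and types.
public class MyRowSchema implements Serializable {

    private String qualifierValue; // assumed field name

    public MyRowSchema() {
    }

    public MyRowSchema(String qualifierValue) {
        this.qualifierValue = qualifierValue;
    }

    public String getQualifierValue() {
        return qualifierValue;
    }

    public void setQualifierValue(String qualifierValue) {
        this.qualifierValue = qualifierValue;
    }
}

With something like that in place, the mapping step looks like: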

JavaRDD<MyRowSchema> javaRDD = rowRDD.map((Function<Tuple2<ImmutableBytesWritable, Result>, MyRowSchema>) tuple -> {
    Result result = tuple._2;

    return new MyRowSchema(Bytes.toString(result.getValue(Bytes.toBytes("family"), Bytes.toBytes("qualifier"))));
});

Dataset<Row> rowsDS = spark.createDataFrame(javaRDD, MyRowSchema.class);

Due to the Java boilerplate, there's a lot more code to write and a few more hoops to jump through with createDataFrame in Java than with the simple .toDS() method you get in Scala.

Transformations

Transformations are a cool way of chaining custom transformations, like adding a column of data to a DataSet. See this awesome blog post for more details on transformations.

Say you want to get all the people in a data frame whose name is "Ryan", along with their respective ages...
Scala has you define your methods first and then chain them together.

def retrieveAllRyansAge()(df: DataFrame): DataFrame = {
  df.filter(col("name").startsWith("Ryan"))
    .select("name", "age")
}

val ryansAgeDF = peoplesAgeDF
  .transform(RyanTransformer.retrieveAllRyansAge())

And you are done!

If you have been following along so far, you can guess the Java equivalent is going to be a bit longer...

public static Function1<Dataset<Row>, Dataset<Row>> retrieveAllRyansAge = new AbstractFunction1<Dataset<Row>, Dataset<Row>>() {
    @Override
    public Dataset<Row> apply(Dataset<Row> peopleDS) {
        return retrieveRyansWithAge(peopleDS);
    }
};

private static Dataset<Row> retrieveRyansWithAge(Dataset<Row> df) {
    return df.filter(col("name").startsWith("Ryan"))
            .select("name", "age");
}

Granted, I did add an extra method to separate out the actual work, so you could theoretically collapse the above into one method.
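For what it's worth, the collapsed single-method version would be something like this (a sketch with the same behavior):

// Same transformation, folded into the Function1 directly.
public static Function1<Dataset<Row>, Dataset<Row>> retrieveAllRyansAge = new AbstractFunction1<Dataset<Row>, Dataset<Row>>() {
    @Override
    public Dataset<Row> apply(Dataset<Row> peopleDS) {
        return peopleDS.filter(col("name").startsWith("Ryan"))
                .select("name", "age");
    }
};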

And now, the actual transform:

Dataset<Row> ryansAgeDF = peoplesAgeDF
        .transform(RyanTransformer.retrieveAllRyansAge);

User Defined Functions

Another handy item for writing Spark jobs is User Defined Functions (UDFs). These are functions written by the user, as you could guess from the name, for times when the built-in functions are not capable of doing what is needed.

For example, say you need to convert a 3-letter language code to its 2-letter equivalent, e.g. "eng" to "en". You could write something like the below in Scala.

def getFormattedLanguage(lang: String): String = {
  Option(lang).map({
    l =>
      l.replaceAll(" ", "").toLowerCase match {
        case "eng" => "en"
        case "fre" => "fr"
        case _ => "en"
      }
  }).getOrElse("en")
}

val getFormattedLanguageUDF = udf[String, String](getFormattedLanguage)

val formattedLanguages = languagesDF.withColumn("formattedLanguage", LanguageFormatter.getFormattedLanguageUDF(col("language")))

There's a special way of registering and calling UDFs in Java.
Something along the lines of:

spark.udf().register(UDF_FORMATTED_LANG, (UDF1<String, String>) LanguageFormatter::getFormattedLanguage, DataTypes.StringType);

Dataset<Row> formattedLanguages = languagesDF.withColumn("formattedLanguage", callUDF(UDF_FORMATTED_LANG, col("language")));

Here, UDF_FORMATTED_LANG is a String holding the name the UDF is registered under.
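The Java side of getFormattedLanguage isn't shown above; a rough port of the Scala version (my sketch, not the original LanguageFormatter) could look like:

// Rough Java port of the Scala getFormattedLanguage above (a sketch, not the original code).
public class LanguageFormatter {

    public static String getFormattedLanguage(String lang) {
        if (lang == null) {
            return "en";
        }
        switch (lang.replaceAll(" ", "").toLowerCase()) {
            case "eng":
                return "en";
            case "fre":
                return "fr";
            default:
                return "en";
        }
    }
}

Note that it's the method reference, wrapped as a UDF1, that gets registered with the SparkSession, not the class itself.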

Unit Testing

The last topic I wanted to cover, which is also near and dear to my heart, is unit testing the code you write for your Spark job, whether it be in Java or Scala.

Taking the example I wrote above for transformations, if I wanted to grab all Ryans and their ages from a DataFrame and count how many I had in a test, it would look like the below:

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local")
      .appName("spark test example")
      .getOrCreate()
  }
}

The trait above is needed for the below class:

class NameAndAgeExtractorTest extends FunSuite with BeforeAndAfter with SparkSessionTestWrapper {

  import spark.implicits._ // needed for .toDF() on a Seq

  test("Ryan retrieval returns empty data frame when no Ryans found") {
    val sourceDF = Seq(
      ("Bryan", "21", "M")
    ).toDF("name", "age", "gender")

    val actualDF =
      sourceDF.transform(RyanTransformer.retrieveAllRyansAge())

    assert(actualDF.count() == 0)
  }

}

Notice how I was able to create a DataFrame in a few lines of code, and then call the transform method with the method I wanted to test and get my answer in two more lines?

Now, onto Java...

class NameAndAgeExtractorTest {

    private SparkSession spark;
    private JavaSparkContext javaSparkContext;
    private StructType namesSchema;

    @BeforeEach
    void setup() {
        spark = SparkSession.builder()
                .appName("Tests")
                .master("local")
                .getOrCreate();

        javaSparkContext = new JavaSparkContext(spark.sparkContext());

        namesSchema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("age", DataTypes.StringType, false),
                DataTypes.createStructField("gender", DataTypes.StringType, false)
        });
    }
}

Here, we have just the setup method. We have to define a schema for our DataFrame, which is referred to in the test below. Unlike Scala, where we can create a DataFrame on the fly without declaring a schema, Java demands we stick to explicit types, and so there is more code to write just to test one little method that interacts with a DataFrame.

@Test
void retrieveAllRyans_returns_empty_dataset_when_no_Ryans_found() {
    List<String[]> people = new ArrayList<>();
    people.add(new String[]{"Bryan", "21", "M"});

    JavaRDD<Row> rowRDD = javaSparkContext.parallelize(people).map(RowFactory::create);

    Dataset<Row> df = spark.createDataFrame(rowRDD, namesSchema);

    Dataset<Row> resultDf = df.transform(RyanTransformer.retrieveAllRyansAge);

    Assertions.assertEquals(0L, resultDf.count());
}

There are a few more steps to go through before testing our method. One, create a list and add items to it. Two, create a JavaRDD to conform to what the Java API needs. Finally, we can create a DataFrame, use our method, and confirm the result.

Now, this is for only one item and one DataFrame. Imagine the code we would need if we wanted more than one item, or if we needed different kinds of DataFrames. You are looking at two lines of code in Scala compared to tens of lines in Java.

Conclusion

Looking back at the topics I covered:

  1. DataFrames and DataSets
  2. Transformations
  3. User Defined Functions
  4. Unit Testing

These topics are what I consider to be very important when writing Spark jobs. If you take away anything from this blog post, I hope you stop and think about the direction of your future Spark jobs. Again, you can absolutely write them in Java (and write them well). But the extra time you will spend writing boilerplate, and the hoops you will have to jump through for things Scala offers in fewer lines, is going to cost you in the long run.

Thanks for reading!
