Daniel Westheide
Posted on November 9, 2017
This article was originally posted on Daniel Westheide's blog.
For the past 15 months, I have been working on a new library on and off. So far, I have been mostly silent about it, because I didn't feel like it was ready for a wider audience to use – even though we had been using it successfully in production for a while. However, since I broke my silence as long ago as April this year, when I did a talk about it at this year's ScalarConf in Warsaw, a blog post is overdue in which I explain what this library does and why I set out to write it in the first place.
Last year, I was involved in a project that required my team to implement a few Spark applications. For most of them, the business logic was rather complex, so we tried to implement this business logic in a test-driven way, using property-based tests.
The pain of unit-testing Spark applications
At first glance, it looks like this is a great match. When it comes down to it, a Spark application consists of IO stages (reading from and writing to data sources) and transformations of data sets. The latter constitute our business logic and are relatively easy to separate from the IO parts. They are mostly built from pure functions. Functions like these are usually a perfect fit for test-driven development as well as for property-based testing.
However, all was not great. It may be old news to you if you have been working with Apache Spark for a while, but it turns out that writing real unit tests is not actually supported that well by Spark, and as a result, it can be quite painful. The thing is that in order to create an RDD, we always need a SparkContext, and the most light-weight mechanism for getting one is to create a local SparkContext. Creating a local SparkContext means that we start up a server, which takes a few seconds, and testing our properties with lots of different generated input data takes a really long time. Most certainly, we are losing the fast feedback loop we are used to from developing web applications, for example.
Abstracting over RDDs with kontextfrei
Now, we could confine ourselves to only unit-testing the functions that we pass to RDD operators, so that our unit tests do not have any dependency on Spark and can be verified as quickly as we are used to. However, this leaves quite a lot of business logic uncovered. Instead, at a Scala hackathon last May, I started to experiment with the idea of abstracting over Spark's RDD, and kontextfrei was born.
The idea is the following: By abstracting over RDD, we can write business logic that has no dependency on the RDD type. This means that we can also write test properties that are Spark-agnostic. Any Spark-agnostic code like this can either be executed on an RDD (which you would do in your actual Spark application and in your integration tests), or on a local and fast Scala collection (which is really great for unit tests that you continuously run locally during development).
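To make the idea concrete before looking at the library itself, here is a minimal, self-contained sketch of what abstracting over a collection type with a typeclass can look like. The names MiniOps, listOps and longWords are hypothetical and exist only for this illustration; this is not kontextfrei's actual API, which is shown further down.

```scala
// Hypothetical miniature of the idea; NOT kontextfrei's actual API.
object AbstractionSketch {
  // A typeclass describing the few collection operations our logic needs.
  trait MiniOps[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
    def filter[A](fa: F[A])(p: A => Boolean): F[A]
  }

  // Instance for a plain local Scala collection, as one would use in fast unit tests.
  implicit val listOps: MiniOps[List] = new MiniOps[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
    def filter[A](fa: List[A])(p: A => Boolean): List[A] = fa.filter(p)
  }

  // Business logic written once against the abstraction.
  // It mentions neither List nor RDD.
  def longWords[F[_]](words: F[String])(implicit ops: MiniOps[F]): F[String] =
    ops.map(ops.filter(words)(_.length > 3))(_.toUpperCase)

  // Running the logic on a local collection picks up the List instance.
  val result: List[String] = longWords(List("a", "spark", "rdd", "scala"))
}
```

A second instance for a distributed collection type could be added without touching longWords, which is the property the rest of this article relies on.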
Obtaining the library
It's probably easier to show how this works than to describe it with words alone, so let's look at a really minimalistic example, the traditional word count. First, we need to add the necessary dependencies to our SBT build file. Kontextfrei consists of two different modules, kontextfrei-core and kontextfrei-scalatest. The former is what you need to abstract over RDD in your main code base, the latter to get some additional support for writing your RDD-independent tests using ScalaTest with ScalaCheck. Let's add them to our build.sbt file, together with the usual Spark dependency you would need anyway:
resolvers += "dwestheide" at "https://dl.bintray.com/dwestheide/maven"
libraryDependencies += "com.danielwestheide" %% "kontextfrei-core-spark-2.2.0" % "0.6.0"
libraryDependencies += "com.danielwestheide" %% "kontextfrei-scalatest-spark-2.2.0" % "0.6.0" % "test,it"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
Please note that in this simple example, we create a Spark application that you can execute in a self-contained way. In the real world, you would add spark-core as a provided dependency and create an assembly JAR that you pass to spark-submit.
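For reference, the provided variant of the Spark dependency could look roughly like the following build.sbt fragment. It reuses the version from the example above; how the assembly JAR is built (for instance with a plugin such as sbt-assembly) is an assumption, as the article does not name a tool.

```scala
// build.sbt fragment: Spark is supplied by the cluster at runtime,
// so it is marked "provided" and left out of the assembly JAR.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" % "provided"
```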
Implementing the business logic
Now, let's see how we can implement the business logic of our word count application using kontextfrei. In our example, we define all of our business logic in a trait called WordCount:
package com.danielwestheide.kontextfrei.wordcount
import com.danielwestheide.kontextfrei.DCollectionOps
import com.danielwestheide.kontextfrei.syntax.SyntaxSupport
trait WordCount extends SyntaxSupport {
  def counts[F[_]: DCollectionOps](text: F[String]): F[(String, Long)] =
    text
      .flatMap(line => line.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
  def formatted[F[_]: DCollectionOps](counts: F[(String, Long)]): F[String] =
    counts.map {
      case (word, count) => s"$word,$count"
    }
}
The first thing you'll notice is that the implementations of counts and formatted look exactly the same as they would if you were programming against Spark's RDD type. You could literally copy and paste RDD-based code into a program written with kontextfrei.
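Plain Scala collections have no reduceByKey, so a collection-backed implementation has to emulate it somehow. The following sketch shows one way this could be done for a local Seq, together with a purely local version of counts. It is an illustration under that assumption, not a description of how kontextfrei implements its collection instance, and the object name LocalWordCountSketch is made up for this example.

```scala
object LocalWordCountSketch {
  // Local emulation of reduceByKey: group the pairs by key,
  // then combine all values of each key with f.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Same pipeline shape as the counts method above, but on a plain Seq.
  def counts(text: Seq[String]): Seq[(String, Long)] =
    reduceByKey(text.flatMap(_.split(" ").toList).map(word => (word, 1L)))(_ + _)
      .toSeq
      .sortBy { case (_, count) => -count } // descending by count

  val result: Seq[(String, Long)] = counts(Seq("b a b", "b"))
}
```

Here result contains the pair for "b" with a count of 3 followed by the pair for "a" with a count of 1.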
The second thing you notice is that the method signatures of counts and formatted contain a type constructor, declared as F[_], which is constrained by a context bound: For any concrete type constructor we pass in here, there must be an instance of kontextfrei's DCollectionOps typeclass. In our business logic, we do not care what concrete type constructor is used for F, as long as the operations defined in DCollectionOps are supported for it. This way, we are liberating our business logic from any dependency on Spark, and specifically on the annoying SparkContext.
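If context bounds on type constructors are unfamiliar, the following self-contained sketch shows what such a bound amounts to: an implicit parameter that the compiler fills in based on the type constructor. The typeclass Sized and both of its instances are hypothetical and unrelated to kontextfrei.

```scala
object ContextBoundSketch {
  // Hypothetical stand-in for a typeclass such as DCollectionOps.
  trait Sized[F[_]] {
    def size[A](fa: F[A]): Int
  }

  implicit val listSized: Sized[List] = new Sized[List] {
    def size[A](fa: List[A]): Int = fa.length
  }
  implicit val optionSized: Sized[Option] = new Sized[Option] {
    def size[A](fa: Option[A]): Int = if (fa.isDefined) 1 else 0
  }

  // With a context bound: the instance is looked up implicitly by type.
  def isEmptyA[F[_]: Sized, A](fa: F[A]): Boolean =
    implicitly[Sized[F]].size(fa) == 0

  // The same method with the implicit parameter written out by hand.
  def isEmptyB[F[_], A](fa: F[A])(implicit ev: Sized[F]): Boolean =
    ev.size(fa) == 0

  // Each call compiles only because an instance exists for List or Option.
  val checks: List[Boolean] =
    List(isEmptyA(List(1, 2)), isEmptyB(List.empty[Int]), isEmptyA(Option("x")))
}
```

Calling either method with a type constructor that has no Sized instance would be rejected at compile time, which is the guarantee the context bound on DCollectionOps gives the business logic.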
In order to be able to use the familiar syntax we know from the RDD type, we mix in kontextfrei's SyntaxSupport trait, but you could just as well use an import instead, if that's more to your liking.
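The choice between mixing in a trait and using an import is a common pattern for typeclass syntax in Scala. The sketch below illustrates how such syntax can be offered both ways. All names in it (Mappable, MappableSyntax, the syntax object, the mapped method) are hypothetical; the article does not show which import kontextfrei offers, so none is assumed here.

```scala
object SyntaxSketch {
  // Hypothetical typeclass; kontextfrei's real one is DCollectionOps.
  trait Mappable[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

  // The syntax lives in a trait so that it can be mixed in...
  trait MappableSyntax {
    implicit class MappableOps[F[_], A](fa: F[A])(implicit ops: Mappable[F]) {
      def mapped[B](f: A => B): F[B] = ops.map(fa)(f)
    }
  }
  // ...or imported from an object that extends that trait.
  object syntax extends MappableSyntax

  implicit val vectorMappable: Mappable[Vector] = new Mappable[Vector] {
    def map[A, B](fa: Vector[A])(f: A => B): Vector[B] = fa.map(f)
  }

  // Variant 1: mix the syntax trait in.
  object ViaMixin extends MappableSyntax {
    def lengths[F[_]: Mappable](words: F[String]): F[Int] = words.mapped(_.length)
  }

  // Variant 2: import the syntax instead.
  object ViaImport {
    import syntax._
    def lengths[F[_]: Mappable](words: F[String]): F[Int] = words.mapped(_.length)
  }

  val a: Vector[Int] = ViaMixin.lengths(Vector("to", "be"))
  val b: Vector[Int] = ViaImport.lengths(Vector("or", "not"))
}
```

Both variants bring the same implicit class into scope, so which one to use is mostly a matter of taste.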
Plugging our business logic into the Spark application
At the end of the day, we want to be able to have a runnable Spark application. In order to achieve that, we must plug our Spark-agnostic business logic together with the Spark-dependent IO parts of our application. Here is what this looks like in our word count example:
package com.danielwestheide.kontextfrei.wordcount
import com.danielwestheide.kontextfrei.rdd.RDDOpsSupport
import org.apache.spark.SparkContext
object Main extends App with WordCount with RDDOpsSupport {
  implicit val sparkContext: SparkContext =
    new SparkContext("local[1]", "word-count")
  val inputFilePath = args(0)
  val outputFilePath = args(1)
  try {
    val textFile = sparkContext.textFile(inputFilePath, minPartitions = 2)
    val wordCounts = counts(textFile)
    formatted(wordCounts).saveAsTextFile(outputFilePath)
  } finally {
    sparkContext.stop()
  }
}
Our Main object mixes in our WordCount trait as well as kontextfrei's RDDOpsSupport, which proves to the compiler that we have an instance of the DCollectionOps typeclass for the RDD type constructor. In order to prove this, we also need an implicit SparkContext. Again, instead of mixing in this trait, we can also use an import.
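The mechanism by which a support trait can provide a typeclass instance only when some context is implicitly available can be sketched in a few lines. Everything in this sketch (FakeContext, Remote, Builder, RemoteSupport) is a hypothetical stand-in chosen for illustration; it does not reproduce how RDDOpsSupport is actually implemented.

```scala
object InstanceSketch {
  // Hypothetical stand-ins: a context needed to build collections,
  // and a collection type that records which context created it.
  final class FakeContext(val name: String)
  final case class Remote[A](items: Vector[A], createdBy: String)

  trait Builder[F[_]] {
    def fromSeq[A](as: Seq[A]): F[A]
  }

  // Support trait: the instance is derived from an implicit context,
  // so it only exists where such a context is in implicit scope.
  trait RemoteSupport {
    implicit def remoteBuilder(implicit ctx: FakeContext): Builder[Remote] =
      new Builder[Remote] {
        def fromSeq[A](as: Seq[A]): Remote[A] = Remote(as.toVector, ctx.name)
      }
  }

  def build[F[_]: Builder, A](as: Seq[A]): F[A] =
    implicitly[Builder[F]].fromSeq(as)

  object Program extends RemoteSupport {
    // Without this implicit value, the call to build below would not compile.
    implicit val ctx: FakeContext = new FakeContext("local")
    val words: Remote[String] = build[Remote, String](Seq("a", "b"))
  }
}
```

Removing the implicit FakeContext from Program makes the compiler reject the call to build, which mirrors the requirement for an implicit SparkContext described above.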
Now, our Main object is all about doing some IO and integrating our business logic into it.
Writing Spark-agnostic tests
So far so good. We have liberated our business logic from any dependency on Spark, but what do we gain from this? Well, now we are able to write our unit tests in a Spark-agnostic way as well. First, we define a BaseSpec which inherits from kontextfrei's KontextfreiSpec and mixes in a few other goodies from kontextfrei-scalatest and from ScalaTest itself:
package com.danielwestheide.kontextfrei.wordcount
import com.danielwestheide.kontextfrei.scalatest.KontextfreiSpec
import com.danielwestheide.kontextfrei.syntax.DistributionSyntaxSupport
import org.scalactic.anyvals.PosInt
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{MustMatchers, PropSpecLike}
trait BaseSpec[F[_]]
    extends KontextfreiSpec[F]
    with DistributionSyntaxSupport
    with PropSpecLike
    with GeneratorDrivenPropertyChecks
    with MustMatchers {
  implicit val config: PropertyCheckConfiguration =
    PropertyCheckConfiguration(minSuccessful = PosInt(100))
}
BaseSpec, like our WordCount trait, takes a type constructor, which it simply passes along to the KontextfreiSpec trait. We will get back to that one in a minute.
Our actual test properties can now be implemented for any type constructor F[_] for which there is an instance of DCollectionOps. We define them in a trait WordCountProperties, which also has to be parameterized by a type constructor:
package com.danielwestheide.kontextfrei.wordcount
trait WordCountProperties[F[_]] extends BaseSpec[F] with WordCount {
  import collection.immutable._
  property("sums word counts across lines") {
    forAll { (wordA: String) =>
      whenever(wordA.nonEmpty) {
        val wordB = wordA.reverse + wordA
        val result =
          counts(Seq(s"$wordB $wordA $wordB", wordB).distributed)
            .collectAsMap()
        assert(result(wordB) === 3)
      }
    }
  }
  property("does not have duplicate keys") {
    forAll { (wordA: String) =>
      whenever(wordA.nonEmpty) {
        val wordB = wordA.reverse + wordA
        val result =
          counts(Seq(s"$wordA $wordB", s"$wordB $wordA").distributed)
        assert(
          result.keys.distinct().collect().toList === result.keys
            .collect()
            .toList)
      }
    }
  }
}
We want to be able to test our Spark-agnostic properties both against fast Scala collections and against RDDs in a local Spark cluster. To get there, we will need to define two test classes, one in the test sources directory, the other one in the it sources directory. Here is the unit test:
package com.danielwestheide.kontextfrei.wordcount
import com.danielwestheide.kontextfrei.scalatest.StreamSpec
class WordCountSpec extends BaseSpec[Stream]
with StreamSpec
with WordCountProperties[Stream]
We mix in BaseSpec and pass it the Stream type constructor. Stream has the same shape as RDD, but it is a Scala collection. The KontextfreiSpec trait extended by BaseSpec defines an abstract implicit DCollectionOps for its type constructor. By mixing in StreamSpec, we get an instance of DCollectionOps for Stream. When we implement our business logic, we can run the WordCountSpec test and get instantaneous feedback. We can use SBT's triggered execution and have it run our unit tests upon every detected source change, using ~test, and it will be really fast.
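The wiring described here, an abstract implicit in a base trait that a mixin later fulfils, can be illustrated without any test framework. The sketch below uses hypothetical names (Countable, GenericSpec, CountProperties, ListSpec) and a plain Boolean check instead of ScalaTest properties; it only mirrors the structure and is not kontextfrei's code.

```scala
object SpecWiringSketch {
  // Hypothetical typeclass standing in for DCollectionOps.
  trait Countable[F[_]] {
    def count[A](fa: F[A]): Long
  }

  // Base trait: declares that some instance must exist, without choosing one.
  trait GenericSpec[F[_]] {
    implicit def ops: Countable[F]
  }

  // Properties are written against the abstract instance only.
  trait CountProperties[F[_]] extends GenericSpec[F] {
    def sample: F[String]
    def sampleHasThreeElements: Boolean = ops.count(sample) == 3L
  }

  // Mixin that fixes the type constructor and supplies the instance.
  trait ListSpec extends GenericSpec[List] {
    implicit val ops: Countable[List] = new Countable[List] {
      def count[A](fa: List[A]): Long = fa.length.toLong
    }
  }

  // Concrete check: same properties, instance provided by the mixin.
  object LocalCheck extends CountProperties[List] with ListSpec {
    val sample: List[String] = List("a", "b", "c")
  }
}
```

A second mixin for another type constructor could reuse CountProperties unchanged, which is how one set of properties can serve both a unit test and an integration test.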
In order to make sure that none of the typical bugs that you would only notice in a Spark cluster have sneaked in, we also define an integration test, which tests exactly the same properties:
package com.danielwestheide.kontextfrei.wordcount
import com.danielwestheide.kontextfrei.scalatest.RDDSpec
import org.apache.spark.rdd.RDD
class WordCountIntegrationSpec extends BaseSpec[RDD]
with RDDSpec
with WordCountProperties[RDD]
This time, we mix in RDDSpec because we parameterize BaseSpec with the RDD type constructor.
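The it sources directory and the "test,it" dependency scope used earlier presuppose that the build has an integration test configuration. The article does not show this part of the build, so the following build.sbt fragment is an assumption about how it could be set up with the IntegrationTest configuration that sbt shipped around the time of writing. More recent sbt releases have moved away from this configuration, so check the documentation for your version.

```scala
// build.sbt fragment (assumed setup, sbt 0.13 / 1.x style).
// Enables the built-in IntegrationTest configuration so that sources under
// src/it/scala are compiled and the "it" dependency scope is resolved.
lazy val root = (project in file("."))
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)
```

With such a setup, the unit tests run with the test task (or ~test for triggered execution), while the integration tests run separately with it:test.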
Design goals
It was an explicit design goal to stick to the existing Spark API as closely as possible, allowing people with existing Spark code bases to switch to kontextfrei as smoothly as possible, or even to migrate parts of their application without too much hassle, with the benefit of now being able to cover their business logic with missing tests without the usual pain.
An alternative to this, of course, would have been to build this library based on the ever popular interpreter pattern. To be honest, I wish Spark itself was using this pattern – other libraries like Apache Crunch have shown successfully that this can help tremendously with enabling developers to write tests for the business logic of their applications. If Spark was built on those very principles, there wouldn't be any reason for kontextfrei to exist at all.
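For readers who have not come across the interpreter pattern, here is a deliberately tiny sketch of the general idea: the pipeline is first described as plain data and only later given meaning by an interpreter, so a test can use a cheap local interpreter while production uses a distributed one. The types Step, MapEach and Keep are hypothetical and do not reflect the API of Apache Crunch or of any other library.

```scala
object InterpreterSketch {
  // A pipeline over lines of text, described as plain data.
  sealed trait Step
  final case class MapEach(f: String => String) extends Step
  final case class Keep(p: String => Boolean) extends Step

  // One possible interpreter: runs the description on a local List.
  // A different interpreter could translate the same steps into
  // operations on a distributed collection.
  def runLocally(input: List[String], steps: List[Step]): List[String] =
    steps.foldLeft(input) { (acc, step) =>
      step match {
        case MapEach(f) => acc.map(f)
        case Keep(p)    => acc.filter(p)
      }
    }

  // The business logic is just a value and can be inspected or tested as such.
  val normalize: List[Step] = List(Keep(_.nonEmpty), MapEach(_.toLowerCase))

  val result: List[String] = runLocally(List("Spark", "", "RDD"), normalize)
}
```

The trade-off compared to the typeclass approach is that existing RDD-based code cannot be copied over as-is, because the pipeline has to be expressed in the vocabulary of the description type.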
Limitations
kontextfrei is still a young library, and while we have been using it in production in one project, I do not know of any other adopters. One of its limitations is that it doesn't yet support all operations defined on the RDD type – but we are getting closer. In addition, I have yet to find a clever way to support broadcast variables and accumulators. And of course, who is using RDDs anyway in 2017? While I do think that there is still room for RDD-based Spark applications, I am aware that many people have long moved on to Datasets and to Spark Streaming. It would be nice to create a similar typeclass-based abstraction for datasets and for streaming applications, but I haven't had the time to look deeper into what would be necessary to implement either of those.
Summary
kontextfrei is a Scala library that aims to provide developers with a faster feedback loop when developing Apache Spark applications. To achieve that, it enables you to write the business logic of your Spark application, as well as your test code, against an abstraction over Spark’s RDD.
I would love to hear your thoughts on this approach. Do you think it's worth defining the biggest typeclass ever and reimplementing the RDD logic for Scala collections for test purposes? Please, if this looks interesting, do try it out. I am always interested in feedback and in contributions of all kinds.