10 days with the ZIO 2/10

gurghet

Andrea

Posted on January 27, 2020

10 days with the ZIO 2/10

This miniseries aims at teaching how to use the ZIO at a very relaxed pace. Today we are going to build a word count app.

This means that given a text we want to know how many times each word appears.

Sneak Peek

Suppose you have the text

How much wood would a woodpecker peck if a woodpecker would peck wood?

The program should output

(a,2), (peck,2), (woodpecker,2), (would,2), (if,1), (wood,1), (much,1), (How,1), (wood?,1)

Setup

Like last time we are going to use Mill instead of SBT. Why? because it’s easier and takes 5 seconds.

  1. Create a folder zio-word-counter
  2. Inside it create a build.sc file with the following content
import mill._, scalalib._

object WordCounter extends ScalaModule {
  def scalaVersion = "2.13.0"
  val zioVersion = "1.0.0-RC17"
  def ivyDeps = Agg(
    ivy"dev.zio::zio:$zioVersion",
    ivy"dev.zio::zio-streams:$zioVersion"
  )
}

And we are done. We will need zio-streams as well since we are going to treat our text like a stream of words.

A simple stream of words

Create a folder named WordCounter (it has to match the object name we defined earlier) and a src folder nested inside. From there we can create a file named Main.scala with the following empty canvas.

// WordCounter/src/Main.scala
import zio._
import zio.console._
import zio.stream._

object Main extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    putStrLn("hello world").map(_ => 0)
}

The first thing that we want is to have a decent text source. A string variable will do.

val someText = "How much wood would a woodpecker peck if a woodpecker would peck wood?"

And the run function can become like this

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    chunk <- Stream(someText).run(ZSink.splitOn(" "))
    _ <- putStrLn(chunk.mkString(", "))
  } yield ()).map(_ => 0)

You can run this with the command mill --watch WordCounter (the watch will recompile and run when you save the file).

You should see the output

How, much, wood, would, a, woodpecker, peck, if, a, woodpecker, would, peck

The last word wood? is not printed because it doesn’t end with a space. You can add a space at the end to include it if you want; I will add one before the question mark.

Counting the words

We have our stream of words we need now to do two simple operations. The first operation is to pair words that are the same, the second is to count them.

The plan is to create one stream per word, since new streams are created and tracked by their key, setting their key to the word itself will work.

Counting the words will be done by collecting all the words in a list per stream. At the end we can just count the length of the list that will tell the number of words for that particular word.

streamOfWords.groupByKey(identity) { case (word, stream) =>
  stream.aggregate(ZSink.collectAll[String]).map(list => (list.size, word))
}

The identity function tells the groupByKey to use the word itself as the key. Then, for every stream we create a tuple of type(Int, String) that contains the count of the word and the word itself.

The complete run function looks like this.

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    chunk <- Stream(someText).run(ZSink.splitOn(" "))
    counts <- Stream.fromChunk(chunk).groupByKey(identity) { case (word, stream) =>
      stream.aggregate(ZSink.collectAll[String]).map(list => (list.size, word))
    }.runCollect
    _ <- putStrLn(counts.mkString(", "))
  } yield ()).map(_ => 0)

The only differnte being that we created a stream from the chunk of words and printed the count instead.
Running this will print

(2,would), (2,woodpecker), (2,peck), (1,How), (2,a), (2,wood), (1,much), (1,if)

Simple to Complex

Admittedly we didn’t accomplish a lot. Just counting a few words is not really impressive. How about a word count of a sizeable file like the Divina Commedia? Let’s stash it in our tmp folder for later.

wget http://www.gutenberg.org/files/1012/1012-0.txt --output-document /tmp/divina.txt

First we need to read the file. To do it correctly we will take the name of the file from the command line like so

args.headOption
  .map(filePath => IO(Paths.get(filePath)))
  .getOrElse(IO.fail("No file specified"))

This means that if the argument is not passed the entire program will fail with an error that is represented by the string No file specified. We are not printing the error anywhere now. To do it we need to adapt the code after the yield.

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    // ... other code ...
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)

Note that we turned the map into a fold that will return 1 to the operating system in case of failure.

After getting the file we create a new InputStream which is the preferred way to read binary files.

inputStream <- IO(Files.newInputStream(file))

But this is a text file! 😱

We will have to turn the pesky bytes into utf8 character.

counts <- ZStream
  .fromInputStream(inputStream)
  .chunks
  .aggregate(ZSink.utf8DecodeChunk)
  // ... 

Notes on the notes on the code

Warning From version 1.0.0-RC19 on all streams work by chunk so the above code and below notes are outdated. Just delete chunks and use a ZTransducer insted of a ZSink and it should just work.

Notes on the code

Now I need your attention for a second as it’s not completely clear what is happening in the above code. The ZStream.fromInputStream returns a StreamEffectChunk type. This means that it’s a stream (the Stream part, duh! 🤪) that it emits thunks (the Effect part) and works with chunks of bytes instead of single bytes. By default the size of the chunks is 4096 but it can be set manually.

The reason it emit thunks (lazily evaluated functions) is because it will read the next chunk of bytes only when the downstream is requesting them. This allows to read files that are bigger than the computer memory by doing it chunk by chunk.

Finally, the reason it uses chunks instead of single bytes is because of speed. Reading 4096 bytes all at once is order of magnitudes faster than reading them one by one. (Why 4096? Usually the file systems can’t access less than that, so if you set it to 1 for example, you are really accessing 4096 bytes and throwing away 4095 of them).

Next we have a choice to make, we can either work with the chunks or with the single bytes. By writing .chunks we cast to StreamEffect with Chunk[Byte] as emitted type. The alternative is .flattenChunks which casts as well to StreamEffect but now the emitted type is Byte.

We go with .chunks both for performance reasons and because there are some built-in functions that only work on chunks. The first one is aggregation with ZSink.utf8DecodeChunk.

Counting Words for serial now!

So we now are at a point in which we can generate Strings from the chunks of bytes. The next step is to separate the words. To do it we write

counts <- ZStream
  .fromInputStream(inputStream)
  .chunks
  .aggregate(ZSink.utf8DecodeChunk)
  .aggregate(ZSink.splitOn(" "))
  .flatMap(ZStream.fromChunk)
  .filter(_.matches("[\\w]+"))
  // ... 

The splitOn unfortunately returns chunks so we “unchunk” them with .flatMap(ZStream.fromChunk). What this does is concatenating all the chunks in strict order and emitting the strings inside them one by one.

Next we filter the strings to make sure they are actual words. The [\w]+ regex basically means “letters of the alphabet and nothing else”.

The last thing we need to do is count the words! By recycling the previous code we get the following program

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    inputStream <- IO(Files.newInputStream(file))
    counts <- ZStream
      .fromInputStream(inputStream)
      .chunks
      .aggregate(ZSink.utf8DecodeChunk)
      .aggregate(ZSink.splitOn(" "))
      .flatMap(ZStream.fromChunk)
      .filter(_.matches("[\\w]+"))
      .groupByKey(identity) {
        case (word, stream) =>
          stream
            .aggregate(ZSink.collectAll[String])
            .map(list => (list.size, word))
      }
      .runCollect
    _ <- putStrLn(counts.sortBy(_._1).reverse.mkString(", "))
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)

The only thing I did is sorting the results by occurrence.
Launching this (mill --watch WordCounter /tmp/divina.txt) will take about a minute ☕️ and will print something like:

(3616,e), (3535,che), (2260,la), (1859,a), (1826,di), (1339,non), (1319,per), (1123,in), (1042,si), (777,le), (752,li), (736,mi), (652,il), (623,con), ...

Yes it’s a lot of work but a minute seems really long. I couldn’t figure out why it was so slow but I suspect the groupByKey is optimized to work with a few of streams, not thousands.

A faster alternative

A small modification will yield a much faster result (⚡️about 50x faster⚡️)

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    inputStream <- IO(Files.newInputStream(file))
    counts <- ZStream
      .fromInputStream(inputStream)
      .chunks
      .aggregate(ZSink.utf8DecodeChunk)
      .aggregate(ZSink.splitOn(" "))
      .flatMap(ZStream.fromChunk)
      .filter(_.matches("[\\w]+"))
      .fold(HashMap.empty[String, Int])(
        (map, word) =>
          map.updatedWith(word)(
            maybeCounter => maybeCounter.map(_ + 1).orElse(Some(1))
          )
      )
      .map(_.toList)
    _ <- putStrLn(counts.sortBy(_._2).reverse.mkString(", "))
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)

I swapped the groupByKey with a fold and sorted by occurrence (which is now _2).
Maybe with older versions of scala it would be even faster with mutable hash maps, but that would not be functional and thus not cool nor hip. I’m not even gonna try.

Conclusion

This was a short journey in very simple streaming applications. The zio-stream code is still a bit clunky in my opinion but it clearly shows a lot of potential and will surely be more powerful than both fs2 and Akka Streams. In the next tutorial we are going to do some concurrent functional programming.

💖 💪 🙅 🚩
gurghet
Andrea

Posted on January 27, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Composing complex programs with ZIO
functional Composing complex programs with ZIO

March 1, 2021

10 days with the ZIO 1/10
zio 10 days with the ZIO 1/10

December 13, 2019

10 days with the ZIO 2/10
zio 10 days with the ZIO 2/10

January 27, 2020