Fine control over execution in kotlin

amol

Amol

Posted on October 13, 2021

Fine control over execution in kotlin

Alt Text
Programing language that supports concurrency and parallelism needs to provide cancellation mechanism as well. Well thought cancellation mechanism also gives space to clean up resources.

In this blog post we’ll take see how to cancel coroutine and impact of suspend functions on it.

Basics

The Job interface has a method called cancel, which allows to cancel the job.

fun main() = runBlocking {
  val job = launch {
    repeat(1_00) { i ->
      delay(200)
      println("printing $i")
    }
  }

  delay(1_150)
  job.cancelAndJoin()
  println("cancelled successfully")
}
// printing 0
// printing 1
// printing 2
// printing 3
// printing 4
// cancelled successfully

Enter fullscreen mode Exit fullscreen mode

Calling cancel has following effects on job.

  • ends execution job at first suspension point
  • if job has some children, they are also canceled at first suspension point
  • Once job is canceled, it can not used as a parent for any new coroutines.

Lets verify our first assumption by replacing delay call by Thread.sleep

fun main() = runBlocking {
  val job = launch {
    repeat(1_00) { i ->
      // delay(200)
      Thread.sleep(200)
      println("printing $i")
    }
  }

  delay(1_150)
  job.cancelAndJoin()
  println("cancelled successfully")
}
// what will be the outcome?

// outcome:
// printing 0
// printing ..
// printing 99
// cancelled successfully

Enter fullscreen mode Exit fullscreen mode

Thread blocking code

Once we removed delay function job has no suspension point and hence it continues to work until the end of computation. Thread.sleep is thread blocking function let’s extract repeat code block into suspended function.

fun main() = runBlocking {
    val job = launch {
        repeat(1_00) { i -> threadBlockingFn(i) }
        println("job completed")
    }
    delay(1_150)
    job.cancelAndJoin()
    println("cancelled successfully")
}

suspend fun threadBlockingFn(i: Int) = coroutineScope {
    Thread.sleep(200)
    println("printing $i")
}

fun println(msg: Any) = with(Thread.currentThread()) {
    kotlin.io.println("$id:$name\t=> $msg")
} 
// what will be the outcome?

// outcome:
// 1:main   => printing 0
// 1:main   => printing .. 🧐
// 1:main   => printing 99 🤔
// 1:main   => job completed
// 1:main   => cancelled successfully

Enter fullscreen mode Exit fullscreen mode

job is running on single thread and even we have suspended function threadBlockingFn main thread does not get unblocked to observe cancellation request. So how to create suspension point in this case?

In order to have more fine control, lets create coroutine for every call of threadBlockFn and join it back with parent job

fun main() = runBlocking {
    val job = launch {
        repeat(1_00) { i -> launch { threadBlockingFn(i) }.join() }
        // also produces same output
        // repeat(1_00) { i -> async { threadBlockingFn(i) }.await() }
        println("job completed")
    }
    delay(1_150)
    job.cancelAndJoin()
    println("cancelled successfully")
}
// output: 
// 1:main   => printing 0
// 1:main   => printing 1
// 1:main   => printing 2
// 1:main   => printing 3
// 1:main   => printing 4
// 1:main   => printing 5
// 1:main   => cancelled successfully

Enter fullscreen mode Exit fullscreen mode

Lets use default dispatcher to delegate threadBlockFn execution

fun main() = runBlocking {
    val job = launch {
        repeat(1_00) { i -> withContext(Dispatchers.Default) { threadBlockingFn(i) } }
        println("job completed")
    }
    delay(1_150)
    job.cancelAndJoin()
    println("cancelled successfully")
}
// output: 
// 14:DefaultDispatcher-worker-1    => printing 0
// 14:DefaultDispatcher-worker-1    => printing 1
// 14:DefaultDispatcher-worker-1    => printing 2
// 14:DefaultDispatcher-worker-1    => printing 3
// 14:DefaultDispatcher-worker-1    => printing 4
// 14:DefaultDispatcher-worker-1    => printing 5
// 1:main   => cancelled successfully

Enter fullscreen mode Exit fullscreen mode

Summary

Coroutine, dispatcher along with suspended function provides very powerful mechanism to have full control over execution of code blocks. Default function such as delay are cancel aware functions. To summarize, I would say use following thumb rules to achieve fine controlled code

  1. Have more suspension points in code base
  2. Avoid thread blocking code areas
  3. coroutines are very lightweight, use them more frequently
  4. use async/await, withContext around appropriate spaces
  5. Break down computation heavy operation into smaller pieces with more suspension points

Dispatchers

Coroutine has different kind of dispatchers available as mentioned in the doc

  • Dispatchers.Default — is used by all standard builders if no dispatcher or any other ContinuationInterceptor is specified in their context. It uses a common pool of shared background threads. This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
  • Dispatchers.IO — uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive blocking operations (like file I/O and blocking socket I/O).
  • Dispatchers.Unconfined — starts coroutine execution in the current call-frame until the first suspension, whereupon the coroutine builder function returns. The coroutine will later resume in whatever thread used by the corresponding suspending function, without confining it to any specific thread or pool. The Unconfined dispatcher should not normally be used in code.
  • Private thread pools can be created with newSingleThreadContext and newFixedThreadPoolContext.
  • An arbitrary Executor can be converted to a dispatcher with the asCoroutineDispatcher extension function.
💖 💪 🙅 🚩
amol
Amol

Posted on October 13, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related