ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Coroutine - Cancellation and timeouts
    프로그래밍/Kotlin 2022. 5. 9. 19:51
    반응형

    https://kotlinlang.org/docs/coroutines-basics.html#structured-concurrency

     

    Canceling coroutine execution

    The launch function returns a Job that can be used to cancel the running coroutine:

    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")

     

    execution

     

    Computation codes cannot be cancelled

    Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled.

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")

    execution

     

    Run it to see that it continues to print "I'm sleeping" even after cancellation until the job completes by itself after five iterations.

     

    Making computation code cancellable

    There are two approaches to making computation code cancellable.

    • The first one is to periodically invoke a suspending function that checks for cancellation. There is a yield function that is a good choice for that purpose.
    • The other one is to explicitly check the cancellation status.

    Let us try the latter approach.

    Replace while (i < 5) in the previous example with while (isActive) and rerun it.

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")

    execution

    As you can see, now this loop is canceled. isActive is an extension property available inside the coroutine via the CoroutineScope object.

     

    Closing resources with finally

    Cancellable suspending functions throw CancellationException on cancellation which can be handled in the usual way.

    For example, try {...} finally {...} expression and Kotlin use function execute their finalization actions normally when a coroutine is cancelled:

    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("job: I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")

    Both join and cancelAndJoin wait for all finalization actions to complete.

    Output

    job: I'm sleeping 0 ...
    job: I'm sleeping 1 ...
    job: I'm sleeping 2 ...
    main: I'm tired of waiting!
    kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@42e26948
    job: I'm running finally
    main: Now I can quit.

     

     

    Delayed codes runs on finally block

    Usually, this is not a problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a communication channel) are usually non-blocking and do not involve any suspending functions

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
    
        val job = launch {
            try {
                repeat(1000) { i ->
                    println("job: I'm sleeping $i ...")
                    delay(500L)
                }
            } catch(e:Exception) {
                
            } finally {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // cancels the job and waits for its completion
        println("main: Now I can quit.")
    }

     

    suspending한 finally는 실행되지 않고 종료됨.

     

    Output

    job: I'm sleeping 0 ...
    job: I'm sleeping 1 ...
    job: I'm sleeping 2 ...
    main: I'm tired of waiting!
    job: I'm running finally
    main: Now I can quit.

     

    How can we fix

    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")

    Output

    job: I'm sleeping 0 ...
    job: I'm sleeping 1 ...
    job: I'm sleeping 2 ...
    main: I'm tired of waiting!
    job: I'm running finally
    job: And I've just delayed for 1 sec because I'm non-cancellable
    main: Now I can quit.

     

    Timeout

    While you can manually track the reference to the corresponding Job and launch a separate coroutine to cancel the tracked one after delay, there is a ready to use withTimeout function that does it. 

    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }

    execution

     

    Output

    stack trace 찍히는  exception 발생 (cancel은 코루틴에서 완료 처리로 보아 stack trace 안찍힘)

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
     at (Coroutine boundary. (:-1) 
     at FileKt$main$1$1.invokeSuspend (File.kt:-1) 
     at FileKt$main$1.invokeSuspend (File.kt:-1) 
    Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
    at kotlinx.coroutines.TimeoutKt .TimeoutCancellationException(Timeout.kt:184)
    at kotlinx.coroutines.TimeoutCoroutine .run(Timeout.kt:154)
    at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask .run(EventLoop.common.kt:502)

    The TimeoutCancellationException that is thrown by withTimeout is a subclass of CancellationException.

    We have not seen its stack trace printed on the console before. That is because inside a cancelled coroutine CancellationException is considered to be a normal reason for coroutine completion. 

     

    You can wrap the code with timeout in a try {...} catch (e: TimeoutCancellationException) {...} block if you need to do some additional action specifically on any kind of timeout or use the withTimeoutOrNull function that is similar to withTimeout but returns null on timeout instead of throwing an exception:

    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result")

     

    Output

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    Result is null

     

    Asynchronous timeout and resources

     

    The timeout event in withTimeout is asynchronous with respect to the code running in its block and may happen at any time, even right before the return from inside of the timeout block.

     

    For example, here we imitate a closeable resource with the Resource class, that simply keeps track of how many times it was created by incrementing the acquired counter and decrementing this counter from its close function.

    Let us run a lot of coroutines with the small timeout try acquire this resource from inside of the withTimeout block after a bit of delay and release it from outside.

    time out은 다른 thread 에서 실행됨.

    time out 만 없으면 완전히 thread-safe 한 코드.

    var acquired = 0
    
    class Resource {
        init { acquired++ } // Acquire the resource
        fun close() { acquired-- } // Release the resource
    }
    
    fun main() {
        runBlocking {
            repeat(100_000) { // Launch 100K coroutines
                launch { 
                    val resource = withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        Resource() // Acquire a resource and return it from withTimeout block     
                    }
                    resource.close() // Release the resource
                }
            }
        }
        // Outside of runBlocking all coroutines have completed
        println(acquired) // Print the number of resources still acquired
    }

    execution

    output 이 1이나 2 등이 찍힘.

    If you run the above code you'll see that it does not always print zero, though it may depend on the timings of your machine you may need to tweak timeouts in this example to actually see non-zero values.

    To workaround this problem you can store a reference to the resource in the variable as opposed to returning it from the withTimeout block.

    try-catch로 resource 의 leak 을 방지해야됨

    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch { 
                var resource: Resource? = null // Not acquired yet
                try {
                    withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        resource = Resource() // Store a resource to the variable if acquired      
                    }
                    // We can do something else with the resource here
                } finally {  
                    resource?.close() // Release the resource if it was acquired
                }
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired

     

     

     

     

    반응형
Designed by Tistory.