Android AsyncTask Using Kotlin Coroutines

Yes, you read it right, in this post I’ll indeed show you how to implement the notorious Android AsyncTask using Kotlin Coroutines. I guess that this premise will raise some eyebrows, so let me clarify right away that YOU SHOULD NOT USE THE CODE BELOW IN YOUR PROJECTS.

Why the hell do I write this post then? Well, just recently I came across this funny meme which reminded me of my first full-time Android job. When I interviewed for that position, they asked me to implement a simplified version of AsyncTask and when I succeeded, the interviewers were so impressed that they offered me a much higher salary than what I asked for. In a tribute to that awesome company (which has since been sold and ceased to exist), I decided to take on this challenge. In addition, as you’ll see shortly, there is a lot to be learnt from this exercise.

The Original AsyncTask

For reference, here you can find the source code of the original AsyncTask. Even though legacy code and concurrency are two areas that I specialize in, it still took me a couple of hours to understand the implementation of this class. Therefore, don’t be too hard on yourself if you can’t make sense of this code because it’s a mess.

Additionally, if you heard that AsyncTask is bad, but don’t know the exact reasons why (or believe the myth that it causes memory leaks), then you might be interested in this article that I wrote a while ago. It explains the real reasons why AsyncTask was an awful API and derives some important lessons from its failure.

CoroutinesAsyncTask

Alright, without further ado, I present to you CoroutinesAsyncTask class:

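import androidx.annotation.MainThread
import androidx.annotation.WorkerThread
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.ExecutionException
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

abstract class CoroutinesAsyncTask<Params, Progress, Result> {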

    enum class Status {
        PENDING,
        RUNNING,
        FINISHED,
    }

    private val _status = AtomicReference<Status>(Status.PENDING)
    val status get() = _status.get()!!

    private val _isCancelled = AtomicBoolean(false)
    val isCancelled get() = _isCancelled.get()

    private val coroutineJob = AtomicReference<Job?>(null)

    // These references are never reassigned; only their contents change.
    private val executionResult = AtomicReference<Result?>()
    private val executionException = AtomicReference<Throwable?>()
    private val lock = ReentrantLock(true)
    private val executionCompletedCondition = lock.newCondition()

    @WorkerThread
    protected abstract fun doInBackground(vararg params: Params): Result

    @MainThread
    protected open fun onPreExecute() {}

    @MainThread
    protected open fun onPostExecute(result: Result?) {}

    @MainThread
    protected open fun onProgressUpdate(vararg values: Progress) {}

    @MainThread
    protected open fun onCancelled(result: Result?) {
        onCancelled()
    }

    @MainThread
    protected open fun onCancelled() {}

    @MainThread
    fun execute(vararg params: Params): CoroutinesAsyncTask<Params, Progress, Result> {
        when(status) {
            Status.RUNNING -> throw IllegalStateException(
                    "Cannot execute task: the task is already running."
            )
            Status.FINISHED -> throw IllegalStateException(
                    "Cannot execute task: the task has already been executed (a task can be executed only once)"
            )
            Status.PENDING -> {}
        }

        _status.set(Status.RUNNING)

        onPreExecute()

        val job = COROUTINE_SCOPE.launch(BACKGROUND_DISPATCHER) {
            try {
                if (!isCancelled) {
                    runInterruptible {
                        executionResult.set(doInBackground(*params))
                    }
                }

                if (isCancelled) {
                    throw CancellationException()
                }
            } catch (t: Throwable) {
                _isCancelled.set(true)
                when(t) {
                    is CancellationException -> executionException.set(java.util.concurrent.CancellationException())
                    else -> executionException.set(ExecutionException(t))
                }
            } finally {
                finish()
                _status.set(Status.FINISHED)
                lock.withLock {
                    executionCompletedCondition.signalAll()
                }
            }
        }

        coroutineJob.set(job)

        return this
    }

    private fun finish() = COROUTINE_SCOPE.launch(UI_DISPATCHER) {
        val result = executionResult.get()
        if (isCancelled) {
            onCancelled(result)
        } else {
            onPostExecute(result)
        }
    }

    @WorkerThread
    fun publishProgress(vararg values: Progress) {
        if (!isCancelled) {
            COROUTINE_SCOPE.launch(UI_DISPATCHER) {
                onProgressUpdate(*values)
            }
        }
    }

    fun get(): Result? {
        lock.withLock {
            while (status != Status.FINISHED) {
                executionCompletedCondition.await()
            }
        }
        val exception = executionException.get()
        if (exception != null) {
            throw exception
        } else {
            return executionResult.get()
        }
    }

    fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        _isCancelled.set(true)
        val job = coroutineJob.get() ?: return true
        return if (job.isCompleted || job.isCancelled) {
            false
        } else {
            if (mayInterruptIfRunning) {
                job.cancel()
            }
            true
        }
    }

    private companion object {
        private val THREAD_COUNT = AtomicInteger(0)

        private val THREAD_FACTORY = ThreadFactory { runnable ->
            Thread(runnable, "CoroutinesAsyncTask #${THREAD_COUNT.getAndIncrement()}")
        }

        private val BACKGROUND_DISPATCHER = ThreadPoolExecutor(
                3,
                Int.MAX_VALUE,
                60L,
                TimeUnit.SECONDS,
                SynchronousQueue<Runnable>(),
                THREAD_FACTORY
        ).let {
            it.prestartCoreThread()
            it.asCoroutineDispatcher()
        }

        private val UI_DISPATCHER = Dispatchers.Main

        private val COROUTINE_SCOPE = CoroutineScope(SupervisorJob() + UI_DISPATCHER)
    }
}

As you might notice, I allowed myself to get rid of part of AsyncTask’s API to make this class simpler. Those extra APIs weren’t that useful anyway, so not a major loss.

CoroutinesAsyncTask’s implementation is far from trivial, so, in the next sections, I’ll discuss the main challenges that I faced when working on it.

Integrating Blocking and Suspending Worlds

When we discuss standard multithreading in Java, we’re in so-called “blocking world”. However, when we use Kotlin Coroutines, there is an additional level of abstraction over the “blocking world” which we can refer to as a “suspending world”. Since the API of AsyncTask is defined in terms of the “blocking world”, but I wanted to implement it using Coroutines, I had to bridge the gap between these realms in code. That was the biggest challenge in implementing CoroutinesAsyncTask.

For example, the cancel() method takes an argument that specifies whether the code in doInBackground() should be interrupted when AsyncTask is cancelled. However, interruption is a concept from the “blocking world” which doesn’t exist in Coroutines. Therefore, I had to use the relatively new runInterruptible construct to build this bridge. However, it wasn’t as simple as “wrap everything into runInterruptible” because if the argument of cancel() is false, then there should be no interruption. Supporting this aspect in the new class required quite a bit of effort.
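To make the bridge a bit more tangible, here is a minimal, standalone sketch of what runInterruptible does when its coroutine is cancelled. It is not part of CoroutinesAsyncTask: the function name wasInterruptedOnCancel is hypothetical, Thread.sleep() merely stands in for blocking work inside doInBackground(), and Dispatchers.Default is used only to keep the sketch short.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: starts blocking work inside runInterruptible, cancels the
// coroutine, and reports whether the blocked thread saw an InterruptedException.
fun wasInterruptedOnCancel(): Boolean = runBlocking {
    val started = CountDownLatch(1)
    val interrupted = AtomicBoolean(false)

    val job = launch(Dispatchers.Default) {
        runInterruptible {
            try {
                started.countDown()
                Thread.sleep(10_000) // stands in for blocking work in doInBackground()
            } catch (e: InterruptedException) {
                interrupted.set(true)
                throw e // runInterruptible turns this into a CancellationException
            }
        }
    }

    started.await()     // wait until the blocking section has been entered
    job.cancelAndJoin() // cancellation is delivered to the worker as a thread interrupt
    interrupted.get()
}

fun main() {
    println("interrupted on cancel: ${wasInterruptedOnCancel()}")
}
```

In this sketch, cancelling the Job is what triggers the interrupt. That is consistent with cancel() in the class above calling job.cancel() only when mayInterruptIfRunning is true and otherwise just setting the cancellation flag.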

Another example is the get() method, which should block until the task completes. In the original AsyncTask, this method is trivially simple because it just delegates to FutureTask.get(), which is a standard Java API. In this case, however, since I wanted to use Coroutines, I had to connect all the dots myself.
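For comparison, this is roughly the contract that FutureTask.get() provides for free and that CoroutinesAsyncTask.get() has to reproduce by hand: block until completion, wrap failures into ExecutionException, and throw CancellationException for cancelled tasks. The sketch below uses only the JDK; the helper describeOutcome is a hypothetical name introduced for illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.FutureTask

// Hypothetical helper: blocks on get() and maps each possible outcome to a string.
fun describeOutcome(task: FutureTask<String>): String =
    try {
        "result: ${task.get()}" // blocks until the task completes
    } catch (e: ExecutionException) {
        "failed: ${e.cause?.message}" // the original exception is available as the cause
    } catch (e: CancellationException) {
        "cancelled"
    }

fun main() {
    val ok = FutureTask<String>(Callable<String> { "abcd" })
    Thread(ok).start()
    println(describeOutcome(ok))

    val failing = FutureTask<String>(Callable<String> { throw IllegalStateException("boom") })
    Thread(failing).start()
    println(describeOutcome(failing))

    val cancelled = FutureTask<String>(Callable<String> { "never" })
    cancelled.cancel(false) // cancelled before it ever ran
    println(describeOutcome(cancelled))
}
```

The three branches correspond to what get() in the class above does with executionResult and executionException once the lock-and-condition wait is over.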

Coroutines Cancellation

Many developers believe that Coroutines make cancellation of concurrent tasks simple. Well, indeed, it’s really simple to call job.cancel(). However, the implications of this call, especially in non-trivial cases like the one we’re dealing with right here, are anything but simple.

For example, note the implementation of the finish() method:

    private fun finish() = COROUTINE_SCOPE.launch(UI_DISPATCHER) {
        val result = executionResult.get()
        if (isCancelled) {
            onCancelled(result)
        } else {
            onPostExecute(result)
        }
    }

The “odd” thing here is that I intentionally “break” so-called Structured Concurrency. However, there is nothing “odd” here, really. It’s just that too many developers proclaim that Structured Concurrency is the best thing since sliced bread, so “breaking” it is automatically considered bad. However, SC is just a tool, which you don’t always need. And when you force it onto a problem that doesn’t need it, things become really tricky and brittle.

In this case, supporting proper cancellation was one of the reasons why I decided to give up on Structured Concurrency. For instance, theoretically, I could implement finish() in this manner:

    private suspend fun finish() = withContext(UI_DISPATCHER) {
        val result = executionResult.get()
        if (isCancelled) {
            onCancelled(result)
        } else {
            onPostExecute(result)
        }
    }

The above implementation would preserve Structured Concurrency, but it would introduce a very serious, yet very hard to reproduce, bug. For example, out of 20 unit tests that I wrote for CoroutinesAsyncTask, only one fails if I use this approach.

I won’t go into deeper explanations regarding this point here. Instead, I’ll leave this puzzle as an exercise to you. Developers who completed my Coroutines course should be able to explain why the simple implementation that utilizes just withContext(UI_DISPATCHER) wouldn’t work properly in this case (hint: you’ve solved two exercises in the course dealing with this type of bug and I warned you that they are tricky as hell).

So, Coroutines cancellation was the second biggest challenge here. In general, as far as I’m concerned, “simple Coroutines cancellation” is just a marketing slogan which has very little to do with reality.

Thread Safety

The last challenge that I had to overcome when implementing CoroutinesAsyncTask is related to general thread-safety concerns. See, when you write simple concurrent logic using Coroutines, you don’t usually think about thread-safety. However, in this case, I had to dedicate quite a bit of time to think about so-called “happens before” relationship and race conditions. That’s why I erred on the side of using atomic variables for everything, even though I’m sure I don’t need them in all these instances.

To be honest, just like with any piece of non-trivial concurrent code, I’m still not 100% positive that there are no multithreading bugs hiding here. So, if you see anything shady, or you think I could use some Coroutines construct to remove some of the thread-safety concerns related to the integration between the blocking and suspending worlds, please let me know.
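As a small illustration of the kind of race this section is about: execute() reads the status and then sets it in two separate steps. That is fine as long as the @MainThread contract is honoured, but the check and the update are not atomic on their own. The sketch below shows one possible way to collapse them into a single atomic step with compareAndSet. StatusGuard and tryStart are hypothetical names, and this is an alternative under stated assumptions, not the approach used in the class above.

```kotlin
import java.util.concurrent.atomic.AtomicReference

enum class Status { PENDING, RUNNING, FINISHED }

// Hypothetical stand-in for the status handling at the start of execute().
class StatusGuard {
    private val status = AtomicReference(Status.PENDING)

    // Atomically moves PENDING -> RUNNING.
    // Returns false if the task was already started or finished by another caller.
    fun tryStart(): Boolean = status.compareAndSet(Status.PENDING, Status.RUNNING)

    fun current(): Status = status.get()
}

fun main() {
    val guard = StatusGuard()
    println(guard.tryStart()) // first caller wins
    println(guard.tryStart()) // any later caller is rejected
    println(guard.current())
}
```

With this shape, two threads racing to start the same task cannot both pass the check, because the comparison and the write happen as one operation.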

Unbounded Coroutine Dispatcher

The last aspect of CoroutinesAsyncTask that I’d like to draw your attention to is the fact that it uses a custom unbounded CoroutineDispatcher. That’s very different from the thread pool that the original AsyncTask used, so, in some sense, this aspect makes the APIs of these classes somewhat incompatible.
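“Unbounded” here follows from the executor configuration in the companion object: a SynchronousQueue has no capacity, so a task that finds no idle thread cannot wait in the queue, and with a maximum pool size of Int.MAX_VALUE the executor starts a new thread instead. The JDK-only sketch below demonstrates that behaviour with the same constructor arguments, minus the custom thread factory and the prestarted core thread. The function name poolSizeUnderLoad is hypothetical.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Submits taskCount tasks that all block until released, then reports how many
// threads the pool had to create in order to run them concurrently.
fun poolSizeUnderLoad(taskCount: Int): Int {
    val pool = ThreadPoolExecutor(
        3,                 // core threads
        Int.MAX_VALUE,     // effectively no upper bound
        60L,
        TimeUnit.SECONDS,  // idle non-core threads are reclaimed after 60 seconds
        SynchronousQueue<Runnable>()
    )
    val release = CountDownLatch(1)
    try {
        repeat(taskCount) { pool.execute { release.await() } }
        return pool.poolSize
    } finally {
        release.countDown() // let all tasks finish
        pool.shutdown()
    }
}

fun main() {
    println(poolSizeUnderLoad(10)) // one thread per blocked task
}
```

Since every submitted task is still blocked when the pool size is read, the pool has had to create one thread per task, well beyond the three core threads.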

The reason that I didn’t want to use the original approach is because AsyncTask’s thread pool has always been misconfigured. I wrote about that in detail in my article discussing AsyncTask, so I won’t repeat those arguments here.

However, one can still ask why I didn’t use Coroutines’ standard Dispatchers.Default or Dispatchers.IO (or a similarly configured custom dispatcher) here. I already explained the reasons why I consider these dispatchers harmful in another post, so, once again, you can read it here if you’re curious.

Unit Tests

Following a request from a reader, I’m sharing the unit tests that I wrote for this class.

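import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.setMain
import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.CoreMatchers.nullValue
import org.hamcrest.MatcherAssert.assertThat
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

class CoroutinesAsyncTaskTest {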

    // region constants ----------------------------------------------------------------------------
    val ASSERTION_DELAY_MS = 100L
    val TASK_DELAY_MS = 25L
    // endregion constants -------------------------------------------------------------------------

    // region helper fields ------------------------------------------------------------------------

    val onPreExecuteInvoked = AtomicBoolean()
    val doInBackgroundInvoked = AtomicBoolean()
    val onPostExecuteInvoked = AtomicBoolean()
    val onCancelledInvoked = AtomicBoolean()
    val capturedResult = AtomicReference<String>()
    val capturedProgress = AtomicReference<MutableList<String>>(mutableListOf())
    val capturedCancellationResult = AtomicReference<String>()

    var throwException = false
    // endregion helper fields ---------------------------------------------------------------------

    lateinit var SUT: CoroutinesAsyncTask<String, String, String>

    @ExperimentalCoroutinesApi
    @Before
    fun setup() {
        Dispatchers.setMain(TestCoroutineDispatcher())
        SUT = ConcatenatingAsyncTask()
    }

    /*
    NOTE:
    These tests can fail if ASSERTION_DELAY_MS is not long enough to allow for all side effects to
    take place. So, if you observe random failures, see if increasing the delay helps before you
    investigate further!
     */

    @Test
    fun execute_onPreExecuteInvoked() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onPreExecuteInvoked.get(), `is`(true))
    }

    @Test
    fun execute_onProgressUpdateCalled() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(capturedProgress.get(), `is`(listOf("a", "ab", "abcd")))
    }

    @Test
    fun execute_onPostExecuteCalledWithCorrectResult() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(capturedResult.get(), `is`("abcd"))
    }

    @Test
    fun cancel_beforeExecution_taskBecomesCancelledImmediately() {
        // Arrange
        // Act
        SUT.cancel(false)
        // Assert
        assertThat(SUT.isCancelled, `is`(true))
    }

    @Test
    fun cancel_beforeExecution_taskStatusBecomesFinishedAfterExecution() {
        // Arrange
        // Act
        SUT.cancel(false)
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(SUT.status, `is`(CoroutinesAsyncTask.Status.FINISHED))
    }

    @Test
    fun cancel_beforeExecution_doInBackgroundNotCalled() {
        // Arrange
        // Act
        SUT.cancel(false)
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(doInBackgroundInvoked.get(), `is`(false))
    }

    @Test
    fun cancel_beforeExecution_onPostExecuteNotCalled() {
        // Arrange
        // Act
        SUT.cancel(false)
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onPostExecuteInvoked.get(), `is`(false))
    }

    @Test
    fun cancel_beforeExecution_onCancelledIsCalledWithNullResult() {
        // Arrange
        // Act
        SUT.cancel(false)
        SUT.execute("a", "b", "cd")
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onCancelledInvoked.get(), `is`(true))
        assertThat(capturedCancellationResult.get(), `is`(nullValue()))
    }

    @Test
    fun cancel_duringExecution_taskBecomesCancelledImmediately() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(false)
        // Assert
        assertThat(SUT.isCancelled, `is`(true))
    }

    @Test
    fun cancel_duringExecution_taskStatusEventuallyBecomesFinished() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(false)
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(SUT.status, `is`(CoroutinesAsyncTask.Status.FINISHED))
    }

    @Test
    fun cancel_duringExecution_onPostExecuteNotCalled() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(false)
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onPostExecuteInvoked.get(), `is`(false))
    }

    @Test
    fun cancel_duringExecution_onProgressUpdateNotCalledAnymore() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(false)
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(capturedProgress.get(), `is`(emptyList<String>()))
    }

    @Test
    fun cancel_duringExecution_onCancelledIsCalledWithCorrectResult() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(false)
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onCancelledInvoked.get(), `is`(true))
        assertThat(capturedCancellationResult.get(), `is`("abcd"))
    }

    @Test
    fun cancelWithInterruption_duringExecution_onCancelledIsCalledWithNullResult() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(TASK_DELAY_MS / 2)
        SUT.cancel(true)
        // Assert
        Thread.sleep(ASSERTION_DELAY_MS)
        assertThat(onCancelledInvoked.get(), `is`(true))
        assertThat(capturedCancellationResult.get(), `is`(nullValue()))
    }

    @Test
    fun get_beforeExecution_returnsCorrectResult() {
        // Arrange
        Thread {
            Thread.sleep(10)
            SUT.execute("a", "b", "cd")
        }.start()
        // Act
        val result = SUT.get()
        // Assert
        assertThat(result, `is`("abcd"))
    }

    @Test
    fun get_duringExecution_returnsCorrectResult() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        val result = SUT.get()
        // Assert
        assertThat(result, `is`("abcd"))
    }

    @Test
    fun get_afterExecution_returnsCorrectResult() {
        // Arrange
        // Act
        SUT.execute("a", "b", "cd")
        Thread.sleep(ASSERTION_DELAY_MS)
        val result = SUT.get()
        // Assert
        assertThat(result, `is`("abcd"))
    }

    @Test(expected = CancellationException::class)
    fun get_cancelledBeforeExecution_cancellationExceptionThrown() {
        // Arrange
        // Act
        SUT.cancel(false)
        SUT.execute("a", "b", "cd")
        val result = SUT.get()
        // Assert
    }

    @Test(expected = CancellationException::class)
    fun get_cancelledDuringExecution_cancellationExceptionThrown() {
        // Arrange
        Thread {
            Thread.sleep(TASK_DELAY_MS / 2)
            SUT.cancel(false)
        }.start()
        // Act
        SUT.execute("a", "b", "cd")
        val result = SUT.get()
        // Assert
    }

    @Test(expected = ExecutionException::class)
    fun get_exceptionInDoInBackground_executionExceptionThrown() {
        // Arrange
        throwException = true
        // Act
        SUT.execute("a", "b", "cd")
        val result = SUT.get()
        // Assert
    }


    // region helper methods -----------------------------------------------------------------------
    // endregion helper methods --------------------------------------------------------------------

    // region helper classes -----------------------------------------------------------------------
    inner class ConcatenatingAsyncTask: CoroutinesAsyncTask<String, String, String>() {
        override fun doInBackground(vararg params: String): String {
            doInBackgroundInvoked.set(true)
            Thread.sleep(TASK_DELAY_MS)

            if (throwException) {
                throw RuntimeException("requested exception")
            }

            return params.fold("") {acc, s ->
                publishProgress(acc + s)
                acc + s
            }
        }

        override fun onPreExecute() {
            onPreExecuteInvoked.set(true)
        }

        override fun onPostExecute(result: String?) {
            onPostExecuteInvoked.set(true)
            capturedResult.set(result)
        }

        override fun onProgressUpdate(vararg values: String) {
            capturedProgress.get().addAll(values.toList())
        }

        override fun onCancelled(result: String?) {
            onCancelledInvoked.set(true)
            capturedCancellationResult.set(result)
        }

        override fun onCancelled() {
        }
    }

    // endregion helper classes --------------------------------------------------------------------

}

Summary

That’s it, now you know how to implement AsyncTask using Kotlin Coroutines. However, even though I had lots of fun writing this article, let me remind you that you should not use this code in your projects.

If you find the topic of concurrency interesting, take a look at my course about Multithreading in Android and another course about Coroutines specifically. These courses are aimed at professional developers, and they are among the most advanced resources you can find out there.

As usual, thanks for reading and feel free to leave comments or ask questions below.
