2
\$\begingroup\$
  • Kotlin 1.3.+
  • RxJava 3.0.+
  • Kotest 4.1.+
// SingleValueCache.kt
package com.example

import io.reactivex.rxjava3.core.Single
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

abstract class SingleValueCache<V> {
    protected abstract fun retrieve(): Single<V>
    protected abstract fun validate(cached: V): Single<Boolean>

    private inner class Cached(val version: Int, val value: V)

    private val mutex: Semaphore = Semaphore(1)
    private val cached: AtomicReference<Cached> = AtomicReference()

    fun get(): Single<V> =
        Single.defer { cached.get() ?.let(::existing) ?: initial() }.map { it.value }

    private fun initial(): Single<Cached> = retrieve(1)

    private fun existing(cached: Cached): Single<Cached> =
        validate(cached.value).concatMap { valid ->
            if (valid)
                Single.just(cached)
            else
                retrieve(cached.version + 1)
        }

    private fun retrieve(version: Int): Single<Cached> = Single.defer {
        val unlock: () -> Unit = lock()
        val curr = cached.get()?.takeIf { it.version >= version }

        when (curr) {
            null ->
                retrieve()
                    .map { v ->
                        Cached(version, v).also { fresh ->
                            cached.set(fresh)
                            unlock()
                        }
                    }
                    .doOnDispose { unlock() }
                    .doOnError { unlock() }
            else ->
                Single.just(curr).also { unlock() }
        }
    }

    private fun lock(): () -> Unit {
        mutex.acquire()
        // i've observed some strange scenarios where the lock was released more than once,
        // which is why the AtomicBoolean is being used here -
        // to make absolutely sure that the lock is released *exactly* once
        val locked = AtomicBoolean(true)

        return {
            val wasLocked = locked.getAndSet(false)
            if (wasLocked)
                mutex.release()
        }
    }
}
// SingleValueCacheTest.kt
package com.example

import io.kotest.core.spec.style.FreeSpec
import io.kotest.matchers.shouldBe
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger

class SingleValueCacheTest : FreeSpec() {
    companion object {
        fun setup(): Pair<SingleValueCache<Int>, () -> Unit> {
            val target = AtomicInteger(1)

            val cache: SingleValueCache<Int> = object : SingleValueCache<Int>() {
                val curr = AtomicInteger(0)
                override fun retrieve(): Single<Int> = Single.fromCallable { curr.incrementAndGet() }
                override fun validate(cached: Int): Single<Boolean> = Single.just(cached == target.get())
            }

            return Pair(cache, { target.incrementAndGet() ; Unit })
        }
    }

    init {
        "get" {
            val (cache, invalidate) = setup()
            val (par, trials) = (10 to 100)

            val trial = Flowable
                .fromCallable { cache.get() }
                .repeat(par.toLong())
                .parallel(par, 1)
                    .runOn(Schedulers.io())
                    .flatMap { it.toFlowable() }
                .sequential()

            val results = trial
                .concatWith(Completable.fromCallable { invalidate() })
                .repeat(trials.toLong())
                .toList()
                .blockingGet()

            results shouldBe ((1..trials).flatMap { v -> (1..par).map { v } })
        }
    }
}

The above seems to work well, but I'm a bit concerned about correctly releasing the lock in edge cases. Specifically, is calling unlock() sufficient in the doOnError and doOnDispose? I haven't been able to find any detailed docs around the lifecycle and proper places for releasing resources - there are many doOn* variations and, personally, I find it confusing to understand how to use those callbacks correctly and effectively.

\$\endgroup\$

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.