- Kotlin
1.3.+ - RxJava
3.0.+ - Kotest
4.1.+
// SingleValueCache.kt
package com.example
import io.reactivex.rxjava3.core.Single
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
abstract class SingleValueCache<V> {
protected abstract fun retrieve(): Single<V>
protected abstract fun validate(cached: V): Single<Boolean>
private inner class Cached(val version: Int, val value: V)
private val mutex: Semaphore = Semaphore(1)
private val cached: AtomicReference<Cached> = AtomicReference()
fun get(): Single<V> =
Single.defer { cached.get() ?.let(::existing) ?: initial() }.map { it.value }
private fun initial(): Single<Cached> = retrieve(1)
private fun existing(cached: Cached): Single<Cached> =
validate(cached.value).concatMap { valid ->
if (valid)
Single.just(cached)
else
retrieve(cached.version + 1)
}
private fun retrieve(version: Int): Single<Cached> = Single.defer {
val unlock: () -> Unit = lock()
val curr = cached.get()?.takeIf { it.version >= version }
when (curr) {
null ->
retrieve()
.map { v ->
Cached(version, v).also { fresh ->
cached.set(fresh)
unlock()
}
}
.doOnDispose { unlock() }
.doOnError { unlock() }
else ->
Single.just(curr).also { unlock() }
}
}
private fun lock(): () -> Unit {
mutex.acquire()
// i've observed some strange scenarios where the lock was released more than once,
// which is why the AtomicBoolean is being used here -
// to make absolutely sure that the lock is released *exactly* once
val locked = AtomicBoolean(true)
return {
val wasLocked = locked.getAndSet(false)
if (wasLocked)
mutex.release()
}
}
}
// SingleValueCacheTest.kt
package com.example
import io.kotest.core.spec.style.FreeSpec
import io.kotest.matchers.shouldBe
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger
class SingleValueCacheTest : FreeSpec() {
companion object {
fun setup(): Pair<SingleValueCache<Int>, () -> Unit> {
val target = AtomicInteger(1)
val cache: SingleValueCache<Int> = object : SingleValueCache<Int>() {
val curr = AtomicInteger(0)
override fun retrieve(): Single<Int> = Single.fromCallable { curr.incrementAndGet() }
override fun validate(cached: Int): Single<Boolean> = Single.just(cached == target.get())
}
return Pair(cache, { target.incrementAndGet() ; Unit })
}
}
init {
"get" {
val (cache, invalidate) = setup()
val (par, trials) = (10 to 100)
val trial = Flowable
.fromCallable { cache.get() }
.repeat(par.toLong())
.parallel(par, 1)
.runOn(Schedulers.io())
.flatMap { it.toFlowable() }
.sequential()
val results = trial
.concatWith(Completable.fromCallable { invalidate() })
.repeat(trials.toLong())
.toList()
.blockingGet()
results shouldBe ((1..trials).flatMap { v -> (1..par).map { v } })
}
}
}
The above seems to work well, but I'm a bit concerned about correctly releasing the lock in edge cases. Specifically, is calling unlock() sufficient in the doOnError and doOnDispose? I haven't been able to find any detailed docs around the lifecycle and proper places for releasing resources - there are many doOn* variations and, personally, I find it confusing to understand how to use those callbacks correctly and effectively.