I need a shared observable to emit a new .startWith() value every time the shared stream is subscribed to (that is, when the subscriber count goes from 0 to 1). Unfortunately, the value passed to .startWith() is reused for the lifetime of the shared observable instance, even after the shared stream has dropped to zero subscribers.
Ideally there would be a .startWith() overload that takes a function as a parameter and re-executes it every time the subscriber count changes from 0 to 1.
var count: Int = 0

@Before
fun setUp() {
    count = 0 // reset
}

fun getTheCount(): Int {
    count++
    return count
}

@Test
fun startWithDefaultValue() {
    val relay = PublishRelay.create<Int>()
    val instance by lazy {
        relay
            .startWith(getTheCount())
            .share()
    }
    val disposable1 = instance.subscribe {
        println(it) // should print 1, and does!
    }
    disposable1.dispose() // subscriber count on shared instance drops from 1 to 0
    // should print 2, but prints 1. getTheCount() is not called again on this subscription
    val disposable2 = instance.subscribe {
        println(it)
    }
}
I guess val instance by lazy means the chain is initialized once, and startWith(getTheCount()) evaluates its argument at the moment the chain is assembled, so you get a single call to the getTheCount() method. (I also hope you understand that Observable.just(getTheCount()) will emit the same value to every observer and never "call" getTheCount() again.)
You should either defer the lazy internals:
Observable.defer(() -> relay.startWith(getTheCount()))
    .share();
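Or use fromCallable and concatWith:
// fromCallable has to come first: the relay never completes,
// so anything concatenated after it would never be subscribed to.
Observable.fromCallable(() -> getTheCount())
    .concatWith(relay)
    .share();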
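Both fixes above rely on the same idea: the call to getTheCount() is wrapped in a function that runs per subscription, instead of being evaluated once when the chain is built. The following is only a plain-Java analogy of that difference, using java.util.function.Supplier rather than RxJava itself. The names DeferSketch, eagerSource and deferredSource are hypothetical and exist only for this illustration; each get() call stands in for a fresh subscription.

```java
import java.util.function.Supplier;

public class DeferSketch {
    public static int count = 0;

    public static int getTheCount() {
        count++;
        return count;
    }

    // Eager: the caller evaluates the argument once, before this method runs.
    // Every later get() returns that same captured value.
    public static Supplier<Integer> eagerSource(int value) {
        return () -> value;
    }

    // Deferred: the factory is invoked again on every get(),
    // which models "re-run the function on each new subscription".
    public static Supplier<Integer> deferredSource(Supplier<Integer> factory) {
        return () -> factory.get();
    }

    public static void main(String[] args) {
        // getTheCount() runs right here, exactly once.
        Supplier<Integer> eager = eagerSource(getTheCount());
        System.out.println(eager.get());
        System.out.println(eager.get()); // same value as the line above

        // getTheCount() runs inside each get() call.
        Supplier<Integer> deferred = deferredSource(DeferSketch::getTheCount);
        System.out.println(deferred.get());
        System.out.println(deferred.get()); // one higher than the line above
    }
}
```

In this model, eagerSource(getTheCount()) corresponds to startWith(getTheCount()), while deferredSource corresponds to wrapping the call in Observable.defer or Observable.fromCallable.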