How to get a shared observable to emit a new value from .startWith() every time a shared stream gets a new subscriber?

ZakTaccardi

I need a shared observable to emit a new .startWith() value every time a shared stream is subscribed to (subscriber count goes from 0 to 1). Unfortunately the value returned by .startWith() is being reused across the lifetime of the shared observable instance, even after this shared stream has no subscribers.

Ideally there would be a .startWith() overload that takes a function as a parameter and re-executes it every time the subscriber count changes from 0-1.

var count: Int = 0

@Before
fun setUp() {
    count = 0 //reset
}

fun getTheCount(): Int {
    count++
    return count
}

@Test
fun startWithDefaultValue() {
    val relay = PublishRelay.create<Int>()

    val instance by lazy {
        relay
                .startWith(getTheCount())
                .share()
    }

    val disposable1 = instance.subscribe {
        println(it) //should print 1, and does!
    }

    disposable1.dispose() //subscriber count on shared instance drops from 1 to 0

    //should print 2, but prints 1. getTheCount() is not called again on this subscription
    val disposable2 = instance.subscribe {
        println(it) 
    }
}
akarnokd

I guess val instance by lazy means initialize once and thus you have a single call to getTheCount() method. (I also hope you understand that Observable.just(getTheCount()) will emit the same value to every observer and never "call" getTheCount() again.)

You should either defer the lazy internals:

Observable.defer(() -> relay.startWith(getTheCount()))
.share();

Or use fromCallable and concatWith:

relay.concatWith(Observable.fromCallable(() -> getTheCount()))
.share();

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

RxJS: Share() an Observable & Emit Last Value to each new subscriber

How to get an observable's value without triggering new emit

Emit new value to Observable

RxJS - emit whole array every time new value is received

RXjs - How to wait for new value on observable A, every time observable B says its okay to start 'watching'

how to append every value from list to url and generate new url every time

how can I write the new value in shared preference

How Do I Get the Overall.RMS_level in Real-Time from a UDP Stream as an Integer Appended to as Shared Log File?

The same default value gets used every time by `new mongoose.Schema`

New String or get parameter from http request every time?

Angular 4 shared component creating new instances on every load

How to return new value every time when function is called

Creating a new Shared Drive

How to create new shared project in TFS

How to get value from a theano tensor variable backed by a shared variable?

How to Get Value from Shared View in Main View

How to get from outside value inside subscriber

How to return Observable from subscriber?

new and make_shared for shared pointers

How can I display a dialog whenever my stream gets a new value?

How to add new select box every time I change value of new created selectbox in jQuery?

How to get a Subscriber and Publisher from a broadcasted Akka stream?

Get new value of an observable in "beforeChange" subscription

how to update the FutureBuilder text every time the shared-preference is updated

Stop observable stream of shared data service Angular

Hot and shared Observable from an EventEmitter

return a shared Observable from a service

How to start a new row number every time, when there is a change in value(+ve/-ve) from the preceding row in SQL SERVER

check if key exist shared preferences value and then open a new screen

TOP Ranking

HotTag

Archive