Swift Combine type as Publisher to async/await

drekka

I'm trying to convert some Combine-friendly code to async/await. For this post I'll use a simple example, cut down from some real code, to keep the discussion simple.


import Combine
import Foundation

enum State {
    case a
    case b
    case c
}

class StateMachine: Publisher {

    typealias Output = State
    typealias Failure = Error

    private let currentState = CurrentValueSubject<State, Error>(State.a)

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        currentState.receive(subscriber: subscriber)
    }

    func gotoB() { currentState.value = .b }
    func gotoC() { currentState.value = .c }
}

let sm = StateMachine()
let c = sm.sink {
    print("Failed with \($0)")
}
receiveValue: {
    print("New value \($0)")
}

sm.gotoB()
sm.gotoC()

So in this code (which works in a playground) I want the StateMachine to act as a publisher so it can be subscribed to directly. To achieve this I use an internal CurrentValueSubject and forward the publisher's receive(subscriber:) to it.

However when I change class StateMachine: to actor StateMachine:

actor StateMachine: Publisher {
    //...
}

// ...
await sm.gotoB()
await sm.gotoC()

this code no longer compiles and throws these errors:

expression failed to parse:
error: SwiftFormat.playground:18:10: error: actor-isolated instance method 'receive(subscriber:)' cannot be used to satisfy nonisolated protocol requirement
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
         ^

SwiftFormat.playground:18:10: note: add 'nonisolated' to 'receive(subscriber:)' to make this instance method not isolated to the actor
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
         ^
    nonisolated 

Combine.Publisher:5:10: note: 'receive(subscriber:)' declared here
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
         ^

Now I can add nonisolated as suggested, but I'm not sure that isn't going to break the concurrency handling that actor is introducing because there's nothing to stop concurrent access to the internal subject.

So is there a way to keep the type as a Combine Publisher, or is there some other async/await approach I've just not read about which is now the preferred approach?

Fogmeister

The first thing to ask here is "what is it that is asynchronous?"

I don't think in this case that gotoB or gotoC are asynchronous. The thing that is asynchronous is the subscription to the current value.

The consumers will subscribe to it and then at some point later in time it will change and they are updated.

In async/await this can be modelled as an AsyncStream.

To use an async stream you access it something like:

for await state in stateMachine.values {
  print("received value \(state)")
}
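As a minimal, self-contained sketch of consuming an AsyncStream with for await: the helper names makeDemoStream and collectStates below are made up for illustration and are not part of the state machine. The stream here is finite (it calls finish()), so the loop ends on its own; a real state stream would normally stay open until the consumer stops listening.

```swift
enum State { case a, b, c }

// Hypothetical helper: builds a finite stream that yields three states and then finishes.
func makeDemoStream() -> AsyncStream<State> {
    AsyncStream { continuation in
        continuation.yield(.a)
        continuation.yield(.b)
        continuation.yield(.c)
        continuation.finish() // ends the for-await loop in the consumer
    }
}

// Hypothetical helper: collects every value the stream delivers, in order.
func collectStates() async -> [State] {
    var received: [State] = []
    for await state in makeDemoStream() {
        received.append(state)
    }
    return received
}
```

With the default (unbounded) buffering, values yielded before the consumer starts iterating are kept and delivered in order.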

So to make this work we need to make stateMachine give out an AsyncStream.

Something like this would work...

import Foundation // for UUID

enum State {
    case a
    case b
    case c
}

class StateMachine {
    var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:] // 1.

    var currentState = State.a {
        didSet {
            subscriptions.values.forEach {
                $0.yield(currentState)
            }
        }
    } // 2.

    var values: AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            subscriptions[id] = continuation

            continuation.yield(currentState)

            continuation.onTermination = { [weak self] _ in
                self?.subscriptions.removeValue(forKey: id)
            }
        }
    } // 3.

    func gotoB() {
        currentState = .b
    }
}

In this we...

  1. Create a place to store any incoming subscriptions
  2. Make sure that when current state changes we update all the subscribers
  3. When a new subscriber is added we give it an AsyncStream to listen to

This is the beginning of what you would need to create. You might need some sort of singleton access to this if multiple places in your app need to subscribe to the same state.

But this should give you the initial idea. We use something very similar to this in the app I am currently working on, sometimes to create a subscribable cache and sometimes to respond to error messages.
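Since the question was about moving to an actor, here is a rough sketch of the same idea with actor isolation protecting the subscriptions dictionary. This is one possible shape under my own assumptions, not the only way to do it: the names continuations, transition(to:), removeContinuation(id:) and runDemo are made up for this example.

```swift
import Foundation

enum State { case a, b, c }

actor StateMachine {
    // Every access to these properties is serialized by the actor.
    private var continuations: [UUID: AsyncStream<State>.Continuation] = [:]
    private var currentState = State.a

    // Each access creates a new stream that starts with the current state.
    var values: AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation
            continuation.yield(currentState)
            // onTermination runs outside the actor, so hop back in to clean up.
            continuation.onTermination = { [weak self] _ in
                Task { await self?.removeContinuation(id: id) }
            }
        }
    }

    func gotoB() { transition(to: .b) }
    func gotoC() { transition(to: .c) }

    // Hypothetical helper: stores the new state and notifies every subscriber.
    private func transition(to newState: State) {
        currentState = newState
        for continuation in continuations.values {
            continuation.yield(newState)
        }
    }

    private func removeContinuation(id: UUID) {
        continuations[id] = nil
    }
}

// Hypothetical usage: subscribe first, then drive the machine, then read the buffered values.
func runDemo() async -> [State] {
    let machine = StateMachine()
    let stream = await machine.values
    await machine.gotoB()
    await machine.gotoC()

    var received: [State] = []
    for await state in stream {
        received.append(state)
        if state == .c { break } // stop listening once the final state arrives
    }
    return received
}
```

Callers now need await for gotoB() and gotoC(), as in the question, and no nonisolated escape hatch is required because nothing here conforms to Publisher.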
