
To use for await item in streamOfItems {...}, you need an AsyncSequence such as AsyncStream. Often you already have a Combine publisher and want to consume it with the nicer Swift Concurrency syntax.

The short answer: publisher.values gives you an async sequence of the publisher's output (available from iOS 15 and macOS 12).
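
As a minimal sketch of the short answer, the snippet below iterates over publisher.values. The collectValues function and the array-backed publisher are hypothetical, chosen only so the example finishes on its own; it assumes iOS 15 / macOS 12 or later.

```swift
import Combine

// Hypothetical helper for illustration: collects everything a finite publisher emits.
func collectValues() async -> [Int] {
    // An array-backed publisher emits its elements and then completes.
    let numbers = [1, 2, 3].publisher
    var received: [Int] = []
    // `values` exposes the publisher as an AsyncSequence
    // (AsyncPublisher here, because the failure type is Never).
    for await number in numbers.values {
        received.append(number)
    }
    return received
}
```

The loop ends when the publisher completes. For a publisher that can fail, values is a throwing sequence instead, so the loop becomes for try await.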

If you need more control over the stream, here is how to build it yourself.

Existing Publisher

Let’s use a simple example where you have a publisher that emits a random number every second.

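import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()
let publisher = subject.eraseToAnyPublisher()
// Keep the returned AnyCancellable; if it is discarded, the subscription is cancelled right away.
let timerCancellable = Timer.publish(every: 1, on: .main, in: .common)
    .autoconnect()
    .sink { _ in
        subject.send(Int.random(in: 1...99))
    }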

Create the AsyncStream

We will then create a custom magicNumberStream that yields what the publisher emits.

var magicNumberStream: AsyncStream<Int> {
    AsyncStream { continuation in
        // Produce item using yield
        let cancellable = publisher.sink { num in
            continuation.yield(num)
        }

        // If the publisher can finish, call `continuation.finish()`

        // Handle termination
        continuation.onTermination = { termination in
            switch termination {
            case .finished:
                print("continuation.finish() was called")
            case .cancelled:
                print("Task was cancelled")
            }

            // Cancel the publisher's subscription
            cancellable.cancel()
        }
    }
}
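
The comment about finishing can be made concrete. The sketch below is one possible way to do it, not the only one: makeStream is a hypothetical helper (it is not part of Combine) that forwards the publisher's completion to continuation.finish() and also exposes the stream's buffering policy, which is another thing you may want to customize. It assumes a publisher whose failure type is Never, and Swift 5 language mode; stricter concurrency checking may flag the captured AnyCancellable.

```swift
import Combine

// Hypothetical helper: wraps any non-failing publisher in an AsyncStream.
func makeStream<P: Publisher>(
    from publisher: P,
    bufferingPolicy: AsyncStream<P.Output>.Continuation.BufferingPolicy = .unbounded
) -> AsyncStream<P.Output> where P.Failure == Never {
    AsyncStream(P.Output.self, bufferingPolicy: bufferingPolicy) { continuation in
        let cancellable = publisher.sink(
            // End the stream when the publisher completes.
            receiveCompletion: { _ in continuation.finish() },
            // Forward every value into the stream.
            receiveValue: { value in continuation.yield(value) }
        )
        // Cancel the subscription when the stream finishes or the consumer is cancelled.
        continuation.onTermination = { _ in cancellable.cancel() }
    }
}

// Hypothetical usage helper: drains a finite publisher through the stream.
func drain() async -> [Int] {
    var received: [Int] = []
    for await value in makeStream(from: [1, 2, 3].publisher) {
        received.append(value)
    }
    return received
}
```

Passing .bufferingNewest(1) instead of the default keeps only the latest value when the consumer is slower than the publisher, which may suit UI-style updates better than an unbounded buffer.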

You can then use it in a Task, with support for task cancellation!

Task {
    for await num in magicNumberStream {
        print("Magic", num)
    }
}
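
To see the cancellation support in action, keep a reference to the task and cancel it. This is a self-contained sketch: runAndCancel and its local subject are hypothetical stand-ins for the publisher and stream above, and the same Swift 5 language mode assumption applies.

```swift
import Combine

// Hypothetical demo: starts a consumer task, cancels it, and reports
// whether the task was cancelled when its loop ended.
func runAndCancel() async -> Bool {
    let subject = PassthroughSubject<Int, Never>()
    let stream = AsyncStream<Int> { continuation in
        let cancellable = subject.sink { continuation.yield($0) }
        // Runs when the consuming task is cancelled (or the stream finishes).
        continuation.onTermination = { _ in cancellable.cancel() }
    }

    let task = Task { () -> Bool in
        for await num in stream {
            print("Magic", num)
        }
        // The subject never completes, so the loop only ends through cancellation.
        return Task.isCancelled
    }

    subject.send(42)   // Buffered by the stream until the task reads it.
    task.cancel()      // Ends the for await loop and triggers onTermination.
    return await task.value
}
```

Cancelling the task ends the loop, and the onTermination handler then cancels the Combine subscription, so nothing keeps running in the background.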


@samwize
