To use for await item in streamOfItems {...}, you need an AsyncSequence such as AsyncStream. You will often already have a Combine publisher and want to consume it with the nicer Swift Concurrency syntax.
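The short answer: write publisher.values to get an async sequence of the publisher's output. This property is available from iOS 15 and macOS 12. It returns an AsyncPublisher when the publisher cannot fail, and an AsyncThrowingPublisher otherwise.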
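As a minimal sketch of the short answer, the helper below iterates publisher.values with for await. The function name firstValues and its parameters are hypothetical and only there to keep the example self-contained; prefix(_:) is the standard AsyncSequence operator.

```swift
import Combine

// Hypothetical helper (not part of Combine): collects at most `count` values
// from a non-failing publisher by iterating its `values` async sequence.
func firstValues(of publisher: AnyPublisher<Int, Never>, count: Int) async -> [Int] {
    var collected: [Int] = []
    // `values` bridges the publisher to an AsyncSequence;
    // `prefix(count)` ends the loop after `count` elements.
    for await value in publisher.values.prefix(count) {
        collected.append(value)
    }
    return collected
}
```

Called from an async context with await firstValues(of: [1, 2, 3, 4].publisher.eraseToAnyPublisher(), count: 2), this returns [1, 2].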
If you want to customize the stream, this is a guide on how you can do it.
Existing Publisher
Let’s use a simple example where you have a publisher that emits a random number every second.
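    import Combine
    import Foundation

    let subject = PassthroughSubject<Int, Never>()
    let publisher = subject.eraseToAnyPublisher()

    // Keep a reference to the subscription; a discarded AnyCancellable cancels immediately.
    let timerCancellable = Timer.publish(every: 1, on: .main, in: .common)
        .autoconnect()
        .sink { _ in
            subject.send(Int.random(in: 1...99))
        }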
Create the AsyncStream
We will then create a custom magicNumberStream that yields what the publisher emits.
    var magicNumberStream: AsyncStream<Int> {
        AsyncStream { continuation in
            // Produce item using yield
            let cancellable = publisher.sink { num in
                continuation.yield(num)
            }
            // If the publisher can finish, call `continuation.finish()`
            // Handle termination
            continuation.onTermination = { termination in
                switch termination {
                case .finished:
                    print("continuation.finish() was called")
                case .cancelled:
                    print("Task was cancelled")
                }
                // Cancel the publisher's subscription
                cancellable.cancel()
            }
        }
    }
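Two common customizations are finishing the stream when the publisher completes and choosing a buffering policy. The sketch below shows one way to do both. The function name makeStream is hypothetical and not part of Combine, and the code assumes a publisher whose Failure is Never. Under Swift 6 strict concurrency checking, capturing the cancellable in onTermination may need extra care, since AnyCancellable is not Sendable.

```swift
import Combine

// Hypothetical helper: wraps a non-failing publisher in an AsyncStream.
func makeStream<P: Publisher>(
    from publisher: P,
    bufferingPolicy: AsyncStream<P.Output>.Continuation.BufferingPolicy = .unbounded
) -> AsyncStream<P.Output> where P.Failure == Never {
    AsyncStream(P.Output.self, bufferingPolicy: bufferingPolicy) { continuation in
        let cancellable = publisher.sink(
            // End the stream when the publisher completes.
            receiveCompletion: { _ in continuation.finish() },
            // Forward each value into the stream.
            receiveValue: { value in continuation.yield(value) }
        )
        // Cancel the subscription when the stream finishes or the consumer is cancelled.
        continuation.onTermination = { _ in cancellable.cancel() }
    }
}
```

With the default .unbounded policy every value is kept until it is consumed. Passing .bufferingNewest(1) instead keeps only the latest value when the consumer is slower than the publisher, which may suit UI state better than an event log.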
You can then iterate over it in a Task. Task cancellation is supported: cancelling the task ends the loop and triggers onTermination.
    Task {
        for await num in magicNumberStream {
            print("Magic", num)
        }
    }
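To see cancellation in action, keep a reference to the Task and call cancel() on it. The sketch below is self-contained, so it builds its own small stream from a PassthroughSubject rather than reusing the timer. The function name demoCancellation is hypothetical, and which values the task has received by the time it is cancelled is not guaranteed.

```swift
import Combine

// Hypothetical demo: consumes a stream in a Task, then cancels the task.
func demoCancellation() async -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    let stream = AsyncStream<Int> { continuation in
        let cancellable = subject.sink { value in
            continuation.yield(value)
        }
        // Runs on both finish() and task cancellation.
        continuation.onTermination = { _ in cancellable.cancel() }
    }

    let task = Task { () -> [Int] in
        var received: [Int] = []
        for await num in stream {
            received.append(num)
        }
        // Reached once the stream terminates, here because of cancellation.
        return received
    }

    subject.send(1)
    subject.send(2)
    task.cancel()           // Ends the for-await loop inside the task
    return await task.value // Completes instead of waiting forever
}
```

The subject never completes, so without the cancel() call the loop would wait indefinitely and task.value would never return.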