Channels
Channels provide a way for concurrent tasks to communicate by sending and receiving messages. They are inspired by Go channels and work with Tova's async/await system.
Creating Channels
Channel.new
tova
Channel.new(capacity?) -> ChannelCreates a new channel. The optional capacity parameter sets the buffer size:
- No capacity (or 0): Unbuffered --
sendblocks until a receiver is ready - Positive capacity: Buffered --
sendblocks only when the buffer is full
tova
// Unbuffered channel
ch = Channel.new()
// Buffered channel with capacity 10
ch = Channel.new(10)Sending and Receiving
send
tova
await ch.send(value) -> NilSends a value into the channel. For unbuffered channels, this suspends until a receiver is ready. For buffered channels, this suspends only when the buffer is full.
tova
ch = Channel.new(5)
await ch.send("hello")
await ch.send(42)receive
tova
await ch.receive() -> Option<T>Receives a value from the channel. Returns Some(value) if a value is available, or None if the channel is closed and empty.
tova
msg = await ch.receive()
match msg {
Some(value) => print("Got: {value}")
None => print("Channel closed")
}Closing Channels
close
tova
ch.close() -> NilCloses the channel. After closing:
- No more values can be sent (sending will throw an error)
- Pending values in the buffer can still be received
- Once the buffer is drained,
receive()returnsNone
tova
ch = Channel.new(10)
await ch.send(1)
await ch.send(2)
ch.close()
await ch.receive() // Some(1)
await ch.receive() // Some(2)
await ch.receive() // NoneAsync Iteration
Channels support async iteration with for await:
tova
ch = Channel.new(10)
// Producer (in another async context)
async fn produce(ch) {
for i in range(5) {
await ch.send(i)
}
ch.close()
}
// Consumer
for await msg in ch {
print("Received: {msg}")
}
// Prints: 0, 1, 2, 3, 4Examples
Producer-Consumer
tova
async fn producer(ch, items) {
for item in items {
await ch.send(item)
}
ch.close()
}
async fn consumer(ch) {
for await item in ch {
result = process(item)
print("Processed: {result}")
}
}
ch = Channel.new(10)
await parallel([
producer(ch, data),
consumer(ch)
])Fan-Out
Distribute work across multiple consumers:
tova
async fn worker(id, ch) {
for await task in ch {
print("Worker {id} processing: {task}")
await do_work(task)
}
}
ch = Channel.new(100)
// Start 3 workers
workers = range(3) |> map(fn(id) worker(id, ch))
// Send work
for task in tasks {
await ch.send(task)
}
ch.close()
await parallel(workers)Pipeline
Chain channels together for multi-stage processing:
tova
async fn stage1(input, output) {
for await raw in input {
await output.send(parse(raw))
}
output.close()
}
async fn stage2(input, output) {
for await parsed in input {
await output.send(transform(parsed))
}
output.close()
}
ch1 = Channel.new(10)
ch2 = Channel.new(10)
ch3 = Channel.new(10)
// Feed input
async fn feed(ch) {
for item in data {
await ch.send(item)
}
ch.close()
}
await parallel([
feed(ch1),
stage1(ch1, ch2),
stage2(ch2, ch3),
consumer(ch3)
])