Streaming
#[rpc_stream] is a procedure type alongside queries and mutations that enables HTTP streaming
responses via Server-Sent Events. It's built on Axum's streaming primitives and Vercel's streaming
support.
How it works: The handler receives typed input plus a StreamSender for emitting chunks. Each chunk is serialized as a data: {json}\n\n SSE event. The generated TypeScript client gets a stream() method returning an AsyncGenerator, and framework wrappers provide reactive state management via createStream / useStream.

Examples
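As a rough illustration of the wire format described under "How it works", the sketch below frames a chunk as a `data: {json}\n\n` event and decodes a buffer of such events. The helper names `encodeSseEvent` and `decodeSseBuffer` are hypothetical and are not part of the generated client. The sketch assumes one `data:` line per event, and the real client's parsing may differ.

```typescript
// Hypothetical helpers for illustration only; not the library's API.

/** Serialize one chunk as a single SSE `data:` event. */
function encodeSseEvent(chunk: unknown): string {
  // JSON.stringify without an indent argument emits no newlines,
  // so the payload always fits on one `data:` line.
  return `data: ${JSON.stringify(chunk)}\n\n`;
}

/**
 * Decode every complete event in `buffer`.
 * Returns the decoded chunks plus any trailing partial event,
 * which the caller should prepend to the next network read.
 */
function decodeSseBuffer<T>(buffer: string): { chunks: T[]; rest: string } {
  const parts = buffer.split("\n\n");
  // The last element is either "" or an incomplete event.
  const rest = parts.pop() ?? "";
  const chunks: T[] = [];
  for (const part of parts) {
    for (const line of part.split("\n")) {
      if (line.startsWith("data:")) {
        chunks.push(JSON.parse(line.slice(5).trim()) as T);
      }
    }
  }
  return { chunks, rest };
}
```

Keeping the undecoded remainder separate matters because a network read can end in the middle of an event. Only text up to the last blank line is safe to parse.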
Countdown — Structured Streaming
A stream handler with typed input and structured output chunks. Demonstrates the timeout attribute and graceful handling of client disconnection.
use std::time::Duration;

use metaxy::{rpc_stream, StreamSender};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct CountdownInput {
    pub from: u32,
    pub delay_ms: u64,
}

#[derive(Serialize)]
pub struct CountdownTick {
    pub remaining: u32,
    pub message: String,
}

#[rpc_stream(timeout = "30s")]
async fn countdown(input: CountdownInput, tx: StreamSender<CountdownTick>) {
    // Clamp the inputs: start from at most 10, wait at least 100 ms between ticks.
    let from = input.from.min(10);
    let delay = Duration::from_millis(input.delay_ms.max(100));
    for i in (0..=from).rev() {
        let tick = CountdownTick {
            remaining: i,
            message: if i == 0 { "Done!".into() } else { format!("{i}...") },
        };
        // An Err from send() is treated as a client disconnect: stop early.
        if tx.send(tick).await.is_err() { break; }
        if i > 0 { tokio::time::sleep(delay).await; }
    }
}

Token Stream — LLM-style Streaming
Simulates LLM token streaming by splitting input into words and streaming them back with realistic latency. Demonstrates the typical AI integration pattern.
use std::time::Duration;

use metaxy::{rpc_stream, StreamSender};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct TokenStreamInput {
    pub prompt: String,
}

#[derive(Serialize)]
pub struct Token {
    pub text: String,
    pub index: u32,
}

#[rpc_stream(timeout = "60s")]
async fn token_stream(input: TokenStreamInput, tx: StreamSender<Token>) {
    let words: Vec<&str> = input.prompt.split_whitespace().collect();
    for (i, word) in words.iter().enumerate() {
        // Every token after the first carries its leading space,
        // so the client can concatenate token texts directly.
        let sep = if i == 0 { "" } else { " " };
        let token = Token { text: format!("{sep}{word}"), index: i as u32 };
        // An Err from send() is treated as a client disconnect: stop early.
        if tx.send(token).await.is_err() { break; }
        // Simulated latency: 50 ms base plus 15 ms per byte of the word.
        tokio::time::sleep(Duration::from_millis(50 + word.len() as u64 * 15)).await;
    }
}

Client Usage
// Direct call — async generator
for await (const chunk of rpc.stream('countdown', { from: 5, delay_ms: 500 })) {
  console.log(chunk.remaining, chunk.message);
}

// With call options
const controller = new AbortController(); // call controller.abort() to cancel the stream
for await (const chunk of rpc.stream('token_stream', { prompt: "Hello world" }, {
  signal: controller.signal,
})) {
  process(chunk);
}

// Reactive wrapper — manages chunks, state, and cleanup
const stream = createStream(rpc, 'countdown', () => ({
  from: countdownFrom,
  delay_ms: 500,
}));
// stream.chunks — CountdownTick[]
// stream.isStreaming — boolean
// stream.isDone — boolean
// stream.start() — begin streaming
// stream.stop() — abort

Supported Attributes
| Attribute | Stream | Notes |
|---|---|---|
| timeout | Yes | Sends SSE error event on expiry |
| init | Yes | Cold-start initialization, state injection |
| cache | No | Streaming responses cannot be cached |
| idempotent | No | Streams are inherently non-idempotent |
Client-Side Options
How RpcClientConfig and CallOptions behave for streams:
| Option | Stream | Notes |
|---|---|---|
| callOptions.signal | Yes | Merged with internal controller via AbortSignal.any() |
| callOptions.timeout | Yes | Aborts the SSE connection after the given duration |
| onRequest | Yes | Fires before the fetch |
| onError | Yes | Fires on non-ok response or network error |
| onResponse | No | No single response body — use onChunk / onDone in framework wrappers |
| retry | No | Stream restart is the application's responsibility |
| config.timeout | No | Server manages stream duration via #[rpc_stream(timeout)] |
| dedupe | No | Each stream call opens its own SSE connection |
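
Because retry does not apply to streams, restarting is left to the application. One possible shape for that is sketched below: a hypothetical `withRestart` helper (not part of the client) that re-opens a stream when iteration throws. The `open` parameter stands in for something like `() => rpc.stream('countdown', input)`. The restart limit and delay are illustrative assumptions.

```typescript
/**
 * Hypothetical helper: re-opens a stream when it fails mid-flight.
 * `open` is called once per attempt and must return a fresh stream.
 */
async function* withRestart<T>(
  open: () => AsyncIterable<T>,
  maxRestarts: number,
  delayMs = 0,
): AsyncGenerator<T> {
  let restarts = 0;
  while (true) {
    try {
      // Forward every chunk of the current attempt to the consumer.
      yield* open();
      return; // the stream finished normally
    } catch (err) {
      // Give up once the restart budget is spent.
      if (restarts >= maxRestarts) throw err;
      restarts += 1;
      if (delayMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
}
```

Each restart begins the stream again from its first chunk, so the consumer may see chunks it has already received. An application that cannot tolerate duplicates would need its own resume logic, for example by tracking an index such as `Token.index`.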
Try it
Countdown — Structured Streaming
Streams countdown ticks with configurable start value and delay.
Token Stream — LLM-style Streaming
Simulates LLM token-by-token generation. Watch the text assemble in real time.
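
The way text assembles from token chunks can be sketched on the client side as follows. This is a minimal illustration, not the demo's actual code: `assembleText` and its `onUpdate` callback are hypothetical names, and the `Token` interface mirrors the Rust struct from the Token Stream example. It relies on each token after the first already carrying its leading space, as the handler above produces.

```typescript
// Mirrors the Rust `Token` struct from the Token Stream example.
interface Token {
  text: string;
  index: number;
}

/**
 * Hypothetical helper: concatenates token texts as they arrive.
 * `onUpdate` receives the text accumulated so far after every token.
 */
async function assembleText(
  tokens: AsyncIterable<Token>,
  onUpdate?: (textSoFar: string) => void,
): Promise<string> {
  let text = "";
  for await (const token of tokens) {
    // The server already prefixed the separator, so plain concatenation works.
    text += token.text;
    onUpdate?.(text);
  }
  return text;
}
```

With the generated client, the first argument would be something like `rpc.stream('token_stream', { prompt })`, and `onUpdate` could write the partial text into UI state.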