Streaming

#[rpc_stream] is a third procedure type, alongside queries and mutations, that enables HTTP streaming responses via Server-Sent Events (SSE). It's built on Axum's streaming primitives and Vercel's streaming support.

How it works: The handler receives typed input plus a StreamSender for emitting chunks. Each chunk is serialized as a data: {json}\n\n SSE event. The generated TypeScript client gets a stream() method returning an AsyncGenerator, and framework wrappers provide reactive state management via createStream / useStream.
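To make the wire format concrete, here is a minimal sketch of how a `data: {json}\n\n` SSE body decodes into typed chunks. `parseSseData` is an illustrative helper, not part of metaxy's generated client, which does this incrementally over a fetch stream.

```typescript
// Hypothetical helper: decode a complete SSE body into typed chunks.
// Each event is a "data: <json>" line terminated by a blank line.
function* parseSseData<T>(body: string): Generator<T> {
  for (const event of body.split("\n\n")) {
    const line = event.trim();
    if (line.startsWith("data: ")) {
      yield JSON.parse(line.slice("data: ".length)) as T;
    }
  }
}

// Two events, as the server would emit them for the countdown example:
const wire =
  'data: {"remaining":2,"message":"2..."}\n\n' +
  'data: {"remaining":1,"message":"1..."}\n\n';

const chunks = [...parseSseData<{ remaining: number; message: string }>(wire)];
```

The real client yields chunks as they arrive rather than buffering the whole body, but the per-event decoding is the same.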

Examples

Countdown — Structured Streaming

A stream handler with typed input and structured output chunks. Demonstrates the timeout attribute and graceful handling of client disconnection.

```rust
use std::time::Duration;

use metaxy::{rpc_stream, StreamSender};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct CountdownInput {
    pub from: u32,
    pub delay_ms: u64,
}

#[derive(Serialize)]
pub struct CountdownTick {
    pub remaining: u32,
    pub message: String,
}

#[rpc_stream(timeout = "30s")]
async fn countdown(input: CountdownInput, tx: StreamSender<CountdownTick>) {
    // Clamp inputs so a client can't request an unbounded stream.
    let from = input.from.min(10);
    let delay = Duration::from_millis(input.delay_ms.max(100));

    for i in (0..=from).rev() {
        let tick = CountdownTick {
            remaining: i,
            message: if i == 0 { "Done!".into() } else { format!("{i}...") },
        };
        // send() errors once the client disconnects; stop streaming.
        if tx.send(tick).await.is_err() { break; }
        if i > 0 { tokio::time::sleep(delay).await; }
    }
}
```

Token Stream — LLM-style Streaming

Simulates LLM token streaming by splitting input into words and streaming them back with realistic latency. Demonstrates the typical AI integration pattern.

```rust
use std::time::Duration;

use metaxy::{rpc_stream, StreamSender};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct TokenStreamInput {
    pub prompt: String,
}

#[derive(Serialize)]
pub struct Token {
    pub text: String,
    pub index: u32,
}

#[rpc_stream(timeout = "60s")]
async fn token_stream(input: TokenStreamInput, tx: StreamSender<Token>) {
    let words: Vec<&str> = input.prompt.split_whitespace().collect();

    for (i, word) in words.iter().enumerate() {
        let sep = if i == 0 { "" } else { " " };
        let token = Token { text: format!("{sep}{word}"), index: i as u32 };
        // Stop if the client has disconnected.
        if tx.send(token).await.is_err() { break; }
        // Simulate per-token latency proportional to word length.
        tokio::time::sleep(Duration::from_millis(50 + word.len() as u64 * 15)).await;
    }
}
```
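On the client, tokens like these are typically accumulated into the full text as they arrive, which is the standard LLM UI pattern. A hedged sketch, where the async generator stands in for `rpc.stream('token_stream', ...)` and the chunk shape mirrors the Token struct above:

```typescript
// Accumulate streamed tokens into the final text, as an LLM UI would.
async function collectText(
  tokens: AsyncIterable<{ text: string; index: number }>,
): Promise<string> {
  let full = "";
  for await (const t of tokens) full += t.text;
  return full;
}

// Stand-in for the generator returned by rpc.stream('token_stream', ...).
async function* demo() {
  yield { text: "Hello", index: 0 };
  yield { text: " world", index: 1 };
}
```

In a UI you would render `full` after each iteration instead of only returning it at the end.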

Client Usage

```ts
// Direct call — async generator
for await (const chunk of rpc.stream('countdown', { from: 5, delay_ms: 500 })) {
  console.log(chunk.remaining, chunk.message);
}

// With call options
for await (const chunk of rpc.stream('token_stream', { prompt: "Hello world" }, {
  signal: controller.signal,
})) {
  process(chunk);
}
```
```ts
// Reactive wrapper — manages chunks, state, and cleanup
const stream = createStream(rpc, 'countdown', () => ({
  from: countdownFrom,
  delay_ms: 500,
}));

// stream.chunks      — CountdownTick[]
// stream.isStreaming — boolean
// stream.isDone      — boolean
// stream.start()     — begin streaming
// stream.stop()      — abort
```
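Conceptually, a wrapper like createStream is a small state machine over the async generator. A framework-agnostic sketch under that assumption (the real wrapper also hooks into each framework's reactivity; all names here are illustrative):

```typescript
// Minimal state a createStream-style wrapper tracks while consuming
// an async generator. Not metaxy's implementation, just the shape.
class StreamState<T> {
  chunks: T[] = [];
  isStreaming = false;
  isDone = false;
  private abort?: AbortController;

  // `source` stands in for a call like rpc.stream(...), parameterized
  // by an abort signal so stop() can cancel it.
  async start(source: (signal: AbortSignal) => AsyncGenerator<T>): Promise<void> {
    this.abort = new AbortController();
    this.chunks = [];
    this.isStreaming = true;
    this.isDone = false;
    try {
      for await (const chunk of source(this.abort.signal)) {
        this.chunks.push(chunk);
      }
      this.isDone = true; // reached only if the stream completed normally
    } finally {
      this.isStreaming = false;
    }
  }

  stop(): void {
    this.abort?.abort();
  }
}
```

A reactive wrapper would additionally notify subscribers whenever `chunks` or the flags change.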

Supported Attributes

| Attribute  | Stream | Notes                                      |
| ---------- | ------ | ------------------------------------------ |
| timeout    | Yes    | Sends SSE error event on expiry            |
| init       | Yes    | Cold-start initialization, state injection |
| cache      | No     | Streaming responses cannot be cached       |
| idempotent | No     | Streams are inherently non-idempotent      |

Client-Side Options

How RpcClientConfig and CallOptions behave for streams:

| Option              | Stream | Notes                                                              |
| ------------------- | ------ | ------------------------------------------------------------------ |
| callOptions.signal  | Yes    | Merged with internal controller via AbortSignal.any()              |
| callOptions.timeout | Yes    | Aborts the SSE connection after the given duration                 |
| onRequest           | Yes    | Fires before the fetch                                             |
| onError             | Yes    | Fires on non-ok response or network error                          |
| onResponse          | No     | No single response body; use onChunk / onDone in framework wrappers |
| retry               | No     | Stream restart is the application's responsibility                 |
| config.timeout      | No     | Server manages stream duration via #[rpc_stream(timeout)]          |
| dedupe              | No     | Each stream call opens its own SSE connection                      |
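The signal-merging row above can be sketched as follows, assuming a runtime with AbortSignal.any and AbortSignal.timeout (Node 20+ or a modern browser). The function name and shape are illustrative, not metaxy's internals:

```typescript
// Sketch: the fetch aborts when EITHER the caller's signal fires,
// the per-call timeout elapses, or the client aborts internally
// (e.g. stream.stop()).
function mergedSignal(
  external: AbortSignal | undefined,
  timeoutMs: number | undefined,
): AbortSignal {
  const internal = new AbortController();
  const signals: AbortSignal[] = [internal.signal];
  if (external) signals.push(external);
  if (timeoutMs !== undefined) signals.push(AbortSignal.timeout(timeoutMs));
  return AbortSignal.any(signals);
}

const caller = new AbortController();
const merged = mergedSignal(caller.signal, 5_000);
caller.abort(); // caller cancels; the merged signal aborts with it
```

Because AbortSignal.any follows whichever source aborts first, cancellation from any of the three paths tears down the SSE connection.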

Try it

Countdown — Structured Streaming

Streams countdown ticks with configurable start value and delay.

Token Stream — LLM-style Streaming

Simulates LLM token-by-token generation. Watch the text assemble in real time.

Visit GitHub to learn more about metaxy.