Streaming Example
The streaming example demonstrates both producer (server-streaming) and exchange (bidirectional-streaming) methods.
Source code
import { type RecordBatch } from "apache-arrow";
import {
  Protocol,
  VgiRpcServer,
  int32,
  float,
  type OutputCollector,
} from "vgi-rpc-typescript";

const protocol = new Protocol("Streaming");

protocol.producer<{ limit: number; current: number; batchSize: number }>(
  "count",
  {
    params: { limit: int32, batch_size: int32 },
    outputSchema: { n: int32, n_squared: int32 },
    init: async ({ limit, batch_size }) => ({
      limit,
      current: 0,
      batchSize: batch_size,
    }),
    produce: async (state, out) => {
      if (state.current >= state.limit) {
        out.finish();
        return;
      }

      const remaining = state.limit - state.current;
      const count = Math.min(state.batchSize, remaining);

      const nValues: number[] = [];
      const sqValues: number[] = [];
      for (let i = 0; i < count; i++) {
        const n = state.current + i;
        nValues.push(n);
        sqValues.push(n * n);
      }
      state.current += count;

      out.emit({ n: nValues, n_squared: sqValues });
    },
    doc: "Count from 0 to limit-1, emitting n and n_squared.",
    defaults: { batch_size: 1 },
  },
);

protocol.exchange<{ factor: number }>("scale", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor }),
  exchange: async (state, input: RecordBatch, out: OutputCollector) => {
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  doc: "Scale input values by a factor.",
});

const server = new VgiRpcServer(protocol, { enableDescribe: true });
server.run();

Producer: count
The count method generates a sequence of numbers with their squares.
State management
The state object tracks:
limit: total numbers to generate
current: next number to emit
batchSize: how many rows per batch
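To see how these three fields evolve across calls, the sketch below replays the batch-size arithmetic from produce outside the server. CountState and step are illustrative names introduced here, not part of vgi-rpc-typescript.

```typescript
// Hypothetical stand-in for the state object returned by init.
interface CountState {
  limit: number; // total numbers to generate
  current: number; // next number to emit
  batchSize: number; // rows per batch
}

// Advance the state by one batch and return how many rows that batch holds.
// Returns 0 once the sequence is exhausted.
function step(state: CountState): number {
  const remaining = state.limit - state.current;
  const count = Math.max(0, Math.min(state.batchSize, remaining));
  state.current += count;
  return count;
}

const state: CountState = { limit: 5, current: 0, batchSize: 2 };
const sizes: number[] = [];
for (let count = step(state); count > 0; count = step(state)) {
  sizes.push(count);
}
// sizes is now [2, 2, 1]: the last batch is truncated to the remaining rows.
```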
Batch emission
For efficiency, the producer emits multiple rows per batch using column arrays (out.emit()), rather than one row at a time (out.emitRow()).
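The difference between the two shapes can be sketched in plain TypeScript. The Columns and Row types and both helpers are assumptions made for illustration. They mirror the object passed to out.emit() in the example and the per-row object that out.emitRow() appears to take, but they are not the library's own types.

```typescript
// Column-oriented payload: one array per output field, as passed to out.emit().
type Columns = { n: number[]; n_squared: number[] };
// Row-oriented payload: one scalar per output field (assumed shape for out.emitRow()).
type Row = { n: number; n_squared: number };

// Build one columnar batch covering start .. start + count - 1.
function buildColumns(start: number, count: number): Columns {
  const n: number[] = [];
  const n_squared: number[] = [];
  for (let i = 0; i < count; i++) {
    const value = start + i;
    n.push(value);
    n_squared.push(value * value);
  }
  return { n, n_squared };
}

// Expand a columnar batch into the individual rows it represents.
function toRows(columns: Columns): Row[] {
  return columns.n.map((value, i) => ({
    n: value,
    n_squared: columns.n_squared[i]!,
  }));
}

const batch = buildColumns(3, 2); // { n: [3, 4], n_squared: [9, 16] }
const rows = toRows(batch); // two rows, which row-wise emission would send one at a time
```

One columnar call carries what would otherwise take one call per row, which is presumably where the efficiency gain comes from.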
Stream termination
When current >= limit, the producer calls out.finish() to signal end of stream.
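A minimal sketch of how a caller might drive produce until the stream ends. StubCollector and drain are hypothetical stand-ins written for this illustration. The real OutputCollector and the server's scheduling loop may behave differently.

```typescript
type Columns = { n: number[]; n_squared: number[] };
interface CountState {
  limit: number;
  current: number;
  batchSize: number;
}

// Hypothetical stand-in for OutputCollector: records batches and the finish flag.
class StubCollector {
  batches: Columns[] = [];
  finished = false;
  emit(columns: Columns): void {
    this.batches.push(columns);
  }
  finish(): void {
    this.finished = true;
  }
}

// Same logic as the produce callback in the example, written synchronously.
function produce(state: CountState, out: StubCollector): void {
  if (state.current >= state.limit) {
    out.finish(); // nothing left: signal end of stream instead of emitting
    return;
  }
  const count = Math.min(state.batchSize, state.limit - state.current);
  const n: number[] = [];
  const n_squared: number[] = [];
  for (let i = 0; i < count; i++) {
    const value = state.current + i;
    n.push(value);
    n_squared.push(value * value);
  }
  state.current += count;
  out.emit({ n, n_squared });
}

// Assumed driver: call produce repeatedly until the collector reports finish.
function drain(state: CountState): StubCollector {
  const out = new StubCollector();
  while (!out.finished) {
    produce(state, out);
  }
  return out;
}

const result = drain({ limit: 5, current: 0, batchSize: 2 });
// result.batches holds three batches; a fourth call to produce set result.finished.
```

With limit set to 0 the first call finishes immediately, so the stream ends without emitting any batch.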
Exchange: scale
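The scale method multiplies input values by a configurable factor. As written, the handler reads only the first row of each input batch (input.getChildAt(0)?.get(0)), so it effectively assumes single-row batches.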
Lockstep protocol
The exchange reads one input batch, applies the transformation, and emits one output batch. This lockstep pattern prevents deadlocks in the stdin/stdout transport.
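The ordering this implies can be sketched with an in-memory stand-in for the transport. The Batch type, scale function, and runLockstep loop are assumptions made for illustration and do not reflect how vgi-rpc schedules I/O internally. Unlike the example's handler, which reads only row 0, the sketch scales every row in the batch.

```typescript
type Batch = { value: number[] };

// Hypothetical handler with the same contract as the example's exchange callback:
// exactly one output batch for each input batch.
function scale(factor: number, input: Batch): Batch {
  return { value: input.value.map((v) => v * factor) };
}

// Assumed client loop: write one batch, read its reply, and only then write the next.
// The log records the order of operations on the transport.
function runLockstep(inputs: Batch[], factor: number) {
  const log: string[] = [];
  const outputs: Batch[] = [];
  inputs.forEach((input, i) => {
    log.push(`write ${i}`);
    const reply = scale(factor, input);
    log.push(`read ${i}`);
    outputs.push(reply);
  });
  return { log, outputs };
}

const { log, outputs } = runLockstep([{ value: [1.5] }, { value: [4] }], 2);
// log:     ["write 0", "read 0", "write 1", "read 1"]
// outputs: [{ value: [3] }, { value: [8] }]
```

Because every write is followed by a read, at most one batch is in flight at any time. On a pipe-based transport with finite buffers, this avoids the situation where both sides block on writes that the other side is not reading.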
Running
# Producer stream
vgi-rpc --cmd "bun run examples/streaming.ts" call count limit=5 --format table
# n n_squared
# 0 0
# 1 1
# 2 4
# 3 9
# 4 16

# Exchange stream
vgi-rpc --cmd "bun run examples/streaming.ts" call scale factor=2.0