Streaming Example
The streaming example demonstrates both producer (server-streaming) and exchange (bidirectional-streaming) methods.
Source code
import { type RecordBatch } from "@query-farm/apache-arrow";
import {
  Protocol,
  VgiRpcServer,
  int32,
  float,
  type OutputCollector,
} from "@query-farm/vgi-rpc";

const protocol = new Protocol("Streaming");

protocol.producer<{ limit: number; current: number; batchSize: number }>(
  "count",
  {
    params: { limit: int32, batch_size: int32 },
    outputSchema: { n: int32, n_squared: int32 },
    init: async ({ limit, batch_size }) => ({
      limit,
      current: 0,
      batchSize: batch_size,
    }),
    produce: async (state, out) => {
      if (state.current >= state.limit) {
        out.finish();
        return;
      }

      const remaining = state.limit - state.current;
      const count = Math.min(state.batchSize, remaining);

      const nValues: number[] = [];
      const sqValues: number[] = [];
      for (let i = 0; i < count; i++) {
        const n = state.current + i;
        nValues.push(n);
        sqValues.push(n * n);
      }
      state.current += count;

      out.emit({ n: nValues, n_squared: sqValues });
    },
    doc: "Count from 0 to limit-1, emitting n and n_squared.",
    defaults: { batch_size: 1 },
    paramTypes: { limit: "int", batch_size: "int" },
  },
);

protocol.exchange<{ factor: number }>("scale", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor }),
  exchange: async (state, input: RecordBatch, out: OutputCollector) => {
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  doc: "Scale input values by a factor.",
});

const server = new VgiRpcServer(protocol, { enableDescribe: true });
server.run();

Producer: count
The count method generates a sequence of numbers with their squares.
State management
The state object tracks:
- limit: total numbers to generate
- current: next number to emit
- batchSize: how many rows per batch
Batch emission
For efficiency, the producer emits multiple rows per batch using column arrays (out.emit()), rather than one row at a time (out.emitRow()).
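To make the column-array shape concrete, the following is a minimal sketch of pivoting rows into columns, written against plain arrays with no dependency on vgi-rpc. The `Row` and `Columns` types and the `toColumns` helper are hypothetical names introduced for illustration; the example itself builds `nValues` and `sqValues` directly and hands them to `out.emit()`.

```typescript
// Hypothetical types for illustration; they mirror the example's output schema.
type Row = { n: number; n_squared: number };
type Columns = { n: number[]; n_squared: number[] };

// Pivot a list of rows into one array per column, all of equal length.
function toColumns(rows: Row[]): Columns {
  const columns: Columns = { n: [], n_squared: [] };
  for (const row of rows) {
    columns.n.push(row.n);
    columns.n_squared.push(row.n_squared);
  }
  return columns;
}

const rows: Row[] = [
  { n: 0, n_squared: 0 },
  { n: 1, n_squared: 1 },
  { n: 2, n_squared: 4 },
];

// Emitting row by row would take rows.length calls; the columnar form takes one.
const batch = toColumns(rows);
console.log(JSON.stringify(batch)); // {"n":[0,1,2],"n_squared":[0,1,4]}
```

The object held in `batch` has the same shape as the argument passed to `out.emit()` in the source above: one key per output column, each holding that column's values for the whole batch.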
Stream termination
When current >= limit, the producer calls out.finish() to signal end of stream.
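The termination check can be traced with a small simulation. This sketch assumes the caller invokes `produce` repeatedly until the collector is marked finished; that driver loop is an assumption about how the callback is used, not the server's actual code. `RecordingCollector` and `drain` are hypothetical stand-ins rather than the vgi-rpc `OutputCollector`, and `produce` is written synchronously here for brevity.

```typescript
type Columns = { n: number[]; n_squared: number[] };
interface CountState { limit: number; current: number; batchSize: number }

// Hypothetical in-memory collector; NOT the vgi-rpc OutputCollector.
class RecordingCollector {
  batches: Columns[] = [];
  finished = false;
  emit(columns: Columns): void {
    this.batches.push(columns);
  }
  finish(): void {
    this.finished = true;
  }
}

// Same control flow as the example's produce callback, against the stand-in.
function produce(state: CountState, out: RecordingCollector): void {
  if (state.current >= state.limit) {
    out.finish(); // nothing left: signal end of stream instead of emitting
    return;
  }
  const count = Math.min(state.batchSize, state.limit - state.current);
  const n: number[] = [];
  const nSquared: number[] = [];
  for (let i = 0; i < count; i++) {
    const value = state.current + i;
    n.push(value);
    nSquared.push(value * value);
  }
  state.current += count;
  out.emit({ n, n_squared: nSquared });
}

// Assumed driver: keep calling produce until the collector reports finish.
function drain(state: CountState): { out: RecordingCollector; calls: number } {
  const out = new RecordingCollector();
  let calls = 0;
  while (!out.finished) {
    produce(state, out);
    calls++;
  }
  return { out, calls };
}

const result = drain({ limit: 5, current: 0, batchSize: 2 });
console.log(JSON.stringify(result.out.batches.map((b) => b.n))); // [[0,1],[2,3],[4]]
console.log(result.calls); // 4
```

With limit=5 and a batch size of 2, three calls emit data and a fourth call emits nothing and finishes the stream, so the final batch is shorter than the others rather than padded.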
Exchange: scale
The scale method multiplies each input value by a configurable factor.
Lockstep protocol
The exchange reads one input batch, applies the transformation, and emits one output batch. This lockstep pattern prevents deadlocks in the stdin/stdout transport.
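The one-in, one-out shape can be sketched with plain arrays. `ValueBatch`, `exchangeOnce`, and `runLockstep` are hypothetical names, and the loop is only an assumed model of the transport, which strictly alternates reading one batch and writing one batch so that neither side accumulates unread data in a pipe. Note one deliberate difference: the example's callback reads a single value via `input.getChildAt(0)?.get(0)`, that is, the first row of the first column, whereas this sketch scales every value in the batch.

```typescript
interface ScaleState { factor: number }
type ValueBatch = { value: number[] };

// One input batch in, exactly one output batch out.
function exchangeOnce(state: ScaleState, input: ValueBatch): ValueBatch {
  return { value: input.value.map((v) => v * state.factor) };
}

// Assumed driver: read one batch, write one batch, then repeat.
function runLockstep(state: ScaleState, inputs: ValueBatch[]): ValueBatch[] {
  const outputs: ValueBatch[] = [];
  for (const input of inputs) {
    outputs.push(exchangeOnce(state, input));
  }
  return outputs;
}

const scaled = runLockstep({ factor: 2 }, [{ value: [1.5] }, { value: [2, 4] }]);
console.log(JSON.stringify(scaled)); // [{"value":[3]},{"value":[4,8]}]
```

The number of output batches always equals the number of input batches, and each output has the same row count as its input.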
Running
# Producer stream
vgi-rpc --cmd "bun run examples/streaming.ts" call count limit=5 --format table
# n  n_squared
# 0  0
# 1  1
# 2  4
# 3  9
# 4  16

# Exchange stream
vgi-rpc --cmd "bun run examples/streaming.ts" call scale factor=2.0