Streaming Example

The streaming example demonstrates both producer (server-streaming) and exchange (bidirectional-streaming) methods.

Source code

import { type RecordBatch } from "apache-arrow";
import {
  Protocol,
  VgiRpcServer,
  int32,
  float,
  type OutputCollector,
} from "vgi-rpc-typescript";

const protocol = new Protocol("Streaming");

// Producer (server-streaming): emits batches of n and n_squared.
protocol.producer<{ limit: number; current: number; batchSize: number }>(
  "count",
  {
    params: { limit: int32, batch_size: int32 },
    outputSchema: { n: int32, n_squared: int32 },
    // Build the per-stream state from the call parameters.
    init: async ({ limit, batch_size }) => ({
      limit,
      current: 0,
      batchSize: batch_size,
    }),
    produce: async (state, out) => {
      // Nothing left to emit: signal end of stream.
      if (state.current >= state.limit) {
        out.finish();
        return;
      }
      // Emit at most batchSize rows, fewer for the final batch.
      const remaining = state.limit - state.current;
      const count = Math.min(state.batchSize, remaining);
      const nValues: number[] = [];
      const sqValues: number[] = [];
      for (let i = 0; i < count; i++) {
        const n = state.current + i;
        nValues.push(n);
        sqValues.push(n * n);
      }
      state.current += count;
      out.emit({ n: nValues, n_squared: sqValues });
    },
    doc: "Count from 0 to limit-1, emitting n and n_squared.",
    defaults: { batch_size: 1 },
  },
);

// Exchange (bidirectional-streaming): one output batch per input batch.
protocol.exchange<{ factor: number }>("scale", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor }),
  exchange: async (state, input: RecordBatch, out: OutputCollector) => {
    // Reads only the first row of the first column of each input batch.
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  doc: "Scale input values by a factor.",
});

const server = new VgiRpcServer(protocol, { enableDescribe: true });
server.run();

Producer: count

The count method generates a sequence of numbers with their squares.

State management

The state object tracks:

  • limit — total numbers to generate
  • current — next number to emit
  • batchSize — how many rows per batch
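The state shape and its initialization can be sketched in isolation. This is a minimal illustration that does not use the library: the names `CountState` and `initCountState` are hypothetical and only mirror the generic type and the `init` callback in the source above.

```typescript
// Hypothetical type mirroring the generic passed to protocol.producer.
interface CountState {
  limit: number; // total numbers to generate
  current: number; // next number to emit
  batchSize: number; // rows per batch
}

// Hypothetical helper mirroring the init callback: the snake_case
// parameter batch_size becomes the camelCase state field batchSize.
function initCountState(params: { limit: number; batch_size: number }): CountState {
  return { limit: params.limit, current: 0, batchSize: params.batch_size };
}

const initialState = initCountState({ limit: 5, batch_size: 2 });
```

The counter always starts at 0, so a fresh call to `count` restarts the sequence regardless of earlier streams.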

Batch emission

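For efficiency, the producer emits up to batchSize rows per batch as column arrays (out.emit()), rather than one row at a time (out.emitRow()). Note that batch_size defaults to 1, so callers need to pass a larger value to get multi-row batches.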
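The column-array layout can be illustrated with a small standalone function. This is a sketch under assumptions: `buildBatch` and `CountBatch` are hypothetical names, and the function only reproduces the arithmetic of the `produce` callback without calling the library.

```typescript
// Hypothetical column-oriented batch: one array per output column.
interface CountBatch {
  n: number[];
  n_squared: number[];
}

// Build the next batch starting at `current`, capped by both the
// batch size and the number of rows remaining before `limit`.
function buildBatch(current: number, limit: number, batchSize: number): CountBatch {
  const count = Math.max(0, Math.min(batchSize, limit - current));
  const n = Array.from({ length: count }, (_, i) => current + i);
  return { n, n_squared: n.map((v) => v * v) };
}

// Only two rows remain before the limit, so the batch is shorter
// than the requested size of 4.
const lastBatch = buildBatch(3, 5, 4);
// lastBatch.n is [3, 4] and lastBatch.n_squared is [9, 16]
```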

Stream termination

When current >= limit, the producer calls out.finish() to signal end of stream.
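The termination check can be exercised with a mock collector. This sketch assumes the framework keeps calling `produce` until `finish()` is called; that driver loop is an assumption, not something the source confirms, and `MockCollector` and `drain` are hypothetical names that stand in for the library's collector.

```typescript
interface ProducerState {
  limit: number;
  current: number;
  batchSize: number;
}

// Hypothetical stand-in for the library's output collector.
class MockCollector {
  batches: { n: number[]; n_squared: number[] }[] = [];
  finished = false;
  emit(batch: { n: number[]; n_squared: number[] }): void {
    this.batches.push(batch);
  }
  finish(): void {
    this.finished = true;
  }
}

// Same logic as the produce callback, made synchronous for the sketch.
function produceOnce(state: ProducerState, out: MockCollector): void {
  if (state.current >= state.limit) {
    out.finish();
    return;
  }
  const count = Math.min(state.batchSize, state.limit - state.current);
  const n: number[] = [];
  for (let i = 0; i < count; i++) n.push(state.current + i);
  state.current += count;
  out.emit({ n, n_squared: n.map((v) => v * v) });
}

// Assumed driver loop: call produce until the stream is finished.
// Requires batchSize >= 1, otherwise current never advances.
function drain(limit: number, batchSize: number): MockCollector {
  const state: ProducerState = { limit, current: 0, batchSize };
  const out = new MockCollector();
  while (!out.finished) produceOnce(state, out);
  return out;
}

const drained = drain(5, 2);
// drained.batches holds three batches with n = [0, 1], [2, 3], [4]
```

With limit=5 and a batch size of 2, the fourth call emits nothing and only calls `finish()`, so the end-of-stream signal costs one extra `produce` invocation.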

Exchange: scale

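The scale method multiplies input values by a configurable factor. As written, the handler reads only the first row of each input batch, so it expects single-row batches.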

Lockstep protocol

The exchange reads one input batch, applies the transformation, and emits exactly one output batch before reading the next. This lockstep pattern prevents deadlocks in the stdin/stdout transport.
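The one-in, one-out discipline can be modeled without any transport. This is a simplified sketch: `runLockstep` and `ExchangeFn` are hypothetical names, batches are plain number arrays rather than Arrow record batches, and the handler here scales every value in a batch rather than only the first row.

```typescript
// Hypothetical handler type: one input batch in, one output batch out.
type ExchangeFn = (input: number[]) => number[];

// Process batches strictly in order. The next input is not read until
// the output for the current one has been produced.
function runLockstep(inputs: number[][], exchange: ExchangeFn): number[][] {
  const outputs: number[][] = [];
  for (const batch of inputs) {
    outputs.push(exchange(batch));
  }
  return outputs;
}

const factor = 2;
const scaled = runLockstep([[1.5], [4]], (batch) => batch.map((v) => v * factor));
// scaled is [[3], [8]]: exactly one output batch per input batch
```

Because the output count always equals the input count, a client can pair each batch it sends with the next batch it receives.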

Running

# Producer stream
vgi-rpc --cmd "bun run examples/streaming.ts" call count limit=5 --format table
# n n_squared
# 0 0
# 1 1
# 2 4
# 3 9
# 4 16
# Exchange stream
vgi-rpc --cmd "bun run examples/streaming.ts" call scale factor=2.0