Exchange Streams
Exchange streams implement bidirectional streaming where the client sends batches and the server responds with one output batch per input batch.
Basic usage
Section titled “Basic usage”import { Protocol, float, type OutputCollector } from "@query-farm/vgi-rpc";import { type RecordBatch } from "@query-farm/apache-arrow";
const protocol = new Protocol("MyService");
protocol.exchange<{ factor: number }>("scale", { params: { factor: float }, inputSchema: { value: float }, outputSchema: { value: float }, init: async ({ factor }) => ({ factor }), exchange: async (state, input: RecordBatch, out: OutputCollector) => { const value = input.getChildAt(0)?.get(0) as number; out.emitRow({ value: value * state.factor }); }, doc: "Scale input values by a factor.",});How it works
Section titled “How it works”An exchange method has two phases:
init— receives the request parameters, returns the initial state objectexchange— called once per input batch with the current state, input batch, and anOutputCollector
The exchange function must emit exactly one data batch per call. Unlike producer streams, calling out.finish() is not allowed in exchange methods.
Exchange function signature
Section titled “Exchange function signature”type ExchangeFn<S> = ( state: S, input: VgiBatch, out: OutputCollector,) => Promise<void> | void;state— mutable state object frominitinput— the input batch sent by the client, typed asVgiBatch(the backend-abstract batch type from the Arrow facade)out—OutputCollectorfor emitting the response batch
The example above annotates input as the arrow-js RecordBatch, which works structurally because RecordBatch satisfies the VgiBatch shape. The canonical type, however, is VgiBatch — prefer it when you want code that stays backend-agnostic.
Reading input data
Section titled “Reading input data”Access input batch data using Arrow’s column API:
exchange: async (state, input, out) => { // By column index const col0 = input.getChildAt(0);
// By field name const values = input.getChild("value");
// Iterate rows for (let i = 0; i < input.numRows; i++) { const val = values?.get(i); // process each row... }
out.emitRow({ result: processedValue });},Lockstep protocol
Section titled “Lockstep protocol”Exchange streams use a lockstep protocol: the server reads one input batch and writes one output batch before reading the next. This interleaved pattern prevents deadlocks when both sides are reading and writing on the same pipe.
Type-safe state
Section titled “Type-safe state”Like producer streams, the generic <S> parameter threads state types:
interface TransformState { factor: number; totalProcessed: number;}
protocol.exchange<TransformState>("transform", { params: { factor: float }, inputSchema: { value: float }, outputSchema: { value: float, running_total: float }, init: async ({ factor }) => ({ factor, totalProcessed: 0 }), exchange: async (state, input, out) => { const value = input.getChildAt(0)?.get(0) as number; const scaled = value * state.factor; state.totalProcessed += scaled; out.emitRow({ value: scaled, running_total: state.totalProcessed }); },});Cancellation
Section titled “Cancellation”A client can cancel an in-flight exchange stream by sending an input batch whose metadata carries the vgi_rpc.cancel key (the CANCEL_KEY constant). The server ends the stream cleanly without calling exchange again for that batch.
Register an optional onCancel hook on the exchange config to release resources held in state. It runs exactly once on cancellation:
protocol.exchange<TransformState>("transform", { params: { factor: float }, inputSchema: { value: float }, outputSchema: { value: float }, init: async ({ factor }) => ({ factor, totalProcessed: 0 }), exchange: async (state, input, out) => { const value = input.getChildAt(0)?.get(0) as number; out.emitRow({ value: value * state.factor }); }, onCancel: (state) => { // Cleanup; runs once when the client cancels. // Signature: (state: S) => void | Promise<void> },});The TypeScript client does not expose a dedicated cancel() method — cancellation is initiated by sending the cancel-metadata input batch, and onCancel is the server-side hook that responds to it.
Stream headers
Section titled “Stream headers”Exchange methods can optionally send a one-time header before the data stream. See Stream Headers.