Exchange Streams

Exchange streams implement bidirectional streaming where the client sends batches and the server responds with one output batch per input batch.

import { Protocol, float, type OutputCollector } from "@query-farm/vgi-rpc";
import { type RecordBatch } from "@query-farm/apache-arrow";

const protocol = new Protocol("MyService");

protocol.exchange<{ factor: number }>("scale", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor }),
  exchange: async (state, input: RecordBatch, out: OutputCollector) => {
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  doc: "Scale input values by a factor.",
});

An exchange method has two phases:

  1. init — receives the request parameters, returns the initial state object
  2. exchange — called once per input batch with the current state, input batch, and an OutputCollector

The exchange function must emit exactly one data batch per call. Unlike producer streams, calling out.finish() is not allowed in exchange methods.
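
The two phases and the one-batch-per-call rule can be sketched with a small local driver. This is a simplified model, not the library's implementation: `SketchCollector`, `SketchExchange`, and `runExchange` are hypothetical names, handlers are synchronous for brevity, a batch is modelled as a plain array of rows, and each `emitRow` call is assumed to count as one output batch.

```typescript
// Hypothetical stand-ins for illustration; not the vgi-rpc implementation.
type Row = Record<string, number>;

class SketchCollector {
  readonly rows: Row[] = [];
  emitRow(row: Row): void {
    this.rows.push(row);
  }
}

interface SketchExchange<P, S> {
  init(params: P): S;
  exchange(state: S, input: Row[], out: SketchCollector): void;
}

// Runs init once, then exchange once per input batch. A call that does not
// emit exactly one output batch (one emitRow in this model) is rejected.
function runExchange<P, S>(
  method: SketchExchange<P, S>,
  params: P,
  inputs: Row[][],
): Row[] {
  const state = method.init(params);
  const outputs: Row[] = [];
  for (const input of inputs) {
    const out = new SketchCollector();
    method.exchange(state, input, out);
    const [only] = out.rows;
    if (out.rows.length !== 1 || only === undefined) {
      throw new Error(`expected exactly one output batch, got ${out.rows.length}`);
    }
    outputs.push(only);
  }
  return outputs;
}

// Same shape as the "scale" method above, expressed against the model.
const scale: SketchExchange<{ factor: number }, { factor: number }> = {
  init: ({ factor }) => ({ factor }),
  exchange: (state, input, out) => {
    const value = input[0]?.value ?? 0;
    out.emitRow({ value: value * state.factor });
  },
};

// Two input batches produce two output batches: value 3, then value 6.
const scaled = runExchange(scale, { factor: 3 }, [[{ value: 1 }], [{ value: 2 }]]);
```

In this model, a handler that returns without emitting, or that emits twice, fails at the call that broke the rule rather than desynchronizing later batches.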

type ExchangeFn<S> = (
  state: S,
  input: VgiBatch,
  out: OutputCollector,
) => Promise<void> | void;

  • state: mutable state object from init
  • input: the input batch sent by the client, typed as VgiBatch (the backend-abstract batch type from the Arrow facade)
  • out: OutputCollector for emitting the response batch

The example above annotates input as the arrow-js RecordBatch, which works structurally because RecordBatch satisfies the VgiBatch shape. The canonical type, however, is VgiBatch — prefer it when you want code that stays backend-agnostic.

Access input batch data using Arrow’s column API:

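exchange: async (state, input, out) => {
  // By column index
  const col0 = input.getChildAt(0);
  // By field name
  const values = input.getChild("value");
  // Iterate rows, accumulating a result (a sum here, purely for illustration)
  let processedValue = 0;
  for (let i = 0; i < input.numRows; i++) {
    const val = values?.get(i);
    if (typeof val === "number") processedValue += val;
  }
  // Emit the single response batch for this input batch
  out.emitRow({ result: processedValue });
},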

Exchange streams use a lockstep protocol: the server reads one input batch and writes one output batch before reading the next. This interleaved pattern prevents deadlocks when both sides are reading and writing on the same pipe.
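
The ordering that lockstep implies can be illustrated with an in-memory trace. This is only a model of the sequencing described above: no real pipe or vgi-rpc API is involved, and `serverStep`, `clientRun`, and `trace` are hypothetical names.

```typescript
// Hypothetical in-memory model of the lockstep ordering.
type Batch = number[];

const trace: string[] = [];

// Server side: read one input batch, then write exactly one output batch.
function serverStep(index: number, input: Batch, factor: number): Batch {
  trace.push(`server read input ${index}`);
  const output = input.map((v) => v * factor);
  trace.push(`server wrote output ${index}`);
  return output;
}

// Client side: write one input batch, then read its response before
// writing the next one.
function clientRun(inputs: Batch[], factor: number): Batch[] {
  const outputs: Batch[] = [];
  inputs.forEach((input, index) => {
    trace.push(`client wrote input ${index}`);
    outputs.push(serverStep(index, input, factor));
    trace.push(`client read output ${index}`);
  });
  return outputs;
}

const lockstepOutputs = clientRun([[1, 2], [3]], 10);
// trace now holds four events for batch 0 followed by four for batch 1;
// no event for batch 1 appears before "client read output 0".
```

In the trace, every write is followed by the matching read on the other side before the next write happens, so at most one batch is in flight in each direction at any time.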

Like producer streams, the generic <S> parameter threads state types:

interface TransformState {
  factor: number;
  totalProcessed: number;
}

protocol.exchange<TransformState>("transform", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float, running_total: float },
  init: async ({ factor }) => ({ factor, totalProcessed: 0 }),
  exchange: async (state, input, out) => {
    const value = input.getChildAt(0)?.get(0) as number;
    const scaled = value * state.factor;
    state.totalProcessed += scaled;
    out.emitRow({ value: scaled, running_total: state.totalProcessed });
  },
});

A client can cancel an in-flight exchange stream by sending an input batch whose metadata carries the vgi_rpc.cancel key (the CANCEL_KEY constant). The server ends the stream cleanly without calling exchange again for that batch.

Register an optional onCancel hook on the exchange config to release resources held in state. It runs exactly once on cancellation:

protocol.exchange<TransformState>("transform", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor, totalProcessed: 0 }),
  exchange: async (state, input, out) => {
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  onCancel: (state) => {
    // Cleanup; runs once when the client cancels.
    // Signature: (state: S) => void | Promise<void>
  },
});

The TypeScript client does not expose a dedicated cancel() method — cancellation is initiated by sending the cancel-metadata input batch, and onCancel is the server-side hook that responds to it.
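
The cancellation flow can be modelled end to end in a few lines. This is a sketch under stated assumptions rather than the library's server loop: only the key name `vgi_rpc.cancel` comes from the text above, while `SketchBatch`, `SketchHandlers`, and `serveStream` are hypothetical, handlers are synchronous, and the model checks only for the presence of the key because the metadata value is not specified here.

```typescript
// Key name taken from the text above; everything else is a hypothetical model.
const CANCEL_KEY = "vgi_rpc.cancel";

interface SketchBatch {
  values: number[];
  metadata: Map<string, string>;
}

interface SketchHandlers<S> {
  exchange(state: S, input: SketchBatch): number[];
  onCancel?(state: S): void;
}

// Processes input batches in order. A batch carrying the cancel key ends the
// stream: exchange is not called for it, and onCancel runs once.
function serveStream<S>(
  state: S,
  handlers: SketchHandlers<S>,
  inputs: SketchBatch[],
): number[][] {
  const outputs: number[][] = [];
  for (const input of inputs) {
    if (input.metadata.has(CANCEL_KEY)) {
      handlers.onCancel?.(state);
      break;
    }
    outputs.push(handlers.exchange(state, input));
  }
  return outputs;
}

const cancelState = { factor: 2, exchangeCalls: 0, cancelCalls: 0 };

const cancelOutputs = serveStream(
  cancelState,
  {
    exchange: (state, input) => {
      state.exchangeCalls += 1;
      return input.values.map((v) => v * state.factor);
    },
    onCancel: (state) => {
      state.cancelCalls += 1; // release resources held in state here
    },
  },
  [
    { values: [1], metadata: new Map() },
    // Client-initiated cancel: metadata value is an assumption of this sketch.
    { values: [], metadata: new Map([[CANCEL_KEY, ""]]) },
    { values: [3], metadata: new Map() }, // never processed
  ],
);
```

Here the first batch is answered, the cancel batch stops the loop, and the third batch is never handed to `exchange`.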

Exchange methods can optionally send a one-time header before the data stream. See Stream Headers.