Exchange Streams

Exchange streams implement bidirectional streaming where the client sends batches and the server responds with one output batch per input batch.

Basic usage

import { Protocol, float, type OutputCollector } from "vgi-rpc-typescript";
import { type RecordBatch } from "apache-arrow";

const protocol = new Protocol("MyService");

protocol.exchange<{ factor: number }>("scale", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float },
  init: async ({ factor }) => ({ factor }),
  exchange: async (state, input: RecordBatch, out: OutputCollector) => {
    const value = input.getChildAt(0)?.get(0) as number;
    out.emitRow({ value: value * state.factor });
  },
  doc: "Scale input values by a factor.",
});

How it works

An exchange method has two phases:

  1. init — receives the request parameters, returns the initial state object
  2. exchange — called once per input batch with the current state, input batch, and an OutputCollector

The exchange function must emit exactly one data batch per call. Unlike producer streams, calling out.finish() is not allowed in exchange methods.
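
The two phases and the one-batch-per-call rule can be sketched without the library. Everything in the block below is a hypothetical stand-in written for illustration: `SketchCollector`, `SketchExchange`, and `runExchange` are not part of vgi-rpc-typescript, and a "batch" here is a plain array of rows rather than an Arrow RecordBatch. It only shows the call order (init once, then exchange once per input batch) and how a driver could reject a call that emits zero or several batches.

```typescript
// Hypothetical stand-ins: a "batch" is a plain array of rows, not an Arrow RecordBatch.
type Row = Record<string, number>;
type Batch = Row[];

// Minimal collector that records what a single exchange call emits.
class SketchCollector {
  readonly emitted: Batch[] = [];
  emitRow(row: Row): void {
    this.emitted.push([row]); // each emitRow produces a one-row batch
  }
}

interface SketchExchange<P, S> {
  init: (params: P) => Promise<S> | S;
  exchange: (state: S, input: Batch, out: SketchCollector) => Promise<void> | void;
}

// Hypothetical driver: runs init once, then exchange once per input batch.
async function runExchange<P, S>(
  method: SketchExchange<P, S>,
  params: P,
  inputs: Batch[],
): Promise<Batch[]> {
  const state = await method.init(params); // phase 1: build the initial state
  const outputs: Batch[] = [];
  for (const input of inputs) {
    const out = new SketchCollector();
    await method.exchange(state, input, out); // phase 2: one call per input batch
    if (out.emitted.length !== 1) {
      throw new Error(`expected exactly 1 output batch, got ${out.emitted.length}`);
    }
    outputs.push(out.emitted[0]);
  }
  return outputs;
}

// Same shape as the "scale" method; assumes every input batch has at least one row.
const scale: SketchExchange<{ factor: number }, { factor: number }> = {
  init: ({ factor }) => ({ factor }),
  exchange: (state, input, out) => {
    out.emitRow({ value: input[0].value * state.factor });
  },
};
```

Calling `runExchange(scale, { factor: 2 }, [[{ value: 1.5 }], [{ value: 4 }]])` in this sketch resolves to two one-row batches, one per input. How the real library reports a violation of the rule is not covered here.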

Exchange function signature

type ExchangeFn<S> = (
  state: S,
  input: RecordBatch,
  out: OutputCollector,
) => Promise<void> | void;

  • state — mutable state object from init
  • input — the Arrow RecordBatch sent by the client
  • out — OutputCollector for emitting the response batch

Reading input data

Access input batch data using Arrow’s column API:

exchange: async (state, input, out) => {
  // By column index
  const col0 = input.getChildAt(0);
  // By field name
  const values = input.getChild("value");
  // Iterate rows, accumulating a result to emit
  let sum = 0;
  for (let i = 0; i < input.numRows; i++) {
    const val = (values?.get(i) ?? 0) as number;
    sum += val;
  }
  // Emit the single output batch for this call
  out.emitRow({ result: sum });
},

Lockstep protocol

Exchange streams use a lockstep protocol: the server reads one input batch and writes one output batch before reading the next. This interleaved pattern prevents deadlocks when both sides are reading and writing on the same pipe.
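
The strict alternation can be illustrated with a small in-memory model. This is only a sketch of the ordering, not the library's transport: `scaleServer` and `runLockstep` are hypothetical names, batches are plain number arrays, and a generator stands in for the server so that it cannot receive the next input until it has handed back the previous output.

```typescript
type Batch = number[];

// Server side modeled as a coroutine: each yield hands back one output
// batch and suspends until the next input batch is passed in.
function* scaleServer(factor: number): Generator<Batch, void, Batch> {
  let input: Batch = yield []; // priming step: wait for the first input
  while (true) {
    const output = input.map((v) => v * factor);
    input = yield output; // write one output, then read the next input
  }
}

// Client side: send one batch, then read its response before sending the next.
function runLockstep(inputs: Batch[], factor: number): { log: string[]; outputs: Batch[] } {
  const server = scaleServer(factor);
  server.next(); // advance the server to its first read
  const log: string[] = [];
  const outputs: Batch[] = [];
  inputs.forEach((input, i) => {
    log.push(`send ${i}`);
    const step = server.next(input);
    if (step.done) throw new Error("server stopped early");
    log.push(`recv ${i}`);
    outputs.push(step.value);
  });
  return { log, outputs };
}
```

In this model the log always interleaves as send 0, recv 0, send 1, recv 1, and so on. Neither side ever has more than one batch outstanding, which is the property that keeps both ends of a shared pipe from blocking on a write at the same time.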

Type-safe state

Like producer streams, the generic <S> parameter types the state object that init returns and exchange receives:

interface TransformState {
  factor: number;
  totalProcessed: number;
}

protocol.exchange<TransformState>("transform", {
  params: { factor: float },
  inputSchema: { value: float },
  outputSchema: { value: float, running_total: float },
  init: async ({ factor }) => ({ factor, totalProcessed: 0 }),
  exchange: async (state, input, out) => {
    const value = input.getChildAt(0)?.get(0) as number;
    const scaled = value * state.factor;
    state.totalProcessed += scaled;
    out.emitRow({ value: scaled, running_total: state.totalProcessed });
  },
});

Stream headers

Exchange methods can optionally send a one-time header before the data stream. See Stream Headers.