
Run and stream agents

Genkit agents are built around conversations that continue across turns. A session or chat carries continuity, while each turn streams chunks and eventually resolves to a final output. This page covers starting a conversation, streaming a turn, and continuing from an earlier point.

The JavaScript client exposes a high-level interface for driving an agent across turns.

  • AgentApi is the agent handle that remoteAgent() returns. You call chat(), loadChat(), getSnapshot(), and abort() on it; the per-turn methods live on the AgentChat that chat() returns.
  • AgentChat is a stateful conversation. It sends normal turns, streams turns, resumes interrupts, detaches work, and tracks the next-turn state, including the session ID, for you.
  • AgentTurn represents one in-flight streaming turn. It gives you a stream, a final response, and an abort helper.
  • AgentResponse is the completed turn, with text, tool requests, interrupts, finish reason, snapshot ID, custom state, artifacts, and raw output.
  • AgentChunk is one streamed update. It can contain text, accumulated text, model data, tool requests, custom state, or an artifact.
  • AgentInterrupt is a paused tool request. It has the original input and helpers for building resume payloads.
  • DetachedTask is a background task handle. It can poll, wait, or abort the detached turn.

res.state and chat.state are shortcuts for the custom state. Use res.raw.state when you need the full session state with messages, artifacts, and custom state together.

A local agent from ai.defineAgent() and a remote client from remoteAgent() share this interface, so the same code drives both.

const chat = weatherAgent.chat();
const res = await chat.send('Weather in Tokyo?');
console.log(res.text);
console.log(res.sessionId);
console.log(res.snapshotId);
console.log(res.state);

Calling chat() without arguments starts a new conversation. Pass sessionId for the latest server-managed conversation, snapshotId when you need an exact saved point, or state when the client owns the full session state.

const chat = weatherAgent.chat({
  sessionId: 'user-session-123',
});
await chat.send('What did we discuss last time?');

When both sessionId and snapshotId are supplied, the snapshot selects the exact resume point and the session ID acts as an ownership guard.
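The precedence among these options can be restated as a small decision function. This is a minimal sketch, not Genkit's implementation: `resolveResumePoint`, its return shape, and the `kind` labels are hypothetical names, and rejecting `state` alongside the server-managed IDs is an assumption rather than documented client behavior.

```javascript
// Hypothetical model of how chat() options select a starting point.
// It only restates the rules in the surrounding text.
function resolveResumePoint(options = {}) {
  const { sessionId, snapshotId, state } = options;

  // Assumption: client-owned state cannot be mixed with server-managed IDs.
  if (state !== undefined && (sessionId !== undefined || snapshotId !== undefined)) {
    throw new Error('state cannot be combined with sessionId or snapshotId');
  }
  if (state !== undefined) {
    return { kind: 'client-state', state };
  }
  if (snapshotId !== undefined) {
    // The snapshot picks the exact resume point; a sessionId supplied with it
    // only guards ownership.
    return { kind: 'snapshot', snapshotId, guardSessionId: sessionId ?? null };
  }
  if (sessionId !== undefined) {
    return { kind: 'latest-in-session', sessionId };
  }
  return { kind: 'new' };
}

console.log(resolveResumePoint().kind); // new
console.log(resolveResumePoint({ sessionId: 's1' }).kind); // latest-in-session
console.log(resolveResumePoint({ sessionId: 's1', snapshotId: 'snap-9' }).kind); // snapshot
```

Checking `snapshotId` before `sessionId` is what makes the snapshot win when both are present.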

loadChat() reads a server snapshot and hydrates messages, custom state, artifacts, snapshotId, and sessionId before the next turn.

const chat = await weatherAgent.loadChat({ sessionId: 'user-session-123' });
console.log(chat.messages.length);
console.log(chat.state);
await chat.send('Continue from there.');

Use getSnapshot() when you only need to inspect a snapshot, such as a status page or audit view. Use loadChat() when you want to continue the conversation from that saved state.

To stream a turn, call sendStream() and iterate turn.stream. turn.response resolves to the final AgentResponse once the turn completes.

const chat = weatherAgent.chat();
const turn = chat.sendStream('Weather in Tokyo?');
for await (const chunk of turn.stream) {
  if (chunk.text) process.stdout.write(chunk.text);
  if (chunk.custom) updateStatus(chunk.custom);
  if (chunk.artifact) renderArtifact(chunk.artifact);
}
const res = await turn.response;
console.log(res.finishReason);

The non-streaming send() path drains the stream internally so custom state patches are still applied. This keeps send() and sendStream() consistent for server-managed agents, where final wire output may return a snapshotId instead of full state.
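To make the "drains the stream internally" behavior concrete, here is a minimal sketch of a non-streaming send built on top of a streaming turn. Everything in it is a stand-in: `fakeSendStream` and `sendByDraining` are hypothetical helpers, the `'stop'` finish reason and `'snap-1'` ID are placeholder values, and treating each `chunk.custom` as a partial object that is shallow-merged is an assumption about the patch format, not a description of the wire protocol.

```javascript
// Hypothetical stand-in for the object sendStream() returns: an async
// iterable of chunks plus a promise for the final response.
function fakeSendStream() {
  const chunks = [
    { text: 'Sunny' },
    { custom: { step: 'lookup' } },
    { text: ' in Tokyo.' },
    { custom: { step: 'done', city: 'Tokyo' } },
  ];
  async function* stream() {
    for (const chunk of chunks) yield chunk;
  }
  return {
    stream: stream(),
    // The final wire output carries a snapshot ID but no full state.
    response: Promise.resolve({ finishReason: 'stop', snapshotId: 'snap-1' }),
  };
}

// Consume every chunk so custom patches are applied, then attach the
// accumulated text and state to the final response.
async function sendByDraining(turn, initialState = {}) {
  let state = { ...initialState };
  let text = '';
  for await (const chunk of turn.stream) {
    if (chunk.text) text += chunk.text;
    // Assumption: a patch is a partial object merged shallowly.
    if (chunk.custom) state = { ...state, ...chunk.custom };
  }
  const res = await turn.response;
  return { ...res, text, state };
}

sendByDraining(fakeSendStream()).then((res) => {
  console.log(res.text); // Sunny in Tokyo.
  console.log(res.state.step); // done
});
```

Because the state is rebuilt from the chunks, the caller sees the same custom state whether or not the final response includes it.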

To cancel a foreground turn from the caller, pass an abortSignal to sendStream() and abort its controller.

const controller = new AbortController();
const turn = chat.sendStream('Write a long report.', {
  abortSignal: controller.signal,
});
setTimeout(() => controller.abort(), 1000);
const res = await turn.response;
console.log(res.finishReason);

You can also call turn.abort(). Foreground aborts return an aborted response when cancellation is observed.
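The phrase "when cancellation is observed" means the turn ends at the next point where the signal is checked, not at the instant abort() is called. The sketch below illustrates that with a fake turn; `fakeAbortableTurn` is a hypothetical helper, the `'aborted'` and `'stop'` finish-reason strings are assumed values, and in this simplified model the response only resolves once the stream has been consumed.

```javascript
// Hypothetical stand-in for a streaming turn that honors an AbortSignal.
function fakeAbortableTurn(chunks, { abortSignal } = {}) {
  let resolveResponse;
  const response = new Promise((resolve) => {
    resolveResponse = resolve;
  });
  async function* stream() {
    for (const text of chunks) {
      // Cancellation is observed here, between chunks.
      if (abortSignal?.aborted) {
        resolveResponse({ finishReason: 'aborted' });
        return;
      }
      yield { text };
    }
    resolveResponse({ finishReason: 'stop' });
  }
  return { stream: stream(), response };
}

async function demo() {
  const controller = new AbortController();
  const turn = fakeAbortableTurn(['a', 'b', 'c', 'd'], {
    abortSignal: controller.signal,
  });
  const seen = [];
  for await (const chunk of turn.stream) {
    seen.push(chunk.text);
    if (seen.length === 2) controller.abort(); // cancel mid-turn
  }
  const res = await turn.response;
  return { seen, finishReason: res.finishReason };
}

demo().then((result) => {
  console.log(result.seen.join('')); // ab
  console.log(result.finishReason); // aborted
});
```

The chunks delivered before the abort are still delivered, and the caller gets a resolved response rather than a thrown error.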

When a turn fails after the invocation starts, the client throws AgentError. The error carries the last-good state, snapshot ID, and response object when available.

import { AgentError } from 'genkit/beta/client';

try {
  await chat.send('Use a broken tool.');
} catch (err) {
  if (err instanceof AgentError) {
    console.error(err.status);
    console.error(err.snapshotId);
    console.error(err.state);
  }
}
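Because the error carries the last-good snapshot ID, one possible recovery policy is to reopen the conversation at that snapshot and retry with different input. The following is a sketch under stated assumptions: the `AgentError` class here is a local stand-in so the example runs on its own, `sendWithRecovery` and `fakeAgent` are hypothetical, and `'INTERNAL'` is a placeholder status value. Whether a retry is appropriate depends on the failure.

```javascript
// Local stand-in for the client's AgentError, with the fields read in the
// example above. Not the library's class.
class AgentError extends Error {
  constructor(message, { status, snapshotId, state } = {}) {
    super(message);
    this.name = 'AgentError';
    this.status = status;
    this.snapshotId = snapshotId;
    this.state = state;
  }
}

// Try a turn; on AgentError with a snapshot ID, reopen the chat at the
// last-good snapshot and send the fallback text once.
async function sendWithRecovery(agent, chat, text, fallbackText) {
  try {
    return await chat.send(text);
  } catch (err) {
    if (!(err instanceof AgentError) || !err.snapshotId) throw err;
    const recovered = agent.chat({ snapshotId: err.snapshotId });
    return recovered.send(fallbackText);
  }
}

// Hypothetical agent that fails whenever the input mentions "broken".
const fakeAgent = {
  chat({ snapshotId } = {}) {
    return {
      async send(text) {
        if (text.includes('broken')) {
          throw new AgentError('tool failed', {
            status: 'INTERNAL',
            snapshotId: 'snap-7',
            state: {},
          });
        }
        return { text: `ok from ${snapshotId ?? 'new chat'}: ${text}` };
      },
    };
  },
};

sendWithRecovery(
  fakeAgent,
  fakeAgent.chat(),
  'Use a broken tool.',
  'Try without the tool.',
).then((res) => console.log(res.text)); // ok from snap-7: Try without the tool.
```

Errors that are not AgentError, or that carry no snapshot ID, are rethrown unchanged so the caller can handle them.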

Initialization misuse, such as sending state to a server-managed agent or sessionId to a client-managed agent, is rejected before a turn starts.
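The two misuse cases named above amount to a guard that runs before any turn. The sketch below models that guard; `assertValidInit`, `openChat`, and the `'server'` and `'client'` labels are invented for illustration, plain `Error` is used because the error type is not stated here, and whether the real client rejects at chat() or at the first send() is likewise an assumption.

```javascript
// Hypothetical guard that rejects the two documented misuse cases.
function assertValidInit(managedBy, options = {}) {
  if (managedBy === 'server' && options.state !== undefined) {
    throw new Error('state is not accepted by a server-managed agent');
  }
  if (managedBy === 'client' && options.sessionId !== undefined) {
    throw new Error('sessionId is not accepted by a client-managed agent');
  }
}

// Hypothetical chat factory: validation happens before any turn can run.
function openChat(managedBy, options = {}) {
  assertValidInit(managedBy, options);
  let turns = 0;
  return {
    send(text) {
      turns += 1;
      return { text: `echo: ${text}`, turn: turns };
    },
  };
}

try {
  openChat('server', { state: { city: 'Tokyo' } });
} catch (err) {
  console.log(err.message); // state is not accepted by a server-managed agent
}

console.log(openChat('server', { sessionId: 's1' }).send('hi').turn); // 1
```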

In Go, use RunText for the common text-only case. Use Run when you need to send a full AgentInput, such as a resume payload or detach request.

out, err := weatherAgent.RunText(ctx, "Weather in Tokyo?")
if err != nil {
	// The turn never started or could not produce a result; an in-band turn
	// failure instead resolves on out.FinishReason and out.Error.
	return fmt.Errorf("run turn: %w", err)
}
fmt.Println(out.Message.Text())
fmt.Println(out.SessionID)
fmt.Println(out.SnapshotID)

For a structured input:

import aix "github.com/firebase/genkit/go/ai/exp"

out, err := weatherAgent.Run(ctx, &aix.AgentInput{
	Message: ai.NewUserTextMessage("Weather in Tokyo?"),
})

In-band failures resolve as AgentOutput with FinishReason set to failed and structured details in out.Error. A non-nil Go error means the invocation did not start or could not produce an output.

  • aix.WithSessionID[State](id) resumes the latest server-managed snapshot for a conversation.
  • aix.WithSnapshotID[State](id) resumes or branches from a specific server-managed snapshot.
  • aix.WithState[State](state) continues a client-managed conversation by sending the full state.

WithState is mutually exclusive with WithSessionID and WithSnapshotID. WithSessionID and WithSnapshotID can be combined to assert that the snapshot belongs to the session.

next, err := weatherAgent.RunText(ctx, "What about Paris?",
	aix.WithSessionID[WeatherState](out.SessionID),
)

Use Connect for multi-turn local clients and streaming UIs. The connection lets you send text, messages, resume payloads, or a detach signal while receiving chunks.

Reach for Connect when the caller needs direct control over both sides of the conversation on one live connection. It is useful for command-line tools, local services, workers, and lower-level integrations that need to stream output, observe custom state patches, handle interrupts, send a resume payload, or send another message after a TurnEnd without reconnecting.

For most single-turn server code, use RunText or Run. For browser, mobile, and other HTTP clients, use chat().send() and chat().sendStream() on the JavaScript client returned by remoteAgent() rather than managing a bidirectional stream directly.

conn, err := weatherAgent.Connect(ctx)
if err != nil {
	// Connect fails when the init payload is rejected before any turn runs.
	return fmt.Errorf("connect to agent: %w", err)
}
if err := conn.SendText("Weather in Tokyo?"); err != nil {
	return fmt.Errorf("send message: %w", err)
}
for chunk, err := range conn.Receive() {
	if err != nil {
		// A stream error ends the turn, such as the context being cancelled.
		return fmt.Errorf("stream turn: %w", err)
	}
	if chunk.ModelChunk != nil {
		fmt.Print(chunk.ModelChunk.Text())
	}
	if chunk.TurnEnd != nil {
		fmt.Printf("\nturn finished: %s\n", chunk.TurnEnd.FinishReason)
		break
	}
}
out, err := conn.Output()
if err != nil {
	return fmt.Errorf("finalize turn: %w", err)
}
fmt.Println(out.SnapshotID)

Breaking from Receive does not cancel the connection. Multi-turn clients commonly break on TurnEnd, send another input, and call Receive again. Call Output() when the invocation should finish. Do not call Output() from one goroutine while another goroutine is iterating Receive.

AgentConnection applies streamed custom-state patches as it receives chunks. Read conn.Custom() to inspect the custom state observed so far.

for chunk, err := range conn.Receive() {
	if err != nil {
		return fmt.Errorf("stream turn: %w", err)
	}
	if len(chunk.CustomPatch) > 0 {
		state, err := conn.Custom()
		if err != nil {
			// Fails if an applied patch cannot decode into the State type.
			return fmt.Errorf("read custom state: %w", err)
		}
		renderState(state)
	}
}

The authoritative final state is on AgentOutput.State for client-managed agents, or in the saved snapshot for server-managed agents.