
Custom orchestration

Most Genkit agents should use the standard prompt-backed loop. Custom orchestration is for cases where the application must control turn processing directly while still using the Agents API for sessions, snapshots, streaming, HTTP transport, and background execution. If you need complete ownership of the backend contract instead, a Genkit flow with direct generate() calls may be a better fit.

Use ai.defineCustomAgent() when the workflow needs one of these behaviors:

  • Several model calls in one user turn.
  • Dynamic model selection or custom stopping rules.
  • Planner and executor loops with application decisions between calls.
  • Manual message history management.
  • Custom state and artifact updates before the final response.
  • Custom stream chunks that do not come directly from a single model call.

A custom agent receives a SessionRunner and an options object:

```typescript
async (sess, { sendChunk, abortSignal, context }) => {
  // Custom loop.
};
```

Use sess.run(async (input, turnContext) => {}) to process each input turn. The runner adds input.message to history before the callback runs. For server-managed agents, turnContext.snapshotId is reserved before the turn starts and reused when the turn snapshot is saved.
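A minimal, synchronous model of that ordering can help when reasoning about what the callback sees. In this sketch, MiniRunner, TurnContext, and the snap-N ID format are hypothetical stand-ins, not Genkit's SessionRunner:

```typescript
// Hypothetical, simplified model of the turn ordering described above;
// not the Genkit implementation. Synchronous for brevity.
type Message = { role: 'user' | 'model'; text: string };

interface TurnContext {
  turnIndex: number;
  snapshotId: string | undefined; // reserved only for server-managed agents
}

class MiniRunner {
  readonly history: Message[] = [];
  readonly savedSnapshotIds: string[] = [];
  private readonly serverManaged: boolean;
  private nextId = 0;

  constructor(serverManaged: boolean) {
    this.serverManaged = serverManaged;
  }

  run(inputs: Message[], fn: (input: Message, ctx: TurnContext) => void): void {
    inputs.forEach((message, turnIndex) => {
      // 1. Reserve the snapshot ID before any turn work starts.
      const snapshotId = this.serverManaged ? `snap-${this.nextId++}` : undefined;
      // 2. Append the user message to history before the callback runs.
      this.history.push(message);
      fn(message, { turnIndex, snapshotId });
      // 3. Save the turn snapshot under the ID reserved in step 1.
      if (snapshotId !== undefined) this.savedSnapshotIds.push(snapshotId);
    });
  }
}

// Usage: observe what the callback sees on each turn.
const runner = new MiniRunner(true);
const observed: Array<{ historyLength: number; snapshotId: string | undefined }> = [];
runner.run(
  [
    { role: 'user', text: 'first question' },
    { role: 'user', text: 'follow-up' },
  ],
  (input, ctx) => {
    // The current user message is already the last history entry here.
    observed.push({ historyLength: runner.history.length, snapshotId: ctx.snapshotId });
    runner.history.push({ role: 'model', text: `reply to: ${input.text}` });
  },
);
```

Because the ID is reserved before the callback and reused at save time, anything the callback names after ctx.snapshotId matches the snapshot that is eventually written.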

The runner exposes helpers for the parts of session state you are most likely to need. Use getState() for the full state, getMessages() and addMessages() for conversation history, getCustom() and updateCustom(fn) for typed application state, and getArtifacts() and addArtifacts(artifacts) for generated outputs.
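The helper surface can be pictured as a small typed wrapper around one state object. MiniSession below is a hypothetical in-memory sketch: the method names follow the paragraph above, but the signatures are assumptions, and the real runner also handles snapshots and streaming.

```typescript
// Hypothetical in-memory session illustrating the helper shapes described
// above. Names mirror the text; signatures are assumptions, not Genkit's API.
type Part = { text?: string };
type Message = { role: 'user' | 'model'; content: Part[] };
type Artifact = { name: string; parts: Part[] };

interface SessionState<C> {
  messages: Message[];
  custom: C;
  artifacts: Artifact[];
}

class MiniSession<C> {
  private state: SessionState<C>;

  constructor(initialCustom: C) {
    this.state = { messages: [], custom: initialCustom, artifacts: [] };
  }

  getState(): SessionState<C> {
    return this.state;
  }

  getMessages(): Message[] {
    return this.state.messages;
  }

  addMessages(messages: Message[]): void {
    this.state.messages = [...this.state.messages, ...messages];
  }

  getCustom(): C {
    return this.state.custom;
  }

  // Functional update: derive the next custom state from the current one.
  updateCustom(fn: (current: C) => C): void {
    this.state.custom = fn(this.state.custom);
  }

  getArtifacts(): Artifact[] {
    return this.state.artifacts;
  }

  addArtifacts(artifacts: Artifact[]): void {
    this.state.artifacts = [...this.state.artifacts, ...artifacts];
  }
}

// Usage with a typed custom state.
type ResearchState = { status: string; turn: number };

const session = new MiniSession<ResearchState>({ status: 'Idle', turn: 0 });
session.addMessages([{ role: 'user', content: [{ text: 'What is RAG?' }] }]);
session.updateCustom((state) => ({ ...state, status: 'Researching', turn: 1 }));
session.addArtifacts([{ name: 'notes.json', parts: [{ text: '{}' }] }]);
```

Passing a function to updateCustom, rather than a replacement object, lets each update start from the latest value even when several updates happen within one turn.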

Use updateCustom(fn) for progress and control data that the UI should react to during a turn. Use addArtifacts() when the agent produces a named output the user may inspect later, such as a report, patch, or JSON result. Custom state should stay compact because it is part of the conversation state that gets snapshotted or returned to the client.
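One way to follow that guidance is to keep only progress fields and a reference in custom state, and to put the bulky output in an artifact. The publishReport helper and the ProgressState shape below are hypothetical:

```typescript
// Hypothetical helper: bulky output goes into an artifact, while custom
// state keeps only progress fields and the artifact's name.
type Artifact = { name: string; parts: { text: string }[] };
type ProgressState = {
  status: string;
  done: number;
  total: number;
  reportName?: string;
};

function publishReport(
  current: ProgressState,
  reportName: string,
  report: string,
): { state: ProgressState; artifact: Artifact } {
  return {
    // Compact: the UI can react to status and locate the report by name.
    state: { ...current, status: 'Report ready', done: current.total, reportName },
    // The full text lives in the artifact, not in custom state.
    artifact: { name: reportName, parts: [{ text: report }] },
  };
}

// Usage: a large report stays out of the snapshotted custom state.
const longReport = 'Finding. '.repeat(500);
const { state, artifact } = publishReport(
  { status: 'Writing', done: 2, total: 3 },
  'report.md',
  longReport,
);
```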

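For example, the following agent breaks a question into three subquestions, answers each one, streams a synthesis, and records the answers as an artifact:

```typescript
import { z } from 'genkit';

// Illustrative schema for the custom state this example updates.
// ai (a Genkit instance), store, and liteModel are assumed to be
// defined elsewhere in your app.
const ResearchStateSchema = z.object({
  status: z.string().optional(),
  turn: z.number().optional(),
  subQuestions: z.array(z.string()).optional(),
  answers: z
    .array(z.object({ question: z.string(), answer: z.string() }))
    .optional(),
});

export const researchAgent = ai.defineCustomAgent(
  {
    name: 'researchAgent',
    description:
      'Plans research, answers subquestions, and synthesizes results.',
    stateSchema: ResearchStateSchema,
    store,
  },
  async (sess, { sendChunk, abortSignal }) => {
    let finalMessage;
    await sess.run(async (input, turnContext) => {
      // Text of the current user message.
      const userText =
        input.message?.content.find((part) => part.text)?.text ?? '';
      // Full history; the runner has already appended input.message.
      const history = sess.getMessages();
      sess.updateCustom((state) => ({
        ...state,
        status: 'Decomposing question',
        turn: turnContext.turnIndex,
      }));

      // Step 1: plan three subquestions with a structured-output call.
      const plan = await ai.generate({
        model: liteModel,
        prompt: `Break this question into three subquestions:\n${userText}`,
        output: { format: 'json', schema: z.array(z.string()).length(3) },
        abortSignal,
      });
      // Fall back to the original question if no structured output came back.
      const subQuestions = plan.output ?? [userText];
      sess.updateCustom((state) => ({
        ...state,
        subQuestions,
        status: 'Researching',
      }));

      // Step 2: answer each subquestion with the configured default model.
      const answers: { question: string; answer: string }[] = [];
      for (const question of subQuestions) {
        const answer = await ai.generate({
          prompt: `Answer in two paragraphs:\n${question}`,
          abortSignal,
        });
        answers.push({ question, answer: answer.text });
      }
      sess.updateCustom((state) => ({
        ...state,
        answers,
        status: 'Synthesizing',
      }));

      // Step 3: stream the synthesis to the client chunk by chunk.
      const stream = ai.generateStream({
        messages: history,
        prompt: `Synthesize these findings:\n${JSON.stringify(answers)}`,
        abortSignal,
      });
      for await (const chunk of stream.stream) {
        sendChunk({ modelChunk: chunk });
      }
      const response = await stream.response;
      finalMessage = response.message;
      if (response.message) {
        sess.addMessages([response.message]);
      }

      // The agent has a store (server-managed), so snapshotId is reserved.
      sess.addArtifacts([
        {
          name: `research-${turnContext.snapshotId}.json`,
          parts: [{ text: JSON.stringify(answers) }],
        },
      ]);
      return { finishReason: response.finishReason };
    });
    return {
      message: finalMessage,
      artifacts: sess.getArtifacts(),
      finishReason: sess.lastTurnFinishReason,
    };
  },
);
```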

Use input.message for the current user message. The custom handler does not receive an input.messages array; read the conversation history, which already includes the current message, from sess.getMessages().

If the per-turn callback throws, the runtime marks the turn as failed, emits a failed turn-end chunk, and resolves the invocation with finishReason: 'failed'. For client-managed agents, the response carries the last-good state; for server-managed agents, it carries the last-good snapshot ID. Clients can then retry from that point without inheriting any partial mutations from the failed turn.
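The last-good behavior resembles running each turn against a draft copy and committing the draft only on success. This sketch assumes JSON-shaped state and uses a hypothetical runTurn helper; the real runtime also records snapshots and emits turn-end chunks.

```typescript
// Hypothetical sketch of last-good recovery: the turn mutates a draft copy,
// and the draft is committed only if the callback completes.
type AgentState = { messages: string[]; custom: { status: string } };
type TurnOutcome = { finishReason: 'stop' | 'failed'; state: AgentState };

function runTurn(lastGood: AgentState, turn: (draft: AgentState) => void): TurnOutcome {
  // Deep copy via a JSON round-trip; adequate for plain JSON-shaped state.
  const draft = JSON.parse(JSON.stringify(lastGood)) as AgentState;
  try {
    turn(draft);
    return { finishReason: 'stop', state: draft };
  } catch {
    // Discard partial mutations and hand back the last-good state.
    return { finishReason: 'failed', state: lastGood };
  }
}

// Usage: a successful turn followed by one that fails midway.
const first = runTurn({ messages: [], custom: { status: 'Idle' } }, (draft) => {
  draft.messages.push('hello');
  draft.custom.status = 'Answered';
});

const second = runTurn(first.state, (draft) => {
  draft.messages.push('partial answer');
  draft.custom.status = 'Synthesizing';
  throw new Error('model call failed');
});
```

A client that receives second.state can retry the turn from exactly the state it had after the first turn.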

sendChunk({ modelChunk }) forwards model chunks. sess.updateCustom() emits a customPatch chunk. sess.addArtifacts() records artifacts, while sendChunk({ artifact }) can stream an artifact chunk explicitly when the UI needs immediate visibility.
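On the client, the three chunk kinds can be handled as a tagged union keyed by property name. The shapes below are assumptions for illustration; in particular, the real customPatch payload format is not described here, so a plain object stands in for it.

```typescript
// Hypothetical chunk shapes for the three kinds named above.
type Artifact = { name: string; parts: { text: string }[] };
type AgentChunk =
  | { modelChunk: { text: string } }
  | { customPatch: Record<string, unknown> }
  | { artifact: Artifact };

// Client-side dispatch using `in` narrowing on the distinguishing key.
function describeChunk(chunk: AgentChunk): string {
  if ('modelChunk' in chunk) return `model: ${chunk.modelChunk.text}`;
  if ('customPatch' in chunk) return `patch: ${Object.keys(chunk.customPatch).join(',')}`;
  return `artifact: ${chunk.artifact.name}`;
}

// Usage: a stream that mixes state updates, model text, and an artifact.
const chunks: AgentChunk[] = [
  { customPatch: { status: 'Synthesizing' } },
  { modelChunk: { text: 'Hel' } },
  { modelChunk: { text: 'lo' } },
  { artifact: { name: 'research.json', parts: [{ text: '[]' }] } },
];

const log = chunks.map(describeChunk);
// Reassemble streamed text from model chunks only.
const streamedText = chunks
  .map((c) => ('modelChunk' in c ? c.modelChunk.text : ''))
  .join('');
```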

In Go, use genkitx.DefineCustomAgent when the prompt-backed loop does not fit:

  • The agent must call multiple models in one turn.
  • The workflow chooses models or tools dynamically.
  • You need custom retry, planning, or validation around each turn.
  • The agent emits artifacts or state updates outside a normal model stream.
  • You need the reserved turn snapshot ID before work starts.

The custom function receives:

  • ctx context.Context is the invocation context. It is canceled on client disconnect or abort.
  • resp aix.Responder streams model chunks and artifacts to the client.
  • sess *aix.SessionRunner[State] manages turns, messages, custom state, artifacts, and snapshots.

Call sess.Run(ctx, fn) to process input turns. The runner adds each user message to the session before the callback runs, emits turn-end chunks, and writes snapshots when a store exists.

```go
import (
	"context"
	"fmt"

	"github.com/firebase/genkit/go/ai"
	aix "github.com/firebase/genkit/go/ai/exp"
	"github.com/firebase/genkit/go/genkit"
	genkitx "github.com/firebase/genkit/go/genkit/exp"
)

// CoderState is the typed custom state for this example (illustrative).
type CoderState struct {
	Status         string
	LastSnapshotID string
}

// Inside a function where g (*genkit.Genkit) and store (a session store) are in scope:
coder := genkitx.DefineCustomAgent(g, "coder",
	func(ctx context.Context, resp aix.Responder, sess *aix.SessionRunner[CoderState]) (*aix.AgentResult, error) {
		err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) (*aix.TurnResult, error) {
			// Read-only metadata for this turn, including the reserved snapshot ID.
			turn := aix.TurnContextFromContext(ctx)
			sess.UpdateCustom(func(state CoderState) CoderState {
				state.Status = "Generating answer"
				state.LastSnapshotID = turn.SnapshotID
				return state
			})
			// sess.Messages() already contains the user message added by sess.Run.
			for chunk, err := range genkit.GenerateStream(ctx, g,
				ai.WithModelName("googleai/gemini-flash-latest"),
				ai.WithSystem("You are a concise coding assistant."),
				ai.WithMessages(sess.Messages()...),
			) {
				if err != nil {
					return nil, fmt.Errorf("stream model: %w", err)
				}
				if chunk.Done {
					// Record the final model message, then end the turn.
					sess.AddMessages(chunk.Response.Message)
					return &aix.TurnResult{
						FinishReason: aix.AgentFinishReason(chunk.Response.FinishReason),
					}, nil
				}
				resp.SendModelChunk(chunk.Chunk)
			}
			// The stream ended without a final (Done) value.
			return nil, fmt.Errorf("stream ended without a final response")
		})
		if err != nil {
			// sess.Run surfaces a turn-loop failure; the framework marks the
			// turn failed and resolves the invocation with the last-good state.
			return nil, fmt.Errorf("run turn: %w", err)
		}
		return sess.Result(), nil
	},
	aix.WithSessionStore(store),
	aix.WithDescription[CoderState]("Concise coding assistant"),
)
```

sess.Result() is a convenience method that returns the last message and the artifacts currently recorded in the session.

aix.TurnContextFromContext(ctx) returns read-only turn metadata:

  • SnapshotID is the snapshot ID reserved before this turn runs. It is empty for client-managed agents.
  • ParentSnapshotID is the snapshot this turn continues from.
  • TurnIndex is the zero-based turn number within the invocation.

Use this when external resources need to line up with the snapshot that will later be saved.
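For example, a turn can name an external resource after the reserved snapshot ID so that the resource and the saved snapshot can be matched later. The following TypeScript sketch shows the idea (the same logic applies in Go); the helper name, key format, and the invocationId fallback for client-managed agents are hypothetical.

```typescript
// Hypothetical turn metadata; field names mirror the Go TurnContext fields.
interface TurnMeta {
  snapshotId: string; // empty for client-managed agents
  parentSnapshotId: string;
  turnIndex: number;
}

// invocationId is a hypothetical caller-supplied ID, used only as a fallback.
function externalResourceKey(agent: string, meta: TurnMeta, invocationId: string): string {
  if (meta.snapshotId !== '') {
    // Server-managed: key the resource by the snapshot this turn will save.
    return `${agent}/${meta.snapshotId}`;
  }
  // Client-managed: no reserved ID, so fall back to invocation + turn index.
  return `${agent}/${invocationId}/turn-${meta.turnIndex}`;
}

const serverKey = externalResourceKey(
  'coder',
  { snapshotId: 'snap-42', parentSnapshotId: 'snap-41', turnIndex: 0 },
  'inv-1',
);
const clientKey = externalResourceKey(
  'coder',
  { snapshotId: '', parentSnapshotId: '', turnIndex: 2 },
  'inv-1',
);
```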

Responder.SendModelChunk(chunk) streams token-level model output. Responder.SendArtifact(artifact) streams an artifact and records it in the session. Send methods return promptly when the work context is canceled. Their session side effects are applied before they return, so snapshots and sess.Result() observe them.

If the per-turn callback returns an error, the invocation resolves as a failed AgentOutput with structured error details and a last-good recovery point. A client can resume from the last-good state or snapshot. When a client detaches, chunks after detach are not forwarded, but session side effects such as artifacts still apply to the final snapshot.
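The ordering in the last two paragraphs can be modeled as: apply the session side effect first, then forward to the client only while it is still attached. This TypeScript sketch uses a hypothetical MiniResponder class; it is not the aix.Responder implementation and ignores context cancellation.

```typescript
// Hypothetical responder model: session side effects are applied before a
// send returns; forwarding to the client is skipped after it detaches.
type Artifact = { name: string };

class MiniResponder {
  readonly sessionArtifacts: Artifact[] = [];
  readonly forwardedToClient: string[] = [];
  private detached = false;

  detach(): void {
    this.detached = true;
  }

  sendArtifact(artifact: Artifact): void {
    // 1. Record in the session first, so snapshots and results observe it.
    this.sessionArtifacts.push(artifact);
    // 2. Forward only while the client is still attached.
    if (!this.detached) this.forwardedToClient.push(artifact.name);
  }
}

// Usage: the client detaches between two artifacts.
const responder = new MiniResponder();
responder.sendArtifact({ name: 'plan.md' });
responder.detach();
responder.sendArtifact({ name: 'patch.diff' });
```

Both artifacts reach the session, and therefore the final snapshot, even though the client only saw the first one.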