Custom orchestration

Most Genkit agents should use the standard prompt-backed loop. Custom orchestration is for cases where the application must control turn processing directly while still using the Agents API for sessions, snapshots, streaming, HTTP transport, and background execution. If you need complete ownership of the backend contract instead, a Genkit flow with direct generate() calls may be a better fit.

Use ai.defineCustomAgent() when the workflow needs one of these behaviors:

  • Several model calls in one user turn.
  • Dynamic model selection or custom stopping rules.
  • Planner and executor loops with application decisions between calls.
  • Manual message history management.
  • Custom state and artifact updates before the final response.
  • Custom stream chunks that do not come directly from a single model call.
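
The multi-call, model-selection, and stopping-rule cases above can be sketched without any Genkit types. In this minimal sketch, `ModelCall`, `pickModel`, `runPlannerLoop`, the model names, and the `FINAL:` marker are all hypothetical. In a real agent the loop body would sit inside the custom agent handler, and `callModel` would wrap an actual model call.

```typescript
// Hypothetical stand-in for a model call; a real agent would call a model here.
type ModelCall = (model: string, prompt: string) => Promise<string>;

interface LoopResult {
  steps: { model: string; output: string }[];
  stopReason: 'done' | 'max-steps';
}

// Dynamic model selection (assumed policy): longer prompts go to a larger model.
function pickModel(prompt: string): string {
  return prompt.length > 200 ? 'large-model' : 'lite-model';
}

// Several model calls in one turn, with an application decision between calls.
async function runPlannerLoop(
  question: string,
  callModel: ModelCall,
  maxSteps = 5,
): Promise<LoopResult> {
  const steps: LoopResult['steps'] = [];
  let prompt = question;
  for (let i = 0; i < maxSteps; i++) {
    const model = pickModel(prompt);
    const output = await callModel(model, prompt);
    steps.push({ model, output });
    // Custom stopping rule: the application decides when the turn is finished.
    if (output.includes('FINAL:')) {
      return { steps, stopReason: 'done' };
    }
    prompt = `Continue from: ${output}`;
  }
  return { steps, stopReason: 'max-steps' };
}
```

The step cap keeps the loop bounded even if the model never produces the stop marker.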

A custom agent handler receives a SessionRunner (sess in the examples) and an options object:

async (sess, { sendChunk, abortSignal, context }) => {
  // Custom loop.
};

Use sess.run(async (input, turnContext) => {}) to process each input turn. The runner adds input.message to history before the callback runs. For server-managed agents, turnContext.snapshotId is reserved before the turn starts and reused when the turn snapshot is saved.

The runner exposes helpers for the parts of session state you are most likely to need. Use getState() for the full state, getMessages() and addMessages() for conversation history, getCustom() and updateCustom(fn) for typed application state, and getArtifacts() and addArtifacts(artifacts) for generated outputs.

Use updateCustom(fn) for progress and control data that the UI should react to during a turn. Use addArtifacts() when the agent produces a named output the user may inspect later, such as a report, patch, or JSON result. Custom state should stay compact because it is part of the conversation state that gets snapshotted or returned to the client.

// `ai`, `z`, `store`, `liteModel`, and `ResearchStateSchema` are defined elsewhere.
export const researchAgent = ai.defineCustomAgent(
  {
    name: 'researchAgent',
    description:
      'Plans research, answers subquestions, and synthesizes results.',
    stateSchema: ResearchStateSchema,
    store,
  },
  async (sess, { sendChunk, abortSignal }) => {
    let finalMessage;

    await sess.run(async (input, turnContext) => {
      const userText =
        input.message?.content.find((part) => part.text)?.text ?? '';
      // History already includes the current user message.
      const priorMessages = sess.getMessages();

      // Step 1: split the question into subquestions with the lighter model.
      sess.updateCustom((state) => ({
        ...state,
        status: 'Decomposing question',
        turn: turnContext.turnIndex,
      }));
      const plan = await ai.generate({
        model: liteModel,
        prompt: `Break this question into three subquestions:\n${userText}`,
        output: { format: 'json', schema: z.array(z.string()).length(3) },
        abortSignal,
      });
      // Fall back to the original question if no structured output is returned.
      const subQuestions = plan.output ?? [userText];

      // Step 2: answer each subquestion in turn.
      sess.updateCustom((state) => ({
        ...state,
        subQuestions,
        status: 'Researching',
      }));
      const answers = [];
      for (const question of subQuestions) {
        const answer = await ai.generate({
          prompt: `Answer in two paragraphs:\n${question}`,
          abortSignal,
        });
        answers.push({ question, answer: answer.text });
      }

      // Step 3: stream the synthesis to the client.
      sess.updateCustom((state) => ({
        ...state,
        answers,
        status: 'Synthesizing',
      }));
      const stream = ai.generateStream({
        messages: priorMessages,
        prompt: `Synthesize these findings:\n${JSON.stringify(answers)}`,
        abortSignal,
      });
      for await (const chunk of stream.stream) {
        sendChunk({ modelChunk: chunk });
      }
      const response = await stream.response;
      finalMessage = response.message;

      // Record the final model message and keep the raw answers as an artifact.
      if (response.message) {
        sess.addMessages([response.message]);
      }
      sess.addArtifacts([
        {
          name: `research-${turnContext.snapshotId}.json`,
          parts: [{ text: JSON.stringify(answers) }],
        },
      ]);
      return { finishReason: response.finishReason };
    });

    return {
      message: finalMessage,
      artifacts: sess.getArtifacts(),
      finishReason: sess.lastTurnFinishReason,
    };
  },
);
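
The split between compact custom state and named artifacts can be illustrated with an in-memory stand-in. `FakeSession` below is hypothetical: it only mirrors the helper names used in this guide (getCustom, updateCustom, getArtifacts, addArtifacts) and makes no claim about how Genkit implements them. `ResearchState` and the `Artifact` shape are likewise assumptions for illustration.

```typescript
// Hypothetical state shape: small progress and control fields only.
interface ResearchState {
  status: string;
  completed: number;
}

// Assumed artifact shape, modeled on the name/parts objects used in this guide.
interface Artifact {
  name: string;
  parts: { text: string }[];
}

// In-memory stand-in that mirrors the helper names; not Genkit's implementation.
class FakeSession<S> {
  private custom: S;
  private artifacts: Artifact[] = [];
  constructor(initial: S) {
    this.custom = initial;
  }
  getCustom(): S {
    return this.custom;
  }
  updateCustom(fn: (state: S) => S): void {
    this.custom = fn(this.custom);
  }
  getArtifacts(): Artifact[] {
    return [...this.artifacts];
  }
  addArtifacts(items: Artifact[]): void {
    this.artifacts.push(...items);
  }
}

const sess = new FakeSession<ResearchState>({ status: 'Idle', completed: 0 });

// Progress the UI should react to goes into compact custom state.
sess.updateCustom((state) => ({ ...state, status: 'Researching', completed: 1 }));

// The large output goes into a named artifact, not into custom state.
const report = 'Long report body. '.repeat(200);
sess.addArtifacts([{ name: 'report.md', parts: [{ text: report }] }]);

const stateBytes = JSON.stringify(sess.getCustom()).length;
const artifactBytes = JSON.stringify(sess.getArtifacts()).length;
console.log({ stateBytes, artifactBytes });
```

Keeping bulky output in artifacts means the custom state stays a few dozen bytes however large the report grows.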

Use input.message for the current user message. The custom handler does not receive an input.messages array; read history from sess.getMessages() instead.
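
Reading the text of the current user message can be wrapped in a small helper. The `Part` and `Message` types below are simplified, hypothetical stand-ins for the message shape used in this guide (a content array of parts with an optional text field), and `textOf` is an illustrative name. Unlike taking only the first text part, this sketch concatenates every text part.

```typescript
// Simplified, assumed shapes; real message parts may carry other fields.
interface Part {
  text?: string;
}

interface Message {
  role: string;
  content: Part[];
}

// Returns the concatenated text of a message, or '' when there is no message.
function textOf(message: Message | undefined): string {
  if (!message) return '';
  return message.content.map((part) => part.text ?? '').join('');
}
```

Inside a turn callback this would be called as textOf(input.message), assuming the message matches the simplified shape.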

If the per-turn callback throws, the runtime marks the turn as failed, emits a turn-end event marked as failed, and resolves the invocation with finishReason: 'failed'. For client-managed agents, the response carries the last-good state. For server-managed agents, the response carries the last-good snapshot ID. Clients can therefore retry from that point, and partial mutations from the failed turn are not preserved.
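
On the client side, a retry for a client-managed agent might look like the sketch below. `TurnResult`, `Invoke`, and `sendWithRetry` are hypothetical names, and the `state` field is an assumed name for wherever the response carries the last-good state. Only finishReason: 'failed' comes from the behavior described above. A server-managed client would hold a snapshot ID instead of the state.

```typescript
// Assumed response shape for a client-managed agent.
interface TurnResult<S> {
  finishReason: string;
  state: S; // On failure, this is the last-good state.
  text?: string;
}

// Hypothetical transport function that sends one turn to the agent.
type Invoke<S> = (state: S, userText: string) => Promise<TurnResult<S>>;

// Retry a failed turn from the last-good state, up to maxAttempts calls in total.
async function sendWithRetry<S>(
  invoke: Invoke<S>,
  state: S,
  userText: string,
  maxAttempts = 3,
): Promise<TurnResult<S>> {
  let result = await invoke(state, userText);
  for (
    let attempt = 1;
    attempt < maxAttempts && result.finishReason === 'failed';
    attempt++
  ) {
    // The failed response carries the last-good state, so resend from it.
    result = await invoke(result.state, userText);
  }
  return result;
}
```

Because the failed response already excludes partial mutations, the client does not need its own rollback logic before retrying.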

sendChunk({ modelChunk }) forwards model chunks. sess.updateCustom() emits a customPatch chunk. sess.addArtifacts() records artifacts, while sendChunk({ artifact }) can stream an artifact chunk explicitly when the UI needs immediate visibility.
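
A client consuming this stream needs to tell the three chunk kinds apart. The sketch below assumes a simplified chunk shape with optional modelChunk, customPatch, and artifact fields. `AgentChunk`, `ViewState`, and `applyChunk` are hypothetical names, and the patch payload is treated as opaque because its format is not described here.

```typescript
// Assumed, simplified chunk shape; the real wire format may differ.
interface AgentChunk {
  modelChunk?: { text?: string };
  customPatch?: unknown;
  artifact?: { name: string };
}

interface ViewState {
  text: string;
  patches: unknown[];
  artifactNames: string[];
}

// Fold one chunk into the UI view state without mutating the previous state.
function applyChunk(view: ViewState, chunk: AgentChunk): ViewState {
  if (chunk.modelChunk) {
    return { ...view, text: view.text + (chunk.modelChunk.text ?? '') };
  }
  if (chunk.customPatch !== undefined) {
    return { ...view, patches: [...view.patches, chunk.customPatch] };
  }
  if (chunk.artifact) {
    return { ...view, artifactNames: [...view.artifactNames, chunk.artifact.name] };
  }
  return view;
}

const received: AgentChunk[] = [
  { modelChunk: { text: 'Hel' } },
  { customPatch: { status: 'Synthesizing' } },
  { modelChunk: { text: 'lo' } },
  { artifact: { name: 'research.json' } },
];

const emptyView: ViewState = { text: '', patches: [], artifactNames: [] };
const finalView = received.reduce(applyChunk, emptyView);
console.log(finalView.text, finalView.artifactNames);
```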