Custom orchestration
Most Genkit agents should use the standard prompt-backed loop. Custom orchestration is for cases where the application must control turn processing directly while still using the Agents API for sessions, snapshots, streaming, HTTP transport, and background execution. If you need complete ownership of the backend contract instead, a Genkit flow with direct generate() calls may be a better fit.
When to use a custom agent
Use ai.defineCustomAgent() when the workflow needs one of these behaviors:
- Several model calls in one user turn.
- Dynamic model selection or custom stopping rules.
- Planner and executor loops with application decisions between calls.
- Manual message history management.
- Custom state and artifact updates before the final response.
- Custom stream chunks that do not come directly from a single model call.
Runtime contract
A custom agent receives a SessionRunner and an options object:

    async (sess, { sendChunk, abortSignal, context }) => {
      // Custom loop.
    };

Use sess.run(async (input, turnContext) => {}) to process each input turn. The runner adds input.message to history before the callback runs. For server-managed agents, turnContext.snapshotId is reserved before the turn starts and reused when the turn snapshot is saved.
The runner exposes helpers for the parts of session state you are most likely to need. Use getState() for the full state, getMessages() and addMessages() for conversation history, getCustom() and updateCustom(fn) for typed application state, and getArtifacts() and addArtifacts(artifacts) for generated outputs.
Use updateCustom(fn) for progress and control data that the UI should react to during a turn. Use addArtifacts() when the agent produces a named output the user may inspect later, such as a report, patch, or JSON result. Custom state should stay compact because it is part of the conversation state that gets snapshotted or returned to the client.
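For unit-testing turn logic without a model or a store, a small in-memory stand-in for the runner helpers may be enough. The sketch below is a test double, not Genkit's SessionRunner: the FakeSession class and the Message, Artifact, and ResearchState shapes are hypothetical, and only the helper names are taken from the text above.

```typescript
// Hypothetical shapes for illustration; the real Genkit types are richer.
interface Message { role: 'user' | 'model'; content: { text?: string }[]; }
interface Artifact { name: string; parts: { text?: string }[]; }

// In-memory stand-in that mirrors the helper names described above.
// It is a test double, not the real SessionRunner.
class FakeSession<S> {
  private messages: Message[] = [];
  private artifacts: Artifact[] = [];
  private custom: S;

  constructor(initial: S) {
    this.custom = initial;
  }

  getMessages(): Message[] { return [...this.messages]; }
  addMessages(msgs: Message[]): void { this.messages.push(...msgs); }
  getCustom(): S { return this.custom; }
  updateCustom(fn: (state: S) => S): void { this.custom = fn(this.custom); }
  getArtifacts(): Artifact[] { return [...this.artifacts]; }
  addArtifacts(items: Artifact[]): void { this.artifacts.push(...items); }
  getState() {
    return {
      messages: this.getMessages(),
      custom: this.custom,
      artifacts: this.getArtifacts(),
    };
  }
}

// Compact progress state: a few small fields the UI can react to.
interface ResearchState { status: string; subQuestions: string[]; }

const sess = new FakeSession<ResearchState>({ status: 'Idle', subQuestions: [] });
sess.addMessages([{ role: 'user', content: [{ text: 'Why is the sky blue?' }] }]);
sess.updateCustom((state) => ({ ...state, status: 'Researching' }));
sess.addArtifacts([{ name: 'report.json', parts: [{ text: '{"answers":[]}' }] }]);
```

The larger payload (the report) goes into an artifact, while the custom state carries only the status string and the list of subquestions, which keeps snapshots small.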
Multi-step example
    // Assumes ai, z, liteModel, store, and ResearchStateSchema are defined elsewhere.
    export const researchAgent = ai.defineCustomAgent(
      {
        name: 'researchAgent',
        description: 'Plans research, answers subquestions, and synthesizes results.',
        stateSchema: ResearchStateSchema,
        store,
      },
      async (sess, { sendChunk, abortSignal }) => {
        let finalMessage;

        await sess.run(async (input, turnContext) => {
          const userText = input.message?.content.find((part) => part.text)?.text ?? '';
          const priorMessages = sess.getMessages();

          sess.updateCustom((state) => ({
            ...state,
            status: 'Decomposing question',
            turn: turnContext.turnIndex,
          }));

          // Step 1: plan. Ask a lightweight model for three subquestions.
          const plan = await ai.generate({
            model: liteModel,
            prompt: `Break this question into three subquestions:\n${userText}`,
            output: { format: 'json', schema: z.array(z.string()).length(3) },
            abortSignal,
          });

          // Fall back to the original question if no structured output came back.
          const subQuestions = plan.output ?? [userText];

          sess.updateCustom((state) => ({
            ...state,
            subQuestions,
            status: 'Researching',
          }));

          // Step 2: answer each subquestion with its own model call.
          const answers = [];
          for (const question of subQuestions) {
            const answer = await ai.generate({
              prompt: `Answer in two paragraphs:\n${question}`,
              abortSignal,
            });
            answers.push({ question, answer: answer.text });
          }

          sess.updateCustom((state) => ({
            ...state,
            answers,
            status: 'Synthesizing',
          }));

          // Step 3: synthesize, streaming model chunks to the client.
          const stream = ai.generateStream({
            messages: priorMessages,
            prompt: `Synthesize these findings:\n${JSON.stringify(answers)}`,
            abortSignal,
          });

          for await (const chunk of stream.stream) {
            sendChunk({ modelChunk: chunk });
          }

          const response = await stream.response;
          finalMessage = response.message;

          // Record the final model message in session history.
          if (response.message) {
            sess.addMessages([response.message]);
          }

          sess.addArtifacts([
            {
              name: `research-${turnContext.snapshotId}.json`,
              parts: [{ text: JSON.stringify(answers) }],
            },
          ]);

          return { finishReason: response.finishReason };
        });

        return {
          message: finalMessage,
          artifacts: sess.getArtifacts(),
          finishReason: sess.lastTurnFinishReason,
        };
      },
    );

Use input.message for the current user message. The custom handler is not passed an input.messages array. Read history from sess.getMessages().
Failure and recovery
If the per-turn callback throws, the runtime marks the turn as failed, emits a failed turn end, and resolves the invocation with finishReason: 'failed'. For client-managed agents, the response carries the last-good state. For server-managed agents, the response carries the last-good snapshot ID. Clients can retry from that point, and partial mutations made during the failed turn are not preserved.
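The recovery contract can be modeled in a few lines. This sketch is a simplified simulation of the client-managed case, not the Genkit runtime: runTurn, TurnOutcome, and AgentState are hypothetical names, and the code only illustrates the idea that a failed turn's partial mutations are dropped while the caller gets the last-good state back.

```typescript
// Hypothetical state and outcome shapes for illustration only.
interface AgentState { messages: string[]; custom: { status: string }; }
type TurnOutcome =
  | { finishReason: 'stop'; state: AgentState }
  | { finishReason: 'failed'; state: AgentState; error: string };

// Runs one turn against a working copy. If the turn throws, the working
// copy is discarded and the last-good state is returned with the failure.
function runTurn(lastGood: AgentState, turn: (draft: AgentState) => void): TurnOutcome {
  const draft: AgentState = {
    messages: [...lastGood.messages],
    custom: { ...lastGood.custom },
  };
  try {
    turn(draft);
    return { finishReason: 'stop', state: draft };
  } catch (err) {
    const error = err instanceof Error ? err.message : String(err);
    return { finishReason: 'failed', state: lastGood, error };
  }
}

const start: AgentState = { messages: ['hi'], custom: { status: 'Idle' } };

// First attempt mutates the draft and then fails partway through.
const failed = runTurn(start, (draft) => {
  draft.custom.status = 'Researching';
  draft.messages.push('partial answer');
  throw new Error('model unavailable');
});

// The client retries from the last-good state carried by the failed outcome.
const retried = runTurn(failed.state, (draft) => {
  draft.messages.push('final answer');
  draft.custom.status = 'Done';
});
```

For a server-managed agent the same pattern would apply with a snapshot ID in place of the state object: the client resubmits against the last-good snapshot rather than sending state back.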
Streaming custom data
sendChunk({ modelChunk }) forwards model chunks. sess.updateCustom() emits a customPatch chunk. sess.addArtifacts() records artifacts, while sendChunk({ artifact }) can stream an artifact chunk explicitly when the UI needs immediate visibility.
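On the receiving side, a client can route each chunk by the field it carries. The sketch below assumes a hypothetical AgentChunk shape with optional modelChunk, customPatch, and artifact fields. The payloads (a text string, an opaque patch, a named artifact) are illustrative guesses rather than the actual wire format, and applyChunk and ViewModel are names invented for this example.

```typescript
// Hypothetical chunk shape: what each field carries on the wire is an
// assumption made for this sketch.
interface AgentChunk {
  modelChunk?: { text: string };
  customPatch?: unknown; // treated as opaque here
  artifact?: { name: string; parts: { text?: string }[] };
}

interface ViewModel { text: string; patches: unknown[]; artifactNames: string[]; }

// Folds one chunk into the view model without mutating the previous value.
function applyChunk(view: ViewModel, chunk: AgentChunk): ViewModel {
  const next: ViewModel = {
    text: view.text,
    patches: [...view.patches],
    artifactNames: [...view.artifactNames],
  };
  if (chunk.modelChunk) next.text += chunk.modelChunk.text;
  if (chunk.customPatch !== undefined) next.patches.push(chunk.customPatch);
  if (chunk.artifact) next.artifactNames.push(chunk.artifact.name);
  return next;
}

const chunks: AgentChunk[] = [
  { customPatch: { status: 'Synthesizing' } },
  { modelChunk: { text: 'The sky ' } },
  { modelChunk: { text: 'is blue.' } },
  { artifact: { name: 'research.json', parts: [{ text: '[]' }] } },
];

const empty: ViewModel = { text: '', patches: [], artifactNames: [] };
const view = chunks.reduce(applyChunk, empty);
```

Keeping the fold pure makes it easy to replay a recorded chunk sequence in tests or to plug the function into a UI state reducer.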
When to use a custom agent
In Go, use genkitx.DefineCustomAgent when the prompt-backed loop does not fit:
- The agent must call multiple models in one turn.
- The workflow chooses models or tools dynamically.
- You need custom retry, planning, or validation around each turn.
- The agent emits artifacts or state updates outside a normal model stream.
- You need the reserved turn snapshot ID before work starts.
Runtime contract
The custom function receives:
- ctx is the invocation context. It is canceled on client disconnect or abort.
- resp aix.Responder streams model chunks and artifacts to the client.
- sess *aix.SessionRunner[State] manages turns, messages, custom state, artifacts, and snapshots.
Call sess.Run(ctx, fn) to process input turns. The runner adds each user message to the session before the callback runs, emits turn-end chunks, and writes snapshots when a store exists.
Custom agent example
    import (
        "context"
        "fmt"

        "github.com/firebase/genkit/go/ai"
        aix "github.com/firebase/genkit/go/ai/exp"
        "github.com/firebase/genkit/go/genkit"
        genkitx "github.com/firebase/genkit/go/genkit/exp"
    )

    // Assumes g, store, and the CoderState type are defined elsewhere.
    coder := genkitx.DefineCustomAgent(g, "coder",
        func(ctx context.Context, resp aix.Responder, sess *aix.SessionRunner[CoderState]) (*aix.AgentResult, error) {
            err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) (*aix.TurnResult, error) {
                turn := aix.TurnContextFromContext(ctx)

                sess.UpdateCustom(func(state CoderState) CoderState {
                    state.Status = "Generating answer"
                    state.LastSnapshotID = turn.SnapshotID
                    return state
                })

                for chunk, err := range genkit.GenerateStream(ctx, g,
                    ai.WithModelName("googleai/gemini-flash-latest"),
                    ai.WithSystem("You are a concise coding assistant."),
                    ai.WithMessages(sess.Messages()...),
                ) {
                    if err != nil {
                        return nil, fmt.Errorf("stream model: %w", err)
                    }
                    if chunk.Done {
                        // Record the final message, then end the turn.
                        sess.AddMessages(chunk.Response.Message)
                        return &aix.TurnResult{
                            FinishReason: aix.AgentFinishReason(chunk.Response.FinishReason),
                        }, nil
                    }
                    resp.SendModelChunk(chunk.Chunk)
                }

                return nil, nil
            })
            if err != nil {
                // sess.Run surfaces a turn-loop failure; the framework marks the
                // turn failed and resolves the invocation with the last-good state.
                return nil, fmt.Errorf("run turn: %w", err)
            }
            return sess.Result(), nil
        },
        aix.WithSessionStore(store),
        aix.WithDescription[CoderState]("Concise coding assistant"),
    )

sess.Result() is a convenience that returns the last message and artifacts currently recorded in the session.
Turn context
aix.TurnContextFromContext(ctx) returns read-only turn metadata:
- SnapshotID is the snapshot ID reserved before this turn runs. It is empty for client-managed agents.
- ParentSnapshotID is the snapshot this turn continues from.
- TurnIndex is the zero-based turn number within the invocation.
Use this when external resources need to line up with the snapshot that will later be saved.
Responder behavior
Responder.SendModelChunk(chunk) streams token-level model output. Responder.SendArtifact(artifact) streams an artifact and records it in the session. Send methods return promptly when the work context is canceled. Their session side effects are applied before they return, so snapshots and sess.Result() observe them.
Failure and detach behavior
If the per-turn callback returns an error, the invocation resolves as a failed AgentOutput with structured error details and a last-good recovery point. A client can resume from the last-good state or snapshot. When a client detaches, chunks after detach are not forwarded, but session side effects such as artifacts still apply to the final snapshot.