
Durable Streaming

Genkit supports durable streaming, which allows flow state to be persisted. This enables clients to disconnect and reconnect to a stream and replay the full result. This is particularly useful for long-running operations or unreliable network connections.

When durable streaming is enabled, Genkit uses a StreamManager to store the chunks of a stream as they are generated. The client receives a streamId which can be used to reconnect to the stream and replay the full transcript.
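To make the record-and-replay idea concrete, here is a minimal sketch of a chunk store. It is an illustration only: `ToyStreamStore` and its method names are hypothetical and are not Genkit's StreamManager interface. It shows why a client holding a stream ID can get the full transcript back after a disconnect.

```typescript
// Hypothetical illustration only: NOT Genkit's StreamManager interface.
type StoredStream<C, R> = { chunks: C[]; done: boolean; result?: R };

class ToyStreamStore<C, R> {
  private streams = new Map<string, StoredStream<C, R>>();
  private nextId = 0;

  // Create a new stream record and hand back its ID.
  open(): string {
    const id = `stream-${this.nextId++}`;
    this.streams.set(id, { chunks: [], done: false });
    return id;
  }

  // Persist each chunk as it is generated.
  append(id: string, chunk: C): void {
    this.lookup(id).chunks.push(chunk);
  }

  // Record the final result and mark the stream complete.
  finish(id: string, result: R): void {
    const s = this.lookup(id);
    s.done = true;
    s.result = result;
  }

  // Return a copy of everything stored so far, so a reconnecting
  // client sees the transcript from the beginning.
  replay(id: string): StoredStream<C, R> {
    const s = this.lookup(id);
    return { chunks: [...s.chunks], done: s.done, result: s.result };
  }

  private lookup(id: string): StoredStream<C, R> {
    const s = this.streams.get(id);
    if (!s) throw new Error(`unknown stream: ${id}`);
    return s;
  }
}
```

A durable implementation would keep the same shape but write to external storage instead of a `Map`, so the transcript survives server restarts.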

To enable durable streaming, you need to configure a StreamManager in your flow server (Express or Next.js).

For development and testing, or for a simple single-instance server, you can use the InMemoryStreamManager.

import { InMemoryStreamManager } from 'genkit/beta';
// ...

For production, you should use a durable storage solution. For example, the @genkit-ai/firebase plugin provides implementations for Firestore and Realtime Database.

npm i @genkit-ai/firebase

import { FirestoreStreamManager, RtdbStreamManager } from '@genkit-ai/firebase/beta';
import { initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';

const app = initializeApp();

const firestore = new FirestoreStreamManager({
  firebaseApp: app,
  db: getFirestore(app),
  collection: 'streams',
});

// Or for RTDB
const rtdb = new RtdbStreamManager({
  firebaseApp: app,
  refPrefix: 'streams',
});

To enable durable streaming in Express, pass the streamManager to expressHandler:

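import express from 'express';
import { expressHandler } from '@genkit-ai/express';
import { InMemoryStreamManager } from 'genkit/beta';

const app = express();
app.use(express.json());
// myFlow is a flow defined elsewhere in your app.
app.post('/myDurableFlow', expressHandler(myFlow, {
  streamManager: new InMemoryStreamManager(), // or firestore/rtdb
}));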

To enable durable streaming in Next.js, pass the streamManager to appRoute:

import { appRoute } from '@genkit-ai/next';
import { InMemoryStreamManager } from 'genkit/beta';

export const POST = appRoute(myFlow, {
  streamManager: new InMemoryStreamManager(), // or firestore/rtdb
});

Clients can initiate a stream and receive a streamId. This ID can be used to reconnect.

import { streamFlow } from 'genkit/beta/client';

// Start a new stream
const result = streamFlow({
  url: `http://localhost:8080/myDurableFlow`,
  input: 'tell me a long story',
});

// Save this ID for later
const streamId = await result.streamId;

// ... later, reconnect if needed ...
const reconnectedResult = streamFlow({
  url: `http://localhost:8080/myDurableFlow`,
  streamId: streamId,
});

for await (const chunk of reconnectedResult.stream) {
  console.log(chunk);
}
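
On an unreliable connection, the start-then-reconnect pattern can be wrapped in a retry loop. The sketch below is one possible shape, not part of Genkit: `collectWithReconnect`, `Connect`, and `StreamHandle` are hypothetical names, and `connect` stands in for a thin wrapper around `streamFlow` that you would write yourself. It assumes, as described above, that reconnecting with a streamId replays the stream from the beginning, so collected chunks are discarded before each new attempt.

```typescript
// Hypothetical types: they mirror only the fields used in the example above.
type StreamHandle<C> = { streamId: Promise<string>; stream: AsyncIterable<C> };
type Connect<C> = (opts: { streamId?: string }) => StreamHandle<C>;

// Collect every chunk, reconnecting with the saved stream ID on failure.
async function collectWithReconnect<C>(
  connect: Connect<C>,
  maxAttempts = 3,
): Promise<C[]> {
  let streamId: string | undefined;
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Start from an empty list: a reconnect replays the full transcript.
    const chunks: C[] = [];
    try {
      const handle = connect({ streamId });
      // Remember the ID so the next attempt resumes the same stream.
      streamId = await handle.streamId;
      for await (const chunk of handle.stream) {
        chunks.push(chunk);
      }
      return chunks;
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}
```

With the client code above, `connect` could be something like `(opts) => streamFlow({ url, input, ...opts })`, with the caveat that whether `input` should be sent again on reconnect is not specified here.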

Each storage backend has limitations to keep in mind:

  • Firestore: The entire stream history (chunks and final result) is stored in a single document. Firestore limits each document to 1 MiB (1,048,576 bytes). If your stream output exceeds this limit, the flow will fail.
  • Realtime Database: RTDB does not have the same 1 MiB limit, but storing very large streams may degrade performance or hit other quotas.
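
As a rough pre-flight check against the Firestore limit, you can estimate how large a stream's stored output might be. The sketch below is an approximation under stated assumptions: `estimateStreamBytes` and `fitsInFirestoreDoc` are hypothetical helpers, and the JSON byte length is only a proxy, since Firestore computes document size with its own rules and the way FirestoreStreamManager serializes chunks is not described here. The `headroom` factor leaves a margin for that uncertainty.

```typescript
// Firestore's documented maximum document size: 1 MiB.
const FIRESTORE_DOC_LIMIT_BYTES = 1_048_576;

// Approximate the stored size as the UTF-8 byte length of the JSON encoding.
// Assumption: this is a proxy, not Firestore's exact size calculation.
function estimateStreamBytes(chunks: unknown[], result?: unknown): number {
  const json = JSON.stringify({ chunks, result });
  return new TextEncoder().encode(json).length;
}

// Return true if the estimate stays under a fraction of the limit.
function fitsInFirestoreDoc(
  chunks: unknown[],
  result?: unknown,
  headroom = 0.8,
): boolean {
  return estimateStreamBytes(chunks, result) <= FIRESTORE_DOC_LIMIT_BYTES * headroom;
}
```

If a flow routinely approaches the limit, emitting fewer or smaller chunks, or switching to the Realtime Database manager, are options to consider.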