Durable Streaming
Genkit supports durable streaming, which allows flow state to be persisted. This enables clients to disconnect and reconnect to a stream and replay the full result. This is particularly useful for long-running operations or unreliable network connections.
How it works
Section titled “How it works”When durable streaming is enabled, Genkit uses a StreamManager to store the chunks of a stream as they are generated. The client receives a streamId which can be used to reconnect to the stream and replay the full transcript.
Configuration
Section titled “Configuration”To enable durable streaming, you need to configure a StreamManager in your flow server (Express or Next.js).
Development
Section titled “Development”For development and testing, or simple single-instance server, you can use the InMemoryStreamManager.
import { InMemoryStreamManager } from 'genkit/beta';
// ...Production
Section titled “Production”For production, you should use a durable storage solution. For example, @genkit-ai/firebase plugin provides implementations for Firestore and Realtime Database.
npm i @genkit-ai/firebaseimport { FirestoreStreamManager, RtdbStreamManager } from '@genkit-ai/firebase/beta';import { initializeApp } from 'firebase-admin/app';import { getFirestore } from 'firebase-admin/firestore';
const app = initializeApp();const firestore = new FirestoreStreamManager({ firebaseApp: app, db: getFirestore(app), collection: 'streams',});
// Or for RTDBconst rtdb = new RtdbStreamManager({ firebaseApp: app, refPrefix: 'streams',});Framework Integration
Section titled “Framework Integration”Express
Section titled “Express”To enable durable streaming in Express, pass the streamManager to expressHandler:
import { expressHandler } from '@genkit-ai/express';import { InMemoryStreamManager } from 'genkit/beta';
app.post('/myDurableFlow', expressHandler(myFlow, { streamManager: new InMemoryStreamManager(), // or firestore/rtdb}));Next.js
Section titled “Next.js”To enable durable streaming in Next.js, pass the streamManager to appRoute:
import { appRoute } from '@genkit-ai/next';import { InMemoryStreamManager } from 'genkit/beta';
export const POST = appRoute(myFlow, { streamManager: new InMemoryStreamManager(), // or firestore/rtdb});Client Usage
Section titled “Client Usage”Clients can initiate a stream and receive a streamId. This ID can be used to reconnect.
import { streamFlow } from 'genkit/beta/client';
// Start a new streamconst result = streamFlow({ url: `http://localhost:8080/myDurableFlow`, input: 'tell me a long story',});
// Save this ID for laterconst streamId = await result.streamId;
// ... later, reconnect if needed ...const reconnectedResult = streamFlow({ url: `http://localhost:8080/myDurableFlow`, streamId: streamId,});
for await (const chunk of reconnectedResult.stream) { console.log(chunk);}Limitations
Section titled “Limitations”- Firestore: The entire stream history (chunks and final result) is stored in a single document. Firestore has a strict 1MB limitation on document size. If your stream output exceeds this limit, the flow will fail.
- Realtime Database: While RTDB does not have the same 1MB limit, storing very large streams may impact performance or hit other quotas.