Durable Streaming
Genkit supports durable streaming, which allows flow state to be persisted. This enables clients to disconnect and reconnect to a stream and replay the full result. This is particularly useful for long-running operations or unreliable network connections.
How it works
Section titled “How it works”When durable streaming is enabled, Genkit uses a StreamManager to store the chunks of a stream as they are generated. The client receives a streamId which can be used to reconnect to the stream and replay the full transcript.
Configuration
Section titled “Configuration”To enable durable streaming, you need to configure a StreamManager in your flow server (Express or Next.js).
To enable durable streaming, you need to configure a StreamManager and pass it to genkit.Handler().
Development
Section titled “Development”For development and testing, or simple single-instance server, you can use the InMemoryStreamManager.
import { InMemoryStreamManager } from 'genkit/beta';
// ...import "github.com/firebase/genkit/go/core/x/streaming"
// Create an in-memory stream manager with optional TTL for completed streamssm := streaming.NewInMemoryStreamManager( streaming.WithTTL(10 * time.Minute), // Optional: how long to retain completed streams)Note that InMemoryStreamManager stores streams in memory, so they will be lost if the server restarts. For production use cases where persistence across restarts is required, use FirestoreStreamManager.
Production
Section titled “Production”For production, you should use a durable storage solution.
The @genkit-ai/firebase plugin provides implementations for Firestore and Realtime Database.
npm i @genkit-ai/firebaseimport { FirestoreStreamManager, RtdbStreamManager } from '@genkit-ai/firebase/beta';import { initializeApp } from 'firebase-admin/app';import { getFirestore } from 'firebase-admin/firestore';
const app = initializeApp();const firestore = new FirestoreStreamManager({ firebaseApp: app, db: getFirestore(app), collection: 'streams',});
// Or for RTDBconst rtdb = new RtdbStreamManager({ firebaseApp: app, refPrefix: 'streams',});The firebase plugin provides FirestoreStreamManager for durable stream storage.
import ( "github.com/firebase/genkit/go/genkit" "github.com/firebase/genkit/go/plugins/firebase" firebasex "github.com/firebase/genkit/go/plugins/firebase/x")
// Initialize Genkit with the Firebase pluging := genkit.Init(ctx, genkit.WithPlugins(&firebase.Firebase{}))
// Create a Firestore stream managersm, err := firebasex.NewFirestoreStreamManager(ctx, g, firebasex.WithCollection("genkit-streams"), // Required: Firestore collection for stream documents firebasex.WithTimeout(2 * time.Minute), // Optional: how long subscribers wait for new events firebasex.WithTTL(10 * time.Minute), // Optional: how long completed streams are retained)if err != nil { log.Fatalf("Failed to create Firestore stream manager: %v", err)}FirestoreStreamManager provides:
- Persistence across restarts: Clients can reconnect to streams after server restarts
- Multi-instance support: Multiple server instances can serve the same stream
- Automatic cleanup: Completed streams are automatically deleted via Firestore TTL policies
Firestore TTL setup
Section titled “Firestore TTL setup”For automatic cleanup of old streams, configure a TTL policy on your Firestore collection:
gcloud firestore fields ttls update expiresAt \ --collection-group=genkit-streams \ --enable-ttl \ --project=YOUR_PROJECT_IDSee Firestore TTL documentation for more details.
Framework Integration
Section titled “Framework Integration”Express
Section titled “Express”To enable durable streaming in Express, pass the streamManager to expressHandler:
import { expressHandler } from '@genkit-ai/express';import { InMemoryStreamManager } from 'genkit/beta';
app.post('/myDurableFlow', expressHandler(myFlow, { streamManager: new InMemoryStreamManager(), // or firestore/rtdb}));Next.js
Section titled “Next.js”To enable durable streaming in Next.js, pass the streamManager to appRoute:
import { appRoute } from '@genkit-ai/next';import { InMemoryStreamManager } from 'genkit/beta';
export const POST = appRoute(myFlow, { streamManager: new InMemoryStreamManager(), // or firestore/rtdb});net/http Server
Section titled “net/http Server”To enable durable streaming with Go’s standard net/http server, pass the StreamManager to genkit.Handler() using the WithStreamManager option:
package main
import ( "context" "log" "net/http" "time"
"github.com/firebase/genkit/go/core/x/streaming" "github.com/firebase/genkit/go/genkit" "github.com/firebase/genkit/go/plugins/server")
func main() { ctx := context.Background() g := genkit.Init(ctx)
// Define a streaming flow myFlow := genkit.DefineStreamingFlow(g, "myFlow", func(ctx context.Context, input string, sendChunk func(context.Context, string) error) (string, error) { // Your streaming logic here for i := 0; i < 5; i++ { if err := sendChunk(ctx, fmt.Sprintf("Chunk %d", i)); err != nil { return "", err } time.Sleep(1 * time.Second) } return "Done!", nil })
// Set up HTTP server with durable streaming mux := http.NewServeMux() mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow, genkit.WithStreamManager(streaming.NewInMemoryStreamManager( streaming.WithTTL(10 * time.Minute), )), )) log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))}Firestore-backed durable streaming
Section titled “Firestore-backed durable streaming”For production deployments with persistence across restarts:
package main
import ( "context" "log" "net/http" "time"
"github.com/firebase/genkit/go/genkit" "github.com/firebase/genkit/go/plugins/firebase" firebasex "github.com/firebase/genkit/go/plugins/firebase/x" "github.com/firebase/genkit/go/plugins/server")
func main() { ctx := context.Background()
g := genkit.Init(ctx, genkit.WithPlugins(&firebase.Firebase{}))
myFlow := genkit.DefineStreamingFlow(g, "myFlow", func(ctx context.Context, input string, sendChunk func(context.Context, string) error) (string, error) { // Your streaming logic here return "Done!", nil })
sm, err := firebasex.NewFirestoreStreamManager(ctx, g, firebasex.WithCollection("genkit-streams"), firebasex.WithTimeout(2 * time.Minute), firebasex.WithTTL(10 * time.Minute), ) if err != nil { log.Fatalf("Failed to create Firestore stream manager: %v", err) }
mux := http.NewServeMux() mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow, genkit.WithStreamManager(sm))) log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))}Client Usage
Section titled “Client Usage”Clients can initiate a stream and receive a streamId. This ID can be used to reconnect.
import { streamFlow } from 'genkit/beta/client';
// Start a new streamconst result = streamFlow({ url: `http://localhost:8080/myDurableFlow`, input: 'tell me a long story',});
// Save this ID for laterconst streamId = await result.streamId;
// ... later, reconnect if needed ...const reconnectedResult = streamFlow({ url: `http://localhost:8080/myDurableFlow`, streamId: streamId,});
for await (const chunk of reconnectedResult.stream) { console.log(chunk);}When durable streaming is enabled, the server returns a X-Genkit-Stream-Id header with the stream ID. Clients can use this ID to reconnect to the stream.
Starting a new stream
Section titled “Starting a new stream”curl -N -i -H "Accept: text/event-stream" \ -d '{"data": "your input"}' \ http://localhost:8080/myFlowThe response headers will include X-Genkit-Stream-Id: <stream-id>. Save this ID to reconnect later.
Reconnecting to an existing stream
Section titled “Reconnecting to an existing stream”To reconnect to an in-progress or completed stream, pass the stream ID in the X-Genkit-Stream-Id header:
curl -N -H "Accept: text/event-stream" \ -H "X-Genkit-Stream-Id: <stream-id-from-above>" \ -d '{"data": "your input"}' \ http://localhost:8080/myFlowThe subscription will:
- Replay any buffered chunks that were already sent
- Continue with live updates if the stream is still in progress
- Return all chunks plus the final result if the stream has already completed
Limitations
Section titled “Limitations”- Firestore: The entire stream history (chunks and final result) is stored in a single document. Firestore has a strict 1MB limitation on document size. If your stream output exceeds this limit, the flow will fail.
- Realtime Database: While RTDB does not have the same 1MB limit, storing very large streams may impact performance or hit other quotas.
- Firestore: The entire stream history (chunks and final result) is stored in a single document. Firestore has a strict 1MB limitation on document size. If your stream output exceeds this limit, the flow will fail.
- InMemoryStreamManager: Streams are stored in memory and will be lost if the server restarts. Not suitable for production use cases where persistence is required.
Configuration Options
Section titled “Configuration Options”InMemoryStreamManager Options
Section titled “InMemoryStreamManager Options”| Option | Default | Description |
|---|---|---|
streaming.WithTTL(duration) | 5 minutes | How long completed streams are retained in memory before cleanup |
FirestoreStreamManager Options
Section titled “FirestoreStreamManager Options”| Option | Default | Description |
|---|---|---|
firebasex.WithCollection(name) | (required) | Firestore collection for stream documents |
firebasex.WithTimeout(duration) | 60 seconds | How long subscribers wait for new events before timeout |
firebasex.WithTTL(duration) | 5 minutes | How long completed streams are retained before auto-deletion |