Durable Streaming

Genkit supports durable streaming, which allows flow state to be persisted. This enables clients to disconnect and reconnect to a stream and replay the full result. This is particularly useful for long-running operations or unreliable network connections.

When durable streaming is enabled, Genkit uses a StreamManager to store the chunks of a stream as they are generated. The client receives a streamId which can be used to reconnect to the stream and replay the full transcript.
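
The store-and-replay behavior described above can be pictured with a small, self-contained sketch. This is not Genkit's StreamManager interface: the class name SketchStreamStore and its open, write, and subscribe methods are hypothetical, chosen only to show how persisting chunks lets a late subscriber catch up and then keep receiving live updates.

```typescript
// Hypothetical illustration only; not Genkit's StreamManager API.
type Listener<T> = (chunk: T) => void;

class SketchStreamStore<T> {
  private chunks = new Map<string, T[]>();
  private listeners = new Map<string, Set<Listener<T>>>();
  private nextId = 0;

  // Opens a new stream and returns the ID a client would save.
  open(): string {
    const streamId = `stream-${this.nextId++}`;
    this.chunks.set(streamId, []);
    this.listeners.set(streamId, new Set<Listener<T>>());
    return streamId;
  }

  // Stores the chunk first, then forwards it to connected subscribers.
  write(streamId: string, chunk: T): void {
    const stored = this.chunks.get(streamId);
    const subs = this.listeners.get(streamId);
    if (!stored || !subs) throw new Error(`unknown stream: ${streamId}`);
    stored.push(chunk);
    subs.forEach((listener) => listener(chunk));
  }

  // Replays every stored chunk, then keeps delivering live ones.
  // Returns a function that disconnects the subscriber.
  subscribe(streamId: string, listener: Listener<T>): () => void {
    const stored = this.chunks.get(streamId);
    const subs = this.listeners.get(streamId);
    if (!stored || !subs) throw new Error(`unknown stream: ${streamId}`);
    stored.slice().forEach((chunk) => listener(chunk));
    subs.add(listener);
    return () => {
      subs.delete(listener);
    };
  }
}

// A subscriber that connects late still sees the full transcript.
const store = new SketchStreamStore<string>();
const streamId = store.open();
store.write(streamId, "a");
store.write(streamId, "b");
const received: string[] = [];
const disconnect = store.subscribe(streamId, (chunk) => {
  received.push(chunk);
});
store.write(streamId, "c"); // delivered live
disconnect();
store.write(streamId, "d"); // stored, but not delivered to this subscriber
// received is now ["a", "b", "c"]
```

A real stream manager also has to record the final result and any error, which this sketch leaves out.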

To enable durable streaming, configure a StreamManager and pass it to your flow server: expressHandler (Express) or appRoute (Next.js) in JavaScript, or genkit.Handler() in Go.

For development and testing, or for a simple single-instance server, you can use the InMemoryStreamManager.

JavaScript:

import { InMemoryStreamManager } from 'genkit/beta';
// ...

Go:

import "github.com/firebase/genkit/go/core/x/streaming"

// Create an in-memory stream manager with optional TTL for completed streams
sm := streaming.NewInMemoryStreamManager(
	streaming.WithTTL(10 * time.Minute), // Optional: how long to retain completed streams
)

Note that InMemoryStreamManager stores streams in memory, so they will be lost if the server restarts. For production use cases where persistence across restarts is required, use FirestoreStreamManager.
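
To make the TTL option more concrete, here is a minimal sketch of retaining completed streams for a fixed period and then evicting them. It does not show how InMemoryStreamManager is implemented: the CompletedStreamRegistry class, its method names, and the injectable clock are all assumptions made for illustration.

```typescript
// Hypothetical illustration of TTL-based cleanup; not Genkit's implementation.
class CompletedStreamRegistry {
  private completedAt = new Map<string, number>();

  constructor(
    private readonly ttlMs: number,
    // Injectable clock so the behavior can be exercised without waiting.
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Records the moment a stream finished.
  markCompleted(streamId: string): void {
    this.completedAt.set(streamId, this.now());
  }

  // Reports whether a completed stream is still retained.
  has(streamId: string): boolean {
    return this.completedAt.has(streamId);
  }

  // Evicts streams whose TTL has elapsed and returns their IDs.
  sweep(): string[] {
    const current = this.now();
    const expired: string[] = [];
    this.completedAt.forEach((finishedAt, streamId) => {
      if (current - finishedAt >= this.ttlMs) expired.push(streamId);
    });
    expired.forEach((streamId) => this.completedAt.delete(streamId));
    return expired;
  }
}
```

Under this model a client can still reconnect and replay a finished stream until the TTL elapses; after that the stream ID no longer resolves.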

For production, you should use a durable storage solution.

The @genkit-ai/firebase plugin provides implementations for Firestore and Realtime Database.

npm i @genkit-ai/firebase

import { FirestoreStreamManager, RtdbStreamManager } from '@genkit-ai/firebase/beta';
import { initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';

const app = initializeApp();

const firestore = new FirestoreStreamManager({
  firebaseApp: app,
  db: getFirestore(app),
  collection: 'streams',
});

// Or for RTDB
const rtdb = new RtdbStreamManager({
  firebaseApp: app,
  refPrefix: 'streams',
});

The firebase plugin provides FirestoreStreamManager for durable stream storage.

import (
	"github.com/firebase/genkit/go/genkit"
	"github.com/firebase/genkit/go/plugins/firebase"
	firebasex "github.com/firebase/genkit/go/plugins/firebase/x"
)

// Initialize Genkit with the Firebase plugin
g := genkit.Init(ctx, genkit.WithPlugins(&firebase.Firebase{}))

// Create a Firestore stream manager
sm, err := firebasex.NewFirestoreStreamManager(ctx, g,
	firebasex.WithCollection("genkit-streams"), // Required: Firestore collection for stream documents
	firebasex.WithTimeout(2 * time.Minute),     // Optional: how long subscribers wait for new events
	firebasex.WithTTL(10 * time.Minute),        // Optional: how long completed streams are retained
)
if err != nil {
	log.Fatalf("Failed to create Firestore stream manager: %v", err)
}

FirestoreStreamManager provides:

  • Persistence across restarts: Clients can reconnect to streams after server restarts
  • Multi-instance support: Multiple server instances can serve the same stream
  • Automatic cleanup: Completed streams are automatically deleted via Firestore TTL policies

For automatic cleanup of old streams, configure a TTL policy on your Firestore collection:

gcloud firestore fields ttls update expiresAt \
  --collection-group=genkit-streams \
  --enable-ttl \
  --project=YOUR_PROJECT_ID

See Firestore TTL documentation for more details.

To enable durable streaming in Express, pass the streamManager to expressHandler:

import { expressHandler } from '@genkit-ai/express';
import { InMemoryStreamManager } from 'genkit/beta';

app.post('/myDurableFlow', expressHandler(myFlow, {
  streamManager: new InMemoryStreamManager(), // or firestore/rtdb
}));

To enable durable streaming in Next.js, pass the streamManager to appRoute:

import { appRoute } from '@genkit-ai/next';
import { InMemoryStreamManager } from 'genkit/beta';

export const POST = appRoute(myFlow, {
  streamManager: new InMemoryStreamManager(), // or firestore/rtdb
});

To enable durable streaming with Go’s standard net/http server, pass the StreamManager to genkit.Handler() using the WithStreamManager option:

package main

import (
	"context"
	"fmt" // needed for fmt.Sprintf below
	"log"
	"net/http"
	"time"

	"github.com/firebase/genkit/go/core/x/streaming"
	"github.com/firebase/genkit/go/genkit"
	"github.com/firebase/genkit/go/plugins/server"
)

func main() {
	ctx := context.Background()
	g := genkit.Init(ctx)

	// Define a streaming flow
	myFlow := genkit.DefineStreamingFlow(g, "myFlow",
		func(ctx context.Context, input string, sendChunk func(context.Context, string) error) (string, error) {
			// Your streaming logic here
			for i := 0; i < 5; i++ {
				if err := sendChunk(ctx, fmt.Sprintf("Chunk %d", i)); err != nil {
					return "", err
				}
				time.Sleep(1 * time.Second)
			}
			return "Done!", nil
		})

	// Set up HTTP server with durable streaming
	mux := http.NewServeMux()
	mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow,
		genkit.WithStreamManager(streaming.NewInMemoryStreamManager(
			streaming.WithTTL(10 * time.Minute),
		)),
	))
	log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))
}

For production deployments with persistence across restarts:

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/firebase/genkit/go/genkit"
	"github.com/firebase/genkit/go/plugins/firebase"
	firebasex "github.com/firebase/genkit/go/plugins/firebase/x"
	"github.com/firebase/genkit/go/plugins/server"
)

func main() {
	ctx := context.Background()
	g := genkit.Init(ctx, genkit.WithPlugins(&firebase.Firebase{}))

	myFlow := genkit.DefineStreamingFlow(g, "myFlow",
		func(ctx context.Context, input string, sendChunk func(context.Context, string) error) (string, error) {
			// Your streaming logic here
			return "Done!", nil
		})

	sm, err := firebasex.NewFirestoreStreamManager(ctx, g,
		firebasex.WithCollection("genkit-streams"),
		firebasex.WithTimeout(2 * time.Minute),
		firebasex.WithTTL(10 * time.Minute),
	)
	if err != nil {
		log.Fatalf("Failed to create Firestore stream manager: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow, genkit.WithStreamManager(sm)))
	log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))
}

Clients can initiate a stream and receive a streamId. This ID can be used to reconnect.

import { streamFlow } from 'genkit/beta/client';

// Start a new stream
const result = streamFlow({
  url: `http://localhost:8080/myDurableFlow`,
  input: 'tell me a long story',
});

// Save this ID for later
const streamId = await result.streamId;

// ... later, reconnect if needed ...
const reconnectedResult = streamFlow({
  url: `http://localhost:8080/myDurableFlow`,
  streamId: streamId,
});

for await (const chunk of reconnectedResult.stream) {
  console.log(chunk);
}
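
Because a reconnected stream replays the transcript from the beginning, a client that has already rendered some chunks may want to skip the ones it has seen. The following is one possible way to do that, assuming chunks are replayed in their original order. The ReplayCursor class is hypothetical and not part of the Genkit client.

```typescript
// Hypothetical helper; assumes replayed chunks arrive in the original order.
class ReplayCursor {
  private seen = 0; // chunks handled across all connections
  private position = 0; // chunks received on the current connection

  // Call once each time a connection (or reconnection) starts.
  beginConnection(): void {
    this.position = 0;
  }

  // Call once per received chunk; returns true only for chunks that
  // have not been handled on an earlier connection.
  accept(): boolean {
    this.position += 1;
    if (this.position <= this.seen) return false;
    this.seen = this.position;
    return true;
  }
}
```

Inside the `for await` loop above, a client could call `cursor.accept()` for each chunk and render the chunk only when it returns true, calling `cursor.beginConnection()` before each new `streamFlow` call.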

When durable streaming is enabled, the server returns an X-Genkit-Stream-Id header containing the stream ID. Clients can use this ID to reconnect to the stream.

curl -N -i -H "Accept: text/event-stream" \
  -d '{"data": "your input"}' \
  http://localhost:8080/myFlow

The response headers will include X-Genkit-Stream-Id: <stream-id>. Save this ID to reconnect later.

To reconnect to an in-progress or completed stream, pass the stream ID in the X-Genkit-Stream-Id header:

curl -N -H "Accept: text/event-stream" \
  -H "X-Genkit-Stream-Id: <stream-id-from-above>" \
  -d '{"data": "your input"}' \
  http://localhost:8080/myFlow
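
The two curl requests above differ only in the X-Genkit-Stream-Id header, so a client can build both from one helper. The sketch below mirrors the headers and the `{"data": ...}` body from the curl examples. The names buildStreamRequest, openStream, and FetchLike are hypothetical, and the Content-Type header is an assumption that does not appear in the curl commands.

```typescript
// Hypothetical helpers that mirror the curl requests shown above.
interface StreamRequest {
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Builds the request for a new stream, or for a reconnect when streamId is given.
function buildStreamRequest(input: unknown, streamId?: string): StreamRequest {
  const headers: Record<string, string> = {
    Accept: "text/event-stream",
    "Content-Type": "application/json", // assumption: not part of the curl examples
  };
  if (streamId) headers["X-Genkit-Stream-Id"] = streamId;
  return { method: "POST", headers, body: JSON.stringify({ data: input }) };
}

// Minimal shape of a fetch-like function, so the sketch needs no DOM typings.
type FetchLike = (
  url: string,
  init: StreamRequest,
) => Promise<{ headers: { get(name: string): string | null } }>;

// Sends the request and reports the stream ID to save for later reconnects.
async function openStream(
  fetchImpl: FetchLike,
  url: string,
  input: unknown,
  streamId?: string,
) {
  const response = await fetchImpl(url, buildStreamRequest(input, streamId));
  return {
    response,
    streamId: streamId ?? response.headers.get("X-Genkit-Stream-Id"),
  };
}
```

In a runtime with a global `fetch`, it can be passed as `fetchImpl`. Reading the event stream from the response body is left out of this sketch.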

The subscription will:

  • Replay any buffered chunks that were already sent
  • Continue with live updates if the stream is still in progress
  • Return all chunks plus the final result if the stream has already completed

Each storage backend has limitations to keep in mind:

  • Firestore: The entire stream history (chunks and final result) is stored in a single document. Firestore limits documents to 1 MiB (1,048,576 bytes). If your stream output exceeds this limit, the flow will fail.
  • Realtime Database: RTDB does not have the same 1 MiB document limit, but storing very large streams may degrade performance or hit other quotas.
  • InMemoryStreamManager: Streams are stored in memory and are lost if the server restarts. It is not suitable for production use cases that require persistence.
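
Since exceeding the Firestore document limit fails the flow, a flow can track roughly how much it has streamed and stop or summarize before getting close. The sketch below is an approximation only: it counts the UTF-8 bytes of string chunks and ignores Firestore's per-field and per-document overhead, so the default budget leaves headroom. StreamSizeBudget and utf8Length are hypothetical names, and the 80% default is an arbitrary assumption.

```typescript
// Hypothetical size guard; approximates stored size by UTF-8 byte count.
const FIRESTORE_MAX_DOC_BYTES = 1024 * 1024; // 1 MiB

// Counts the UTF-8 bytes needed to encode a JavaScript string.
function utf8Length(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const code = text.charCodeAt(i);
    if (code < 0x80) {
      bytes += 1;
    } else if (code < 0x800) {
      bytes += 2;
    } else if (code >= 0xd800 && code <= 0xdbff && i + 1 < text.length) {
      const next = text.charCodeAt(i + 1);
      if (next >= 0xdc00 && next <= 0xdfff) {
        bytes += 4; // valid surrogate pair encodes as one 4-byte sequence
        i += 1;
      } else {
        bytes += 3; // lone surrogate, counted like a 3-byte replacement
      }
    } else {
      bytes += 3;
    }
  }
  return bytes;
}

class StreamSizeBudget {
  private usedBytes = 0;

  // Defaults to 80% of the limit to leave room for storage overhead.
  constructor(
    private readonly limitBytes: number = Math.floor(FIRESTORE_MAX_DOC_BYTES * 0.8),
  ) {}

  // Reserves space for the chunk and returns true, or returns false
  // without reserving anything if the chunk would exceed the budget.
  tryAdd(chunk: string): boolean {
    const size = utf8Length(chunk);
    if (this.usedBytes + size > this.limitBytes) return false;
    this.usedBytes += size;
    return true;
  }

  get remainingBytes(): number {
    return this.limitBytes - this.usedBytes;
  }
}
```

A flow could call `tryAdd` before each chunk it sends and end the stream early with a truncation notice when it returns false.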

InMemoryStreamManager options (Go):

| Option | Default | Description |
| --- | --- | --- |
| streaming.WithTTL(duration) | 5 minutes | How long completed streams are retained in memory before cleanup |

FirestoreStreamManager options (Go):

| Option | Default | Description |
| --- | --- | --- |
| firebasex.WithCollection(name) | (required) | Firestore collection for stream documents |
| firebasex.WithTimeout(duration) | 60 seconds | How long subscribers wait for new events before timeout |
| firebasex.WithTTL(duration) | 5 minutes | How long completed streams are retained before auto-deletion |