Accessing flows from the client
There are two primary ways to access Genkit flows from client-side applications:
- Using a Genkit client library
- Using the client SDK for your server platform (e.g., the Cloud Functions for Firebase callable function client SDK)
This guide covers the Genkit client libraries.
Using the Genkit client library
Section titled “Using the Genkit client library”You can call your deployed flows using a Genkit client library. The libraries provide a type-safe way to interact with both non-streaming and streaming flows.
Learn about flows in “Defining AI workflows”.
Non-streaming Flow Calls
Section titled “Non-streaming Flow Calls”For a non-streaming response, use the runFlow
function (in JS) or await
the action (in Dart). This is suitable for flows that return a single, complete output.
import { runFlow } from 'genkit/beta/client';
async function callHelloFlow() { try { const result = await runFlow({ url: 'http://127.0.0.1:3400/helloFlow', // Replace with your deployed flow's URL input: { name: 'Genkit User' }, }); console.log('Non-streaming result:', result.greeting); } catch (error) { console.error('Error calling helloFlow:', error); }}
callHelloFlow();
import 'package:genkit/client.dart';
// Define the action (as shown above)final helloFlow = defineRemoteAction( url: 'http://127.0.0.1:3400/helloFlow', fromResponse: (data) => data['greeting'] as String,);
Future<void> callHelloFlow() async { try { final result = await helloFlow(input: {'name': 'Genkit User'}); print('Non-streaming result: $result'); } catch (e) { print('Error calling helloFlow: $e'); }}
Streaming Flow Calls
Section titled “Streaming Flow Calls”For flows that are designed to stream responses (e.g., for real-time updates or long-running operations), use the streamFlow
function (in JS) or the .stream()
method (in Dart).
import { streamFlow } from 'genkit/beta/client';
async function streamHelloFlow() { try { const result = streamFlow({ url: 'http://127.0.0.1:3400/helloFlow', // Replace with your deployed flow's URL input: { name: 'Streaming User' }, });
// Process the stream chunks as they arrive for await (const chunk of result.stream) { console.log('Stream chunk:', chunk); }
// Get the final complete response const finalOutput = await result.output; console.log('Final streaming output:', finalOutput.greeting); } catch (error) { console.error('Error streaming helloFlow:', error); }}
streamHelloFlow();
import 'package:genkit/client.dart';
// Define the action with a chunk decoderfinal helloFlow = defineRemoteAction( url: 'http://127.0.0.1:3400/helloFlow', fromResponse: (data) => data['greeting'] as String, fromStreamChunk: (chunk) => chunk as String,);
Future<void> streamHelloFlow() async { try { final result = helloFlow.stream(input: {'name': 'Streaming User'});
// Process the stream chunks as they arrive await for (final chunk in result) { print('Stream chunk: $chunk'); }
// Get the final complete response final finalOutput = await result.onResult; print('Final streaming output: $finalOutput'); } catch (e) { print('Error streaming helloFlow: $e'); }}
Custom Object Streaming
Section titled “Custom Object Streaming”You can also stream custom objects. For robust JSON serialization in Dart, it’s recommended to use a code generation library like json_serializable
. In TypeScript, you can use standard interfaces to define the shape of your data.
// Define the shape of your datainterface StreamChunk { content: string;}
interface MyOutput { reply: string;}
// In your streaming call, the client will handle JSON parsingasync function streamCustomObjects() { try { const result = streamFlow<MyOutput, StreamChunk>({ url: 'http://localhost:3400/stream-process', input: { message: 'Stream this data', count: 5 }, });
console.log('Streaming chunks:'); for await (const chunk of result.stream) { console.log('Chunk:', chunk.content); }
const finalResult = await result.output; console.log('\nFinal Response:', finalResult.reply); } catch (e) { console.error('Error calling streaming flow:', e); }}
class StreamChunk { final String content;
StreamChunk({required this.content});
factory StreamChunk.fromJson(Map<String, dynamic> json) => StreamChunk( content: json['content'] as String, );}
// Assumes MyOutput and MyInput classes are defined as in the non-streaming examplefinal streamAction = defineRemoteAction( url: 'http://localhost:3400/stream-process', fromResponse: (data) => MyOutput.fromJson(data), fromStreamChunk: (data) => StreamChunk.fromJson(data),);
final input = MyInput(message: 'Stream this data', count: 5);
try { final stream = streamAction.stream(input: input);
print('Streaming chunks:'); await for (final chunk in stream) { print('Chunk: ${chunk.content}'); }
final finalResult = await stream.onResult; print('\nFinal Response: ${finalResult.reply}');} catch (e) { print('Error calling streaming flow: $e');}
Working with Genkit Data Objects
Section titled “Working with Genkit Data Objects”When interacting with Genkit models, you’ll often work with standardized data classes. The client libraries provide these classes for type-safe interaction.
import { streamFlow } from 'genkit/beta/client';import type { MessageData, GenerateResponseChunkData, GenerateResponseData,} from 'genkit/model';
async function streamGenerate() { try { const result = streamFlow<GenerateResponseData, GenerateResponseChunkData>({ url: 'http://localhost:3400/generate', input: { role: 'user', content: [{ text: 'hello' }], } as MessageData, });
console.log('Streaming chunks:'); for await (const chunk of result.stream) { // Note: A chunk may have multiple parts, and not all parts are text. console.log('Chunk:', chunk.content[0].text); }
const finalResult = await result.output; // Note: A response may have multiple candidates. console.log( '\nFinal Response:', finalResult.message.content[0].text ); } catch (e) { console.error('Error calling streaming flow:', e); }}
import 'package:genkit/client.dart';
final generateFlow = defineRemoteAction( url: 'http://localhost:3400/generate', fromResponse: (json) => GenerateResponse.fromJson(json), fromStreamChunk: (json) => GenerateResponseChunk.fromJson(json),);
final stream = generateFlow.stream( input: Message(role: Role.user, content: [TextPart(text: "hello")]),);
print('Streaming chunks:');await for (final chunk in stream) { // Use the .text getter to easily access the text content of the chunk print('Chunk: ${chunk.text}');}
final finalResult = await stream.onResult;// The .text getter also works on the final responseprint('Final Response: ${finalResult.text}');
Authentication
Section titled “Authentication”If your deployed flow requires authentication, you can pass headers with your requests:
const result = await runFlow({ url: 'http://127.0.0.1:3400/helloFlow', // Replace with your deployed flow's URL headers: { Authorization: 'Bearer your-token-here', // Replace with your actual token }, input: { name: 'Authenticated User' },});
// For non-streaming callsfinal result = await helloFlow( input: {'name': 'Authenticated User'}, headers: {'Authorization': 'Bearer your-token-here'},);
// For streaming callsfinal streamResult = helloFlow.stream( input: {'name': 'Authenticated User'}, headers: {'Authorization': 'Bearer your-token-here'},);
When deploying to Cloud Functions for Firebase
Section titled “When deploying to Cloud Functions for Firebase”When deploying to Cloud Functions for Firebase, use the Firebase callable functions client library.
Detailed documentation can be found at https://firebase.google.com/docs/functions/callable?gen=2nd
Here’s a sample for the web:
// Get the callable by passing an initialized functions SDK.const getForecast = httpsCallable(functions, 'getForecast');
// Call the function with the `.stream()` method to start streaming.const { stream, data } = await getForecast.stream({ locations: favoriteLocations,});
// The `stream` async iterable returned by `.stream()`// will yield a new value every time the callable// function calls `sendChunk()`.for await (const forecastDataChunk of stream) { // update the UI every time a new chunk is received // from the callable function updateUi(forecastDataChunk);}
// The `data` promise resolves when the callable// function completes.const allWeatherForecasts = await data;finalizeUi(allWeatherForecasts);
An official Dart client for callable functions is available in the cloud_functions
package.
final result = await FirebaseFunctions.instance.httpsCallable('addMessage').call( { "text": text, "push": true, },);_response = result.data as String;