Skip to content

Flask Integration

Flask is a lightweight and flexible Python web framework that’s perfect for building REST APIs with Genkit. This guide shows you how to integrate Genkit flows with Flask applications to create powerful AI-powered web services.

Make sure you have completed the Get Started guide for Python and have Genkit installed in your project.

Install the Genkit Flask plugin:

Terminal window
pip install genkit-plugin-flask

Or add it to your requirements.txt:

requirements.txt
genkit-plugin-flask
genkit
genkit-plugin-google-genai
flask

Create a Flask application with Genkit integration:

main.py
from flask import Flask
from genkit.ai import Genkit
from genkit.plugins.flask import genkit_flask_handler
from genkit.plugins.google_genai import GoogleAI
# Initialize Genkit with the Google AI plugin; `model` is the default model
# used by ai.generate() calls that do not name one explicitly.
ai = Genkit(
    plugins=[GoogleAI()],
    model='googleai/gemini-2.5-flash',
)
# Create Flask app; the flows below are registered on it as POST routes.
app = Flask(__name__)
@app.post('/joke')
@genkit_flask_handler(ai)
@ai.flow()
async def joke_flow(name: str, ctx):
    """Generate a joke about ``name``.

    Args:
        name: Subject of the joke; interpolated into the prompt.
        ctx: Flow context. Its ``send_chunk`` callback is handed to
            ``ai.generate`` as ``on_chunk``.

    Returns:
        The value produced by ``ai.generate``.
    """
    joke_prompt = f'Tell a medium-sized joke about {name}'
    generation = await ai.generate(
        prompt=joke_prompt,
        on_chunk=ctx.send_chunk,
    )
    return generation
# Only start the built-in server when this file is executed directly.
if __name__ == '__main__':
    # debug=True turns on Flask's debug mode.
    app.run(debug=True)
Terminal window
# Standard Flask development server
flask --app main.py run
# With Genkit Developer UI
genkit start -- flask --app main.py run
Terminal window
# Test the joke endpoint
curl -X POST http://127.0.0.1:5000/joke \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"data": "banana"}'

Define flows with proper schemas for better API documentation and validation:

from pydantic import BaseModel
from typing import List
# Input schema for the /structured-joke flow.
class JokeRequest(BaseModel):
    # Subject of the joke (required).
    topic: str
    # Tone of the joke; interpolated into the prompt. Defaults to "funny".
    style: str = "funny"
    # Length of the joke; interpolated into the prompt. Defaults to "medium".
    length: str = "medium"
# Output schema returned by the /structured-joke flow.
class JokeResponse(BaseModel):
    # Generated joke text.
    joke: str
    # Topic the joke was requested for.
    topic: str
    # Content rating label attached by the flow.
    rating: str
@app.post('/structured-joke')
@genkit_flask_handler(ai)
@ai.flow()
async def structured_joke_flow(request: JokeRequest, ctx) -> JokeResponse:
    """Generate a joke and wrap it in a ``JokeResponse``.

    Args:
        request: Topic, style and length of the joke to generate.
        ctx: Flow context; ``ctx.send_chunk`` is passed as ``on_chunk``.

    Returns:
        A ``JokeResponse`` holding the generated text, the requested topic
        and a fixed "family-friendly" rating.
    """
    # The literal below is reproduced exactly; its whitespace is part of the prompt.
    joke_prompt = f"""
Generate a {request.length} {request.style} joke about {request.topic}.
Make it appropriate and entertaining.
"""
    generation = await ai.generate(
        prompt=joke_prompt,
        on_chunk=ctx.send_chunk,
    )
    joke_text = generation.text
    return JokeResponse(
        joke=joke_text,
        topic=request.topic,
        rating="family-friendly",
    )

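The Genkit Flask integration supports streaming responses out of the box. Pass ctx.send_chunk as the on_chunk callback so that each generated chunk is forwarded to the client: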

@app.post('/story')
@genkit_flask_handler(ai)
@ai.flow()
async def story_flow(prompt: str, ctx):
    """Generate a short story and report completion.

    Args:
        prompt: What the story should be about.
        ctx: Flow context; ``ctx.send_chunk`` is passed as ``on_chunk`` to
            enable streaming.

    Returns:
        A dict with the full story text under ``"story"`` and
        ``"status": "complete"``.
    """
    story_prompt = f"Write a short story about: {prompt}"
    # Passing on_chunk is what enables streaming.
    generation = await ai.generate(on_chunk=ctx.send_chunk, prompt=story_prompt)
    result = {"story": generation.text, "status": "complete"}
    return result

Client-side streaming consumption:

import requests
def consume_stream(url, data):
    """Example of consuming a streaming response.

    POSTs ``{"data": data}`` to ``url`` asking for ``text/event-stream`` and
    prints the payload of every ``data: `` line as it arrives.

    Args:
        url: Endpoint to POST to.
        data: Flow input, sent under the ``"data"`` key of the JSON body.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    sse_prefix = 'data: '
    # The context manager releases the connection even when iteration stops
    # early or an exception is raised.
    with requests.post(
        url,
        json={"data": data},
        headers={"Accept": "text/event-stream"},
        stream=True,
    ) as response:
        # Fail loudly instead of silently printing nothing on an error response.
        response.raise_for_status()
        for line in response.iter_lines():
            if not line:
                # Skip blank lines in the stream.
                continue
            decoded_line = line.decode('utf-8')
            if decoded_line.startswith(sse_prefix):
                chunk_data = decoded_line[len(sse_prefix):]
                print(f"Received: {chunk_data}")


# Usage
consume_stream("http://localhost:5000/story", "a magical forest")

Implement custom authentication using context providers:

from genkit.types import GenkitError
from flask import request
import jwt
async def auth_context_provider(request):
    """Build the flow context from a ``Bearer`` JWT in the request headers.

    Args:
        request: Object exposing a ``headers`` mapping with a ``get`` method.

    Returns:
        ``{'user': {...}}`` with ``id``, ``email`` and ``roles`` taken from the
        token payload, or ``{'user': None}`` when the header is missing, is
        not a ``Bearer`` header, or the token fails validation.
    """
    anonymous = {'user': None}

    header_value = request.headers.get('Authorization')
    if not (header_value and header_value.startswith('Bearer ')):
        return anonymous

    bearer_token = header_value.split(' ')[1]
    try:
        # Validate JWT token (replace with your validation logic)
        claims = jwt.decode(bearer_token, 'your-secret-key', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        return anonymous

    user = {
        'id': claims.get('user_id'),
        'email': claims.get('email'),
        'roles': claims.get('roles', []),
    }
    return {'user': user}
@app.post('/protected-endpoint')
@genkit_flask_handler(ai, context_provider=auth_context_provider)
@ai.flow()
async def protected_flow(message: str, ctx):
    """Answer ``message`` for an authenticated caller.

    Args:
        message: Text to respond to.
        ctx: Flow context; ``ctx.context['user']`` must be set.

    Returns:
        The value produced by ``ai.generate``.

    Raises:
        GenkitError: With status ``UNAUTHENTICATED`` when the context has no user.
    """
    user = ctx.context.get('user')
    if not user:
        raise GenkitError(
            status='UNAUTHENTICATED',
            message='Authentication required',
        )

    reply_prompt = f"Hello {user['email']}, here's a response to: {message}"
    generation = await ai.generate(on_chunk=ctx.send_chunk, prompt=reply_prompt)
    return generation
import inspect
from functools import wraps


def require_role(required_role: str):
    """Decorator factory that restricts a flow to users holding a role.

    The wrapped coroutine must accept a ``ctx`` argument whose ``context``
    mapping carries the authenticated ``user`` with a ``roles`` list.

    Args:
        required_role: Role name that must appear in the user's ``roles``.

    Returns:
        A decorator for ``async`` functions.

    Raises:
        GenkitError: ``UNAUTHENTICATED`` when no ctx/user is available,
            ``PERMISSION_DENIED`` when the user lacks ``required_role``
            (both raised by the wrapper at call time).
    """
    def decorator(func):
        # Resolved once so every call only pays for argument binding.
        signature = inspect.signature(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get context from the flow. It may be passed by keyword or
            # positionally, so fall back to binding the call against the
            # wrapped function's signature.
            ctx = kwargs.get('ctx')
            if ctx is None:
                bound = signature.bind_partial(*args, **kwargs)
                ctx = bound.arguments.get('ctx')

            if not ctx or not ctx.context.get('user'):
                raise GenkitError(status='UNAUTHENTICATED', message='Authentication required')

            user_roles = ctx.context['user'].get('roles', [])
            if required_role not in user_roles:
                raise GenkitError(status='PERMISSION_DENIED', message=f'Role {required_role} required')

            return await func(*args, **kwargs)
        return wrapper
    return decorator
@app.post('/admin-action')
@genkit_flask_handler(ai, context_provider=auth_context_provider)
@ai.flow()
@require_role('admin')
async def admin_flow(action: str, ctx):
    """Admin-only endpoint.

    Args:
        action: Description of the requested admin action.
        ctx: Flow context; ``ctx.context['user']`` supplies the caller's email.

    Returns:
        The value produced by ``ai.generate``.
    """
    admin_email = ctx.context['user']['email']
    admin_prompt = f"Admin {admin_email} requested: {action}"
    generation = await ai.generate(on_chunk=ctx.send_chunk, prompt=admin_prompt)
    return generation
from flask import jsonify
from genkit.types import GenkitError
# HTTP status code for each gRPC-style status name, so that clients can tell
# authentication, permission and validation failures apart.
_GENKIT_STATUS_TO_HTTP = {
    'OK': 200,
    'CANCELLED': 499,
    'UNKNOWN': 500,
    'INVALID_ARGUMENT': 400,
    'DEADLINE_EXCEEDED': 504,
    'NOT_FOUND': 404,
    'ALREADY_EXISTS': 409,
    'PERMISSION_DENIED': 403,
    'UNAUTHENTICATED': 401,
    'RESOURCE_EXHAUSTED': 429,
    'FAILED_PRECONDITION': 400,
    'ABORTED': 409,
    'OUT_OF_RANGE': 400,
    'UNIMPLEMENTED': 501,
    'INTERNAL': 500,
    'UNAVAILABLE': 503,
    'DATA_LOSS': 500,
}


@app.errorhandler(GenkitError)
def handle_genkit_error(error):
    """Handle Genkit-specific errors.

    Args:
        error: The raised ``GenkitError``; its ``status`` and ``message``
            are echoed in the JSON body.

    Returns:
        A ``(response, http_status)`` tuple. The HTTP status is derived from
        ``error.status``; unrecognised statuses are reported as 500.
    """
    http_status = _GENKIT_STATUS_TO_HTTP.get(error.status, 500)
    return jsonify({
        'error': error.status,
        'message': error.message,
        'type': 'GenkitError'
    }), http_status
@app.errorhandler(Exception)
def handle_general_error(error):
    """Handle general exceptions.

    Logs the exception with its traceback and returns a generic JSON body,
    so internal details are recorded server-side but not sent to the client.

    Args:
        error: The unhandled exception.

    Returns:
        A ``(response, 500)`` tuple.
    """
    # NOTE(review): a handler registered for Exception may also receive
    # Flask's own HTTP errors (e.g. 404) and turn them into 500s -- confirm
    # against the Flask error-handling docs and pass those through if so.
    app.logger.error('Unhandled exception', exc_info=error)
    return jsonify({
        'error': 'INTERNAL_ERROR',
        'message': 'An unexpected error occurred',
        'type': 'Exception'
    }), 500
@app.post('/safe-endpoint')
@genkit_flask_handler(ai)
@ai.flow()
async def safe_flow(input_text: str, ctx):
    """Flow with proper error handling.

    Args:
        input_text: Text to process; must contain non-whitespace characters.
        ctx: Flow context; ``ctx.send_chunk`` is passed as ``on_chunk``.

    Returns:
        ``{"result": <generated text>, "status": "success"}``.

    Raises:
        GenkitError: ``INVALID_ARGUMENT`` for empty input, ``INTERNAL`` when
            generation fails for any reason.
    """
    # Validate before entering the try block: raised inside it, this error
    # would be caught by the broad handler below and reported as an internal
    # failure instead of a client error.
    if not input_text or not input_text.strip():
        raise GenkitError(
            status='INVALID_ARGUMENT',
            message='Input text cannot be empty'
        )

    try:
        response = await ai.generate(
            prompt=f"Process this text: {input_text}",
            on_chunk=ctx.send_chunk,
        )
    except Exception as e:
        # Log the error (with traceback) for debugging
        app.logger.exception("Error in safe_flow: %s", e)
        # 'INTERNAL' is the gRPC-style name for this status; chain the cause
        # so the original exception is preserved.
        raise GenkitError(
            status='INTERNAL',
            message='Failed to process request'
        ) from e

    return {"result": response.text, "status": "success"}
import os
from flask import Flask
from genkit.ai import Genkit
from genkit.plugins.flask import genkit_flask_handler
from genkit.plugins.google_genai import GoogleAI
# Environment-based configuration
class Config:
    """Settings read from environment variables when the class is defined.

    An unset or empty ``SECRET_KEY`` / ``GENKIT_MODEL`` falls back to the
    development default. ``DEBUG`` is true only when ``FLASK_DEBUG`` equals
    "true", compared case-insensitively.
    """

    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-secret-key'
    GENKIT_MODEL = os.getenv('GENKIT_MODEL') or 'googleai/gemini-2.5-flash'
    DEBUG = 'true' == os.getenv('FLASK_DEBUG', 'False').lower()
def create_app(config_class=Config):
    """Application factory: build a configured Flask app with its routes.

    Args:
        config_class: Object whose attributes are loaded into ``app.config``;
            it must provide ``GENKIT_MODEL``.

    Returns:
        The Flask application with all routes registered.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_class)

    # Initialize Genkit with the model chosen by the configuration.
    plugins = [GoogleAI()]
    genkit = Genkit(plugins=plugins, model=flask_app.config['GENKIT_MODEL'])

    # Register routes
    register_routes(flask_app, genkit)
    return flask_app
def register_routes(app, ai):
    """Register all application routes.

    Args:
        app: Flask application to attach the routes to.
        ai: Genkit instance used to define and run the flows.
    """
    # Health probes are usually issued as GET requests; POST stays accepted
    # so existing callers keep working.
    @app.route('/health', methods=['GET', 'POST'])
    def health_check():
        """Report that the service is up."""
        return {"status": "healthy", "service": "genkit-flask-app"}

    @app.post('/generate')
    @genkit_flask_handler(ai)
    @ai.flow()
    async def generate_flow(prompt: str, ctx):
        """Run ``prompt`` through the model, passing ``ctx.send_chunk`` as ``on_chunk``."""
        return await ai.generate(
            prompt=prompt,
            on_chunk=ctx.send_chunk,
        )
# Create app instance at import time so WSGI servers can load it as `main:app`.
app = create_app()
# Run the built-in server only when the file is executed directly.
if __name__ == '__main__':
    app.run(
        # Listen on all network interfaces.
        host='0.0.0.0',
        # PORT comes from the environment; 5000 is the fallback.
        port=int(os.environ.get('PORT', 5000)),
        debug=app.config['DEBUG']
    )
# cross_origin must be imported too: the per-route example below uses it.
from flask_cors import CORS, cross_origin

app = Flask(__name__)
# Allow the listed origins on every route of the app.
CORS(app, origins=['http://localhost:3000', 'https://yourdomain.com'])


# Or configure CORS per route
@app.post('/api/chat')
@cross_origin(origins=['http://localhost:3000'])
@genkit_flask_handler(ai)
@ai.flow()
async def chat_flow(message: str, ctx):
    """Generate a chat reply to ``message``.

    Args:
        message: Incoming chat message.
        ctx: Flow context; ``ctx.send_chunk`` is passed as ``on_chunk``.

    Returns:
        The value produced by ``ai.generate``.
    """
    return await ai.generate(
        prompt=f"Chat response to: {message}",
        on_chunk=ctx.send_chunk,
    )
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Pass both arguments by keyword. Flask-Limiter 3.x takes key_func as its
# first positional parameter and app as keyword-only, so Limiter(app, ...)
# would bind the app to key_func and fail.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["100 per hour"],
)
@app.post('/limited-endpoint')
@limiter.limit("10 per minute")
@genkit_flask_handler(ai)
@ai.flow()
async def limited_flow(input_data: str, ctx):
    """Process ``input_data`` behind a 10-requests-per-minute limit.

    Args:
        input_data: Text interpolated into the prompt.
        ctx: Flow context; ``ctx.send_chunk`` is passed as ``on_chunk``.

    Returns:
        The value produced by ``ai.generate``.
    """
    limited_prompt = f"Limited processing: {input_data}"
    generation = await ai.generate(on_chunk=ctx.send_chunk, prompt=limited_prompt)
    return generation
Dockerfile
# Base image: slim Python 3.11.
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first, then copy the application source.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Port that gunicorn binds to below.
EXPOSE 5000
# Serve the `app` object from the `main` module with 4 gunicorn workers.
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "main:app"]
requirements.txt
genkit
genkit-plugin-flask
genkit-plugin-google-genai
flask
flask-cors
flask-limiter
gunicorn
python-dotenv
pyjwt
.env
# NOTE(review): the Config class in this guide reads FLASK_DEBUG, not
# FLASK_ENV -- confirm which variable your Flask version honours.
FLASK_ENV=production
# Read by Config.SECRET_KEY; replace the placeholder with a real secret.
SECRET_KEY=your-production-secret-key
# Read by Config.GENKIT_MODEL.
GENKIT_MODEL=googleai/gemini-2.5-flash
# NOTE(review): confirm the exact variable name the GoogleAI plugin reads for its API key.
GOOGLE_AI_API_KEY=your-api-key
# Read by the app.run() entry point in main.py.
PORT=5000
test_app.py
import pytest
from main import create_app
@pytest.fixture
def app():
    """Provide an application built by ``create_app`` with ``TESTING`` enabled."""
    test_app = create_app()
    test_app.config.update(TESTING=True)
    return test_app
@pytest.fixture
def client(app):
    """Provide a Flask test client bound to the ``app`` fixture."""
    test_client = app.test_client()
    return test_client
def test_health_endpoint(client):
    """POST /health answers 200 with a "healthy" status."""
    result = client.post('/health')
    body = result.json
    assert result.status_code == 200
    assert body['status'] == 'healthy'
def test_generate_endpoint(client):
    """POST /generate with a ``data`` payload answers 200."""
    payload = {'data': 'Hello world'}
    json_headers = {'Content-Type': 'application/json'}
    result = client.post('/generate', json=payload, headers=json_headers)
    assert result.status_code == 200
Performance best practices:

  1. Use connection pooling: Configure database connections properly
  2. Implement caching: Cache frequent AI responses when appropriate
  3. Async processing: Use async/await for I/O operations
  4. Resource management: Monitor memory usage with large models
Security best practices:

  1. Input validation: Always validate and sanitize user inputs
  2. Authentication: Implement proper authentication for production
  3. Rate limiting: Protect against abuse with rate limiting
  4. HTTPS only: Use HTTPS in production environments
import logging
from flask import request
import time
# Configure logging: INFO level on the root logger, plus a module-level
# logger used by the request/response hooks.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.before_request
def log_request_info():
    """Log the HTTP method and URL of each incoming request."""
    # Lazy %-style arguments: the message is only formatted when the INFO
    # level is actually enabled.
    logger.info("Request: %s %s", request.method, request.url)
@app.after_request
def log_response_info(response):
    """Log the status code of each outgoing response.

    Args:
        response: The response about to be sent.

    Returns:
        The same response object, unchanged.
    """
    # Lazy %-style arguments: the message is only formatted when the INFO
    # level is actually enabled.
    logger.info("Response: %s", response.status_code)
    return response