Building a Lightweight Python Automation Framework with FastAPI and Gemini

A lightweight Python automation framework can be built by combining FastAPI for high-performance web routing with Google Gemini for intelligent intent classification. This architecture reduces operational costs by replacing persistent VMs with serverless Cloud Run instances while improving reliability through structured Pydantic data validation.

Three weeks ago, my Monday morning started with a Slack notification that every developer dreads: a legacy automation script had timed out, leaving 4,500 internal support tickets in a "pending" state. This script was a monolithic Python file, a relic from 2022 that relied on a series of nested if-else statements and fragile regex patterns to route tasks. It was slow, impossible to test, and had become a single point of failure for our operations team.

The failure wasn't just a logic error; it was a resource exhaustion issue. The script was running on a single-core instance, working through tasks one at a time that should have been handled concurrently. When I looked at the logs, the execution time per task had crept up from 2 seconds to nearly 15 seconds as the external APIs we integrated with became more sluggish. I realized we didn't just need a fix; we needed a complete architectural overhaul. I needed a Python automation framework that was lightweight enough to run on Cloud Run without costing a fortune, yet smart enough to handle complex decision-making without a massive codebase.

My goal was to build a system that used FastAPI for the interface, Pydantic for data validation, and the Gemini API to replace the brittle regex logic with actual semantic understanding. I wanted to move away from the "heavyweight" agent frameworks like LangChain or Auto-GPT, which, in my experience, often add more complexity and latency than they are worth for specific internal tools. Here is how I built a production-ready Python automation framework from scratch.

Why Heavyweight AI Frameworks Often Fail in Production Environments

Production environments often prioritize latency and cost, making complex "ReAct" loops in heavyweight frameworks less efficient than targeted classification. Before I started coding, I evaluated several popular AI agent frameworks. They are impressive in demos but tend to fall short on exactly those two constraints. Most of these frameworks rely on a "ReAct" (Reasoning and Acting) loop, where the LLM decides the next step, executes it, observes the result, and repeats. For a simple task like "summarize this ticket and post it to Slack," a ReAct loop might take 4 or 5 round-trips to the LLM. At $0.01 per 1,000 tokens and 2-3 seconds per call, that's both expensive and slow.

I decided on a "Task-Router" architecture. In this model, the LLM is used once at the beginning to classify the intent and extract structured data. Once the intent is known, the system executes pre-defined Python functions (Tasks). This approach provides the reliability of traditional code with the flexibility of AI. It also allows for much better debugging; if a task fails, I know exactly which function was running, rather than trying to decipher a 50-step LLM thought chain.
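
To make the trade-off concrete, here is a back-of-the-envelope comparison built from the figures quoted above ($0.01 per 1,000 tokens, 2-3 seconds per call, up to 5 round-trips). The 1,500 tokens per call is an assumed number chosen purely for illustration, not something I measured.

```python
# Rough per-ticket estimate: ReAct loop vs. a single classification call.
# PRICE_PER_1K_TOKENS and SECONDS_PER_CALL come from the figures in the text;
# TOKENS_PER_CALL is an assumption for illustration only.
PRICE_PER_1K_TOKENS = 0.01
SECONDS_PER_CALL = 2.5       # midpoint of the 2-3 second range
TOKENS_PER_CALL = 1_500      # assumed, not measured


def estimate(round_trips: int) -> tuple[float, float]:
    """Return (cost in USD, latency in seconds) for sequential LLM calls."""
    cost = round_trips * TOKENS_PER_CALL / 1_000 * PRICE_PER_1K_TOKENS
    latency = round_trips * SECONDS_PER_CALL
    return round(cost, 4), latency


react_cost, react_latency = estimate(round_trips=5)
router_cost, router_latency = estimate(round_trips=1)

print(f"ReAct loop:  ${react_cost:.3f}, {react_latency:.1f}s per ticket")
print(f"Task router: ${router_cost:.3f}, {router_latency:.1f}s per ticket")
```

Under these assumptions the single-call router is five times cheaper and faster per ticket, simply because both cost and latency scale with the number of sequential LLM calls.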

How to Define a Robust Core Task Schema Using Pydantic

A well-defined Pydantic schema acts as a security and stability layer, ensuring that your Python automation framework only processes validated data. I started by defining what a "Task" looks like. In my framework, every automation is a discrete unit of work. Using Pydantic, I created a schema that ensures every incoming request is validated before it even touches the AI layer. This is crucial for keeping malformed data from crashing the worker. Keep in mind that schema validation checks shape and types only; it does not by itself stop prompt injection hidden inside raw_payload.

from pydantic import BaseModel, Field
from typing import Any, Dict, Optional

class AutomationTask(BaseModel):
    task_id: str = Field(..., description="Unique identifier for the task")
    source: str = Field(..., description="The origin of the trigger (e.g., Jira, Zendesk)")
    raw_payload: Dict[str, Any] = Field(..., description="The original data from the source")
    priority: int = Field(default=1, ge=1, le=5)
    metadata: Optional[Dict[str, str]] = None

class TaskResult(BaseModel):
    success: bool
    output: str
    execution_time_ms: float
    tokens_used: int

This structure allows me to track performance metrics per task. When I later deployed this to Cloud Run, having the execution_time_ms and tokens_used in the logs was vital for identifying which tasks were driving up our GCP bill. If you're running into issues with your Cloud Run deployment, specifically with networking, you might find my previous post on Fixing Intermittent Python Cloud Run Connection Resets helpful for stabilizing your environment.
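
As a quick illustration of what the schema buys you, the sketch below feeds one valid and one invalid payload through the AutomationTask model (redeclared here without the field descriptions so the snippet runs on its own). The validate_task helper and the sample payloads are hypothetical, not part of the original framework.

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, ValidationError


class AutomationTask(BaseModel):
    task_id: str
    source: str
    raw_payload: Dict[str, Any]
    priority: int = Field(default=1, ge=1, le=5)
    metadata: Optional[Dict[str, str]] = None


def validate_task(data: Dict[str, Any]) -> Optional[AutomationTask]:
    """Return a validated task, or None if the payload is rejected."""
    try:
        return AutomationTask(**data)
    except ValidationError as exc:
        # In production this would go to structured logs instead of stdout.
        print(f"Rejected payload: {len(exc.errors())} validation error(s)")
        return None


# Valid: priority is omitted, so the default of 1 applies.
good = validate_task(
    {"task_id": "T-1", "source": "Zendesk", "raw_payload": {"subject": "Refund"}}
)
# Invalid: priority 9 violates the le=5 constraint.
bad = validate_task(
    {"task_id": "T-2", "source": "Jira", "raw_payload": {}, "priority": 9}
)
```

The out-of-range priority is rejected before any Gemini call is made, so a bad webhook costs nothing in tokens.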

Integrating Gemini 1.5 Flash for High-Speed Intent Classification

Using Gemini 1.5 Flash for intent classification provides a balance of low latency and high accuracy for routing internal tasks. The "brain" of the framework is a single FastAPI endpoint that accepts a webhook and passes the payload to Gemini. I used the google-generativeai Python SDK. The trick to making this fast and cheap is the system instruction. I don't ask Gemini to "solve the problem." I ask it to "return a JSON object representing the user's intent."

import google.generativeai as genai
import os

# Initialize the model
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    system_instruction="You are a task router. Analyze the input and return a JSON object with 'intent' and 'parameters'."
)

async def classify_intent(payload: str) -> str:
    """Return the model's raw JSON text describing the intent."""
    prompt = f"Analyze this support ticket: {payload}"
    response = await model.generate_content_async(
        prompt,
        # Ask the API to emit JSON rather than free-form text
        generation_config=genai.types.GenerationConfig(
            response_mime_type="application/json",
        )
    )
    return response.text

By using the gemini-1.5-flash model instead of pro, I reduced the classification latency from 1.8 seconds to about 450ms. For internal tools, the "flash" model is more than capable of basic classification. The response_mime_type="application/json" parameter is a lifesaver: it constrains the model to emit JSON, which removed most of the string-cleanup logic and retries I previously needed for malformed outputs. The parsed object should still be checked for the expected keys before it is used.
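
Because classify_intent returns text, the JSON still has to be parsed and checked before it reaches the router. Here is a minimal sketch of that step using only the standard library. The function name parse_router_output and the sample order ID are hypothetical; the 'intent' and 'parameters' keys are the ones requested in the system instruction.

```python
import json
from typing import Any, Dict, Tuple


def parse_router_output(raw: str) -> Tuple[str, Dict[str, Any]]:
    """Parse the model's JSON text into (intent, parameters).

    Raises ValueError if the text does not have the expected shape.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Router returned invalid JSON: {exc}") from exc

    if not isinstance(data, dict):
        raise ValueError("Router output must be a JSON object")

    intent = data.get("intent")
    if not isinstance(intent, str) or not intent:
        raise ValueError("Router output is missing a string 'intent'")

    # A missing or null 'parameters' value is treated as an empty dict.
    parameters = data.get("parameters")
    if parameters is None:
        parameters = {}
    if not isinstance(parameters, dict):
        raise ValueError("'parameters' must be a JSON object")

    return intent, parameters


intent, params = parse_router_output(
    '{"intent": "refund_request", "parameters": {"order_id": "A-1001"}}'
)
```

Raising a single exception type keeps the calling endpoint simple: anything that is not a well-formed routing decision can be logged and rejected in one place.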

Implementing the Dispatcher Pattern for Modular Task Execution

Decoupling intent classification from task execution via a registry pattern prevents a single bug from crashing the entire automation system. Once Gemini returns the intent (e.g., "intent": "refund_request"), the framework needs to execute the corresponding code. I implemented a simple registry pattern for this. This keeps the codebase modular; adding a new automation is as simple as writing a new function and registering it.

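from typing import Any, Awaitable, Callable, Dict, Optional

# An async handler takes the extracted parameters and returns a status message
Handler = Callable[[Dict[str, Any]], Awaitable[str]]

class TaskRegistry:
    """Maps intent names to the async functions that handle them."""

    def __init__(self):
        self._registry: Dict[str, Handler] = {}

    def register(self, intent: str):
        """Decorator that registers a handler under the given intent."""
        def wrapper(func: Handler) -> Handler:
            self._registry[intent] = func
            return func
        return wrapper

    def get_handler(self, intent: str) -> Optional[Handler]:
        # Returns None when no handler is registered for the intent
        return self._registry.get(intent)

registry = TaskRegistry()

@registry.register("refund_request")
async def handle_refund(params: Dict):
    # Logic to interface with Stripe or internal DB
    return f"Processed refund for order {params.get('order_id')}"

@registry.register("password_reset")
async def handle_password_reset(params: Dict):
    # Logic to trigger Auth0 or similar
    return "Password reset email sent."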

This decoupling is what allowed us to scale. When the legacy script failed, it was because a bug in the "refund" logic would crash the entire process. In this new framework, if handle_refund throws an exception, it's caught at the dispatcher level, logged, and the rest of the tasks continue to process. This is a fundamental shift in reliability.
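
The dispatcher itself is not shown above, so here is a minimal sketch of what "caught at the dispatcher level" can look like. It is an illustration rather than the exact production code: the HANDLERS dict stands in for the TaskRegistry, and the dispatch function, its return shape, and the sample order ID are assumptions.

```python
import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Dict, List

logger = logging.getLogger("dispatcher")

Handler = Callable[[Dict[str, Any]], Awaitable[str]]

# Stand-in for the TaskRegistry: intent name -> async handler.
HANDLERS: Dict[str, Handler] = {}


async def handle_refund(params: Dict[str, Any]) -> str:
    # Deliberately strict lookup so a missing key raises KeyError.
    return f"Processed refund for order {params['order_id']}"


HANDLERS["refund_request"] = handle_refund


async def dispatch(intent: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Run the handler for an intent without letting its errors propagate."""
    start = time.perf_counter()
    handler = HANDLERS.get(intent)
    if handler is None:
        success, output = False, f"No handler registered for intent '{intent}'"
    else:
        try:
            success, output = True, await handler(params)
        except Exception as exc:  # isolate the failure to this one task
            logger.exception("Handler for %s failed", intent)
            success, output = False, f"{type(exc).__name__}: {exc}"
    elapsed_ms = (time.perf_counter() - start) * 1000
    return {"success": success, "output": output, "execution_time_ms": elapsed_ms}


async def main() -> List[Dict[str, Any]]:
    # The second call fails inside the handler; the third has no handler at all.
    return [
        await dispatch("refund_request", {"order_id": "A-1001"}),
        await dispatch("refund_request", {}),
        await dispatch("unknown_intent", {}),
    ]


results = asyncio.run(main())
```

All three calls return a result dictionary, so one broken handler or one unrecognized intent never stops the tasks queued behind it.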

How to Manage Concurrency and State in a Stateless Framework

Managing concurrency with semaphores and state with Redis allows a stateless FastAPI application to handle high-volume workloads without overwhelming external APIs. One of the biggest challenges I faced was handling state without adding the overhead of a database like Postgres for every single task. Since I'm running this on Google Cloud Run, the service is stateless. If I need to track the state of a multi-step automation (e.g., "Wait for manager approval"), I use Redis (Cloud Memorystore).

However, for 90% of our internal automations, the tasks are fire-and-forget. To maximize throughput, I used Python's asyncio.gather to run independent tasks concurrently. But be careful: I learned the hard way that hitting external APIs with 50 concurrent requests will often trigger rate limits. I implemented a simple semaphore to throttle outgoing calls.

import asyncio

import httpx

# Limit to 10 concurrent external API calls
sem = asyncio.Semaphore(10)

async def safe_api_call(url: str) -> httpx.Response:
    # Wait for a free slot before sending the request
    async with sem:
        async with httpx.AsyncClient() as client:
            return await client.get(url)

This throttling is essential for maintaining a good reputation with the internal services we consume. Without it, our automation framework looked like a DDoS attack to our own internal infrastructure teams.
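
To see the throttle working without touching a real service, the sketch below combines asyncio.gather with a semaphore and records the peak number of calls in flight. The fake_api_call function is a stand-in that sleeps instead of making an HTTP request, and the counters exist only for this demonstration.

```python
import asyncio
from typing import List

MAX_CONCURRENT = 10
in_flight = 0
peak_in_flight = 0


async def fake_api_call(sem: asyncio.Semaphore, task_id: int) -> int:
    """Stand-in for an HTTP request; sleeps instead of using the network."""
    global in_flight, peak_in_flight
    async with sem:
        in_flight += 1
        peak_in_flight = max(peak_in_flight, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return task_id


async def run_batch(n: int) -> List[int]:
    # Creating the semaphore inside the coroutine ties it to the running
    # event loop, which matters on older Python versions.
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(fake_api_call(sem, i) for i in range(n)))


results = asyncio.run(run_batch(50))
print(f"completed={len(results)} peak_concurrency={peak_in_flight}")
```

All 50 tasks are scheduled at once, but the number actually running never exceeds the semaphore limit, which is the behavior you want in front of a rate-limited API.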

Optimizing the Deployment and CI/CD Pipeline for Cloud Run

A streamlined CI/CD pipeline using Cloud Build and slim Docker images significantly reduces cold start times and deployment friction. Deploying this framework required a robust pipeline. I didn't want to manually deploy every time I added a new task handler. I built a Cloud Build pipeline that runs my test suite, builds a Docker image, and pushes it to Artifact Registry. If you're setting this up for the first time, check out my guide on How to Build a Resilient Cloud Build Pipeline for Cloud Run. It covers the cloudbuild.yaml configuration I used for this exact project.

One specific optimization I made in the Dockerfile was using a slim Python image and installing only the necessary dependencies. This reduced our cold start time on Cloud Run by nearly 3 seconds.

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]

I also set the --workers flag to 4. This only pays off when the Cloud Run service is configured with multiple vCPUs; in that case, running multiple Uvicorn workers lets each instance handle significantly more traffic, which is a key factor in cost optimization.

Measuring Success Through Monitoring and Cost Management

Transitioning from persistent VMs to event-driven Cloud Run services can reduce compute costs by over 60% while dramatically improving response times. After a week in production, I checked the Google Cloud Console. The results were better than I expected. The legacy script used to cost us about $120/month in compute time because it ran on a persistent VM. The new Cloud Run service, despite using the Gemini API, was costing us $42/month.

The cost breakdown looked like this:

  • Cloud Run Compute: $18.50 (mostly during peak business hours)
  • Gemini API (Flash 1.5): $14.20 (approx. 1.2 million tokens)
  • Cloud Memorystore (Redis): $9.30
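
As a sanity check on those numbers, the short calculation below sums the three line items and compares the total against the $120/month legacy VM. It uses only the figures reported in this post.

```python
# Monthly figures as reported in the breakdown above (USD).
new_costs = {
    "Cloud Run Compute": 18.50,
    "Gemini API": 14.20,
    "Cloud Memorystore (Redis)": 9.30,
}
legacy_monthly = 120.00

new_monthly = round(sum(new_costs.values()), 2)
savings_pct = round((legacy_monthly - new_monthly) / legacy_monthly * 100, 1)

print(f"New total: ${new_monthly:.2f}/month")
print(f"Reduction vs. legacy VM: {savings_pct}%")
```

The line items add up to the $42/month total, which works out to a 65% reduction.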

The real win, however, was the latency. The average time from "Ticket Created" to "Automation Executed" dropped from 4 minutes (due to the old cron job's polling interval) to 12 seconds. By moving to a webhook-based FastAPI architecture, we eliminated the polling delay entirely.

For more details on the Gemini API pricing and capabilities, I highly recommend checking out the official Google AI documentation. It was instrumental in helping me choose the right model for this specific use case.

Final Lessons Learned Building a Python Automation Framework

The most effective Python automation framework focuses on simplicity, structured outputs, and choosing the right model for the specific task at hand. Here are the key takeaways from this build:

  • Intent classification is better than regex: Replacing 200 lines of complex regex with a single Gemini prompt made the system 10x more maintainable and significantly more accurate at handling edge cases.
  • Stateless is cheaper: Moving from a persistent VM to Cloud Run cut our monthly bill from $120 to $42, a reduction of about 65%. The "pay-as-you-go" model is perfect for internal tools that are mostly idle at night.
  • Flash vs. Pro: Don't use the most powerful model by default. Gemini 1.5 Flash is significantly faster and cheaper for classification tasks. Save the "Pro" model for complex reasoning or long-form content generation.
  • Structured Outputs are Mandatory: Using response_mime_type="application/json" saved me hours of debugging string-parsing errors. Never rely on an LLM to "just return a string" if you need to use that data in code.
  • Observability is not optional: Logging the task ID, intent, and execution time allowed me to find a bug in our Jira integration within minutes of deployment.

Building this Python automation framework reminded me that the best solution isn't always the one with the most features. By ignoring the hype around complex autonomous agents and focusing on a simple, event-driven architecture with a smart router, I was able to build something that actually works in production. My next step is to implement a "human-in-the-loop" flag for tasks where the AI confidence score is below 0.8, ensuring that we maintain the trust of our operations team as we automate more of their workflow.
