Building a Scalable Multi-Stage Python AI Pipeline

A scalable Python AI pipeline uses an asynchronous producer-consumer architecture with Redis queues to decouple document ingestion from long-running inference tasks. This approach eliminates 504 timeouts and allows for granular retries at specific processing stages, improving reliability to over 99%.

Three weeks ago, I watched my production logs turn into a wall of crimson 504 Gateway Timeouts. I was running a document processing service built on FastAPI that utilized the Gemini 1.5 Pro API to analyze complex legal contracts. On paper, the logic was simple: a user uploads a PDF, the backend extracts the text, sends it to the LLM, and returns a structured JSON summary. It worked perfectly during my local tests with three-page documents. But when a client uploaded a batch of 150 documents, some exceeding 40 pages, the entire architecture collapsed.

The problem was fundamental. I was treating a long-running AI inference task like a standard REST request. My Cloud Run instances were hitting their 60-second timeout limits, and because the requests were synchronous, the client-side connection would drop before the LLM could finish its reasoning trace. Even worse, if the third prompt in my four-prompt chain failed, the entire process restarted from scratch, doubling my API costs and wasting compute cycles. I realized I didn't just have a timeout issue; I had a structural failure in how I managed state and execution flow.

I spent the last two weeks rebuilding this as a multi-stage, asynchronous Python AI pipeline using Python, Redis, and a task queue. This shift moved the system from a fragile "all-or-nothing" execution model to a resilient state machine. In this post, I will break down the exact architecture I used, the code for the worker nodes, and how I handled the inevitable failures that come with third-party AI APIs.

Why Synchronous Architectures Fail for a Python AI Pipeline

Synchronous architectures are unsuitable for AI tasks because inference latency is non-deterministic and frequently exceeds standard web server timeout limits. A simple sentiment analysis might take 2 seconds, while a complex extraction task using a large context window might take 45 seconds. If you chain three of those tasks together, you are looking at over two minutes of processing time. Most web servers and load balancers, at their default settings, will not hold a request open for that long.

My first mistake was trying to optimize the FastAPI code itself. I tried increasing the worker timeout and using BackgroundTasks, but that didn't solve the core issue: visibility. If a background task fails, the user has no way of knowing why, and the server has no way of resuming from the point of failure. I needed a way to decouple the ingestion of the document from the processing of the document.

I settled on a "Producer-Consumer" pattern. The FastAPI app now acts as a thin producer that validates the input and pushes a job into a Redis-backed queue. A fleet of Python workers—the consumers—then pick up these jobs and move them through a series of defined stages. This allowed me to implement a custom tokenization strategy to keep costs down, which I detailed in a previous post on reducing LLM API costs with custom tokenization.
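The producer-consumer split can be sketched with the standard library alone. This is a minimal illustration, not the production setup: `asyncio.Queue` stands in for the Redis-backed queue, and `submit_job`, `worker`, and `demo` are hypothetical names.

```python
import asyncio
import uuid


async def submit_job(queue: asyncio.Queue, file_path: str) -> dict:
    """Producer: validate the input, enqueue the job, return immediately."""
    if not file_path.lower().endswith(".pdf"):
        raise ValueError("only PDF uploads are accepted")
    job_id = str(uuid.uuid4())
    await queue.put((job_id, file_path))
    # An HTTP layer would send this body back with a 202 Accepted status.
    return {"job_id": job_id, "status": "queued"}


async def worker(queue: asyncio.Queue, results: dict) -> None:
    """Consumer: pull jobs off the queue and process them one at a time."""
    while True:
        job_id, file_path = await queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for the slow pipeline work
            results[job_id] = f"processed {file_path}"
        finally:
            queue.task_done()


async def demo() -> tuple:
    # asyncio.Queue is an in-memory stand-in (assumption) for a Redis queue.
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    consumer = asyncio.create_task(worker(queue, results))
    accepted = await submit_job(queue, "contract.pdf")
    await queue.join()  # wait until the worker has drained the queue
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return accepted, results
```

The producer never waits on the slow work. It only validates and enqueues, which is what keeps the request handler well under any gateway timeout.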

How to Design Multi-Stage Workers for Reliable AI Processing

Breaking a Python AI pipeline into discrete, atomic stages allows for granular error handling and prevents the need to re-run expensive compute tasks. Instead of one giant function that does everything, I broke the pipeline into four discrete stages: Extraction, Sanitization, Inference, and Validation. Each stage persists its output to a database (or a persistent Redis store). If the Inference stage fails because of a safety filter or an API rate limit, the system doesn't have to re-run the Extraction or Sanitization stages.

Here is a simplified version of the worker logic I implemented using Taskiq (a high-performance distributed task queue for Python) and Pydantic for state management. I prefer Taskiq over Celery for modern Python projects because it has better type hinting and integrates seamlessly with the asyncio ecosystem.


from taskiq_redis import ListQueueBroker

# Initialize the broker
broker = ListQueueBroker("redis://localhost:6379")

# The stage helpers called below (extract_text_from_pdf, sanitize_text,
# run_ai_analysis, and so on) are application code defined elsewhere.

@broker.task
async def process_document_pipeline(document_id: str, file_path: str):
    # Stage 1: Extraction
    raw_text = await extract_text_from_pdf(file_path)
    await update_job_status(document_id, "extraction_complete", raw_text)

    # Stage 2: Sanitization (Removing PII, cleaning artifacts)
    clean_text = await sanitize_text(raw_text)
    await update_job_status(document_id, "sanitization_complete", clean_text)

    # Stage 3: LLM Inference
    # We pass the clean text to the AI model
    try:
        analysis = await run_ai_analysis(clean_text)
        await update_job_status(document_id, "inference_complete", analysis)
    except Exception as e:
        # Record the failure, then re-raise with the original traceback
        # so the task queue's retry logic can pick the job up again
        await handle_inference_failure(document_id, e)
        raise

    # Stage 4: Validation
    final_report = await validate_and_format(analysis)
    await mark_job_finished(document_id, final_report)

This Python AI pipeline structure ensures that each step is atomic. If the run_ai_analysis function hits a 429 (Too Many Requests) error from the Gemini API, the task queue's retry logic kicks in. Because I am checking the current status of the document_id at the start of each function (omitted in the snippet for brevity), I can skip the stages that have already been marked as complete.
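The status check that the snippet omits might look something like the sketch below. It assumes an in-memory `checkpoints` dict in place of Redis or a database, and every name in it (`run_with_checkpoints`, `extract`, `infer`, `calls`) is hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical in-memory checkpoint store: job_id -> {stage_name: output}.
# In a real pipeline this would live in Redis or a database.
checkpoints: dict = {}

Stage = tuple[str, Callable[[str], Awaitable[str]]]


async def run_with_checkpoints(job_id: str, payload: str, stages: list) -> str:
    """Run stages in order, reusing any output that was already persisted."""
    done = checkpoints.setdefault(job_id, {})
    current = payload
    for name, stage in stages:
        if name in done:
            current = done[name]  # finished on an earlier attempt: skip it
            continue
        current = await stage(current)  # may raise; earlier checkpoints survive
        done[name] = current
    return current


calls = {"extract": 0, "infer": 0}


async def extract(path: str) -> str:
    calls["extract"] += 1
    return f"text from {path}"


async def infer(text: str) -> str:
    calls["infer"] += 1
    if calls["infer"] == 1:
        raise RuntimeError("simulated 429 from the model API")
    return f"summary of {text}"


async def demo() -> str:
    stages = [("extraction", extract), ("inference", infer)]
    try:
        await run_with_checkpoints("doc-1", "contract.pdf", stages)
    except RuntimeError:
        pass  # a task queue would schedule the retry here
    return await run_with_checkpoints("doc-1", "contract.pdf", stages)
```

On the second attempt the extraction stage is served from the checkpoint, so only the failed inference stage runs again.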

Managing Pipeline State with Redis and FastAPI for Real-Time Updates

Using Redis as a centralized state store enables real-time progress tracking and secure job status polling for asynchronous tasks. One of the biggest hurdles was keeping the user informed. In a synchronous world, the user waits for the response. In an asynchronous world, the user gets a 202 Accepted response immediately with a job_id. The frontend then needs to poll an endpoint to get the status. This requires a robust backend that can handle high-frequency polling without hitting the main database too hard.

I used Redis as a state store for these active jobs. Every time a worker completes a stage, it updates a hash in Redis. The FastAPI /status/{job_id} endpoint simply reads from Redis. I also had to ensure that these status updates were secure, leveraging the same authentication patterns I used for the rest of my API. If you are scaling a similar system, I highly recommend checking out my guide on FastAPI authentication with JWT and Redis to ensure your job status endpoints aren't leaked.

The state in Redis looks something like this:


{
    "job_id": "789-xyz",
    "status": "processing",
    "current_stage": "inference",
    "progress": 75,
    "last_error": null,
    "updated_at": "2026-05-19T07:15:00Z"
}
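A status object of this shape could be produced by a small helper like the one below. This is a sketch under assumptions: the stage list, the rule that progress equals the current stage's position out of four, and the helper names are all illustrative. Redis hash values are strings, so each field is JSON-encoded before it would be written.

```python
import json
from datetime import datetime, timezone

# Assumed stage order, matching the four stages described earlier.
STAGES = ["extraction", "sanitization", "inference", "validation"]


def build_status(job_id, current_stage, last_error=None, now=None):
    """Build the status document a worker would write after each stage."""
    position = STAGES.index(current_stage) + 1  # 1-based stage number
    now = now or datetime.now(timezone.utc)
    return {
        "job_id": job_id,
        "status": "failed" if last_error else "processing",
        "current_stage": current_stage,
        # Assumption: progress is the stage position as a percentage.
        "progress": round(100 * position / len(STAGES)),
        "last_error": last_error,
        "updated_at": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


def to_redis_hash(status: dict) -> dict:
    """Encode every value as a JSON string so it can be a hash field."""
    return {key: json.dumps(value) for key, value in status.items()}


def from_redis_hash(fields: dict) -> dict:
    """Decode the hash fields back into the original Python values."""
    return {key: json.loads(value) for key, value in fields.items()}
```

JSON-encoding each field keeps `null` and integers intact on the round trip, which a plain string conversion would not.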

Why the Orchestrator Pattern is Superior for AI Workflows

The orchestrator pattern provides better control over complex AI branching logic compared to decentralized choreography models. I had a choice between two ways to trigger the stages:

  1. Choreography: Stage 1 finishes and pushes a message to a "Stage 2" queue.
  2. Orchestrator: One main task manages the flow and calls sub-functions (the approach in my code snippet above).

For AI pipelines, I strongly recommend the Orchestrator pattern. AI workflows often require "branching" logic. For example, if the Extraction stage determines that the document is in Spanish, I might want to route it to a different prompt or a different model entirely. Trying to manage that logic across multiple independent queues (Choreography) becomes a debugging nightmare very quickly. With an Orchestrator, the logic for the entire business process lives in one place.
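To make the branching argument concrete, here is a small sketch of an orchestrator that routes by detected language. The keyword-based `detect_language`, the `PROMPTS` table, and the `analyze` stand-in are assumptions for illustration only.

```python
import asyncio

# Hypothetical per-language prompts; a real system might also switch models.
PROMPTS = {
    "en": "Summarize this contract:",
    "es": "Resume este contrato:",
}


async def detect_language(text: str) -> str:
    # Placeholder heuristic (assumption); real detection would be more robust.
    return "es" if "contrato" in text.lower() else "en"


async def analyze(text: str, prompt: str) -> str:
    # Stand-in for the model call so the sketch runs without network access.
    return f"{prompt} {text}"


async def orchestrate(text: str) -> dict:
    """All routing decisions for the workflow live in this one function."""
    language = await detect_language(text)
    prompt = PROMPTS.get(language, PROMPTS["en"])  # default to English
    summary = await analyze(text, prompt)
    return {"language": language, "summary": summary}
```

Adding a new branch means editing one function, rather than wiring up another queue and another consumer.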

How to Implement Retries and Fallbacks for the Gemini API

Implementing exponential backoff and model fallbacks significantly increases the success rate of third-party AI API integrations. When working with the Gemini API, you have to plan for failure. Unlike a database query, an LLM call can fail for a dozen reasons: the model is overloaded, the prompt triggered a safety filter, the response was malformed JSON, or the context window was exceeded.

I implemented an exponential backoff strategy specifically for the Inference stage. If I get a 500 or 429 error, the worker waits 2 seconds, then 4, then 8, before finally giving up. However, I learned the hard way that you should not retry if the error is a 400 (Bad Request). A 400 usually means your prompt is too long or your parameters are wrong, so retrying it just burns money.
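One way to read that policy is sketched below: retry on 429 and 500 with waits of 2, 4, and 8 seconds, and fail immediately on anything else. The `APIError` class here is a hypothetical stand-in for whatever exception your client raises, and the injectable `sleep` parameter exists only so the logic can be exercised without real delays.

```python
import asyncio

RETRYABLE_STATUS = {429, 500}


class APIError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status_code: int):
        super().__init__(f"API error {status_code}")
        self.status_code = status_code


async def call_with_backoff(call, delays=(2, 4, 8), sleep=asyncio.sleep):
    """Await call(); on a retryable error wait 2s, 4s, 8s between attempts."""
    for delay in delays:
        try:
            return await call()
        except APIError as exc:
            if exc.status_code not in RETRYABLE_STATUS:
                raise  # e.g. a 400: retrying cannot succeed
            await sleep(delay)
    # Final attempt after the last wait; any error now propagates.
    return await call()
```

With three delays this makes at most four attempts. A non-retryable status such as 400 surfaces on the first attempt without any waiting.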

Another trick I used was "Model Fallback". If Gemini 1.5 Pro failed consistently or was too slow, I had a fallback block that would attempt the task using Gemini 1.5 Flash. Flash is significantly faster and cheaper, and while it's less "intelligent," it's often good enough to get the job done if the primary model is struggling. This increased my pipeline's success rate from 82% to 99.4%.


async def run_ai_analysis(text: str):
    try:
        # Primary attempt with the heavy hitter
        return await call_gemini_pro(text)
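    except APIError as e:  # APIError: whatever exception your Gemini client raises
        if e.status_code == 429:
            # Fallback to the faster, more available model
            return await call_gemini_flash(text)
        # Any other error propagates with its original traceback
        raise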
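The snippet above covers the rate-limited case. The "too slow" case could be handled with a timeout around the primary call, as in this sketch. The function name and the idea of passing the two model calls in as arguments are assumptions. `asyncio.wait_for` is the standard-library call that enforces the deadline.

```python
import asyncio


async def analyze_with_timeout_fallback(primary, fallback, timeout_s: float):
    """Try the primary model; switch to the fallback if it is too slow."""
    try:
        # wait_for cancels the primary call once the deadline passes.
        return await asyncio.wait_for(primary(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return await fallback()
```

In use, `primary` and `fallback` would be zero-argument coroutine functions wrapping the Pro and Flash calls, with the document text bound in advance.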

Benchmarking Performance and Reducing Costs by 30%

Transitioning to a queued architecture cut my API costs by roughly 30%, because intermediate results are cached instead of recomputed, and it also stabilized memory usage. After the switch, the user experience improved drastically, even though the total "time to completion" stayed roughly the same. The "perceived" latency dropped because the UI responded instantly. But the real gains were in throughput and cost.

Before the change, my Cloud Run instances were frequently OOM-ing (Out Of Memory) because they were trying to hold too many PDF objects in memory while waiting for the LLM. By moving to workers, I could limit each worker to processing only 2 jobs at a time. This stabilized memory usage. I also saw a reduction in API costs because I stopped re-running successful stages of a failed pipeline.
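The two-jobs-per-worker cap can be illustrated with a semaphore. This sketch uses `asyncio.Semaphore` as a stand-in and does not show the task queue's own concurrency setting. `run_limited` and the `peak` counter are hypothetical and exist only to make the limit observable.

```python
import asyncio


async def run_limited(jobs, limit: int = 2):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def guarded(job):
        nonlocal active, peak
        async with semaphore:  # blocks while `limit` jobs are already running
            active += 1
            peak = max(peak, active)
            try:
                return await job()
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak
```

Because memory use scales with the number of documents held at once, capping in-flight jobs puts a ceiling on it no matter how long the queue gets.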

Metric                            | Synchronous (Old)    | Queued Pipeline (New)
Max Concurrent Jobs               | ~15 (Limited by RAM) | Unlimited (Queued)
Avg. Success Rate (Large Batches) | 64%                  | 99.4%
API Cost per 1k Docs              | $142.00              | $98.00
Developer "3 AM Alerts"           | Many                 | Zero

The cost reduction primarily came from the fact that I wasn't re-processing the Extraction and Sanitization stages, which often involved heavy token usage. By caching those results in Redis, I saved roughly 30% on my total API bill.
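As a quick check on that figure, the arithmetic from the table's cost row works out as follows (variable names are illustrative):

```python
old_cost_per_1k = 142.00  # synchronous pipeline, USD per 1,000 documents
new_cost_per_1k = 98.00   # queued pipeline, USD per 1,000 documents

savings = old_cost_per_1k - new_cost_per_1k
reduction_pct = 100 * savings / old_cost_per_1k

print(f"Saved ${savings:.2f} per 1k docs ({reduction_pct:.1f}% reduction)")
```

The table's numbers give a reduction of about 31%, consistent with the "roughly 30%" quoted above.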

Key Takeaways for Building Resilient AI Systems

  • Decouple everything: Never run a task that takes more than 5 seconds inside a standard web request handler. Use a queue.
  • State persistence is non-negotiable: In AI pipelines, you must save the output of every intermediate step. LLMs are too expensive and slow to re-run because of a minor downstream network hiccup.
  • Visibility is a feature: Building a detailed "status" object in Redis allowed me to build a progress bar in the frontend. Users are much more patient when they can see "Stage 3 of 4: Analyzing Entities" instead of a spinning wheel.
  • Model Fallbacks save lives: Having a "dumb" but fast model ready to take over when the "smart" model is rate-limited keeps your pipeline moving.
  • Pydantic for everything: Using Pydantic models to validate the LLM's output before moving to the next stage prevented "garbage in, garbage out" cascades.

Moving forward, I am looking at implementing "Agentic" recovery. Instead of just retrying a failed stage, I want to use a small LLM call to analyze the error message and decide if it should modify the prompt or the input text before retrying. Building these systems is no longer just about writing code; it's about building resilient loops that can handle the inherent "vibes-based" uncertainty of AI. If you are scaling a similar Python AI pipeline, take the time to move to a queue—your sleep schedule will thank you.
