Python Asyncio Pitfalls in Long-Running Background Jobs

Common Python asyncio pitfalls include silent task failure due to garbage collection, event loop blocking from CPU-bound tasks, and connection pool exhaustion. To resolve these, developers should maintain strong task references, offload heavy processing to a ProcessPoolExecutor, and implement graceful shutdown handlers for signal management.

Last Tuesday at 3:14 AM, my PagerDuty went off. My Cloud Run instances were hitting a 95% memory utilization threshold, and the request latency for my FastAPI backend had spiked from a comfortable 200ms to a staggering 12 seconds. When I checked the logs, I saw nothing. No tracebacks, no explicit errors—just a series of "Liveness probe failed" messages followed by SIGTERM signals as GCP killed my containers.

I was in the middle of scaling an AI agent system that processes long-running document analysis tasks. These jobs take anywhere from thirty seconds to five minutes because they involve multiple calls to the Gemini API, heavy JSON parsing, and database synchronization. I thought I was being clever by using asyncio.create_task() to fire off these jobs in the background so I could return an immediate 202 Accepted response to the client. I was wrong. My "fire and forget" strategy had turned into a "fire and leak" catastrophe. Understanding these Python asyncio pitfalls is crucial for maintaining stable production environments.

In this post, I’m going to break down the specific asyncio architectural failures I encountered, the performance regressions that cost me nearly $400 in unnecessary compute overhead, and the code patterns I now use to ensure background jobs actually finish without bringing down the entire service.

How to Prevent the Silent Death of Python Asyncio Background Tasks

The most common Python asyncio pitfall is failing to keep a strong reference to background tasks. The event loop only holds weak references to tasks, so if you create a task in a FastAPI handler and don't keep a reference to it, the garbage collector can reclaim it at any time, even before it has finished. Worse, if that task raises an unhandled exception, nothing is awaiting the result to catch the failure. At best you get a "Task exception was never retrieved" message when the task object is eventually collected, which is easy to miss and arrives long after the actual error.

I realized that dozens of my agentic workflows were simply vanishing into the ether. My database showed "Processing" status for records that would never be updated. To fix this, I had to implement a manual task registry. This isn't just about keeping the tasks alive; it's about having a hook to inspect the state of the system when things go sideways.


import asyncio
import logging
from typing import Set

# I learned the hard way that you MUST keep a strong reference
background_tasks: Set[asyncio.Task] = set()

async def run_agent_workflow(workflow_id: str):
    try:
        # Simulate long-running Gemini API calls
        await asyncio.sleep(30) 
        print(f"Workflow {workflow_id} completed")
    except Exception as e:
        # Without this, errors vanish
        logging.error(f"Task failed: {e}", exc_info=True)
    finally:
        # Clean up the reference so we don't leak memory
        background_tasks.discard(asyncio.current_task())

def start_background_job(workflow_id: str):
    task = asyncio.create_task(run_agent_workflow(workflow_id))
    background_tasks.add(task)
    # This ensures the task is removed even if it's never awaited
    task.add_done_callback(background_tasks.discard)

By implementing this set-based registry, I stopped the silent task deaths. However, this revealed a secondary problem: I was now successfully running so many tasks that I was hitting the limits of my connection pools. If you're building a similar system, you might want to look at my previous guide on Python Monorepo Architecture for Scalable FastAPI Microservices to see how I structure these background workers as separate services to avoid impacting the main API's responsiveness.

Why CPU-Bound Logic Blocks the Python Asyncio Event Loop

Event loop lag occurs when synchronous, CPU-intensive operations like JSON parsing pause the execution of all other concurrent tasks. I noticed that even when my background tasks were supposedly "waiting" for I/O, my entire API would become unresponsive. Using aiomonitor, I discovered that my event loop was being blocked for 400ms to 600ms at a time.

The culprit? JSON parsing and token counting. When the Gemini API returns a 2MB JSON response containing nested metadata and extracted text, calling json.loads() is a synchronous, CPU-intensive operation. While Python is busy turning that string into a dictionary, the event loop is paused. It cannot handle incoming HTTP requests, it cannot process heartbeats, and it cannot start other I/O tasks.
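Independent of aiomonitor, one cheap way to confirm this kind of stall is a watchdog coroutine that sleeps for a fixed interval and measures how late it wakes up. This is a sketch under assumptions: `measure_loop_lag` is a hypothetical helper, and the 100 ms `time.sleep()` merely stands in for a large `json.loads()` call.

```python
import asyncio
import time


async def measure_loop_lag(interval: float, samples: int) -> float:
    """Return the worst observed wake-up delay (in seconds) over `samples` sleeps."""
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        # Anything beyond `interval` is time the loop spent busy elsewhere.
        lag = time.perf_counter() - start - interval
        worst = max(worst, lag)
    return worst


async def blocking_handler() -> None:
    """Stand-in for synchronous CPU work running on the event loop."""
    await asyncio.sleep(0.01)  # let the watchdog start its sleep first
    time.sleep(0.1)            # blocks the whole loop for roughly 100 ms


async def demo() -> float:
    lag, _ = await asyncio.gather(measure_loop_lag(0.02, 5), blocking_handler())
    return lag
```

Running `asyncio.run(demo())` should report a worst-case lag close to the length of the blocking call, which is the same signal a production watchdog would export as a metric.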

I had to move these CPU-bound operations to a ProcessPoolExecutor. Using a ThreadPoolExecutor isn't enough here because of Python's Global Interpreter Lock (GIL). If you're doing heavy data manipulation, you need a separate process.


import json
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

def heavy_json_processing(raw_data: str):
    # This happens in a separate process, leaving the event loop free
    data = json.loads(raw_data)
    # Perform complex transformations
    return data

@app.post("/process-llm-response")
async def handle_response(payload: str):
    loop = asyncio.get_running_loop()
    # Offload the CPU work
    processed_data = await loop.run_in_executor(executor, heavy_json_processing, payload)
    return {"status": "success"}

This change alone dropped my p99 latency by 70%. It’s a reminder that "async" does not mean "parallel." If you have logic that calculates embeddings or cleans large strings, and you do it inside an async def function without an executor, you are killing your throughput.

How to Solve HTTPX Connection Pool Exhaustion in Async Environments

High-concurrency background jobs often fail because default HTTP client connection limits are too restrictive for large-scale asynchronous workloads. As I scaled the number of background jobs, I started seeing httpx.PoolTimeout errors. I was using a single httpx.AsyncClient() instance across my entire application, which is the recommended practice. However, I hadn't tuned the limits. The default limit for most async HTTP clients is 100 connections.

When 200 background tasks all wake up at once to poll an LLM status endpoint, 100 of them get the connections and the other 100 sit in a queue. If they sit there for more than 5 seconds (the default timeout), they fail with httpx.PoolTimeout. This led to a cascading failure where tasks would retry, creating even more pressure on the pool. This is closely related to the issues I documented in Debugging LLM API Cost Spikes, where failed and retried tasks led to a massive bill because of redundant prompt processing.
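A complementary guard is to cap how many jobs use the client at once, so that waiting happens in your own code rather than inside the pool's timeout window. The sketch below uses asyncio.Semaphore; `poll_status`, the `stats` dictionary, and the limit of 3 are placeholders chosen for illustration, with `asyncio.sleep()` standing in for the real HTTP call.

```python
import asyncio

MAX_IN_FLIGHT = 3  # illustrative; keep this below the client's max_connections


async def poll_status(job_id: int, gate: asyncio.Semaphore, stats: dict) -> int:
    """Hypothetical job: at most MAX_IN_FLIGHT of these run the 'request' at once."""
    async with gate:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the real HTTP call
        stats["active"] -= 1
    return job_id


async def run_all(n_jobs: int) -> dict:
    gate = asyncio.Semaphore(MAX_IN_FLIGHT)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(poll_status(i, gate, stats) for i in range(n_jobs)))
    return stats
```

Tasks that cannot acquire the semaphore simply wait without a deadline, so a burst of wake-ups turns into a queue instead of a wave of timeouts and retries.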

I had to explicitly configure the Limits in HTTPX to handle the concurrency of long-running jobs. According to the official HTTPX documentation, managing these limits is critical for high-concurrency applications. Here is the configuration that finally stabilized my production environment:


import httpx

# Optimized for high-concurrency background jobs
limits = httpx.Limits(
    max_keepalive_connections=20, 
    max_connections=500, 
    keepalive_expiry=30.0
)

client = httpx.AsyncClient(limits=limits, timeout=httpx.Timeout(60.0))

I also learned not to rely on one blanket timeout for background jobs. Some LLM calls take 2 seconds; some take 90 seconds. With a blanket 10-second timeout, my most valuable (and expensive) tasks were being killed before they finished. HTTPX accepts a timeout argument on individual requests, so slow calls can be given a longer budget than the client default.
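One way to express per-job deadlines, independent of the HTTP client's own settings, is to wrap each call in asyncio.wait_for() with a budget chosen for that kind of work. This is only a sketch: the job kinds, the budgets, and `call_llm` are made up for illustration.

```python
import asyncio

# Hypothetical per-job budgets in seconds; tune these from real latency data.
TIMEOUTS = {"classify": 0.05, "summarize": 0.5}


async def call_llm(duration: float) -> str:
    """Stand-in for an LLM request that takes `duration` seconds."""
    await asyncio.sleep(duration)
    return "done"


async def run_with_budget(kind: str, duration: float) -> str:
    """Apply the deadline that belongs to this kind of job, not a global one."""
    try:
        return await asyncio.wait_for(call_llm(duration), timeout=TIMEOUTS[kind])
    except asyncio.TimeoutError:
        return "timed out"
```

With these budgets, the same 0.1-second call succeeds as a "summarize" job but times out as a "classify" job, which is the behavior a single global timeout cannot give you.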

Using ContextVars to Fix Missing Trace IDs in Asyncio Tasks

Background tasks only see the context that existed at the moment they were created, which makes it easy to lose critical observability data like trace IDs. asyncio.create_task() runs the new task in a copy of the current context, so a value stored in a ContextVar before that call is visible inside the task, but anything kept elsewhere (on the request object, in a logging filter) or set after the task was created is not. This was a nightmare for debugging. My logs showed an error in a background task, but I had no idea which user triggered it or what the original Request ID was. Standard logging filters that work in FastAPI's request handlers don't carry that information into background tasks automatically.

I had to store the trace context in contextvars so that it follows the request into the background worker. This is essential for maintaining observability when you are running hundreds of concurrent agents.


import contextvars
import uuid

# Define the context variable
request_id_var = contextvars.ContextVar("request_id", default="unknown")

async def background_worker():
    # This will now correctly show the ID set in the parent task
    rid = request_id_var.get()
    print(f"Working on request: {rid}")

@app.get("/trigger")
async def trigger():
    request_id_var.set(str(uuid.uuid4()))
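    # create_task() runs the task in a copy of the current context,
    # so the ID set above is visible inside background_worker()
    task = asyncio.create_task(background_worker())
    background_tasks.add(task)  # keep a strong reference (registry from earlier)
    task.add_done_callback(background_tasks.discard)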
    return {"status": "accepted"}

Note that create_task() has copied the current context ever since contextvars was introduced in Python 3.7; what Python 3.11 added is an optional context argument for passing a specific Context explicitly. Since I'm running on the 3.12 runtime in Cloud Run, the copying was handled for me, but the lack of explicit initialization was still causing "unknown" to show up in my logs until I fixed the middleware ordering.
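The ordering problem is easy to reproduce in isolation. The sketch below is a standalone demonstration, not my middleware: `read_id` and the "req-123" value are illustrative. It creates one task before the ID is set and one after.

```python
import asyncio
import contextvars

request_id_var = contextvars.ContextVar("request_id", default="unknown")


async def read_id() -> str:
    """Return whatever request ID is visible inside this task."""
    return request_id_var.get()


async def main() -> dict:
    # Created BEFORE the ID is set: the task's snapshot still holds the default.
    too_early = asyncio.create_task(read_id())
    request_id_var.set("req-123")
    # Created AFTER the ID is set: the snapshot includes it.
    in_time = asyncio.create_task(read_id())
    return {"too_early": await too_early, "in_time": await in_time}
```

The first task reports "unknown" and the second reports "req-123", because each task keeps the snapshot taken when it was created. Whatever sets the ID therefore has to run before anything spawns a background task.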

How to Implement Graceful Shutdowns for Python Asyncio Services

Serverless environments like Cloud Run require explicit signal handling to prevent background tasks from being terminated mid-execution during scaling events. When you deploy a new version or when the autoscaler scales down, GCP sends a SIGTERM signal to your container. You have 10 seconds (by default) to finish what you're doing before a SIGKILL follows.

My background tasks were being brutally murdered mid-execution. This left my database in an inconsistent state and led to duplicate processing when the tasks were retried. I had to implement a graceful shutdown handler that would wait for the background_tasks set we created earlier to empty out before allowing the process to exit.


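import asyncio
import signal

async def shutdown(sig, loop):
    print(f"Received exit signal {sig.name}...")
    # Wait only for the registered background jobs, not every task on the loop
    tasks = list(background_tasks)

    print(f"Waiting for {len(tasks)} tasks to complete...")
    # Give tasks a chance to finish, but stay under the 10-second SIGKILL limit
    if tasks:  # asyncio.wait() raises ValueError on an empty collection
        await asyncio.wait(tasks, timeout=8)

    loop.stop()

def install_signal_handlers():
    # Must be called from inside the running event loop (e.g. a startup hook);
    # asyncio.get_running_loop() raises RuntimeError at module import time
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s, loop)))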

Without this, I was losing roughly 2% of my background jobs during every deployment. If you are processing payments or expensive LLM calls, 2% is an unacceptable failure rate. For more on managing the costs associated with these kinds of failures, check out my post on How I Optimized LLM API Costs for Fine-Tuning Workloads.
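A shutdown handler also has to decide what happens to jobs that are still running when the grace period ends. One option, sketched below under the assumption that the jobs tolerate cancellation, is to cancel the stragglers and wait for their cleanup code to run; `drain` is a hypothetical helper name.

```python
import asyncio


async def drain(tasks: set, grace: float) -> tuple[int, int]:
    """Wait up to `grace` seconds, then cancel whatever is still running.

    Returns (finished, cancelled) counts.
    """
    if not tasks:
        return 0, 0  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=grace)
    for task in pending:
        task.cancel()
    # Let cancelled tasks run their except/finally blocks before the process exits.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)
```

Cancelling explicitly gives each job a CancelledError it can catch to mark its database record as interrupted, rather than leaving the record stuck in "Processing" when SIGKILL arrives.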

Summary of Best Practices for Avoiding Python Asyncio Pitfalls

  • Strong References: Never "fire and forget" with create_task. Store your tasks in a set to prevent garbage collection and to enable monitoring.
  • Offload CPU Work: If you are doing json.loads() or json.dumps() on multi-megabyte strings, or any heavy data processing, use loop.run_in_executor with a ProcessPoolExecutor.
  • Connection Pool Tuning: The default settings for httpx and aiohttp are too conservative for background workers. Increase max_connections and set granular timeouts.
  • Observability: Use contextvars to ensure your Trace IDs and User IDs follow the execution flow into the background task.
  • Shutdown Logic: Handle SIGTERM. If you don't, you'll face data corruption and redundant API costs when tasks are killed mid-stream.

Moving forward, I'm looking into moving these long-running tasks out of the FastAPI process entirely and into a dedicated task queue like Taskiq or ARQ. While asyncio is powerful for I/O concurrency, trying to turn a web server into a heavy-duty job processor is a path filled with edge cases. The fixes above improve stability, but my next step is implementing a persistent results store so that if a container does get nuked, the next one can pick up exactly where the previous one left off without re-running the expensive LLM stages.
