Optimizing LLM API Latency: Async, Streaming, and Pydantic in Production
I remember the sinking feeling in my stomach. It was late afternoon, and our monitoring dashboards, usually a soothing sea of green, had started to flash angry reds and oranges. Our users were reporting painfully slow response times, some even encountering timeouts. The core of the problem, I quickly pinpointed, was our interaction with external Large Language Model (LLM) APIs. What should have been near-instantaneous content generation was taking upwards of 10-15 seconds, sometimes more. This wasn't just a poor user experience; it was a ticking time bomb for our infrastructure costs, as longer request durations meant more concurrent instances and higher compute bills.
This wasn't an isolated incident. As a lead developer, I've seen my fair share of production fires, but this one felt particularly insidious because the underlying issue wasn't immediately obvious. I was dealing with a distributed system, external API dependencies, and the complexities of Python's asynchronous ecosystem intertwined with data validation. My journey to bring those latency numbers down, to restore the calm green to our dashboards, taught me invaluable lessons about micro-optimizations and the often-overlooked bottlenecks in modern AI-driven applications.
The Latency Scourge: Initial Symptoms and Diagnosis
Our application, at its heart, generates various forms of content using LLMs. When a user requests content, we make an API call to an external LLM provider, process the response, and then present it. Simple enough, right? Initially, for rapid prototyping, I had opted for the straightforward, synchronous approach using the popular requests library. It's fantastic for quick development, but as traffic scaled, its limitations became glaringly obvious.
```python
import requests
import time

def generate_content_sync(prompt: str) -> dict:
    start_time = time.time()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"
    }
    payload = {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 500
    }
    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload
        )
        response.raise_for_status()
        result = response.json()
        end_time = time.time()
        print(f"Sync call duration: {end_time - start_time:.2f} seconds")
        return result
    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")
        return {"error": str(e)}

# Example usage (blocking)
# generate_content_sync("Write a short blog post about async programming.")
```
The problem with this synchronous pattern, especially in a web server context (like a Flask or FastAPI app running with Gunicorn workers), is that each API call blocks the entire worker process. If an LLM call takes 10 seconds, that worker is effectively idle, waiting for I/O, and cannot serve any other requests during that time. This drastically limits concurrency. My monitoring showed CPU utilization often low, but request queues growing, and latency spiking – a classic sign of I/O bound bottlenecks.
Our average Time To Last Byte (TTLB) for these LLM-dependent requests was hovering around 12 seconds, with a 99th percentile pushing past 20 seconds. This was simply unacceptable.
First Aid: Embracing Asynchronous I/O with httpx
My first, and most obvious, step was to transition to asynchronous I/O. Python's asyncio paired with an asynchronous HTTP client like httpx was the natural choice. This allows the application to initiate an API call, then yield control back to the event loop, letting it process other tasks (like handling other incoming user requests) while waiting for the LLM API response. When the response arrives, the event loop resumes the original task.
```python
import httpx
import asyncio
import time

async def generate_content_async(prompt: str) -> dict:
    start_time = time.time()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"
    }
    payload = {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 500
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            result = response.json()
            end_time = time.time()
            print(f"Async call duration: {end_time - start_time:.2f} seconds")
            return result
        except httpx.HTTPError as e:
            # httpx.HTTPError covers both transport errors and the
            # HTTPStatusError raised by raise_for_status()
            print(f"API call failed: {e}")
            return {"error": str(e)}

# Example usage (requires an async context)
# async def main():
#     await generate_content_async("Write a short blog post about async programming.")
# asyncio.run(main())
```
This change alone significantly improved our system's concurrency and throughput. While the *individual* LLM API call might still take 10 seconds, the server could now handle many such calls concurrently, preventing a single slow response from blocking other users. Our request queue started to clear, and the 99th percentile latency dropped to around 15 seconds, with the average hovering around 8 seconds. A definite improvement, but I knew we could do better, especially for perceived performance.
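To make that concurrency gain concrete, here is a minimal, self-contained sketch with the network call simulated by `asyncio.sleep` (so no API key or httpx is needed): five simulated 200 ms calls launched with `asyncio.gather` finish in roughly 200 ms total, where a synchronous loop would take a full second.

```python
import asyncio
import time

async def fake_llm_call(prompt: str, latency: float = 0.2) -> str:
    # Stand-in for an awaited HTTP request; simulated with asyncio.sleep
    await asyncio.sleep(latency)
    return f"response to: {prompt}"

async def main() -> tuple:
    prompts = [f"prompt {i}" for i in range(5)]
    start = time.monotonic()
    # All five coroutines run concurrently on the event loop
    results = await asyncio.gather(*(fake_llm_call(p) for p in prompts))
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"{len(results)} calls completed in {elapsed:.2f}s")
```

The same pattern applies directly to `generate_content_async`: gather the coroutines instead of awaiting them one by one.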
Beyond TTLT: The Power of Streaming for Perceived Performance
Even with asynchronous calls, users were still waiting for the *entire* LLM response to be generated and received before seeing anything. This is measured by Time To Last Token (TTLT). For a 10-second response, that's 10 seconds of staring at a blank screen. This is where streaming comes into play. Most modern LLM APIs support streaming, where tokens are sent back as they are generated, rather than waiting for the complete response. This dramatically reduces Time To First Token (TTFT), making the application feel much faster and more responsive.
Implementing streaming with httpx required a slightly different approach, iterating over the response bytes as they arrive. I had to carefully manage chunking and decoding the SSE (Server-Sent Events) format that many LLM APIs use.
```python
import httpx
import json
import asyncio
import time

async def generate_content_streaming(prompt: str):
    start_time = time.time()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"
    }
    payload = {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 500,
        "stream": True  # Crucial for streaming
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                full_content = ""
                first_token_time = None
                async for chunk in response.aiter_bytes():
                    # Each chunk might contain multiple SSEs or partial SSEs
                    decoded_chunk = chunk.decode("utf-8")
                    for line in decoded_chunk.split("\n"):
                        if line.startswith("data: "):
                            data_str = line[len("data: "):].strip()
                            if data_str == "[DONE]":
                                continue
                            try:
                                data = json.loads(data_str)
                                # The delta lives under choices[0], not choices
                                delta = data["choices"][0]["delta"]
                                if delta.get("content"):
                                    token = delta["content"]
                                    full_content += token
                                    if first_token_time is None:
                                        first_token_time = time.time()
                                        print(f"Time To First Token (TTFT): {first_token_time - start_time:.2f} seconds")
                                    # Here you'd yield the token or send it to the client
                                    # print(token, end="", flush=True)
                            except json.JSONDecodeError:
                                # Handle partial JSON objects if chunks split them
                                pass
                end_time = time.time()
                print(f"\nTime To Last Token (TTLT): {end_time - start_time:.2f} seconds")
                return {"content": full_content}
        except httpx.HTTPError as e:
            print(f"Streaming API call failed: {e}")
            return {"error": str(e)}

# Example usage
# async def main():
#     await generate_content_streaming("Write a short blog post about async programming.")
# asyncio.run(main())
```
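That inline SSE handling is easy to get subtly wrong, so in practice I'd factor it into a small, testable helper. The sketch below assumes the caller has already buffered complete `data:` lines (httpx's `response.aiter_lines()` can do that splitting for you); the helper name `extract_tokens` is mine, not part of any API.

```python
import json

def extract_tokens(lines):
    """Pull content deltas out of OpenAI-style SSE lines.

    Assumes each element is one complete "data: ..." line; partial lines
    must be buffered by the caller before calling this helper.
    """
    tokens = []
    for line in lines:
        if not line.startswith("data: "):
            continue
        data_str = line[len("data: "):].strip()
        if data_str == "[DONE]":
            break  # end-of-stream sentinel
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            continue  # skip malformed lines rather than crash the stream
        choices = data.get("choices") or [{}]
        delta = choices[0].get("delta", {})
        if delta.get("content"):
            tokens.append(delta["content"])
    return tokens
```

Keeping the parsing pure like this makes it trivial to unit-test against recorded stream fixtures, independent of any network I/O.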
With streaming, our TTFT dropped dramatically, often to under 1-2 seconds. This made the application feel significantly snappier to users, even if the overall TTLT remained similar. The perceived performance boost was immense, leading to a much better user experience. This was a critical win, and I documented more about optimizing these kinds of cloud-based workloads in my post, How I Halved My Cloud Run Bill: Auto-Scaling, Concurrency, and Request Optimization for AutoBlogger, which delves deeper into the cost implications of slow requests and inefficient resource utilization.
The Hidden Bottleneck: Pydantic Parsing
I thought I was on a clear path to victory. TTFT was great, concurrency was up. But then, I started noticing something peculiar. While the raw tokens were arriving quickly, the actual processing and validation of the *final, structured output* from the LLM was still introducing noticeable delays. We heavily rely on Pydantic for data validation and parsing of structured JSON outputs from LLMs, a pattern I found incredibly robust for ensuring output quality, especially when building our RAG system, as detailed in My Journey Building a Production-Ready RAG System for AutoBlogger with Open-Source LLMs.
The issue was this: even after all the tokens had arrived, the synchronous operation of parsing a large JSON string into a complex Pydantic model would block the event loop. If the LLM output was a few kilobytes, or worse, tens of kilobytes with nested structures, this parsing step could take hundreds of milliseconds, or even a full second, depending on the model's complexity and the server's load. In an asynchronous FastAPI application, this single blocking operation could still impact the responsiveness of other concurrent requests.
Consider a complex Pydantic model designed to validate a generated blog post:
```python
from pydantic import BaseModel, Field
from typing import List, Optional

class Author(BaseModel):
    name: str = Field(..., description="Full name of the author.")
    bio: Optional[str] = Field(None, description="Short biography of the author.")
    email: Optional[str] = Field(None, description="Author's email address.")

class Section(BaseModel):
    heading: str = Field(..., description="Heading of the section.")
    content: str = Field(..., description="Detailed content of the section.")
    keywords: List[str] = Field(default_factory=list, description="Relevant keywords for this section.")

class BlogPost(BaseModel):
    title: str = Field(..., description="The main title of the blog post.")
    slug: str = Field(..., description="URL-friendly slug for the blog post.")
    summary: str = Field(..., description="A concise summary of the blog post.")
    tags: List[str] = Field(default_factory=list, description="List of relevant tags for the post.")
    author: Author
    sections: List[Section] = Field(..., description="List of sections making up the blog post content.")
    conclusion: str = Field(..., description="The concluding remarks of the blog post.")
    references: List[str] = Field(default_factory=list, description="List of external references or sources.")

# Example of how parsing would block:
# json_data = "..."  # Large JSON string from LLM
# start_parse = time.time()
# blog_post_instance = BlogPost.model_validate_json(json_data)  # This is synchronous
# end_parse = time.time()
# print(f"Pydantic parsing took: {end_parse - start_parse:.4f} seconds")
```
When the LLM outputted a several-page blog post structured according to this schema, calling BlogPost.model_validate_json(json_data) (or parse_raw/parse_obj in Pydantic v1) was a synchronous, CPU-bound operation. It would block the event loop for its duration, even if the rest of the application was `asyncio`-native. My profiling tools confirmed this: occasional spikes in event loop blocking, correlating with complex LLM response processing.
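If you want to observe this kind of event loop blocking yourself without a full profiler, a tiny watchdog coroutine works: it sleeps in short intervals and records how late each wake-up arrives. In this sketch (my own illustration, with the blocking parse simulated by `time.sleep`), a 200 ms synchronous call on the loop shows up clearly as lag:

```python
import asyncio
import time

async def measure_max_lag(duration: float, interval: float = 0.01) -> float:
    # Any wake-up later than `interval` means something blocked the loop
    max_lag = 0.0
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        before = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - before - interval
        max_lag = max(max_lag, lag)
    return max_lag

async def main() -> float:
    watchdog = asyncio.create_task(measure_max_lag(0.5))
    await asyncio.sleep(0.05)  # let the watchdog take a few clean samples
    time.sleep(0.2)            # simulate a blocking, CPU-bound parse on the loop
    return await watchdog

max_lag = asyncio.run(main())
print(f"max event-loop lag: {max_lag * 1000:.0f} ms")
```

The same idea underlies the "event loop blocking" metrics my monitoring exposed; emitting `max_lag` to your metrics backend periodically gives a cheap production signal.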
The Fix: Offloading Blocking Operations to a Thread Pool
The solution to this specific problem lies in offloading these CPU-bound, synchronous operations from the `asyncio` event loop to a separate thread pool. Python's asyncio library provides a way to do this gracefully: loop.run_in_executor(). This function schedules a callable to be run in a separate thread (or process) and returns a Future that can be awaited. This allows the event loop to continue processing other I/O-bound tasks while the CPU-bound Pydantic parsing happens in the background.
```python
import httpx
import json
import asyncio
import time

# (BlogPost, Author, Section models defined as above)

async def generate_and_parse_streaming_with_offload(prompt: str) -> dict:
    start_time = time.time()
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"
    }
    payload = {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 1500,  # Increased max_tokens for a larger response
        "response_format": {"type": "json_object"},  # Guide LLM to produce JSON
        "stream": True
    }
    full_json_content = ""
    first_token_time = None
    async with httpx.AsyncClient(timeout=60.0) as client:
        try:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload
            ) as response:
                response.raise_for_status()
                async for chunk in response.aiter_bytes():
                    decoded_chunk = chunk.decode("utf-8")
                    for line in decoded_chunk.split("\n"):
                        if line.startswith("data: "):
                            data_str = line[len("data: "):].strip()
                            if data_str == "[DONE]":
                                continue
                            try:
                                data = json.loads(data_str)
                                # The delta lives under choices[0], not choices
                                delta = data["choices"][0]["delta"]
                                if delta.get("content"):
                                    full_json_content += delta["content"]
                                    if first_token_time is None:
                                        first_token_time = time.time()
                                        print(f"Time To First Token (TTFT): {first_token_time - start_time:.2f} seconds")
                            except json.JSONDecodeError:
                                pass  # Handle partial JSON
            parse_start_time = time.time()
            # Offload the blocking Pydantic parsing to a thread pool
            loop = asyncio.get_running_loop()
            blog_post_instance = await loop.run_in_executor(
                None,  # Use the default ThreadPoolExecutor
                BlogPost.model_validate_json,
                full_json_content
            )
            parse_end_time = time.time()
            print(f"Pydantic parsing (offloaded) took: {parse_end_time - parse_start_time:.4f} seconds")
            end_time = time.time()
            print(f"Total TTLT (including offloaded parse): {end_time - start_time:.2f} seconds")
            return blog_post_instance.model_dump()  # Or return the Pydantic model directly
        except httpx.HTTPError as e:
            print(f"API call failed: {e}")
            return {"error": str(e)}
        except Exception as e:
            print(f"An error occurred during parsing or processing: {e}")
            return {"error": str(e)}

# Example usage
# async def main():
#     # Prompt designed to generate a large, structured JSON output
#     long_prompt = """
#     Generate a detailed blog post about the benefits of quantum computing for supply chain optimization.
#     The blog post should adhere to the following JSON schema:
#     {
#       "title": "...",
#       "slug": "...",
#       "summary": "...",
#       "tags": ["...", "..."],
#       "author": {"name": "...", "bio": "...", "email": "..."},
#       "sections": [{"heading": "...", "content": "...", "keywords": ["...", "..."]}],
#       "conclusion": "...",
#       "references": ["...", "..."]
#     }
#     Ensure the content is rich and detailed, spanning several paragraphs per section.
#     """
#     await generate_and_parse_streaming_with_offload(long_prompt)
# asyncio.run(main())
```
By wrapping `BlogPost.model_validate_json` in `loop.run_in_executor(None, ...)`, I effectively moved this CPU-intensive task out of the main event loop. This meant that while one thread was busy parsing a large LLM response, the main event loop could continue to accept new connections, process other I/O events, and keep the application responsive. Passing `None` as the executor tells `run_in_executor` to use the default `ThreadPoolExecutor`, which is usually sufficient for work that, like JSON parsing, spends most of its time in C code.
This change was subtle but impactful. While the *total* time to parse the JSON remained the same (or negligibly longer due to thread overhead), the critical difference was that it no longer *blocked* the entire application. The metrics for event loop blocking smoothed out, and the overall system responsiveness under load improved noticeably. This specific technique is a cornerstone for robust asynchronous Python applications that deal with both I/O-bound and CPU-bound tasks. For more in-depth understanding of Pydantic's performance characteristics, I highly recommend checking out their official documentation on performance considerations, especially regarding validation and serialization.
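The "keeps the loop free" claim is easy to demonstrate in isolation. In this sketch (my own stand-in, with `time.sleep` in place of a real Pydantic parse), a heartbeat task keeps ticking on the event loop while the offloaded work runs in a worker thread; had the parse run on the loop itself, the heartbeat would stall:

```python
import asyncio
import time

async def heartbeat(counter: list) -> None:
    # Ticks every 10 ms; the ticks stop if the event loop is blocked
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)

def blocking_parse(duration: float) -> str:
    # Hypothetical stand-in for model_validate_json on a large payload
    time.sleep(duration)
    return "parsed"

async def main() -> tuple:
    counter = [0]
    hb = asyncio.create_task(heartbeat(counter))
    loop = asyncio.get_running_loop()
    # The heartbeat keeps ticking while the worker thread does the "parse"
    result = await loop.run_in_executor(None, blocking_parse, 0.3)
    hb.cancel()
    return counter[0], result

ticks, result = asyncio.run(main())
print(f"heartbeat ticks during offloaded parse: {ticks}")
```

Roughly thirty ticks fire during the 300 ms "parse"; calling `blocking_parse` directly inside `main` would yield close to zero.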
Cost Implications of Latency Reduction
It's easy to focus solely on user experience, but every millisecond saved in processing time translates directly into cost savings, particularly in cloud environments. Shorter request durations mean that compute resources (like Cloud Run instances or Lambda functions) are held for less time. This allows instances to scale down faster, or serve more requests per instance, reducing the overall number of instances needed. Our Cloud Run bill, for example, saw a measurable reduction after these optimizations, aligning with the principles I discussed in my post on halving my Cloud Run bill. The cascading effect of improved latency is a powerful lever for cost control in AI-driven services.
What I Learned / The Challenge
This debugging journey reinforced several critical lessons for me as a developer working with modern, distributed, AI-powered systems:
- Asynchronous I/O is Non-Negotiable for External APIs: For any application making external API calls (especially to LLMs which can be slow), `asyncio` with an async HTTP client like `httpx` is fundamental. It prevents I/O bound operations from creating bottlenecks and allows for high concurrency.
- Perceived Performance is Key: Streaming isn't just a nice-to-have; it's a critical component of user experience for generative AI. Reducing TTFT makes an application feel instantaneous, even if the total processing time (TTLT) remains constant.
- Asynchronous is Not a Panacea for CPU-Bound Tasks: This was my biggest "aha!" moment. Just because your code is in an `async def` function doesn't mean all operations within it are non-blocking. CPU-intensive tasks, like parsing a large JSON string into a complex Pydantic model, will still block the event loop.
- Profile, Profile, Profile: Without proper profiling and monitoring (looking at metrics like event loop blocking time, CPU usage, and request queue lengths), I wouldn't have pinpointed the Pydantic parsing as the culprit. Gut feelings are good, but data is better.
- `run_in_executor` is Your Friend: For mixing CPU-bound synchronous operations with an `asyncio` event loop, `loop.run_in_executor()` is the elegant solution. It allows you to safely offload these tasks to a separate thread pool, keeping your event loop free and your application responsive.
The challenge lies in understanding the nuances of how Python's GIL (Global Interpreter Lock) interacts with `asyncio` and threads. While `asyncio` handles I/O concurrency efficiently, the GIL means only one Python thread can execute Python bytecode at a time. `run_in_executor` still helps here because a thread performing a task like Pydantic parsing spends much of its time in C code (e.g., parsing JSON strings), which releases the GIL and lets other threads run. For pure-Python CPU-bound tasks that never release the GIL, passing a `concurrent.futures.ProcessPoolExecutor` to `run_in_executor` bypasses the GIL entirely, but for typical data parsing, threading often suffices.
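One way to keep that choice flexible is to pass the executor in explicitly instead of relying on the default. This sketch (my own, with `json.loads` as a stand-in for the validation step) takes any `concurrent.futures` executor, so swapping a `ThreadPoolExecutor` for a `ProcessPoolExecutor` is a one-line change at the call site — just remember a process pool needs a picklable, module-level worker function:

```python
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor

def parse_payload(raw: str) -> dict:
    # Stand-in for the validation step; json.loads spends its time in
    # C code, which releases the GIL, so a thread pool works well here
    return json.loads(raw)

async def parse_in_executor(executor, raw: str) -> dict:
    loop = asyncio.get_running_loop()
    # Pass a ProcessPoolExecutor instead for pure-Python CPU-bound work
    # that never releases the GIL
    return await loop.run_in_executor(executor, parse_payload, raw)

raw = json.dumps({"title": "Hello", "tags": ["a", "b"]})
with ThreadPoolExecutor(max_workers=2) as pool:
    parsed = asyncio.run(parse_in_executor(pool, raw))
print(parsed)
```

Owning the executor also lets you bound its worker count, so a burst of large parses can't exhaust threads needed elsewhere.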
Related Reading
- My Journey Building a Production-Ready RAG System for AutoBlogger with Open-Source LLMs: This post dives into the architecture and development of our RAG system. Latency optimization is absolutely crucial for RAG, as it involves multiple LLM calls and data retrievals. The lessons learned here about efficient LLM interaction and robust data parsing directly apply to ensuring a snappy RAG experience.
- How I Halved My Cloud Run Bill: Auto-Scaling, Concurrency, and Request Optimization for AutoBlogger: My journey to reduce LLM API latency had a direct, positive impact on our cloud infrastructure costs. This post explains how optimizing request duration and concurrency directly translates into significant savings on serverless platforms like Cloud Run. The technical decisions around async and streaming are deeply intertwined with efficient resource utilization and cost management.
Looking ahead, I'm exploring further optimizations. Perhaps custom, highly optimized JSON parsers for specific, critical paths, or even investigating alternative serialization libraries if Pydantic ever proves to be an insurmountable bottleneck (though its features often outweigh the minor overhead). Edge caching of common LLM responses is another area of interest. The world of AI and cloud development is constantly evolving, and staying on top of these performance nuances is a continuous, rewarding challenge. Every millisecond saved is a better experience for our users and a more efficient, cost-effective system for us.
--- 📝 Editor’s Note: This article documents real-world development experiments from the AutoBlogger project. Some drafting assistance was used for clarity, but all architectural decisions and debugging experiences are first-hand.