Optimizing LLM API Latency and Cost with Asynchronous Batching
I still remember that Monday morning. Our internal dashboards were screaming. While our requests per second (RPS) metrics for blog post generation were stable, the p99 latency for our LLM API calls had spiked, and the cost graphs were heading north faster than a rocket. My first thought, naturally, was to blame the LLM provider, perhaps a regional outage or an unexpected change in their service levels. But after a quick check of their status page and a look at our network logs, everything seemed normal on their end. The problem, as I would soon discover, was much closer to home: our own inefficient usage of their API.
The Hidden Cost of Synchronous LLM Calls
When I initially designed the LLM integration for our blog post generation service, simplicity was key. For each component of a blog post (e.g., generating a headline, drafting an intro paragraph, outlining key points, writing a conclusion), I made a separate, synchronous API call to the LLM. It looked something like this (simplified for clarity):
import requests
import json
import time

def call_llm_sync(prompt: str, model: str = "gpt-3.5-turbo") -> str:
    """Makes a synchronous call to the LLM API."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"  # Replace with actual API key or env var
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 500
    }
    start_time = time.time()
    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers, json=payload, timeout=60
        )
        response.raise_for_status()  # Raise an exception for HTTP errors
        duration = time.time() - start_time
        print(f"LLM call completed in {duration:.2f} seconds.")
        return response.json()["choices"][0]["message"]["content"]
    except requests.exceptions.RequestException as e:
        print(f"LLM API call failed: {e}")
        return f"Error: {e}"

def generate_blog_post_sync(topic: str):
    # This is problematic: each call blocks the next.
    headline = call_llm_sync(f"Generate a compelling headline for a blog post about '{topic}'.")
    # The intro prompt needs the headline, so this step is inherently sequential.
    intro = call_llm_sync(f"Write an engaging introduction for a blog post titled '{headline}'.")
    body_outline = call_llm_sync(
        f"Create a detailed outline for the body of a blog post about '{topic}', "
        f"with the headline '{headline}' and intro '{intro}'."
    )
    conclusion = call_llm_sync(f"Write a concise conclusion for a blog post about '{topic}'.")
    return {
        "headline": headline,
        "intro": intro,
        "body_outline": body_outline,
        "conclusion": conclusion,
    }

# Example usage:
# post = generate_blog_post_sync("The Future of AI in Content Creation")
# print(post)
This approach was easy to reason about and implement, especially for independent tasks. However, as our usage grew and we started generating more complex posts requiring multiple distinct LLM interactions, the cumulative latency became a major bottleneck. Each API call, regardless of how small the prompt or expected response, incurs a certain amount of overhead: network roundtrip time, authentication, and the LLM provider's internal queuing and processing. When you multiply this by dozens of calls per blog post, and then by hundreds or thousands of posts, the latency exploded, leading to poor user experience and, crucially, a significant jump in our API costs. Our p99 latency for generating a full blog post jumped from a tolerable 10-15 seconds to over 45-60 seconds, and our token usage, surprisingly, also increased due to the overhead of repeated calls and less efficient context management.
Identifying the Root Cause: The "N+1" Problem for LLMs
My investigation started with our monitoring tools. Grafana showed alarming spikes in the duration of our LLM-dependent service calls. Cloud provider logs confirmed that the individual LLM API requests themselves were taking a consistent amount of time (e.g., 1-3 seconds each), but the overall end-to-end process was much slower. This pointed to an "N+1" problem: if generating a blog post required N distinct LLM calls, and each took an average of T seconds, the total time was closer to N * T, plus application logic. This synchronous, blocking pattern was killing our throughput.
Digging deeper, I realized that many of these calls, while logically distinct, were not strictly dependent on the immediate prior response. For instance, generating a list of keywords for a post could happen concurrently with drafting a social media blurb for the same post. Even when there were dependencies, like generating an introduction after a headline, other independent elements (like generating image prompts) could run in parallel. The key was that our system was treating all LLM interactions as blocking operations.
The cost aspect was also eye-opening. While prompt engineering techniques (as I discussed in How I Tuned Prompt Engineering for Maximum AI Cost Savings) are crucial for optimizing token usage per call, simply reducing the number of API calls and improving their efficiency could yield even greater savings. Each HTTP request has a fixed cost component, regardless of token count. Reducing roundtrips directly cuts this overhead.
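One concrete way to cut roundtrips is to fold several small, related sub-tasks into a single structured prompt and parse one response. The helpers below are a hypothetical sketch (the names build_combined_prompt and parse_combined_response are illustrative, not from our service), assuming the model is asked to reply with plain JSON:

```python
import json

def build_combined_prompt(topic: str) -> str:
    # One request instead of three: ask for all short artifacts at once
    # and have the model return structured JSON we can parse.
    return (
        f"For a blog post about '{topic}', return a JSON object with exactly "
        'these keys: "headline" (a compelling headline), "keywords" '
        '(a list of 10 SEO keywords), and "image_prompt" (a creative image prompt). '
        "Return only the JSON."
    )

def parse_combined_response(raw: str) -> dict:
    # The model may wrap JSON in markdown fences; strip them defensively.
    cleaned = (
        raw.strip()
        .removeprefix("```json")
        .removeprefix("```")
        .removesuffix("```")
        .strip()
    )
    return json.loads(cleaned)
```

The trade-off is robustness: a malformed JSON reply fails the whole combined call, so in practice this works best for short, well-structured outputs.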
The Solution: Embracing Asynchronous Processing and Request Batching
The path forward became clear: I needed to move from a synchronous, sequential model to an asynchronous, concurrent one. This involved two primary strategies:
- Asynchronous API Calls: Utilizing Python's asyncio library to make multiple LLM API requests concurrently without blocking the main thread. This allows I/O-bound tasks (like waiting for an API response) to yield control, letting other tasks run.
- Request Batching: Grouping multiple independent inference requests into a single logical unit. While some LLM providers offer explicit batch APIs (often for offline, lower-cost processing), I focused on "client-side" batching for real-time applications by sending multiple concurrent requests.
My goal was to maximize throughput and minimize latency for our interactive blog post generation. For Python, this meant leaning heavily on asyncio and an asynchronous HTTP client like httpx.
Implementing Asynchronous LLM Calls
First, I refactored our basic LLM call to be asynchronous:
import httpx
import asyncio
import json
import time

# Keep the AsyncClient outside the hot loop for connection pooling benefits.
# In a real application, you'd manage this client globally or via dependency injection.
# For simplicity, we'll create it once here.
# Note: LLM calls can be long, so increase the default timeout.
async_client = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=5.0))

async def call_llm_async(prompt: str, model: str = "gpt-3.5-turbo") -> str:
    """Makes an asynchronous call to the LLM API."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"  # Replace with actual API key or env var
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 500
    }
    start_time = time.time()
    try:
        response = await async_client.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers, json=payload
        )
        response.raise_for_status()
        duration = time.time() - start_time
        # print(f"Async LLM call completed in {duration:.2f} seconds.")
        return response.json()["choices"][0]["message"]["content"]
    except httpx.RequestError as e:
        print(f"Async LLM API call failed: {e}")
        return f"Error: {e}"
With this asynchronous function, I could now execute multiple independent LLM calls at once using asyncio.gather. This function takes multiple awaitable objects (like our call_llm_async coroutines), runs them concurrently on the event loop, and waits for all of them to complete.
async def generate_blog_post_concurrent(topic: str):
    # These tasks are independent, so they can run concurrently.
    headline_task = call_llm_async(f"Generate a compelling headline for a blog post about '{topic}'.")
    keywords_task = call_llm_async(f"Generate 10 SEO keywords for a blog post about '{topic}'.")
    image_prompt_task = call_llm_async(f"Generate a creative image prompt for a blog post about '{topic}'.")

    # Await all independent tasks concurrently.
    headline, keywords, image_prompt = await asyncio.gather(
        headline_task,
        keywords_task,
        image_prompt_task,
    )

    # Now tasks that depend on the headline can run. We can still parallelize
    # independent tasks (like the conclusion) alongside them.
    intro_task = call_llm_async(f"Write an engaging introduction for a blog post titled '{headline}'.")
    conclusion_task = call_llm_async(f"Write a concise conclusion for a blog post about '{topic}'.")
    intro, conclusion = await asyncio.gather(intro_task, conclusion_task)

    # The body outline might be the longest call, and it depends on both the
    # headline and the intro, so it runs only after the intro is available.
    body_outline = await call_llm_async(
        f"Create a detailed outline with 5 sections for the body of a blog post about '{topic}', "
        f"with the headline '{headline}' and intro '{intro}'."
    )

    return {
        "headline": headline,
        "intro": intro,
        "body_outline": body_outline,
        "keywords": keywords,
        "image_prompt": image_prompt,
        "conclusion": conclusion,
    }
async def main_concurrent():
    start_total_time = time.time()
    post = await generate_blog_post_concurrent("The Evolution of Generative AI in Creative Writing")
    end_total_time = time.time()
    print("\n--- Concurrent Generation Results ---")
    print(f"Total concurrent generation time: {end_total_time - start_total_time:.2f} seconds")
    # print(json.dumps(post, indent=2))
    await async_client.aclose()  # Close the client after all operations

# To run:
# asyncio.run(main_concurrent())
Even this simple refactoring dramatically reduced the overall latency. Instead of waiting for each of the 5-6 LLM calls to complete sequentially, many of them could now run in the background. If 3 tasks could run in parallel, and each took 2 seconds, the total time for those 3 tasks dropped from 6 seconds to roughly 2 seconds (plus minimal overhead).
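The effect is easy to verify without touching a real API. In the sketch below, fake_llm_call is a hypothetical stand-in that uses asyncio.sleep to simulate network I/O; three concurrent 0.2-second "calls" finish in roughly 0.2 seconds of wall time rather than 0.6:

```python
import asyncio
import time

async def fake_llm_call(name: str, seconds: float) -> str:
    # Stand-in for a real API call: awaiting sleep mimics waiting on the network.
    await asyncio.sleep(seconds)
    return f"{name} done"

async def main() -> None:
    start = time.perf_counter()
    # Three independent "calls", each ~0.2s, scheduled concurrently.
    results = await asyncio.gather(
        fake_llm_call("headline", 0.2),
        fake_llm_call("keywords", 0.2),
        fake_llm_call("image_prompt", 0.2),
    )
    elapsed = time.perf_counter() - start
    print(results)
    # Wall time is roughly the longest single task, not the sum of all three.
    print(f"elapsed: {elapsed:.2f}s")

asyncio.run(main())
```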
Implementing a Custom Batching Mechanism
While asyncio.gather helps with concurrent execution, it doesn't inherently "batch" requests to the LLM provider in a way that might leverage a provider's internal batching optimizations (if they exist) or reduce the number of HTTP connection setups. For scenarios where I had many small, independent tasks that could be grouped, I considered a custom batching mechanism. The idea is to accumulate requests for a short period or until a certain batch size is reached, then send them all at once.
This is particularly useful for tasks like processing a list of customer reviews, generating short descriptions for product listings, or classifying multiple text snippets. While the OpenAI API (and others) don't always expose a direct "batch multiple prompts in one HTTP call" endpoint for real-time inference, the principle of sending many concurrent requests (which asyncio.gather facilitates) acts as a form of client-side batching. Some providers, like Together AI, do offer specific Batch APIs for asynchronous, cost-optimized processing.
For our real-time scenario, where true HTTP-level batching wasn't directly supported by our chosen LLM API for immediate responses, the "batching" primarily came from the efficiency gained by using asyncio.gather to keep the network busy with multiple concurrent requests instead of waiting idly between synchronous calls. This is a crucial distinction: we're not sending a single HTTP request with multiple prompts (unless the API explicitly supports it), but rather sending multiple HTTP requests concurrently.
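Because "as many concurrent requests as possible" quickly collides with provider rate limits, I also cap the number of requests in flight. The helper below is a minimal sketch (gather_bounded and its parameters are illustrative names, not production code) that wraps any async call function with an asyncio.Semaphore:

```python
import asyncio
from typing import Awaitable, Callable

async def gather_bounded(
    prompts: list[str],
    call: Callable[[str], Awaitable[str]],
    max_concurrency: int = 8,
) -> list[str]:
    # A semaphore caps in-flight requests so a burst of prompts
    # doesn't trip the provider's rate limits.
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(prompt: str) -> str:
        async with sem:
            return await call(prompt)

    # asyncio.gather preserves input order, so results[i] matches prompts[i].
    return await asyncio.gather(*(bounded(p) for p in prompts))
```

In our setup this would be called as `await gather_bounded(prompts, call_llm_async)`, with max_concurrency tuned to the provider's published rate limits.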
To illustrate a more robust client-side batching approach that also handles rate limiting and dynamic request accumulation, I developed a simple LLMRequestProcessor:
from collections import deque
import time
import asyncio
import httpx

# Configuration for our batch processor
MAX_BATCH_SIZE = 10
MAX_WAIT_TIME = 0.5  # seconds
LLM_API_ENDPOINT = "https://api.openai.com/v1/chat/completions"
LLM_MODEL = "gpt-3.5-turbo"
LLM_API_KEY = "YOUR_API_KEY"  # Replace with actual API key or env var

class LLMRequestProcessor:
    def __init__(self, client: httpx.AsyncClient):
        self.client = client
        self.request_queue = deque()
        self.lock = asyncio.Lock()
        self.processing_task = None
        self.last_process_time = time.time()
        self.request_counter = 0

    async def _process_batch(self):
        current_batch = []
        async with self.lock:
            while self.request_queue and len(current_batch) < MAX_BATCH_SIZE:
                current_batch.append(self.request_queue.popleft())
        if not current_batch:
            return
        print(f"Processing batch of {len(current_batch)} requests...")
        self.last_process_time = time.time()
        await asyncio.gather(*(
            self._make_llm_api_call(req_id, prompt, model, future)
            for req_id, prompt, model, future in current_batch
        ))

    async def _make_llm_api_call(self, req_id: str, prompt: str, model: str, future: asyncio.Future):
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {LLM_API_KEY}"
        }
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 500
        }
        start_time = time.time()
        try:
            response = await self.client.post(LLM_API_ENDPOINT, headers=headers, json=payload)
            response.raise_for_status()
            content = response.json()["choices"][0]["message"]["content"]
            duration = time.time() - start_time
            # print(f"  Request {req_id} completed in {duration:.2f} seconds.")
            future.set_result(content)
        except httpx.RequestError as e:
            print(f"  Request {req_id} failed: {e}")
            future.set_result(f"Error: {e}")
        except Exception as e:
            print(f"  Request {req_id} unexpected error: {e}")
            future.set_result(f"Error: {e}")

    async def _batch_scheduler(self):
        while True:
            await asyncio.sleep(0.01)  # Small sleep to prevent busy-waiting
            async with self.lock:
                # Process if the batch is full, or requests have waited long enough.
                should_process = (
                    len(self.request_queue) >= MAX_BATCH_SIZE or
                    (bool(self.request_queue) and
                     (time.time() - self.last_process_time) >= MAX_WAIT_TIME)
                )
            # Call _process_batch outside the lock: it acquires the lock itself,
            # and asyncio.Lock is not reentrant, so holding it here would deadlock.
            if should_process:
                await self._process_batch()

    async def add_request(self, prompt: str, model: str = LLM_MODEL) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        async with self.lock:
            req_id = f"req-{self.request_counter}"
            self.request_counter += 1
            self.request_queue.append((req_id, prompt, model, future))
            # Lazily start the scheduler on the first request.
            if self.processing_task is None:
                self.processing_task = asyncio.create_task(self._batch_scheduler())
        return future

# Example usage with the custom batch processor
async def generate_multiple_headlines(topics: list[str]):
    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=5.0)) as client:
        processor = LLMRequestProcessor(client)
        futures = []
        for topic in topics:
            prompt = f"Generate a compelling headline for a blog post about '{topic}'."
            futures.append(await processor.add_request(prompt))
        print(f"Added {len(futures)} requests to the processor queue.")
        # Awaiting the futures blocks until the scheduler has processed every batch.
        results = await asyncio.gather(*futures)
        # All futures are resolved at this point, so the scheduler can be stopped.
        if processor.processing_task:
            processor.processing_task.cancel()
            try:
                await processor.processing_task
            except asyncio.CancelledError:
                pass
        return results
async def main_batching():
    topics = [
        "The Rise of Quantum Computing",
        "Sustainable Urban Planning in 2026",
        "Blockchain Beyond Cryptocurrencies",
        "Personalized Medicine: A New Era",
        "The Impact of AI on Remote Work",
        "Future of Space Exploration",
        "Cybersecurity Threats in IoT",
        "Ethical AI Development",
        "Augmented Reality in Education",
        "Next-Gen Battery Technologies",
        "The Gig Economy's Evolution",
        "Renewable Energy Innovations",
    ]
    start_total_time = time.time()
    headlines = await generate_multiple_headlines(topics)
    end_total_time = time.time()
    print(f"\n--- Batched Generation Results ({len(topics)} topics) ---")
    print(f"Total batched generation time: {end_total_time - start_total_time:.2f} seconds")
    # for i, headline in enumerate(headlines):
    #     print(f"Topic {i+1}: {headline}")

# To run:
# asyncio.run(main_batching())
This LLMRequestProcessor accumulates requests and processes them in batches concurrently, either when the batch size limit is hit or a maximum wait time elapses. This helps manage the load, prevents overwhelming the API with too many concurrent requests (rate limiting consideration), and optimizes network utilization.
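One thing the processor above doesn't show is what to do when the provider does push back with a 429. For that I rely on retries with exponential backoff and jitter. The sketch below is deliberately generic and illustrative (call_with_backoff and RateLimitedError are hypothetical names; mapping an HTTP 429 response onto the exception is left to the caller):

```python
import asyncio
import random
from typing import Awaitable, Callable

class RateLimitedError(Exception):
    """Stand-in for a 429 (rate limited) response from the provider."""

async def call_with_backoff(
    call: Callable[[], Awaitable[str]],
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> str:
    for attempt in range(max_retries):
        try:
            return await call()
        except RateLimitedError:
            # Exponential backoff (0.5s, 1s, 2s, ...) plus jitter so that
            # many clients don't all retry at the same instant.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
    raise RuntimeError(f"Giving up after {max_retries} attempts")
```

Only retry errors that can heal on their own (rate limits, transient network failures); retrying a malformed request just multiplies the cost of the same failure.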
Metrics and Impact
The results were significant. By moving to asynchronous processing and implicitly batching requests through concurrency, I observed:
- Latency Reduction: For a typical blog post generation workflow involving 10-12 distinct LLM calls, the p99 latency dropped from an average of 45 seconds (synchronous) to approximately 5-8 seconds (asynchronous). For tasks that could be fully parallelized, the latency became closer to the duration of the single longest LLM call, plus minor overhead.
- Cost Savings: The reduction in the number of effective API roundtrips, combined with more efficient token usage (as the LLM could sometimes process slightly larger, more contextual prompts within a single call due to better overall system throughput), led to a measurable cost reduction of about 20-25% for the same workload volume. This is on top of any prompt engineering optimizations.
- Increased Throughput: Our service could now handle a much higher volume of concurrent blog post generation requests without degradation in latency, leading to better scalability and a more responsive user experience.
It's important to note that these gains are complementary to other cost-saving strategies. For instance, while I focused on API efficiency, further cost reductions can be achieved through techniques like model quantization if you're running models locally or selecting smaller, quantized models from providers, as discussed in Practical LLM Quantization for 40% Cost Reduction. Similarly, even with efficient API calls, poorly engineered prompts can still inflate costs, making the strategies outlined in How I Tuned Prompt Engineering for Maximum AI Cost Savings absolutely crucial.
For more details on asynchronous programming in Python, I often refer to the official asyncio documentation, which is an excellent resource for understanding its core concepts.
What I Learned / The Challenge
This optimization journey reinforced a few critical lessons for me:
- Don't Assume External APIs Are the Only Bottleneck: It's easy to point fingers at external services for performance issues. However, our own application's interaction patterns can often be the primary culprit. Profiling and detailed logging are indispensable for pinpointing the real bottlenecks.
- Asynchronous Programming Adds Complexity: While immensely powerful for I/O-bound tasks, asyncio does introduce a steeper learning curve and requires careful management of state, error handling, and cancellation. Debugging asynchronous code can be more challenging than synchronous code.
- Batching Requires Nuance: "Batching" isn't a one-size-fits-all solution. For real-time applications, concurrent API calls (client-side batching) are key. For non-real-time tasks, true provider-side batch APIs can offer even greater cost savings. Understanding the specific capabilities of your LLM provider's API is crucial.
- Balancing Latency and Throughput: Aggressive batching can sometimes increase the latency for individual requests if they have to wait for a batch to fill up. It's a trade-off that needs careful tuning based on the application's requirements (e.g., interactive user-facing vs. background processing).
Related Reading
- Practical LLM Quantization for 40% Cost Reduction - This post dives into techniques for reducing the computational cost of running LLMs, which is highly relevant if you're considering self-hosting models or selecting smaller, more efficient models from providers. It complements API call optimizations by addressing the per-token cost at the model level.
- How I Tuned Prompt Engineering for Maximum AI Cost Savings - Even with a highly efficient API integration, poorly designed prompts can lead to excessive token usage. This article covers strategies to optimize your prompts for both quality and cost-effectiveness, ensuring that each API call is as efficient as possible.
This journey of optimizing our LLM API interactions has been a significant step towards building a more robust, cost-efficient, and scalable content generation platform. Looking ahead, I plan to explore more advanced dynamic batching strategies, potentially integrating with LLM providers that offer native real-time batching endpoints if they become more prevalent. I'm also keen on developing smarter caching mechanisms for common LLM responses to further reduce redundant API calls and latency. The world of AI engineering is constantly evolving, and staying on top of these optimization techniques is key to building sustainable and performant applications.