Implementing Distributed Rate Limiting for LLM APIs in Serverless Functions

I still remember the knot in my stomach. It was a Tuesday morning, and our internal monitoring dashboard, usually a picture of serene green, was screaming red. Specifically, our LLM API integration was throwing a cascade of 429 Too Many Requests errors. What followed was a frustrating few hours of debugging, ultimately tracing the issue back to an uncontrolled explosion of concurrent LLM API calls from our serverless functions. Our carefully optimized LLM orchestration, which I'd spent weeks refining for cost and latency, was suddenly failing spectacularly, leading to increased processing times and, predictably, a noticeable spike in our cloud bills.

This wasn't just a blip; it was a fundamental architectural flaw exposed by scale. Each of our serverless functions, designed to be stateless and scale independently, was doing its job perfectly in isolation. However, collectively, they were behaving like a thundering herd, overwhelming the rate limits imposed by our LLM provider. This post dives deep into how I diagnosed this critical concurrency problem and implemented a robust, distributed rate-limiting mechanism to bring order to the chaos and safeguard our application's stability and cost-efficiency.

The Thundering Herd Problem in Serverless LLM Orchestration

Our application leverages serverless functions (think AWS Lambda or Google Cloud Functions/Run) to orchestrate complex LLM workflows. Each function might perform a specific step: pre-processing user input, generating multiple prompt variations, making parallel LLM calls for diverse perspectives, or post-processing responses. This pattern is incredibly powerful for scalability and cost-efficiency, as you only pay for compute when it's actively running. I've previously written about how we optimized these orchestration costs in Optimizing LLM Orchestration Costs with Serverless Functions, and focused on latency in Optimizing LLM API Latency: Async, Streaming, and Pydantic in Production.

The problem, however, arises when these independently scaling functions all attempt to access a shared external resource – in our case, the LLM API – that has its own, often strict, rate limits. A single serverless function invocation might make one or two LLM calls, well within any reasonable per-client rate limit. But when thousands of these functions are invoked concurrently due to high user traffic, the cumulative effect can be disastrous. The LLM provider sees a massive surge of requests from our application, quickly exceeding the allowed requests per minute (RPM) or requests per second (RPS) limits.

My initial reaction to the 429 errors was to ensure our individual LLM API calls had proper retry logic with exponential backoff. This is standard practice and certainly necessary. Here's a simplified Python example of what that might look like:


import asyncio
import json
import random

import httpx

async def call_llm_with_retry(prompt: str, max_retries: int = 5):
    base_delay = 1.0  # seconds
    for attempt in range(max_retries):
        try:
            print(f"Attempt {attempt + 1} to call LLM API...")
            # Simulated LLM API call; in a real scenario, this would use your
            # provider's actual client. Note: httpx.post is synchronous, so an
            # AsyncClient is needed for awaitable requests.
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.llmprovider.com/generate",
                    json={"prompt": prompt},
                    timeout=10.0,
                )
            response.raise_for_status()  # Raises HTTPStatusError for 4xx/5xx responses
            print(f"LLM API call successful on attempt {attempt + 1}.")
            return response.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 and attempt < max_retries - 1:
                # Exponential backoff with jitter so retries don't synchronize
                delay = base_delay * (2 ** attempt) + (random.random() * 0.5 * base_delay)
                print(f"Rate limited (429). Retrying in {delay:.2f} seconds...")
                await asyncio.sleep(delay)
            else:
                print(f"Failed after {attempt + 1} attempts or non-retryable error: {e}")
                raise
        except httpx.RequestError as e:
            print(f"Network error during LLM API call: {e}")
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt) + (random.random() * 0.5 * base_delay)
                print(f"Network error. Retrying in {delay:.2f} seconds...")
                await asyncio.sleep(delay)
            else:
                raise

# Example usage within a serverless function
async def my_serverless_handler(event, context):
    prompt_to_send = "Generate a short blog post about distributed systems."
    try:
        llm_response = await call_llm_with_retry(prompt_to_send)
        return {"statusCode": 200, "body": json.dumps(llm_response)}
    except Exception as e:
        print(f"Handler failed: {e}")
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}

While this retry logic is crucial for handling transient errors and occasional rate limits, it doesn't solve the fundamental problem. If a thousand functions simultaneously hit the rate limit, they will all back off and then retry *around the same time*, creating a new "thundering herd" after the backoff period. This leads to increased overall latency for the user, wasted compute cycles for retries, and a frustratingly unpredictable experience. Our costs also crept up because we were paying for multiple invocations of our serverless functions just to retry failed LLM calls, even if the LLM provider itself didn't charge for 429 responses.

The Need for Distributed Concurrency Control

It became clear that individual function-level retry mechanisms were insufficient. We needed a way to coordinate API calls *across* all concurrently running serverless functions. This meant implementing a *distributed* rate limiter – a central gatekeeper that would ensure our collective requests to the LLM API never exceeded its defined limits. The challenge was doing this efficiently and reliably in a stateless, ephemeral serverless environment.

I considered a few approaches:

  1. API Gateway Rate Limiting: Some cloud providers offer rate limiting at the API Gateway level. This is great for inbound requests to *our* API, but less ideal for outbound requests from our internal serverless functions to a *third-party* API. It would involve routing all our outbound LLM traffic through our own API Gateway, adding latency and complexity.
  2. Dedicated Rate Limiting Service: Using a specialized service or a managed Redis instance to store rate limiting state. This is a solid approach but introduces another dependency and operational overhead.
  3. Custom Distributed Rate Limiter with a Shared Database: This seemed like the sweet spot for our current architecture. We already use various managed databases (DynamoDB, Firestore) for other shared state. I could implement a token bucket algorithm using one of these as a distributed, persistent store for our rate limiting state. This would give us fine-grained control and integrate well with our existing cloud infrastructure.

I opted for the custom distributed rate limiter using a shared database. This allowed me to leverage existing infrastructure knowledge and avoid adding a new, specialized service just for rate limiting. I decided to implement a token bucket algorithm, a classic technique for rate limiting.

Implementing a Token Bucket with a Shared State

The token bucket algorithm works like this: Imagine a bucket that holds a certain number of "tokens." Requests consume tokens. If the bucket is empty, requests are denied or queued. Tokens are added to the bucket at a fixed rate. This effectively smooths out bursts of requests.
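Stripped of the distribution concerns, the algorithm fits in a few lines. Here's a minimal single-process sketch (class and parameter names are illustrative, not part of our production code); the injectable clock just makes it easy to test:

```python
import time

class TokenBucket:
    """Minimal single-process token bucket, for illustration only.

    `clock` defaults to time.monotonic but can be injected for tests.
    Tokens are kept as floats so fractional refill time is never lost.
    """

    def __init__(self, capacity: float, refill_rate: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens added per second
        self.clock = clock
        self.tokens = capacity          # start with a full bucket
        self.last_refill = clock()

    def allow(self, amount: float = 1.0) -> bool:
        now = self.clock()
        # Refill based on elapsed time, capped at the bucket capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False
```

With capacity 2 and a refill rate of 1 token/second, two back-to-back calls succeed, a third is denied, and a call one second later succeeds again: bursts are absorbed up to the capacity, then throughput is pinned to the refill rate.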

To make this distributed, the "bucket" state (current tokens, last refill time) needs to be stored in a shared, accessible location. I chose a single record in a database like AWS DynamoDB or Google Cloud Firestore. These databases offer strong consistency guarantees for single-item updates, which is crucial for preventing race conditions when multiple serverless functions try to acquire tokens simultaneously.

Here's a conceptual Python implementation, assuming an asynchronous database client:


import asyncio
import json
import random
import time
import uuid
from datetime import datetime, timezone

# --- Configuration for our Rate Limiter ---
# These values would be loaded from environment variables or a config service
RATE_LIMIT_BUCKET_CAPACITY = 100  # Max tokens in the bucket (e.g., 100 LLM calls)
RATE_LIMIT_REFILL_RATE_PER_SECOND = 10  # Tokens added per second (e.g., 10 LLM calls/sec)
RATE_LIMITER_KEY = "llm_api_global_rate_limiter" # Unique key for the database record

# Conceptual database client - replace with actual DynamoDB/Firestore client
class MockKeyValueStore:
    def __init__(self):
        self._store = {}
        self._lock = asyncio.Lock() # Simulate database transaction lock

    async def get_item(self, key: str):
        async with self._lock:
            return self._store.get(key)

    async def put_item(self, key: str, item: dict, expected_attributes: dict = None):
        async with self._lock:
            if expected_attributes:
                current_item = self._store.get(key)
                # Simulate conditional write: check if current_item matches expected_attributes
                # For a real DB like DynamoDB, this would be UpdateItem with ConditionExpression
                # For Firestore, this might involve a transaction with a read-then-write
                match = True
                if current_item is None and expected_attributes: # If we expect an item but it's not there
                    match = False
                elif current_item:
                    for attr, val in expected_attributes.items():
                        if current_item.get(attr) != val:
                            match = False
                            break
                if not match:
                    raise ValueError("Conditional write failed: item changed concurrently.")

            self._store[key] = item
            return True

# Initialize a global mock store for demonstration
db_store = MockKeyValueStore()

async def initialize_rate_limiter_state():
    """Initializes the rate limiter state in the database if it doesn't exist."""
    current_state = await db_store.get_item(RATE_LIMITER_KEY)
    if not current_state:
        initial_state = {
            "tokens": RATE_LIMIT_BUCKET_CAPACITY,
            "last_refill_timestamp": datetime.now(timezone.utc).isoformat(),
            "version": str(uuid.uuid4()) # Optimistic locking / versioning
        }
        await db_store.put_item(RATE_LIMITER_KEY, initial_state)
        print("Rate limiter state initialized.")
    else:
        print("Rate limiter state already exists.")

async def acquire_token(amount: int = 1, timeout: float = 10.0, retry_delay: float = 0.1):
    """
    Attempts to acquire tokens from the distributed bucket.
    Retries on contention (conditional write failures).
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            current_state = await db_store.get_item(RATE_LIMITER_KEY)
            if not current_state:
                # Should not happen after initialization, but handle defensively
                await initialize_rate_limiter_state()
                continue

            current_tokens = current_state.get("tokens", 0)
            last_refill_timestamp_str = current_state.get("last_refill_timestamp")
            last_refill_dt = datetime.fromisoformat(last_refill_timestamp_str)
            version = current_state.get("version")

            # Refill tokens based on elapsed time
            now_dt = datetime.now(timezone.utc)
            time_elapsed_seconds = (now_dt - last_refill_dt).total_seconds()
            # Keep fractional tokens: since we advance the timestamp below,
            # int() truncation here would starve the bucket under frequent polling
            tokens_to_add = time_elapsed_seconds * RATE_LIMIT_REFILL_RATE_PER_SECOND

            new_tokens = min(RATE_LIMIT_BUCKET_CAPACITY, current_tokens + tokens_to_add)
            
            # If we don't have enough tokens, wait or fail
            if new_tokens < amount:
                # Calculate how long to wait for enough tokens to refill
                tokens_needed = amount - new_tokens
                wait_time = tokens_needed / RATE_LIMIT_REFILL_RATE_PER_SECOND
                print(f"Not enough tokens ({new_tokens}/{amount}). Waiting {wait_time:.2f}s...")
                await asyncio.sleep(min(wait_time, timeout - (time.time() - start_time) + retry_delay)) # Don't wait past overall timeout
                continue # Re-attempt after waiting

            # Consume tokens
            new_tokens -= amount
            
            # Update the state with new tokens and current timestamp
            new_state = {
                "tokens": new_tokens,
                "last_refill_timestamp": now_dt.isoformat(),
                "version": str(uuid.uuid4()) # Generate new version for optimistic locking
            }

            # Attempt a conditional write: only update if 'version' hasn't changed
            # This is crucial to prevent race conditions from concurrent updates
            await db_store.put_item(
                RATE_LIMITER_KEY,
                new_state,
                expected_attributes={"version": version}
            )
            print(f"Tokens acquired successfully. Remaining tokens: {new_tokens}")
            return True # Tokens acquired

        except ValueError as e:
            if "Conditional write failed" in str(e):
                print(f"Concurrency conflict detected. Retrying token acquisition in {retry_delay:.2f}s...")
                await asyncio.sleep(retry_delay) # Wait a bit before retrying on conflict
            else:
                raise
        except Exception as e:
            print(f"Error during token acquisition: {e}")
            raise

    print(f"Failed to acquire token within {timeout} seconds.")
    return False # Failed to acquire token within timeout

# Example usage within a serverless function that needs LLM access
async def guarded_llm_handler(event, context):
    prompt_to_send = "Summarize this article."
    
    # Attempt to acquire a token before calling the LLM API
    if await acquire_token(amount=1, timeout=30.0): # Wait up to 30 seconds for a token
        try:
            print("Token acquired, proceeding with LLM call...")
            llm_response = await call_llm_with_retry(prompt_to_send)
            return {"statusCode": 200, "body": json.dumps(llm_response)}
        except Exception as e:
            print(f"LLM call failed after token acquisition: {e}")
            # Consider releasing token or handling error specifically
            return {"statusCode": 500, "body": json.dumps({"error": str(e)})}
    else:
        print("Could not acquire token within timeout. Denying request.")
        return {"statusCode": 429, "body": json.dumps({"error": "Rate limit exceeded globally. Try again later."})}

# --- Main execution for testing ---
async def main_test():
    await initialize_rate_limiter_state()
    print("\n--- Simulating concurrent token acquisitions ---")
    tasks = []
    for i in range(15): # Simulate 15 concurrent function invocations
        tasks.append(guarded_llm_handler(None, None))
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    import random
    import json
    asyncio.run(main_test())

A crucial part of this implementation is the "conditional write" (expected_attributes={"version": version} in my mock). In DynamoDB, this translates to using a ConditionExpression to ensure the update only happens if the version attribute in the database matches the version we read. For Firestore, you'd typically use a transaction to read the current state, compute the new state, and then write it back, ensuring atomicity. This optimistic locking mechanism prevents race conditions where two functions read the same state, compute new tokens, and then try to write, with the last write winning and potentially corrupting the token count.

For more details on conditional writes in DynamoDB, you can refer to the official AWS DynamoDB documentation on Condition Expressions. These atomic operations are fundamental to building robust distributed systems.
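As a rough sketch of how this maps onto boto3 (the helper name and the `pk` key schema are assumptions for illustration, not our actual table layout), the conditional update can be expressed as keyword arguments for `Table.update_item`; DynamoDB then raises a `ConditionalCheckFailedException` when the stored version no longer matches, which is the signal to re-read and retry:

```python
def build_conditional_update(table_key: str, new_tokens: float, new_timestamp: str,
                             new_version: str, expected_version: str) -> dict:
    """Build kwargs for a boto3 Table.update_item call that only succeeds if
    the stored version still matches what we read (optimistic locking).

    Attribute names are passed via placeholders to sidestep DynamoDB's
    reserved-word restrictions in expressions.
    """
    return {
        "Key": {"pk": table_key},
        "UpdateExpression": "SET #tok = :t, #ts = :ts, #ver = :v",
        "ConditionExpression": "#ver = :expected",
        "ExpressionAttributeNames": {
            "#tok": "tokens",
            "#ts": "last_refill_timestamp",
            "#ver": "version",
        },
        "ExpressionAttributeValues": {
            ":t": new_tokens,
            ":ts": new_timestamp,
            ":v": new_version,
            ":expected": expected_version,
        },
    }

# Usage (sketch): table.update_item(**build_conditional_update(...))
# catches botocore's ConditionalCheckFailedException to detect conflicts.
```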

Challenges and Considerations

Implementing this wasn't without its challenges:

  1. Database Contention: A single database record becomes a hot spot. While DynamoDB and Firestore are highly scalable, frequent updates to a single item can still lead to throttling or increased latency if not managed correctly. We configured our database with sufficient write capacity and monitored its metrics closely.
  2. Latency of Token Acquisition: Each LLM API call now has a preceding database read and write operation. This adds a small but measurable amount of latency. I experimented with caching strategies (e.g., in-memory cache with a short TTL for the token state within a single function instance, but always falling back to the database for the authoritative state check), but ultimately, the overhead was acceptable given the stability gains.
  3. Cost of Database Operations: Every token acquisition involves a read and a conditional write. For high-throughput scenarios, this can add up. It's a trade-off: pay for database operations to save on potentially more expensive LLM API calls (especially if failed calls still incur some cost) and ensure application stability.
  4. Refinement of Bucket Parameters: Determining the optimal RATE_LIMIT_BUCKET_CAPACITY and RATE_LIMIT_REFILL_RATE_PER_SECOND required careful calibration against the LLM provider's actual rate limits and our application's expected traffic patterns. This involved monitoring LLM API error rates, our internal queue depths, and the token acquisition success rate of our limiter.
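
On that last point, a helper like the following captures the heuristic we converged on: derive the refill rate from the provider's published RPM limit with some headroom below the hard ceiling, and size the bucket to absorb a few seconds of burst. The function name and default values here are illustrative, not a universal rule:

```python
def bucket_params_from_rpm(provider_rpm: int, headroom: float = 0.8,
                           burst_seconds: float = 10.0) -> tuple[int, float]:
    """Derive token bucket settings from a provider's requests-per-minute limit.

    headroom keeps the steady-state rate safely below the hard limit;
    burst_seconds sizes the bucket to absorb short traffic spikes.
    Returns (capacity, refill_rate_per_second).
    """
    refill_rate = (provider_rpm / 60.0) * headroom  # tokens per second
    capacity = max(1, int(refill_rate * burst_seconds))
    return capacity, refill_rate
```

For example, a 600 RPM quota with 80% headroom yields a refill rate of 8 tokens/second and, with a 10-second burst window, a bucket capacity of 80. We then validated these numbers against observed 429 rates and token acquisition latencies rather than trusting the arithmetic alone.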

Metrics and Monitoring: Proving the Solution

Once the distributed rate limiter was deployed, the difference was stark. Our LLM API 429 error rates plummeted from a peak of nearly 60% during the incident to virtually zero. More importantly, the overall latency of our LLM-dependent workflows stabilized. Instead of functions retrying endlessly and users waiting, requests either succeeded promptly or were gracefully deferred (by waiting for a token) with a clear indication if a timeout occurred.

I set up new monitoring dashboards to track:

  • LLM API 429 Error Count: The primary metric. This went from critical to negligible.
  • Token Acquisition Success Rate: A high success rate indicated our bucket parameters were well-tuned for current traffic. A low success rate would indicate either insufficient capacity or a surge in demand.
  • Average Token Acquisition Latency: To ensure the database operations weren't introducing unacceptable delays.
  • Serverless Function Concurrency: While our rate limiter was active, we could see the number of active functions that were *attempting* to make LLM calls, but the actual rate of *successful* calls to the LLM API was now capped and smoothed.

The cost implications were also positive. While we incurred some database costs for the rate limiter, these were significantly offset by:

  • Reduced LLM API charges: Fewer failed calls and more efficient use of our allocated quota.
  • Reduced serverless compute costs: Fewer retries meant shorter average function execution times and fewer total invocations for tasks that previously failed and required re-processing.

What I Learned / The Challenge

This incident and its resolution reinforced a critical lesson about building resilient systems on cloud infrastructure: scaling individual components (like serverless functions) is easy, but managing the interactions between these scaled components and external, rate-limited services requires explicit, distributed coordination. The "stateless" nature of serverless functions is a double-edged sword; while it offers incredible scalability, it also means you can't rely on in-memory state for global coordination. Every shared resource needs a robust, distributed control mechanism.

The challenge was not just implementing a token bucket, but doing it in a way that was:

  • Asynchronous: To fit into our existing async Python workflows.
  • Resilient to contention: Using optimistic locking via conditional writes.
  • Observable: With clear metrics to understand its behavior.
  • Cost-effective: Balancing the cost of the limiter itself against the savings it provided.

This experience highlighted that even seemingly simple external API integrations can become complex distributed systems problems at scale. It pushed me to think more deeply about the aggregate behavior of our serverless fleet rather than just the behavior of individual functions.

Looking Ahead

Looking ahead, I'm exploring ways to make our rate limiter even more dynamic, potentially adjusting the refill rate based on real-time feedback from the LLM provider or our own system load. I'm also considering integrating circuit breaker patterns to gracefully degrade service when the LLM API is experiencing prolonged issues, rather than just queuing requests. The journey to build truly resilient, cost-optimized, and performant AI applications is ongoing, and every challenge brings a new opportunity to learn and innovate.
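To make the circuit breaker idea concrete, here's the rough shape I have in mind, a minimal sketch of the pattern, not code we run in production; the thresholds and the single-probe half-open behavior are simplifying assumptions:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker sketch.

    Opens after `failure_threshold` consecutive failures; after
    `reset_timeout` seconds it allows probe calls through (half-open),
    closing again on the first recorded success.
    """

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: normal operation
        if self.clock() - self.opened_at >= self.reset_timeout:
            return True  # half-open: let a probe through
        return False     # open: fail fast without calling the LLM API

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

Combined with the token bucket, this would let us fail fast during a prolonged LLM provider outage instead of queuing requests that are doomed to time out anyway.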
