chat-limiter
A Pythonic rate limiter for OpenAI, Anthropic, and OpenRouter APIs that provides a high-level chat completion interface with automatic rate limit management.
Features
- 🚀 High-Level Chat Interface: OpenAI/Anthropic-style chat completion methods
- 📡 Automatic Rate Limit Discovery: Fetches current limits from API response headers
- ⚡ Sync & Async Support: Use with `async`/`await` or synchronous code
- 📦 Batch Processing: Process multiple requests efficiently with concurrency control
- 🔄 Intelligent Retry Logic: Exponential backoff with provider-specific optimizations
- 🌐 Multi-Provider Support: Works seamlessly with OpenAI, Anthropic, and OpenRouter
- 🎯 Pythonic Design: Context manager interface with proper error handling
- 🛡️ Fully Tested: Comprehensive test suite with 93% coverage
- 🔧 Token Estimation: Basic token counting for better rate limit management
- 🔑 Environment Variable Support: Automatic API key detection from env vars
- 🔀 Provider Override: Manually specify provider for custom models
Installation
```bash
pip install chat-limiter
```
Or with uv:
```bash
uv add chat-limiter
```
Quick Start
High-Level Chat Completion Interface (Recommended)
```python
import asyncio
from chat_limiter import ChatLimiter, Message, MessageRole

async def main():
    # Provider is auto-detected from the model name; the API key is read from OPENAI_API_KEY
    async with ChatLimiter.for_model("gpt-4o") as limiter:
        response = await limiter.chat_completion(
            model="gpt-4o",
            messages=[Message(role=MessageRole.USER, content="Hello!")]
        )
        print(response.choices[0].message.content)

    # simple_chat returns just the response text
    async with ChatLimiter.for_model("claude-3-5-sonnet-20241022", api_key="sk-ant-...") as limiter:
        response = await limiter.simple_chat(
            model="claude-3-5-sonnet-20241022",
            prompt="What is Python?",
            max_tokens=100
        )
        print(response)

asyncio.run(main())
```
Environment Variables
Set your API keys as environment variables:
```bash
export OPENAI_API_KEY="sk-your-openai-key"
export ANTHROPIC_API_KEY="sk-ant-your-anthropic-key"
export OPENROUTER_API_KEY="sk-or-your-openrouter-key"
```
The library will automatically detect the provider from the model name and use the appropriate environment variable.
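Conceptually, the key lookup boils down to a mapping from provider to environment variable. The helper below is a hypothetical sketch for illustration; chat-limiter does this for you internally, and `api_key_from_env` is not part of its public API:

```python
import os

# Hypothetical sketch of the env-var lookup idea - not part of chat-limiter's API.
PROVIDER_ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "openrouter": "OPENROUTER_API_KEY",
}

def api_key_from_env(provider: str) -> str | None:
    """Return the configured API key for a provider, or None if unset."""
    return os.environ.get(PROVIDER_ENV_VARS[provider])
```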
Provider Override
For custom models or when auto-detection fails:
```python
async with ChatLimiter.for_model(
    "custom-model-name",
    provider="openai",
    api_key="sk-key"
) as limiter:
    response = await limiter.chat_completion(
        model="custom-model-name",
        messages=[Message(role=MessageRole.USER, content="Hello!")]
    )
```
Synchronous Usage
```python
from chat_limiter import ChatLimiter, Message, MessageRole

with ChatLimiter.for_model("gpt-4o") as limiter:
    # Full response object
    response = limiter.chat_completion_sync(
        model="gpt-4o",
        messages=[Message(role=MessageRole.USER, content="Hello!")]
    )
    print(response.choices[0].message.content)

    # Or just the response text
    text_response = limiter.simple_chat_sync(
        model="gpt-4o",
        prompt="What is the capital of France?",
        max_tokens=50
    )
    print(text_response)
```
Batch Processing with High-Level Interface
```python
import asyncio
from chat_limiter import (
    ChatLimiter,
    Message,
    MessageRole,
    ChatCompletionRequest,
    process_chat_completion_batch,
    create_chat_completion_requests,
    BatchConfig
)

async def batch_example():
    # Build one request per prompt
    requests = create_chat_completion_requests(
        model="gpt-4o",
        prompts=["Hello!", "How are you?", "What is Python?"],
        max_tokens=50,
        temperature=0.7
    )

    async with ChatLimiter.for_model("gpt-4o") as limiter:
        # Control concurrency and retries for the batch
        config = BatchConfig(
            max_concurrent_requests=5,
            max_retries_per_item=3,
            group_by_model=True
        )

        results = await process_chat_completion_batch(limiter, requests, config)

        for result in results:
            if result.success:
                response = result.result
                print(response.choices[0].message.content)

asyncio.run(batch_example())
```
Provider Support
Auto-Detection from Model Names
The library automatically detects providers based on model names:
- OpenAI: `gpt-4o`, `gpt-4o-mini`, `gpt-3.5-turbo`, etc.
- Anthropic: `claude-3-5-sonnet-20241022`, `claude-3-haiku-20240307`, etc.
- OpenRouter: `openai/gpt-4o`, `anthropic/claude-3-sonnet`, etc.
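For illustration, this kind of detection amounts to simple prefix matching. The sketch below is hand-rolled for this README and is not chat-limiter's actual detection code:

```python
# Hand-rolled sketch of prefix-based provider detection - illustrative only,
# not chat-limiter's actual implementation.
def guess_provider(model: str) -> str:
    if "/" in model:                 # e.g. "openai/gpt-4o" -> routed via OpenRouter
        return "openrouter"
    if model.startswith("gpt-"):
        return "openai"
    if model.startswith("claude-"):
        return "anthropic"
    raise ValueError(f"Cannot detect provider for {model!r}; pass provider= explicitly")
```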
Provider-Specific Features
OpenAI
- ✅ Automatic header parsing (`x-ratelimit-*`)
- ✅ Request and token rate limiting
- ✅ Exponential backoff with jitter
- ✅ Model-specific optimizations
Anthropic
- ✅ Claude-specific headers (`anthropic-ratelimit-*`)
- ✅ Separate input/output token tracking
- ✅ System message handling
- ✅ Retry-after header support
OpenRouter
- ✅ Multi-model proxy support
- ✅ Dynamic limit discovery
- ✅ Model-specific rate adjustments
- ✅ Credit-based limiting
Advanced Usage
Low-Level Interface
For advanced users who need direct HTTP access:
```python
from chat_limiter import ChatLimiter, Provider

async with ChatLimiter(
    provider=Provider.OPENAI,
    api_key="sk-your-key"
) as limiter:
    response = await limiter.request(
        "POST", "/chat/completions",
        json={
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": "Hello!"}]
        }
    )

    result = response.json()
    print(result["choices"][0]["message"]["content"])
```
Custom HTTP Clients
```python
import httpx
from chat_limiter import ChatLimiter, Message, MessageRole

custom_client = httpx.AsyncClient(
    timeout=httpx.Timeout(60.0),
    headers={"Custom-Header": "value"}
)

async with ChatLimiter.for_model(
    "gpt-4o",
    http_client=custom_client
) as limiter:
    response = await limiter.chat_completion(
        model="gpt-4o",
        messages=[Message(role=MessageRole.USER, content="Hello!")]
    )
```
Provider Configuration
```python
from chat_limiter import ChatLimiter, ProviderConfig, Provider, Message, MessageRole

config = ProviderConfig(
    provider=Provider.OPENAI,
    base_url="https://api.openai.com/v1",
    default_request_limit=100,
    default_token_limit=50000,
    max_retries=5,
    base_backoff=2.0,
    request_buffer_ratio=0.8
)

async with ChatLimiter(config=config, api_key="sk-key") as limiter:
    response = await limiter.chat_completion(
        model="gpt-4o",
        messages=[Message(role=MessageRole.USER, content="Hello!")]
    )
```
Error Handling
```python
from chat_limiter import ChatLimiter, Message, MessageRole
from tenacity import RetryError
import httpx

async with ChatLimiter.for_model("gpt-4o") as limiter:
    try:
        response = await limiter.chat_completion(
            model="gpt-4o",
            messages=[Message(role=MessageRole.USER, content="Hello!")]
        )
    except RetryError as e:
        print(f"Request failed after retries: {e}")
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
    except httpx.RequestError as e:
        print(f"Request error: {e}")
```
Monitoring and Metrics
```python
from chat_limiter import ChatLimiter, Message, MessageRole

async with ChatLimiter.for_model("gpt-4o") as limiter:
    await limiter.chat_completion(
        model="gpt-4o",
        messages=[Message(role=MessageRole.USER, content="Hello!")]
    )

    # Inspect usage against the currently discovered limits
    limits = limiter.get_current_limits()
    print(f"Requests used: {limits['requests_used']}/{limits['request_limit']}")
    print(f"Tokens used: {limits['tokens_used']}/{limits['token_limit']}")

    # Reset the usage counters
    limiter.reset_usage_tracking()
```
Message Types and Parameters
Message Structure
```python
from chat_limiter import Message, MessageRole

messages = [
    Message(role=MessageRole.SYSTEM, content="You are a helpful assistant."),
    Message(role=MessageRole.USER, content="Hello!"),
    Message(role=MessageRole.ASSISTANT, content="Hi there!"),
    Message(role=MessageRole.USER, content="How are you?")
]
```
Chat Completion Parameters
```python
response = await limiter.chat_completion(
    model="gpt-4o",
    messages=messages,
    max_tokens=100,
    temperature=0.7,
    top_p=0.9,
    stop=["END"],
    stream=False,
    frequency_penalty=0.0,
    presence_penalty=0.0,
    top_k=40,
)
```
Batch Processing
Simple Batch Processing
```python
from chat_limiter import create_chat_completion_requests, process_chat_completion_batch

requests = create_chat_completion_requests(
    model="gpt-4o",
    prompts=["Question 1", "Question 2", "Question 3"],
    max_tokens=50
)

async with ChatLimiter.for_model("gpt-4o") as limiter:
    results = await process_chat_completion_batch(limiter, requests)

    for result in results:
        if result.success:
            print(result.result.choices[0].message.content)
        else:
            print(f"Error: {result.error}")
```
Batch Configuration
```python
from chat_limiter import BatchConfig

config = BatchConfig(
    max_concurrent_requests=10,
    max_workers=4,
    max_retries_per_item=3,
    retry_delay=1.0,
    stop_on_first_error=False,
    group_by_model=True,
    adaptive_batch_size=True
)
```
Rate Limiting Details
How It Works
- Header Parsing: Automatically extracts rate limit information from API response headers
- Token Bucket Algorithm: Uses PyrateLimiter for smooth rate limiting with burst support
- Adaptive Limits: Updates limits based on server responses in real-time
- Intelligent Queuing: Coordinates requests to stay under limits while maximizing throughput (sketched below)
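To make the header-parsing and queuing steps concrete, here is a deliberately naive sketch of the pattern. The function names are hypothetical and this is not chat-limiter's internal code; the library layers PyrateLimiter's token bucket, retries, and token tracking on top of this idea:

```python
import time
import httpx

# Illustrative sketch only - not chat-limiter's internal implementation.

def parse_openai_limits(response: httpx.Response) -> dict:
    """Extract request/token budgets from OpenAI-style x-ratelimit-* headers."""
    h = response.headers
    return {
        "request_limit": int(h.get("x-ratelimit-limit-requests", "0")),
        "requests_remaining": int(h.get("x-ratelimit-remaining-requests", "0")),
        "token_limit": int(h.get("x-ratelimit-limit-tokens", "0")),
        "tokens_remaining": int(h.get("x-ratelimit-remaining-tokens", "0")),
    }

def wait_if_exhausted(limits: dict, backoff: float = 1.0) -> None:
    """Naive pacing: sleep briefly once the remaining request budget hits zero."""
    if limits["requests_remaining"] <= 0:
        time.sleep(backoff)
```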
Provider-Specific Behavior
| Provider   | Request limits | Token limits        | Limit discovery  | Special handling                    |
|------------|----------------|---------------------|------------------|-------------------------------------|
| OpenAI     | ✅ RPM         | ✅ TPM              | ✅ Headers       | Model detection, batch optimization |
| Anthropic  | ✅ RPM         | ✅ Input/Output TPM | ✅ Headers       | Tier handling, system messages      |
| OpenRouter | ✅ RPM         | ✅ TPM              | ✅ Auth endpoint | Multi-model, credit tracking        |
Testing
The library includes a comprehensive test suite:
```bash
# Run the full test suite
uv run pytest

# Run with coverage reporting
uv run pytest --cov=chat_limiter

# Run a specific test file
uv run pytest tests/test_high_level_interface.py -v
```
Development
```bash
# Clone the repository and install dev dependencies
git clone https://github.com/your-repo/chat-limiter.git
cd chat-limiter
uv sync --group dev

# Lint, type-check, and format
uv run ruff check src/ tests/
uv run mypy src/
uv run ruff format src/ tests/
```
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Run the test suite and linting
- Submit a pull request
License
MIT License - see LICENSE file for details.
Changelog
0.2.0 (Latest)
- 🚀 High-level chat completion interface - OpenAI/Anthropic-style methods
- 🔑 Environment variable support - Automatic API key detection
- 🔀 Provider override - Manual provider specification for custom models
- 📦 Enhanced batch processing - High-level batch operations with ChatCompletionRequest
- 🎯 Unified message types - Cross-provider message and response compatibility
- 🧪 Improved testing - 93% test coverage with comprehensive high-level interface tests
0.1.0 (Initial Release)
- Multi-provider support (OpenAI, Anthropic, OpenRouter)
- Async and sync interfaces
- Batch processing with concurrency control
- Automatic rate limit discovery
- Comprehensive test suite
- Type hints and documentation