LLM APIs are priced per token. Process one document? Cheap. Process a thousand? Suddenly you’re looking at real money.
We ran into this building our YouTube Markdown Agent. A single video transcript might run 10,000-50,000 tokens. Our vault had over 1,000 documents. Even with Claude Haiku 3.5 (Anthropic’s most cost-effective model), we were looking at several dollars per batch run.
Then we discovered Anthropic’s Batch API.
The 50% Solution
Anthropic’s Batch API offers a simple deal: if you can wait up to 24 hours for results, you get 50% off.
| Feature | Standard API | Batch API |
|---|---|---|
| Cost | Full price | 50% discount |
| Processing | Immediate | Within 24 hours (usually ~30 min) |
| Rate limits | Per-minute throttling | Up to 100,000 requests per batch |
| Timeout | 10 minutes | No timeout |
| Use case | Interactive | Background processing |
For interactive use (user waiting for results), you need the standard API. For background processing (overnight batch jobs, bulk content processing), Batch API is a no-brainer.
We built our system to use both.
Dual-Mode Architecture
Here’s the key insight: automatically choose the right API based on the workload.
Figure 1 - Dual API architecture diagram showing decision flow: single item goes to synchronous API for immediate response, multiple items go to Batch API for 50% cost savings
The decision logic is simple:
```python
def process_request(items: list[Item]) -> ProcessingResult:
    if len(items) == 1:
        # Single item: use sync API for immediate results
        return process_sync(items[0])
    else:
        # Multiple items: use Batch API for cost savings
        return process_batch(items)
```

User processes one video? They get instant results. User processes 5 videos? They save 50% and results arrive in minutes.
Implementing the Batch Workflow
The Batch API has four phases:
Phase 1: Prepare Requests
Each request is a JSON object with a unique `custom_id` and the usual message parameters:
```python
import re

def prepare_batch_request(item: Item, index: int) -> dict:
    """Prepare a single batch request."""
    # Create a unique, traceable ID
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', item.name)[:30]
    custom_id = f"item_{index:05d}_{safe_name}"

    return {
        "custom_id": custom_id,
        "params": {
            "model": "claude-3-5-haiku-20241022",
            "max_tokens": 4096,
            "messages": [
                {
                    "role": "user",
                    "content": build_prompt(item)
                }
            ]
        }
    }
```

The custom_id is critical - it's how you match results back to source items. We use a format that includes both an index and a sanitized name:

```
item_00042_Building_RAG_Systems
```

This makes debugging much easier when you're processing hundreds of items.
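Because the index is zero-padded and the prefix is fixed, you can also recover it programmatically. A minimal sketch (this `parse_custom_id` helper is an illustration, not part of the pipeline above):

```python
def parse_custom_id(custom_id: str) -> tuple[int, str]:
    """Split 'item_00042_Building_RAG_Systems' into (42, 'Building_RAG_Systems')."""
    _prefix, index_str, safe_name = custom_id.split("_", 2)
    return int(index_str), safe_name
```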
Phase 2: Submit to Batch API
```python
import anthropic

client = anthropic.Anthropic()

def submit_batch(requests: list[dict]) -> str:
    """Submit batch and return batch ID."""
    batch = client.messages.batches.create(requests=requests)
    return batch.id
```

That's it. The API returns a batch ID immediately. Your requests are now queued for processing.
Phase 3: Poll for Completion
```python
import time

def wait_for_batch(batch_id: str, poll_interval: int = 30) -> dict:
    """Poll until batch completes."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        status = batch.processing_status
        counts = batch.request_counts

        print(f"Status: {status}")
        print(f"  Succeeded: {counts.succeeded}")
        print(f"  Processing: {counts.processing}")
        print(f"  Errored: {counts.errored}")

        if status == "ended":
            return {
                "succeeded": counts.succeeded,
                "errored": counts.errored,
            }

        time.sleep(poll_interval)
```

In practice, batches of ~100 requests complete in 5-15 minutes. We poll every 30 seconds, but you could poll less frequently for larger batches.
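Putting Phases 1-3 together, the driver code can be as small as this sketch (assuming `items` is the list of `Item` objects you're processing):

```python
# Prepare, submit, and wait - reusing the helpers defined above.
requests = [prepare_batch_request(item, i) for i, item in enumerate(items)]
batch_id = submit_batch(requests)
summary = wait_for_batch(batch_id)
print(f"Done: {summary['succeeded']} succeeded, {summary['errored']} errored")
```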
Phase 4: Retrieve and Apply Results
```python
def retrieve_results(batch_id: str) -> list[dict]:
    """Retrieve all results from a completed batch."""
    results = []

    for result in client.messages.batches.results(batch_id):
        entry = {
            "custom_id": result.custom_id,
            "status": result.result.type,
        }
        # Only succeeded results carry a message; errored ones have an error instead
        if result.result.type == "succeeded":
            entry["content"] = result.result.message.content[0].text
        results.append(entry)

    return results
```

Match results back to source items using the custom_id.
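One way to do that matching, assuming the same ordered `items` list that produced the requests is still in hand (a sketch, not the exact code from our agent):

```python
def match_results_to_items(results: list[dict], items: list[Item]) -> list[tuple[Item, dict]]:
    """Pair each result with its source item via the index encoded in the custom_id."""
    matched = []
    for result in results:
        # custom_id format: item_<zero-padded global index>_<sanitized name>
        index = int(result["custom_id"].split("_", 2)[1])
        matched.append((items[index], result))
    return matched
```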
Figure 2 - Batch processing flow diagram showing the four phases: Prepare JSONL requests, Submit to API, Poll for completion, Retrieve and apply results
The Multi-Batch Challenge
Anthropic limits the size of individual batches to avoid overwhelming their systems. For large jobs, you need to split the work across multiple batches.
This is where we hit our first serious bug.
The Bug: Index Mismatch
When processing 122 files in two batches:
- Batch 1: Files 0-99 (custom_ids `file_00000` through `file_00099`)
- Batch 2: Files 100-121 (custom_ids `file_00100` through `file_00121`)
But our metadata was indexed per-batch, not globally. Batch 2’s metadata said “file 0 is X” when it should have said “file 100 is X”.
The fix: Always use global indices:
```python
def prepare_all_batches(items: list[Item], batch_size: int = 100):
    """Prepare batches with global indexing."""
    all_requests = []
    all_metadata = {}

    for i, item in enumerate(items):
        # Use global index i, not batch-local index
        request = prepare_batch_request(item, i)

        all_requests.append(request)
        all_metadata[request["custom_id"]] = {
            "global_index": i,
            "item": item,
        }

    # Split into batches
    batches = [
        all_requests[i:i + batch_size]
        for i in range(0, len(all_requests), batch_size)
    ]

    return batches, all_metadata
```

Submitting Multiple Batches
For 782 files, we created 8 batches:
```
Submitted batch 1/8: msgbatch_01SdR31... (100 requests)
Submitted batch 2/8: msgbatch_01UhFNq... (100 requests)
...
Submitted batch 8/8: msgbatch_01XGJWLe... (82 requests)
```

All 8 batches completed in about 25 minutes with a 100% success rate.
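The submission loop that produces output like this is straightforward; a sketch of one way to drive it, reusing `submit_batch` from Phase 2:

```python
def submit_all_batches(batches: list[list[dict]]) -> list[str]:
    """Submit each prepared batch in turn and collect the returned batch IDs."""
    batch_ids = []
    for n, batch_requests in enumerate(batches, start=1):
        batch_id = submit_batch(batch_requests)
        print(f"Submitted batch {n}/{len(batches)}: {batch_id} ({len(batch_requests)} requests)")
        batch_ids.append(batch_id)
    return batch_ids
```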
Frontend Integration
The frontend needs to handle both modes gracefully:
```typescript
async function processItems(items: Item[]) {
  const response = await api.process(items);

  if (response.mode === "sync") {
    // Immediate result - display now
    displayResult(response.result);
  } else {
    // Batch mode - poll for completion
    const batchId = response.batch_id;
    pollForCompletion(batchId);
  }
}

async function pollForCompletion(batchId: string) {
  const pollInterval = 5000; // 5 seconds

  while (true) {
    const status = await api.getBatchStatus(batchId);

    updateProgressUI(status);

    if (status.complete) {
      const results = await api.getBatchResults(batchId);
      displayResults(results);
      break;
    }

    await sleep(pollInterval);
  }
}
```

Figure 3 - Progress UI mockup showing batch status with progress bar, succeeded/processing/errored counts, and estimated time remaining
Progressive Testing Strategy
Never run your first batch on production data.
We used a progressive testing approach:
| Run | Items | Purpose |
|---|---|---|
| 1 | 2 | Validate pipeline works |
| 2 | 6 | Test edge cases |
| 3 | 52 | First real folder |
| 4 | 72 | Parallel test |
| 5 | 122 | Multi-batch test |
| 6 | 782 | Full production |
This caught two critical bugs:
- Multi-batch index mismatch (discussed above)
- Unicode console output on Windows - filenames with emojis crashed the logging (a minimal fix is sketched below)
Both would have caused silent data corruption at scale. Progressive testing isn’t optional - it’s essential.
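The Unicode crash has a small, generic mitigation; a sketch of one common approach (forcing UTF-8 console output, not necessarily the exact fix we shipped):

```python
import sys

# Windows consoles often default to a legacy code page (e.g. cp1252), so printing
# filenames containing emojis raises UnicodeEncodeError. Reconfiguring stdout keeps
# the logging alive; anything the console can't render is replaced instead of crashing.
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
```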
Cost Analysis
Real numbers from our production run (1,028 files):
| Approach | Estimated Cost |
|---|---|
| Standard API | ~$3.00 |
| Batch API | ~$1.50 |
| Savings | $1.50 (50%) |
For a one-time cleanup, $1.50 saved isn’t life-changing. But for ongoing processing - say, 100 videos per week - the savings compound:
| Timeframe | Standard API | Batch API | Savings |
|---|---|---|---|
| Weekly | $0.50 | $0.25 | $0.25 |
| Monthly | $2.00 | $1.00 | $1.00 |
| Yearly | $24.00 | $12.00 | $12.00 |
The Batch API makes ongoing AI processing economically viable for personal projects.
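To estimate the numbers for your own workload, the arithmetic is just tokens times price. A sketch with illustrative defaults (the per-million-token prices here are placeholders, not quotes; check Anthropic's current pricing page):

```python
def estimate_cost(
    num_items: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    input_price_per_mtok: float = 0.80,   # illustrative placeholder price
    output_price_per_mtok: float = 4.00,  # illustrative placeholder price
) -> dict:
    """Rough standard-vs-batch cost estimate; the Batch API halves both rates."""
    standard = (
        num_items * avg_input_tokens / 1_000_000 * input_price_per_mtok
        + num_items * avg_output_tokens / 1_000_000 * output_price_per_mtok
    )
    return {"standard": round(standard, 2), "batch": round(standard * 0.5, 2)}
```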
Error Handling
Batch requests can fail individually. Always handle partial failures:
```python
def apply_results_with_errors(results: list[dict], metadata: dict):
    """Apply results, handling individual failures."""
    succeeded = 0
    failed = 0

    for result in results:
        custom_id = result["custom_id"]
        # metadata maps custom_id to the {"global_index", "item"} entries
        # built in prepare_all_batches
        item = metadata[custom_id]["item"]

        if result["status"] == "succeeded":
            item.update_content(result["content"])
            succeeded += 1
        else:
            error_msg = result.get("error", "Unknown error")
            log_error(f"Failed {custom_id}: {error_msg}")
            failed += 1

    return {
        "succeeded": succeeded,
        "failed": failed,
        "success_rate": succeeded / (succeeded + failed) * 100,
    }
```

In our runs, we achieved a 100% success rate. But designing for partial failure means you can recover gracefully when issues occur.
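Recovery can then be as simple as rebuilding requests for just the failed custom_ids and submitting them as a fresh batch. A sketch built on the helpers above (we never needed it in practice, since nothing errored):

```python
def resubmit_failures(results: list[dict], metadata: dict) -> str | None:
    """Build and submit a retry batch containing only the items whose requests errored."""
    retry_requests = []
    for result in results:
        if result["status"] != "succeeded":
            entry = metadata[result["custom_id"]]
            # Reuse the global index so the retried custom_id matches the original
            retry_requests.append(prepare_batch_request(entry["item"], entry["global_index"]))
    if not retry_requests:
        return None  # nothing to retry
    return submit_batch(retry_requests)
```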
When to Use Each Mode
Use Standard API when:
- Processing a single item
- User is waiting for results
- You need sub-second latency
- Testing/debugging (faster iteration)
Use Batch API when:
- Processing 2+ items
- Results aren’t time-sensitive
- Running overnight jobs
- Cost is a concern
- Processing large backlogs
The threshold of “2 items” is somewhat arbitrary - you could set it at 5 or 10 if the UX for polling is annoying. We found 2 works well because even waiting 5-10 minutes for 2 videos feels reasonable given the 50% savings.
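If you want that threshold to be tunable rather than hard-coded, the router only changes slightly. A sketch (the `merge_results` helper is a stand-in for however you combine per-item sync results into your `ProcessingResult`):

```python
BATCH_THRESHOLD = 2  # raise this if polling UX outweighs the 50% savings for small jobs

def process_request(items: list[Item]) -> ProcessingResult:
    """Route small jobs to the sync API, everything at or above the threshold to the Batch API."""
    if len(items) < BATCH_THRESHOLD:
        # Small job: per-item sync calls so the user sees results immediately
        return merge_results([process_sync(item) for item in items])
    # Larger job: Batch API, 50% cheaper, results typically within minutes
    return process_batch(items)
```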
Figure 4 - Decision matrix diagram showing when to use sync vs batch API based on item count, latency requirements, and cost sensitivity
Key Takeaways
- The 50% discount is real. For background processing, Batch API is a no-brainer.
- Build for both modes. Let your system choose automatically based on workload.
- Global indices, not local. When splitting across batches, track items globally.
- custom_id is your lifeline. Make it informative enough to debug.
- Test progressively. 2 → 6 → 52 → full production. Catch bugs early.
- Design for partial failure. Individual requests can fail even when the batch succeeds.
- Poll responsibly. 30-second intervals are fine. Don't hammer the API.
Related Articles
- From YouTube to Knowledge Graph - System overview
- Obsidian Vault Curation at Scale - Case study using Batch API
- Building a Semantic Note Network - What happens after processing
This article is part of our series on building AI-powered knowledge management tools. Written with assistance from Claude Code.