Batch Document Processing: Process Hundreds of Files via API
Single-document extraction is useful. But the real value unlocks when you can process hundreds or thousands of documents through the same pipeline — a quarter's worth of invoices, a folder of contracts, an archive of forms.
This guide covers how to set up batch document processing with Smole's API, including parallel execution, error handling, and result collection.
Architecture of a Batch Pipeline
Document folder
│
├── file_001.pdf
├── file_002.pdf
├── ...
└── file_500.pdf
│
▼
Upload (parallel)
│
▼
Pipeline processing
│
▼
Result collection
│
▼
Output (JSON, CSV, database)
Each document is processed independently. A failure in one doesn't affect the others.
Python Batch Processing
Basic Sequential Processing
Start simple — process one file at a time:
import requests
import time
import json
from pathlib import Path
# Base URL for all Smole API endpoints used below.
API_BASE = "https://api.smole.tech/api"
# NOTE(review): API_KEY is not defined in this snippet — it must be set
# (e.g. from an environment variable) before this line runs.
headers = {"Authorization": f"Bearer {API_KEY}"}
def extract(file_path, schema_id, timeout=120):
    """Upload one document to the extraction pipeline and poll until done.

    Args:
        file_path: Path of the document to upload.
        schema_id: ID of the extraction schema to apply.
        timeout: Overall polling budget in seconds; also passed to each
            HTTP call as the network timeout.

    Returns:
        The extracted data dict from the completed pipeline run.

    Raises:
        requests.HTTPError: If the upload or a poll request returns an error status.
        Exception: If the pipeline reports the run as failed.
        TimeoutError: If the run does not finish within `timeout` seconds.
    """
    with open(file_path, "rb") as f:
        resp = requests.post(
            f"{API_BASE}/pipeline/file",
            headers=headers,
            files={"file": f},
            data={"schemaId": schema_id},
            # requests has NO default timeout — without this a dead
            # connection would hang the worker forever.
            timeout=timeout,
        )
    resp.raise_for_status()
    pipeline_id = resp.json()["id"]
    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(
            f"{API_BASE}/pipeline/{pipeline_id}",
            headers=headers,
            timeout=timeout,
        )
        # Surface HTTP errors explicitly instead of a confusing KeyError
        # on "status" below.
        resp.raise_for_status()
        data = resp.json()
        if data["status"] == "completed":
            return data["extraction"]["data"]
        if data["status"] == "failed":
            raise Exception(f"Failed: {data.get('error')}")
        time.sleep(2)
    raise TimeoutError("Extraction timed out")
# Process all PDFs in a folder, collecting per-file outcomes.
results = []
pdf_paths = sorted(Path("./documents").glob("*.pdf"))
for pdf in pdf_paths:
    try:
        extracted = extract(str(pdf), schema_id)
    except Exception as e:
        results.append({"file": pdf.name, "status": "error", "error": str(e)})
        print(f" FAIL: {pdf.name}: {e}")
    else:
        results.append({"file": pdf.name, "status": "ok", "data": extracted})
        print(f" OK: {pdf.name}")
# Persist the full run report for later inspection/reprocessing.
with open("results.json", "w") as f:
    json.dump(results, f, indent=2)
Parallel Processing with ThreadPoolExecutor
Process multiple documents simultaneously:
from concurrent.futures import ThreadPoolExecutor, as_completed
def process_batch(folder, schema_id, max_workers=5):
    """Extract every PDF in `folder` concurrently using a thread pool.

    Args:
        folder: Directory containing the .pdf files to process.
        schema_id: Extraction schema ID passed to extract().
        max_workers: Number of concurrent uploads (tune to your rate limit).

    Returns:
        A list of per-file dicts: {"file", "status": "ok"|"error", ...}.
        Results arrive in completion order, not input order.
    """
    files = sorted(Path(folder).glob("*.pdf"))
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its filename so failures can be attributed.
        futures = {
            pool.submit(extract, str(f), schema_id): f.name
            for f in files
        }
        for future in as_completed(futures):
            filename = futures[future]
            try:
                data = future.result()
                results.append({"file": filename, "status": "ok", "data": data})
                # Bug fix: the log lines previously printed a literal
                # "(unknown)" placeholder instead of the filename.
                print(f" OK: {filename}")
            except Exception as e:
                results.append({"file": filename, "status": "error", "error": str(e)})
                print(f" FAIL: {filename}: {e}")
    return results
Recommended concurrency: Start with 5 workers and increase based on your rate limits.
With Retries
Add resilience for transient failures:
def extract_with_retry(file_path, schema_id, max_retries=3):
    """Call extract(), retrying transient failures with exponential backoff.

    Args:
        file_path: Document path forwarded to extract().
        schema_id: Schema ID forwarded to extract().
        max_retries: Total attempts before the last exception is re-raised.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return extract(file_path, schema_id)
        except Exception:
            if attempt == max_retries:
                raise  # out of attempts — propagate the final failure
            wait = 2 ** attempt  # 2s, 4s, 8s, ...
            print(f" Retry {attempt}/{max_retries} in {wait}s...")
            time.sleep(wait)
JavaScript Batch Processing
import fs from "node:fs";
import path from "node:path";
/**
 * Extract every PDF in a directory, `concurrency` files at a time.
 *
 * @param {string} directory - Folder containing the .pdf files.
 * @param {string} schemaId - Extraction schema ID passed to extract().
 * @param {number} [concurrency=5] - Files processed per parallel chunk.
 * @returns {Promise<Array<{file: string, status: string}>>} per-file outcomes.
 */
async function processBatch(directory, schemaId, concurrency = 5) {
  const files = fs.readdirSync(directory).filter(f => f.endsWith(".pdf"));
  const results = [];
  for (let i = 0; i < files.length; i += concurrency) {
    const chunk = files.slice(i, i + concurrency);
    const settled = await Promise.allSettled(
      chunk.map(async (file) => {
        const data = await extract(path.join(directory, file), schemaId);
        return { file, data };
      })
    );
    // allSettled preserves input order, so settled[idx] corresponds to
    // chunk[idx] — this lets failures report the real filename instead
    // of the previous hard-coded "unknown".
    settled.forEach((result, idx) => {
      if (result.status === "fulfilled") {
        results.push({ ...result.value, status: "ok" });
      } else {
        results.push({ file: chunk[idx], status: "error", error: result.reason.message });
      }
    });
  }
  return results;
}
Output Formats
Save as JSON
# Serialize the batch report first, then write it in one call
# (ensure_ascii=False keeps non-ASCII vendor names human-readable).
serialized = json.dumps(results, indent=2, ensure_ascii=False)
with open("results.json", "w") as f:
    f.write(serialized)
Save as CSV
Flatten the data for spreadsheet use:
import csv
def results_to_csv(results, output_path, row_key="line_items"):
    """Flatten successful batch results into one CSV row per line item.

    Args:
        results: Batch result dicts with "status", "file", and "data" keys.
        output_path: Destination CSV file path.
        row_key: Key in each result's data whose list of dicts supplies rows.

    Rows missing a column are written blank; failed results are skipped.
    Writes nothing if no successful result contains any line items.
    """
    rows = []
    for r in results:
        if r["status"] != "ok":
            continue
        data = r["data"]
        for item in data.get(row_key, []):
            row = {"source_file": r["file"]}
            row.update(item)
            rows.append(row)
    if not rows:
        return
    # Bug fix: fieldnames must be the union of keys across ALL rows —
    # using only rows[0].keys() makes DictWriter raise ValueError when a
    # later row carries a field the first row lacks. A dict preserves
    # first-seen column order.
    fieldnames = {}
    for row in rows:
        for key in row:
            fieldnames.setdefault(key)
    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(fieldnames))
        writer.writeheader()
        writer.writerows(rows)
Write to Database
def save_batch_to_db(results, conn):
    """Insert each successful extraction into the documents table.

    Args:
        results: Batch result dicts; only entries with status "ok" are saved.
        conn: An open DB-API connection (placeholder style is %s, e.g. psycopg).

    Commits once after all inserts.
    """
    cursor = conn.cursor()
    successful = (r for r in results if r["status"] == "ok")
    for record in successful:
        payload = record["data"]
        cursor.execute(
            "INSERT INTO documents (filename, vendor, date, total) VALUES (%s, %s, %s, %s)",
            (
                record["file"],
                payload.get("vendor_name"),
                payload.get("date"),
                payload.get("total"),
            ),
        )
    conn.commit()
Monitoring and Reporting
Track progress and generate a summary:
def print_summary(results):
    """Print a human-readable summary of a finished batch run.

    Args:
        results: List of per-file result dicts, each with "status" of
            "ok" or "error" (failures also carry "file" and "error").
    """
    total = len(results)
    if total == 0:
        # Bug fix: the success-rate division below would raise
        # ZeroDivisionError on an empty batch.
        print("\nBatch complete: 0 documents")
        return
    ok = sum(1 for r in results if r["status"] == "ok")
    failed = sum(1 for r in results if r["status"] == "error")
    print(f"\n{'='*40}")
    print(f"Batch complete: {total} documents")
    print(f" Succeeded: {ok}")
    print(f" Failed: {failed}")
    print(f" Success rate: {ok/total*100:.1f}%")
    if failed > 0:
        print("\nFailed files:")
        for r in results:
            if r["status"] == "error":
                print(f" - {r['file']}: {r['error']}")
Handling Mixed Document Types
If your batch contains different document types, use separate schemas:
# Document-type -> schema ID routing table. The values are placeholders —
# replace them with real schema IDs from your account.
schemas = {
"invoice": "schema_id_invoices",
"receipt": "schema_id_receipts",
"contract": "schema_id_contracts",
}
def classify_and_extract(file_path, schemas):
    """Route documents to the right schema based on filename or folder."""
    stem = Path(file_path).stem.lower()
    # Check in priority order; first match wins, invoice is the fallback.
    if any(tag in stem for tag in ("invoice", "inv")):
        key = "invoice"
    elif "receipt" in stem:
        key = "receipt"
    elif any(tag in stem for tag in ("contract", "agreement")):
        key = "contract"
    else:
        key = "invoice"  # default
    return extract(file_path, schemas[key])
Performance Tips
- Start with 5 concurrent workers and adjust based on response times and rate limits
- Process large batches in off-peak hours if possible
- Use retries with exponential backoff — transient failures are normal at scale
- Save intermediate results — Write results to disk periodically so a crash doesn't lose everything
- Log failures for reprocessing — Keep a list of failed files to retry separately
Try It Now
Start with a single document in the Playground to validate your schema, then scale up with the batch processing code above.
For API details and rate limits, see the documentation.
Related articles
How to Automate Invoice Processing with an API
Step-by-step guide to automating invoice data extraction. Extract vendor details, line items, totals, and VAT from invoices into structured JSON using a REST API.
Automating Document Workflows with Smole API
Learn how to build automated document processing pipelines that scale with your business.
How to Convert PDFs to JSON with an API
A practical guide to converting PDF documents into structured JSON data using a REST API. Covers digital PDFs, scanned documents, and batch processing.
