Back to blog
batch · api · automation · scale

Batch Document Processing: Process Hundreds of Files via API

February 8, 2026 · Smole Team

Batch Document Processing: Process Hundreds of Files via API

Single-document extraction is useful. But the real value unlocks when you can process hundreds or thousands of documents through the same pipeline — a quarter's worth of invoices, a folder of contracts, an archive of forms.

This guide covers how to set up batch document processing with Smole's API, including parallel execution, error handling, and result collection.

Architecture of a Batch Pipeline

Document folder
    │
    ├── file_001.pdf
    ├── file_002.pdf
    ├── ...
    └── file_500.pdf
         │
         ▼
    Upload (parallel)
         │
         ▼
    Pipeline processing
         │
         ▼
    Result collection
         │
         ▼
    Output (JSON, CSV, database)

Each document is processed independently. A failure in one doesn't affect the others.

Python Batch Processing

Basic Sequential Processing

Start simple — process one file at a time:

import requests
import time
import json
from pathlib import Path

API_BASE = "https://api.smole.tech/api"
headers = {"Authorization": f"Bearer {API_KEY}"}

def extract(file_path, schema_id, timeout=120):
    """Upload a document, start a pipeline run, and poll until it finishes.

    Args:
        file_path: Path of the document to upload.
        schema_id: ID of the extraction schema to apply.
        timeout: Maximum seconds to wait for the pipeline to complete.

    Returns:
        The extracted data dict from the completed pipeline.

    Raises:
        requests.HTTPError: If the upload or a status poll returns an HTTP error.
        Exception: If the pipeline reports a "failed" status.
        TimeoutError: If the pipeline does not finish within `timeout`.
    """
    with open(file_path, "rb") as f:
        resp = requests.post(
            f"{API_BASE}/pipeline/file",
            headers=headers,
            files={"file": f},
            data={"schemaId": schema_id},
            # Per-request network timeout: without it a hung connection
            # blocks forever regardless of the overall `timeout` budget.
            timeout=30,
        )
    resp.raise_for_status()
    pipeline_id = resp.json()["id"]

    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(
            f"{API_BASE}/pipeline/{pipeline_id}",
            headers=headers,
            timeout=30,
        )
        # Surface HTTP errors on the poll too — previously an error body was
        # parsed as JSON and likely failed later with a confusing KeyError.
        resp.raise_for_status()
        data = resp.json()
        if data["status"] == "completed":
            return data["extraction"]["data"]
        if data["status"] == "failed":
            raise Exception(f"Failed: {data.get('error')}")
        time.sleep(2)

    raise TimeoutError("Extraction timed out")

# Walk the folder and push every PDF through the pipeline, recording
# successes and failures side by side so one bad file never stops the run.
results = []
for pdf_path in sorted(Path("./documents").glob("*.pdf")):
    record = {"file": pdf_path.name}
    try:
        record.update(status="ok", data=extract(str(pdf_path), schema_id))
        print(f"  OK: {pdf_path.name}")
    except Exception as exc:
        record.update(status="error", error=str(exc))
        print(f"  FAIL: {pdf_path.name}: {exc}")
    results.append(record)

with open("results.json", "w") as f:
    json.dump(results, f, indent=2)

Parallel Processing with ThreadPoolExecutor

Process multiple documents simultaneously:

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_batch(folder, schema_id, max_workers=5):
    """Extract every PDF in `folder` concurrently using a thread pool.

    Args:
        folder: Directory containing the PDFs to process.
        schema_id: ID of the extraction schema to apply to each file.
        max_workers: Number of concurrent extractions.

    Returns:
        A list of per-file records: {"file", "status", "data" or "error"}.
        A failure in one file never interrupts the others.
    """
    files = sorted(Path(folder).glob("*.pdf"))
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(extract, str(f), schema_id): f.name
            for f in files
        }

        for future in as_completed(futures):
            filename = futures[future]
            try:
                data = future.result()
                results.append({"file": filename, "status": "ok", "data": data})
                # Fix: these log lines previously printed a literal
                # placeholder instead of the actual file name.
                print(f"  OK: {filename}")
            except Exception as e:
                results.append({"file": filename, "status": "error", "error": str(e)})
                print(f"  FAIL: {filename}: {e}")

    return results

Recommended concurrency: Start with 5 workers and increase based on your rate limits.

With Retries

Add resilience for transient failures:

def extract_with_retry(file_path, schema_id, max_retries=3):
    """Run extract(), retrying transient failures with exponential backoff."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return extract(file_path, schema_id)
        except Exception:
            # Out of attempts: let the final failure propagate to the caller.
            if attempt >= max_retries:
                raise
            delay = 2 ** attempt
            print(f"    Retry {attempt}/{max_retries} in {delay}s...")
            time.sleep(delay)

JavaScript Batch Processing

import fs from "node:fs";
import path from "node:path";

async function processBatch(directory, schemaId, concurrency = 5) {
  // Process every PDF in `directory` in fixed-size parallel chunks.
  // Returns one record per file: { file, status, data? | error? }.
  const files = fs.readdirSync(directory).filter((f) => f.endsWith(".pdf"));
  const results = [];

  for (let i = 0; i < files.length; i += concurrency) {
    const chunk = files.slice(i, i + concurrency);
    const settled = await Promise.allSettled(
      chunk.map(async (file) => {
        const data = await extract(path.join(directory, file), schemaId);
        return { file, data };
      })
    );

    settled.forEach((result, j) => {
      if (result.status === "fulfilled") {
        results.push({ ...result.value, status: "ok" });
      } else {
        // Promise.allSettled preserves input order, so index j maps the
        // rejection back to the file in this chunk — previously the
        // filename was lost and reported as "unknown".
        results.push({
          file: chunk[j],
          status: "error",
          error:
            result.reason instanceof Error
              ? result.reason.message
              : String(result.reason),
        });
      }
    });
  }

  return results;
}

Output Formats

Save as JSON

# ensure_ascii=False writes non-ASCII text (vendor names, addresses)
# verbatim, so the file must be opened with an explicit UTF-8 encoding —
# the platform default encoding is not guaranteed to handle it.
with open("results.json", "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

Save as CSV

Flatten the data for spreadsheet use:

import csv

def results_to_csv(results, output_path, row_key="line_items"):
    """Flatten successful extractions into one CSV row per line item.

    Args:
        results: Batch result records ({"file", "status", "data"/"error"}).
        output_path: Destination CSV path.
        row_key: Key in each result's data holding the list of row dicts.

    Each row gets a `source_file` column plus the item's own fields.
    Failed results are skipped; no file is written when there are no rows.
    """
    rows = []
    for r in results:
        if r["status"] != "ok":
            continue
        for item in r["data"].get(row_key, []):
            row = {"source_file": r["file"]}
            row.update(item)
            rows.append(row)

    if not rows:
        return

    # Collect the union of keys across ALL rows: items may have differing
    # fields, and DictWriter raises ValueError on keys missing from
    # `fieldnames` (the original used only the first row's keys).
    fieldnames = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)

    # newline="" per the csv module docs; explicit UTF-8 for portability.
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)

Write to Database

def save_batch_to_db(results, conn):
    """Insert each successful extraction into the `documents` table."""
    insert_sql = (
        "INSERT INTO documents (filename, vendor, date, total)"
        " VALUES (%s, %s, %s, %s)"
    )
    cur = conn.cursor()
    for record in (r for r in results if r["status"] == "ok"):
        payload = record["data"]
        cur.execute(
            insert_sql,
            (
                record["file"],
                payload.get("vendor_name"),
                payload.get("date"),
                payload.get("total"),
            ),
        )
    conn.commit()

Monitoring and Reporting

Track progress and generate a summary:

def print_summary(results):
    """Print a success/failure summary for a completed batch.

    Args:
        results: Batch records, each with at least a "status" key
            ("ok" or "error"); error records also carry "file" and "error".
    """
    total = len(results)
    ok = sum(1 for r in results if r["status"] == "ok")
    failed = sum(1 for r in results if r["status"] == "error")

    print(f"\n{'='*40}")
    print(f"Batch complete: {total} documents")
    print(f"  Succeeded: {ok}")
    print(f"  Failed:    {failed}")
    # Guard the empty batch: ok/total previously raised ZeroDivisionError.
    rate = (ok / total * 100) if total else 0.0
    print(f"  Success rate: {rate:.1f}%")

    if failed > 0:
        print("\nFailed files:")
        for r in results:
            if r["status"] == "error":
                print(f"  - {r['file']}: {r['error']}")

Handling Mixed Document Types

If your batch contains different document types, use separate schemas:

# Map each supported document type to the schema ID used to extract it.
schemas = {
    "invoice": "schema_id_invoices",
    "receipt": "schema_id_receipts",
    "contract": "schema_id_contracts",
}

def classify_and_extract(file_path, schemas):
    """Route documents to the right schema based on filename or folder."""
    stem = Path(file_path).stem.lower()

    # Keyword routing table, checked in priority order; first hit wins.
    routing = (
        (("invoice", "inv"), "invoice"),
        (("receipt",), "receipt"),
        (("contract", "agreement"), "contract"),
    )
    doc_type = "invoice"  # default when nothing matches
    for keywords, candidate in routing:
        if any(kw in stem for kw in keywords):
            doc_type = candidate
            break

    return extract(file_path, schemas[doc_type])

Performance Tips

  1. Start with 5 concurrent workers and adjust based on response times and rate limits
  2. Process large batches in off-peak hours if possible
  3. Use retries with exponential backoff — transient failures are normal at scale
  4. Save intermediate results — Write results to disk periodically so a crash doesn't lose everything
  5. Log failures for reprocessing — Keep a list of failed files to retry separately

Try It Now

Start with a single document in the Playground to validate your schema, then scale up with the batch processing code above.

For API details and rate limits, see the documentation.