Back to blog
python · tutorial · api · extraction

Extract Structured Data from Documents with Python

February 22, 2026 · Smole Team

Extract Structured Data from Documents with Python

Python is the go-to language for data processing, automation, and backend services. If you're building a document processing pipeline in Python — extracting invoices, parsing contracts, digitizing scanned records — here's how to do it with Smole's REST API.

This guide covers everything from a basic single-file extraction to a production-ready batch processing script.

Setup

Install the requests library if you don't have it:

pip install requests

Set your API key:

import os

import requests

API_BASE = "https://api.smole.tech/api"
# Prefer an environment variable so the key never lands in source control;
# the literal fallback keeps the snippet copy-paste runnable.
API_KEY = os.environ.get("SMOLE_API_KEY", "your_api_key_here")  # from https://smole.tech/account/api-keys

# Every request below authenticates with this bearer-token header.
headers = {
    "Authorization": f"Bearer {API_KEY}"
}

Step 1: Create a Schema

Define the data structure you want to extract. This example uses an invoice schema:

# JSON Schema describing the fields to pull out of each invoice.
# Nested objects (vendor) and arrays (line_items) are both supported.
schema_payload = {
    "name": "invoice-schema",
    "schema": {
        "type": "object",
        "properties": {
            "vendor": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "vat_id": {"type": "string"}
                }
            },
            "invoice_number": {"type": "string"},
            "date": {"type": "string", "format": "date"},
            "due_date": {"type": "string", "format": "date"},
            "line_items": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "description": {"type": "string"},
                        "quantity": {"type": "number"},
                        "unit_price": {"type": "number"},
                        "total": {"type": "number"}
                    }
                }
            },
            "subtotal": {"type": "number"},
            "tax_amount": {"type": "number"},
            "total": {"type": "number"},
            "currency": {"type": "string"}
        }
    }
}

resp = requests.post(
    f"{API_BASE}/schemas",
    headers=headers,
    json=schema_payload
)
# Fail fast with the HTTP error; without this, a 4xx/5xx response would
# surface as a confusing KeyError on "id" below.
resp.raise_for_status()
schema = resp.json()
schema_id = schema["id"]
print(f"Schema created: {schema_id}")

You only need to create a schema once — reuse the same schema_id for every document of that type.

Step 2: Extract a Document

Upload a file and run it through the pipeline:

def extract_document(file_path: str, schema_id: str) -> dict:
    """Kick off an extraction pipeline run for a single document.

    Opens *file_path* in binary mode, posts it to the pipeline endpoint
    together with the schema to apply, and returns the API's JSON
    response (which carries the pipeline id to poll for results).

    Raises:
        requests.HTTPError: On a non-2xx response from the API.
    """
    with open(file_path, "rb") as document:
        response = requests.post(
            f"{API_BASE}/pipeline/file",
            headers=headers,
            data={"schemaId": schema_id},
            files={"file": document},
        )
    response.raise_for_status()
    return response.json()

# Start a pipeline run for one document; the response includes the run's id.
result = extract_document("invoice.pdf", schema_id)
pipeline_id = result["id"]
print(f"Pipeline started: {pipeline_id}")

Step 3: Get the Results

The pipeline runs asynchronously. Poll for the result:

import time

def wait_for_result(pipeline_id: str, timeout: int = 120, poll_interval: float = 2.0) -> dict:
    """Poll until the pipeline completes or times out.

    Args:
        pipeline_id: Id returned when the pipeline run was started.
        timeout: Maximum number of seconds to wait overall.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The full pipeline record once its status is "completed".

    Raises:
        RuntimeError: If the pipeline reports status "failed".
        TimeoutError: If the run does not finish within `timeout` seconds.
        requests.HTTPError: On a non-2xx response from the API.
    """
    deadline = time.time() + timeout

    while time.time() < deadline:
        resp = requests.get(
            f"{API_BASE}/pipeline/{pipeline_id}",
            headers=headers
        )
        resp.raise_for_status()
        data = resp.json()

        status = data.get("status")
        if status == "completed":
            return data
        if status == "failed":
            # RuntimeError (still an Exception subclass, so existing callers
            # keep working) lets new callers catch something narrower.
            raise RuntimeError(f"Pipeline failed: {data.get('error')}")

        time.sleep(poll_interval)

    raise TimeoutError(f"Pipeline {pipeline_id} did not complete within {timeout}s")

# Block until the run finishes, then pull out the extracted fields.
result = wait_for_result(pipeline_id)
extracted = result["extraction"]["data"]
print(extracted)

Example output:

{
    "vendor": {
        "name": "Bürotechnik Schmidt GmbH",
        "vat_id": "DE198374562"
    },
    "invoice_number": "BS-2025-4210",
    "date": "2025-12-01",
    "due_date": "2025-12-31",
    "line_items": [
        {"description": "Office Chair Ergonomic", "quantity": 5, "unit_price": 349.00, "total": 1745.00},
        {"description": "Standing Desk 160cm", "quantity": 3, "unit_price": 599.00, "total": 1797.00},
        {"description": "Monitor Arm Dual", "quantity": 5, "unit_price": 89.00, "total": 445.00}
    ],
    "subtotal": 3987.00,
    "tax_amount": 757.53,
    "total": 4744.53,
    "currency": "EUR"
}

Complete Single-File Script

Putting it all together:

import os
import time

import requests

API_BASE = "https://api.smole.tech/api"
# Read the key from the environment when available so it never lands in
# source control; the literal fallback keeps the script runnable as-is.
API_KEY = os.environ.get("SMOLE_API_KEY", "your_api_key_here")
headers = {"Authorization": f"Bearer {API_KEY}"}


def extract(file_path: str, schema_id: str, timeout: int = 120, poll_interval: float = 2.0) -> dict:
    """Extract structured data from a document.

    Uploads `file_path`, starts a pipeline run against `schema_id`, then
    polls until the run completes.

    Args:
        file_path: Path to the document to upload (PDF, image, etc.).
        schema_id: Id of a previously created extraction schema.
        timeout: Maximum number of seconds to wait for completion.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The extracted data dict, shaped like the schema.

    Raises:
        RuntimeError: If the pipeline reports status "failed".
        TimeoutError: If the run does not finish within `timeout` seconds.
        requests.HTTPError: On a non-2xx response from the API.
    """

    # Upload and start pipeline
    with open(file_path, "rb") as f:
        resp = requests.post(
            f"{API_BASE}/pipeline/file",
            headers=headers,
            files={"file": f},
            data={"schemaId": schema_id}
        )
    resp.raise_for_status()
    pipeline_id = resp.json()["id"]

    # Poll for the result until the deadline passes
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = requests.get(f"{API_BASE}/pipeline/{pipeline_id}", headers=headers)
        resp.raise_for_status()
        data = resp.json()

        if data["status"] == "completed":
            return data["extraction"]["data"]
        if data["status"] == "failed":
            # RuntimeError keeps `except Exception` callers working while
            # allowing narrower handling.
            raise RuntimeError(f"Extraction failed: {data.get('error')}")

        time.sleep(poll_interval)

    raise TimeoutError(f"Timed out after {timeout}s")


# Usage: one call per document; the returned dict mirrors the schema.
result = extract("invoice.pdf", "your_schema_id")
print(result["vendor"]["name"])       # "Bürotechnik Schmidt GmbH"
print(result["total"])                 # 4744.53
print(len(result["line_items"]))       # 3

Batch Processing

Process a folder of documents:

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import json


def process_folder(folder: str, schema_id: str, max_workers: int = 5) -> list[dict]:
    """Process all PDFs in a folder concurrently.

    Each PDF is uploaded and extracted via `extract`; per-file failures are
    captured in the result list rather than aborting the whole batch.

    Args:
        folder: Directory scanned (non-recursively) for *.pdf files.
        schema_id: Schema to apply to every document.
        max_workers: Number of concurrent extractions.

    Returns:
        One dict per file: {"file", "data"} on success, {"file", "error"}
        on failure.
    """
    files = list(Path(folder).glob("*.pdf"))
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(extract, str(f), schema_id): f.name
            for f in files
        }

        for future in as_completed(futures):
            filename = futures[future]
            try:
                data = future.result()
                results.append({"file": filename, "data": data})
                # Fix: the log lines previously printed a literal placeholder
                # instead of the file name.
                print(f"  OK: {filename}")
            except Exception as e:
                results.append({"file": filename, "error": str(e)})
                print(f"  FAIL: {filename} — {e}")

    return results


# Process all invoices in a folder (5 documents in parallel)
results = process_folder("./invoices/", "your_schema_id", max_workers=5)

# Persist the full batch (successes and failures) as JSON
with open("extracted.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Processed {len(results)} documents")

This processes 5 documents in parallel. Adjust max_workers based on your needs and rate limits.

Saving Results to CSV

For spreadsheet workflows:

import csv


def results_to_csv(results: list[dict], output_path: str):
    """Flatten extracted invoice data to CSV.

    Emits one row per line item; documents that failed extraction
    (entries carrying an "error" key) are skipped. When there are no
    line items at all, no file is written.
    """
    # Build every row up front with a nested comprehension — one pass over
    # the batch, one row per line item.
    rows = [
        {
            "file": entry["file"],
            "vendor": entry["data"].get("vendor", {}).get("name", ""),
            "invoice_number": entry["data"].get("invoice_number", ""),
            "date": entry["data"].get("date", ""),
            "item_description": item.get("description", ""),
            "quantity": item.get("quantity", 0),
            "unit_price": item.get("unit_price", 0),
            "item_total": item.get("total", 0),
            "invoice_total": entry["data"].get("total", 0),
        }
        for entry in results
        if "error" not in entry
        for item in entry["data"].get("line_items", [])
    ]

    if not rows:
        return

    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)


# One CSV row per line item; failed extractions are skipped
results_to_csv(results, "invoices.csv")

Saving Results to a Database

For production pipelines, write directly to PostgreSQL:

import psycopg2


def save_to_db(data: dict, filename: str):
    """Insert an extracted invoice and its line items into PostgreSQL.

    The invoice header and all line items are written in a single
    transaction: if any insert fails, everything is rolled back and the
    connection is still closed.

    Args:
        data: Extracted invoice dict (as returned by the pipeline).
        filename: Source file name, stored alongside the invoice.

    Returns:
        The id of the newly inserted invoices row.
    """
    # NOTE(review): credentials are hard-coded for the example — load the
    # DSN from configuration/environment in production.
    conn = psycopg2.connect("postgresql://user:pass@localhost/invoices")
    try:
        # `with conn:` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor. The original version
        # leaked both on any insert failure.
        with conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO invoices (
                        filename, vendor_name, invoice_number,
                        date, due_date, subtotal, tax_amount, total, currency
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    RETURNING id
                """, (
                    filename,
                    data.get("vendor", {}).get("name"),
                    data.get("invoice_number"),
                    data.get("date"),
                    data.get("due_date"),
                    data.get("subtotal"),
                    data.get("tax_amount"),
                    data.get("total"),
                    data.get("currency"),
                ))
                invoice_id = cur.fetchone()[0]

                for item in data.get("line_items", []):
                    cur.execute("""
                        INSERT INTO line_items (
                            invoice_id, description, quantity, unit_price, total
                        ) VALUES (%s, %s, %s, %s, %s)
                    """, (
                        invoice_id,
                        item.get("description"),
                        item.get("quantity"),
                        item.get("unit_price"),
                        item.get("total"),
                    ))
    finally:
        conn.close()

    return invoice_id

Handling Scanned Documents

Scanned PDFs and images work the same way — just upload them. Smole detects scanned documents automatically and runs OCR before extraction:

# Works with scanned PDFs, photos, and images — OCR runs automatically
result = extract("scanned_invoice.pdf", schema_id)
result = extract("photo_of_receipt.jpg", schema_id)
result = extract("faxed_contract.tiff", schema_id)

No extra configuration needed. The same schema, the same code.

Error Handling

A production-ready wrapper with retries:

def extract_with_retries(
    file_path: str,
    schema_id: str,
    max_retries: int = 3
) -> dict:
    """Extract with automatic retries on transient failures.

    Retries on rate limiting (429), bad gateway (502) and service
    unavailable (503) with exponential backoff, and on polling timeouts
    without a backoff. Any other HTTP error is raised immediately.

    Args:
        file_path: Path to the document to extract.
        schema_id: Schema to apply.
        max_retries: Total number of attempts.

    Raises:
        The last transient error once all attempts are exhausted.
    """
    last_error = None

    for attempt in range(1, max_retries + 1):
        try:
            return extract(file_path, schema_id)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code not in (429, 502, 503):
                raise  # non-transient: surface immediately
            last_error = e
            if attempt < max_retries:
                # Exponential backoff — but don't sleep after the final
                # attempt, since we're about to give up anyway.
                wait = 2 ** attempt
                print(f"  Retry {attempt}/{max_retries} in {wait}s...")
                time.sleep(wait)
        except TimeoutError as e:
            last_error = e
            print(f"  Timeout, retry {attempt}/{max_retries}...")

    raise last_error

Different Document Types

The code stays the same — only the schema changes. Create separate schemas for each document type:

# One schema per document type — the extraction code itself never changes
schemas = {
    "invoice": "schema_id_for_invoices",
    "contract": "schema_id_for_contracts",
    "receipt": "schema_id_for_receipts",
    "report": "schema_id_for_reports",
}

# Extract based on document type
result = extract("q4-report.pdf", schemas["report"])

Try It Now

Test your extraction in the Playground before writing code — upload a document, define a schema, and see the JSON output. Then use the code above to integrate into your Python project.

For the full API reference, see the documentation.