Extract Structured Data from Documents with Python
Extract Structured Data from Documents with Python
Python is the go-to language for data processing, automation, and backend services. If you're building a document processing pipeline in Python — extracting invoices, parsing contracts, digitizing scanned records — here's how to do it with Smole's REST API.
This guide covers everything from a basic single-file extraction to a production-ready batch processing script.
Setup
Install the requests library if you don't have it:
pip install requests
Set your API key:
import requests

# Base URL for all Smole REST endpoints.
API_BASE = "https://api.smole.tech/api"
API_KEY = "your_api_key_here" # from https://smole.tech/account/api-keys
# Bearer-token auth header, sent with every request below.
headers = {
    "Authorization": f"Bearer {API_KEY}"
}
Step 1: Create a Schema
Define the data structure you want to extract. This example uses an invoice schema:
# JSON Schema describing the fields to pull out of each invoice.
# Nested objects (vendor) and arrays (line_items) are supported.
schema_payload = {
    "name": "invoice-schema",
    "schema": {
        "type": "object",
        "properties": {
            "vendor": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "vat_id": {"type": "string"}
                }
            },
            "invoice_number": {"type": "string"},
            "date": {"type": "string", "format": "date"},
            "due_date": {"type": "string", "format": "date"},
            "line_items": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "description": {"type": "string"},
                        "quantity": {"type": "number"},
                        "unit_price": {"type": "number"},
                        "total": {"type": "number"}
                    }
                }
            },
            "subtotal": {"type": "number"},
            "tax_amount": {"type": "number"},
            "total": {"type": "number"},
            "currency": {"type": "string"}
        }
    }
}

# Register the schema once; the returned id is reused for every document.
resp = requests.post(
    f"{API_BASE}/schemas",
    headers=headers,
    json=schema_payload
)
schema = resp.json()
schema_id = schema["id"]
print(f"Schema created: {schema_id}")
You only need to create a schema once — reuse the same schema_id for every document of that type.
Step 2: Extract a Document
Upload a file and run it through the pipeline:
def extract_document(file_path: str, schema_id: str) -> dict:
    """Upload a document and start an extraction pipeline for it.

    Posts the file as multipart form data together with the schema id.
    Returns the pipeline record from the API; its "id" is used for
    polling. Raises requests.HTTPError on a non-2xx response.
    """
    endpoint = f"{API_BASE}/pipeline/file"
    with open(file_path, "rb") as handle:
        response = requests.post(
            endpoint,
            headers=headers,
            files={"file": handle},
            data={"schemaId": schema_id},
        )
    response.raise_for_status()
    return response.json()
# The call returns immediately with a pipeline record; the extraction
# itself runs asynchronously on the server.
result = extract_document("invoice.pdf", schema_id)
pipeline_id = result["id"]
print(f"Pipeline started: {pipeline_id}")
Step 3: Get the Results
The pipeline runs asynchronously. Poll for the result:
import time
def wait_for_result(pipeline_id: str, timeout: int = 120) -> dict:
    """Poll until the pipeline completes, fails, or times out.

    Args:
        pipeline_id: ID returned when the pipeline was started.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The full pipeline record once its status is "completed".

    Raises:
        RuntimeError: If the pipeline reports status "failed".
        TimeoutError: If the pipeline does not finish within `timeout`.
        requests.HTTPError: On a non-2xx polling response.
    """
    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(
            f"{API_BASE}/pipeline/{pipeline_id}",
            headers=headers
        )
        resp.raise_for_status()
        data = resp.json()
        status = data.get("status")
        if status == "completed":
            return data
        if status == "failed":
            # RuntimeError instead of bare Exception so callers can catch
            # pipeline failures narrowly without swallowing everything.
            raise RuntimeError(f"Pipeline failed: {data.get('error')}")
        # Any other status means still processing; wait before re-polling.
        time.sleep(2)
    raise TimeoutError(f"Pipeline {pipeline_id} did not complete within {timeout}s")
# Block until the pipeline finishes, then pull out the extracted fields.
result = wait_for_result(pipeline_id)
extracted = result["extraction"]["data"]
print(extracted)
Example output:
{
"vendor": {
"name": "Bürotechnik Schmidt GmbH",
"vat_id": "DE198374562"
},
"invoice_number": "BS-2025-4210",
"date": "2025-12-01",
"due_date": "2025-12-31",
"line_items": [
{"description": "Office Chair Ergonomic", "quantity": 5, "unit_price": 349.00, "total": 1745.00},
{"description": "Standing Desk 160cm", "quantity": 3, "unit_price": 599.00, "total": 1797.00},
{"description": "Monitor Arm Dual", "quantity": 5, "unit_price": 89.00, "total": 445.00}
],
"subtotal": 3987.00,
"tax_amount": 757.53,
"total": 4744.53,
"currency": "EUR"
}
Complete Single-File Script
Putting it all together:
import requests
import time
API_BASE = "https://api.smole.tech/api"
API_KEY = "your_api_key_here"
headers = {"Authorization": f"Bearer {API_KEY}"}
def extract(file_path: str, schema_id: str, timeout: int = 120) -> dict:
    """Extract structured data from a document.

    Uploads the file, starts the extraction pipeline, then polls every
    2 seconds until the pipeline finishes.

    Args:
        file_path: Path to the document (PDF, image, etc.).
        schema_id: ID of the schema to extract against.
        timeout: Maximum seconds to wait for the pipeline to complete.

    Returns:
        The extracted data dict matching the schema.

    Raises:
        RuntimeError: If the pipeline reports a failure.
        TimeoutError: If the pipeline does not finish within `timeout`.
        requests.HTTPError: On any non-2xx API response.
    """
    # Upload and start pipeline
    with open(file_path, "rb") as f:
        resp = requests.post(
            f"{API_BASE}/pipeline/file",
            headers=headers,
            files={"file": f},
            data={"schemaId": schema_id}
        )
    resp.raise_for_status()
    pipeline_id = resp.json()["id"]

    # Poll for result
    start = time.time()
    while time.time() - start < timeout:
        resp = requests.get(f"{API_BASE}/pipeline/{pipeline_id}", headers=headers)
        resp.raise_for_status()
        data = resp.json()
        if data["status"] == "completed":
            return data["extraction"]["data"]
        if data["status"] == "failed":
            # RuntimeError instead of bare Exception so callers can catch
            # extraction failures narrowly.
            raise RuntimeError(f"Extraction failed: {data.get('error')}")
        time.sleep(2)
    raise TimeoutError(f"Timed out after {timeout}s")
# Usage: one call does upload + poll + unwrap of the extracted data.
result = extract("invoice.pdf", "your_schema_id")
print(result["vendor"]["name"])   # "Bürotechnik Schmidt GmbH"
print(result["total"])            # 4744.53
print(len(result["line_items"]))  # 3
Batch Processing
Process a folder of documents:
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
def process_folder(folder: str, schema_id: str, max_workers: int = 5) -> list[dict]:
    """Process all PDFs in a folder concurrently.

    Submits every *.pdf in `folder` (non-recursive) to a thread pool.
    Per-file failures are recorded in the result list rather than
    aborting the whole batch.

    Args:
        folder: Directory to scan for PDF files.
        schema_id: Schema to extract against.
        max_workers: Number of documents processed in parallel.

    Returns:
        One dict per file: {"file": name, "data": ...} on success, or
        {"file": name, "error": message} on failure.
    """
    files = list(Path(folder).glob("*.pdf"))
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(extract, str(f), schema_id): f.name
            for f in files
        }
        for future in as_completed(futures):
            filename = futures[future]
            try:
                data = future.result()
                results.append({"file": filename, "data": data})
                # Fix: these progress messages printed a literal
                # placeholder instead of interpolating the file name.
                print(f"  OK: {filename}")
            except Exception as e:
                results.append({"file": filename, "error": str(e)})
                print(f"  FAIL: {filename} — {e}")
    return results
# Process all invoices in a folder
results = process_folder("./invoices/", "your_schema_id", max_workers=5)

# Save results (successes and per-file errors) as one JSON document.
with open("extracted.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Processed {len(results)} documents")
This processes 5 documents in parallel. Adjust max_workers based on your needs and rate limits.
Saving Results to CSV
For spreadsheet workflows:
import csv
def results_to_csv(results: list[dict], output_path: str):
    """Flatten extracted invoice data to CSV.

    Writes one CSV row per line item; invoice-level fields (vendor,
    invoice number, totals) are repeated on each row. Entries carrying
    an "error" key are skipped, and if no rows result, no file is
    written at all.
    """
    columns = [
        "file", "vendor", "invoice_number", "date",
        "item_description", "quantity", "unit_price",
        "item_total", "invoice_total",
    ]
    rows = [
        {
            "file": entry["file"],
            "vendor": entry["data"].get("vendor", {}).get("name", ""),
            "invoice_number": entry["data"].get("invoice_number", ""),
            "date": entry["data"].get("date", ""),
            "item_description": item.get("description", ""),
            "quantity": item.get("quantity", 0),
            "unit_price": item.get("unit_price", 0),
            "item_total": item.get("total", 0),
            "invoice_total": entry["data"].get("total", 0),
        }
        for entry in results
        if "error" not in entry
        for item in entry["data"].get("line_items", [])
    ]
    if not rows:
        return
    with open(output_path, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)
results_to_csv(results, "invoices.csv")
Saving Results to a Database
For production pipelines, write directly to PostgreSQL:
import psycopg2
def save_to_db(data: dict, filename: str):
    """Insert an extracted invoice and its line items into PostgreSQL.

    Args:
        data: Extracted invoice dict (vendor, totals, line_items, ...).
        filename: Source file name stored alongside the invoice.

    Returns:
        The id of the newly inserted invoices row.

    Raises:
        psycopg2.Error: On connection or query failure (the transaction
            is rolled back and the connection is closed).
    """
    conn = psycopg2.connect("postgresql://user:pass@localhost/invoices")
    try:
        # `with conn` wraps everything in one transaction: commit on
        # success, rollback on error — so an invoice row is never stored
        # without its line items. The original committed manually and
        # leaked the connection on any error path.
        with conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO invoices (
                        filename, vendor_name, invoice_number,
                        date, due_date, subtotal, tax_amount, total, currency
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    RETURNING id
                """, (
                    filename,
                    data.get("vendor", {}).get("name"),
                    data.get("invoice_number"),
                    data.get("date"),
                    data.get("due_date"),
                    data.get("subtotal"),
                    data.get("tax_amount"),
                    data.get("total"),
                    data.get("currency"),
                ))
                invoice_id = cur.fetchone()[0]
                for item in data.get("line_items", []):
                    cur.execute("""
                        INSERT INTO line_items (
                            invoice_id, description, quantity, unit_price, total
                        ) VALUES (%s, %s, %s, %s, %s)
                    """, (
                        invoice_id,
                        item.get("description"),
                        item.get("quantity"),
                        item.get("unit_price"),
                        item.get("total"),
                    ))
    finally:
        conn.close()
    return invoice_id
Handling Scanned Documents
Scanned PDFs and images work the same way — just upload them. Smole detects scanned documents automatically and runs OCR before extraction:
# Works with scanned PDFs, photos, and images — the upload call is
# identical to the digital-PDF case; OCR happens server-side.
result = extract("scanned_invoice.pdf", schema_id)
result = extract("photo_of_receipt.jpg", schema_id)
result = extract("faxed_contract.tiff", schema_id)
No extra configuration needed. The same schema, the same code.
Error Handling
A production-ready wrapper with retries:
def extract_with_retries(
    file_path: str,
    schema_id: str,
    max_retries: int = 3
) -> dict:
    """Extract with automatic retries on transient failures.

    Retries on rate limiting / gateway errors (HTTP 429, 502, 503) and
    on polling timeouts, with exponential backoff (2s, 4s, 8s, ...).
    Any other HTTP error is raised immediately.

    Args:
        file_path: Path to the document to extract.
        schema_id: Schema to extract against.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The extracted data dict.

    Raises:
        ValueError: If max_retries < 1.
        requests.HTTPError: Immediately on a permanent HTTP error, or
            the last transient error once all attempts are exhausted.
        TimeoutError: The last timeout once all attempts are exhausted.
    """
    if max_retries < 1:
        # The original fell through to `raise last_error` with None here,
        # producing a confusing TypeError.
        raise ValueError("max_retries must be at least 1")
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return extract(file_path, schema_id)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code not in (429, 502, 503):
                raise  # permanent failure — retrying will not help
            last_error = e
            if attempt < max_retries:
                # Original slept even after the final attempt; only back
                # off when another attempt will actually follow.
                wait = 2 ** attempt
                print(f" Retry {attempt}/{max_retries} in {wait}s...")
                time.sleep(wait)
        except TimeoutError as e:
            last_error = e
            print(f" Timeout, retry {attempt}/{max_retries}...")
    raise last_error
Different Document Types
The code stays the same — only the schema changes. Create separate schemas for each document type:
# One schema id per document type — the extraction code itself is reused.
schemas = {
    "invoice": "schema_id_for_invoices",
    "contract": "schema_id_for_contracts",
    "receipt": "schema_id_for_receipts",
    "report": "schema_id_for_reports",
}

# Extract based on document type
result = extract("q4-report.pdf", schemas["report"])
Try It Now
Test your extraction in the Playground before writing code — upload a document, define a schema, and see the JSON output. Then use the code above to integrate into your Python project.
For the full API reference, see the documentation.
Related articles
How to Convert PDFs to JSON with an API
A practical guide to converting PDF documents into structured JSON data using a REST API. Covers digital PDFs, scanned documents, and batch processing.
How to Extract Tables from PDFs into Structured Data
Extract tables from PDF documents into structured JSON or CSV. Handle multi-column layouts, merged cells, and inconsistent formatting with schema-based extraction.
How to Extract Data from Scanned Documents
Learn how to extract structured data from scanned PDFs, photographed documents, and image-based files using OCR and schema-based extraction.
