Validating Supplier Data Payloads with Pydantic Models
Supplier payloads rarely arrive clean. EDI translations, CSV exports, and REST API responses from tier-1 and tier-2 vendors routinely introduce type drift, missing mandatory fields, malformed dates, and precision mismatches in quantities and unit costs. When these anomalies bypass the ingestion layer, they cascade into inventory reconciliation failures, PO matching errors, and downstream WMS allocation blocks. Strict runtime schema validation at the payload boundary is the most reliable way to quarantine bad data before it touches your core transactional tables.
This guide provides exact implementation patterns for Pydantic v2 models tailored to supply chain data, error extraction workflows, batch integration tuning, and pipeline recovery procedures.
Step 1: Define Strict Base Models for Supplier Feeds
Start by establishing a base configuration that enforces explicit type checking and disables lenient parsing. Supply chain payloads frequently ship strings where decimals or integers are expected (e.g., "150.00" instead of 150.00 for case quantities). Strict mode prevents silent type conversion that corrupts reconciliation math.
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator, model_validator
from datetime import date
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Optional

class SupplierLineItem(BaseModel):
    model_config = ConfigDict(strict=True, extra="forbid", validate_default=True)

    line_number: int = Field(ge=1, description="Sequential PO/ASN line number")
    sku: str = Field(pattern=r"^[A-Z0-9\-]{6,20}$", description="Internal SKU format")
    supplier_sku: Optional[str] = Field(default=None, max_length=25)
    quantity_ordered: Decimal = Field(ge=0, description="Requested quantity")
    quantity_shipped: Optional[Decimal] = Field(default=None, ge=0)
    unit_cost: Decimal = Field(ge=0, description="Per-unit cost in base currency")
    currency_code: str = Field(pattern=r"^[A-Z]{3}$", description="ISO 4217 currency")
    expected_delivery: date
    actual_delivery: Optional[date] = None

    @field_validator("quantity_ordered", "unit_cost", mode="before")
    @classmethod
    def quantize_financials(cls, v: object, info: ValidationInfo) -> object:
        """Round quantities to 4 decimal places and unit costs to 2."""
        # Runs before strict type checking: ints, floats, and numeric strings are
        # deliberately coerced here; anything else is left for strict mode to reject.
        if isinstance(v, bool) or not isinstance(v, (int, float, str, Decimal)):
            return v
        places = Decimal("0.01") if info.field_name == "unit_cost" else Decimal("0.0001")
        try:
            return Decimal(str(v)).quantize(places, rounding=ROUND_HALF_UP)
        except InvalidOperation as exc:
            raise ValueError("value is not a finite decimal number") from exc
Key configuration choices:
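- strict=True: Blocks implicit string-to-number and string-to-date conversions for Python-object input, so upstream formatting errors surface instead of being masked. JSON input validated with model_validate_json still accepts ISO 8601 date strings, because JSON has no native date type.
- extra="forbid": Rejects payloads containing undocumented fields, which frequently indicate version drift in supplier API contracts.
- validate_default=True: Runs validation on default values as well, so a default that violates a field's constraints fails instead of slipping through unchecked.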
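To see what these flags do in practice, the sketch below runs three payloads through a deliberately tiny model. `StrictDemo` and `error_types` are hypothetical names used only for illustration; they are not part of the supplier model defined in this step.

```python
from decimal import Decimal

from pydantic import BaseModel, ConfigDict, ValidationError


class StrictDemo(BaseModel):
    """Hypothetical two-field model used only to show the config flags."""

    model_config = ConfigDict(strict=True, extra="forbid")

    line_number: int
    unit_cost: Decimal


def error_types(payload: dict) -> list[str]:
    """Return the Pydantic error type codes raised for a payload."""
    try:
        StrictDemo.model_validate(payload)
    except ValidationError as exc:
        return sorted(e["type"] for e in exc.errors())
    return []


# Correctly typed payload: no errors.
clean = error_types({"line_number": 1, "unit_cost": Decimal("4.99")})

# A string where an int is expected is rejected instead of being coerced.
string_number = error_types({"line_number": "1", "unit_cost": Decimal("4.99")})

# An undocumented field is rejected because of extra="forbid".
extra_field = error_types(
    {"line_number": 1, "unit_cost": Decimal("4.99"), "promo_code": "SPRING"}
)
```

Collecting the error type codes rather than the messages keeps the check stable across Pydantic releases that reword messages.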
For architectural context on how these models fit into the wider Ingestion & Parsing Workflows for Supply Chain Data, refer to the framework documentation.
Step 2: Implement Supply Chain–Specific Validators
Standard type checks are insufficient for procurement and logistics data. You must enforce domain rules: valid GTIN/EAN formats, ISO 8601 date parsing with timezone normalization, and cross-field reconciliations. Pydantic v2’s @field_validator and @model_validator decorators allow you to inject business logic directly into the parsing phase.
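import re

# Extends the Step 1 model under the same name so later steps keep using SupplierLineItem.
class SupplierLineItem(SupplierLineItem):
    # GTINs are digits only, so they get their own field instead of reusing the internal SKU.
    gtin: Optional[str] = Field(default=None, description="GS1 GTIN-12/13/14, if supplied")

    @field_validator("gtin")
    @classmethod
    def validate_gtin_checksum(cls, v: Optional[str]) -> Optional[str]:
        """Validate GTIN-12/13/14 check digits using the GS1 mod-10 algorithm."""
        if v is None:
            return v
        if not re.fullmatch(r"[0-9]{12,14}", v):
            raise ValueError("Invalid GTIN format")
        # Weights alternate 3, 1, 3, ... starting from the digit next to the check digit.
        digits = [int(d) for d in reversed(v[:-1])]
        total = sum(d * (3 if i % 2 == 0 else 1) for i, d in enumerate(digits))
        check_digit = (10 - (total % 10)) % 10
        if check_digit != int(v[-1]):
            raise ValueError("GTIN checksum mismatch")
        return v

    @model_validator(mode="after")
    def reconcile_quantities(self) -> "SupplierLineItem":
        """Ensure shipped quantities never exceed ordered quantities."""
        if self.quantity_shipped is not None and self.quantity_shipped > self.quantity_ordered:
            raise ValueError("quantity_shipped cannot exceed quantity_ordered")
        return self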
GTIN validation prevents master data corruption at the catalog level. Checksum verification must align with GS1 identification standards to ensure downstream ERP and WMS systems accept the identifiers without manual intervention.
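The check-digit arithmetic is easier to verify in isolation. The sketch below is a standalone, standard-library version of the GS1 mod-10 calculation; `gs1_check_digit` and `is_valid_gtin` are illustrative helper names, and the sample identifiers are commonly published example codes rather than real catalog data.

```python
def gs1_check_digit(payload: str) -> int:
    """Compute the GS1 mod-10 check digit for a GTIN without its last digit."""
    total = 0
    # Weights alternate 3, 1, 3, ... starting from the rightmost payload digit,
    # which makes the same rule work for GTIN-12, GTIN-13, and GTIN-14.
    for position, char in enumerate(reversed(payload)):
        weight = 3 if position % 2 == 0 else 1
        total += int(char) * weight
    return (10 - total % 10) % 10


def is_valid_gtin(candidate: str) -> bool:
    """Return True when the candidate is a 12-14 digit GTIN with a matching check digit."""
    if not (candidate.isascii() and candidate.isdigit()):
        return False
    if len(candidate) not in (12, 13, 14):
        return False
    return gs1_check_digit(candidate[:-1]) == int(candidate[-1])


upc_ok = is_valid_gtin("036000291452")      # GTIN-12 (UPC-A)
ean_ok = is_valid_gtin("4006381333931")     # GTIN-13 (EAN-13)
ean_bad = is_valid_gtin("4006381333932")    # last digit altered
not_a_gtin = is_valid_gtin("ACME-12345")    # internal SKU, not a GTIN
```

Counting weights from the right matters: counting from the left gives the correct result for only some GTIN lengths, because the parity of the positions shifts with the number of digits.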
Step 3: Error Extraction & Quarantine Routing
When validation fails, Pydantic raises a ValidationError. Catching this exception and parsing its structured output is critical for automated quarantine workflows. Rather than failing the entire batch, extract field-level diagnostics and route malformed records to a dead-letter queue or staging table for manual review.
from pydantic import ValidationError
from datetime import datetime, timezone
import json

def parse_validation_error(err: ValidationError, raw_payload: dict) -> dict:
    """Convert Pydantic errors into a structured quarantine record."""
    errors = err.errors()  # Build the error list once and reuse it
    return {
        "raw_payload": raw_payload,
        "error_count": len(errors),
        "failed_fields": [
            {
                "field": ".".join(str(loc) for loc in e["loc"]),
                "error_type": e["type"],
                "message": e["msg"],
                "input_value": e["input"],
            }
            for e in errors
        ],
        # UTC time at which the record was quarantined
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# Usage in ingestion loop
raw_payload = {  # One row pulled from your supplier feed (illustrative values).
    "line_number": 1,
    "sku": "ACME-12345",
    "quantity_ordered": 10,
    "unit_cost": 4.99,
    "currency_code": "USD",
    "expected_delivery": date(2024, 3, 1),  # strict mode expects a date object for Python input
    "gtin": "0614141999996",
}

try:
    validated_item = SupplierLineItem.model_validate(raw_payload)
except ValidationError as e:
    quarantine_record = parse_validation_error(e, raw_payload)
    # Push to Kafka dead-letter topic or S3 quarantine bucket
Detailed patterns for handling these exceptions and mapping them to operational dashboards are documented in the Schema Validation Using Pydantic cluster.
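Before a quarantine record can be pushed to a dead-letter topic or bucket it has to be serialized, and its raw payload or input values may hold `Decimal` or `date` objects that `json.dumps` rejects by default. A minimal sketch, assuming a JSON Lines sink and that falling back to `str()` is acceptable for such values; `to_json_line` and the sample record are hypothetical.

```python
import json
from datetime import date
from decimal import Decimal


def to_json_line(record: dict) -> str:
    """Serialize a quarantine record to one JSON line.

    Values json cannot encode natively (Decimal, date, ...) fall back to str().
    """
    return json.dumps(record, default=str, sort_keys=True)


# Hypothetical record shaped like the quarantine dict built in this step.
sample_record = {
    "raw_payload": {
        "sku": "ACME-12345",
        "unit_cost": Decimal("4.999"),
        "expected_delivery": date(2024, 3, 1),
    },
    "error_count": 1,
    "failed_fields": [
        {
            "field": "line_number",
            "error_type": "missing",
            "message": "Field required",
            "input_value": None,
        }
    ],
}

line = to_json_line(sample_record)
```

Stringifying decimals keeps the supplier's original precision visible to the reviewer, which a float conversion would not guarantee.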
Step 4: Batch Processing & Throughput Optimization
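Validating thousands of line items one call at a time adds avoidable latency in high-volume EDI or API polling scenarios. Pydantic v2 provides TypeAdapter.validate_json and model_validate_json, which parse and validate JSON in a single pass inside pydantic-core and skip the intermediate json.loads step. Model instances are still created for every valid record; the saving comes from avoiding the extra parsing pass and per-call overhead.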
from pydantic import TypeAdapter
from typing import List

# Build the adapter once at import time and reuse it for every batch
LineItemBatch = TypeAdapter(List[SupplierLineItem])

def validate_batch(json_payload: bytes) -> tuple[List[SupplierLineItem], List[dict]]:
    """Validate a JSON array of line items in a single pass."""
    valid_items = []
    quarantined = []

    # Fast JSON parsing + validation
    try:
        valid_items.extend(LineItemBatch.validate_json(json_payload))
    except ValidationError:
        # Fallback: validate records individually to isolate failures.
        # Each record is re-serialized so it goes through the same JSON-mode
        # rules as the fast path; strict Python-mode validation would reject
        # date strings that JSON mode accepts.
        for item in json.loads(json_payload):
            try:
                valid_items.append(SupplierLineItem.model_validate_json(json.dumps(item)))
            except ValidationError as ve:
                quarantined.append(parse_validation_error(ve, item))

    return valid_items, quarantined
Keeping costs and quantities as Decimal values avoids the binary floating-point drift that float arithmetic introduces in cost calculations. When processing multi-gigabyte ASN feeds, chunk payloads into batches of about 10,000 records and validate each chunk with TypeAdapter.validate_json (or model_validate_json for single records) to leverage Pydantic’s Rust-backed parser.
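The chunking step can be sketched with the standard library alone. `chunked` is a hypothetical helper, and the 10-record size in the demonstration is only there to keep the example small; in a pipeline each chunk would presumably be serialized and handed to `validate_batch`.

```python
from itertools import islice
from typing import Iterable, Iterator


def chunked(records: Iterable[dict], size: int = 10_000) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` records from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(records)
    # islice pulls up to `size` items per pass; an empty list ends the loop.
    while batch := list(islice(iterator, size)):
        yield batch


# Demonstration with a generator, so the full feed is never held in memory.
feed = ({"line_number": n} for n in range(1, 26))
batch_sizes = [len(batch) for batch in chunked(feed, size=10)]
```

Because the helper consumes an iterator lazily, only one chunk is materialized at a time, which is what keeps memory flat on large feeds.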
Step 5: Pipeline Recovery & Idempotent Reconciliation
Partial validation failures require deterministic recovery mechanisms. Implement idempotent retry logic using supplier reference IDs or cryptographic payload hashes. If a transient network timeout interrupts a batch, the pipeline must safely reprocess without duplicating inventory allocations.
- Hash-Based Deduplication: Generate a SHA-256 hash of the raw payload before validation. Store the hash in a Redis cache or PostgreSQL unique index. Reject re-submissions with identical hashes unless explicitly flagged as corrections.
- Stateful Quarantine Tables: Persist quarantined records with a status column (PENDING_REVIEW, CORRECTED, REJECTED). Allow procurement ops to patch fields via an admin UI, then re-run validation against the corrected payload.
- Audit Trail Generation: Log every validation pass/fail with the exact schema version used. When suppliers update their API contracts, version drift becomes immediately traceable to specific model configurations.
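Hash-based deduplication can be sketched as follows. Two assumptions go beyond the text: the payload is canonicalized (sorted keys, compact separators) before hashing so that key order does not change the digest, and an in-memory set stands in for the Redis cache or PostgreSQL unique index. `payload_fingerprint` and `SeenPayloads` are hypothetical names.

```python
import hashlib
import json


def payload_fingerprint(payload: dict) -> str:
    """Return a SHA-256 hex digest of a canonical JSON form of the payload."""
    canonical = json.dumps(
        payload, sort_keys=True, separators=(",", ":"), default=str
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class SeenPayloads:
    """In-memory stand-in for a Redis set or a PostgreSQL unique index."""

    def __init__(self) -> None:
        self._hashes: set[str] = set()

    def first_time(self, payload: dict, is_correction: bool = False) -> bool:
        """Return True if the payload should be processed.

        Identical re-submissions are rejected unless flagged as corrections.
        """
        digest = payload_fingerprint(payload)
        if digest in self._hashes and not is_correction:
            return False
        self._hashes.add(digest)
        return True


seen = SeenPayloads()
original = {"po_number": "PO-1001", "line_number": 1}
reordered = {"line_number": 1, "po_number": "PO-1001"}

accepted_first = seen.first_time(original)
accepted_duplicate = seen.first_time(reordered)
accepted_correction = seen.first_time(reordered, is_correction=True)
```

If suppliers must be held to byte-identical re-submissions, hashing the raw bytes instead of a canonical form would be the stricter choice.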
By enforcing strict boundary validation, extracting structured error diagnostics, and implementing idempotent recovery loops, your ingestion pipeline will isolate data anomalies before they propagate to financial or inventory systems. This approach reduces manual reconciliation overhead and ensures consistent, auditable supplier data flows across tier-1 and tier-2 networks.