Implementing Asyncio for Concurrent Batch Ingestion

High-volume supply chain feeds, such as purchase order acknowledgments, ASN manifests, and multi-warehouse inventory snapshots, routinely outgrow synchronous, one-request-at-a-time ingestion. When procurement ops teams poll 500+ supplier endpoints concurrently, or logistics engineers must reconcile stock levels within a 15-minute SLA, blocking I/O becomes a hard bottleneck. This guide shows how to implement asyncio for concurrent batch ingestion, focusing on concrete code patterns, connection pool tuning, and deterministic pipeline recovery. For broader architectural context, read the foundational Ingestion & Parsing Workflows for Supply Chain Data documentation before deploying these patterns into production ETL pipelines.

1. Core Architecture: Event Loop & Connection Pooling

Before writing ingestion logic, establish a controlled execution environment. Supply chain APIs and legacy EDI gateways behave unpredictably under load. We use aiohttp for HTTP-based feeds, paired with a semaphore that caps the number of in-flight requests and a bounded TCP connector that limits and reuses sockets. Proper connector configuration reduces ClientOSError spikes during peak procurement windows. See the official aiohttp client advanced configuration for detailed socket lifecycle management.

PYTHON
import asyncio
import aiohttp
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("supply_chain_etl")

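@dataclass
class FeedConfig:
    max_concurrency: int = 50      # global cap on in-flight requests
    timeout_seconds: float = 30.0  # total budget per HTTP attempt, including pool wait
    batch_size: int = 100          # URLs dispatched per gather() call
    retry_attempts: int = 3
    backoff_base: float = 1.5
    limit_per_host: int = 10       # per-supplier socket cap (tune per endpoint)

class AsyncBatchIngestor:
    def __init__(self, config: FeedConfig):
        self.config = config
        # Safe outside a running loop on Python 3.10+, where asyncio primitives bind lazily.
        self.semaphore = asyncio.Semaphore(config.max_concurrency)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncBatchIngestor":
        timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrency,          # pool size matches the semaphore
            limit_per_host=self.config.limit_per_host,
            enable_cleanup_closed=True,                 # abort SSL transports whose shutdown stalls
            force_close=False,                          # default: keep-alive, reuse sockets
        )
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Closing the session also closes the connector it owns.
        if self.session:
            await self.session.close()
            self.session = None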
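
The semaphore's effect can be checked in isolation. The sketch below is a minimal, network-free illustration: asyncio.sleep stands in for an HTTP call, and bounded_worker and measure_peak are hypothetical helpers, not part of aiohttp or the ingestor above. It launches many coroutines behind one asyncio.Semaphore and records the highest number that ever run at once.

```python
import asyncio
from typing import Dict


async def bounded_worker(sem: asyncio.Semaphore, state: Dict[str, int], delay: float) -> None:
    # Wait for a free slot; at most `limit` workers are inside this block at once.
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)  # hypothetical stand-in for an HTTP request
        state["active"] -= 1


async def measure_peak(n_tasks: int, limit: int, delay: float = 0.01) -> int:
    # Create the semaphore inside the running loop, then launch every worker at once.
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(bounded_worker(sem, state, delay) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(measure_peak(n_tasks=200, limit=50)))
```

With 200 tasks and a limit of 50, the first 50 coroutines acquire a slot immediately and the rest queue, so the peak never exceeds 50. In the ingestor, the TCPConnector limit enforces the same ceiling at the socket level, which is why both are driven by max_concurrency.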

2. Batch Chunking & Concurrent Dispatch

Supplier feed lists rarely arrive in sizes that suit a single dispatch. Splitting the URL list into fixed-size chunks bounds how many coroutines and in-flight responses exist at once and aligns each batch with downstream schema validation. Use asyncio.gather with return_exceptions=True so that a failing endpoint or malformed payload comes back as an exception object instead of aborting the whole chunk. This isolation matters when handing results to Async Batch Processing for High-Volume Feeds, but gather only isolates failures: idempotent downstream writes still have to be enforced by the writer itself.

PYTHON
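import random  # jitter for retry backoff

# These methods belong inside the AsyncBatchIngestor class from section 1.
# Merge them into that single definition: redefining the class in the same
# module would discard __init__, __aenter__, and __aexit__.
class AsyncBatchIngestor:
    # ... FeedConfig wiring, __aenter__, and __aexit__ as above ...

    async def fetch_feed_batch(self, urls: List[str]) -> List[Dict[str, Any]]:
        size = self.config.batch_size
        chunks = [urls[i:i + size] for i in range(0, len(urls), size)]
        all_successful: List[Dict[str, Any]] = []

        for chunk in chunks:
            # Await one chunk at a time; a failure in one URL does not cancel its siblings.
            tasks = [self._fetch_single(url) for url in chunk]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # gather preserves input order, so each result lines up with its URL.
            for url, result in zip(chunk, results):
                # BaseException, not Exception: CancelledError is not an Exception subclass.
                if isinstance(result, BaseException):
                    logger.error(f"Batch ingestion failed for {url}: {result!r}")
                    continue
                if result is not None:
                    all_successful.append(result)
        return all_successful

    async def _fetch_single(self, url: str) -> Optional[Dict[str, Any]]:
        attempts = self.config.retry_attempts
        for attempt in range(attempts):
            try:
                # Hold a concurrency slot only while the request is in flight,
                # not while sleeping between retries.
                async with self.semaphore:
                    async with self.session.get(url) as response:
                        response.raise_for_status()  # 4xx/5xx -> ClientResponseError
                        return await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(f"Attempt {attempt + 1}/{attempts} failed for {url}: {e}")
                if attempt + 1 < attempts:
                    # Exponential backoff plus random jitter so retries across URLs do not align.
                    await asyncio.sleep(self.config.backoff_base ** attempt + random.uniform(0, 1))
        logger.critical(f"Exhausted retries for {url}")
        return None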
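
The chunk-and-gather pattern can be exercised without any supplier endpoints. This sketch assumes a hypothetical fake_fetch coroutine that raises for any URL containing "bad", standing in for _fetch_single; chunked, backoff_delay, and ingest are likewise illustrative names. backoff_delay swaps in "full jitter" (a uniform draw between 0 and the exponential cap), a common alternative to the additive jitter used in the class above, not a replacement the original prescribes.

```python
import asyncio
import random
from typing import Any, Dict, List, Optional, Sequence, Tuple


def chunked(items: Sequence[str], size: int) -> List[List[str]]:
    # Fixed-size slices; the final slice holds the remainder.
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def backoff_delay(attempt: int, base: float = 1.5, rng: Optional[random.Random] = None) -> float:
    # "Full jitter": a uniform draw between 0 and the exponential cap base ** attempt.
    rng = rng if rng is not None else random.Random()
    return rng.uniform(0, base ** attempt)


async def fake_fetch(url: str) -> Dict[str, Any]:
    # Hypothetical stand-in for _fetch_single: any URL containing "bad" fails.
    await asyncio.sleep(0)
    if "bad" in url:
        raise ValueError(f"malformed payload from {url}")
    return {"url": url}


async def ingest(urls: List[str], batch_size: int) -> Tuple[List[Dict[str, Any]], List[str]]:
    ok: List[Dict[str, Any]] = []
    failed: List[str] = []
    for chunk in chunked(urls, batch_size):
        # return_exceptions=True turns a raised error into a returned value.
        results = await asyncio.gather(*(fake_fetch(u) for u in chunk), return_exceptions=True)
        for url, result in zip(chunk, results):
            if isinstance(result, BaseException):
                failed.append(url)
            else:
                ok.append(result)
    return ok, failed


if __name__ == "__main__":
    ok, failed = asyncio.run(ingest(["u1", "bad-u2", "u3", "u4", "bad-u5"], batch_size=2))
    print([r["url"] for r in ok], failed)
```

Because gather returns results in input order, zip(chunk, results) pairs each outcome with its URL, which is what makes per-URL error logging possible.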

3. Deterministic Recovery & Debugging Protocols

Asyncio introduces non-deterministic execution ordering that complicates traditional debugging. Supply chain ETL pipelines require strict observability to isolate connection leaks, semaphore deadlocks, and payload drift. Implement the following debugging workflow when pipeline throughput degrades or error rates exceed 2%:

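  1. Event Loop Blocking Detection: Run the loop in debug mode (asyncio.run(main(), debug=True) or PYTHONASYNCIODEBUG=1) so asyncio logs any callback that blocks longer than loop.slow_callback_duration (0.1 s by default). Then wrap long-running synchronous operations (e.g., legacy XML parsing) in loop.run_in_executor(None, sync_func) or asyncio.to_thread(sync_func). Blocking the event loop thread starves the I/O multiplexer, causing cascading timeouts.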
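  2. Connection Pool Exhaustion: Inspect connector._acquired and connector._acquired_per_host. These are private aiohttp attributes, so treat them as a staging-only diagnostic that may change between releases. limit_per_host caps simultaneous connections per destination (host, port, and SSL settings), not per resolved IP. When many URLs share one supplier host, requests beyond that cap wait in the connector's queue, and the wait counts against the ClientTimeout budget, so the symptom is a burst of timeouts rather than explicit errors. Set limit_per_host to what each supplier can tolerate and keep max_concurrency consistent with it.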
  3. Unclosed Response Leaks: Always use async with self.session.get(...) as response:. Failing to consume or close the response body leaves sockets in CLOSE_WAIT, eventually triggering OSError: [Errno 24] Too many open files.
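  4. Trace Correlation: Inject a request_id into request headers and give each task the same ID as its name (asyncio.create_task(coro, name=request_id)); any coroutine running in that task can read it back with asyncio.current_task().get_name() and tag its log lines. This lets you reassemble each request's log trail from interleaved output across concurrent coroutines. Reference the Python asyncio task documentation for coroutine introspection techniques.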
  5. Memory Profiling: Use tracemalloc or objgraph during staging. Large JSON payloads held in memory across asyncio.gather can trigger OOM kills. Stream responses to disk or process incrementally using response.content.iter_chunked(8192) when payloads exceed 50MB.
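
Two of the protocols above, offloading a blocking parser and correlating logs by task name, can be sketched together. This assumes a hypothetical ASN document shape (an asn root with item children) and hypothetical helpers parse_asn_xml, traced_parse, and run_batch; debug=True turns on asyncio's slow-callback logging from item 1.

```python
import asyncio
import logging
import xml.etree.ElementTree as ET
from typing import Dict, List

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("supply_chain_etl.debug")


def parse_asn_xml(payload: str) -> int:
    # Blocking parse; stands in for slow legacy XML handling (hypothetical <asn>/<item> shape).
    root = ET.fromstring(payload)
    return len(root.findall("item"))


async def traced_parse(payload: str) -> Dict[str, object]:
    task = asyncio.current_task()
    # The task was named after its request_id when it was created.
    request_id = task.get_name() if task is not None else "unknown"
    loop = asyncio.get_running_loop()
    # Run the blocking parser in the default thread pool so the loop keeps serving sockets.
    items = await loop.run_in_executor(None, parse_asn_xml, payload)
    logger.info("request_id=%s items=%d", request_id, items)
    return {"request_id": request_id, "items": items}


async def run_batch(payloads: Dict[str, str]) -> List[Dict[str, object]]:
    # One named task per request_id; gather preserves input order.
    tasks = [asyncio.create_task(traced_parse(body), name=rid) for rid, body in payloads.items()]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    docs = {"req-001": "<asn><item/><item/></asn>", "req-002": "<asn><item/></asn>"}
    # debug=True logs any callback that blocks the loop longer than
    # loop.slow_callback_duration (0.1 s by default).
    print(asyncio.run(run_batch(docs), debug=True))
```

A task name is only visible to code running inside that task. If the ID must also reach helpers that run elsewhere, a contextvars.ContextVar is a sturdier carrier, since each task copies the current context when it is created. For CPU-heavy parsing, a thread still contends for the GIL; a ProcessPoolExecutor passed to run_in_executor avoids that at the cost of pickling payloads.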