Implementing Asyncio for Concurrent Batch Ingestion
High-volume supply chain feeds—purchase order acknowledgments, ASN manifests, and multi-warehouse inventory snapshots—routinely exceed synchronous ingestion thresholds. When procurement ops teams face 500+ concurrent supplier endpoints or logistics engineers must reconcile stock levels within a 15-minute SLA, blocking I/O becomes a hard bottleneck. This guide details how to implement asyncio for concurrent batch ingestion, focusing on exact code patterns, connection pool tuning, and deterministic pipeline recovery. For broader architectural context, refer to the foundational Ingestion & Parsing Workflows for Supply Chain Data documentation before deploying these patterns into production ETL pipelines.
1. Core Architecture: Event Loop & Connection Pooling
Before writing ingestion logic, establish a controlled execution environment. Supply chain APIs and legacy EDI gateways behave unpredictably under load. We use aiohttp for HTTP-based feeds, paired with a strict semaphore to prevent connection exhaustion and a bounded TCP connector to manage socket reuse. Proper connector configuration prevents ClientOSError spikes during peak procurement windows. See the official aiohttp client advanced configuration for detailed socket lifecycle management.
import asyncio
import aiohttp
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger("supply_chain_etl")


@dataclass
class FeedConfig:
    max_concurrency: int = 50
    timeout_seconds: float = 30.0
    batch_size: int = 100
    retry_attempts: int = 3
    backoff_base: float = 1.5


class AsyncBatchIngestor:
    def __init__(self, config: FeedConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrency)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrency,
            limit_per_host=10,
            enable_cleanup_closed=True,
            force_close=False
        )
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
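To see what the semaphore contributes independently of aiohttp, the following minimal sketch uses only the standard library. The names bounded_call and run_bounded are hypothetical, and asyncio.sleep stands in for a network round trip; it is an illustration of the bounding behavior under those assumptions, not part of the ingestor itself.

```python
import asyncio
from typing import Dict


async def bounded_call(sem: asyncio.Semaphore, stats: Dict[str, int], delay: float = 0.01) -> None:
    """Simulate one I/O-bound request guarded by a shared semaphore."""
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(delay)  # stand-in for the network round trip (assumption)
        stats["active"] -= 1


async def run_bounded(total_requests: int, max_concurrency: int) -> int:
    """Launch total_requests coroutines and return the peak number in flight."""
    sem = asyncio.Semaphore(max_concurrency)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(bounded_call(sem, stats) for _ in range(total_requests)))
    return stats["peak"]


if __name__ == "__main__":
    peak = asyncio.run(run_bounded(total_requests=200, max_concurrency=50))
    print(f"peak in-flight requests: {peak}")
```

However many coroutines are scheduled, the peak in-flight count should not exceed max_concurrency. In the ingestor, the connector's limit is set to the same value so that every coroutine holding the semaphore can also obtain a socket.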
2. Batch Chunking & Concurrent Dispatch
Raw supplier payloads rarely arrive in optimal sizes. Chunking bounds how many responses are held in memory at once and aligns with downstream schema validation. Use asyncio.gather with return_exceptions=True to maintain pipeline continuity when individual endpoints fail or return malformed payloads: failures come back as exception objects in the result list instead of aborting the whole batch. This matters when integrating with Async Batch Processing for High-Volume Feeds, where a partial failure must not block idempotent downstream writes for the records that did succeed.
class AsyncBatchIngestor:
    # Continued from above; constructor and __aenter__/__aexit__ omitted for brevity.

    async def fetch_feed_batch(self, urls: List[str]) -> List[Dict[str, Any]]:
        # Split the URL list into fixed-size chunks; each chunk is awaited as a unit.
        chunks = [urls[i:i + self.config.batch_size] for i in range(0, len(urls), self.config.batch_size)]
        all_successful: List[Dict[str, Any]] = []
        for chunk in chunks:
            tasks = [self._fetch_single(url) for url in chunk]
            # return_exceptions=True returns failures as values instead of raising.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    logger.error(f"Batch ingestion failed for {chunk[i]}: {result}")
                    continue
                if result is not None:
                    all_successful.append(result)
        return all_successful

    async def _fetch_single(self, url: str) -> Optional[Dict[str, Any]]:
        async with self.semaphore:
            for attempt in range(self.config.retry_attempts):
                try:
                    async with self.session.get(url) as response:
                        response.raise_for_status()
                        return await response.json()
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    if attempt + 1 >= self.config.retry_attempts:
                        # Last attempt: log and stop without sleeping again.
                        logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                        break
                    # Exponential backoff plus a per-URL offset in [0, 1) seconds.
                    # str hashes are salted per process unless PYTHONHASHSEED is set,
                    # so the offset is stable within a run but differs between runs.
                    wait_time = self.config.backoff_base ** attempt + (hash(url) % 1000) / 1000
                    logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {wait_time:.2f}s")
                    await asyncio.sleep(wait_time)
            logger.critical(f"Exhausted retries for {url}")
            return None
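The chunk-then-gather pattern can be exercised without any network access. The sketch below is an illustration under stated assumptions: fake_fetch is a hypothetical stand-in for _fetch_single that fails for any URL containing "bad", and ingest returns failures alongside successes (a variation on fetch_feed_batch, which only logs them) so a caller could requeue the failed URLs.

```python
import asyncio
from typing import Any, Dict, List, Tuple


def chunked(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def fake_fetch(url: str) -> Dict[str, Any]:
    """Hypothetical stand-in for a network call; fails for URLs containing 'bad'."""
    await asyncio.sleep(0)  # yield to the event loop as real I/O would
    if "bad" in url:
        raise ValueError(f"malformed payload from {url}")
    return {"url": url, "status": "ok"}


async def ingest(
    urls: List[str], batch_size: int
) -> Tuple[List[Dict[str, Any]], List[Tuple[str, BaseException]]]:
    """Fetch URLs chunk by chunk, separating successes from failures."""
    successes: List[Dict[str, Any]] = []
    failures: List[Tuple[str, BaseException]] = []
    for chunk in chunked(urls, batch_size):
        # Failures are returned in place, in the same order as the inputs.
        results = await asyncio.gather(
            *(fake_fetch(u) for u in chunk), return_exceptions=True
        )
        for url, result in zip(chunk, results):
            if isinstance(result, BaseException):
                failures.append((url, result))
            else:
                successes.append(result)
    return successes, failures


if __name__ == "__main__":
    ok, failed = asyncio.run(ingest(["a", "bad-1", "b", "c", "bad-2"], batch_size=2))
    print(len(ok), "succeeded;", [url for url, _ in failed], "failed")
```

Because asyncio.gather preserves input order, zipping each chunk with its results is enough to attribute every exception to the URL that produced it.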
3. Deterministic Recovery & Debugging Protocols
Asyncio introduces non-deterministic execution ordering that complicates traditional debugging. Supply chain ETL pipelines require strict observability to isolate connection leaks, semaphore deadlocks, and payload drift. Implement the following debugging workflow when pipeline throughput degrades or error rates exceed 2%:
- Event Loop Blocking Detection: Wrap long-running synchronous operations (e.g., legacy XML parsing) in loop.run_in_executor(None, sync_func). Blocking the event loop thread starves the I/O multiplexer, causing cascading timeouts.
- Connection Pool Exhaustion: Monitor connector._acquired and connector._acquired_per_host. These are private attributes and may change between aiohttp releases. aiohttp applies limit_per_host per (host, port, SSL) endpoint, so if it is lower than the concurrency directed at a single supplier endpoint, excess requests queue for a free connection until the client timeout fires. Size limit_per_host to the expected load per endpoint.
- Unclosed Response Leaks: Always use async with self.session.get(...) as response:. Failing to consume or close the response body leaves sockets in CLOSE_WAIT, eventually triggering OSError: [Errno 24] Too many open files.
- Trace Correlation: Inject a request_id into headers and propagate it through asyncio.current_task().get_name(). This enables deterministic log aggregation across concurrent coroutines. Reference the Python asyncio task documentation for coroutine introspection techniques.
- Memory Profiling: Use tracemalloc or objgraph during staging. Large JSON payloads held in memory across asyncio.gather can trigger OOM kills. Stream responses to disk or process incrementally using response.content.iter_chunked(8192) when payloads exceed 50MB.
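Two of the checks above, executor offloading and task-name correlation, can be sketched together with the standard library. This is a rough illustration, not a production pattern: parse_xml_sync, handle_payload, the "req-" naming scheme, and the item element name are all assumptions made for the example, and header injection is omitted because no HTTP request is made here.

```python
import asyncio
import logging
import uuid
import xml.etree.ElementTree as ET
from typing import List

logger = logging.getLogger("supply_chain_etl.debug_sketch")


def parse_xml_sync(payload: str) -> int:
    """Blocking parse; returns the number of direct <item> children."""
    root = ET.fromstring(payload)
    return len(root.findall("item"))


async def handle_payload(payload: str) -> str:
    """Parse one payload off the event loop and tag logs with the task name."""
    task = asyncio.current_task()
    # Hypothetical naming scheme: the same ID could also be sent as a request header.
    task.set_name(f"req-{uuid.uuid4().hex[:8]}")
    loop = asyncio.get_running_loop()
    # Run the synchronous parser in the default thread pool so the loop stays responsive.
    count = await loop.run_in_executor(None, parse_xml_sync, payload)
    logger.info("[%s] parsed %d items", task.get_name(), count)
    return f"{task.get_name()}:{count}"


async def main(payloads: List[str]) -> List[str]:
    # gather wraps each coroutine in its own Task, so each gets a distinct name.
    return await asyncio.gather(*(handle_payload(p) for p in payloads))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
    print(asyncio.run(main(["<m><item/><item/></m>", "<m><item/></m>"])))
```

The default executor is a thread pool, which suits parsers that release the GIL or spend time in I/O. For CPU-bound pure-Python parsing, a concurrent.futures.ProcessPoolExecutor passed as the first argument may be a better fit.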