# etl-pycache

A robust, persistent, disk-backed and S3-enabled LRU cache designed specifically for Data Engineering (ETL) pipelines.
## ⚠️ The Problem
Data pipelines frequently make expensive API calls, run heavy transformations, and handle large payloads (such as large XML strings). If the final load step to a destination warehouse fails due to a network timeout or credential issue, the pipeline typically has to start from scratch. This wastes compute resources, consumes API quotas, and drastically slows developer velocity during debugging.
## ✅ The Solution
etl-pycache introduces a lightweight, persistent caching layer. It saves the state of your transformed data. If a pipeline fails downstream, it can read the fully transformed state directly from the cache on the next run, completely bypassing the extraction and transformation phases.
Core Benefits:

- **Idempotency:** Guarantees that rerunning a failed pipeline won't duplicate extraction tasks.
- **Cost Efficiency:** Prevents paying for the exact same compute or API queries twice during a retry.
- **Developer Velocity:** Rapidly debug downstream load operations without waiting for upstream transformations to finish.
- **Polymorphic by Design:** Natively supports strings, bytes, dictionaries, lists, and byte streams without requiring manual serialization.
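The idempotency guarantee above boils down to a get-or-compute pattern: check the cache before rerunning extraction, and persist the transformed result before attempting the risky load step. A minimal sketch of that flow, using a plain dictionary as a stand-in for any `etl-pycache` backend (the `extract_and_transform` function and call counter are hypothetical, for illustration only):

```python
# Stand-in cache; in a real pipeline this would be LocalDiskCache or S3Cache.
cache = {}
calls = {"count": 0}

def extract_and_transform(run_id: str) -> str:
    """Hypothetical expensive extract+transform step we want to run at most once per key."""
    calls["count"] += 1
    return f"<dataset run='{run_id}'>...</dataset>"

def get_or_compute(key: str) -> str:
    cached = cache.get(key)
    if cached is not None:
        return cached          # downstream retry: skip extraction entirely
    result = extract_and_transform(key)
    cache[key] = result        # persist before the risky load step
    return result

first = get_or_compute("financial_run_001")   # cache miss: runs extract+transform
second = get_or_compute("financial_run_001")  # cache hit: no recompute
```

On a retry after a failed load, the second call returns the cached payload without touching the source system, which is exactly the cost and quota saving described above.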
## 👨🏼‍💻 Core Interface
The library enforces a strict contract for all cache implementations to ensure predictability across different environments:
```python
from etl_pycache.interfaces import BaseCache

# `cache` is any BaseCache implementation (e.g. a local-disk or S3 backend).
# The contract guarantees these methods are available:
cache.set(key="payload_123", payload="<dataset>...</dataset>")
data = cache.get(key="payload_123")
cache.delete(key="payload_123")
```
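To make the contract concrete, here is a hypothetical in-memory class satisfying the same `set`/`get`/`delete` signatures. This class is not part of the library, and the actual `BaseCache` subclassing requirements may differ; it only illustrates the behavior callers can rely on:

```python
class InMemoryCache:
    """Toy cache honoring the set/get/delete contract shown above."""

    def __init__(self):
        self._store = {}

    def set(self, key: str, payload) -> None:
        # Payloads are stored as-is: strings, bytes, dicts, lists, etc.
        self._store[key] = payload

    def get(self, key: str):
        # Returns None on a miss, mirroring the truthiness checks in the Quick Start.
        return self._store.get(key)

    def delete(self, key: str) -> None:
        # Deleting an absent key is a no-op rather than an error.
        self._store.pop(key, None)

cache = InMemoryCache()
cache.set(key="payload_123", payload="<dataset>...</dataset>")
assert cache.get(key="payload_123") == "<dataset>...</dataset>"
cache.delete(key="payload_123")
assert cache.get(key="payload_123") is None
```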
## 🚀 Quick Start: The Developer Cheat Sheet
etl-pycache provides a completely unified developer experience. Whether you are caching to a local hard drive for an Airflow worker, or streaming compressed data to AWS S3, the method signatures remain exactly the same.
```python
import boto3

from etl_pycache.local_cache import LocalDiskCache
from etl_pycache.s3_cache import S3Cache

# ==========================================
# 1. INITIALIZATION
# ==========================================

# Local Disk (Perfect for local workers or Celery)
local_cache = LocalDiskCache(cache_dir="/tmp/my_etl_cache")

# AWS S3 (Perfect for distributed cloud sharing)
s3_client = boto3.client("s3", region_name="eu-west-1")
cloud_cache = S3Cache(bucket_name="canda-anaplan-demo", client=s3_client)

# ==========================================
# 2. WRITING DATA (TTL + Compression)
# ==========================================
my_massive_xml_payload = "<dataset>... 500MB of data ...</dataset>"

# Save locally: Expires in 1 hour, shrunk by ~80% on disk, locked for OS concurrency
local_cache.set("financial_run_001", my_massive_xml_payload, ttl_seconds=3600, compress=True)

# Save to Cloud: Expires in 1 hour, shrunk by ~80% before upload to save AWS costs
cloud_cache.set("deve/financial_run_001", my_massive_xml_payload, ttl_seconds=3600, compress=True)

# ==========================================
# 3. READING DATA
# ==========================================
# You do NOT need to check if the file is expired or compressed!
# The engine automatically validates the TTL, deletes it if expired,
# and decompresses the bytes on the fly before returning your data.
local_data = local_cache.get("financial_run_001")
if local_data:
    print("Successfully read and decompressed from local disk!")

cloud_data = cloud_cache.get("deve/financial_run_001")
if cloud_data:
    print("Successfully read and decompressed from AWS S3!")
```
## 🤝 Contributing
We welcome contributions! To maintain enterprise-grade code quality, this project uses strict formatting, linting, and testing pipelines.
- Clone the repository and install all dependencies:

  ```shell
  poetry install
  ```

- Run the formatter:

  ```shell
  poetry run python3 -m ruff format .
  ```

- Run the linter:

  ```shell
  poetry run python3 -m ruff check --fix .
  ```

- Run the tests:

  ```shell
  poetry run python3 -m pytest
  ```