Skip to content

Data Pipeline Adapters

Validates a CSV file's header against a Pydantic contract to detect schema drift.

This adapter opens a flat file, reads only the first row (the header), and cross-references the column names against the keys defined in the Pydantic schema. It does not validate data types or row values, making it extremely fast for detecting dropped or renamed columns in large data pipelines.

Parameters:

Name Type Description Default
contract type[BaseModel]

The Pydantic model representing the expected schema.

required
file_path str

The absolute or relative path to the CSV file.

required
encoding str | None

The encoding used to open the file (e.g., "utf-8").

None
**kwargs Any

Additional keyword arguments (e.g., delimiter=";", quotechar="|") to pass directly to the underlying csv.reader.

{}

Returns:

Type Description
list[dict[str, str]]

list[dict[str, str]]: A list of validation errors. Returns an empty list if all schema keys are present in the CSV header.

Source code in rdce/adapters.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def enforce_csv_structure(
    contract: type[BaseModel], file_path: str, encoding: str | None = None, **kwargs: Any
) -> list[dict[str, str]]:
    """
    Check a CSV file's header row for schema drift against a Pydantic contract.

    Only the header line is read: each field name declared on the contract is
    looked up among the CSV column names. Data rows and value types are never
    inspected, so the check stays cheap even for very large files.

    Args:
        contract (type[BaseModel]): The Pydantic model describing the expected columns.
        file_path (str): Absolute or relative path to the CSV file.
        encoding (str | None): Text encoding passed to ``open()`` (e.g., "utf-8").
        **kwargs (Any): Forwarded verbatim to the underlying ``csv.reader``
            (e.g., delimiter=";", quotechar="|").

    Returns:
        list[dict[str, str]]: One error record per contract field missing from
            the header; empty when every field is present.
    """
    expected_fields = extract_schema(contract)

    # newline="" is required by the csv module's documentation; encoding goes
    # to open() while the remaining kwargs belong to the reader itself.
    with open(file_path, mode="r", newline="", encoding=encoding) as handle:
        rows = csv.reader(handle, **kwargs)
        try:
            header_row = next(rows)  # first physical line only
        except StopIteration:
            header_row = []  # completely empty file -> every field is "missing"

    present = set(header_row)
    return [
        build_error(field, "COLUMN_PRESENT", "MISSING")
        for field in expected_fields
        if field not in present
    ]

Streams a CSV file row-by-row, validating data types against a Pydantic schema. Yields a dictionary for every row that fails validation, allowing the developer to route bad data without loading the entire file into memory.

Parameters:

Name Type Description Default
contract type[BaseModel]

The expected schema.

required
file_path str

Path to the CSV file.

required
null_markers list[str]

Strings that represent NULL in this CSV (e.g., "", "NaN").

None
ignore_nulls bool

If True, forgives all null markers regardless of schema.

False
encoding str | None

The encoding used to open the file (e.g., "utf-8-sig").

None
**kwargs Any

Passed directly to csv.DictReader (e.g., delimiter=";").

{}

Yields:

Name Type Description
dict dict[str, Any]

A payload containing line_num, the raw_row dictionary, and errors.

Source code in rdce/adapters.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def _coercible(value: str, target_types: tuple | list) -> bool:
    """Return True if *value* can be coerced to at least one name in *target_types*.

    Type names are tried in order; "NoneType" entries are skipped here because
    null markers are filtered out before coercion is attempted.
    """
    for t in target_types:
        if t == "NoneType":
            continue
        if t == "str":
            # Every CSV cell is already a string.
            return True
        if t == "int":
            try:
                int(value)
                return True
            except ValueError:
                pass
        elif t == "float":
            try:
                float(value)
                return True
            except ValueError:
                pass
        elif t == "bool":
            # Accept the common textual boolean flags found in CSV exports.
            if value.lower() in {"true", "false", "1", "0", "yes", "no", "y", "n"}:
                return True
    return False


def stream_csv_contract(
    contract: type[BaseModel],
    file_path: str,
    null_markers: list[str] | None = None,
    ignore_nulls: bool = False,
    encoding: str | None = None,
    **kwargs: Any,
) -> Generator[dict[str, Any], None, None]:
    """
    Streams a CSV file row-by-row, validating data types against a Pydantic schema.
    Yields a dictionary for every row that fails validation, allowing the developer
    to route bad data without loading the entire file into memory.

    Args:
        contract (type[BaseModel]): The expected schema.
        file_path (str): Path to the CSV file.
        null_markers (list[str] | None): Strings that represent NULL in this CSV
            (e.g., "", "NaN"). Defaults to ["", "NaN", "\\N"] when None.
        ignore_nulls (bool): If True, forgives all null markers regardless of schema.
        encoding (str | None): The encoding used to open the file (e.g., "utf-8-sig").
        **kwargs: Passed directly to `csv.DictReader` (e.g., delimiter=";").

    Yields:
        dict: A payload containing `line_num`, the `raw_row` dictionary, and `errors`.
    """
    # Lazy %-style args: don't pay for string formatting when the level is off.
    logger.info("Starting CSV stream validation for file: %s", file_path)
    logger.debug(
        "Using null_markers: %s | ignore_nulls: %s | encoding: %s",
        null_markers,
        ignore_nulls,
        encoding,
    )

    if null_markers is None:
        null_markers = ["", "NaN", "\\N"]

    logger.debug("Extracting schema for contract: %s", contract.__name__)
    schema = extract_schema(contract)

    logger.debug("Running pre-stream structural check...")
    structural_errors = enforce_csv_structure(contract, file_path, encoding=encoding, **kwargs)
    if structural_errors:
        logger.warning("Structural check failed for %s. Aborting stream.", file_path)
        yield {"line_num": 1, "raw_row": {}, "errors": structural_errors}
        # Abort the stream completely: per-row checks are meaningless when
        # whole columns are missing.
        return

    # Build the null set once so the hot loop does O(1) membership tests.
    nulls = set(null_markers)

    # Row-by-Row Coercion Stream (always open CSVs with newline="" per the docs;
    # encoding goes to open(), the remaining kwargs go to the reader).
    with open(file_path, mode="r", newline="", encoding=encoding) as csv_file:
        reader = csv.DictReader(csv_file, **kwargs)

        # start=2 because physical line 1 is the header consumed by DictReader.
        for line_index, row in enumerate(reader, start=2):
            row_errors = []

            for key, expected_type in schema.items():
                value = row.get(key)

                # Column missing from this row entirely — already reported by
                # the structural check, so it is safe to skip here.
                if value is None:
                    continue

                # Null handling: a marker is forgiven when nulls are ignored
                # globally or the schema explicitly allows None for this field.
                if value in nulls:
                    if ignore_nulls:
                        continue
                    if isinstance(expected_type, tuple) and "NoneType" in expected_type:
                        continue
                    row_errors.append(build_error(key, str(expected_type), "NULL"))
                    continue

                # Unwrap union tuples (e.g., ("int", "NoneType")) to the list of
                # candidate type names, then try to coerce.
                target_types = (
                    expected_type if isinstance(expected_type, tuple) else (expected_type,)
                )
                if not _coercible(value, target_types):
                    row_errors.append(build_error(key, str(expected_type), "str"))

            # If this row had any violations, yield it to the developer.
            if row_errors:
                yield {"line_num": line_index, "raw_row": dict(row), "errors": row_errors}