Your First Task as a Data Engineer in a New Company? Make the ETL Pipeline Testable

Imagine you are joining a new company as a data engineer. You inherit quite a few ETL pipelines and are responsible for maintaining them. What challenges are you likely to face?

Typically, you may face the following problems:

  • Upstream schema changes: Developer teams may add or drop fields, change data types or rename columns. When a source schema changes unexpectedly, ETL jobs can fail abruptly. Worse, the pipeline may keep running and silently load corrupted or null values into downstream tables.
  • Data quality issues: Sometimes ETL jobs don’t fail at all. On the contrary, they run and finish with a success status, yet the loaded data is incorrect, containing duplicated or missing records.
  • Lack of documentation: Legacy pipelines may have little documentation, or the existing documentation may be outdated, so you cannot be sure the pipelines still match the current business logic.
  • Volume growth and performance spikes: The data volume increases as the business grows. An ETL pipeline optimized for a smaller historical dataset can easily become slow, stall, or fail when processing massive volumes.
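The first two problems can be detected without any framework. As a minimal sketch, assuming you can obtain each table’s schema as a {column_name: type_name} mapping and its rows as dictionaries, the hypothetical helpers below report schema drift and duplicated keys:

```python
from collections import Counter
from typing import Dict, Iterable, List, Mapping, Sequence, Tuple

# Hypothetical schema representation: {column_name: type_name}.
Schema = Mapping[str, str]


def schema_drift(expected: Schema, actual: Schema) -> Dict[str, List[str]]:
    """Report columns that were added, dropped, or changed type upstream."""
    shared = set(expected) & set(actual)
    return {
        "added": sorted(set(actual) - set(expected)),
        "dropped": sorted(set(expected) - set(actual)),
        "type_changed": sorted(col for col in shared if expected[col] != actual[col]),
    }


def duplicate_keys(
    rows: Iterable[Mapping[str, object]], key: Sequence[str]
) -> List[Tuple[object, ...]]:
    """Return key values that occur more than once, in first-seen order."""
    counts = Counter(tuple(row[col] for col in key) for row in rows)
    return [value for value, count in counts.items() if count > 1]


# Example: an upstream team renamed a column and changed a type.
expected_schema = {"Model Name": "string", "Prompt Tokens": "int"}
actual_schema = {"Model": "string", "Prompt Tokens": "string"}
print(schema_drift(expected_schema, actual_schema))
# {'added': ['Model'], 'dropped': ['Model Name'], 'type_changed': ['Prompt Tokens']}
```

A renamed column shows up as one dropped and one added column, which is often the first clue that an upstream team renamed it.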

An automated testing workflow can help you address these problems. Why? Because a structured workflow helps you quickly understand the key aspects of an ETL pipeline: the business logic, the algorithms used to transform the data, the data types, and the data issues the pipeline is meant to handle. The testing patterns are also reusable: you don’t have to design a new workflow every time you inherit a different ETL pipeline.

In this article, I’ll focus on automated testing in data engineering, including the environment configuration and a practical workflow. At the end, I’ll also discuss how AI-assisted coding tools can accelerate the workflow and improve productivity.

Make the Environment Work

If you are building an automated testing workflow for the first time, setting up the environment may take some time. Data engineers use different tools and approaches for this, but if you follow the steps below, the process should be easy and smooth.

First, you only need to install three things: Docker Desktop, VS Code, and the Dev Containers extension.

In your testing workflow, Docker creates lightweight, isolated, and repeatable test environments. It allows you to spin up mock data infrastructure (for example, databases, data pipelines, and orchestration engines) directly on a local machine or within a Continuous Integration (CI) pipeline. With Docker, you can run your integration tests and data validation identically across all platforms without polluting your local operating system.

Visual Studio Code (VS Code) is a centralised development environment for writing, debugging, and automating data pipeline tests. As a data engineer, you may have used it for other projects, or you might be more familiar with PyCharm or IntelliJ IDEA. I chose VS Code for its lightweight footprint, extension ecosystem, and hybrid notebook/script workflow. AI-native editors such as Cursor and Windsurf are rapidly gaining popularity among developers; I’ll come back to them later in this article.

I assume you already have Python, Poetry, and Java installed (PySpark needs a Java runtime). Open the VS Code terminal and run the following commands to check their versions and make sure they are up to date. If any of them is missing, you can install it from the terminal as well.

python --version
java -version
poetry --version
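If you would rather script this check (for example, as the first step of a CI job), a small standard-library sketch could look like the one below. The tool names are assumptions about your setup; on some systems Python is only exposed as python3.

```python
import shutil
from typing import List, Sequence

# Assumed command names; adjust them to your machine (e.g. "python3").
REQUIRED_TOOLS = ("python", "java", "poetry")


def missing_tools(tools: Sequence[str] = REQUIRED_TOOLS) -> List[str]:
    """Return the tools that cannot be found on the PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools()
    print("Missing tools:", ", ".join(missing) if missing else "none")
```

This only checks that each command exists; the version commands above are still the way to confirm the versions themselves.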

The Dev Containers extension lets you use a Docker container as a fully functional, reproducible development environment. It standardises environments across the team and lets you test data ingestion logic locally without consuming cloud resources. Installing it is straightforward: open the Extensions view in VS Code (Ctrl+Shift+X on Windows/Linux or Cmd+Shift+X on Mac), search for “Dev Containers”, and click “Install”.

But the Dev Containers extension does not know how to build your specific environment. It needs a guide: the .devcontainer folder. The devcontainer.json file inside that folder tells the extension:

  • Which Docker image to download.
  • Which ports to forward.
  • Which VS Code extensions to install inside the container.

There are two ways to get a .devcontainer folder. If you are new to these tools, you can use VS Code’s built-in command (Dev Containers: Add Dev Container Configuration Files...), which generates the folder automatically from a template such as Python. Once you have more experience with such projects, you can write the configuration by hand to meet your team’s testing requirements. The .devcontainer folder can be committed and pushed to Git together with your source code and the test data you prepare.
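To make the devcontainer.json fields above concrete, here is a minimal sketch that writes one from Python. The image tag, the Java feature, the forwarded Spark UI port and the postCreateCommand are example choices I’m assuming for a PySpark project, not requirements of the extension; check the Dev Container specification and the image registry for current values.

```python
import json
from pathlib import Path


def build_devcontainer_config() -> dict:
    """Return a minimal devcontainer.json for a PySpark testing project (example values)."""
    return {
        "name": "etl-testing",
        # Prebuilt Python image from Microsoft's dev container registry; the tag is an example.
        "image": "mcr.microsoft.com/devcontainers/python:3.11",
        # PySpark needs a Java runtime, so add the Java dev container feature.
        "features": {"ghcr.io/devcontainers/features/java:1": {}},
        # 4040 is the default port of the Spark web UI.
        "forwardPorts": [4040],
        # VS Code extensions installed inside the container.
        "customizations": {"vscode": {"extensions": ["ms-python.python"]}},
        # Runs once after the container is created.
        "postCreateCommand": "pip install poetry && poetry install",
    }


def write_devcontainer(project_root: Path) -> Path:
    """Write .devcontainer/devcontainer.json under project_root and return its path."""
    target = project_root / ".devcontainer" / "devcontainer.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(build_devcontainer_config(), indent=2) + "\n")
    return target
```

Calling write_devcontainer(Path(".")) from the project root creates .devcontainer/devcontainer.json, which you can then commit like any other file.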

If a teammate has already committed the .devcontainer folder, your life is even easier: clone the Git repository and open the folder in VS Code.

git clone https://github.com/company/data-ingestion-transformation.git

The last configuration step is to reopen the project in the container. Why is it important? When you click “Reopen in Container” (VS Code usually offers it in a notification when it detects a .devcontainer folder), VS Code restarts its backend inside the container: it launches the Docker container and mounts your local project folder into it. The source code and source data of this ETL pipeline become accessible inside the Docker environment, so you can run your tests in an isolated sandbox. Sounds cool? Your environment is now set up, and you are ready to start testing your ETL pipelines.
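Before running any tests, it can help to confirm that your terminal really is attached to the container rather than to your host. One heuristic, assumed here rather than guaranteed, is that Docker creates a /.dockerenv file at the root of a container’s filesystem; probably_in_container is a hypothetical helper name.

```python
from pathlib import Path
from typing import Iterable

# Heuristic marker: Docker typically creates /.dockerenv inside containers.
DEFAULT_MARKERS = (Path("/.dockerenv"),)


def probably_in_container(markers: Iterable[Path] = DEFAULT_MARKERS) -> bool:
    """Return True if any of the marker files exists."""
    return any(marker.exists() for marker in markers)


if __name__ == "__main__":
    print("Inside a container:", probably_in_container())
```

Because the marker paths are a parameter, you can add other runtimes’ marker files if your team does not use Docker.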

Let the Tests Tell You What the System Does

When I inherit an unfamiliar ETL pipeline, my first question is not: “How does the code work?” Instead, I ask: “What behavior is the system expected to produce?” Tests usually answer that question faster than source code.

Imagine that the company you join uses LLMs such as GPT-5.5, Claude 4.6 and Gemini 3 Pro, and the Finance team wants to track AI spending across teams.

Mock sample data created by author

The table above shows part of the CSV data to be ingested. The column names must be standardised by replacing spaces with underscores so downstream systems can reference fields consistently. For example, ‘Model Name’ should become ‘Model_Name’. In the project folder, you find ingest.py, which defines the functions for column standardisation and data ingestion, and ai_cost_ingest.py, which calls these functions:

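# ingest.py
import logging
from typing import List

from pyspark.sql import SparkSession


def sanitize_columns(columns: List[str]) -> List[str]:
    # Replace every space with an underscore, e.g. "Model Name" -> "Model_Name".
    return [column.replace(" ", "_") for column in columns]


def run(spark: SparkSession, ingest_path: str, transformation_path: str) -> None:
    logging.info("Reading CSV file from: %s", ingest_path)

    # Read the CSV using its first row as the header; values are read as strings.
    input_df = (
        spark.read
        .option("header", True)
        .csv(ingest_path)
    )

    # Rename every column to its sanitized form, keeping the column order.
    renamed_columns = sanitize_columns(input_df.columns)

    ref_df = input_df.toDF(*renamed_columns)

    # Write the result as Parquet; by default this fails if the path already exists.
    ref_df.write.parquet(transformation_path)

# ai_cost_ingest.py
import logging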
import sys

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest

LOG_FILENAME = "project.log"
APP_NAME = "AI_Cost Pipeline: Ingest"

if __name__ == "__main__":
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    logging.info(sys.argv)

    if len(sys.argv) != 3:
        logging.warning("Input source and output path are required")
        sys.exit(1)

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    sc = spark.sparkContext
    app_name = sc.appName
    logging.info("Application Initialized: " + app_name)
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    ingest.run(spark, input_path, output_path)
    logging.info("Application Done: " + spark.sparkContext.appName)
    spark.stop()

Start by understanding the functions it defines. You may ask: “What exactly should sanitize_columns() do? Does it handle leading, trailing and internal spaces?” With these questions in mind, you write the following tests:

from data_ingestions.ai_cost import ingest

def test_should_sanitize_nothing() -> None:
    no_whitespace_columns = ["Model"]

    actual = ingest.sanitize_columns(no_whitespace_columns)
    expected = no_whitespace_columns
    assert expected == actual

def test_should_sanitize_whitespace_outside() -> None:
    columns_with_outer_whitespace = [" Prompt Tokens "]

    actual = ingest.sanitize_columns(columns_with_outer_whitespace)
    expected = ["_Prompt_Tokens_"]
    assert expected == actual

def test_should_sanitize_whitespace_in_between() -> None:
    columns_with_inner_whitespace = ["Prompt Tokens"]

    actual = ingest.sanitize_columns(columns_with_inner_whitespace)
    expected = ["Prompt_Tokens"]
    assert expected == actual

These tests call sanitize_columns() directly, without launching Spark or processing files. They are examples of unit tests.

Unit Tests

Unit tests are designed to validate a small piece of logic in isolation. They are usually fast, deterministic and independent of external systems.
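Because unit tests are cheap, it is easy to keep adding cases as you learn more about the pipeline. One way to do that, sketched below with the standard library’s unittest.subTest, is to keep the cases in a table. The function is a local copy of sanitize_columns so the sketch runs without the project installed; with pytest, @pytest.mark.parametrize achieves the same thing.

```python
import unittest
from typing import List


def sanitize_columns(columns: List[str]) -> List[str]:
    # Local copy of ingest.sanitize_columns so this sketch runs on its own.
    return [column.replace(" ", "_") for column in columns]


class SanitizeColumnsTest(unittest.TestCase):
    # Each case: (description, input columns, expected output columns).
    CASES = [
        ("no whitespace", ["Model"], ["Model"]),
        ("leading and trailing spaces", [" Prompt Tokens "], ["_Prompt_Tokens_"]),
        ("space in between", ["Prompt Tokens"], ["Prompt_Tokens"]),
        ("several columns", ["Model Name", "Cost"], ["Model_Name", "Cost"]),
        # Documents current behaviour: only spaces are replaced, tabs are kept.
        ("tab is untouched", ["Prompt\tTokens"], ["Prompt\tTokens"]),
        ("empty header", [], []),
    ]

    def test_sanitize_columns(self) -> None:
        for description, given, expected in self.CASES:
            # subTest reports each failing case separately instead of stopping at the first.
            with self.subTest(description):
                self.assertEqual(expected, sanitize_columns(given))
```

The tab case records what the function does today rather than what it should do; if the business rule turns out to cover tabs, that row is where the test would change first.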

Integration Tests

Unit tests tell you whether a small piece of logic behaves correctly. But they cannot answer the question: “Does the entire pipeline work when all components are connected together?”

For a data engineer, this often means:

  • Reading files
  • Starting Spark
  • Running transformations
  • Writing outputs
  • Validating results

To test the entire pipeline, we need integration tests, which reveal how the system behaves end to end. Integration tests are especially useful during onboarding because they describe what the system must do, regardless of how the implementation evolves over time.

For the AI_cost data ingestion project, you can use an integration test to validate that:

  • Input arrives as CSV files.
  • Spark processes the data.
  • Column names are sanitized.
  • Data values remain unchanged.
  • Output is written in Parquet format.
  • The complete ingestion workflow succeeds.

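import csv
import os
import tempfile
from typing import List, Tuple

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest


# spark_session is assumed to be a pytest fixture (e.g. in conftest.py)
# that provides a local SparkSession.
def test_should_sanitize_column_names(
    spark_session: SparkSession,
) -> None:

    given_ingest_folder, given_transform_folder = (
        __create_ingest_and_transform_folders()
    )

    input_csv_path = os.path.join(given_ingest_folder, "input.csv")

    csv_content = [
        [
            "Model Name",
            "Prompt Tokens",
            " Completion Tokens "
        ],
        [
            "GPT-5.5",
            "1200",
            "300"
        ],
        [
            "Gemini 3 Pro",
            "900",
            "250"
        ],
    ]

    __write_csv_file(input_csv_path, csv_content)

    ingest.run(
        spark_session,
        input_csv_path,
        given_transform_folder
    )

    actual = spark_session.read.parquet(
        given_transform_folder
    )

    expected = spark_session.createDataFrame(
        [
            ["GPT-5.5", "1200", "300"],
            ["Gemini 3 Pro", "900", "250"]
        ],
        [
            "Model_Name",
            "Prompt_Tokens",
            "_Completion_Tokens_"
        ]
    )

    # Row equality compares values only, so check the column names explicitly.
    assert expected.columns == actual.columns
    # Spark does not guarantee row order, so compare sorted rows.
    assert sorted(expected.collect()) == sorted(actual.collect())


def __create_ingest_and_transform_folders() -> Tuple[str, str]:
    base_path = tempfile.mkdtemp()
    ingest_folder = os.path.join(base_path, "ingest")
    os.makedirs(ingest_folder)
    # Do not create the transform folder: write.parquet() fails by default
    # when the target path already exists.
    transform_folder = os.path.join(base_path, "transform")
    return ingest_folder, transform_folder


def __write_csv_file(file_path: str, content: List[List[str]]) -> None:
    with open(file_path, "w", newline="") as csv_file:
        csv.writer(csv_file).writerows(content)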
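Comparing collect() results with == is sensitive to row order, which Spark does not guarantee, and because each PySpark Row is a tuple, the comparison ignores column names. A small framework-free helper can check both; assert_same_table is a hypothetical name of my own, not part of PySpark, and this is a sketch rather than a replacement for dedicated DataFrame testing utilities.

```python
from collections import Counter
from typing import Any, Sequence


def assert_same_table(
    expected_columns: Sequence[str],
    expected_rows: Sequence[Sequence[Any]],
    actual_columns: Sequence[str],
    actual_rows: Sequence[Sequence[Any]],
) -> None:
    """Fail with a readable message if two tables differ in columns or rows.

    Rows are compared as a multiset: order is ignored, duplicates still count.
    Row values must be hashable.
    """
    assert list(expected_columns) == list(actual_columns), (
        f"Column mismatch: expected {list(expected_columns)}, got {list(actual_columns)}"
    )
    expected_counts = Counter(tuple(row) for row in expected_rows)
    actual_counts = Counter(tuple(row) for row in actual_rows)
    missing = expected_counts - actual_counts      # rows the output lacks
    unexpected = actual_counts - expected_counts   # rows the output should not have
    assert not missing and not unexpected, (
        f"Row mismatch: missing {dict(missing)}, unexpected {dict(unexpected)}"
    )
```

With the DataFrames from the integration test, the call could replace the final assertions: assert_same_table(expected.columns, expected.collect(), actual.columns, actual.collect()). Counting rows with Counter instead of comparing sets means duplicated records, one of the data quality issues mentioned at the start of this article, still cause a failure.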

Let AI Read the ETL Pipeline Before You Do

Imagine that you are reviewing an unfamiliar ETL pipeline that contains hundreds or even thousands of lines of PySpark code. Understanding the code and writing tests for it can take hours or even days. Today, tools such as Cursor, Windsurf, and GitHub Copilot can help accelerate this process.

Take Cursor as an example. As an AI coding assistant, it can analyze an entire repository and generate explanations of individual modules, functions, and data flows. It can also generate initial versions of unit tests and integration tests. To get the most out of it, you need to ask the right questions as a data engineer. Here are some sample questions you may ask:

  • What is the purpose of this ETL job?
  • What input and output formats does this pipeline expect?
  • Which functions are responsible for data validation?
  • Which edge cases are currently untested?

AI can suggest test cases, but it cannot determine whether those tests meet the business requirements and the company’s strategy. Understanding the pipeline, validating assumptions, and reviewing code are still your responsibility. AI is a productivity accelerator rather than a replacement for engineering judgment. It saves you time when understanding and testing ETL pipelines, so you can focus on higher-value data engineering work such as designing data architectures, building scalable data platforms, and empowering data-driven decision making.


