Friday, March 13, 2026

Build ETL Pipelines for Data Science Workflows in About 30 Lines of Python



Image by the author | Ideogram

Do you know that feeling when you have data scattered across different formats and sources, and you need to make sense of it all? That's what we're solving today. Let's build an ETL pipeline that takes messy data and turns it into something actually useful.

In this article, I'll walk you through building a pipeline that processes e-commerce transactions. Nothing fancy, just practical code that gets the job done.

We'll pull data from a CSV file (the kind you'd download from an e-commerce platform), clean it up, and store it in a proper database for analysis.

🔗 Link to the code on GitHub

What Is Extract, Transform, Load (ETL)?

Every ETL pipeline follows the same pattern. You pull data from somewhere (extract), clean it up and fix it (transform), then put it somewhere useful (load).
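Stripped to its bare bones, the pattern looks like this. A minimal sketch with in-memory data standing in for real sources and sinks (the records here are illustrative):

```python
def extract():
    # Extract: pull raw records from a source (here, an in-memory list)
    return [{"price": 10.0, "quantity": 2}]

def transform(records):
    # Transform: clean/enrich, e.g. add a derived total for each record
    return [{**r, "total": r["price"] * r["quantity"]} for r in records]

def load(records):
    # Load: put the results somewhere useful (here, just return them)
    return records

result = load(transform(extract()))
print(result)  # each record now carries a computed "total"
```

The three stages stay decoupled, so you can swap any of them (a real database in `load`, an API in `extract`) without touching the others.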

ETL pipeline | Image by the author (diagrams.net / draw.io)

The process begins with the extract phase, in which data is retrieved from various source systems such as databases, APIs, files, or streaming platforms. During this phase, the pipeline identifies and pulls the relevant data while maintaining connections to systems that may operate on different schedules and in different formats.

Next, the transform phase is the core processing stage, in which the extracted data undergoes cleaning, validation, and restructuring. This step resolves data quality issues, applies business rules, performs calculations, and converts the data into the required format and structure. Common transformations include data type conversions, field mapping, aggregations, and removal of duplicate or invalid records.

Finally, the load phase transfers the transformed data into the target system. This can happen through full loads, where entire datasets are replaced, or incremental loads, where only new or changed data is added. The loading strategy depends on factors such as data volume, system performance requirements, and business needs.
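The difference between the two loading strategies is easy to sketch with pandas and SQLite. A full load simply replaces the table; an incremental load first checks which keys already exist. This is a rough illustration, not the pipeline's code, and the `transaction_id` key column is an assumption:

```python
import sqlite3
import pandas as pd

def full_load(df, conn, table):
    # Full load: replace the entire table with the new data
    df.to_sql(table, conn, if_exists="replace", index=False)

def incremental_load(df, conn, table, key="transaction_id"):
    # Incremental load: append only rows whose key isn't already present
    try:
        existing = pd.read_sql(f"SELECT {key} FROM {table}", conn)[key]
        df = df[~df[key].isin(existing)]
    except Exception:
        pass  # table doesn't exist yet; load everything

    df.to_sql(table, conn, if_exists="append", index=False)

conn = sqlite3.connect(":memory:")
batch1 = pd.DataFrame({"transaction_id": [1, 2], "amount": [10.0, 20.0]})
batch2 = pd.DataFrame({"transaction_id": [2, 3], "amount": [20.0, 30.0]})

incremental_load(batch1, conn, "tx")
incremental_load(batch2, conn, "tx")  # only transaction_id 3 is new
count = pd.read_sql("SELECT COUNT(*) AS n FROM tx", conn)["n"][0]
```

After both batches, the table holds three rows, not four, because the overlapping record was skipped.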

Step 1: Extract

The extract step is where we acquire the data. In the real world, you might grab this CSV from your e-commerce platform's reporting dashboard, pull it from an FTP server, or receive it via an API. Here, we read from a local CSV file.

import pandas as pd

def extract_data_from_csv(csv_file_path):
    try:
        print(f"Extracting data from {csv_file_path}...")
        df = pd.read_csv(csv_file_path)
        print(f"Successfully extracted {len(df)} records")
        return df
    except FileNotFoundError:
        print(f"Error: {csv_file_path} not found. Creating sample data...")
        # create_sample_csv_data() is defined in the full code on GitHub
        csv_file = create_sample_csv_data()
        return pd.read_csv(csv_file)
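If your data arrives from an API instead of a file, only the extract step changes; the rest of the pipeline stays the same as long as you end up with a DataFrame. A minimal sketch, where the JSON payload stands in for a real API response:

```python
import json
import pandas as pd

def extract_data_from_json(json_payload):
    # Same extract idea for an API response: parse the JSON records
    # into a DataFrame so the downstream transform code is unchanged.
    records = json.loads(json_payload)
    df = pd.DataFrame(records)
    print(f"Successfully extracted {len(df)} records")
    return df

# Simulated API response body (illustrative data)
payload = '[{"transaction_id": 1, "price": 9.99, "quantity": 2}]'
df = extract_data_from_json(payload)
```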

Now that we have raw data from our source (raw_transactions.csv), we need to transform it into something useful.

Step 2: Transform

This is where we make the data actually useful.

def transform_data(df):
    print("Transforming data...")
    
    df_clean = df.copy()
    
    # Remove records with missing emails
    initial_count = len(df_clean)
    df_clean = df_clean.dropna(subset=['customer_email'])
    removed_count = initial_count - len(df_clean)
    print(f"Removed {removed_count} records with missing emails")
    
    # Calculate derived fields
    df_clean['total_amount'] = df_clean['price'] * df_clean['quantity']
    
    # Extract date components
    df_clean['transaction_date'] = pd.to_datetime(df_clean['transaction_date'])
    df_clean['year'] = df_clean['transaction_date'].dt.year
    df_clean['month'] = df_clean['transaction_date'].dt.month
    df_clean['day_of_week'] = df_clean['transaction_date'].dt.day_name()
    
    # Create customer segments
    df_clean['customer_segment'] = pd.cut(df_clean['total_amount'], 
                                        bins=[0, 50, 200, float('inf')], 
                                        labels=['Low', 'Medium', 'High'])
    
    return df_clean

First, we drop rows with missing email addresses, since incomplete customer data isn't helpful for most analyses.

Then we calculate total_amount by multiplying price and quantity. This seems obvious, but you'd be surprised how often such derived fields are missing from raw data.

Date extraction is really useful. Instead of just a timestamp, we now have separate year, month, and day-of-week columns. This makes it easy to answer questions like "Do we sell more on weekends?"
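For instance, with the day_of_week column in place, the weekend question becomes a one-line groupby. A small sketch on made-up data (the dates and amounts are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "transaction_date": pd.to_datetime(
        ["2026-03-07", "2026-03-08", "2026-03-09"]),  # Sat, Sun, Mon
    "total_amount": [120.0, 80.0, 50.0],
})
df["day_of_week"] = df["transaction_date"].dt.day_name()
df["is_weekend"] = df["day_of_week"].isin(["Saturday", "Sunday"])

# Total sales split by weekend vs. weekday
sales = df.groupby("is_weekend")["total_amount"].sum()
weekend_sales = sales.loc[True]
weekday_sales = sales.loc[False]
```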

Customer segmentation with pd.cut() can be particularly handy. It automatically buckets customers into spending categories, so instead of just transaction amounts, we have meaningful business segments.
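To see what pd.cut() does in isolation, here are the same bins applied to a few sample amounts. Each value falls into the half-open interval ending at the bin edge:

```python
import pandas as pd

amounts = pd.Series([25.0, 120.0, 500.0])
segments = pd.cut(amounts,
                  bins=[0, 50, 200, float("inf")],
                  labels=["Low", "Medium", "High"])
# 25 falls in (0, 50] -> Low, 120 in (50, 200] -> Medium,
# 500 in (200, inf) -> High
labels = list(segments)
```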

Step 3: Load

In a real project, you might load into a data warehouse, send to an API, or push to cloud storage.

Here, we load our cleaned data into an SQLite database.

import sqlite3

def load_data_to_sqlite(df, db_name="ecommerce_data.db", table_name="transactions"):
    print(f"Loading data to SQLite database '{db_name}'...")
    
    conn = sqlite3.connect(db_name)
    
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        record_count = cursor.fetchone()[0]
        
        print(f"Successfully loaded {record_count} records to '{table_name}' table")
        
        return f"Data successfully loaded to {db_name}"
        
    finally:
        conn.close()

Now analysts can run SQL queries, connect BI tools, and actually use this data to make decisions.
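For example, once the data is in SQLite, a revenue-by-segment report is a single query. A small self-contained sketch (the seeded rows are illustrative, not the pipeline's output):

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "customer_segment": ["Low", "High", "High"],
    "total_amount": [30.0, 250.0, 400.0],
}).to_sql("transactions", conn, index=False)

# Revenue per customer segment, straight from SQL
query = """
    SELECT customer_segment, SUM(total_amount) AS revenue
    FROM transactions
    GROUP BY customer_segment
    ORDER BY revenue DESC
"""
result = pd.read_sql(query, conn)
conn.close()
print(result)
```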

SQLite works well here because it's lightweight, requires no configuration, and creates a single file you can easily share or back up. The if_exists="replace" parameter means you can run this pipeline repeatedly without worrying about duplicate data.

We added a verification step so you know the load succeeded. There's nothing worse than thinking your data is safely stored, only to find an empty table later.

Running the ETL Pipeline

This orchestrates the entire extract, transform, load workflow.

def run_etl_pipeline():
    print("Starting ETL Pipeline...")
    
    # Extract
    raw_data = extract_data_from_csv('raw_transactions.csv')
    
    # Transform  
    transformed_data = transform_data(raw_data)
    
    # Load
    load_result = load_data_to_sqlite(transformed_data)
    
    print("ETL Pipeline completed successfully!")
    
    return transformed_data

Notice how this ties everything together: extract, transform, load, done. You can run it and see the processed data immediately.

You can find the full code on GitHub.

Wrapping Up

This pipeline takes raw transaction data and turns it into something an analyst or data scientist can actually work with. You get clean records, calculated fields, and meaningful segments.

Each function does one thing well, and you can easily modify or extend any part without breaking the rest.

Now try running it yourself. Then try adapting it for another use case. Happy coding!

Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she's learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. Bala also creates engaging resource overviews and coding tutorials.
