Image by the author | Ideogram
Data is messy. Whether you're pulling information from APIs or analyzing real-world datasets, you'll inevitably run into duplicates, missing values, and invalid entries. Instead of writing the same cleaning code over and over, a well-designed pipeline saves time and ensures consistency across your data projects.
In this article, we'll build a reusable data cleaning and validation pipeline that handles common data quality issues while providing detailed feedback about what was fixed. By the end, you'll have a tool that can clean messy datasets and validate them against business rules in just a few lines of code.
Why data cleaning pipelines?
Think of data pipelines like assembly lines in manufacturing: each step performs a specific function, and the output of one step becomes the input to the next. This approach makes your code more maintainable, testable, and reusable across projects.
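To make the assembly-line idea concrete, here is a minimal sketch of cleaning steps chained as plain functions. The step functions and sample rows are hypothetical illustrations, not the pipeline we build later in this article:

```python
# A pipeline as a chain of small steps: each step's output feeds the next.

def strip_whitespace(rows):
    """Trim stray whitespace from every string value."""
    return [{k: v.strip() if isinstance(v, str) else v for k, v in row.items()}
            for row in rows]

def drop_empty_names(rows):
    """Discard rows whose 'name' field is empty or missing."""
    return [row for row in rows if row.get('name')]

def run_pipeline(rows, steps):
    for step in steps:
        rows = step(rows)  # output of one step becomes input to the next
    return rows

raw = [{'name': '  Ada '}, {'name': ''}, {'name': 'Grace'}]
clean = run_pipeline(raw, [strip_whitespace, drop_empty_names])
print(clean)  # [{'name': 'Ada'}, {'name': 'Grace'}]
```

Because each step has the same shape (rows in, rows out), steps can be added, removed, or reordered without touching the others.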

A simple data cleaning pipeline
Image by the author | diagrams.net (draw.io)
Our pipeline will handle three core responsibilities:
- Cleaning: Remove duplicates and handle missing values (use this as a starting point; you can add as many cleaning steps as you need)
- Validation: Ensure the data meets business rules and constraints
- Reporting: Track the changes made during processing
Setting up the development environment
Make sure you're using a recent version of Python. If you're working locally, create a virtual environment and install the required packages:
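Based on the libraries used later in the article (pandas, NumPy, and Pydantic), a minimal setup might look like this; the environment name is just an example:

```shell
# Create and activate a virtual environment, then install the dependencies
python -m venv venv
source venv/bin/activate   # on Windows: venv\Scripts\activate
pip install pandas numpy pydantic
```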
You can also use Google Colab or a similar notebook environment if you prefer.
Defining the validation schema
Before we can validate data, we need to define what "valid" looks like. We'll use Pydantic, a Python library that uses type hints to validate data:
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any
from pydantic import BaseModel, ValidationError, field_validator

class DataValidator(BaseModel):
    name: str
    age: Optional[int] = None
    email: Optional[str] = None
    salary: Optional[float] = None

    @field_validator('age')
    @classmethod
    def validate_age(cls, v):
        if v is not None and (v < 0 or v > 100):
            raise ValueError('Age must be between 0 and 100')
        return v

    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        if v and '@' not in v:
            raise ValueError('Invalid email format')
        return v
This schema models the expected data using Pydantic's syntax. To use the @field_validator decorator, you'll also need the @classmethod decorator. The validation logic ensures that ages fall within reasonable bounds and that email addresses contain the "@" symbol.
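As a quick standalone check of how field validators behave, here is a small hypothetical model (assuming Pydantic v2) that accepts a valid record and rejects an out-of-range age:

```python
from typing import Optional
from pydantic import BaseModel, ValidationError, field_validator

# UserRecord is a hypothetical model for illustration, not the article's schema.
class UserRecord(BaseModel):
    name: str
    age: Optional[int] = None

    @field_validator('age')
    @classmethod
    def validate_age(cls, v):
        if v is not None and (v < 0 or v > 100):
            raise ValueError('Age must be between 0 and 100')
        return v

# A valid record passes through unchanged
ok = UserRecord(name='Ada', age=36)
print(ok.age)  # 36

# An out-of-range age raises a ValidationError
try:
    UserRecord(name='Bob', age=-5)
except ValidationError as e:
    print('rejected:', e.error_count(), 'error(s)')
```

Note that the validator raises a plain ValueError, which Pydantic wraps into a ValidationError at model construction time.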
Building the pipeline class
Our main pipeline class encapsulates all the cleaning and validation logic:
class DataPipeline:
    def __init__(self):
        self.cleaning_stats = {
            'duplicates_removed': 0,
            'nulls_handled': 0,
            'validation_errors': 0
        }
The constructor initializes a statistics dictionary to track the changes made during processing. This helps you keep an eye on data quality and audit which cleaning steps were applied over time.
Writing the data cleaning logic
Let's add a clean_data method to handle common data quality problems such as missing values and duplicate records:
def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
    initial_rows = len(df)

    # Remove duplicates
    df = df.drop_duplicates()
    self.cleaning_stats['duplicates_removed'] = initial_rows - len(df)

    # Handle missing values
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    df[numeric_columns] = df[numeric_columns].fillna(df[numeric_columns].median())

    string_columns = df.select_dtypes(include=['object']).columns
    df[string_columns] = df[string_columns].fillna('Unknown')

    return df
This approach is deliberate about how it handles different data types. Missing numeric values are filled with the median (more robust to outliers than the mean), while text columns get a placeholder value. Duplicates are removed first so they don't skew the median calculations.
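A quick illustration of why the median is the safer fill value, using hypothetical salary figures with one outlier:

```python
import statistics

# With an outlier present, the median stays near the typical value,
# while the mean gets dragged toward the extreme.
salaries = [50000, 60000, 55000, 1000000]  # hypothetical data; one outlier

mean_fill = statistics.mean(salaries)
median_fill = statistics.median(salaries)

print(round(mean_fill))  # 291250 -- distorted by the outlier
print(median_fill)       # 57500.0 -- a much more representative fill value
```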
Adding validation with error tracking
The validation step processes each row individually, collecting both valid data and detailed error information:
def validate_data(self, df: pd.DataFrame) -> tuple:
    valid_rows = []
    errors = []

    for idx, row in df.iterrows():
        try:
            validated_row = DataValidator(**row.to_dict())
            valid_rows.append(validated_row.model_dump())
        except ValidationError as e:
            errors.append({'row': idx, 'errors': str(e)})

    self.cleaning_stats['validation_errors'] = len(errors)
    return pd.DataFrame(valid_rows), errors
This row-by-row approach ensures that one bad record doesn't break the entire pipeline. Valid rows continue through the process while errors are captured for review. This is critical in production environments, where you need to process what you can while flagging problems.
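The collect-don't-crash pattern can be sketched independently of Pydantic; the check_record function below is a hypothetical stand-in for the real schema validation:

```python
# Validate each record, keep the good ones, and log errors instead of aborting.

def check_record(record):
    """A stand-in validator: reject negative ages."""
    if record.get('age', 0) < 0:
        raise ValueError(f"negative age: {record['age']}")
    return record

records = [{'name': 'Ada', 'age': 36}, {'name': 'Bob', 'age': -5}]
valid, errors = [], []

for i, rec in enumerate(records):
    try:
        valid.append(check_record(rec))
    except ValueError as e:
        errors.append({'row': i, 'error': str(e)})

print(len(valid), len(errors))  # 1 1
```

The bad record ends up in the error list with its row index, while the good record passes through untouched.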
Orchestrating the pipeline
The process method ties everything together:
def process(self, df: pd.DataFrame) -> Dict[str, Any]:
    cleaned_df = self.clean_data(df.copy())
    validated_df, validation_errors = self.validate_data(cleaned_df)

    return {
        'cleaned_data': validated_df,
        'validation_errors': validation_errors,
        'stats': self.cleaning_stats
    }
The return value is a comprehensive report containing the cleaned data, any validation errors, and processing statistics.
Putting it all together
Here's how to use the pipeline in practice:
# Create sample messy data
sample_data = pd.DataFrame({
    'name': ['Tara Jamison', 'Jane Smith', 'Lucy Lee', None, 'Clara Clark', 'Jane Smith'],
    'age': [25, -5, 25, 35, 150, -5],
    'email': ['taraj@email.com', 'invalid-email', 'lucy@email.com', 'jane@email.com', 'clara@email.com', 'invalid-email'],
    'salary': [50000, 60000, 50000, None, 75000, 60000]
})

pipeline = DataPipeline()
result = pipeline.process(sample_data)
The pipeline automatically removes the duplicate record, handles the missing name by filling it with "Unknown", fills the missing salary with the median value, and flags validation errors for the negative age and the invalid email address.
🔗 You can find the full script on GitHub.
Extending the pipeline
This pipeline is a foundation you can build on. Consider these enhancements for your specific needs:
- Custom cleaning rules: Add domain-specific cleaning methods, such as standardizing phone numbers or addresses.
- Configurable validation: Make the Pydantic schema configurable so the same pipeline can handle different data types.
- Advanced error handling: Implement retry logic or automatic correction of common errors.
- Performance optimization: For large datasets, consider vectorized operations or parallel processing.
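As one example of a custom cleaning rule, here is a sketch of a phone number standardizer; the target format and normalization rules are assumptions for illustration:

```python
import re

def standardize_phone(raw: str) -> str:
    """Normalize a 10-digit US-style phone number to XXX-XXX-XXXX,
    returning 'Invalid' for anything that doesn't fit."""
    digits = re.sub(r'\D', '', raw)               # keep digits only
    if len(digits) == 11 and digits.startswith('1'):
        digits = digits[1:]                        # drop a leading country code
    if len(digits) != 10:
        return 'Invalid'
    return f'{digits[:3]}-{digits[3:6]}-{digits[6:]}'

print(standardize_phone('(555) 123-4567'))   # 555-123-4567
print(standardize_phone('+1 555 123 4567'))  # 555-123-4567
print(standardize_phone('12345'))            # Invalid
```

A method like this could be added to DataPipeline and applied to a phone column during clean_data, with a counter added to cleaning_stats to track how many numbers were normalized.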
Wrapping up
Data pipelines aren't just about cleaning individual datasets. They're about building reliable, maintainable systems.
This pipeline approach ensures consistency across projects and makes it easy to adjust business rules as requirements change. Start with this basic pipeline, then customize it for your specific needs.
The key is having a reliable, reusable pipeline that handles the mundane tasks so you can focus on extracting insights from clean data. Happy data cleaning!
Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she's learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more.
