Data Quality + Orchestration — Part II [3AE]

Aman Jaiswal
7 min read · Feb 5, 2024

So, assuming you did end up reading the part before this, and are looking forward to what comes next, the search ends here.

So far, we’ve discussed why data quality is important, why one should care about it, and what’s out there in the market to help you do the caring. But in today’s world, we don’t get that kinda free time, do we?

And here comes the godsend so that you can take the left (right) path:

ETL: Extract, Transform, Load

And primarily: Apache Airflow.

I’ve worked with Apache Airflow for close to 3 years now, and though the advancements they keep bringing in aren’t something one gets to play with very actively, it does get the job done. As of today, there are a lot of other tools that do the same job, and probably do it better. But for the scope of this article, we’ll stick with Airflow to accomplish our goal: automating data quality and integrating it into our day-to-day work.

With all that monologue out of the way, let’s get started.

The Setup

Things you need to know in order to further proceed include:

  1. Basics of Great Expectations
  2. Docker
  3. Airflow
  4. How to copy and paste

For point 3, I remember using images from the puckel/docker-airflow repository for a long time, and then along came:

https://www.astronomer.io/

Now, setting up Astronomer has a bit of a learning curve, but the following repository helps with most of it:

Alright, let’s speed-run the rest of this section.

  1. Install Docker Desktop from here and the Astro CLI with the following command (for Mac):
brew install astro
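
If you’re not on a Mac, Astronomer also documents an install script for Linux; something along these lines should work (double-check the Astro CLI docs for your platform):

curl -sSL install.astronomer.io | sudo bash -s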

2. Set up the project directory and initialize Great Expectations.

mkdir airflow_with_ge
cd airflow_with_ge
virtualenv venv
source venv/bin/activate
pip install great-expectations==0.17.23
great_expectations init

3. In the same folder, let’s use astro to initialize the local dev space with the following command:

astro dev init

If everything above went right, the tree structure of the folder should look something like this (command for this: tree -L 2):
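
Here’s a rough, hand-written approximation of that tree — the exact files vary with your astro CLI and Great Expectations versions, so treat it as a sketch rather than a checksum:

airflow_with_ge
├── Dockerfile
├── airflow_settings.yaml
├── dags
├── gx
│   ├── checkpoints
│   ├── expectations
│   ├── great_expectations.yml
│   ├── plugins
│   └── uncommitted
├── include
├── packages.txt
├── plugins
├── requirements.txt
├── tests
└── venv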

I’d advise not reading further until you’ve covered this much ground with zero issues.

Now as a sponsored post, I’ll do a product placement of my last article:

https://amanjaiswalofficial.medium.com/data-quality-orchestrated-part-i-3ae-97ba8185ea07

How can you save money by maintaining data quality

4. Using this, let’s reuse the code we wrote before (available here) and save it as include/main.py.
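
If you’d rather not flip back to Part I, here’s a rough sketch of the shape those helpers take. The class and method names come from how they’re used later in this article; the internals are my own approximation against Great Expectations 0.17.x, not a copy of the linked code:

import json

import great_expectations as gx
from great_expectations.core import ExpectationConfiguration


class ExpectationConfig:
    """Loads the list of expectations from the expectations.json described in step 5."""

    def __init__(self, path):
        with open(path) as f:
            self._expectations = json.load(f)

    def get_expectations_list(self):
        return self._expectations


class GreatExpectationsManager:
    """Thin wrapper around the Great Expectations file context living in gx/."""

    def __init__(self, suite_name):
        self.suite_name = suite_name
        self.context = gx.get_context()

    def generate_suite(self, expectations_list):
        # Create (or overwrite) the suite and add one configuration per JSON entry.
        suite = self.context.add_or_update_expectation_suite(
            expectation_suite_name=self.suite_name
        )
        for exp in expectations_list:
            suite.add_expectation(
                ExpectationConfiguration(
                    expectation_type=exp["name"],
                    kwargs={"column": exp["column"], **exp.get("extra_args", {})},
                )
            )
        self.context.save_expectation_suite(suite)

    def validate(self, csv_path):
        # Read the CSV via the fluent pandas source and run the stored suite against it.
        validator = self.context.sources.pandas_default.read_csv(csv_path)
        suite = self.context.get_expectation_suite(self.suite_name)
        return validator.validate(expectation_suite=suite)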

5. To validate any incoming data, we’d need to build an expectation suite, which can be done via a JSON containing all the required expectations. Use the following and save it as expectations.json.

[
  {
    "name": "expect_column_values_to_not_be_null",
    "column": "pickup_datetime",
    "extra_args": {
      "result_format": "BASIC"
    }
  },
  {
    "name": "expect_column_values_to_be_between",
    "column": "trip_distance",
    "extra_args": {
      "min_value": 0,
      "max_value": null,
      "result_format": "BASIC"
    }
  },
  {
    "name": "expect_column_values_to_be_in_set",
    "column": "payment_type",
    "extra_args": {
      "value_set": [1, 2, 3],
      "result_format": "BASIC"
    }
  }
]

The Build

In this section, we’ll tighten all the remaining screws and do the following one by one:

a. Create an expectation suite
b. Load some dummy data to show how to validate new/incoming data
c. Create a DAG which uses this expectation suite and validates the data

Let’s generate the expectation suite using the code below (run it from the project root, i.e. the folder containing gx/):

from include.main import ExpectationConfig, GreatExpectationsManager
config_object = ExpectationConfig("./expectations.json")
expectations_list = config_object.get_expectations_list()
ge_manager = GreatExpectationsManager('my_suite_v1')
ge_manager.generate_suite(expectations_list)

This should create a file at the path below (remember the file name):

airflow_with_ge/gx/expectations/my_suite_v1.json
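
If you want a quick sanity check without opening that file, a small optional snippet (not part of the original flow) run from the same virtualenv does the trick:

import great_expectations as gx

context = gx.get_context()
print(context.list_expectation_suite_names())  # the output should include "my_suite_v1"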

If this looks good, that means we have a ready-to-go expectation layer with us. Let’s go ahead and get to the real name of the game: the data.

And if you’ve been wondering where it would be coming from, let me enlighten you in less than 3 seconds:

Let’s download two such files, one with some data quality issues and one without, and save them both in our include folder with the following commands:

cd include
wget https://raw.githubusercontent.com/amanjaiswalofficial/medium-articles/main/great_expectations_with_airflow_demo/include/data_with_error.csv
wget https://raw.githubusercontent.com/amanjaiswalofficial/medium-articles/main/great_expectations_with_airflow_demo/include/data_with_no_error.csv

Finally, let’s put the last cog in place.

Create a file in dags named ge_integration_dag.py with the following code:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowException

from include.main import GreatExpectationsManager

suite_name = "my_suite_v1"
input_data_path = "include/data_with_error.csv"


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'great_expectations_with_airflow_dag',
    default_args=default_args,
    description='A simple DAG with Great Expectations',
)


def check_data_with_great_expectations():
    ge_manager = GreatExpectationsManager(suite_name)
    validation_result = ge_manager.validate(input_data_path)
    print(validation_result)
    validation_status = validation_result.success
    if validation_status:
        return True
    else:
        raise AirflowException("The validation has failed, consider checking DAG logs.")


def do_something_else():
    print("Validation of data was complete, you can proceed with everything else")
    return True


task_read_csv = PythonOperator(
    task_id='validate_data_against_expectations',
    python_callable=check_data_with_great_expectations,
    dag=dag,
)

task_validated = PythonOperator(
    task_id='do_everything_else',
    python_callable=do_something_else,
    dag=dag,
)

task_read_csv >> task_validated

if __name__ == "__main__":
    dag.cli()
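
One practical note before moving on (an assumption about your setup rather than something from the repo above): the DAG imports great_expectations inside the Airflow containers that astro spins up, so the package has to be listed in the project’s requirements.txt as well, not just installed in your local virtualenv. Something like:

great-expectations==0.17.23

astro rebuilds the image with whatever is listed there on the next astro dev start (or restart).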

Now let me recap what we’ve done, in as short a summary as possible:

“We’ve made a DAG which will read data from a CSV-like source and, based on whether it matches our expectations or not, will trigger the downstream tasks.”

The Payoff

So with all this hard work done so far, you can guess what comes next, and if you can’t, in caps I’ll write it for you:

TOO MANY SCREENSHOTS

But before that, let’s fire up the Astronomer local environment and take a look at that beautiful, beautiful DAG.

For this, on running the command below:

astro dev start

You’d get something like this:

Remember the following two things before we take the next leap of faith, i.e., starting the DAG and hoping it runs successfully on the first try. In our file ge_integration_dag.py:

suite_name = "my_suite_v1"
input_data_path = "include/data_with_error.csv"

If you’ve been reading carefully so far, I asked you to remember the file name while creating the suite; it matches the suite_name here.

Why is this important?

Tomorrow, as the expectations change, we’ll need a new suite. The process to do so is very straightforward (a short sketch follows the list):

  1. Run the code with the newer expectation list
  2. Generate the suite with a new name, e.g. “my_suite_v2”
  3. Use the same suite in the DAGs that should worry about this data.
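
In code, reusing the snippet from The Build, that could look roughly like this (expectations_v2.json is a hypothetical file name for the updated expectation list):

from include.main import ExpectationConfig, GreatExpectationsManager

# Hypothetical new JSON with the updated expectations
config_object = ExpectationConfig("./expectations_v2.json")
ge_manager = GreatExpectationsManager("my_suite_v2")
ge_manager.generate_suite(config_object.get_expectations_list())

# ...and in the DAG, point suite_name at the new suite:
# suite_name = "my_suite_v2"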

Second, the data in question here is data_with_error.csv, so we expect the DAG to fail with some information. You can extend this functionality and do any of the following:

I. Send email to interested parties
II. Fail the subsequent steps or dependent DAGs
III. Send alerts on systems like Slack or PagerDuty (a rough sketch of this one follows the list)
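
For option III, here’s a hedged sketch of how that could look with a plain incoming webhook; the webhook URL is a placeholder, and the official Slack provider package would work just as well if you have it installed:

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def notify_slack_on_failure(context):
    # Airflow passes the task context to failure callbacks.
    ti = context["task_instance"]
    message = (
        f":red_circle: Data validation failed in DAG `{ti.dag_id}`, "
        f"task `{ti.task_id}`."
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": message}, timeout=10)

# Attach it to the validation task (or to default_args for every task):
# task_read_csv = PythonOperator(..., on_failure_callback=notify_slack_on_failure, dag=dag)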

Enabling the DAG for the first time and running it should give us back something that looks like this:

The logs should tell the same story:

I’ve also printed the validation result in the DAG (the print(validation_result) line), which should come in handy when figuring out which expectation disappointed us the most.

Huh!

Let’s finish this already. Switch the file name in input_data_path near the top of the DAG, replacing it with:

input_data_path = "include/data_with_no_error.csv"

In case the code change doesn’t get picked up on its own, you can always do:

astro dev restart

Now running the DAG again with:

Ka aam e ha aam e haaaaaa!

And you should get all the green signals that you’ve been waiting for:

The logs are just icing on the cake:

Phewwww!

Let’s do a time jump six months forward; you can certainly expect one or more of the following:

The source changed
The schema changed
The end functionality changed
The alert mechanism changed
The developer… (let’s not go there)

In any of the above scenarios, the fundamentals never change. With minimal changes to one piece or another, you’ll always be ready for the data that comes in and the operations that follow.

And that was the end of one more long article 🎉, which probably no one will read either. But I don’t think that’s the point anyway.

In pop-culture:

I finished a 10 year long show in 2 months 🧑🏼‍🚀 (Armin > Eren).

December has always been a great month to watch old movies and go “full-nostalgia mode”. Some re-watches I did in the month of December’ 23:

Ferris Bueller’s Day Off
The Goonies
Groundhog Day
Home Alone
Home Alone 2

And if stocks are something you’re into (not giving/can’t give financial advice), then Dumb Money is for you.

Also, Skrillex won a grammmmmmmmyyy!

How it started:

How it’s going…

godfather of my kids, right here.

Until next time.

fin!
