ABOUT THIS BLOG

In this blog, we’ll discuss all things data quality and data observability. We’ll present interviews with leading data scientists, and we’ll discuss our own work building data observability solutions for data engineers and data scientists working in a broad range of industries.

Validating Data for Pipelines with Data Culpa

Nov 16, 2021 | Data Culpa

Consistent pipeline behavior is critical for any data process. You can use Data Culpa Validator to ensure consistent operation of pipelines as well as data at rest in databases, data lakes, and data warehouses.

This introduction shows you how to validate your results when working with data pipelines in Python. We’re going to work with some Yahoo! Finance data. By the end of the tutorial, we’ll be able to see the results from running our pipeline both when the market is open and when it is closed.

OK, let’s get started:

import yfinance as yf
tickerData = yf.Ticker('AAPL').info

# tickerData is now a dictionary:
# sorted(list(tickerData.keys()))
# ['52WeekChange', 'SandP52WeekChange', 'address1', 'algorithm', 'annualHoldingsTurnover', 'annualReportExpenseRatio', 'ask', 'askSize', ... ]

To start integrating with Data Culpa, we install the dataculpa-client package via pip:

pip install dataculpa-client

We create an account with Data Culpa and set up an API key and secret in the user interface:

[Screenshot: creating an API key in the Data Culpa user interface]

Next, we tell our code to use the Validator client with the host and secret. The “watchpoint” is the name and metadata that we want to tie to our pipeline. We can tag a version on our pipeline, or delineate between a dev environment and production, among other things. This lets us rapidly compare results from different environments, such as our workstation versus a QA or production environment.

We can just dump this information right into Data Culpa Validator as a JSON record:

from dataculpa import DataCulpaValidator   # provided by the dataculpa-client package installed above

dc = DataCulpaValidator(watchpoint_name="ticker-pipeline",
                        watchpoint_environment="default",
                        watchpoint_stage="default",
                        watchpoint_version="default",
                        protocol="http",
                        dc_host="my-host.dataculpa.cloud",
                        dc_port=7777,
                        api_access_id=myPipelineUser,   # API access id created in the UI above
                        api_secret=mySecret,            # API secret created in the UI above
                        queue_window=1000,
                        timeshift=None)
dc.open_queue()
dc.queue_record(tickerData)
dc.queue_commit()
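
For example, to keep production results separate from local development runs, we can vary the watchpoint metadata when constructing the client. Here’s a minimal sketch reusing the constructor parameters shown above (host, port, and credentials are placeholders):

# Tag this run as production, version 1.2 of the pipeline; everything else
# matches the constructor call above.
dc_prod = DataCulpaValidator(watchpoint_name="ticker-pipeline",
                             watchpoint_environment="production",
                             watchpoint_stage="default",
                             watchpoint_version="1.2",
                             protocol="http",
                             dc_host="my-host.dataculpa.cloud",
                             dc_port=7777,
                             api_access_id=myPipelineUser,
                             api_secret=mySecret,
                             queue_window=1000,
                             timeshift=None)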

Note that we can queue as many records as we want in one session before committing. Perhaps we’re working with a list of ticker symbols:

import yfinance as yf
from dataculpa import DataCulpaValidator

tickers = ['AAPL', 'TSLA', 'MSFT']
dc = DataCulpaValidator( ... )   # same connection parameters as above
dc.open_queue()

for t in tickers:
    td = yf.Ticker(t).info
    dc.queue_record(td)

r = dc.queue_commit()
print(r)

Now, sometimes the market is open and sometimes it’s not, and our pipeline will generate different data downstream under these two conditions. Here’s a simple helper to detect them:

import holidays
from datetime import datetime

def is_market_open():
    dt = datetime.now()

    us_holidays = holidays.US(years=dt.year)
    if dt.date() in us_holidays:
        return False, "US Holiday"
    
    day_name = dt.strftime("%a")
    is_weekend = (day_name in ["Sat", "Sun"])
    if is_weekend:
        return False, "Weekend"

    return True, "Probably open"
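
We can sanity-check this helper on its own before wiring it into the pipeline:

is_open, reason = is_market_open()
print(is_open, reason)   # e.g., prints "False Weekend" when run on a Saturday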

And our “main pipeline code”:

def run():
    dc = DataCulpaValidator( ... )
    tickers = ['AAPL', 'TSLA', 'MSFT']
    for t in tickers:
        td = yf.Ticker(t).info
        (is_open, is_open_reason) = is_market_open()
        d = {'time_now': str(datetime.now()),
             'is_open': is_open,
             'is_open_reason': is_open_reason,
             'ticker': t,
             'data': td}
        dc.queue_record(d)
        # do other stuff

    dc.queue_commit()

You may wonder where errors show up. Validator includes APIs for error reporting and status updates. Most pipelines today run in “lights-out” environments where error logs can be hard to come by, so by default, processing in Data Culpa is asynchronous, and alerts about schema changes, shifts in value distributions, or a lack of activity are pushed through our alerts path. You can access the alerts path via API, Slack, or other endpoints.
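
As a rough sketch of the idea, a pipeline might report a failure inline rather than burying it in a local log file. The queue_error method name below is hypothetical, purely for illustration; consult the dataculpa-client documentation for the actual error-reporting calls:

# Hypothetical sketch: the error-reporting method name is illustrative only.
try:
    td = yf.Ticker('AAPL').info
    dc.queue_record(td)
except Exception as e:
    dc.queue_error(str(e))   # hypothetical call; see the client docs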

We can review the activity of our pipeline in the Validator UI. At this point, we’ve run the pipeline twice: once with the market open and once with the market closed. We can see the cumulative number of records, the individual fields, and the two very different schemas that result from the pipeline under these different conditions, including which fields the two schemas share.

You can call Data Culpa Validator from any pipeline environment where you have the ability to call a REST API. We provide Python bindings out of the box with our open source client library to make invoking Validator easy for developers working with Python.
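
To make that concrete, here’s a purely hypothetical illustration of posting a record over HTTP. The endpoint path and header names are placeholders, not the documented Validator REST API; only the host, port, and protocol come from the constructor shown earlier:

import requests

# Hypothetical illustration only: the endpoint path and auth headers below
# are placeholders, not the documented Validator REST API.
resp = requests.post("http://my-host.dataculpa.cloud:7777/queue/record",
                     headers={"X-Api-Access-Id": "myPipelineUser",
                              "X-Api-Secret": "mySecret"},
                     json=tickerData)
print(resp.status_code)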

As your pipeline runs, you get historical access to the results your pipeline processed. Validator uses this historical data set to build models for alerting to changes in the background, so that you can get timely alerts about changes. You can also pre-load historical data with our timeshift feature.
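
For example, to pre-load yesterday’s results so that today’s run already has history to compare against, we might construct the client with a timeshift. This is a sketch under one assumption: that timeshift is a number of seconds by which queued records are shifted into the past (check the client docs for the exact semantics). The yesterdays_records list is a hypothetical collection of saved results:

# Sketch: backfill records as if they arrived 24 hours ago. We assume
# timeshift is a count of seconds to shift records into the past.
dc_backfill = DataCulpaValidator(watchpoint_name="ticker-pipeline",
                                 watchpoint_environment="default",
                                 watchpoint_stage="default",
                                 watchpoint_version="default",
                                 protocol="http",
                                 dc_host="my-host.dataculpa.cloud",
                                 dc_port=7777,
                                 api_access_id=myPipelineUser,
                                 api_secret=mySecret,
                                 queue_window=1000,
                                 timeshift=24 * 60 * 60)
dc_backfill.open_queue()
for old_record in yesterdays_records:   # hypothetical list of saved results
    dc_backfill.queue_record(old_record)
dc_backfill.queue_commit()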

In an upcoming post, we’ll show off some of the alerting capabilities of Data Culpa Validator.

Interested in trying out Data Culpa Validator? Sign up for a free trial.
