This doc describes the design of a data catalog system for improving our data documentation, quality, and orchestration.
From the Architecture Upgrades 2023 project README:
The Data Department has developed data infrastructure to facilitate its core functions: modeling, reporting, publishing open data, and research. This infrastructure uses the system-of-record (iasWorld) as a "base" dataset, then adds third-party, collected, and generated data.
The data mostly flows in a single direction: it is extracted from the system-of-record to S3, cleaned and transformed via Athena, then pushed to modeling, open data, etc. The only outputs that currently go back into the system-of-record are predicted values from the residential and condominium models.
This approach is simple and has worked well for us, but is difficult to expand, maintain, and trust. Our data stack is essentially a big one-way funnel that feeds the models; it wasn't designed with more complex applications in mind. As the Data Department grows, so too will the needs from our infrastructure.
This document describes the design of a data catalog system that can resolve the problems with our current data infrastructure described in the README, problems like:
- Referential integrity issues
- Schema inconsistency
- Lack of documentation around existing data
- Orchestration issues (timing, automating data creation)
- Complex joins and views
- Lack of data validation and alerting for iasWorld and third-party data
For a full list of issues we intend to solve with the catalog, see the requirements.
Each of the following requirements has two components: a motivation ("why do we want to do this?") plus a list of requirements ("how will we do this?").
From the project README:
[Some tables within iasWorld] often contain duplicate or non-active records. We have to do a fairly large amount of deduping and filtering (in Athena) to get the "correct" records out of iasWorld.
We should be able to:
- Define keys on top of unconstrained data
- Allow for deduplication and filtering of source data
From the project README:
The rollout of iasWorld has been piece-wise. Not all data from the different agencies involved with the property tax process has been migrated yet. Even within the CCAO, certain data is essentially backlogged, i.e. it needs to be cleaned, QC'd, and added to the system-of-record at some point in the future.
Because of this piece-wise rollout, some tables and fields change over time, meaning we need to create queries that likewise change over time.
We should be able to:
- Track changes to schemas over time
- Track changes to queries in response to changing schemas
From the project README:
Due to the complexity and general unreliability of our "base" data set, the Data Department has created views and scripts for cleaning raw iasWorld data. These views work well but are quite complicated. They codify a lot of hard-earned domain knowledge about how the backend system actually works and perform a lot of data cleaning [...].
We should be able to:
- Clearly document all of our data sources, transformations, and products, including:
- ETL operations
- Databases, tables, views, schemas
- Manual processes
From the project README:
Most of our third-party data is created and uploaded via simple R scripts. This works well enough since most data needs to be gathered only once per year. However, a better system for gathering and uploading third-party data would be ideal.
We should be able to:
- Build and update our data warehouse automatically
- Write scripts for loading third-party data and run them as part of the build
- Define dependencies between data sources in the form of a DAG
- Schedule time-sensitive operations, e.g. wait for current parcel shapefiles to be updated before joining to third-party data
- This should also be clearly documented
From the project README:
In addition to the first-party backend data, the Data Department also attaches a plethora of third-party data that we use for modeling and reporting. This includes things like spatial data (neighborhood, census tract) and economic data (median income).
Joining this third-party data to the primary records can be complex, and currently involves running many CREATE TABLE AS statements. There's lots of room for error in this process.
We should be able to:
- Separate intermediate views from final views
- Factor out common logic in views for more clarity and better modularization
- Provide clear provenance for views in published documentation
- Document specific choices in our queries
See "Data validation issues" in the project README.
We should be able to:
- Add layers of validation on top of untyped varchar fields
- Automatically flag mistakes introduced by data entry
See "Lack of automated flagging" in the project README.
We should be able to:
- Notify teams automatically of potential data integrity issues
- Define a process to push output from our data catalog to the system of record
From the project README:
[People] ask things like "Is PROCNAME updated for this township?" or "Did sqoop pull the latest data last night?" We use CloudWatch for extremely basic logging and alerts (on failures), but we don't have a system to provide visibility into what data is recently updated.
We should be able to:
- Know when data has been updated
- See results of ETL jobs and debug them in cases of failure
- Notify ourselves of job failures or warnings
Our data stack is currently built on top of AWS Athena; in particular, all of our views on top of iasWorld are defined there. Whatever tool we use for our catalog should work with Athena, or else we would need to rewrite all of our existing view code.
We should be able to:
- Define and document views in AWS Athena
After a decent amount of research and experimentation, we propose to build our data catalog and data integrity checks in dbt.
dbt is a "Data Build Tool" that allows users to define and document ETL workflows using common software engineering patterns like version control and modularization. For a longer introduction, see the docs.
For a fun take on what dbt is through the lens of what it is not, see this blog post.
There are two distinct tools that are both somewhat confusingly referred to as "dbt":
- dbt Cloud: The SaaS version of dbt, including a web view with CI/CD, orchestration, monitoring, RBAC, and an IDE
- Pros: Provides a full-featured, hosted data catalog service
- Cons: Super expensive (minimum $100/seat/mo); owned and operated by a VC-funded startup
- dbt Core: Open source CLI that dbt Cloud is built on top of
- Pros: Free and open source; users have full control and ownership
- Cons: Basically only handles DAG builds (equivalent to e.g. CMake); orchestration/monitoring has to be handled by another tool
dbt Cloud might save us time in the short run, but the process to get it approved and procured may take a number of months. It also seems risky to build our catalog on top of a VC-funded service, and the orchestration/monitoring tools it provides on top of the Core features are not high priority for us. As such, we think it would be more prudent for us to build with dbt Core and design our own orchestration/monitoring/authentication integrations on top. Hence, when this doc refers to "dbt", we are actually referring to dbt Core.
One downside of this choice is that if we move forward with dbt Core, we will have to choose a separate tool for orchestrating and monitoring our DAGs. This is an important consideration, because orchestrators are notoriously controversial:
Every other application, besides dbt, is an orchestrator. Every application pays the price for it. Every person hates every application as soon as it starts orchestrating. To orchestrate is to grapple with reality, to embrace mortality, to welcome death.
As such, we evaluate this choice with an eye towards options for third-party orchestration and monitoring.
Another downside is that dbt does not have robust support for the types of non-SQL scripted transformations we sometimes want to produce, like our sales value flagging script. There is currently an effort underway to provide better support for Python models in dbt, but only three data platforms have been supported since the launch of Python models in late 2022, and there is not yet a clear roadmap for the future development of Python models. As such, we will need to use a separate system to keep track of our scripted transformations. We provide a brief sketch of the design of such a system in the Tracking raw data and ML transformations section below.
See the `jeancochrane/dbt-spike` branch of this repository for an example of a simple dbt setup using our views.
Requirement | dbt | Notes |
---|---|---|
Referential integrity | ✅ | Via tests. |
Schema consistency | ✅ | Via tests. |
Documentation | ✅ | Via docs generator. |
Orchestration/automation | ❌ | Requires dbt Cloud, or integration with another service. |
Simpler joins/views | 🟡 | Can't simplify views, but can help factor them out and document them better. |
Data validation | ✅ | Via tests. |
Automated flagging | ❌ | Requires dbt Cloud, or integration with another service. |
Monitoring | ❌ | Requires dbt Cloud, or integration with another service. |
Athena integration | ✅ | Via dbt-athena-community plugin. |
After some discussion, we consider the lack of built-in orchestration, automation, automated flagging, and monitoring to be acceptable. The only mission-critical pieces are flagging and monitoring, and we provide some thoughts on how to accomplish those goals using external services in the Design section below.
Here is a sketch of how we plan to implement a workflow for cataloging and validating our data using dbt:
- Our dbt configuration will be stored in a `dbt/` folder in the `data-architecture` repository.
- View definitions will be stored in a `models/` subdirectory, with nested subdirectories for each schema (e.g. `default/`, `location/`).
- Tests will be defined in a `schema.yml` file, and docs will be defined as doc blocks in a `docs.md` file (see the sketch below).
  - It may end up being wise to split these files up by schema.
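As a rough sketch of what this could look like, here is a minimal `schema.yml`; the model, column, and doc block names below are hypothetical placeholders, not our actual views:

```yaml
# dbt/models/default/schema.yml -- model, column, and doc names are hypothetical
version: 2

models:
  - name: vw_pin_universe
    description: '{{ doc("vw_pin_universe") }}'  # doc block defined in docs.md
    columns:
      - name: pin
        description: Parcel identification number
        tests:
          - unique
          - not_null
      - name: township_code
        tests:
          # Referential integrity check against another model
          - relationships:
              to: ref('vw_town')
              field: township_code
```

Tests like `unique`, `not_null`, and `relationships` are dbt's built-in generics; custom checks can be defined as SQL queries in the `tests/` directory.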
- We will have two profiles, one for `dev` and one for `prod`, which will point to two separate AWS data catalogs (sketched below).
  - We may want to eventually namespace these by users, but we should push off that optimization until we run into conflicts.
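A hedged sketch of what the `profiles.yml` might look like, modeled here as two targets within a single profile for simplicity (the bucket, region, catalog, and schema values are placeholders):

```yaml
# dbt/profiles.yml -- all names and paths below are placeholders
data_architecture:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://example-athena-results-dev/
      region_name: us-east-1
      database: awsdatacatalog   # dev could point at a separate data catalog
      schema: dev_default
      threads: 4
    prod:
      type: athena
      s3_staging_dir: s3://example-athena-results-prod/
      region_name: us-east-1
      database: awsdatacatalog
      schema: default
      threads: 4
```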
- Data science team members will edit views by editing the model files and making pull requests against the `data-architecture` repository.
- Team members will build views manually by running `dbt run` from the root of the folder.
- In rare cases where we want to preserve the behavior of old views, we will enable dbt versioning for those views (see the sketch below).
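For reference, dbt's model versioning (available as of dbt 1.5) is configured in the model's YAML; the model name here is a hypothetical placeholder:

```yaml
# schema.yml entry for a hypothetical versioned model
models:
  - name: vw_pin_condition
    latest_version: 2
    versions:
      - v: 1   # preserved as vw_pin_condition_v1.sql
      - v: 2   # current behavior, vw_pin_condition_v2.sql
```

With this config, `ref('vw_pin_condition')` resolves to the latest version, while `ref('vw_pin_condition', v=1)` pins the old behavior.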
- Like views, documentation will be edited via pull request.
- Documentation will be rebuilt by running `dbt docs generate` from the root of the folder.
- Documentation will be hosted as a static site on S3 with a CloudFront CDN.
- We will define a helper script in the repository to update documentation.
- As an optional improvement, we may decide to incorporate docs generation and upload into a CI/CD step (GitHub Action).
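If we do adopt the CI/CD approach, the workflow might look roughly like the following sketch; the action versions, role ARN, and bucket name are assumptions rather than settled choices:

```yaml
# .github/workflows/build-docs.yaml -- role ARN and bucket are placeholders
name: build-docs

on:
  push:
    branches: [main]

jobs:
  build-docs:
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install dbt-core dbt-athena-community
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::123456789012:role/example-dbt-docs
          aws-region: us-east-1
      - run: dbt docs generate --target prod --profiles-dir .
        working-directory: dbt
      # dbt writes the static docs site to target/
      - run: |
          aws s3 cp dbt/target/index.html s3://example-dbt-docs-bucket/
          aws s3 cp dbt/target/catalog.json s3://example-dbt-docs-bucket/
          aws s3 cp dbt/target/manifest.json s3://example-dbt-docs-bucket/
```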
- Data integrity checks will run via the `dbt test` command, and will be defined as a GitHub Action workflow.
- Data integrity checks will be run once per night after the sqoop operation completes.
- The sqoop process will trigger data integrity checks by issuing an API request to trigger the GitHub Action workflow.
- As an initial MVP, data integrity results will print to the console in the output of the GitHub Action workflow.
- As an initial MVP, we will subscribe to workflow notifications to get notified about results of data integrity checks.
- As an iterative improvement, we will design a more complicated result notification process using e.g. stored dbt test failures and AWS SNS for notification management.
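A sketch of what that workflow could look like, assuming the sqoop process triggers it with a `repository_dispatch` event (the event type, role ARN, and dependency choices are placeholders):

```yaml
# .github/workflows/test-dbt-models.yaml -- event type and role ARN are placeholders
name: test-dbt-models

on:
  # The sqoop process triggers this via the GitHub REST API:
  # POST /repos/{owner}/{repo}/dispatches with {"event_type": "sqoop_complete"}
  repository_dispatch:
    types: [sqoop_complete]
  # Allow manual runs for debugging
  workflow_dispatch:

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - run: pip install dbt-core dbt-athena-community
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::123456789012:role/example-dbt-testing
          aws-region: us-east-1
      # MVP: results simply print to the workflow console
      - run: dbt test --target prod --profiles-dir .
        working-directory: dbt
```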
- We will keep our raw data extraction scripts separate from the dbt DAG, per dbt's recommendation.
- Raw data will be referenced in dbt transformations using the `source()` function.
- Source freshness checks will be used to ensure that raw data is updated appropriately prior to transformation (see the sketch below).
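For example, a source definition with a freshness check might look like the following sketch (the source, table, and column names are hypothetical):

```yaml
# dbt/models/sources.yml -- source, table, and column names are hypothetical
version: 2

sources:
  - name: iasworld
    tables:
      - name: pardat
        loaded_at_field: loaded_at
        freshness:
          # Warn/fail if the nightly sqoop pull hasn't landed recently
          warn_after: {count: 24, period: hour}
          error_after: {count: 48, period: hour}
```

Models would then reference the table with `{{ source('iasworld', 'pardat') }}`, and `dbt source freshness` would enforce the thresholds above.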
- Where possible, the intermediate transformations defined in the `aws-s3/scripts-ccao-data-warehouse-us-east-1` subdirectory will be rewritten in SQL and moved into the dbt DAG. During the transition period, while some transformations are still written in R, we will treat their output as if it were raw data and reference it using `source()`. Any transformations that can't be easily rewritten in SQL will continue to be defined this way in the long term.
- Intermediate or final transformations that require CPU- or memory-intensive operations like running machine learning models will be defined in Python, run as AWS Glue jobs, and defined as ephemeral models in the dbt DAG. This will be true even in cases where the Glue jobs depend on models produced by the dbt DAG, e.g. the tables produced by `model-sales-val`. A bullet below will explain how we will manage circular dependencies between these services.
- Glue jobs will be kept under version control and deployed to AWS using Terraform run in GitHub Actions on commits to their repo's main branch. We will write a reusable composite action (sketched below) that performs the following operations:
  - Runs `terraform apply` to recreate the Glue job definition in AWS
    1. In doing so, inserts the current Git SHA as a command argument in the Glue job definition so that the job script can read the SHA and use it for versioning.
    2. Supports creating staging jobs that we can use for testing during CI.
  - Uploads the newest version of the script to the proper bucket in S3
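A rough sketch of such a composite action, assuming the Terraform config lives in a `terraform/` directory and the Glue script in `glue/` (all input names and paths are placeholders):

```yaml
# .github/actions/deploy-glue-job/action.yaml -- names and paths are placeholders
name: Deploy Glue job
description: Apply Terraform for a Glue job and upload its script to S3

inputs:
  environment:
    description: Deployment environment (e.g. staging or prod)
    required: true
  script_bucket:
    description: S3 bucket that stores Glue job scripts
    required: true

runs:
  using: composite
  steps:
    - name: Apply Terraform
      shell: bash
      working-directory: terraform
      run: |
        terraform init
        # Pass the current commit SHA into the job definition so the
        # job script can read it and use it for versioning
        terraform apply -auto-approve \
          -var "environment=${{ inputs.environment }}" \
          -var "git_sha=${{ github.sha }}"

    - name: Upload job script to S3
      shell: bash
      run: aws s3 cp glue/job.py "s3://${{ inputs.script_bucket }}/scripts/job.py"
```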
- There will be three expected ways in which we handle dependencies between dbt and Glue, depending on the direction of the dependency graph:
  - In cases where dbt depends on the output of a Glue job (Glue -> dbt), we will treat the Glue job output as an ephemeral model or a source in the DAG and schedule the job as necessary to maintain freshness.
    - If we would like to rebuild the dbt models every time the Glue source data updates, we can schedule the job via GitHub Actions instead of the Glue job scheduler and configure GitHub Actions to rerun dbt in case of a successful Glue job run.
  - In cases where a Glue job depends on the output of dbt (dbt -> Glue), we will document the Glue job as an exposure in the DAG (see the sketch below). Exposures should make use of the `depends_on` config attribute in order to properly document the lineage of the data created by Glue. If we would like to ensure that we run the Glue job every time the dbt source data updates, we can schedule the Glue job using a GitHub Actions workflow and configure the workflow to check the dbt state to see if it needs to be rerun.
  - In case of a circular dependency between dbt and Glue (dbt -> Glue -> dbt), we will document the Glue job as an ephemeral model in dbt so that we can specify its dependencies using the `depends_on` attribute. If we would like to be able to build the entire DAG from scratch, including running the Glue jobs and transforming their output using dbt, we can separate the dbt config into two targets, use the second bullet approach above (dbt -> Glue) to trigger the Glue job once the first target has completed, and update the dbt wrapper script to initiate the second dbt target build once the Glue job has completed.
    - Any such wrapper script should also provide the caller with the option to skip running the Glue job if the AWS CLI can determine that the output of the Glue job already exists.
  - The opposite circular dependency (Glue -> dbt -> Glue) should not require a special solution since it is just a combination of the first and second bullets above (i.e. one Glue job acting as a source and another acting as an exposure).
will treat the Glue job output as an ephemeral model or
a source in the DAG and
schedule the job as necessary to maintain freshness.
- Designed around software engineering best practices (version control, reproducibility, testing, etc.)
- Free and open source
- Built on top of established, familiar tools (e.g. command-line, SQL, Python)
- Documentation and data validation out of the box
- No native support for Python or R scripting as a means of building models, so we can't incorporate our raw data extraction scripts
- We would need to use a community plugin for Athena support; this plugin is not supported on dbt Cloud, if we ever decided to move to that
- Requires a separate orchestrator for automation, monitoring, and alerting
- Tests currently do not support the same rich documentation descriptions that other entities do (see this GitHub issue)
- Given the way tests are defined in dbt, we expect each validation check to be implemented as a query that runs against Athena. According to experiments by other users, Athena has a soft concurrency limit of 4-5 queries at once (source). Since table scans take ~10s in my experience, will this flow scale to the amount of validation we expect to need?
Click here for raw notes on dbt
- Requirements
- Referential integrity
- Built-in via tests (relationship)
- Schema consistency
- Documentation
- Docs
- Stored separately from the code, in YAML file
- Although SQL comments are supported, and inline descriptions of databases/tables/views are allowed
- `dbt docs generate` compiles docs, `dbt docs serve` runs a server
- There are also contracts
  - But these seem too strict for our use case (basically build failures)
    - We would likely want to set `enforced: false`
- Orchestration
- Views and joins
- Docs
- Can be defined and documented like other tables
- Data validation
- Tests can be any SQL query
- Two failure levels: `fail` and `warn`
- Automated flagging
- Nothing built-in; we would have to produce output views and then use the orchestration layer to define an automated process
- Monitoring
- Nothing built-in, this will likely need to be part of the orchestration layer
- We would have to use a community adapter to connect to Athena
- Migration may be challenging
- Not a verified adapter
- Web UI seems expensive
- Team is $100/seat/mo, and we likely need Enterprise features
- E.g. Team plan only allows 5 readonly users
- Core example uses Meltano for ETL
- A lot of useful stuff is only available in Cloud, including:
- CI/CD
- Job scheduling
- Deploy environments
- Built-in IDE
- Job monitoring
- There are clearly a lot of orchestrators!
AWS Glue is an AWS service for ETL. It also provides utilities for data quality checks and data cataloging.
- Built-in to AWS, which we are already locked into
- High confidence that this service will last for a long time
- Pricing for data integrity checks is similar to running ETL tasks
- It may be a strategic risk to lock ourselves into AWS even further
- "Push logs to CloudWatch" and "query failures using Athena" do not give me confidence in ease of use
- No easy way of keeping code and configuration under version control
- No data documentation beyond what is visible in the AWS console
Click here for raw notes on AWS Glue
- [Getting started guide for data quality](https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/)
- Version control integration seems like it requires some AWS spaghetti code
- As seen in the quickstart guide, authoring jobs seems very UI-heavy
- Schemas are defined as either JSON or protobuf
Great Expectations is an open source Python library for data validation. It is often used for data integrity in conjunction with dbt, most notably in the Meltano ELT stack.
- Simple design (data quality checks are just Python functions)
- More extensive documentation of tests than dbt
- Native Athena support
- Validation runs can output HTML or other custom rendering formats
- Documentation of non-test entities is less featureful than dbt (e.g. no lineage graphs)
- Can't build or version views; can only run integrity checks
- Only satisfies data validation requirement
- Custom checks are complicated and must be defined in Python
Apache Atlas is an open source data governance and metadata management service.
- Open source
- We could run it on prem
- Uncertain how well maintained it is; e.g., as of 7/18 the most recent commit was on 6/13
- Integrity checks require a third-party package
Theoretically, we could use a large Excel spreadsheet to document our data, and write macros for running integrity checks.
- Easy for non-Data team members to operate
- Excel is likely to be supported for a long time
- Excel macros are an esoteric art and will probably be hard to debug and maintain in the long run
- No clear path forward for integrating with Athena