Design (DRAFT)
Cargo is a special-purpose ETL (Extract-Transform-Load) data processing and modification pipeline for the Cardano ecosystem.
- Extract - consume data from one or more sources
- Transform - process data to normalize into common formats
- Load - output data to destination
Since this is a well-explored area of computing, Cargo does not need to reinvent the wheel. Adoption of existing libraries, protocols, and industry patterns should be preferred.
Our primary goal is to create an application which can be used for simple use cases on a local development machine, yet be horizontally scalable at several key points while remaining interoperable with the overall software ecosystem. This design should consider multiple actors, from novice developers to experienced blockchain and Golang programmers.
- Cardano ecosystem support
- Read from cardano-node via NtC or NtN
- Configuration driven pipelines which can be shared as code
- DAG nodes are independently scalable
- Read once, process many design
- Support multiple pipelines from a single instance
- Ability to blend batch and real-time data
- Plugin based system to extend pipeline functionality
- Shipped functionality delivered as plugins
- Pluggable task execution engine
- Strong contracts at hand-off points
- Full application lifecycle management for pipelines
- Backup/Restore
- CRUD operations
- Metrics collection
- Persistent configuration
- Versioning
- Simple out-of-the-box experience
- cardano-db-sync compatible schema output to postgres
- Screen (Terminal) output
- P2P discovery
Cargo is an ETL implementation, so it naturally breaks into three phases: Extract, Transform, and Load. Each phase is represented by the functionality available to that phase's plugins. Cargo pipelines are described as a DAG (Directed Acyclic Graph), which is converted internally into a graph of Task (vertex) and Queue (edge) nodes for execution within the three phases.
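As a loose illustration of this internal representation (all type and field names here are assumptions, not Cargo's actual API), a resolved pipeline might be modeled as:

```go
package pipeline

// Task is a vertex in the resolved pipeline DAG. How Cargo ultimately models
// tasks is not final; this sketch only shows the shape implied above.
type Task struct {
	Name   string   // unique task name within the pipeline
	Phase  string   // "extract", "transform", or "load"
	Plugin string   // plugin that implements this task
	Inputs []*Queue // queues this task consumes from
	Output *Queue   // queue this task emits events onto
}

// Queue is an edge in the DAG, connecting a producing task to a consuming task.
type Queue struct {
	Name     string // generated by the control plane; not user-visible
	Provider string // e.g. "memory", "amqp", "kafka"
}

// Pipeline is the validated DAG handed to the control plane for execution.
type Pipeline struct {
	Name   string
	Tasks  []*Task
	Queues []*Queue
}
```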
These phases are managed and coordinated in an application control plane.
This control plane is responsible for:
- Receiving API requests
- Validating Cargo pipelines
- Planning Task execution
- Planning Queue topology
- Task execution and life-cycle
- Queue execution and life-cycle
- Metrics collection
Data extraction is the process by which data is brought into Cargo. This can be accomplished using any number of push or pull methodologies. The Extract phase is represented in Cargo by source plugins. The source plugin contract is to fetch data from sources external to Cargo and serialize it into a standardized data format, which is then emitted into Cargo as an event.
Examples of data extraction use cases include fetching Cardano blockchain data from a running cardano-node, consuming data from a Kafka topic, replicating database data using change-data-capture, or receiving webhooks with custom payloads.
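The standardized event format is not specified here. As a loose illustration only (field names are assumptions, and the real envelope would most likely be a protobuf message exchanged over gRPC), such an event might carry:

```go
package event

import "time"

// Event is a sketch of the standardized envelope a source plugin might emit.
// These fields are assumptions for illustration, not Cargo's actual format.
type Event struct {
	Type      string            // e.g. "cardano.block", "webhook.payload"
	Source    string            // name of the source plugin instance
	Timestamp time.Time         // when the event was extracted
	Metadata  map[string]string // free-form attributes (slot, hash, topic, ...)
	Payload   []byte            // serialized event body
}
```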
Data transformation processes data and is responsible for cleaning, quality, and consistency. This can be accomplished through a series of data filters or external program execution. The Transform phase is represented in Cargo by transform plugins. The transform plugin contract is to process one or more events from the event queue and emit the resulting events.
Examples of data transform use cases include dropping cryptographic hashes from Cardano blockchain block data, decoding/encoding specific fields, joining data from multiple sources, validating webhook payloads, or deduplicating data.
Cargo transforms are executed in order and multiple transforms may be chained in series to get the desired result event.
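Since transforms run in series, the chaining itself can be sketched with a hypothetical step signature (a simplification; real transforms are plugins invoked over gRPC):

```go
package transform

import "context"

// Step is a hypothetical signature for a single transform: it takes one
// serialized event and returns zero or more resulting events (zero drops the
// event, more than one fans it out).
type Step func(ctx context.Context, event []byte) ([][]byte, error)

// Chain applies steps in order, feeding each step's output events into the
// next step, mirroring how Cargo chains transform plugins in series.
func Chain(ctx context.Context, event []byte, steps ...Step) ([][]byte, error) {
	events := [][]byte{event}
	for _, step := range steps {
		var next [][]byte
		for _, ev := range events {
			out, err := step(ctx, ev)
			if err != nil {
				return nil, err
			}
			next = append(next, out...)
		}
		events = next
	}
	return events, nil
}
```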
Data loading writes data to the target, which can be as simple as terminal output or as involved as a data warehouse. The Load phase is represented in Cargo by sink plugins. The sink plugin contract is to process a single event from the event queue and load it into the final data store.
Examples of data load use cases include writing to PostgreSQL in cardano-db-sync schema, publishing events to Kafka topics, sending a Tweet, or triggering a Lambda function with a given payload.
Cargo Queue nodes are the edges in a DAG. They are the transport medium between the Task nodes. The Queue nodes in Cargo are managed by the control plane, making them transparent to the end user, and they should not be directly accessed.
Cargo is designed to be usable as a single binary (and plugins) from the command line, yet be horizontally scalable.
Simple terminal output example:

```
./cargo --pipelines=terminal.yaml
```
A Cargo workflow is rather simple.
- Cargo is launched and loads its API service
- One or more Cargo pipelines are submitted to the API
- Pipelines are validated
- DAG is resolved for each pipeline
- Queue nodes are launched
- Task nodes are launched
- Control plane monitors Queue and Task nodes
On shutdown, Cargo will ensure that Task nodes are stopped, in-flight data is flushed to the persistent Queue nodes, and stored cursors are updated.
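A minimal sketch of that startup and shutdown order, using hypothetical types and method names rather than Cargo's actual API:

```go
package cargo

import "context"

// ControlPlane is a hypothetical interface capturing the workflow above; it is
// illustrative only.
type ControlPlane interface {
	Validate(p *Pipeline) error
	ResolveDAG(p *Pipeline) (*DAG, error)
	StartQueues(ctx context.Context, d *DAG) error
	StartTasks(ctx context.Context, d *DAG) error
	Monitor(ctx context.Context, d *DAG) // blocks until shutdown is requested
	Shutdown(d *DAG) error               // stop tasks, flush queues, save cursors
}

// Pipeline and DAG stand in for a submitted pipeline definition and its
// resolved Task/Queue graph.
type Pipeline struct{ Name string }
type DAG struct{}

// Run sketches the launch order: validate, resolve the DAG, start Queue nodes
// before Task nodes, monitor, then perform a clean shutdown.
func Run(ctx context.Context, cp ControlPlane, p *Pipeline) error {
	if err := cp.Validate(p); err != nil {
		return err
	}
	dag, err := cp.ResolveDAG(p)
	if err != nil {
		return err
	}
	if err := cp.StartQueues(ctx, dag); err != nil {
		return err
	}
	if err := cp.StartTasks(ctx, dag); err != nil {
		return err
	}
	cp.Monitor(ctx, dag)
	return cp.Shutdown(dag)
}
```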
Cargo's main configuration file is cargo.yaml, which can be used to override any of the application defaults. These configuration options can also be overridden from the environment. This configuration controls the behavior of the Cargo application, including the configuration of available Queue providers and Task execution engines. The defaults are:
| Environment variable | Default |
| --- | --- |
| CARGO_HOME | ${HOME}/.cargo |
| CARGO_CONFIG_FILE | ${CARGO_HOME}/cargo.yaml |
| CARGO_PLUGIN_DIR | ${CARGO_HOME}/plugins |
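As a sketch only (option names are assumptions, not the actual schema), cargo.yaml might map onto a configuration struct along these lines:

```go
package config

// Config is a hypothetical shape for cargo.yaml. Each option has an
// application default, may be overridden in cargo.yaml, and may in turn be
// overridden from the environment.
type Config struct {
	Home      string `yaml:"home"`      // default: ${HOME}/.cargo (CARGO_HOME)
	PluginDir string `yaml:"pluginDir"` // default: ${CARGO_HOME}/plugins (CARGO_PLUGIN_DIR)

	// Queue provider configuration, e.g. in-memory, RabbitMQ/AMQP, Kafka.
	Queue struct {
		Provider string `yaml:"provider"`
		Address  string `yaml:"address"`
	} `yaml:"queue"`

	// Task execution engine configuration, e.g. local, k8s, SSH.
	Executor struct {
		Engine string `yaml:"engine"`
	} `yaml:"executor"`
}
```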
The Hashicorp go-plugin library will be used to facilitate bidirectional communication between the control plane process and each plugin instance using gRPC. Plugins can either be compiled into the main binary or provided as separate binaries. A compiled-in plugin will execute within the control plane process as a separate goroutine but still communicate via RPC.
Each plugin type will have its own well-defined interface. Because this interface uses gRPC, plugins can be implemented in languages other than Go.
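As a rough sketch of that pattern, a standalone plugin binary might serve its gRPC implementation through go-plugin roughly like this (the handshake values, plugin name, and the commented-out generated stubs are assumptions for illustration, not Cargo's actual definitions):

```go
package main

import (
	"context"

	"github.com/hashicorp/go-plugin"
	"google.golang.org/grpc"
)

// Handshake values are placeholders; the real key/value would be defined by
// Cargo so the control plane can verify it launched a compatible plugin.
var handshake = plugin.HandshakeConfig{
	ProtocolVersion:  1,
	MagicCookieKey:   "CARGO_PLUGIN",
	MagicCookieValue: "cargo",
}

// sourcePlugin is a hypothetical go-plugin wrapper for a Cargo source plugin.
// The actual gRPC service would be generated from Cargo's plugin .proto files.
type sourcePlugin struct {
	plugin.NetRPCUnsupportedPlugin
}

func (p *sourcePlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
	// pb.RegisterSourceServer(s, &sourceServer{}) // generated registration (assumed)
	return nil
}

func (p *sourcePlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
	// return pb.NewSourceClient(c), nil // generated client (assumed)
	return nil, nil
}

func main() {
	// Serve this plugin as a standalone binary; a compiled-in plugin would
	// instead be started in its own goroutine by the control plane.
	plugin.Serve(&plugin.ServeConfig{
		HandshakeConfig: handshake,
		Plugins:         map[string]plugin.Plugin{"source": &sourcePlugin{}},
		GRPCServer:      plugin.DefaultGRPCServer,
	})
}
```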
Queue provider plugins provide an interface to a queue/message bus system for communication between tasks.
The RPC interface will require functions for:
- creating/deleting a queue
- get/put of messages on a queue
- a utility function to build a queue name from a list of components joined with a separator character
The initial list of queue providers will include:
- in-memory
- RabbitMQ/AMQP
- Kafka
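Expressed as a Go interface (a sketch of the contract above, not the actual gRPC service definition), a queue provider might look like:

```go
package queue

import "context"

// Provider is a hypothetical Go-side view of the queue provider plugin
// contract; in-memory, RabbitMQ/AMQP, and Kafka providers would implement it.
type Provider interface {
	CreateQueue(ctx context.Context, name string) error
	DeleteQueue(ctx context.Context, name string) error
	Put(ctx context.Context, queue string, message []byte) error
	Get(ctx context.Context, queue string) ([]byte, error)
	// QueueName joins the given components with the provider's separator
	// character to build a provider-appropriate queue name.
	QueueName(components ...string) string
}
```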
Task execution plugins support multiple methods of executing a task.
The RPC interface will require functions for:
- running an arbitrary command
- copying a file (such as for remote plugin execution)
The initial list of task execution providers will include:
- local (execute directly)
- k8s (run process in a Kubernetes pod)
- SSH (run process on a remote host via SSH)
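A comparable sketch for the task execution contract, again assuming hypothetical method names:

```go
package executor

import "context"

// Executor is a hypothetical Go-side view of the task execution plugin
// contract; the local, k8s, and SSH engines would each implement it.
type Executor interface {
	// RunCommand executes an arbitrary command (the task) and returns its
	// combined output.
	RunCommand(ctx context.Context, command string, args ...string) ([]byte, error)
	// CopyFile copies a local file to the execution target, e.g. to stage a
	// plugin binary for remote execution.
	CopyFile(ctx context.Context, localPath, remotePath string) error
}
```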
Data source plugins provide input data for a pipeline.
The RPC interface will require functions for:
- submitting input data (plugin to control plane)
- starting/stopping (control plane to plugin)
The initial list of data source plugins will include:
- Cardano blockchain
- webhook
- execute external command
- fetch URL
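Sketched as a Go interface (hypothetical names; the real contract is a gRPC service), a data source plugin might expose:

```go
package source

import "context"

// Source is a hypothetical Go-side view of the data source plugin contract.
// The control plane starts and stops the plugin; the running plugin submits
// serialized events back to the control plane (in practice over gRPC).
type Source interface {
	Start(ctx context.Context, submit func(event []byte) error) error
	Stop(ctx context.Context) error
}
```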
Transform plugins provide composable filter/transform actions for data flowing through a pipeline.
The RPC interface will require functions for:
- performing transform (control plane to plugin)
- get/set on shared metadata cache (plugin to control plane)
The initial list of transform plugins will include:
- Cardano block/transaction data
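A sketch of that contract, with hypothetical names for the transform call and the shared metadata cache:

```go
package transform

import "context"

// Plugin is a hypothetical Go-side view of the transform plugin contract: the
// control plane asks the plugin to transform serialized events, and the plugin
// can call back into a shared metadata cache held by the control plane.
type Plugin interface {
	Transform(ctx context.Context, events [][]byte, cache MetadataCache) ([][]byte, error)
}

// MetadataCache is the shared get/set cache exposed by the control plane,
// useful for joining data from multiple sources or deduplicating events.
type MetadataCache interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte) error
}
```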
Output sink plugins allow you to output data from the pipeline.
The RPC interface will require functions for:
- outputting data
The initial list of output sink plugins will include:
- Database (initially using the cardano-db-sync schema)
- Kafka
- RabbitMQ/AMQP
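A minimal sketch of the sink contract, assuming a single hypothetical Write method over serialized events:

```go
package sink

import "context"

// Sink is a hypothetical Go-side view of the output sink plugin contract: it
// receives one serialized event at a time from the event queue and loads it
// into the destination (a database, Kafka topic, AMQP exchange, and so on).
type Sink interface {
	Write(ctx context.Context, event []byte) error
}
```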
DRAFT: @wolf31o2 @agaffney