
Connected DataFlow

The Connected DataFlow strategy composes multiple independent or dependent dataflow task graphs into a single entity. A dataflow task graph consists of multiple subtasks that are arranged based on the parent-child relationships between the tasks. In general, a dataflow task graph consists of multiple vertices and the edges that connect those vertices. The vertices represent the computations and the edges represent the communication between those computations. A simple representation of a connected dataflow graph is shown in Fig. 1. The structures of TG1 and TG2 need not be identical.

Fig. 1: Connected Dataflow Graph

Hybrid Task Scheduler

The main objective of the hybrid task scheduler is to overcome the issues and limitations of the existing centralized and distributed task schedulers for big data analytics applications. A centralized task scheduler creates heavy traffic, because all requests travel back and forth between the workers and the central task scheduler, which leads to a performance bottleneck; it also makes poor scheduling decisions for long-running jobs. A distributed task scheduler, in contrast, can make scheduling decisions in a distributed way. The combination of the two, a hybrid task scheduler, achieves high performance by improving the throughput of the submitted tasks while reducing traffic. The centralized task scheduler acts as a controller that identifies the task graphs available in the hierarchical task graph and schedules those task graphs to the workers. The distributed task scheduler running in every worker makes the scheduling decisions for its own individual tasks. The initial design of the hybrid task scheduler is shown in Fig. 2.

Fig.2: Hybrid Task Scheduler

Connected Dataflow Driver Program

There are two options for running the driver program:

  • Architecture with a separate Driver
  • Architecture with a Driver in the JobMaster

Architecture With a Separate Driver

In this option, the driver program runs alongside the client program, as shown in Fig. 3.

Fig.3: Architecture with Separate Driver

Architecture With a Driver in the JobMaster

In this option, the driver program runs alongside the Job Master, as shown in Fig. 4.

Fig.4: Architecture with Driver in Job Master

Terminology

Job

A Job is a logical entity submitted by the client. We have the following types of Jobs:

  • Allocate a set of resources; a dataflow graph can be created inside these resources and run by the user. The user programs the IWorker. This is the current mode of operation.
  • The user programs DataFlow graphs at the client and submits them to the workers. This is the DataFlow connected mode of operation. The client-created DataFlow graphs are coarse-grain graphs. These graphs are connected using DataSets.

Fig. 5: Job

Driver

The driver program creates and submits dataflow graphs to the workers. It can:

  • Send messages to workers about the graphs to run
  • Receive messages from workers about the graphs completed
  • Change resources by asking the resource scheduler for different graphs

Scheduler

Does high-level scheduling on the DataFlow graphs; for example, if multiple graphs are submitted at once, it decides which graph runs on which resources.

DataFlow Executor

Responsible for running the coarse grain tasks from a submitted DataFlow graph.

Where should we run the Driver?

We have two options for running the Driver.

At the client

Advantages

It is easy to write the program in the submitting client's main program.

Disadvantages

There is a separate communication link from the Job Master to the client, so messages go from Workers -> JobMaster -> Driver.

At the JobMaster

  • The user programs the DataFlow in the IDriver interface.
  • The user then submits a job with a specific resource requirement.
  • The IDriver is initialized at the JobMaster and submits the DataFlows to the workers.

Advantages

No need for additional communication

Disadvantages

The initial resources need to be specified by the user. These are then allocated, and the driver is initialized.

Considering these trade-offs, the second option seems to be the clear winner.

Communication Requirements

  • The Driver should be able to send messages to all workers or a subset of workers
  • The Driver should be able to receive messages from all workers or a subset of workers

If Driver runs in the client

There is a communication link from Client -> JobMaster -> Workers

If Driver runs in JobMaster

The communication link is from JobMaster -> Workers

Connection Between DataFlows

The connection between DataFlows needs to happen through a DataSet abstraction. We haven’t finalized the details of this.

Modifications required in the present model

  • Connected Dataflow Scheduler - The scheduler has to parse the connected dataflow, identify the order of dataflow graph execution, and the resource requirements of each dataflow graph. (The present model uses the metagraph to identify the order of dataflow graph execution)
  • Connected Dataflow Driver - Grouping of workers based on the input from the scheduler.
  • Connected Dataflow Executor - The connected dataflow executor is a wrapper over the task executor that generates the execution plan for each individual dataflow graph and executes it.
  • Connected Dataflow Proto file - We have to refactor this and remove the messages that are no longer required.

Driver Interfaces

The following interfaces will be provided to the Driver for:

  • scaling up/down workers in a job
  • sending/receiving messages and events

IScaler Interface

The Driver will be provided with the following scaler object.

/**
 * An interface that scales up/down the number of workers in a Twister2 job.
 */
public interface IScaler {

  /**
   * whether this job is scalable
   * @return true if scalable
   */
  boolean isScalable();

  /**
   * add new instances of workers to the job
   * @param instancesToAdd
   * @return true if successful
   */
  boolean scaleUpWorkers(int instancesToAdd);

  /**
   * remove some instances of the workers from the job
   * @param instancesToRemove
   * @return true if successful
   */
  boolean scaleDownWorkers(int instancesToRemove);
}
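For illustration, a driver might use this scaler before submitting a graph that needs more workers than are currently allocated. The following is only a minimal sketch, not Twister2 API; the helper and its currentWorkers/requiredWorkers parameters are assumptions.

// Hypothetical helper showing how a driver could use IScaler; the
// currentWorkers/requiredWorkers values are assumptions for illustration.
public final class ScalingExample {

  static boolean ensureWorkers(IScaler scaler, int currentWorkers, int requiredWorkers) {
    if (currentWorkers == requiredWorkers) {
      return true;  // already at the right size
    }
    if (!scaler.isScalable()) {
      return false;  // this job cannot be resized
    }
    int delta = requiredWorkers - currentWorkers;
    return delta > 0
        ? scaler.scaleUpWorkers(delta)
        : scaler.scaleDownWorkers(-delta);
  }
}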

IDriverMessenger Interface

The Driver will be provided with the following messenger object, which it uses to send messages to workers.

/**
 * A messenger interface to send messages from the driver to the workers in a job.
 */
public interface IDriverMessenger {

  /**
   * send a protocol buffer message to all workers in the job
   * @return true if successful
   */
  boolean broadcastToAllWorkers(Message message);

  /**
   * send a protocol buffer message to a list of workers in the job
   * @return true if successful
   */
  boolean sendToWorkerList(Message message, List workerList);

  /**
   * send a protocol buffer message to a worker in the job
   * @return true if successful
   */
  boolean sendToWorker(Message message, Integer workerID);
}
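For illustration, the driver could pair this messenger with the proto messages defined later on this page. The sketch below is hypothetical: ExecuteMessage is the class protoc would generate from that definition, and its field accessors are assumptions.

import com.google.protobuf.ByteString;

// Hypothetical fragment: broadcast a graph-execution request to all workers.
// ExecuteMessage is assumed to be generated from the proto definition below.
public final class SubmitGraphExample {

  static boolean submitGraph(IDriverMessenger messenger, String graphName, byte[] serializedGraph) {
    ExecuteMessage request = ExecuteMessage.newBuilder()
        .setName(graphName)
        .setGraphSerialized(ByteString.copyFrom(serializedGraph))
        .build();
    // send the same request to every worker in the job
    return messenger.broadcastToAllWorkers(request);
  }
}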

DriverJobListener Interface

The Driver will receive messages and events from the workers through the following interface.

public interface DriverJobListener {

  /**
   * received a protocol buffer message from a worker
   * @param anyMessage the message received from the worker
   */
  void workerMessageReceived(Any anyMessage, int senderWorkerID);

  /**
   * this method is invoked when all workers have joined the job initially,
   * and also after each scale-up operation once all new workers have joined
   * @param workerList
   */
  void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList);
}
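A driver that wants to run graphs one after another could implement this listener roughly as in the sketch below. It is only an illustration; the ExecuteCompletedMessage unpacking assumes the generated class from the proto definitions later on this page.

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;

// Hypothetical listener: counts completed subgraphs so the driver can
// decide when to submit the next graph in the connected dataflow.
public class CompletionTrackingListener implements DriverJobListener {

  private int completedCount = 0;

  @Override
  public void workerMessageReceived(Any anyMessage, int senderWorkerID) {
    if (anyMessage.is(ExecuteCompletedMessage.class)) {
      try {
        ExecuteCompletedMessage msg = anyMessage.unpack(ExecuteCompletedMessage.class);
        completedCount++;
        System.out.println("Worker " + senderWorkerID + " finished " + msg.getSubgraphName());
      } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList) {
    // all workers are up; the driver may now submit the first dataflow graph
    System.out.println(workerList.size() + " workers joined");
  }
}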

Worker Interfaces

Workers will be provided with the following interfaces.

IWorkerMessenger Interface

Workers will use this object to send messages to the Driver.

/**
 * A messenger interface to send messages from a worker to the driver in a job.
 */
public interface IWorkerMessenger {

  /**
   * send a protocol buffer message to the driver in the job
   * @return true if successful
   */
  boolean sendToDriver(Message message);
}
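On the worker side, the executor could report completion of a subgraph back to the driver as sketched below. Again a hypothetical fragment; ExecuteCompletedMessage is the generated class assumed from the proto definitions below.

// Hypothetical fragment: a worker notifies the driver that a subgraph finished.
public final class ReportCompletionExample {

  static boolean reportCompletion(IWorkerMessenger messenger, String subgraphName) {
    ExecuteCompletedMessage done = ExecuteCompletedMessage.newBuilder()
        .setSubgraphName(subgraphName)
        .build();
    return messenger.sendToDriver(done);
  }
}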

JobListener Interface

Workers will use this interface to get events and receive messages.

/**
 * IWorker should listen on this interface to get messages from the driver.
 */
public interface JobListener {

  /**
   * called when new instances of workers are added to the job
   * @param instancesAdded
   */
  void workersScaledUp(int instancesAdded);

  /**
   * called when some instances of workers are removed from the job
   * @param instancesRemoved
   */
  void workersScaledDown(int instancesRemoved);

  /**
   * received a broadcast message from the driver
   * @param anyMessage the message received from the driver
   */
  void driverMessageReceived(Any anyMessage);

  /**
   * this method is invoked when all workers have joined the job initially,
   * and also after each scale-up operation once all new workers have joined
   * @param workerList
   */
  void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList);
}
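A minimal worker-side listener might look like the following sketch; it only logs scaling events and driver messages, with all real handling logic left out.

import com.google.protobuf.Any;
import java.util.List;

// Hypothetical worker-side listener sketch: logs scaling events and
// incoming driver messages; real handling logic is omitted.
public class LoggingJobListener implements JobListener {

  @Override
  public void workersScaledUp(int instancesAdded) {
    System.out.println(instancesAdded + " workers added");
  }

  @Override
  public void workersScaledDown(int instancesRemoved) {
    System.out.println(instancesRemoved + " workers removed");
  }

  @Override
  public void driverMessageReceived(Any anyMessage) {
    System.out.println("driver message of type " + anyMessage.getTypeUrl());
  }

  @Override
  public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList) {
    System.out.println(workerList.size() + " workers in the job");
  }
}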

Connected Dataflow Scheduler Interface

/**
 * This scheduler interface is responsible for making scheduling decisions
 * to identify the dataflow graph to run.
 */
public interface IConnectedDataflowScheduler {

  /**
   * get the set of workers to run this dataflow
   */
  Set schedule(DataflowGraph graph);
}
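As an example of how this interface might be implemented, the sketch below picks a subset of available worker IDs for a graph. It is an assumption throughout: the set of available worker IDs and the requiredWorkers helper are hypothetical, not Twister2 API.

import java.util.Set;
import java.util.TreeSet;

// Hypothetical scheduler sketch: assigns the first N available worker IDs
// to a graph. requiredWorkers(graph) is an assumed helper.
public class FirstFitScheduler implements IConnectedDataflowScheduler {

  private final Set<Integer> availableWorkers;

  public FirstFitScheduler(Set<Integer> availableWorkers) {
    this.availableWorkers = availableWorkers;
  }

  @Override
  public Set schedule(DataflowGraph graph) {
    Set<Integer> chosen = new TreeSet<>();
    for (int workerID : availableWorkers) {
      chosen.add(workerID);
      if (chosen.size() == requiredWorkers(graph)) {
        break;
      }
    }
    return chosen;
  }

  // assumed helper: how many workers this graph needs; default to all
  private int requiredWorkers(DataflowGraph graph) {
    return availableWorkers.size();
  }
}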

Connected Dataflow Executor Interface

This would be an extension of TaskExecutor and an implementation of JobListener.

Connected Dataflow Proto Messages

message ExecuteMessage {
  string name = 1;
  tws.proto.job.Config config = 2;
  bytes graph_serialized = 3;
  WorkerPlan worker_plan = 4;
}

message ExecuteCompletedMessage {
  string subgraph_name = 1;
}

message JobCompletedMessage {
  string htg_jobname = 1;
}
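The listener interfaces above deliver these messages wrapped in com.google.protobuf.Any. The small sketch below shows the sending side using the standard pack/unpack pairing; the generated JobCompletedMessage class and its setter are assumed from the proto definition above.

import com.google.protobuf.Any;

// Standard protobuf Any wrapping: the sender packs a concrete message and
// the receiver tests and unpacks it, as in the listener sketches above.
public final class AnyWrappingExample {

  static Any wrapJobCompleted(String jobName) {
    JobCompletedMessage msg = JobCompletedMessage.newBuilder()
        .setHtgJobname(jobName)
        .build();
    return Any.pack(msg);
  }
}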

Related Work

Hybrid/Centralized/Decentralized Task Schedulers

  • Hawk Scheduler
  • Sparrow Scheduler
  • Mercury: Hybrid Centralized and Distributed Scheduling in Large Shared Clusters
  • Apache Spark/Apache Flink Schedulers