This repository has been archived by the owner on Feb 14, 2021. It is now read-only.

Dataset design #4

Open
camjo opened this issue Aug 5, 2018 · 15 comments
Labels
help wanted Extra attention is needed

Comments

@camjo
Contributor

camjo commented Aug 5, 2018

Creating this issue to discuss the detailed design of Dataset[A] (and potentially DataStream[A]).

As discussed in the meeting with John, I think it's worth thinking through what this API would look like for both batch and stream. We can go several ways with this:

The first would be a unified API like Spark's Dataset (and structured streaming).
https://spark.apache.org/docs/2.3.1/structured-streaming-programming-guide.html#programming-model

Another would be Flink's Dataset/DataStream APIs (which are built on top of their stateful streams abstraction).
https://ci.apache.org/projects/flink/flink-docs-release-1.5/concepts/programming-model.html

I'd like to flesh out what this API should look like and how it should function in more detail here.

@camjo added the help wanted label Aug 5, 2018
@SemanticBeeng

SemanticBeeng commented Aug 8, 2018

Do we agree this cannot be separated from the data schema discussion?

In the above, I guess that A would be the data schema at a logical level, but given that Apache Arrow is in scope (great!), one can conclude we need to consider implementation reuse and try to "lift" those schemas into the design.

Concretely, Apache Spark, Apache Arrow, Avro, and Parquet all have elements of a data schema.

Then Apache Arrow also implies that interoperability with Python, C++, and even R is in scope (it should be).
Is it?

If so, then we can consider data schema support from NumPy, xarray, xtensor, etc.

Lastly, we have jep and its ability to "zero copy" share memory between the JVM and the CPython runtime.

So I think we need to include the above in scope - thoughts please?

@camjo
Contributor Author

camjo commented Aug 9, 2018

@SemanticBeeng If I'm understanding you correctly, I think the data schema you're referring to can be separated from the API design discussion. What we are aiming to do here is define an API that describes the operations we want to support. The actual data formats in memory or on disk are a separate concern that will come into play when we interpret the Dataset DAG to some backend (at which point this will become an important discussion).

@diminou

diminou commented Aug 9, 2018

Having a unified API for both streaming and batch use cases means forbidding several useful operations on batch datasets (computing count and other basic aggregates come to mind; Spark forbids some), as they are meaningless for streams. However, those same operations are very practical for quick explorations of batches from the REPL.
It could be useful to have a streaming API by default with extensions for the case where the data are not a stream, so that users doing batch-mode analysis may benefit from the ergonomic standards other tools provide.
So, a Dataset[A] may be manipulated in any way a DataStream[A] may, but being tailored for batch data, it would have additional simplified operations, like count returning a number instead of a stream.
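A rough sketch of that layering, with placeholder names (nothing here is a settled API; the return types would need refining so that batch operations stay batch):

// Hypothetical sketch: a batch Dataset supports everything a DataStream does,
// plus operations that only make sense when the data is known to be finite.
trait DataStream[A] {
  def map[B](f: A => B): DataStream[B]
  def filter(p: A => Boolean): DataStream[A]
  def runningCount: DataStream[Long]   // on a stream, a count can only be another stream
}

trait Dataset[A] extends DataStream[A] {
  def count: Long                      // batch-only: returns a plain number
  def toList: List[A]
}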

@camjo
Contributor Author

camjo commented Aug 9, 2018

Yeah, so ultimately the low-level description language probably makes sense to be streaming-based, since streams are a superset of batch operations: https://data-artisans.com/blog/batch-is-a-special-case-of-streaming

Ultimately, the user-facing APIs will just be a way to construct this lower-level description of a program. I think that exposing two APIs (Dataset[A] and DataStream[A]) might be the right way to go (rough sketch after the list below). My thinking on why is:

  • Both of these can easily translate down to the actual grammar we want to interpret and manipulate.
  • Whilst batch is technically a special case of streaming, it's still much easier to program batch logic without worrying about streaming concepts. This also means the APIs will be more streamlined for purpose rather than being cluttered with windows etc. when all you're trying to do is read and count * on a file.
  • We can translate between the two APIs if/when it makes sense (not unlike the KStreams API, for example, which exposes a KTable and a KStream and lets you translate between them - though we need to consider the semantics of this translation, because they treat a KTable as a point-in-time aggregation of a KStream (i.e. their "batch" API is still technically a streaming API; you can't assume that the KTable is "fully loaded", etc.)).
  • We still have the ability (like Flink) to layer higher-level abstractions on top, such as the "Table API", which unifies the batch and streaming models if that's what people want, by constraining the set of things that can be done (similar, as you mentioned, to Spark).
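Very roughly, what I have in mind looks something like this (a sketch only; every name is a placeholder, and plain Scala closures stand in for the reified =>: arrows we would actually want):

// Hypothetical sketch: two user-facing APIs elaborating into one reified,
// stream-based description that interpreters/optimisers work on.
sealed trait StreamOp[A]                                                // the low-level "grammar"
final case class Source[A](id: String) extends StreamOp[A]
final case class MapOp[A, B](src: StreamOp[A], f: A => B) extends StreamOp[B]
final case class WindowFold[A, B](src: StreamOp[A], agg: List[A] => B) extends StreamOp[B]

final class DataStream[A](val op: StreamOp[A]) {                        // streaming front end
  def map[B](f: A => B): DataStream[B] = new DataStream(MapOp(op, f))
}

final class Dataset[A](val op: StreamOp[A]) {                           // batch front end
  def map[B](f: A => B): Dataset[B] = new Dataset(MapOp(op, f))
  // a batch count is just a fold over a single, whole-input window at the description level
  def count: Dataset[Long] = new Dataset(WindowFold(op, (as: List[A]) => as.length.toLong))
  // translating between the two APIs is just re-wrapping the same description
  def toStream: DataStream[A] = new DataStream(op)
}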

Thoughts?

@diminou

diminou commented Aug 10, 2018

@camjo absolutely, that would buy us the ability to implement operations over Dataset separately from DataStream and without thinking about streaming implications. And when both are sufficiently complete, a rewrite of some of the Dataset's API in terms of DataStream's could help us reduce duplication.
In the meantime, if needed, an umbrella (such as Flink's Table API you mentioned) would aggregate a minimal set of common API operations.

@camjo
Contributor Author

camjo commented Aug 24, 2018

I have a very rough API design proposal here: https://gist.github.com/camjo/10cb0f25b9da10f08f9b30cbd9419985

I've done a lot of thinking about the nature of batch and stream and what it means to do computation on each as well as how we should expose it as an API.

I think it fundamentally boils down to data vs. computation. Data can roughly be conceptualised as either Bounded or Unbounded. Computation can be thought of as (online/streaming/unbounded) vs. (offline/batch/bounded). The Gist explains what I mean by this in more detail. Hopefully you can follow the types to understand how the various concepts fit together.

There are still a fair few gaps listed as // todo throughout the gist.

@jdegoes Would be keen to get your thoughts on this so far. Would it be easier as a PR that we can work on improving before merging or is a gist fine for now?

Perhaps if this seems like a suitable direction, it can form the basis of another design session. I'm aware that it's not implemented in terms of =>: etc. yet, but I can clean that up shortly if the general structure looks correct.

@jdegoes
Member

jdegoes commented Aug 24, 2018

@camjo Another session works, too.

Some quick thoughts:

  1. We don't need GroupedDataset; we should use Dataset[Map[K, V]] for this, or perhaps Dataset[Grouped[K, V]], where Grouped is our own type. Then we can define a family of aggregations Grouped[K, V] =>: V2 (we have to constrain aggregations in some way rather than take in Scala closures, but we can still be very flexible if necessary). See the sketch after this list.
  2. Similarly, I don't think we need WindowedDataStream, I think we can use DataStream[Grouped[K, V]].
  3. In general, move away from the Scala closures. We may have to do some design work here.
  4. Overall the methods between Dataset and DataStream look so similar, I'm tempted to unify them. Then we just have something like Dataset[Window[A, B]] for a windowed view, and some operations only work on that.
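To sketch what 1, 2, and 4 might look like (Grouped, Window, and the Agg family below are just placeholder names, with a plain sealed trait standing in for the =>: aggregation family):

// Hypothetical sketch only; none of these names are settled.
final case class Grouped[K, V](key: K, values: List[V])
final case class Window[A, B](bounds: B, contents: List[A])   // the "windowed view" from point 4

// A small, closed family of aggregations instead of arbitrary Scala closures,
// so the optimiser can inspect them.
sealed trait Agg[V, V2]
final case class Count[V]() extends Agg[V, Long]
final case class Sum()      extends Agg[Double, Double]
final case class Max()      extends Agg[Double, Double]

trait Dataset[A] {
  def groupBy[K, V](key: A => K, value: A => V): Dataset[Grouped[K, V]]
  // only available when A is a grouped (or windowed) view
  def aggregate[K, V, V2](agg: Agg[V, V2])(implicit ev: A <:< Grouped[K, V]): Dataset[(K, V2)]
}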

@camjo
Contributor Author

camjo commented Aug 24, 2018

@jdegoes

  1. Hmm, that's an interesting idea. I initially thought "it should be Grouped[K, List[V]]" but perhaps you're thinking of a more general Aggregatable structure for V that can be used by implementing modules?

  2. I'm not sure that this fully captures all the possible window semantics. E.g. how do we represent sliding vs. rolling vs. hopping windows if we only have the equivalent of a Map[K, V]? (See below for why I think we need at least 3 distinct cases - though I think we can potentially collapse GroupedDataset as you suggested.) Either way, it seems like we would need to treat DataStream[Grouped[K, V]] as if it were a separate case anyway. Is this just renaming types rather than removing the concept of WindowedDataStream?

  3. Agree - this is just a quick and dirty impl to work out the hierarchy of ops. Will update this shortly.

  4. Yeah, so I've spent quite a while thinking through this lately. My first prototype actually unified them, but it's really challenging to express the nature and computations of both batches and streams using only streams. I agree it's a tempting goal that we should explore, but I actually think you lose the ability to talk fully about one vs. the other in detail if you do that. Happy to be proven wrong here.

The issue seems to be that you have 3 cases caused by the data/computation split.

Data   |   Computation
-------------------------------------------------------------
batch  |   bounded (map/filter/fold/distinct etc)
stream |   bounded (windowed, e.g. fold/distinct/average etc)
stream |   unbounded (e.g. map/filter etc)

And consequently

  • A bounded computation on a batch source is always a batch output.
  • A bounded computation on a stream (i.e. windowed) actually returns another stream (think sliding sum as an example)
  • An unbounded computation on a stream also returns another stream.

Note specifically that a bounded computation on a batch doesn't seem to have the same signature as a bounded computation on a stream.

The alternative here would be to say "OK, let's treat a batch as a finite stream and collapse them". OK, sure, but you're not removing any of those cases, you're simply renaming "batch" to "finite stream". The 3 cases of semantics still need to exist. Perhaps there is a more precise encoding that I'm missing though?
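To illustrate why the signatures differ, here's a rough sketch of the three cases (all names are hypothetical placeholders):

// Hypothetical signatures only, illustrating the three data/computation cases above.
trait Dataset[A] {
  // bounded computation on bounded data: the result is itself bounded (a one-element Dataset here)
  def fold[B](zero: B)(step: (B, A) => B): Dataset[B]
}

trait DataStream[A] {
  // unbounded computation on a stream: another stream
  def map[B](f: A => B): DataStream[B]
  // bounded (windowed) computation on a stream: still a stream, emitting one B per window
  // (think of a sliding sum)
  def windowedFold[B](window: WindowSpec)(zero: B)(step: (B, A) => B): DataStream[B]
}

sealed trait WindowSpec
final case class Sliding(size: Int, step: Int) extends WindowSpec
final case class Tumbling(size: Int)           extends WindowSpec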

In terms of prior work:

  • Spark added streaming semantics to their Dataset API which we could take a look at in terms of its implementation/limitations.
  • Kafka Streams treats everything as a stream but doesn't give a language for bounded streams (at least that I'm aware of) which makes it hard to enforce the kinds of semantics we want in an analytics engine.
  • Flink has two APIs that sit on top of a stateful streaming core - which is an arbitrary processing node that can choose how many messages it processes at a time (effectively allowing it to emulate batch processing - though I'm not yet clear on how they mark that a batch is complete. This could be where to look for ideas on how to express a batch in a streaming system).
  • Beam treats everything as either a stream or a table. I extracted the essence of the hierarchy in this gist: https://gist.github.com/camjo/75ca375d6569b790024016b75d3f6e90.

Thoughts?

@idc101

idc101 commented Aug 26, 2018

Re: GroupedDataset, from a usability point of view I found Spark's equivalent hard to work with and usually dropped back to Spark's untyped DataFrames for group-by/aggregations. It would be interesting to explore Dataset[Grouped[K, List[V]]] further to see how usable it would be.

@camjo
Contributor Author

camjo commented Aug 26, 2018

@idc101
Just thinking about it, what is the purpose of grouping a dataset without aggregating it? Do we gain anything special by exposing it as a new state? Perhaps we can just remove that transition entirely by making every groupBy have an aggregate at construction time? AFAIK SQL doesn't allow you to group by without an aggregation of some sort either.

It seems to me that the usage of a group by in frameworks that expose it with an intermediate structure like GroupedDataset falls into one of two buckets (please tell me if I’ve missed something here):

  1. A fancy DSL for adding aggregations (e.g. gds.min('col1).max('col4))
  2. Getting a reference to the grouped dataset to use it as a new “source” for a set of branching aggregations.
    e.g.
val ds = ???
val grouped = ds.groupBy(a => a.b)
val output1 = grouped.agg(min, max)
val output2 = grouped.agg(sum, count)
// use output1 and output2 separately

The thing is, other libraries/frameworks can't express it like this:

val ds = ???
val output1 = ds.groupBy(a => a.b, min, max)
val output2 = ds.groupBy(a => a.b, sum, count)

The reason is that they allow arbitrary functions in the group-by clause, so they can't optimise common aggregates. Since we capture functions and reify everything, we can actually provide that API and still optimise it to run more like the first code snippet above by comparing the functions for equality.

This would remove that state entirely.
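To sketch how that optimisation could work once everything is reified (all names below are hypothetical, and the aggregate output type is deliberately oversimplified): because the key expression and the aggregates are plain data, two groupBys over the same source with the same key are structurally equal and can be fused into one pass.

// Hypothetical sketch: groupBy takes its aggregations at construction time,
// so there is no intermediate "grouped but not yet aggregated" state.
sealed trait Agg
case object Min   extends Agg
case object Max   extends Agg
case object Sum   extends Agg
case object Count extends Agg

sealed trait Plan[A]
final case class Source[A](id: String) extends Plan[A]
// result type is oversimplified to List[Double] per key, just for illustration
final case class GroupAgg[A, K](src: Plan[A], key: KeyExpr[A, K], aggs: List[Agg]) extends Plan[(K, List[Double])]

// A reified key expression instead of an opaque closure, so equality is decidable.
sealed trait KeyExpr[A, K]
final case class Field[A](name: String) extends KeyExpr[A, String]

object Example {
  val src = Source[Map[String, Double]]("events")
  val byB = Field[Map[String, Double]]("b")
  val output1 = GroupAgg(src, byB, List(Min, Max))
  val output2 = GroupAgg(src, byB, List(Sum, Count))
  // output1 and output2 share the same src and key (structural equality),
  // so an interpreter can merge them into a single grouping pass,
  // recovering the behaviour of the first snippet above.
}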

Can anyone see why this might not work?

I wonder if we could even apply the same logic to WindowedDataStream as well? I haven't thought that through properly yet.

@Umko

Umko commented Aug 28, 2018

I like the idea of the data vs. computation split, since the nature of computation on a stream is often very different from that on a batch, resulting in different optimisation strategies that might be applied and different expectations of a runtime.

AFAIK SQL doesn't allow you to group by without an aggregation of some sort either.

You can - if you group by all columns, the result will be equivalent to a distinct (surprisingly producing a more efficient query plan in some cases).

@jdegoes
Member

jdegoes commented Aug 28, 2018

I think possibly windowed and grouped are aspects of the same thing. The only reason I put emphasis on this is that many SQL streaming libraries provide the same set of operators for both batch and streaming computations, albeit with different interpretations.

I do think it's a good point that "group by" only really exists for purposes of aggregation, so may not even need an intermediate representation.

@robfitzgerald

Hi, potential support dev here. Not sure where this discussion is at this point, but here is yet another stab at framing the API discussion, which is that we could divide DataSet into InfiniteDataSet and FiniteDataSet types. I don't know all of the problems we really need to solve here, but here are some ideas on how this would flesh out.

InfiniteDataSet would be a high-level trait describing both the batch and streaming cases. StreamDataSet would really just be an instance of BatchDataSet from a high-level perspective, treated as the case where the input batch size is exactly 1. These would both expose a batchSize value but no count() method, plus appropriate analytic methods (e.g. rolling average) with underlying implementations optimized for streams and batches.

FiniteDataSet would be the case where a complete DataSet has been loaded into Scalaz-Analytics, such as from disk/Hadoop. This would expose a count() method and appropriate analytic methods (e.g. arithmetic mean). Its underlying implementation would still be lazy, of course. This would match most use cases from the Python Pandas/R scene, but provide the same API for local and distributed workloads, making it effortless to scale analytical models.
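As a rough type sketch of the above (all names hypothetical, nothing settled):

import scala.math.Numeric

// Hypothetical sketch of the hierarchy described above.
trait InfiniteDataSet[A] {
  def batchSize: Int                          // how many elements arrive at a time
  def map[B](f: A => B): InfiniteDataSet[B]
  // deliberately no count(): it is meaningless for unbounded data
}

trait BatchDataSet[A] extends InfiniteDataSet[A]

// A stream is just the batch-size-1 special case of a batch.
trait StreamDataSet[A] extends BatchDataSet[A] {
  final override def batchSize: Int = 1
}

// Fully loaded (though still lazily evaluated) data: safe to expose count() and mean().
trait FiniteDataSet[A] {
  def map[B](f: A => B): FiniteDataSet[B]
  def count(): Long
  def mean(implicit num: Numeric[A]): Double
}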

PS: I agree with the suggestion that Grouped may not need to be exposed to the user - I can't think of the last time I needed a GroupedRDD in Spark or didn't immediately follow with .mapValues(). Does this imply we will expose convenience methods like groupByKey() and even reduce() (because it is more recognizable than fold for the MapReduce people), or will we simply provide general-use aggregation combinators like fold() and aggregate()? Or both? If we are "simple and principled" (from the intro page), perhaps drop the convenience methods, and speak to how Scalaz-Analytics provides the minimal set of operations needed to work performantly over a distributed data set (with examples in the documentation).

@camjo
Contributor Author

camjo commented Sep 2, 2018

Thank you to everyone who has left feedback so far. I've created a PR with my new proposal over here: #14. I'm pretty sure it addresses all the comments left so far as well as my own gripes with the versions I had proposed earlier.

Please take a look and make sure to comment if you have ideas/issues! :)

@jdegoes

@SemanticBeeng

SemanticBeeng commented Nov 22, 2018

4. Overall the methods between Dataset and DataStream look so similar, I'm tempted to unify them. Then we just have something like Dataset[Window[A, B]] for a windowed view, and some operations only work on that

+1 for unifying and for introducing a data window into the design.
Insights from the creators of Hoodie about "stream processing, incremental processing, and batch processing" seem to support this: https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop.

Having a unified API for both streaming and batch use cases means forbidding several useful operations on batch datasets

@diminou this article discusses streaming vs. batch with "incremental" in between; you may find it interesting.
