This repository has been archived by the owner on Feb 14, 2021. It is now read-only.

Aggregations #19

Open · wants to merge 6 commits into api-design

Conversation

anovstrup:

This pull request adds various aggregation operations: variants that produce cumulative results (scan, scanAggregate, and scanAggregateBy) and variants that operate over bounded windows (fold, aggregate, and aggregateBy). It also includes basic syntax/ops for tuples and for strings, which are necessary to implement a complete WordCount example. The essence of word count is implemented as a handful of defs in the SimpleExample module, but a complete implementation in WordCount.scala is not included.
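
For context, the essence of word count in this API might look roughly like the following sketch. The lines source, the ops syntax, and the lifting of literals like 0L are assumptions here, not the actual contents of SimpleExample:

// Hypothetical sketch, assuming a DataStream[String] of lines and the
// string/tuple syntax added in this PR.
val lines: DataStream[String] = ???
val words = lines.flatMap(_.split(" "))
val counts = words.scanAggregateBy(w => w)(_ => 0L)((n, _) => n + 1L)
// counts: DataStream[(String, Long)] of cumulative per-word counts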


def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)(initial: Unit =>: V)(
f: (V, A) =>: V
): F[(K, V)]
def distinct[A](ds: F[A])(window: Window): F[A]
anovstrup (Author), Nov 8, 2018:

A cumulative/streaming variant of the distinct operation could also be provided. Like scanAggregateBy, it would in general require unbounded space (i.e., space required would depend on the number of distinct elements).
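
A signature for such a variant might look like this (hypothetical; no scanDistinct is actually included in this PR):

// Hypothetical cumulative dedupe: emits each element the first time it is seen.
// Its state grows with the number of distinct elements seen so far, so space is
// unbounded in general.
def scanDistinct[A](ds: F[A]): F[A]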

Contributor:

An exact streaming dedupe is guaranteed to run out of space eventually, assuming the input is unbounded on the key-space. I don't know if this is something we want to expose directly... Forcing a window means we offer a safe API that won't crash with OOM errors. What do others think about this?

anovstrup (Author):

I'm not sure the framework should assume that input is unbounded on the key-space. Note that if a streaming distinct is ruled out, then so are scanAggregate and scanAggregateBy (all of them require space proportional to the number of distinct values/keys).

@@ -207,6 +304,18 @@ trait AnalyticsModule {
implicit def short[A](v: scala.Short): A =>: Short
implicit def instant[A](v: java.time.Instant): A =>: java.time.Instant
implicit def localDate[A](v: java.time.LocalDate): A =>: java.time.LocalDate
implicit def tuple2[A, B, C](t: (A =>: B, A =>: C)): A =>: (B, C)

implicit def tuple2Lift[A, B, C](
Contributor:

Nice, I did something similar in my branch. I don't think this works for arbitrarily nested tuples though, does it?

anovstrup (Author):

I didn't test that. The tuple stuff is basically ripped straight out of your branch, with minor changes (e.g., I used FanOut instead of a new RowFunction).

Contributor:

Ah yeah... I just looked, and I never pushed the more complex version I was working on because I never quite got it working, haha. I think we might need a bit of Scala magic to make the implicits work for us.
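
The "Scala magic" would presumably be a recursive type-class derivation, so that a tuple component can itself be a liftable tuple. A plain-Scala sketch of the pattern, using A => B in place of the module's A =>: B arrows (Liftable is an assumption, not code from either branch):

trait Liftable[A, T, B] { def lift(t: T): A => B }

object Liftable {
  // Base case: an arrow lifts to itself.
  implicit def arrow[A, B]: Liftable[A, A => B, B] =
    new Liftable[A, A => B, B] { def lift(f: A => B): A => B = f }

  // Recursive case: a pair lifts component-wise, so a nested shape like
  // (f, (g, h)) resolves by applying this instance twice.
  implicit def pair[A, T1, B1, T2, B2](
    implicit l1: Liftable[A, T1, B1], l2: Liftable[A, T2, B2]
  ): Liftable[A, (T1, T2), (B1, B2)] =
    new Liftable[A, (T1, T2), (B1, B2)] {
      def lift(t: (T1, T2)): A => (B1, B2) =
        a => (l1.lift(t._1)(a), l2.lift(t._2)(a))
    }
}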



def map[A, B](ds: F[A])(f: A =>: B): F[B]
def flatMap[A, B](ds: F[A])(f: A =>: DataSet[B]): F[B]
Contributor:

Do we want to force a DataSet here? I get that it's a substitute for a "finite set of data", but theoretically you can flatMap a stream as well; it just won't terminate unless it's a finite stream.

Comment:

I'm actually against this. IMO neither DataSet nor DataStream should be a Monad, because that would mean you can no longer statically analyze them.

anovstrup (Author):

@camjo Yeah, I was trying to make the boundedness explicit, because I couldn't come up with a good reason to allow an unbounded stream to be inserted into a stream. Given that it is more constrained than an ordinary flatMap in the case of DataStream, maybe it would be better to name it something else or, as @LukaJCB suggested, leave it out entirely.

I added it in order to implement the word count example (i.e., words = lines.flatMap(_.split(" "))), but another approach (if we don't want monadic DataSet/DataStream types) would be to provide an operation like Spark's explode function (e.g., def explode[A](ds: F[List[A]]): F[A]). The split syntax would need to return a String =>: List[String], and the words example would become words = lines.map(_.split(" ")).explode. On the other hand, I'm not sure that the concern about static inspection is applicable in this context, since our "functions" themselves are inspectable values (not general Scala functions).
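
A plain-Scala model of explode's semantics, with List standing in for F (illustration only, not the module's implementation):

// Flattens a column of lists without making F a monad.
def explode[A](ds: List[List[A]]): List[A] = ds.flatten

val lines = List("to be or", "not to be")
val words = explode(lines.map(_.split(" ").toList))
// words == List("to", "be", "or", "not", "to", "be")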

Comment:

In https://github.com/LukaJCB/analytics I used concatMap, which uses A => List[B] :)


// Bounded
def fold[A, B](ds: F[A])(window: Window)(initial: A =>: B)(f: (B, A) =>: B): F[B]
def scanAggregateBy[A, K, V](ds: F[A])(g: A =>: K)(initial: Unit =>: V)(
Contributor:

I'm not quite sure what this is. Is it a group-by followed by a scan on each group? The F[(K, V)] seems to indicate that it's a group-by with a fold on each group. Is it meant to be emulating F[(K, F[V])] or something?

anovstrup (Author):

Yeah, it was meant to be a group by followed by a scan over each group. So for each element a of the original stream, the resulting stream would have an element (g(a), v), where v is a cumulative value for group g(a).
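
A plain-Scala model of those semantics, with List standing in for the stream type (a hypothetical illustration, not the module's implementation):

// Emits one (key, cumulative value) pair per input element.
def scanAggregateBy[A, K, V](as: List[A])(g: A => K)(initial: V)(f: (V, A) => V): List[(K, V)] =
  as.foldLeft((Map.empty[K, V], List.empty[(K, V)])) { case ((state, out), a) =>
    val k = g(a)
    val v = f(state.getOrElse(k, initial), a)
    (state + (k -> v), (k -> v) :: out)
  }._2.reverse

scanAggregateBy(List("to", "be", "or", "not", "to", "be"))(identity)(0)((n, _) => n + 1)
// List(("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 2), ("be", 2))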

// (These ops can only be performed on bounded subsets of data)
def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)(initial: Unit =>: V)(
Contributor:

What is this exactly? Can it be broken into more fundamental pieces? GroupBy + Fold or something perhaps?

anovstrup (Author):

It's meant to be a group by followed by a fold over each group.
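
Under the same plain-Scala model as above, the windowed version emits one pair per group rather than one per element (again a hypothetical illustration):

// One (key, final value) pair per group in the window.
def aggregateBy[A, K, V](as: List[A])(g: A => K)(initial: V)(f: (V, A) => V): List[(K, V)] =
  as.groupBy(g).map { case (k, group) => k -> group.foldLeft(initial)(f) }.toList

aggregateBy(List("to", "be", "or", "not", "to", "be"))(identity)(0)((n, _) => n + 1)
// List(("to", 2), ("be", 2), ("or", 1), ("not", 1)) (element order unspecified)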

anovstrup (Author), Dec 20, 2018:

Note: I modeled grouping+aggregation this way on the basis of comments like this one in the dataset design discussion (#4) that questioned the utility of separating grouping and aggregation.

The other day, I toyed with the idea of implementing the aggregateBy operation in terms of separate groupBy and fold operations:

def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]

def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B]

This doesn't quite work:

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  fold(groupBy(ds)(g))(window)(initial)(f)

(The types don't quite line up, even if you can convert f from a (V, A) =>: V to a (V, Grouped[K, A]) =>: V, because you want the fold to result in an F[(K, V)] but the initial value for the fold is a V and not a (K, V).)

Instead, you can introduce a separate groupFold operation:

def groupFold[A, K, V](ds: F[Grouped[K, A]])(window: Window)
                      (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  groupFold(groupBy(ds)(g))(window)(initial)(f)

All that said, it's not obvious to me that there's real value in factoring out the groupBy.

anovstrup (Author):

Another approach that might work, without requiring separate methods like groupFold, would be to allow the fold to produce an F[Grouped[K, V]] rather than an F[(K, V)]:

def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[Grouped[K, V]] = {
  val initial2: Unit =>: Grouped[K, V] = ??? // derive from `initial`
  val f2: (Grouped[K, V], Grouped[K, A]) =>: Grouped[K, V] = ??? // derive from `f`
  fold(groupBy(ds)(g))(window)(initial2)(f2)
}

This approach might make the case for factoring out groupBy a bit more compelling, since it can reuse a few aggregation primitives (e.g., fold, scan) rather than requiring Grouped-specific aggregation operations like groupedFold, groupedScan, etc. Even then, however, it's not clear that the added complexity pays for itself.

The tupled representation can be recovered with an ungroup operation:

def ungroup[K, V](ds: F[Grouped[K, V]]): F[(K, V)]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  ungroup(fold(groupBy(ds)(g))(window)(initial2)(f2)) 
  // where initial2 and f2 are transformations as above
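
For intuition, here is a plain-Scala model of what those derived values might do, modeling Grouped[K, V] as Map[K, V] and assuming each groupBy output element carries a single key; both are assumptions, since the PR does not pin down Grouped's representation:

// Illustration only: Grouped[K, V] as Map[K, V].
type Grouped[K, V] = Map[K, V]

def initial2[K, V]: Grouped[K, V] = Map.empty // no groups yet

def f2[K, A, V](initial: V, f: (V, A) => V)(
  acc: Grouped[K, V], next: Grouped[K, A]
): Grouped[K, V] =
  next.foldLeft(acc) { case (m, (k, a)) =>
    m + (k -> f(m.getOrElse(k, initial), a))
  }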

@@ -151,6 +182,9 @@ trait LocalAnalyticsModule extends AnalyticsModule {
case class Split(f: RowFunction, g: RowFunction) extends RowFunction
case class Product(fab: RowFunction) extends RowFunction
case class Column(colName: String, rType: Reified) extends RowFunction
case class ExtractNth(reified: Reified, n: Int) extends RowFunction
Contributor:

Interesting approach. So arbitrary-length tuples are supported with this?

empty[(Int, String)]
.map(_ => (4, false)) // tuple of literals works
.map(s => (3, s._1)) // tuple with right side projection
.map(s => (s._2, "")) // tuple with left side projection
Contributor:

I have a feeling that if you do something like .map(s => (4, (s._1, (5, s._2)))), the implicits won't work here. (I haven't tested to confirm yet, though.)
