This repository has been archived by the owner on Feb 14, 2021. It is now read-only.

Aggregations #19
Open · wants to merge 6 commits into base: api-design (changes from all commits)
125 changes: 117 additions & 8 deletions src/main/scala/scalaz/analytics/AnalyticsModule.scala
@@ -100,19 +100,33 @@ trait AnalyticsModule {
* The DataSet/DataStream operations of scalaz-analytics
*/
trait Ops[F[_]] {
// Unbounded
// Unbounded dataset ops
// (These ops require only bounded space, even with unbounded data)
def map[A, B](ds: F[A])(f: A =>: B): F[B]
def flatMap[A, B](ds: F[A])(f: A =>: DataSet[B]): F[B]
Contributor:
Do we want to force a DataSet here? I get that it's a substitute for a "finite set of data", but theoretically you can flatMap a stream as well; it just won't terminate unless it's a finite stream.

Comment:
I'm actually against this; IMO neither DataSet nor DataStream should be a Monad, because that would mean you can no longer statically analyze them.

Author:
@camjo Yeah, I was trying to make the bounded-ness explicit, because I couldn't come up with a good reason to allow an unbounded stream to be inserted into a stream. Given that it is more constrained than an ordinary flatMap in the case of DataStream, maybe it would be better to name it something else or, as @LukaJCB suggested, leave it out entirely.

I added it in order to implement the word count example (i.e., words = lines.flatMap(_.split(" "))), but another approach (if we don't want monadic DataSet/DataStream types) would be to provide an operation like Spark's explode function (e.g., def explode[A](ds: F[List[A]]): F[A]). The split syntax would need to return a String =>: List[String], and the words example would become words = lines.map(_.split(" ")).explode. On the other hand, I'm not sure that the concern about static inspection is applicable in this context, since our "functions" themselves are inspectable values (not general Scala functions).

Comment:
In https://github.com/LukaJCB/analytics I used concatMap which uses A => List[B] :)
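A plain-Scala sketch of the explode/concatMap alternative discussed above (illustrative only; the List-based model and the explode name are assumptions, not part of this diff):

// Hypothetical explode: flattens nested collections, modeled on plain Scala lists
def explode[A](ds: List[List[A]]): List[A] = ds.flatten

// The word count example would then read:
val lines = List("hello world", "hello")
val words = explode(lines.map(_.split(" ").toList))
// words == List("hello", "world", "hello")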

def filter[A](ds: F[A])(f: A =>: Boolean): F[A]
// Streaming aggregations
// (These ops produce cumulative results, some may require unbounded space)
def scan[A, B](ds: F[A])(initial: Unit =>: B)(f: (B, A) =>: B): F[B]

// Bounded
def fold[A, B](ds: F[A])(window: Window)(initial: A =>: B)(f: (B, A) =>: B): F[B]
def scanAggregateBy[A, K, V](ds: F[A])(g: A =>: K)(initial: Unit =>: V)(
Contributor:
I'm not quite sure what this is. Is it a group by followed by a scan on each group? The F[(K, V)] seems to indicate that it's a group by with a fold on each group? Is it meant to be emulating F[(K, F[V])] or something?

Author:
Yeah, it was meant to be a group by followed by a scan over each group. So for each element a of the original stream, the resulting stream would have an element (g(a), v), where v is a cumulative value for group g(a).
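A plain-Scala sketch of that semantics (illustrative only; ordinary functions stand in for the DSL arrows):

// Emits (key, cumulative value for that key) for every input element
def scanAggregateByModel[A, K, V](xs: List[A])(g: A => K)(initial: V)(f: (V, A) => V): List[(K, V)] = {
  val (_, out) = xs.foldLeft((Map.empty[K, V], Vector.empty[(K, V)])) {
    case ((acc, emitted), a) =>
      val k = g(a)
      val v = f(acc.getOrElse(k, initial), a)
      (acc.updated(k, v), emitted :+ ((k, v)))
  }
  out.toList
}

// scanAggregateByModel(List("a", "b", "a"))(identity)(0)((n, _) => n + 1)
//   == List(("a", 1), ("b", 1), ("a", 2))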

f: (V, A) =>: V
): F[(K, V)]

// Bounded dataset ops
// (These ops can only be performed on bounded subsets of data)
def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)(initial: Unit =>: V)(
Contributor:
What is this exactly? Can it be broken into more fundamental pieces? GroupBy + Fold or something perhaps?

Author:
It's meant to be a group by followed by a fold over each group.
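In plain-Scala terms (illustrative only, again with ordinary functions in place of the DSL arrows):

def aggregateByModel[A, K, V](xs: List[A])(g: A => K)(initial: V)(f: (V, A) => V): Map[K, V] =
  xs.groupBy(g).map { case (k, as) => k -> as.foldLeft(initial)(f) }

// aggregateByModel(List("a", "b", "a"))(identity)(0)((n, _) => n + 1)
//   == Map("a" -> 2, "b" -> 1)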

Author (@anovstrup, Dec 20, 2018):

Note: I modeled grouping+aggregation this way on the basis of comments like this one in the dataset design discussion (#4) that questioned the utility of separating grouping and aggregation.

The other day, I toyed with the idea of implementing the aggregateBy operation in terms of separate groupBy and fold operations:

def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]

def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B]

This doesn't quite work:

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  fold(groupBy(ds)(g))(window)(initial)(f)

(The types don't quite line up, even if you can convert f from a (V, A) =>: V to a (V, Grouped[K, A]) =>: V, because you want the fold to result in an F[(K, V)] but the initial value for the fold is a V and not a (K, V).)

Instead, you can introduce a separate groupFold operation:

def groupFold[A, K, V](ds: F[Grouped[K, A]])(window: Window)
                      (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  groupFold(groupBy(ds)(g))(window)(initial)(f)

All that said, it's not obvious to me that there's real value in factoring out the groupBy.
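For concreteness, a plain-Scala sketch of the factored version (illustrative only; modeling Grouped[K, A] as (K, List[A]) is an assumption):

type Grouped[K, A] = (K, List[A])

def groupByModel[A, K](xs: List[A])(g: A => K): List[Grouped[K, A]] =
  xs.groupBy(g).toList

def groupFoldModel[K, A, V](groups: List[Grouped[K, A]])(initial: V)(f: (V, A) => V): List[(K, V)] =
  groups.map { case (k, as) => k -> as.foldLeft(initial)(f) }

def aggregateByModel[A, K, V](xs: List[A])(g: A => K)(initial: V)(f: (V, A) => V): List[(K, V)] =
  groupFoldModel(groupByModel(xs)(g))(initial)(f)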

Author:

Another approach that might work, without requiring separate methods like groupFold, would be to allow the fold to produce an F[Grouped[K, V]] rather than an F[(K, V)]:

def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[Grouped[K, V]] = {
  val initial2: Unit =>: Grouped[K, V] = ??? // derive from `initial`
  val f2: (Grouped[K, V], Grouped[K, A]) =>: Grouped[K, V] = ??? // derive from `f`
  fold(groupBy(ds)(g))(window)(initial2)(f2)
}

This approach might make the case for factoring out groupBy a bit more compelling, since it can reuse a few aggregation primitives (e.g., fold, scan) rather than requiring Grouped-specific aggregation operations like groupedFold, groupedScan, etc. Even then, however, it's not clear that the added complexity pays for itself.

The tupled representation can be recovered with an ungroup operation:

def ungroup[K, V](ds: F[Grouped[K, V]]): F[(K, V)]

def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
                        (initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
  ungroup(fold(groupBy(ds)(g))(window)(initial2)(f2)) 
  // where initial2 and f2 are transformations as above

f: (V, A) =>: V
): F[(K, V)]
def distinct[A](ds: F[A])(window: Window): F[A]
Author (@anovstrup, Nov 8, 2018):

A cumulative/streaming variant of the distinct operation could also be provided. Like scanAggregateBy, it would in general require unbounded space (i.e., space required would depend on the number of distinct elements).

Contributor:
An exact streaming dedupe is guaranteed to run out of space eventually, assuming input is unbounded on the key-space. I don't know if this is something we want to expose directly... Forcing a window means we offer a safe API that won't crash with OOM errors. What do others think about this?

Author:
I'm not sure the framework should assume that input is unbounded on the key-space. Note that if a streaming distinct is ruled out, then so are scanAggregate and scanAggregateBy (all of them require space proportional to the number of distinct values/keys).
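A plain-Scala sketch of the space trade-off under discussion (illustrative only):

// Streaming distinct: emits each element the first time it is seen.
// The `seen` set grows with the number of distinct elements, which is
// exactly the unbounded-space concern raised above.
def distinctStream[A](xs: Iterator[A]): Iterator[A] = {
  val seen = scala.collection.mutable.Set.empty[A]
  xs.filter(seen.add) // Set#add returns true only on first insertion
}

// distinctStream(Iterator("a", "b", "a")).toList == List("a", "b")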

}

/**
* The standard library of scalaz-analytics.
*/
trait StandardLibrary {
trait StandardLibrary extends TupleLibrary with StringLibrary {
def id[A: Type]: A =>: A
def compose[A, B, C](f: B =>: C, g: A =>: B): A =>: C
def andThen[A, B, C](f: A =>: B, g: B =>: C): A =>: C
@@ -121,6 +135,24 @@ trait AnalyticsModule {
def product[A, B](fab: A =>: B): (A, A) =>: (B, B)
}

trait TupleLibrary {
def fst[A: Type, B]: (A, B) =>: A
def snd[A, B: Type]: (A, B) =>: B
}

trait StringLibrary {
def strSplit(pattern: String): String =>: DataSet[String]
def strConcat: (String, String) =>: String
}

/**
* A DSL for string operations
*/
implicit class StringSyntax[A](val l: A =>: String) {
def split(pattern: String): A =>: DataSet[String] = l >>> stdLib.strSplit(pattern)
def concat(r: A =>: String): A =>: String = (l &&& r) >>> stdLib.strConcat
}

trait Numeric[A] {
def typeOf: Type[A] // underlying repr

@@ -156,11 +188,37 @@
def map[B: Type](f: (A =>: A) => (A =>: B)): DataSet[B] =
setOps.map(ds)(f(stdLib.id))

def flatMap[B: Type](f: (A =>: A) => (A =>: DataSet[B])): DataSet[B] =
setOps.flatMap(ds)(f(stdLib.id))

def filter(f: (A =>: A) => (A =>: Boolean)): DataSet[A] =
setOps.filter(ds)(f(stdLib.id))

def fold[B: Type](init: A =>: B)(f: (B, A) =>: B): DataSet[B] =
setOps.fold(ds)(Window.GlobalWindow())(init)(f)
def scan[B: Type](initial: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataSet[B] =
setOps.scan(ds)(initial)(f(stdLib.id))

def scanAggregate[V: Type](
initial: Unit =>: V
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(A, V)] =
scanAggregateBy(identity)(initial)(f)

def scanAggregateBy[K, V: Type](
g: (A =>: A) => (A =>: K)
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(K, V)] =
setOps.scanAggregateBy(ds)(g(stdLib.id))(initial)(f(stdLib.id))

def fold[B: Type](init: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataSet[B] =
setOps.fold(ds)(Window.GlobalWindow())(init)(f(stdLib.id))

def aggregate[V: Type](
initial: Unit =>: V
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(A, V)] =
aggregateBy(identity)(initial)(f)

def aggregateBy[K, V: Type](
g: (A =>: A) => (A =>: K)
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(K, V)] =
setOps.aggregateBy(ds)(Window.GlobalWindow())(g(stdLib.id))(initial)(f(stdLib.id))

def distinct: DataSet[A] =
setOps.distinct(ds)(Window.GlobalWindow())
@@ -174,16 +232,55 @@
def map[B: Type](f: (A =>: A) => (A =>: B)): DataStream[B] =
streamOps.map(ds)(f(stdLib.id))

def flatMap[B: Type](f: (A =>: A) => (A =>: DataSet[B])): DataStream[B] =
streamOps.flatMap(ds)(f(stdLib.id))

def filter(f: (A =>: A) => (A =>: Boolean)): DataStream[A] =
streamOps.filter(ds)(f(stdLib.id))

def fold[B: Type](window: Window)(init: A =>: B)(f: (B, A) =>: B): DataStream[B] =
streamOps.fold(ds)(window)(init)(f)
def scan[B: Type](
initial: Unit =>: B
)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataStream[B] =
streamOps.scan(ds)(initial)(f(stdLib.id))

def scanAggregate[V: Type](
initial: Unit =>: V
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(A, V)] =
scanAggregateBy(identity)(initial)(f)

def scanAggregateBy[K, V: Type](
g: (A =>: A) => (A =>: K)
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(K, V)] =
streamOps.scanAggregateBy(ds)(g(stdLib.id))(initial)(f(stdLib.id))

def fold[B: Type](
window: Window
)(init: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataStream[B] =
streamOps.fold(ds)(window)(init)(f(stdLib.id))

def aggregate[V: Type](
window: Window
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(A, V)] =
aggregateBy(window)(identity)(initial)(f)

def aggregateBy[K, V: Type](window: Window)(
g: (A =>: A) => (A =>: K)
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(K, V)] =
streamOps.aggregateBy(ds)(window)(g(stdLib.id))(initial)(f(stdLib.id))

def distinct(window: Window): DataStream[A] =
streamOps.distinct(ds)(window)
}

implicit class TupleSyntax[A, B, C](t: A =>: (B, C)) {

def _1(implicit ev: Type[B]): A =>: B =
stdLib.compose[A, (B, C), B](stdLib.fst, t)

def _2(implicit ev: Type[C]): A =>: C =
stdLib.compose[A, (B, C), C](stdLib.snd, t)
}
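A plain-Scala model of these projections (illustrative only; ordinary functions stand in for =>:):

// _1 on an arrow A => (B, C) is just composition with fst
def fstModel[B, C]: ((B, C)) => B = _._1

def proj1[A, B, C](t: A => (B, C)): A => B = t.andThen(fstModel)

val pair: Int => (Int, String) = i => (i * 2, i.toString)
// proj1(pair)(3) == 6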

/**
* Create an empty DataSet of type A
*/
@@ -207,6 +304,18 @@ trait AnalyticsModule {
implicit def short[A](v: scala.Short): A =>: Short
implicit def instant[A](v: java.time.Instant): A =>: java.time.Instant
implicit def localDate[A](v: java.time.LocalDate): A =>: java.time.LocalDate
implicit def tuple2[A, B, C](t: (A =>: B, A =>: C)): A =>: (B, C)

implicit def tuple2Lift[A, B, C](
Contributor:
Nice, I did something similar in my branch. I don't think this works for arbitrarily nested tuples though, does it?

Author:
Didn't test. The tuple stuff is basically ripped straight out of your branch, with minor changes (e.g., I used FanOut instead of a new RowFunction).

Contributor:
Ah yeah... I just looked; I never pushed the more complex version I was working on because I never quite got it working, haha. I think we might need a bit of Scala magic to make the implicits work for us.

t: (B, C)
)(implicit ev1: B => A =>: B, ev2: C => A =>: C): A =>: (B, C) =
tuple2((ev1(t._1), ev2(t._2)))

implicit def tuple2Lift1[A, B, C](t: (A =>: B, C))(implicit ev: C => A =>: C): A =>: (B, C) =
tuple2((t._1, ev(t._2)))

implicit def tuple2Lift2[A, B, C](t: (B, A =>: C))(implicit ev1: B => A =>: B): A =>: (B, C) =
tuple2((ev1(t._1), t._2))
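A plain-Scala model of the lifting pattern (illustrative only; ordinary functions stand in for =>:, and constArrow is a hypothetical helper playing the role of the literal-lifting implicits):

def tuple2Model[A, B, C](t: (A => B, A => C)): A => (B, C) = a => (t._1(a), t._2(a))

def constArrow[A, B](b: B): A => B = _ => b

// Modeling .map(s => (3, s._1)): the literal 3 is lifted to an arrow,
// while the projection already is one.
val liftedPair = tuple2Model[(Int, String), Int, Int]((constArrow(3), _._1))
// liftedPair((7, "x")) == (3, 7)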

val setOps: Ops[DataSet]
val streamOps: Ops[DataStream]
57 changes: 51 additions & 6 deletions src/main/scala/scalaz/analytics/LocalAnalyticsModule.scala
@@ -109,23 +109,54 @@ trait LocalAnalyticsModule extends AnalyticsModule {
object LocalDataStream {
case class Empty(rType: Reified) extends LocalDataStream

case class Map(d: LocalDataStream, f: RowFunction) extends LocalDataStream
case class Filter(d: LocalDataStream, f: RowFunction) extends LocalDataStream
case class Map(d: LocalDataStream, f: RowFunction) extends LocalDataStream
case class FlatMap(d: LocalDataStream, f: RowFunction) extends LocalDataStream
case class Filter(d: LocalDataStream, f: RowFunction) extends LocalDataStream
case class Scan(d: LocalDataStream, initial: RowFunction, f: RowFunction)
extends LocalDataStream
case class ScanAggregateBy(
d: LocalDataStream,
groupBy: RowFunction,
initial: RowFunction,
f: RowFunction
) extends LocalDataStream

case class Fold(d: LocalDataStream, initial: RowFunction, f: RowFunction, window: Window)
extends LocalDataStream
case class AggregateBy(
d: LocalDataStream,
groupBy: RowFunction,
initial: RowFunction,
f: RowFunction,
window: Window
) extends LocalDataStream
case class Distinct(d: LocalDataStream, window: Window) extends LocalDataStream
}

private val ops: Ops[DataSet] = new Ops[DataSet] {
override def map[A, B](ds: LocalDataStream)(f: A =>: B): LocalDataStream =
LocalDataStream.Map(ds, f)
override def flatMap[A, B](ds: LocalDataStream)(f: A =>: DataSet[B]): LocalDataStream =
LocalDataStream.FlatMap(ds, f)
override def filter[A](ds: LocalDataStream)(f: A =>: Boolean): LocalDataStream =
LocalDataStream.Filter(ds, f)

override def fold[A, B](ds: LocalDataStream)(window: Window)(initial: A =>: B)(
f: (B, A) =>: B
): LocalDataStream = LocalDataStream.Fold(ds, initial, f, window)
override def scan[A, B](
ds: LocalDataStream
)(initial: Unit =>: B)(f: (B, A) =>: B): LocalDataStream =
LocalDataStream.Scan(ds, initial, f)
override def scanAggregateBy[A, K, V](
ds: LocalDataStream
)(g: A =>: K)(initial: Unit =>: V)(f: (V, A) =>: V): LocalDataStream =
LocalDataStream.ScanAggregateBy(ds, g, initial, f)

override def fold[A, B](
ds: LocalDataStream
)(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): LocalDataStream =
LocalDataStream.Fold(ds, initial, f, window)
override def aggregateBy[A, K, V](
ds: LocalDataStream
)(window: Window)(g: A =>: K)(initial: Unit =>: V)(f: (V, A) =>: V): LocalDataStream =
LocalDataStream.AggregateBy(ds, g, initial, f, window)
override def distinct[A](ds: LocalDataStream)(window: Window): LocalDataStream =
LocalDataStream.Distinct(ds, window)
}
@@ -151,6 +182,9 @@ trait LocalAnalyticsModule extends AnalyticsModule {
case class Split(f: RowFunction, g: RowFunction) extends RowFunction
case class Product(fab: RowFunction) extends RowFunction
case class Column(colName: String, rType: Reified) extends RowFunction
case class ExtractNth(reified: Reified, n: Int) extends RowFunction
Contributor:
Interesting approach. So arbitrary length tuples are supported with this?
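A plain-Scala sketch of how ExtractNth might be interpreted (an assumption about a future interpreter; this diff only adds the node):

// Hypothetical interpretation: a row is a Product and ExtractNth(_, n)
// projects its nth field, so tuples of any arity would work.
def extractNth(row: Product, n: Int): Any = row.productElement(n)

// extractNth((1, "a", true), 2) == true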

case class StrSplit(pattern: String) extends RowFunction
case object StrConcat extends RowFunction

// constants
case class IntLiteral(value: Int) extends RowFunction
@@ -181,6 +215,15 @@ trait LocalAnalyticsModule extends AnalyticsModule {
RowFunction.Split(f, g)

override def product[A, B](fab: A =>: B): (A, A) =>: (B, B) = RowFunction.Product(fab)

override def fst[A: Type, B]: (A, B) =>: A = RowFunction.ExtractNth(Type[A].reified, 0)

override def snd[A, B: Type]: (A, B) =>: B = RowFunction.ExtractNth(Type[B].reified, 1)

override def strSplit(pattern: String): String =>: DataSet[String] =
RowFunction.StrSplit(pattern)

override def strConcat: (String, String) =>: String = RowFunction.StrConcat
}

override def empty[A: Type]: LocalDataStream = LocalDataStream.Empty(LocalType.typeOf[A])
@@ -202,6 +245,8 @@ trait LocalAnalyticsModule extends AnalyticsModule {
implicit override def instant[A](v: Instant): A =>: Instant = RowFunction.InstantLiteral(v)
implicit override def localDate[A](v: LocalDate): A =>: LocalDate =
RowFunction.LocalDateLiteral(v)
implicit override def tuple2[A, B, C](t: (A =>: B, A =>: C)): A =>: (B, C) =
RowFunction.FanOut(t._1, t._2)

// todo this needs more thought
override def column[A: Type](str: String): Unknown =>: A =
31 changes: 31 additions & 0 deletions src/main/scala/scalaz/analytics/example/SimpleExample.scala
@@ -10,15 +10,46 @@ object SimpleExample {
def main(args: Array[String]): Unit = {
println(ds1)
println(ds2)
println(tupleExample)
println(tupleExample2)
}

val ds1: DataSet[Int] =
empty[Int]
.map(i => i * 7)
.distinct

val countExample =
ds1.fold(0)(va => va._1 + 1)

val sumExample =
ds1.fold(0)(va => va._1 + va._2)

val ds2: DataStream[Int] =
emptyStream[Int]
.filter(i => i + 1 > 0)
.distinct(Window.FixedTimeWindow())

val tupleExample: DataSet[(Int, Boolean)] =
empty[(Int, String)]
.map(_ => (4, false))
.filter(_._2)

val tupleExample2: DataSet[(Int, String)] =
empty[(Int, String)]
.map(_ => (4, false)) // tuple of literals works
.map(s => (3, s._1)) // tuple with right side projection
.map(s => (s._2, "")) // tuple with left side projection
Contributor:
I have a feeling that if you do something like .map(s => (4, (s._1, (5, s._2)))) the implicits won't work here. (Haven't tested to confirm yet, though.)


def cumulativeCountDistinct[A: Type](ds: DataStream[A]): DataStream[(A, Int)] =
ds.scanAggregate(0)(_._1 + 1)

def countDistinct[A: Type](ds: DataSet[A]): DataSet[(A, Int)] =
ds.aggregate(0)(_._1 + 1)

def wordCount(lines: DataSet[String]): DataSet[(String, Int)] =
countDistinct(words(lines))

def words(lines: DataSet[String]): DataSet[String] =
lines.flatMap(_.split(" "))
}
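For reference, a plain-Scala sketch of what wordCount is meant to compute (illustrative only):

def wordCountModel(lines: List[String]): Map[String, Int] =
  lines
    .flatMap(_.split(" ").toList)          // words
    .groupBy(identity)                     // group by the word itself
    .map { case (w, ws) => w -> ws.size }  // fold each group to a count

// wordCountModel(List("a b", "a")) == Map("a" -> 2, "b" -> 1)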