-
Notifications
You must be signed in to change notification settings - Fork 17
Aggregations #19
base: api-design
Are you sure you want to change the base?
Aggregations #19
Changes from 5 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,19 +100,33 @@ trait AnalyticsModule { | |
* The DataSet/DataStream operations of scalaz-analytics | ||
*/ | ||
trait Ops[F[_]] { | ||
// Unbounded | ||
// Unbounded dataset ops | ||
// (These ops require only bounded space, even with unbounded data) | ||
def map[A, B](ds: F[A])(f: A =>: B): F[B] | ||
def flatMap[A, B](ds: F[A])(f: A =>: DataSet[B]): F[B] | ||
def filter[A](ds: F[A])(f: A =>: Boolean): F[A] | ||
// Streaming aggregations | ||
// (These ops produce cumulative results, some may require unbounded space) | ||
def scan[A, B](ds: F[A])(initial: Unit =>: B)(f: (B, A) =>: B): F[B] | ||
|
||
// Bounded | ||
def fold[A, B](ds: F[A])(window: Window)(initial: A =>: B)(f: (B, A) =>: B): F[B] | ||
def scanAggregateBy[A, K, V](ds: F[A])(g: A =>: K)(initial: Unit =>: V)( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite sure what this is. Is it a group by followed by a scan on each group? The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it was meant to be a group by followed by a scan over each group. So for each element |
||
f: (V, A) =>: V | ||
): F[(K, V)] | ||
|
||
// Bounded dataset ops | ||
// (These ops can only be performed on bounded subsets of data) | ||
def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B] | ||
|
||
def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)(initial: Unit =>: V)( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this exactly? Can it be broken into more fundamental pieces? GroupBy + Fold or something perhaps? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's meant to be a group by followed by a fold over each group. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: I modeled grouping+aggregation this way on the basis of comments like this one in the dataset design discussion (#4) that questioned the utility of separating grouping and aggregation. The other day, I toyed with the idea of implementing the def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]
def fold[A, B](ds: F[A])(window: Window)(initial: Unit =>: B)(f: (B, A) =>: B): F[B] This doesn't quite work: def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
(initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
fold(groupBy(ds)(g))(window)(initial)(f) (The types don't quite line up, even if you can convert Instead, you can introduce a separate def groupFold[A, K, V](ds: F[Grouped[K, A]])(window: Window)
(initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)]
def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
(initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
groupFold(groupBy(ds)(g))(window)(initial)(f) All that said, it's not obvious to me that there's real value in factoring out the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another approach that might work, without requiring separate methods like def groupBy[A, K](ds: F[A])(g: A =>: K): F[Grouped[K, A]]
def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
(initial: Unit =>: V)(f: (V, A) =>: V): F[Grouped[K, V]] = {
val initial2: Unit =>: Grouped[K, V] = ??? // derive from `initial`
val f2: (Grouped[K, V], Grouped[K, A]) =>: Grouped[K, V] = ??? // derive from `f`
fold(groupBy(ds)(g))(window)(initial2)(f2)
} This approach might make the case for factoring out The tupled representation can be recovered with an def ungroup[K, V](ds: F[Grouped[K, V]]): F[(K, V)]
def aggregateBy[A, K, V](ds: F[A])(window: Window)(g: A =>: K)
(initial: Unit =>: V)(f: (V, A) =>: V): F[(K, V)] =
ungroup(fold(groupBy(ds)(g))(window)(initial2)(f2))
// where initial2 and f2 are transformations as above |
||
f: (V, A) =>: V | ||
): F[(K, V)] | ||
def distinct[A](ds: F[A])(window: Window): F[A] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A cumulative/streaming variant of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An exact streaming dedupe is guaranteed to run out of space eventually assuming input is unbound on the keyspace. I don't know if this is something we want to directly expose... Forcing a window means we offer a safe API that won't crash with OOM errors. What do others thing about this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure the framework should assume that input is unbounded on the key-space. Note that if a streaming |
||
} | ||
|
||
/** | ||
* The standard library of scalaz-analytics. | ||
*/ | ||
trait StandardLibrary { | ||
trait StandardLibrary extends TupleLibrary with StringLibrary { | ||
def id[A: Type]: A =>: A | ||
def compose[A, B, C](f: B =>: C, g: A =>: B): A =>: C | ||
def andThen[A, B, C](f: A =>: B, g: B =>: C): A =>: C | ||
|
@@ -121,6 +135,24 @@ trait AnalyticsModule { | |
def product[A, B](fab: A =>: B): (A, A) =>: (B, B) | ||
} | ||
|
||
trait TupleLibrary { | ||
def fst[A: Type, B]: (A, B) =>: A | ||
def snd[A, B: Type]: (A, B) =>: B | ||
} | ||
|
||
trait StringLibrary { | ||
def strSplit(pattern: String): String =>: DataSet[String] | ||
def strConcat: (String, String) =>: String | ||
} | ||
|
||
/** | ||
* A DSL for string operations | ||
*/ | ||
implicit class StringSyntax[A](val l: A =>: String) { | ||
def split(pattern: String): A =>: DataSet[String] = l >>> stdLib.strSplit(pattern) | ||
def concat(r: A =>: String): A =>: String = (l &&& r) >>> stdLib.strConcat | ||
} | ||
|
||
trait Numeric[A] { | ||
def typeOf: Type[A] // underlying repr | ||
|
||
|
@@ -156,11 +188,37 @@ trait AnalyticsModule { | |
def map[B: Type](f: (A =>: A) => (A =>: B)): DataSet[B] = | ||
setOps.map(ds)(f(stdLib.id)) | ||
|
||
def flatMap[B: Type](f: (A =>: A) => (A =>: DataSet[B])): DataSet[B] = | ||
setOps.flatMap(ds)(f(stdLib.id)) | ||
|
||
def filter(f: (A =>: A) => (A =>: Boolean)): DataSet[A] = | ||
setOps.filter(ds)(f(stdLib.id)) | ||
|
||
def fold[B: Type](init: A =>: B)(f: (B, A) =>: B): DataSet[B] = | ||
setOps.fold(ds)(Window.GlobalWindow())(init)(f) | ||
def scan[B: Type](initial: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataSet[B] = | ||
setOps.scan(ds)(initial)(f(stdLib.id)) | ||
|
||
def scanAggregate[V: Type]( | ||
initial: Unit =>: V | ||
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(A, V)] = | ||
scanAggregateBy(identity)(initial)(f) | ||
|
||
def scanAggregateBy[K, V: Type]( | ||
g: (A =>: A) => (A =>: K) | ||
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(K, V)] = | ||
setOps.scanAggregateBy(ds)(g(stdLib.id))(initial)(f(stdLib.id)) | ||
|
||
def fold[B: Type](init: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataSet[B] = | ||
setOps.fold(ds)(Window.GlobalWindow())(init)(f(stdLib.id)) | ||
|
||
def aggregate[V: Type]( | ||
initial: Unit =>: V | ||
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(A, V)] = | ||
aggregateBy(identity)(initial)(f) | ||
|
||
def aggregateBy[K, V: Type]( | ||
g: (A =>: A) => (A =>: K) | ||
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataSet[(K, V)] = | ||
setOps.aggregateBy(ds)(Window.GlobalWindow())(g(stdLib.id))(initial)(f(stdLib.id)) | ||
|
||
def distinct: DataSet[A] = | ||
setOps.distinct(ds)(Window.GlobalWindow()) | ||
|
@@ -174,16 +232,55 @@ trait AnalyticsModule { | |
def map[B: Type](f: (A =>: A) => (A =>: B)): DataStream[B] = | ||
streamOps.map(ds)(f(stdLib.id)) | ||
|
||
def flatMap[B: Type](f: (A =>: A) => (A =>: DataSet[B])): DataStream[B] = | ||
streamOps.flatMap(ds)(f(stdLib.id)) | ||
|
||
def filter(f: (A =>: A) => (A =>: Boolean)): DataStream[A] = | ||
streamOps.filter(ds)(f(stdLib.id)) | ||
|
||
def fold[B: Type](window: Window)(init: A =>: B)(f: (B, A) =>: B): DataStream[B] = | ||
streamOps.fold(ds)(window)(init)(f) | ||
def scan[B: Type]( | ||
initial: Unit =>: B | ||
)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataStream[B] = | ||
streamOps.scan(ds)(initial)(f(stdLib.id)) | ||
|
||
def scanAggregate[V: Type]( | ||
initial: Unit =>: V | ||
)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(A, V)] = | ||
scanAggregateBy(identity)(initial)(f) | ||
|
||
def scanAggregateBy[K, V: Type]( | ||
g: (A =>: A) => (A =>: K) | ||
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(K, V)] = | ||
streamOps.scanAggregateBy(ds)(g(stdLib.id))(initial)(f(stdLib.id)) | ||
|
||
def fold[B: Type]( | ||
window: Window | ||
)(init: Unit =>: B)(f: ((B, A) =>: (B, A)) => ((B, A) =>: B)): DataStream[B] = | ||
streamOps.fold(ds)(window)(init)(f(stdLib.id)) | ||
|
||
def aggregate[V: Type]( | ||
window: Window | ||
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(A, V)] = | ||
aggregateBy(window)(identity)(initial)(f) | ||
|
||
def aggregateBy[K, V: Type](window: Window)( | ||
g: (A =>: A) => (A =>: K) | ||
)(initial: Unit =>: V)(f: ((V, A) =>: (V, A)) => ((V, A) =>: V)): DataStream[(K, V)] = | ||
streamOps.aggregateBy(ds)(window)(g(stdLib.id))(initial)(f(stdLib.id)) | ||
|
||
def distinct(window: Window): DataStream[A] = | ||
streamOps.distinct(ds)(window) | ||
} | ||
|
||
implicit class TupleSyntax[A, B, C](t: A =>: (B, C)) { | ||
|
||
def _1(implicit ev: Type[B]): A =>: B = | ||
stdLib.compose[A, (B, C), B](stdLib.fst, t) | ||
|
||
def _2(implicit ev: Type[C]): A =>: C = | ||
stdLib.compose[A, (B, C), C](stdLib.snd, t) | ||
} | ||
|
||
/** | ||
* Create an empty DataSet of type A | ||
*/ | ||
|
@@ -207,6 +304,18 @@ trait AnalyticsModule { | |
implicit def short[A](v: scala.Short): A =>: Short | ||
implicit def instant[A](v: java.time.Instant): A =>: java.time.Instant | ||
implicit def localDate[A](v: java.time.LocalDate): A =>: java.time.LocalDate | ||
implicit def tuple2[A, B, C](t: (A =>: B, A =>: C)): A =>: (B, C) | ||
|
||
implicit def tuple2Lift[A, B, C]( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice I did something similar in my branch. I don't think this works for arbitrarily nested tuples though does it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't test. The tuple stuff is basically ripped straight out of your branch, with minor changes (e.g., I used FanOut instead of a new RowFunction). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yeah... I just looked - I never pushed the more complex version I was working on because I never quite got it working haha. Think we might need a bit of Scala magic to make the implicits work for us |
||
t: (B, C) | ||
)(implicit ev1: B => A =>: B, ev2: C => A =>: C): A =>: (B, C) = | ||
tuple2((ev1(t._1), ev2(t._2))) | ||
|
||
implicit def tuple2Lift1[A, B, C](t: (A =>: B, C))(implicit ev: C => A =>: C): A =>: (B, C) = | ||
tuple2((t._1, ev(t._2))) | ||
|
||
implicit def tuple2Lift2[A, B, C](t: (B, A =>: C))(implicit ev1: B => A =>: B): A =>: (B, C) = | ||
tuple2((ev1(t._1), t._2)) | ||
|
||
val setOps: Ops[DataSet] | ||
val streamOps: Ops[DataStream] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,23 +109,54 @@ trait LocalAnalyticsModule extends AnalyticsModule { | |
object LocalDataStream { | ||
case class Empty(rType: Reified) extends LocalDataStream | ||
|
||
case class Map(d: LocalDataStream, f: RowFunction) extends LocalDataStream | ||
case class Filter(d: LocalDataStream, f: RowFunction) extends LocalDataStream | ||
case class Map(d: LocalDataStream, f: RowFunction) extends LocalDataStream | ||
case class FlatMap(d: LocalDataStream, f: RowFunction) extends LocalDataStream | ||
case class Filter(d: LocalDataStream, f: RowFunction) extends LocalDataStream | ||
case class Scan(d: LocalDataStream, initial: RowFunction, f: RowFunction) | ||
extends LocalDataStream | ||
case class ScanAggregateBy( | ||
d: LocalDataStream, | ||
groupBy: RowFunction, | ||
initial: RowFunction, | ||
f: RowFunction | ||
) extends LocalDataStream | ||
|
||
case class Fold(d: LocalDataStream, initial: RowFunction, f: RowFunction, window: Window) | ||
extends LocalDataStream | ||
case class AggregateBy( | ||
d: LocalDataStream, | ||
groupBy: RowFunction, | ||
initial: RowFunction, | ||
f: RowFunction, | ||
window: Window | ||
) extends LocalDataStream | ||
case class Distinct(d: LocalDataStream, window: Window) extends LocalDataStream | ||
} | ||
|
||
private val ops: Ops[DataSet] = new Ops[DataSet] { | ||
override def map[A, B](ds: LocalDataStream)(f: A =>: B): LocalDataStream = | ||
LocalDataStream.Map(ds, f) | ||
override def flatMap[A, B](ds: LocalDataStream)(f: A =>: DataSet[B]): LocalDataStream = | ||
LocalDataStream.FlatMap(ds, f) | ||
override def filter[A](ds: LocalDataStream)(f: A =>: Boolean): LocalDataStream = | ||
LocalDataStream.Filter(ds, f) | ||
|
||
override def fold[A, B](ds: LocalDataStream)(window: Window)(initial: A =>: B)( | ||
f: (B, A) =>: B | ||
): LocalDataStream = LocalDataStream.Fold(ds, initial, f, window) | ||
override def scan[A, B]( | ||
ds: LocalDataStream | ||
)(initial: Unit =>: B)(f: (B, A) =>: B): LocalDataStream = | ||
LocalDataStream.Scan(ds, initial, f) | ||
override def scanAggregateBy[A, K, V]( | ||
ds: LocalDataStream | ||
)(g: A =>: K)(initial: Unit =>: V)(f: (V, A) =>: V): LocalDataStream = | ||
LocalDataStream.ScanAggregateBy(ds, g, initial, f) | ||
|
||
override def fold[A, B]( | ||
ds: LocalDataStream | ||
)(window: Window)(initial: A =>: B)(f: (B, A) =>: B): LocalDataStream = | ||
LocalDataStream.Fold(ds, initial, f, window) | ||
override def aggregateBy[A, K, V]( | ||
ds: LocalDataStream | ||
)(window: Window)(g: A =>: K)(initial: Unit =>: V)(f: (V, A) =>: V): LocalDataStream = | ||
LocalDataStream.AggregateBy(ds, g, initial, f, window) | ||
override def distinct[A](ds: LocalDataStream)(window: Window): LocalDataStream = | ||
LocalDataStream.Distinct(ds, window) | ||
} | ||
|
@@ -151,6 +182,9 @@ trait LocalAnalyticsModule extends AnalyticsModule { | |
case class Split(f: RowFunction, g: RowFunction) extends RowFunction | ||
case class Product(fab: RowFunction) extends RowFunction | ||
case class Column(colName: String, rType: Reified) extends RowFunction | ||
case class ExtractNth(reified: Reified, n: Int) extends RowFunction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting approach. So arbitrary length tuples are supported with this? |
||
case class StrSplit(pattern: String) extends RowFunction | ||
case object StrConcat extends RowFunction | ||
|
||
// constants | ||
case class IntLiteral(value: Int) extends RowFunction | ||
|
@@ -181,6 +215,15 @@ trait LocalAnalyticsModule extends AnalyticsModule { | |
RowFunction.Split(f, g) | ||
|
||
override def product[A, B](fab: A =>: B): (A, A) =>: (B, B) = RowFunction.Product(fab) | ||
|
||
override def fst[A: Type, B]: (A, B) =>: A = RowFunction.ExtractNth(Type[A].reified, 0) | ||
|
||
override def snd[A, B: Type]: (A, B) =>: B = RowFunction.ExtractNth(Type[B].reified, 1) | ||
|
||
override def strSplit(pattern: String): String =>: DataSet[String] = | ||
RowFunction.StrSplit(pattern) | ||
|
||
override def strConcat: (String, String) =>: String = RowFunction.StrConcat | ||
} | ||
|
||
override def empty[A: Type]: LocalDataStream = LocalDataStream.Empty(LocalType.typeOf[A]) | ||
|
@@ -202,6 +245,8 @@ trait LocalAnalyticsModule extends AnalyticsModule { | |
implicit override def instant[A](v: Instant): A =>: Instant = RowFunction.InstantLiteral(v) | ||
implicit override def localDate[A](v: LocalDate): A =>: LocalDate = | ||
RowFunction.LocalDateLiteral(v) | ||
implicit override def tuple2[A, B, C](t: (A =>: B, A =>: C)): A =>: (B, C) = | ||
RowFunction.FanOut(t._1, t._2) | ||
|
||
// todo this needs more thought | ||
override def column[A: Type](str: String): Unknown =>: A = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,15 +10,46 @@ object SimpleExample { | |
def main(args: Array[String]): Unit = { | ||
println(ds1) | ||
println(ds2) | ||
println(tupleExample) | ||
println(tupleExample2) | ||
} | ||
|
||
val ds1: DataSet[Int] = | ||
empty[Int] | ||
.map(i => i * 7) | ||
.distinct | ||
|
||
val countExample = | ||
ds1.fold(0)(va => va._1 + 1) | ||
|
||
val sumExample = | ||
ds1.fold(0)(va => va._1 + va._2) | ||
|
||
val ds2: DataStream[Int] = | ||
emptyStream[Int] | ||
.filter(i => i + 1 > 0) | ||
.distinct(Window.FixedTimeWindow()) | ||
|
||
val tupleExample: DataSet[(Int, Boolean)] = | ||
empty[(Int, String)] | ||
.map(_ => (4, false)) | ||
.filter(_._2) | ||
|
||
val tupleExample2: DataSet[(Int, String)] = | ||
empty[(Int, String)] | ||
.map(_ => (4, false)) // tuple of literals works | ||
.map(s => (3, s._1)) // tuple with right side projection | ||
.map(s => (s._2, "")) // tuple with left side projection | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a feeling if you do something like
|
||
|
||
def cumulativeCountDistinct[A: Type](ds: DataStream[A]): DataStream[(A, Int)] = | ||
ds.scanAggregate(0)(_._1 + 1) | ||
|
||
def countDistinct[A: Type](ds: DataSet[A]): DataSet[(A, Int)] = | ||
ds.aggregate(0)(_._1 + 1) | ||
|
||
def wordCount(lines: DataSet[String]): DataSet[(String, Int)] = | ||
countDistinct(words(lines)) | ||
|
||
def words(lines: DataSet[String]): DataSet[String] = | ||
lines flatMap (_.split(" ")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to force a
Dataset
here? I get that its a substitute for a "Finite set of data" but theoretically you can flatmap a stream as well - it just won't terminate unless its a finite stream.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually against this, IMO neither,
DataSet
norDataStream
should be Monads, because that would mean you can no longer statically analyze them.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@camjo Yeah, I was trying to make the bounded-ness explicit, because I couldn't come up with a good reason to allow an unbounded stream to be inserted into a stream. Given that it is more constrained than an ordinary flatMap in the case of DataStream, maybe it would be better to name it something else or, as @LukaJCB suggested, leave it out entirely.
I added it in order to implement the word count example (i.e.,
words = lines.flatMap(_.split(" "))
), but another approach (if we don't want monadic DataSet/DataStream types) would be to provide an operation like Spark'sexplode
function (e.g.,def explode[A](ds: F[List[A]]): F[A]
). Thesplit
syntax would need to return aString =>: List[String]
, and thewords
example would becomewords = lines.map(_.split(" ")).explode
. On the other hand, I'm not sure that the concern about static inspection is applicable in this context, since our "functions" themselves are inspectable values (not general Scala functions).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In https://github.com/LukaJCB/analytics I used
concatMap
which usesA => List[B]
:)