-
Notifications
You must be signed in to change notification settings - Fork 1
/
spec.py
396 lines (326 loc) · 11.8 KB
/
spec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
from typing import List, Tuple, Callable, Dict, Optional, Type, Any, Union, Generic, TypeVar
from dataclasses import dataclass, field
class CheckerError(Exception):
    """Base class for the errors raised by the checker.

    Args:
        message: Description of the problem; returned by ``str(error)``.
        ast: Optional syntax node the error is attached to.
    """

    def __init__(self, message, ast=None):
        self.message = message
        self.ast = ast

    def __str__(self):
        # ``__str__`` must return a ``str``.  Coerce the message so that a
        # non-string payload (for example a list of labels) can still be
        # printed, and fall back to the stock ``Exception`` rendering when no
        # message is available instead of failing while reporting an error.
        message = getattr(self, 'message', None)
        if message is None:
            return super().__str__()
        return str(message)
class CheckerTypeError(CheckerError):
    """Error for a value whose type differs from the expected one.

    Args:
        ast: Optional syntax node the error is attached to.
        expected: The type that was required.
        actual: The type that was found.
    """

    def __init__(self, ast=None, expected=None, actual=None):
        # The base initialiser stores ``message`` and ``ast``.
        super().__init__(
            f'TypeError: expected {expected!r} but got {actual!r}',
            ast,
        )
class CheckerNotImplementedError(CheckerError):
    """Error for an object or operation the checker cannot handle.

    Args:
        ast: Optional syntax node the error is attached to.
        obj: The unsupported object; kept on the instance as ``obj``.
    """

    def __init__(self, ast=None, obj=None):
        # The base initialiser stores ``message`` and ``ast``.
        super().__init__(f'Not implemented for {obj !r}.', ast)
        self.obj = obj
class CheckerIndexError(CheckerError):
    """Error for a label (or list of labels) that could not be found.

    Args:
        index: The missing label(s); kept on the instance as ``index``.
        df: Accepted for the caller's convenience; not stored.
        ast: Optional syntax node the error is attached to.
    """

    def __init__(self, index, df=None, ast=None):
        # The base initialiser stores ``message`` and ``ast``.
        super().__init__(f'Index {index !r} not found.', ast)
        self.index = index
class CheckerLackOfInfo(CheckerError):
    """Error raised with the fixed message ``'Lack of column types.'``.

    Args:
        ast: Syntax node the error is attached to (required).
    """

    def __init__(self, ast):
        # The base initialiser stores ``message`` and ``ast``.
        super().__init__('Lack of column types.', ast)
class CheckerParamError(CheckerError):
    """Error for a parameter value outside its accepted set.

    Args:
        p: The offending value.
        ps: The collection of accepted values.
        ast: Optional syntax node the error is attached to.
    """

    def __init__(self, p, ps, ast=None):
        # The attribute has to be spelled ``message``: that is the name
        # ``CheckerError.__str__`` reads when the error is printed.
        self.message = f'Parameter {p !r} is not in {ps !r}'
        self.ast = ast
def ensure_labels(df, col):
    """Check that every label in ``col`` is present in ``df.columns``.

    Args:
        df: Object exposing a ``columns`` container.
        col: Iterable of labels to look up.

    Raises:
        CheckerIndexError: With the absent labels, in the order they appear
            in ``col``, when at least one label is missing.
    """
    absent = [name for name in col if name not in df.columns]
    if absent:
        raise CheckerIndexError(absent, df)
def from_dtype(dt):
    """Map a dtype to the checker type used for its elements.

    Args:
        dt: Object exposing a NumPy-style one-character ``kind`` code.

    Returns:
        ``IntLike(None)`` for signed (``'i'``) and unsigned (``'u'``)
        integers, ``FloatLike()`` for floats (``'f'``) and ``StrLike(None)``
        for object columns (``'O'``).

    Raises:
        CheckerNotImplementedError: For every other kind.
    """
    kind = dt.kind
    # Unsigned integers are integers too; without 'u' a uint64 column would
    # be rejected as unsupported.
    if kind in ('i', 'u'):
        return IntLike(None)
    if kind == 'f':
        return FloatLike()
    if kind == 'O':
        # XXX: object columns are treated as strings.
        return StrLike(None)
    raise CheckerNotImplementedError(obj=dt)
def read_csv(fp):
    """Read a CSV file with pandas and describe it as a checker ``DataFrame``.

    Args:
        fp: Wrapper whose ``val`` attribute is handed to ``pandas.read_csv``.

    Returns:
        A ``DataFrame`` whose index type and per-column types are obtained
        by passing the loaded frame's dtypes through ``from_dtype``.
    """
    import pandas as pd

    loaded = pd.read_csv(fp.val)
    index_type = from_dtype(loaded.index.dtype)
    column_types = {}
    for name, dtype in loaded.dtypes.to_dict().items():
        column_types[name] = from_dtype(dtype)
    return DataFrame(_index=index_type, _columns=column_types)
class Type:
    """Base class of the checker's types.

    Subclasses override ``subtype``, ``binop`` and ``rbinop``; the defaults
    return ``NotImplemented``.  The arithmetic operators all funnel into
    ``binop`` (left operand) or ``rbinop`` (right operand).
    """

    def subtype_of(self, other):
        """Return the result of ``other.subtype(self)``."""
        return other.subtype(self)

    def subtype(self, other):
        """Default subtype check: not implemented."""
        return NotImplemented

    def binop(self, other):
        """Default result type of ``self <op> other``: not implemented."""
        return NotImplemented

    def rbinop(self, other):
        """Default result type of ``other <op> self``: not implemented."""
        return NotImplemented

    def __add__(self, other):
        return self.binop(other)

    def __sub__(self, other):
        return self.binop(other)

    def __mul__(self, other):
        return self.binop(other)

    def __truediv__(self, other):
        # Python 3 dispatches ``/`` to ``__truediv__``; ``__div__`` is never
        # consulted by the interpreter.
        return self.binop(other)

    def __div__(self, other):
        # Python 2 spelling, kept for code that calls it explicitly.
        return self.binop(other)

    def __radd__(self, other):
        return self.rbinop(other)

    def __rsub__(self, other):
        return self.rbinop(other)

    def __rmul__(self, other):
        return self.rbinop(other)

    def __rtruediv__(self, other):
        # Reflected counterpart of ``__truediv__`` for Python 3.
        return self.rbinop(other)

    def __rdiv__(self, other):
        # Python 2 spelling, kept for code that calls it explicitly.
        return self.rbinop(other)
class NoneType(Type):
    """Type with a single shared instance.

    The first construction creates the instance and caches it on the class;
    later constructions return the cached object.
    """

    __single = None

    def __new__(cls):
        cached = NoneType.__single
        if not cached:
            cached = object.__new__(cls)
            NoneType.__single = cached
        return cached
@dataclass
class LiteralType(Type):
    """Type restricted to a fixed collection of literal values.

    Args:
        vs: Either a ``ListLike`` (its ``val`` is used) or the collection of
            allowed members itself.
    """

    kinds: List[Type]  # allowed members; each is compared through its ``val``

    def __init__(self, vs):
        self.kinds = vs.val if type(vs) is ListLike else vs

    def subtype(self, other):
        """Return True when ``other.val`` equals the ``val`` of some member."""
        # The method needs ``self`` to reach the members stored by __init__.
        return any(other.val == k.val for k in self.kinds)
@dataclass
class Bool(Type):
    """Boolean type wrapping a value; truth-tests as that value."""

    val: bool  # wrapped value

    def __bool__(self):
        return True if self.val else False
@dataclass
class FloatLike(Type):
    """Floating-point type; it carries no fields."""
@dataclass
class IntLike(Type):
    """Integer type wrapping a value."""

    val: int  # wrapped value; ``empty()`` builds instances holding ``None``

    def __bool__(self):
        # The field is named ``val``; there is no ``value`` attribute.
        return bool(self.val)

    def empty(self):
        """Return an ``IntLike`` that holds ``None`` as its value."""
        return IntLike(None)

    def subtype(self, other):
        """Return True only when ``other`` is exactly an ``IntLike``."""
        return type(other) is IntLike

    def binop(self, other):
        """Combine with ``other``.

        Returns:
            ``IntLike(None)`` when ``other`` is exactly an ``IntLike``,
            otherwise ``NotImplemented``.
        """
        if type(other) is IntLike:
            return self.empty()
        return NotImplemented
@dataclass
class StrLike(Type):
    """String type wrapping a value."""

    val: str  # wrapped value; ``from_dtype`` builds instances holding ``None``
@dataclass
class ListLike(Type):
    """List type: the wrapped members plus a type describing them."""

    val: Any  # the members; other classes in this file iterate over it
    typ: Type  # NOTE(review): presumably the element type - confirm with callers
@dataclass
class DictLike(Type):
    """Dictionary type wrapping a value."""

    val: Any  # wrapped value
@dataclass
class Func(Type):
    """Function type: one argument type mapped to one return type."""

    arg: Any  # type the function accepts
    ret: Any  # type the function returns

    def __call__(self, arg):
        """Apply the function type to an argument type.

        Args:
            arg: Type of the actual argument.

        Returns:
            ``self.ret`` when ``arg`` equals ``self.arg``.

        Raises:
            CheckerTypeError: When ``arg`` differs from ``self.arg``.
        """
        if arg != self.arg:
            # ``CheckerError()`` without a message cannot be constructed, so
            # report the mismatch through the dedicated subclass, which is
            # still caught by ``except CheckerError``.
            raise CheckerTypeError(expected=self.arg, actual=arg)
        return self.ret
@dataclass
class Series(Type):
    """Checker-level description of a series: an index type and a value type.

    Args:
        data: When this is a ``ListLike``, its ``typ`` becomes the value type.
        index: Accepted but not used.
        _index: Index type (keyword-only).
        _value: Value type (keyword-only); replaced by ``data.typ`` when
            ``data`` is a ``ListLike``.
    """

    index: Type = None
    value: Type = None

    def __init__(self, data=None, index=None, *, _index=None, _value=None):
        self.index = _index
        self.value = _value
        if type(data) is ListLike:
            self.value = data.typ

    def apply(self, func):
        """Return a series with the same index whose value type is ``func(self.value)``."""
        return Series(_index=self.index, _value=func(self.value))

    def binop(self, other):
        """Return the series produced by combining this one with ``other``.

        The result keeps this series' index; its value type is
        ``self.value.binop(...)`` applied to the operand's element type.
        """
        # A Series operand contributes its element type; handing the Series
        # itself to the element type's ``binop`` can only give NotImplemented.
        operand = other.value if isinstance(other, Series) else other
        return Series(_index=self.index, _value=self.value.binop(operand))

    __add__ = binop
    __sub__ = binop
    __mul__ = binop
    __truediv__ = binop
@dataclass
class LocIndexerFrame(Type):
    """Indexer object returned by ``DataFrame.loc``."""

    df: 'DataFrame'  # the frame being indexed

    def __getitem__(self, idx):
        """Resolve a ``[rows, columns]`` key using its column part.

        Args:
            idx: A ``list`` whose second item selects the columns: a
                ``StrLike``, a ``ListLike`` of labels, or a ``slice``.

        Returns:
            A ``Series`` for a single label, a ``DataFrame`` for a list of
            labels, this indexer for a ``slice``, and ``None`` when ``idx``
            is not a ``list``.

        Raises:
            CheckerIndexError: When a requested label is not a column.
            CheckerNotImplementedError: For any other column selector.
        """
        if type(idx) is not list:
            return  # XXX: only list keys are handled
        col = idx[1]
        # ``idx`` is a plain list and has no ``ast``; take the node from the
        # column selector when it carries one.
        ast = getattr(col, 'ast', None)
        columns = self.df.columns
        if type(col) is StrLike:
            if col.val not in columns:
                raise CheckerIndexError(index=col.val, df=self.df, ast=ast)
            return Series(_index=self.df.index, _value=columns[col.val])
        if type(col) is ListLike:
            labels = [label.val for label in col.val]
            # Report the label values, matching DataFrame.__getitem__.
            missing = [label for label in labels if label not in columns]
            if missing:
                raise CheckerIndexError(index=missing, ast=ast)
            return DataFrame(_index=self.df.index,
                             _columns={label: columns[label] for label in labels})
        if type(col) is slice:
            # NOTE(review): this returns the indexer, not the frame - confirm
            # that callers expect that.
            return self
        # The unsupported object belongs in ``obj``; the first positional
        # parameter of CheckerNotImplementedError is ``ast``.
        raise CheckerNotImplementedError(ast, col)
@dataclass
class DataFrame(Type):
    """Checker-level description of a data frame.

    A frame is described by the type of its index and a mapping from column
    label to the type of that column's elements.

    Args:
        data: When this is a ``ListLike`` whose ``typ`` is itself a
            ``ListLike``, the column types are taken from ``data.typ.val``.
        index: Only consulted together with such ``data``: when it is
            ``None`` the index type becomes ``IntLike(None)``.
        columns: Optional ``ListLike`` of labels for such ``data``; without
            it the columns are numbered from 0.
        _index: Index type (keyword-only).
        _columns: Mapping of label to element type (keyword-only); an empty
            mapping is used when omitted.
    """

    index: Type = None
    columns: Dict[str, Type] = field(default_factory=dict)

    def __init__(self, data=None, index=None, columns=None, *, _index=None, _columns=None):
        self.index = _index
        # Never leave ``columns`` as None: every method treats it as a dict
        # (``describe`` returns a bare ``DataFrame()``).
        self.columns = _columns if _columns is not None else {}
        if type(data) is ListLike and type(data.typ) is ListLike:
            if index is None:
                self.index = IntLike(None)
            if columns is None:
                self.columns = dict(enumerate(data.typ.val))
            else:
                self.columns = {label.val: typ
                                for label, typ in zip(columns.val, data.typ.val)}

    def __getattr__(self, attr):
        """Expose a column as an attribute (``df.name``).

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: For dunder names and for the instance's own
                fields, so ``hasattr``/``copy``/``pickle`` probes behave and
                an uninitialised instance cannot recurse forever.
            CheckerNotImplementedError: For any other unknown name.
        """
        if attr in ('index', 'columns') or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        if attr in self.columns:
            return Series(_index=self.index, _value=self.columns[attr])
        # ``attr`` is a plain string and carries no syntax node.
        raise CheckerNotImplementedError(obj=attr)

    @staticmethod
    def _ast_of(node):
        """Return the syntax node attached to ``node``, or None if there is none."""
        try:
            return node.ast
        except (AttributeError, CheckerError):
            return None

    def _require_columns(self, labels, ast=None):
        """Raise CheckerIndexError listing every label that is not a column."""
        missing = [label for label in labels if label not in self.columns]
        if missing:
            raise CheckerIndexError(index=missing, ast=ast)

    def __getitem__(self, idx):
        """Select columns.

        Args:
            idx: A ``StrLike`` label, a ``ListLike`` of labels, or a ``slice``.

        Returns:
            A ``Series`` for one label, a new ``DataFrame`` for a list of
            labels, and this frame itself for a ``slice``.

        Raises:
            CheckerIndexError: When a requested label is not a column.
            CheckerNotImplementedError: For any other kind of key.
        """
        # Keys do not always carry a syntax node; read it defensively.
        ast = self._ast_of(idx)
        if type(idx) is StrLike:
            if idx.val not in self.columns:
                raise CheckerIndexError(index=idx.val, df=self, ast=ast)
            return Series(_index=self.index, _value=self.columns[idx.val])
        if type(idx) is ListLike:
            labels = [label.val for label in idx.val]
            self._require_columns(labels, ast=ast)
            return DataFrame(_index=self.index,
                             _columns={label: self.columns[label] for label in labels})
        if type(idx) is slice:
            return self
        raise CheckerNotImplementedError(ast, idx)

    def __setitem__(self, idx, value):
        """Add or replace one column in place; see ``assign`` for accepted values."""
        # Keyword names must be plain strings, so unwrap a ``StrLike`` key.
        label = idx.val if type(idx) is StrLike else idx
        new = self.assign(**{label: value})
        self.columns = new.columns
        self.index = new.index

    @property
    def loc(self):
        """A ``LocIndexerFrame`` bound to this frame."""
        return LocIndexerFrame(self)

    def assign(self, **kwargs: Type):
        """Return a new frame with the given columns added or replaced.

        Each keyword is a column label.  A ``Series`` contributes its value
        type, a ``Func`` the value type of the ``Series`` it returns, and a
        ``ListLike`` its ``typ``.  Values of any other kind are ignored.

        Raises:
            CheckerError: When a ``Func`` does not return a ``Series``.
        """
        new_cols = {}
        for k, v in kwargs.items():
            if isinstance(v, Series):
                new_cols[k] = v.value
            elif type(v) is Func:
                ret_type = v.ret
                if not isinstance(ret_type, Series):
                    raise CheckerError('Callable not returning a Series')
                new_cols[k] = ret_type.value
            elif type(v) is ListLike:
                # ``ListLike`` has no ``value`` field; ``typ`` is what
                # ``Series`` also takes from a list.
                new_cols[k] = v.typ
        return DataFrame(_index=self.index, _columns={**self.columns, **new_cols})

    def count(self, axis=None):
        """Return the type of ``count`` along ``axis``.

        Args:
            axis: ``None``, ``0``/``'index'`` or ``1``/``'columns'``, given
                either directly or wrapped in ``IntLike``/``StrLike``.

        Returns:
            A ``Series`` of ``IntLike`` values.  For axis 0 the index type is
            ``StrLike``; for axis 1 it is this frame's index type.

        Raises:
            CheckerParamError: For any other axis.
        """
        axis_value = axis.val if type(axis) in (IntLike, StrLike) else axis
        # Use the checker's own types so the result composes with
        # ``Series.binop`` and ``Func`` like every other series.
        if axis_value in (None, 0, 'index'):
            return Series(_index=StrLike(None), _value=IntLike(None))
        if axis_value in (1, 'columns'):
            return Series(_index=self.index, _value=IntLike(None))
        raise CheckerParamError(axis_value, [0, 1, 'index', 'columns'])

    def describe(self):
        """Return an empty ``DataFrame``."""
        return DataFrame()

    def merge(self, other, on=None, how=None):
        """Return the type of merging this frame with ``other``.

        Args:
            other: The right-hand ``DataFrame``.
            on: ``StrLike`` label or ``ListLike`` of labels to join on.  When
                omitted, the labels common to both frames are used.
            how: Optional join kind; one of ``'inner'``, ``'left'``,
                ``'right'``, ``'outer'``.

        Returns:
            A ``DataFrame`` with this frame's index.  Columns present in both
            frames but not joined on are suffixed ``_x`` (left) and ``_y``
            (right); join columns keep the left frame's type.

        Raises:
            CheckerParamError: When ``how`` is not an accepted value.
            CheckerIndexError: When a join label is missing from a frame.
            CheckerError: When ``other`` is not a ``DataFrame``, no common
                column exists for an omitted ``on``, or a join column's types
                are incompatible.
        """
        possible_how = ['inner', 'left', 'right', 'outer']
        how_value = how.val if type(how) is StrLike else how
        if how_value and how_value not in possible_how:
            raise CheckerParamError(how_value, possible_how)
        if not isinstance(other, DataFrame):
            raise CheckerError('other is not a DataFrame')

        if on is None:
            # Default: join on the columns the two frames share.
            on_labels = [lbl for lbl in self.columns if lbl in other.columns]
            if not on_labels:
                raise CheckerError('no common columns to merge on')
        else:
            if type(on) is not ListLike:
                on = ListLike([on], on)
            on_labels = [lbl.val for lbl in on.val]
        ensure_labels(self, on_labels)
        ensure_labels(other, on_labels)

        for lbl in on_labels:
            verdict = self.columns[lbl].subtype(other.columns[lbl])
            # Types without a ``subtype`` rule answer NotImplemented; accept
            # those as before, but without truth-testing NotImplemented.
            if verdict is not NotImplemented and not verdict:
                raise CheckerError('type mismatch')

        overlapped = (set(self.columns) & set(other.columns)) - set(on_labels)
        new = {}
        for lbl, typ in self.columns.items():
            new[f'{lbl}_x' if lbl in overlapped else lbl] = typ
        for lbl, typ in other.columns.items():
            new[f'{lbl}_y' if lbl in overlapped else lbl] = typ
        # Join columns take the left frame's type.
        for lbl in on_labels:
            new[lbl] = self.columns[lbl]
        return DataFrame(_index=self.index, _columns=new)

    def groupby(self, by=None):
        """Return a ``DataFrameGroupBy`` over the given key column(s).

        Args:
            by: ``StrLike`` label, or ``ListLike`` of ``StrLike`` labels whose
                ``typ`` is the ``StrLike`` class or a ``StrLike`` instance.

        Raises:
            CheckerError: ``'key not found'`` when a label is not a column,
                ``'Not type checked'`` for any other ``by`` or an empty key.
        """
        if type(by) is StrLike:
            key = [by.val]
        elif type(by) is ListLike and (by.typ is StrLike or type(by.typ) is StrLike):
            key = [lbl.val for lbl in by.val]
        else:
            raise CheckerError('Not type checked')
        if not all(lbl in self.columns for lbl in key):
            raise CheckerError('key not found')
        if key:
            return DataFrameGroupBy(key, self)
        raise CheckerError('Not type checked')

    def drop_duplicates(self, subset=None, keep=None):
        """Check the ``subset`` labels and return this frame unchanged.

        Args:
            subset: Optional ``StrLike`` label or ``ListLike`` of labels.
            keep: Accepted but not used.

        Raises:
            CheckerIndexError: When a label in ``subset`` is not a column.
        """
        if subset is None:
            # Nothing to validate when no subset is given.
            return self
        if type(subset) is not ListLike:
            subset = ListLike([subset], subset)
        self._require_columns([label.val for label in subset.val])
        return self

    def sort_values(self, by, axis=None, ascending=None, inplace=None, kind=None, na_position=None, ignore_index=None):
        """Check the ``by`` labels and return the type of the sorted frame.

        Args:
            by: ``StrLike`` label or ``ListLike`` of labels.
            inplace: When truthy, ``None`` is returned instead of the frame.
            axis, ascending, kind, na_position, ignore_index: Accepted but
                not used.

        Raises:
            CheckerIndexError: When a label in ``by`` is not a column.
        """
        if type(by) is not ListLike:
            by = ListLike([by], by)
        self._require_columns([label.val for label in by.val])
        if inplace:
            return None
        return self

    def pivot(self, index, columns, values):
        """Return the type of pivoting this frame.

        Args:
            index: ``StrLike`` label of the column providing the new index.
            columns: ``StrLike`` label of a ``LiteralType`` column whose
                members become the new column labels.
            values: ``StrLike`` label of the column providing the values.

        Raises:
            CheckerIndexError: When one of the labels is not a column.
            CheckerNotImplementedError: When the ``columns`` column is not a
                ``LiteralType``.
        """
        # TODO multiple column/values
        # Report unknown labels as a checker error rather than a KeyError.
        self._require_columns([index.val, columns.val, values.val])
        idx = self.columns[index.val]
        col = self.columns[columns.val]
        value = self.columns[values.val]
        if type(col) is not LiteralType:
            # try ask a LiteralType
            raise CheckerNotImplementedError()
        new_col = {member.val: value for member in col.kinds}
        return DataFrame(_index=idx, _columns=new_col)

    def hint_cast(self, **kwargs):
        """Return a copy of this frame with the given column types overridden."""
        return DataFrame(_index=self.index, _columns={**self.columns, **kwargs})
@dataclass
class DataFrameGroupBy:
    """Result of ``DataFrame.groupby``: the key labels plus the source frame."""

    key: List[str] = None  # labels of the grouping columns
    df: DataFrame = None  # frame being grouped

    def agg(self, func, axis=0):
        """Return the type of aggregating every non-key column with ``func``.

        Args:
            func: Callable applied to each non-key column's type.
            axis: Accepted but not used.

        Returns:
            A ``DataFrame`` whose index is the tuple of key-column types and
            whose columns are the non-key columns mapped through ``func``.

        Raises:
            CheckerError: When ``func`` is not callable.
        """
        # ``Func`` instances are callable, so one test covers both cases.
        if not callable(func):
            raise CheckerError('not a function')
        columns = self.df.columns
        return DataFrame(
            _index=tuple(columns[k] for k in self.key),
            _columns={k: func(v) for k, v in columns.items() if k not in self.key},
        )