-
Notifications
You must be signed in to change notification settings - Fork 0
/
sdc_jython_origin_file_system_operations
102 lines (92 loc) · 2.49 KB
/
sdc_jython_origin_file_system_operations
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
[
{
"key": "glob_path",
"value": "/media/psf/Downloads/*/data/*/fake_*.*.gz"
},
{
"key": "trigger",
"value": "-1"
},
{
"key": "suppress",
"value": "off"
}
]
#------------------------------------------------#
#------------------------------------------------#
#------------------------------------------------#
try:
sdc.importLock()
import os
import sys
import datetime
import time
import glob
finally:
sdc.importUnlock()
targetPattern = "${glob_path}"
# select the file ctime as sort key.
def sortOn(e):
return e[1]
data = sorted(
[
(f[0], f[1], f[2], f[4])
for f in [
(
f,
os.stat(f).st_ctime,
os.path.basename(f),
os.stat(f).st_mtime,
os.stat(f).st_size,
)
for f in glob.glob(targetPattern)
]
],
key=sortOn,
)
entityName = ""
if sdc.lastOffsets.containsKey(entityName):
offset = int(sdc.lastOffsets.get(entityName))
else:
offset = 0
if sdc.userParams.containsKey("recordPrefix"):
prefix = sdc.userParams.get("recordPrefix")
else:
prefix = ""
cur_batch = sdc.createBatch()
for d in data:
record = sdc.createRecord("record created " + str(datetime.datetime.now()))
try:
offset = offset + 1
fts = datetime.datetime.today() - datetime.datetime.fromtimestamp(d[1])
delta = int(fts.total_seconds() / 60.0 / 60.0)
removed = False
if int("${trigger}") < delta:
if "${suppress}".upper() == "ON":
os.remove(d[0])
removed = True
value = {
"file_path": d[0],
"file_base_name": d[2],
"file_ts": datetime.datetime.fromtimestamp(d[1]),
"file_delta_ts": delta,
"removed": removed,
"file_size": d[3],
"file_count": len(data),
}
record.value = value
cur_batch.add(record)
if cur_batch.size() >= sdc.batchSize:
cur_batch.process(entityName, str(offset))
cur_batch = sdc.createBatch()
if sdc.isStopped():
break
except Exception as e:
cur_batch.addError(record, str(e))
cur_batch.process(entityName, str(offset))
break
evt = sdc.createEvent("no-more-data", 0)
evt.value = {"file_total_count": len(data)}
cur_batch.addEvent(evt)
if cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0:
cur_batch.process(entityName, str(offset))