csv-to-parquet.py
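"""Read a CSV file with PySpark and rewrite it as a single Parquet file.

Source and target path templates come from the project's variables module.
"""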
import logging
import sys
import ast

from pyspark.sql import SparkSession

from variables import PATH_SOURCE, PATH_TARGET


class CSVtoPARQUET:
    def __init__(self, spark, path_source: str, format_source: str,
                 path_target: str, format_target: str) -> None:
        self.spark = spark
        if format_source != 'csv':
            raise ValueError(f"The format_source {format_source} is not supported. Use CSV.")
        if format_target != 'parquet':
            raise ValueError(f"The format_target {format_target} is not supported. Use PARQUET.")
        self.format_source = format_source
        self.format_target = format_target
        self.path_source = path_source
        self.path_target = path_target

    def run(self) -> str:
        self.create_logger()
        self.csv_to_parquet()
        return "Application completed. Exiting..."

    def create_logger(self) -> None:
        logging.basicConfig(format='%(name)s - %(asctime)s %(message)s',
                            datefmt='%m/%d/%Y %I:%M:%S %p', stream=sys.stdout)
        # Keep the logger on the instance so later steps can actually log with it.
        self.logger = logging.getLogger('ETL_AWS_VINICIUS_CAMPOS')
        self.logger.setLevel(logging.DEBUG)

    def csv_to_parquet(self) -> None:
        self.logger.info("Reading %s from %s", self.format_source, self.path_source)
        df = (
            self.spark.read.format(self.format_source)
            .option("sep", ",")
            .option("header", True)
            .option("encoding", "utf-8")
            .load(self.path_source)
        )
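        # coalesce(1) forces a single output file, which is convenient for
        # small extracts but serializes the write; drop it for large datasets.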
        df.coalesce(1).write.mode("overwrite").format(self.format_target).save(self.path_target)
        self.logger.info("Parquet written to %s", self.path_target)


if __name__ == "__main__":
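    # Session tuning (v2 output committer, AQE, Kryo, dynamic allocation) assumes
    # a YARN/EMR-style cluster; the executor counts below are cluster-specific knobs.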
    spark = (
        SparkSession.builder.appName('ETL_AWS_VINICIUS_CAMPOS')
        .enableHiveSupport()
        .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
        .config('spark.speculation', 'false')
        .config('spark.sql.adaptive.enabled', 'true')
        .config('spark.shuffle.service.enabled', 'true')
        .config('spark.dynamicAllocation.enabled', 'true')
        .config('spark.sql.adaptive.coalescePartitions.enabled', 'true')
        .config('spark.sql.adaptive.coalescePartitions.minPartitionNum', '1')
        .config('spark.sql.adaptive.coalescePartitions.initialPartitionNum', '10')
        .config('spark.sql.adaptive.advisoryPartitionSizeInBytes', '134217728')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.dynamicAllocation.minExecutors', '5')
        .config('spark.dynamicAllocation.maxExecutors', '30')
        .config('spark.dynamicAllocation.initialExecutors', '10')
        .config('spark.sql.debug.maxToStringFields', '300')
        .config('spark.sql.join.preferSortMergeJoin', 'true')
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")

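    # The job expects a Python dict literal as its single argument; a sketch of
    # the invocation (file name here is hypothetical):
    #   spark-submit csv-to-parquet.py "{'file': 'sales_2023', 'format_source': 'csv', 'format_target': 'parquet'}"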
    script_input = ast.literal_eval(sys.argv[1])
    file = script_input['file']
    format_source = script_input['format_source']
    format_target = script_input['format_target']
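    # PATH_SOURCE / PATH_TARGET are '{file}' format templates defined in
    # variables.py (e.g. 's3://bucket/raw/{file}.csv' — the concrete values live there).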
    m = CSVtoPARQUET(
        spark,
        PATH_SOURCE.format(file=file),
        format_source,
        PATH_TARGET.format(file=file),
        format_target,
    )
    print(m.run())
    spark.stop()