1_data_ingest.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Get a Spark session, submitted to the cluster via YARN
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .master("yarn")\
    .getOrCreate()
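
# Note: .master("yarn") assumes a cluster-backed session. For a quick local
# run (an assumption, not how this project is deployed) the sketch would be:
# spark = SparkSession.builder.appName("PythonSQL").master("local[*]").getOrCreate()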
# Explicit schema for the telco churn CSV (all fields nullable)
schema = StructType(
    [
        StructField("customerID", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("SeniorCitizen", StringType(), True),
        StructField("Partner", StringType(), True),
        StructField("Dependents", StringType(), True),
        StructField("tenure", DoubleType(), True),
        StructField("PhoneService", StringType(), True),
        StructField("MultipleLines", StringType(), True),
        StructField("InternetService", StringType(), True),
        StructField("OnlineSecurity", StringType(), True),
        StructField("OnlineBackup", StringType(), True),
        StructField("DeviceProtection", StringType(), True),
        StructField("TechSupport", StringType(), True),
        StructField("StreamingTV", StringType(), True),
        StructField("StreamingMovies", StringType(), True),
        StructField("Contract", StringType(), True),
        StructField("PaperlessBilling", StringType(), True),
        StructField("PaymentMethod", StringType(), True),
        StructField("MonthlyCharges", DoubleType(), True),
        StructField("TotalCharges", DoubleType(), True),
        StructField("Churn", StringType(), True)
    ]
)
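
# A terser alternative for exploration (a sketch; the explicit schema above is
# safer, since inferSchema costs an extra pass and can mistype columns):
# telco_data = spark.read.csv("/tmp/WA_Fn-UseC_-Telco-Customer-Churn.csv",
#                             header=True, inferSchema=True, nullValue='NA')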
# Upload the raw CSV to HDFS. The "!" prefix is IPython shell magic, so this
# line only runs inside an interactive (e.g. CML/CDSW or Jupyter) session.
!hdfs dfs -copyFromLocal raw/WA_Fn-UseC_-Telco-Customer-Churn.csv /tmp/WA_Fn-UseC_-Telco-Customer-Churn.csv
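
# Outside an interactive session, an equivalent sketch with the standard
# library (assuming the hdfs CLI is on the PATH) would be:
# import subprocess
# subprocess.run(
#     ["hdfs", "dfs", "-copyFromLocal",
#      "raw/WA_Fn-UseC_-Telco-Customer-Churn.csv",
#      "/tmp/WA_Fn-UseC_-Telco-Customer-Churn.csv"],
#     check=True,
# )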
# Read the CSV back from HDFS, applying the schema defined above
telco_data = spark.read.csv(
    "/tmp/WA_Fn-UseC_-Telco-Customer-Churn.csv",
    header=True,
    schema=schema,
    sep=',',
    nullValue='NA'
)
telco_data.show()
telco_data.printSchema()
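
# Optional extra sanity check (not in the original flow; the IBM sample file
# has roughly 7,000 customer rows):
# print(telco_data.count())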
# Save a CSV copy. coalesce(1) funnels every row through a single partition
# to produce one output file, so this step can fail with less than ~8 GB of RAM.
telco_data.coalesce(1).write.csv(
    path="/tmp/telco-data/",
    mode='overwrite',
    header=True
)
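
# If memory is tight, dropping coalesce(1) writes one part file per partition
# instead of a single CSV (a sketch, not part of the original flow):
# telco_data.write.csv(path="/tmp/telco-data/", mode='overwrite', header=True)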
# Persist the data in HDFS as a Parquet-backed table registered in the metastore
telco_data\
    .write.format("parquet")\
    .mode("overwrite")\
    .saveAsTable('default.telco_churn', path="/tmp/spark-warehouse")
# Check tables
spark.sql("show databases").show()
spark.sql("show tables in default").show()
# Test table
spark.sql("select * from default.telco_churn").show()