-
Notifications
You must be signed in to change notification settings - Fork 0
/
complex data processing - example
80 lines (65 loc) · 2.65 KB
/
complex data processing - example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json

# Sample input: four company documents written back to back in one string.
# The later documents omit fields the earlier ones have (no "shipped" on
# Company2's order, no "satellites" on Company2, no "goods" or "satellites"
# on Company3).
jsnStr = """{
"name":"MSFT","location":"Redmond", "satellites": ["Bay Area", "Shanghai"],
"goods": {
"trade":true, "customers":["government", "distributer", "retail"],
"orders":[
{"orderId":1,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":2}]}},
{"orderId":2,"orderTotal":323.34,"shipped":{"orderItems":[{"itemName":"Mice","itemQty":2},{"itemName":"Keyboard","itemQty":1}]}}
]}}
{"name":"Company1","location":"Seattle", "satellites": ["New York"],
"goods":{"trade":false, "customers":["store1", "store2"],
"orders":[
{"orderId":4,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":3}]}},
{"orderId":5,"orderTotal":343.24,"shipped":{"orderItems":[{"itemName":"Chair","itemQty":4},{"itemName":"Lamp","itemQty":2}]}}
]}}
{"name": "Company2", "location": "Bellevue",
"goods": {"trade": true, "customers":["Bank"], "orders": [{"orderId": 4, "orderTotal": 123.34}]}}
{"name": "Company3", "location": "Kirkland"}"""


def _split_json_documents(text):
    """Return each top-level JSON document found in *text* as its own string.

    The documents may be separated by any amount of whitespace (including
    none). Each returned string is the exact slice of *text* that holds one
    document, so the original formatting is preserved.

    Raises:
        json.JSONDecodeError: if *text* contains malformed JSON.
    """
    decoder = json.JSONDecoder()
    documents = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over it here.
        if text[pos].isspace():
            pos += 1
            continue
        _, end = decoder.raw_decode(text, pos)
        documents.append(text[pos:end])
        pos = end
    return documents


# spark.read.json parses one JSON record per RDD element. Handing it the whole
# string as a single element would load only the first document, so every
# document is passed as its own element instead.
df = spark.read.json(sc.parallelize(_split_json_documents(jsnStr)))
df.printSchema()

# Expose the DataFrame to Spark SQL under the name used by the query below.
df.createOrReplaceTempView("tbl_json")
# SQL flattening of tbl_json. Each nested subquery unnests exactly one array
# with explode(): satellites (iq), goods.customers (iq2), goods.orders (iq3)
# and orders.shipped.orderItems (iq4). The outermost select then pulls
# itemName / itemQty out of the exploded orderItems struct.
flatten_sql = """
select location, name, satellites, goods_customer, goods_orders_orderId,
goods_orders_orderTotal, orderItems.itemName goods_orders_shipped_orderItems_itemName,
orderItems.itemQty goods_orders_shipped_orderItems_itemQty from (
select location, name, satellites, goods_customer,
orders.orderId goods_orders_orderId,
orders.orderTotal goods_orders_orderTotal,
explode(orders.shipped.orderItems) orderItems
from (
select location, name, satellites, goods_customer, explode(orders) orders from (
select location, name, satellites, explode(goods.customers) goods_customer, goods.orders from (
select location, name, explode(satellites) satellites, goods from tbl_json) iq
) iq2
) iq3
)iq4
"""
flattened_via_sql = spark.sql(flatten_sql)
flattened_via_sql.show()
# DataFrame-API flattening, step 1: unnest satellites, goods.customers and
# goods.orders.
#
# Spark allows only one generator (explode) per select clause; putting all
# three in a single selectExpr raises an AnalysisException. Each array is
# therefore exploded in its own selectExpr, with "goods" carried along until
# its last nested array has been unnested. The resulting columns are
# location, name, satellites, customers, orders.
flattendf1 = (
    df.selectExpr(
        "location",
        "name",
        "explode(satellites) as satellites",
        "goods",
    )
    .selectExpr(
        "location",
        "name",
        "satellites",
        "explode(goods.customers) as customers",
        "goods",
    )
    .selectExpr(
        "location",
        "name",
        "satellites",
        "customers",
        "explode(goods.orders) as orders",
    )
)
flattendf1.printSchema()
# DataFrame-API flattening, step 2: promote the order's scalar fields to
# top-level columns and unnest orders.shipped.orderItems (the only generator
# in this select).
order_level_exprs = [
    "location",
    "name",
    "satellites",
    "customers",
    "orders.orderId as orderId",
    "orders.orderTotal as orderTotal",
    "explode(orders.shipped.orderItems) as orderItems",
]
flattendf2 = flattendf1.selectExpr(*order_level_exprs)
flattendf2.printSchema()
# DataFrame-API flattening, step 3: replace the orderItems struct with its two
# fields, itemName and itemQty, as top-level columns.
item_level_exprs = [
    "location",
    "name",
    "satellites",
    "customers",
    "orderId",
    "orderTotal",
    "orderItems.itemName as itemName",
    "orderItems.itemQty as itemQty",
]
flattendf3 = flattendf2.selectExpr(*item_level_exprs)
flattendf3.printSchema()