Joins info
Meta AI
1. Inner Join
Returns only the rows that have a match in both DataFrames.
df1.join(df2, on='column_name', how='inner')
2. Outer Join (Full Outer Join)
Returns all rows from both DataFrames. If there's no match, the result will contain null values.
df1.join(df2, on='column_name', how='outer')
df1.join(df2, on='column_name', how='full')
df1.join(df2, on='column_name', how='fullouter')
df1.join(df2, on='column_name', how='full_outer')
3. Left Outer Join (Left Join)
Returns all rows from the left DataFrame and matching rows from the right DataFrame. If there's no match, the result will contain null values.
df1.join(df2, on='column_name', how='left')
df1.join(df2, on='column_name', how='leftouter')
df1.join(df2, on='column_name', how='left_outer')
4. Right Outer Join (Right Join)
Returns all rows from the right DataFrame and matching rows from the left DataFrame. If there's no match, the result will contain null values.
df1.join(df2, on='column_name', how='right')
df1.join(df2, on='column_name', how='rightouter')
df1.join(df2, on='column_name', how='right_outer')
5. Semi Join (Left Semi Join)
Returns only the rows from the left DataFrame that have a match in the right DataFrame.
df1.join(df2, on='column_name', how='semi')
df1.join(df2, on='column_name', how='leftsemi')
df1.join(df2, on='column_name', how='left_semi')
6. Anti Join (Left Anti Join)
Returns only the rows from the left DataFrame that do not have a match in the right DataFrame.
df1.join(df2, on='column_name', how='anti')
df1.join(df2, on='column_name', how='leftanti')
df1.join(df2, on='column_name', how='left_anti')
7. Cross Join
Returns the Cartesian product of both DataFrames.
df1.crossJoin(df2)
Parameters:
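- on: The column(s) to join on. Can be a string, a list of strings, a Column expression, or a list of Column expressions.
- how: The type of join (inner, outer, left, right, semi, anti, cross). Defaults to inner.
Related Options (not parameters of join()):
- hint: join() takes no hint argument. A join strategy hint is attached to a DataFrame with df.hint(...), for example df2.hint("broadcast"), before joining.
- mergeSchema: A data source read option (for example when reading Parquet), not a join option. The join itself combines the columns of both DataFrames.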
Example Use Cases:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("Join Examples").getOrCreate()
# Create sample DataFrames
df1 = spark.createDataFrame([(1, "John"), (2, "Mary")], ["id", "name"])
df2 = spark.createDataFrame([(1, "USA"), (2, "Canada")], ["id", "country"])
# Inner Join
df_inner = df1.join(df2, on="id", how="inner")
df_inner.show()
# Left Outer Join
df_left = df1.join(df2, on="id", how="leftouter")
df_left.show()
# Semi Join
df_semi = df1.join(df2, on="id", how="semi")
df_semi.show()
This code demonstrates various join operations using PySpark.
Chat GPT
In PySpark, joins are operations used to combine two DataFrames based on a related column between them. PySpark supports several types of joins, similar to SQL, which allow for flexible ways to combine and process data. Below is a detailed breakdown of the different types of joins, their syntax, and examples:
1. Inner Join
An inner join returns only the rows that have matching values in both DataFrames.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "inner")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "inner")
Explanation: This will only return rows where the id column matches in both DataFrames. Rows that don't match are excluded.
2. Left Join (Left Outer Join)
A left join returns all rows from the left DataFrame and the matching rows from the right DataFrame. If no match is found, NULLs are returned for the columns from the right DataFrame.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "left")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "left")
Explanation: All rows from df1 will appear in the result, and df2 columns will have NULL values where there is no match.
3. Right Join (Right Outer Join)
A right join returns all rows from the right DataFrame and the matching rows from the left DataFrame. If no match is found, NULLs are returned for the columns from the left DataFrame.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "right")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "right")
Explanation: All rows from df2 will appear in the result, and df1 columns will have NULL values where there is no match.
4. Full Outer Join
A full outer join returns all rows from both DataFrames. Where no match is found, NULLs are returned for the non-matching side.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "outer")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "outer")
Explanation: The result will include all rows from both df1 and df2. If a row from either side has no match, it will still be included but with NULL values for the non-matching columns.
5. Semi Join
A semi join returns rows from the left DataFrame for which there is a match in the right DataFrame. Unlike inner joins, semi joins only return columns from the left DataFrame, not from the right.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "left_semi")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "left_semi")
Explanation: It only returns the rows from df1 that have a match in df2, but only with columns from df1.
6. Anti Join
An anti join returns rows from the left DataFrame for which there is no match in the right DataFrame. Like semi joins, it only returns columns from the left DataFrame.
Syntax:
df1.join(df2, df1["column_name"] == df2["column_name"], "left_anti")
Example:
result = df1.join(df2, df1["id"] == df2["id"], "left_anti")
Explanation: It only returns rows from df1 that have no matching rows in df2.
7. Cross Join (Cartesian Join)
A cross join returns the Cartesian product of both DataFrames, meaning every row from df1 is combined with every row from df2. This can result in a huge number of rows.
Syntax:
df1.crossJoin(df2)
Example:
result = df1.crossJoin(df2)
Explanation: Every row in df1 will be combined with every row in df2.
Join Conditions and Aliases
Join Condition: You can specify multiple conditions in the join, and even use comparison operators (<, >, etc.).
Example:
result = df1.join(df2, (df1["age"] > df2["age"]) & (df1["city"] == df2["city"]), "inner")
Aliases: When joining a DataFrame to itself, or when both DataFrames have columns with the same name, you can use alias() to give each side a temporary name and then refer to columns as "alias.column".
Example:
from pyspark.sql.functions import col
df1.alias("t1").join(df2.alias("t2"), col("t1.id") == col("t2.id"))
Performance Optimization in Joins
Broadcast Join: When one DataFrame is small enough to fit in memory on each executor, you can use a broadcast join. The smaller DataFrame is copied to all nodes, so the larger DataFrame does not need to be shuffled, which can make the join faster.
from pyspark.sql.functions import broadcast
result = df1.join(broadcast(df2), "id")
Bucketing: For large tables that are joined repeatedly on the same key, bucketing hashes the join key into a fixed number of buckets at write time. When both tables are bucketed on the join key with the same number of buckets, Spark can often join them without shuffling the data again.
df1.write.bucketBy(4, "id").saveAsTable("bucketed_table1")
df2.write.bucketBy(4, "id").saveAsTable("bucketed_table2")
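Partitioning: Repartitioning both DataFrames by the join key places matching keys in the same partitions. The repartition is itself a shuffle, so it mainly pays off when a repartitioned DataFrame is reused across several joins on the same key.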
df1 = df1.repartition("id")
df2 = df2.repartition("id")
result = df1.join(df2, "id")
Common Errors in Joins
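Ambiguous Columns: When both DataFrames have a column with the same name, the join itself succeeds, but referring to that column by name afterwards raises an AnalysisException because PySpark cannot tell which copy is meant. You can resolve this by using aliases or by renaming the columns before the join.
Example: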
df1 = df1.withColumnRenamed("name", "name_df1")
df2 = df2.withColumnRenamed("name", "name_df2")
Data Skew: If a few join-key values account for most of the rows, the tasks that handle those keys run much longer than the rest and the join performs poorly. Bucketing or partitioning by a non-skewed key can help, as can spreading the hot keys over several partitions.
Summary Table of Joins:
Join Type | Description | Returned Rows
Inner Join | Returns matching rows from both DataFrames. | Rows with matches in both df1 and df2.
Left Join | Returns all rows from df1, and matching rows from df2. | All rows from df1, with df2 columns NULL if no match.
Right Join | Returns all rows from df2, and matching rows from df1. | All rows from df2, with df1 columns NULL if no match.
Full Outer Join | Returns all rows from both DataFrames, with NULLs for missing matches. | All rows from df1 and df2, NULLs where no match.
Semi Join | Returns rows from df1 that have matching rows in df2. | Only rows from df1 that have a match in df2.
Anti Join | Returns rows from df1 that have no matching rows in df2. | Rows from df1 without a match in df2.
Cross Join | Returns the Cartesian product of both DataFrames. | Every combination of rows from df1 and df2.
These joins offer powerful ways to combine and manipulate data in PySpark for complex analytical tasks.
______________________________________
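When we join two tables, all columns from both sides appear in the result, so columns that exist in both tables show up twice. We can avoid the duplicates by listing every shared column in the join condition (as a list of column names), or by selecting only the columns we need after the join.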
data = [(1, "Tim", 24, "Kerala", "India"),
(2, "Asman", 26, "Kerala", "India")]
df1 = spark.createDataFrame(data, ["emp_id", "name", "age", "state", "country"])
df1.show()
data2 = [(1, "Tim", 24, "Comcity"),
(2, "Asman", 26, "bimcity")]
df2 = spark.createDataFrame(data2, ["emp_id", "name", "age", "address"])
df2.show()
findf = df1.join(df2, ["emp_id", "name", "age"], "outer")
findf.show()
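This syntax doesn't work: the join is only on emp_id, so name and age exist twice in the result and select() cannot tell which copy is meant.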
findf = df1.join(df2, "emp_id", "outer").select("emp_id", "name", "age", "state", "country", "address")
findf.show()
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`name`, `name`].