Currency Converter in PySpark
When working with currency conversion in PySpark, especially in cases where you need to convert amounts from multiple currencies to a single target currency like USD, there are a few approaches depending on the data source and requirements. One popular option is to use a Python library such as currencyconverter or an API that provides live or historical exchange rates. However, since PySpark operates in a distributed environment, it's essential to ensure that currency conversion logic is efficiently handled across your cluster.
Key Concepts for Currency Conversion in PySpark
Python UDFs for Currency Conversion: In PySpark, we can define a User Defined Function (UDF) to apply a Python function to each row of a DataFrame. The function would handle the currency conversion for the amount field based on the currency column.
The currencyconverter Package: The currencyconverter Python package provides a convenient way to convert currencies. It works offline, using historical exchange-rate data published by the European Central Bank, so no network calls are needed at conversion time.
Distributed Processing Consideration: In a distributed environment like PySpark, the currencyconverter package or any currency API will be invoked on worker nodes. This can be inefficient if not carefully handled, especially if there are frequent HTTP requests to an external API. Batch processing of rates and using broadcast variables can help optimize performance.
Steps to Perform Currency Conversion in PySpark
Here’s a detailed guide to performing currency conversion in PySpark:
Step 1: Install the currencyconverter Package
First, install the currencyconverter package to handle conversions:
pip install currencyconverter
If you're using a cluster or distributed environment, ensure that this package is available on all worker nodes, or you can package it with your PySpark job.
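For example, one hedged option, assuming the Spark session created in Step 2 below and a zip archive of the pure-Python currencyconverter package that you have built yourself, is to add the archive at runtime (spark-submit's --py-files flag achieves the same thing at submit time):
# Sketch: make the zipped package importable on every executor.
# "/tmp/currency_converter.zip" is an assumed path to an archive you built;
# adjust it to wherever you stage the dependency.
spark.sparkContext.addPyFile("/tmp/currency_converter.zip")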
Step 2: Initialize PySpark and Sample Data
Let’s assume we have a PySpark DataFrame with amounts in different currencies, and we want to convert all amounts to USD.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("CurrencyConversionExample").getOrCreate()
# Sample DataFrame with amount and currency
data = [
    (1, "01-Jan-17", 9.15, 50, "GBP"),
    (2, "02-Jan-17", 9.25, 70, "USD"),
    (3, "03-Jan-17", 9.29, 80, "INR"),
    (4, "04-Jan-18", 9.30, 90, "GBP"),
    (5, "01-Jan-17", 9.35, 50, "INR")
]
columns = ["Userid", "Date", "label", "amount", "currency"]
df = spark.createDataFrame(data, columns)
# Show initial DataFrame
df.show()
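For the sample rows above, df.show() prints a table roughly like this:
+------+---------+-----+------+--------+
|Userid|     Date|label|amount|currency|
+------+---------+-----+------+--------+
|     1|01-Jan-17| 9.15|    50|     GBP|
|     2|02-Jan-17| 9.25|    70|     USD|
|     3|03-Jan-17| 9.29|    80|     INR|
|     4|04-Jan-18|  9.3|    90|     GBP|
|     5|01-Jan-17| 9.35|    50|     INR|
+------+---------+-----+------+--------+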
Step 3: Define UDF for Currency Conversion
The currencyconverter package provides an offline mode, which means you don’t need to worry about making API calls for every row, making it suitable for batch processing in PySpark.
Define a UDF to perform the conversion:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from currency_converter import CurrencyConverter
# Initialize the CurrencyConverter (offline mode)
c = CurrencyConverter()
# Define a UDF to convert the amount to USD
def convert_to_usd(amount, currency):
    try:
        # Convert amount from the given currency to USD
        return c.convert(amount, currency, 'USD')
    except Exception:
        return None  # Return None if conversion fails (e.g., unsupported currency)

# Register the UDF
convert_to_usd_udf = udf(convert_to_usd, DoubleType())
In this UDF, CurrencyConverter.convert() is used to convert the amount based on the currency and target currency ('USD' in this case). The function handles exceptions to ensure that errors during conversion (e.g., unsupported currencies) do not break the code.
Step 4: Apply UDF to the DataFrame
Now apply the UDF to convert all amounts to USD:
# Add a new column "amount_in_usd" by applying the UDF
df_with_usd = df.withColumn("amount_in_usd", convert_to_usd_udf(df["amount"], df["currency"]))
# Show the results
df_with_usd.select("Userid", "Date", "amount", "currency", "amount_in_usd").show()
This will add a new column amount_in_usd containing the converted amounts.
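If a currency code is not covered by the ECB data, the UDF above returns None, which Spark surfaces as null in the new column. A quick way to spot such rows (a small sketch using the df_with_usd frame from this step):
from pyspark.sql.functions import col

# Rows where the conversion failed show up with a null amount_in_usd
df_with_usd.filter(col("amount_in_usd").isNull()).show()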
Step 5: Optimizing Currency Conversion with Broadcast Variables
If you are dealing with a very large dataset, you can cut serialization overhead by broadcasting the CurrencyConverter instance. Broadcasting shares a read-only variable (here, the CurrencyConverter object) with all executors: it is shipped to each executor once and reused, instead of being serialized along with every task that references it.
Here’s how you can broadcast the CurrencyConverter instance:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from currency_converter import CurrencyConverter
# Initialize Spark session and grab its SparkContext for broadcasting
spark = SparkSession.builder.appName("CurrencyConversion").getOrCreate()
sc = spark.sparkContext
# Initialize and broadcast CurrencyConverter
c = CurrencyConverter()
broadcast_c = sc.broadcast(c)
# Define UDF using the broadcasted CurrencyConverter
def convert_to_usd_broadcast(amount, currency):
    try:
        # Look up the broadcasted converter on the executor and convert to USD
        return broadcast_c.value.convert(amount, currency, 'USD')
    except Exception:
        return None  # Return None if conversion fails
# Register UDF
convert_to_usd_udf = udf(convert_to_usd_broadcast, DoubleType())
# Apply UDF to DataFrame
df_with_usd = df.withColumn("amount_in_usd", convert_to_usd_udf(df["amount"], df["currency"]))
# Show the results
df_with_usd.select("Userid", "Date", "amount", "currency", "amount_in_usd").show()
Step 6: Historical Rates
The currencyconverter package also supports historical rates. To convert amounts using the rate from a specific date, pass a datetime.date object as the date argument of convert().
Example:
from datetime import date

# Convert using historical rates (for example, 3 January 2017, an ECB business day)
c.convert(50, 'GBP', 'USD', date=date(2017, 1, 3))
You can modify the UDF to accept a date column from the DataFrame and use it to perform historical conversions.
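A date-aware version might look like this. This is only a sketch: the helper names and the "%d-%b-%y" parsing format are assumptions based on the sample Date strings above, and fallback_on_missing_rate=True lets weekend or holiday dates (such as 01-Jan-17) fall back to the nearest published ECB rate instead of raising an error.
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from currency_converter import CurrencyConverter

# Fall back to the nearest available rate for dates with no ECB fixing
c_hist = CurrencyConverter(fallback_on_missing_rate=True)

def convert_to_usd_on_date(amount, currency, date_str):
    try:
        # The sample Date column uses strings like "01-Jan-17"
        rate_date = datetime.strptime(date_str, "%d-%b-%y").date()
        return c_hist.convert(amount, currency, 'USD', date=rate_date)
    except Exception:
        return None  # Unsupported currency or date outside the ECB data range

convert_to_usd_on_date_udf = udf(convert_to_usd_on_date, DoubleType())

df_hist = df.withColumn(
    "amount_in_usd",
    convert_to_usd_on_date_udf(df["amount"], df["currency"], df["Date"])
)
df_hist.show()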
Full Example Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from currency_converter import CurrencyConverter

# Initialize Spark session and grab its SparkContext for broadcasting
spark = SparkSession.builder.appName("CurrencyConversion").getOrCreate()
sc = spark.sparkContext

# Sample data
data = [
    (1, "01-Jan-17", 9.15, 50, "GBP"),
    (2, "02-Jan-17", 9.25, 70, "USD"),
    (3, "03-Jan-17", 9.29, 80, "INR"),
    (4, "04-Jan-18", 9.30, 90, "GBP"),
    (5, "01-Jan-17", 9.35, 50, "INR")
]
columns = ["Userid", "Date", "label", "amount", "currency"]
df = spark.createDataFrame(data, columns)

# Initialize and broadcast the CurrencyConverter
c = CurrencyConverter()
broadcast_c = sc.broadcast(c)

# Define a UDF using the broadcasted CurrencyConverter
def convert_to_usd_broadcast(amount, currency):
    try:
        return broadcast_c.value.convert(amount, currency, 'USD')
    except Exception:
        return None  # Return None if conversion fails

# Register the UDF
convert_to_usd_udf = udf(convert_to_usd_broadcast, DoubleType())

# Apply the UDF to the DataFrame
df_with_usd = df.withColumn("amount_in_usd", convert_to_usd_udf(df["amount"], df["currency"]))

# Show the results
df_with_usd.select("Userid", "Date", "amount", "currency", "amount_in_usd").show()
Conclusion
UDF and Broadcast: By using a UDF and broadcasting the CurrencyConverter, we can efficiently convert currency amounts in a distributed PySpark environment.
Offline vs. Online: The currencyconverter package works offline by default, which avoids the need for API calls for each row. If you need real-time data, you might need to use an external API and manage API rate limits.
Optimization: For large datasets, avoid per-row calls to an external service; either broadcast the offline converter as shown above, or fetch the exchange rates once on the driver and broadcast them as a lookup table (a minimal sketch follows below).
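A minimal sketch of that fetch-once-and-broadcast pattern, assuming the spark session and df from the examples above; the numeric rates here are made-up placeholders standing in for whatever your rate provider returns:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Placeholder rates-to-USD table; in practice, fetch this once on the driver
# from your rate provider (one request in total, not one per row)
rates_to_usd = {"USD": 1.0, "GBP": 1.27, "INR": 0.012}
broadcast_rates = spark.sparkContext.broadcast(rates_to_usd)

def convert_with_rates(amount, currency):
    # Look up the broadcasted rate on the executor; unknown currencies yield None
    rate = broadcast_rates.value.get(currency)
    return float(amount) * rate if rate is not None else None

convert_with_rates_udf = udf(convert_with_rates, DoubleType())

df_live = df.withColumn("amount_in_usd", convert_with_rates_udf(df["amount"], df["currency"]))
df_live.show()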