This library provides support for reading an Amazon DynamoDB table with Apache Spark.
Tables can be read directly as a DataFrame, or as an RDD of stringified JSON. Users can run ad-hoc SQL queries directly against DynamoDB tables, and easily build ETL pipelines that load DynamoDB tables into another system. This library was created by the Data Platform team at Medium.
Depend on this library in your application with the following Maven coordinates:
<dependency>
  <groupId>com.github.traviscrawford</groupId>
  <artifactId>spark-dynamodb</artifactId>
  <version>0.0.6</version>
</dependency>
Start a spark shell with this library as a dependency:
$ spark-shell --packages com.github.traviscrawford:spark-dynamodb:0.0.6
You can register a DynamoDB table and run SQL queries against it, or query with the Spark SQL DSL. The schema will be inferred by sampling items in the table, or you can provide your own schema.
import com.github.traviscrawford.spark.dynamodb._
// Read a table in the default region.
val users = sqlContext.read.dynamodb("users")
// Or read a table from another region.
val users2 = sqlContext.read.dynamodb("us-west-2", "users")
// Query with SQL.
users.registerTempTable("users")
val data = sqlContext.sql("select username from users where username = 'tc'")
// Or query with the DSL.
val data2 = users.select("username").filter($"username" === "tc")
DynamoDB tables do not have a schema beyond the primary key(s). If no schema is provided, it will be inferred from a sample of items in the table. If items with multiple schemas are stored in the table, you may choose to provide the schema yourself.
At a high level, Spark SQL schemas are a StructType that contains a sequence of typed StructFields. At Medium we generate StructTypes from the protobuf schemas that define the data structure stored in a particular DynamoDB table.
import org.apache.spark.sql.types._

// Example schema
val schema = StructType(Seq(
  StructField("userId", LongType),
  StructField("username", StringType)))
For details about Spark SQL schemas, see StructType.
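With a schema in hand, you can pass it to the reader so the table is loaded without sampling. A minimal sketch, assuming the connector honors the standard DataFrameReader.schema(...) call:

import com.github.traviscrawford.spark.dynamodb._

// Read with an explicit schema instead of inferring one by sampling.
// Assumes the connector honors DataFrameReader.schema(...).
val usersWithSchema = sqlContext.read
  .schema(schema)
  .dynamodb("users")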
Option | Description |
---|---|
rate_limit_per_segment | Max number of read capacity units per second each scan segment will consume from the DynamoDB table. Default: no rate limit. |
page_size | Scan page size. Default: 1000. |
segments | Number of segments to scan the DynamoDB table with. |
aws_credentials_provider | Class name of the AWS credentials provider to use when connecting to DynamoDB. |
endpoint | DynamoDB client endpoint in http://localhost:8000 format. This is generally not needed and is intended for unit tests. |
filter_expression | DynamoDB scan filter expression to be performed server-side. |
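Reader options are passed with the standard option(...) calls before loading the table. For example (the values below are illustrative, not recommendations):

// Parallel scan with a throughput cap per segment.
val throttledUsers = sqlContext.read
  .option("rate_limit_per_segment", "100") // at most 100 read capacity units/sec per segment
  .option("segments", "8")                 // scan with 8 parallel segments
  .option("page_size", "500")              // items per scan request
  .dynamodb("us-west-2", "users")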
The filter_expression reader option allows you to pass filters directly to DynamoDB to be performed server-side. That is, filters are applied by DynamoDB servers before anything is loaded into Spark. In essence, this shifts load from your Spark instances onto your table-dedicated DynamoDB instances. This may be beneficial if you are using a shared Spark cluster and need to load a partial dataset from a large DynamoDB table.
import com.github.traviscrawford.spark.dynamodb._
// Run server-side filter with string value (operations supported: =, >, <, >=, <=, <>)
val tcUsers = sqlContext.read
  // NOTE: No quotes (') around string value
  .option("filter_expression", "username = tc")
  .dynamodb("users")

// Strings can also use begins_with
val beginsWithTCUsers = sqlContext.read
  // NOTE: No quotes (') around string value
  .option("filter_expression", "begins_with(username, tc)")
  .dynamodb("users")

// Run server-side filter with number value (operations supported: =, >, <, >=, <=, <>)
val highLoginUsers = sqlContext.read
  .option("filter_expression", "num_logins > 100")
  .dynamodb("users")
For more information on DynamoDB scan filter expressions, see the AWS documentation.
Like most NoSQL databases, DynamoDB does not strictly enforce schemas. For this reason it may not be appropriate to load your DynamoDB tables as Spark DataFrames. This library provides support for loading DynamoDB tables as an RDD[String] containing stringified JSON.
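If the items are consistent enough after some cleanup, you can still promote such an RDD to a DataFrame later. A minimal sketch, assuming items is an RDD[String] of stringified JSON and that "username" is a required attribute in your data (the filtering predicate is illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Drop malformed items, then let Spark infer a schema from the survivors.
def toDataFrame(items: RDD[String]): DataFrame = {
  val wellFormed = items.filter(_.contains("\"username\""))
  // DataFrameReader.json(RDD[String]) is available in Spark 1.x/2.x (deprecated in 2.0+).
  sqlContext.read.json(wellFormed)
}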
Scan a table from the command-line with the following command. Spark requires the name of your job jar, though you may not have one when scanning a table - simply put a placeholder name to satisfy the launcher script.
spark-submit \
--class com.github.traviscrawford.spark.dynamodb.DynamoBackupJob \
--packages com.github.traviscrawford:spark-dynamodb:0.0.6 \
fakeJar
The following flags are supported:
usage: com.github.traviscrawford.spark.dynamodb.DynamoBackupJob$ [<flag>...]
flags:
-credentials='java.lang.String': Optional AWS credentials provider class name.
-help='false': Show this help
-output='java.lang.String': Path to write the DynamoDB table backup.
-pageSize='1000': Page size of each DynamoDB request.
-rateLimit='Int': Max number of read capacity units per second each scan segment will consume.
-region='java.lang.String': Region of the DynamoDB table to scan.
-table='java.lang.String': DynamoDB table to scan.
-totalSegments='1': Number of DynamoDB parallel scan segments.
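Putting the flags together, a backup run might look like the following (the table, region, and output path are placeholders; the flag syntax follows the usage output above):

spark-submit \
  --class com.github.traviscrawford.spark.dynamodb.DynamoBackupJob \
  --packages com.github.traviscrawford:spark-dynamodb:0.0.6 \
  fakeJar \
  -table=users \
  -region=us-west-2 \
  -totalSegments=8 \
  -output=s3://my-bucket/backups/users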
To integrate with your own code, see DynamoScanner.
This fork by jphaugla updates the library for Spark 2.4 and the AWS SDK 1.11.519.
To allow this to work on a local machine, the pom.xml was changed to build a with-dependencies jar file as well as to run an install. Use this Maven command:
mvn install -DskipTests -Dgpg.skip
A DynamoToCassandra.scala program was added to fully copy a DynamoDB table to a Cassandra table when passed the DynamoDB table name; a run script is also included.
You can test in the Spark shell with Spark 2.4.0, using the jar-with-dependencies file:
./spark-shell --jars=/Users/jasonhaugland/gits/spark-dynamodb/target/spark-dynamodb-0.0.14-SNAPSHOT-jar-with-dependencies.jar --packages datastax:spark-cassandra-connector:2.4.0-s_2.11
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.github.traviscrawford.spark.dynamodb._
// Read the DynamoDB table, inspect a few rows, and write it to Cassandra.
val jph_test = spark.read.dynamodb("JPH_test")
jph_test.show()
jph_test.write.cassandraFormat("jph_test", "testks").save()