Hands-On Deep Learning with Apache Spark

Data ingestion from a relational database

Suppose the data is stored in a table called sparkexample in a MySQL (https://dev.mysql.com/) schema with the name sparkdb. This is the structure of that table:

mysql> DESCRIBE sparkexample;
+-----------------------+-------------+------+-----+---------+-------+
| Field                 | Type        | Null | Key | Default | Extra |
+-----------------------+-------------+------+-----+---------+-------+
| DateTimeString        | varchar(23) | YES  |     | NULL    |       |
| CustomerID            | varchar(10) | YES  |     | NULL    |       |
| MerchantID            | varchar(10) | YES  |     | NULL    |       |
| NumItemsInTransaction | int(11)     | YES  |     | NULL    |       |
| MerchantCountryCode   | varchar(3)  | YES  |     | NULL    |       |
| TransactionAmountUSD  | float       | YES  |     | NULL    |       |
| FraudLabel            | varchar(5)  | YES  |     | NULL    |       |
+-----------------------+-------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

It contains the same data used for the example in the Training data ingestion through Spark section:

mysql> select * from sparkexample;
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| DateTimeString          | CustomerID | MerchantID | NumItemsInTransaction | MerchantCountryCode | TransactionAmountUSD | FraudLabel |
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| 2016-01-01 17:00:00.000 | 830a7u3    | u323fy8902 |                     1 | USA                 |                  100 | Legit      |
| 2016-01-01 18:03:01.256 | 830a7u3    | 9732498oeu |                     3 | FR                  |                 73.2 | Legit      |
| ...                     |            |            |                       |                     |                      |            |

The dependencies to add to the Scala Spark project are the following:

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The specific JDBC driver for the MySQL database release used
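As a rough sketch, the three dependencies above could be declared as follows, assuming sbt is the build tool (the text does not say which one is used). The Scala version and the MySQL Connector/J version shown here are assumptions; choose the driver release that matches your MySQL server.

```scala
// build.sbt (sketch; sbt, the Scala version, and the driver version are assumptions)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql"  % "2.2.1",
  // Assumed MySQL Connector/J release; replace it with the one you need.
  "mysql" % "mysql-connector-java" % "5.1.46"
)
```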

Let's now implement the Spark application in Scala. Spark SQL includes a data source that reads data from external databases over JDBC, so the properties needed to connect are the same as for a traditional JDBC connection; for example:

// Credentials (hard-coded here for brevity only)
val jdbcUsername = "root"
val jdbcPassword = "secretpw"

// Connection coordinates
val jdbcHostname = "mysqlhost"
val jdbcPort = 3306
val jdbcDatabase = "sparkdb"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
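Hard-coded credentials are fine for a quick test, but in practice you would probably read them from the environment. The following is a minimal sketch of that idea in plain Scala. The JdbcSettings helper and the MYSQL_* variable names are hypothetical (they are not part of Spark or of the MySQL driver), and the defaults are the values used in this section:

```scala
// Hypothetical helper for this sketch; not part of Spark or the MySQL driver.
final case class JdbcSettings(url: String, user: String, password: String)

object JdbcSettings {
  // Builds the settings from a map of environment variables.
  // The variable names (MYSQL_HOST, ...) are assumptions made for this sketch.
  def fromEnv(env: Map[String, String]): JdbcSettings = {
    val host = env.getOrElse("MYSQL_HOST", "mysqlhost")
    val port = env.get("MYSQL_PORT").map(_.toInt).getOrElse(3306)
    val database = env.getOrElse("MYSQL_DATABASE", "sparkdb")
    JdbcSettings(
      url = s"jdbc:mysql://$host:$port/$database",
      user = env.getOrElse("MYSQL_USER", "root"),
      // No default for the password: fail fast if it is missing.
      password = env.getOrElse("MYSQL_PASSWORD", sys.error("MYSQL_PASSWORD is not set"))
    )
  }
}
```

Calling JdbcSettings.fromEnv(sys.env) then gives you the URL, user, and password to pass to the JDBC data source options.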

We need to check that the JDBC driver for the MySQL database is available, as follows:

// Driver class of MySQL Connector/J 5.x; Connector/J 8.x renamed it to com.mysql.cj.jdbc.Driver
Class.forName("com.mysql.jdbc.Driver")
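If the driver is missing, the call above throws a ClassNotFoundException. A minimal sketch of how to turn that into a readable message follows; the DriverCheck object is a hypothetical helper introduced only for illustration:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper for this sketch; not part of Spark or JDBC.
object DriverCheck {
  // Returns Right(className) when the class can be loaded,
  // Left(message) when it cannot.
  def check(className: String): Either[String, String] =
    Try(Class.forName(className)) match {
      case Success(_) =>
        Right(className)
      case Failure(_: ClassNotFoundException) =>
        Left(s"JDBC driver $className not found on the classpath")
      case Failure(other) =>
        Left(s"Could not load $className: ${other.getMessage}")
    }
}
```

For example, DriverCheck.check("com.mysql.jdbc.Driver") yields a Left with an explanatory message when the driver JAR has not been added to the project.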

We can now create a SparkSession, as follows:

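import org.apache.spark.sql.SparkSession

// Run locally, using as many worker threads as there are cores
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Spark MySQL basic example")
  .getOrCreate()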

Import the implicit conversions, as follows:

import spark.implicits._

You can now connect to the database and load the data from the sparkexample table into a DataFrame, as follows:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", s"${jdbcDatabase}.sparkexample")
  .option("user", jdbcUsername)
  .option("password", jdbcPassword)
  .load()

Spark automatically reads the schema from a database table and maps its types back to Spark SQL types. Execute the following method on the DataFrame:

jdbcDF.printSchema()

It prints the schema of the sparkexample table, with the column types mapped to Spark SQL types:

root
 |-- DateTimeString: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- MerchantID: string (nullable = true)
 |-- NumItemsInTransaction: integer (nullable = true)
 |-- MerchantCountryCode: string (nullable = true)
 |-- TransactionAmountUSD: double (nullable = true)
 |-- FraudLabel: string (nullable = true)

Once the data is loaded into the DataFrame, you can query it through the DataFrame DSL, as shown in the following example, which computes the average transaction amount per merchant country:

jdbcDF
  .select("MerchantCountryCode", "TransactionAmountUSD")
  .groupBy("MerchantCountryCode")
  .avg("TransactionAmountUSD")
  .show() // print the aggregated rows

It is possible to increase the parallelism of the reads through the JDBC interface by providing split boundaries based on the values of a numeric column of the table. Four options (partitionColumn, lowerBound, upperBound, and numPartitions) control the parallelism on read. They are optional, but if any of them is provided, all of them must be; for example:

val partitionedDF = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", s"${jdbcDatabase}.employees")
  .option("user", jdbcUsername)
  .option("password", jdbcPassword)
  // Numeric column whose values are used to split the read
  .option("partitionColumn", "employeeID")
  .option("lowerBound", 1L)
  .option("upperBound", 100000L)
  .option("numPartitions", 100)
  .load()
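To get a feel for what these options do, the following plain Scala sketch approximates how a range can be split into one WHERE clause per partition using a fixed stride. It is a simplified illustration written for this section, not Spark's actual implementation, and the PartitionSketch object and its predicates method are hypothetical names. It also shows that lowerBound and upperBound only determine the stride and do not filter rows: values outside the bounds end up in the first or last partition.

```scala
// Simplified illustration of stride-based splitting; not Spark's actual code.
object PartitionSketch {
  // Returns one WHERE-clause predicate per partition.
  def predicates(column: String,
                 lowerBound: Long,
                 upperBound: Long,
                 numPartitions: Int): Seq[String] = {
    // With a single partition there is nothing to split.
    require(numPartitions >= 2, "this sketch needs at least two partitions")
    require(lowerBound < upperBound, "lowerBound must be smaller than upperBound")

    // Width of the value range assigned to each partition.
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      val end = start + stride
      if (i == 0) s"$column < $end OR $column IS NULL" // also catches values below lowerBound
      else if (i == numPartitions - 1) s"$column >= $start" // also catches values above upperBound
      else s"$column >= $start AND $column < $end"
    }
  }
}
```

For example, PartitionSketch.predicates("employeeID", 1L, 100000L, 4) produces the following four predicates:

employeeID < 25001 OR employeeID IS NULL
employeeID >= 25001 AND employeeID < 50001
employeeID >= 50001 AND employeeID < 75001
employeeID >= 75001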

While the examples in this section refer to a MySQL database, they apply the same way to any commercial or open source RDBMS for which a JDBC driver is available.