Hands-On Deep Learning with Apache Spark

Data ingestion through DataVec and transformation through Spark

Data can come from many sources and in many types, for example:

  • Log files
  • Text documents
  • Tabular data
  • Images
  • Videos

When working with neural networks, the end goal is to convert each data type into a collection of numerical values in a multidimensional array. Data might also need to be pre-processed before it can be used to train or test a network. An ETL process is therefore needed in most cases, and it is a sometimes underestimated challenge that data scientists have to face when doing ML or DL. This is where the DL4J DataVec library comes to the rescue: once data has been transformed through its API, it is in a format (vectors) that neural networks can understand, and the vectorized data DataVec produces is compliant with open standards.

DataVec supports all the major types of input data (text, CSV, audio, video, image) out of the box, with their specific input formats. It can be extended for specialized input formats not covered by the current release of its API. You can think of the DataVec input/output format system in the same way Hadoop MapReduce uses InputFormat implementations to determine the logical InputSplits and which RecordReader implementation to use; DataVec also provides RecordReaders to serialize data. The library includes facilities for feature engineering, data cleanup, and normalization, which work with both static data and time series. All of the available functionality can be executed on Apache Spark through the DataVec-Spark module.
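As a minimal illustration of the RecordReader concept outside Spark, here is a sketch that reads a local CSV file directly with a DataVec CSVRecordReader (the file name data.csv is just an assumption for this example):

import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.api.split.FileSplit
import java.io.File

// Open the CSV file and iterate over its records.
// Each record is returned as a java.util.List of Writable values.
val recordReader = new CSVRecordReader()
recordReader.initialize(new FileSplit(new File("data.csv")))
while (recordReader.hasNext) {
  val record = recordReader.next()
  println(record)
}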

If you want to know more about the Hadoop MapReduce classes mentioned previously (InputFormat, InputSplit, and RecordReader), you can have a look at their official online Javadocs.

Let's see a practical code example in Scala. We want to extract data from a CSV file that contains some e-shop transactions and has the following columns:

  • DateTimeString
  • CustomerID
  • MerchantID
  • NumItemsInTransaction
  • MerchantCountryCode
  • TransactionAmountUSD
  • FraudLabel

Then, we will perform some transformations on this data.

We need to import the required dependencies (Scala, Spark, DataVec, and DataVec-Spark) first. Here is a complete list for a Maven POM file (but, of course, you can use SBT or Gradle):

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.1</spark.version>
    <dl4j.version>0.9.1</dl4j.version>
    <datavec.spark.version>0.9.1_spark_2</datavec.spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.datavec</groupId>
        <artifactId>datavec-api</artifactId>
        <version>${dl4j.version}</version>
    </dependency>

    <dependency>
        <groupId>org.datavec</groupId>
        <artifactId>datavec-spark_2.11</artifactId>
        <version>${datavec.spark.version}</version>
    </dependency>
</dependencies>
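For reference, a roughly equivalent SBT setup is sketched below, assuming the same versions as in the POM (in SBT, the scala-library dependency is implied by scalaVersion):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"    % "2.2.1",
  "org.datavec"      %  "datavec-api"   % "0.9.1",
  "org.datavec"      %% "datavec-spark" % "0.9.1_spark_2"
)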

The first thing to do in the Scala application is to define the input data schema, as follows:

import org.datavec.api.transform.schema.Schema
import scala.collection.JavaConverters._

val inputDataSchema = new Schema.Builder()
  .addColumnString("DateTimeString")
  .addColumnsString("CustomerID", "MerchantID")
  .addColumnInteger("NumItemsInTransaction")
  .addColumnCategorical("MerchantCountryCode", List("USA", "CAN", "FR", "MX").asJava)
  .addColumnDouble("TransactionAmountUSD", 0.0, null, false, false) // $0.0 or more, no maximum limit, no NaN and no Infinite values
  .addColumnCategorical("FraudLabel", List("Fraud", "Legit").asJava)
  .build
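The builder only declares the columns; once built, the schema can be inspected to verify column names and types before any transformation is defined, for example:

// Print the number of columns and a summary of the whole schema
println(s"Number of columns: ${inputDataSchema.numColumns}")
println(inputDataSchema)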

If input data is numeric and appropriately formatted then a CSVRecordReader (https://deeplearning4j.org/datavecdoc/org/datavec/api/records/reader/impl/csv/CSVRecordReader.html) may be satisfactory. If, however, the input data has non-numeric fields, then a schema transformation will be required. DataVec uses Apache Spark to perform transform operations. Once we have the input schema, we can define the transformation we want to apply to the input data. Just a couple of transformations are described in this example. We can remove some columns that are unnecessary for our net, for example:

val tp = new TransformProcess.Builder(inputDataSchema)
  .removeColumns("CustomerID", "MerchantID")
  .build

We can also filter the MerchantCountryCode column in order to keep only the records related to USA and Canada, by adding the following step to the same builder chain (the complete chain is shown after this snippet):

.filter(new ConditionFilter(
  new CategoricalColumnCondition("MerchantCountryCode", ConditionOp.NotInSet,
    new HashSet(Arrays.asList("USA", "CAN")))))
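Putting the two steps together, the complete TransformProcess can be defined in a single builder chain; the schema of the transformed data can then be inspected through getFinalSchema:

import java.util.{Arrays, HashSet}
import org.datavec.api.transform.TransformProcess
import org.datavec.api.transform.condition.ConditionOp
import org.datavec.api.transform.condition.column.CategoricalColumnCondition
import org.datavec.api.transform.filter.ConditionFilter

val tp = new TransformProcess.Builder(inputDataSchema)
  // Drop the columns our network does not need
  .removeColumns("CustomerID", "MerchantID")
  // Keep only the records whose MerchantCountryCode is USA or CAN
  .filter(new ConditionFilter(
    new CategoricalColumnCondition("MerchantCountryCode", ConditionOp.NotInSet,
      new HashSet(Arrays.asList("USA", "CAN")))))
  .build

// The schema of the data after all the transformations have been applied
val outputSchema = tp.getFinalSchema
println(outputSchema)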

At this stage, the transformations are only defined, not yet applied (of course, we first need to get the data from the input file). So far, we have used DataVec classes only. In order to read the data and apply the defined transformations, the Spark and DataVec-Spark APIs need to be used.

Let's create the SparkContext first, as follows:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

// The Spark master is passed as the first command-line argument
val conf = new SparkConf
conf.setMaster(args(0))
conf.setAppName("DataVec Example")

val sc = new JavaSparkContext(conf)
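Here, the Spark master is taken from the first command-line argument. When experimenting locally, you could hard-code a local master instead, for example (a sketch):

// Run Spark locally, using all available cores
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("DataVec Example")
val sc = new JavaSparkContext(conf)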

Now, we can read the CSV input file and parse the data using a CSVRecordReader, as follows:

import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.api.util.ClassPathResource
import org.datavec.spark.transform.misc.StringToWritablesFunction

// Load the example CSV file from the classpath
val directory = new ClassPathResource("datavec-example-data.csv").getFile.getAbsolutePath
val stringData = sc.textFile(directory)

// Parse each line into a list of Writable values using a CSVRecordReader
val rr = new CSVRecordReader
val parsedInputData = stringData.map(new StringToWritablesFunction(rr))
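StringToWritablesFunction uses the record reader to convert each line of the text file into a list of DataVec Writable values, so parsedInputData is a JavaRDD of java.util.List[Writable]. A quick sanity check (a sketch) is to compare the record count with the number of input lines:

// Both counts should match: one parsed record per CSV line
println(s"Input lines:    ${stringData.count}")
println(s"Parsed records: ${parsedInputData.count}")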

Then execute the transformation defined earlier, as follows:

import org.datavec.spark.transform.SparkTransformExecutor
val processedData = SparkTransformExecutor.execute(parsedInputData, tp)

Finally, let's collect the data locally, as follows:

import org.datavec.spark.transform.misc.WritablesToStringFunction

val processedAsString = processedData.map(new WritablesToStringFunction(","))
val processedCollected = processedAsString.collect
val inputDataCollected = stringData.collect

At this point, inputDataCollected holds the original input rows, while processedCollected holds the transformed records, so the two can be compared to verify the effect of the transformations.
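A simple way to display them side by side (a sketch using plain println over the collected java.util.List instances) is the following:

import scala.collection.JavaConverters._

println("---- Original data ----")
inputDataCollected.asScala.foreach(println)

println("---- Processed data ----")
processedCollected.asScala.foreach(println)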

The full code of this example is part of the source code included with the book.