Databricks Spark Reference Applications

Reusing Code from Batching: transform()

As you may have noticed, the functions you called on a DStream have the same names as those you called on an RDD in the batch example, but they are not the same methods, so it may not be clear how to reuse the batch code. In this section, we refactor the code from the batch examples and show how to reuse it here.

DStreams provide transform functions, which let you apply any RDD-to-RDD function to each RDD in the DStream. They are well suited to reusing RDD-to-RDD functions that you have already written for batch code and want to port to streaming. Let's look at some code to illustrate this point.

Let's say we have separated out a function, responseCodeCount, from our batch example that computes the response code counts for an RDD of Apache access logs:

public static JavaPairRDD<Integer, Long> responseCodeCount(
   JavaRDD<ApacheAccessLog> accessLogRDD) {
  return accessLogRDD
     .mapToPair(s -> new Tuple2<>(s.getResponseCode(), 1L))
     .reduceByKey(SUM_REDUCER);
}
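To make the idea concrete before moving to the streaming code, here is a minimal plain-Java sketch with no Spark dependency. It models a stream as a list of batches and "transform" as applying one batch function to every batch. The class TransformSketch, its transform helper, and the use of List and Map in place of RDDs are assumptions made for illustration only; they are not Spark's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical model: a List stands in for an RDD, a List of Lists for a DStream.
final class TransformSketch {

  // Batch function: counts how often each response code occurs in one batch.
  static Map<Integer, Long> responseCodeCount(List<Integer> responseCodes) {
    Map<Integer, Long> counts = new TreeMap<>();
    for (int code : responseCodes) {
      counts.merge(code, 1L, Long::sum);
    }
    return counts;
  }

  // Models transform: the same batch function is applied to every batch.
  static <T, U> List<U> transform(List<T> batches, Function<T, U> batchFunction) {
    List<U> result = new ArrayList<>();
    for (T batch : batches) {
      result.add(batchFunction.apply(batch));
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<Integer>> stream = Arrays.asList(
        Arrays.asList(200, 200, 404),
        Arrays.asList(500, 200));
    List<Map<Integer, Long>> perBatch =
        transform(stream, TransformSketch::responseCodeCount);
    System.out.println(perBatch);
  }
}
```

The point of the sketch is that the batch function knows nothing about streaming; only the caller decides that it runs once per batch.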

The responseCodeCountDStream can be created by calling transformToPair on the accessLogDStream with the responseCodeCount function. You can then call updateStateByKey to keep a running count of the response codes for all of time, and use foreachRDD to print the values:

// Compute Response Code to Count.
// Notice the use of transformToPair to produce a DStream of
// response code counts, and then updateStateByKey to accumulate
// the response code counts for all of time.
JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogDStream
   .transformToPair(LogAnalyzerStreamingTotalRefactored::responseCodeCount);
JavaPairDStream<Integer, Long> cumulativeResponseCodeCountDStream =
   responseCodeCountDStream.updateStateByKey(COMPUTE_RUNNING_SUM);
cumulativeResponseCodeCountDStream.foreachRDD(rdd -> {
  System.out.println("Response code counts: " + rdd.take(100));
  return null;
});
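The definition of COMPUTE_RUNNING_SUM is not shown in this section. As a rough sketch of the logic such an update function could implement, the plain-Java version below adds the counts that arrive for a key in the current batch to the total carried over from earlier batches. The class and method names are hypothetical, and java.util.Optional is used here as a stand-in; Spark's Java API passes its own Optional type, which differs between Spark versions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, Spark-free sketch of a running-sum update function.
final class RunningSumSketch {

  // newValues: counts seen for one key in the current batch.
  // state: total from previous batches, empty the first time the key appears.
  static Optional<Long> computeRunningSum(List<Long> newValues, Optional<Long> state) {
    long sum = state.orElse(0L);
    for (long value : newValues) {
      sum += value;
    }
    return Optional.of(sum);
  }

  public static void main(String[] args) {
    Optional<Long> state = Optional.empty();
    state = computeRunningSum(Arrays.asList(3L), state);     // first batch
    state = computeRunningSum(Arrays.asList(2L, 4L), state); // second batch
    System.out.println(state.get());
  }
}
```

Running main prints 9: the first batch sets the state to 3, and the second adds 2 and 4 to it.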

It is possible to combine transform functions before and after an updateStateByKey as well:

// A DStream of IP addresses seen more than 10 times.
JavaDStream<String> ipAddressesDStream = accessLogDStream
   .transformToPair(LogAnalyzerStreamingTotalRefactored::ipAddressCount)
   .updateStateByKey(COMPUTE_RUNNING_SUM)
   .transform(LogAnalyzerStreamingTotalRefactored::filterIPAddress);
ipAddressesDStream.foreachRDD(rdd -> {
  List<String> ipAddresses = rdd.take(100);
  System.out.println("All IPAddresses > 10 times: " + ipAddresses);
  return null;
});
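The body of filterIPAddress is not shown here either. Judging by the "> 10 times" comment above, the final step keeps only the addresses whose cumulative count exceeds 10. A plain-Java sketch of that filter, with a Map standing in for the pair RDD and all names hypothetical, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free sketch of the filtering step.
final class FilterSketch {

  // Keeps the IP addresses whose cumulative count is greater than 10.
  static List<String> filterIPAddress(Map<String, Long> ipAddressCount) {
    List<String> frequent = new ArrayList<>();
    for (Map.Entry<String, Long> entry : ipAddressCount.entrySet()) {
      if (entry.getValue() > 10) {
        frequent.add(entry.getKey());
      }
    }
    return frequent;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new TreeMap<>();
    counts.put("10.0.0.1", 25L);
    counts.put("10.0.0.2", 10L); // exactly 10 is not "more than 10"
    counts.put("10.0.0.3", 11L);
    System.out.println(filterIPAddress(counts));
  }
}
```

Because the filter runs after the running sum, an address can pass it even if it never exceeds 10 requests within a single batch.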

Take a closer look at LogAnalyzerStreamingTotalRefactored.java now to see how that code has been refactored to reuse code from the batch example.