There have been a few times when I’ve chosen Spark as an ETL tool for its ease of use in reading and writing Parquet, CSV or XML files. Reading any of these formats takes a single line of Spark code (once you have the required dependencies on the classpath, of course).

Transforming Spark Datasets

Intent

Most of the reference material available online for transforming Datasets points to calling createOrReplaceTempView(), which registers the Dataset/DataFrame as a temporary view, and then running SQL queries against it.
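
For contrast, here is a rough sketch of what that SQL-based approach tends to look like. The object name TempViewSketch, the view name schedules and the single sample row are assumptions made up for illustration, and the query is only one possible way to express an hours calculation in SQL.

```scala
import org.apache.spark.sql.SparkSession

object TempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical sample row: flight number, departure and arrival as epoch seconds.
    val schedules = Seq(("AI-123", 1569290498L, 1569298178L))
      .toDF("flightNo", "departure", "arrival")

    // Register the DataFrame as a temporary view so it can be queried with SQL.
    schedules.createOrReplaceTempView("schedules")

    // The transformation logic now lives in a SQL string instead of Scala code.
    val flightInfo = spark.sql(
      """SELECT flightNo AS flight,
        |       CONCAT(CAST(FLOOR((arrival - departure) / 3600) AS STRING), ' hrs') AS duration
        |FROM schedules""".stripMargin)

    flightInfo.show()
    spark.stop()
  }
}
```

This works, but any logic you already have in Scala has to be rewritten as SQL, which is the motivation for the approach in the rest of this post.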

While this may be fine for most use cases, there are times when it feels more natural to use Spark’s Scala transformation functions instead, especially if you already have some pre-existing transformation logic written in Scala, ReactiveX or even Java 8.

If this sounds like something you’d like to do, then please read on.

Getting Started

Let’s assume we want to make the following transformation using Datasets.

Input Dataset
+--------+----------+----------+
|flightNo| departure|   arrival|
+--------+----------+----------+
| GOI-134|1569315038|1569319183|
|  AI-123|1569290498|1569298178|
|  TK-006|1567318178|1567351838|
+--------+----------+----------+

Output Dataset
+-------+--------+
| flight|duration|
+-------+--------+
|GOI-134|   1 hrs|
| AI-123|   2 hrs|
| TK-006|   9 hrs|
+-------+--------+

Domain objects and the transformation function

We’ll assume we have the following domain objects and transformation function that converts a FlightSchedule object into a FlightInfo object.

case class FlightSchedule(flightNo: String, departure: Long, arrival: Long)

case class FlightInfo(flight: String, duration: String)


def existingTransformationFunction(flightSchedule: FlightSchedule): FlightInfo = {
  val duration = (flightSchedule.arrival - flightSchedule.departure) / 60 / 60
  FlightInfo(flightSchedule.flightNo, s"$duration hrs")
}
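
To make the arithmetic concrete, here is a small sanity check. It is only a sketch: it re-declares the case classes and the function above inside a hypothetical DurationCheck object so that it runs on its own, without Spark. Note that the two integer divisions truncate, so partial hours are dropped rather than rounded.

```scala
object DurationCheck {
  case class FlightSchedule(flightNo: String, departure: Long, arrival: Long)
  case class FlightInfo(flight: String, duration: String)

  def existingTransformationFunction(flightSchedule: FlightSchedule): FlightInfo = {
    val duration = (flightSchedule.arrival - flightSchedule.departure) / 60 / 60
    FlightInfo(flightSchedule.flightNo, s"$duration hrs")
  }

  def main(args: Array[String]): Unit = {
    // GOI-134: 1569319183 - 1569315038 = 4145 seconds
    // 4145 / 60 = 69 minutes (truncated), 69 / 60 = 1 hour (truncated)
    val goi = existingTransformationFunction(FlightSchedule("GOI-134", 1569315038L, 1569319183L))
    assert(goi == FlightInfo("GOI-134", "1 hrs"))

    // TK-006: 1567351838 - 1567318178 = 33660 seconds
    // 33660 / 60 = 561 minutes, 561 / 60 = 9 hours (truncated from 9.35)
    val tk = existingTransformationFunction(FlightSchedule("TK-006", 1567318178L, 1567351838L))
    assert(tk == FlightInfo("TK-006", "9 hrs"))

    println(s"$goi, $tk")
  }
}
```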

Creating the Spark session and reading the input Dataset

Creating the input Dataset is kept simple for brevity.

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("transform").getOrCreate()

import spark.implicits._

val schedules: Dataset[FlightSchedule] = Seq(
    FlightSchedule("GOI-134", 1569315038, 1569319183),
    FlightSchedule("AI-123", 1569290498, 1569298178),
    FlightSchedule("TK-006", 1567318178, 1567351838)
).toDS()

Defining the Encoder and the Spark transformation

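This is where things start to get interesting. In order to transform a Dataset[FlightSchedule] into a Dataset[FlightInfo], Spark needs to know how to encode your case class. If no implicit encoder for FlightInfo is in scope where map() is called (for example, in a method defined outside the scope of import spark.implicits._), you get the following dreadful compile-time error.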

Error:(34, 18) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    schedules.map(s => existingTransformationFunction(s))
    

An Encoder[T] converts a JVM object or primitive of type T to and from Spark SQL’s InternalRow representation. Since the Dataset.map() method takes the encoder as an implicit parameter, we’ll define an implicit value inside the function that calls it.

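import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

def makeFlightInfo(schedules: Dataset[FlightSchedule]): Dataset[FlightInfo] = {
  // Bring an encoder for FlightInfo into implicit scope so that map() can find it
  implicit val encoder: ExpressionEncoder[FlightInfo] = ExpressionEncoder[FlightInfo]()
  schedules.map(s => existingTransformationFunction(s))
}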
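
ExpressionEncoder lives in Spark’s catalyst package, which is generally treated as internal. As a possible alternative, the public Encoders.product factory should give you an equivalent encoder for a case class. The sketch below is an assumption about how you might use it, not something the rest of this post depends on; it re-declares FlightInfo so it stands alone, and the EncoderSketch object is a made-up name. It also prints the field names the encoder derives, which is a handy way to see what Spark will use as column names.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Re-declared here only so the sketch is self-contained.
case class FlightInfo(flight: String, duration: String)

object EncoderSketch {
  def main(args: Array[String]): Unit = {
    // Encoders.product derives an encoder for a case class (a Product type).
    val encoder: Encoder[FlightInfo] = Encoders.product[FlightInfo]

    // The encoder carries the schema of the resulting Dataset;
    // the fields follow the case class constructor parameters.
    println(encoder.schema.fieldNames.mkString(", ")) // prints "flight, duration"
  }
}
```

Inside makeFlightInfo, this would amount to declaring the implicit value as an Encoder[FlightInfo] built with Encoders.product[FlightInfo] instead of ExpressionEncoder.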

Transforming the Dataset

The only thing left to do now is call the transform method on the input Dataset, passing it makeFlightInfo. Here is the entire program, with calls to show() so that we can see the results.

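import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class FlightSchedule(flightNo: String, departure: Long, arrival: Long)

case class FlightInfo(flight: String, duration: String)

object DatasetTransform {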
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("transform").getOrCreate()

    import spark.implicits._

    val schedules: Dataset[FlightSchedule] = Seq(
      FlightSchedule("GOI-134", 1569315038, 1569319183),
      FlightSchedule("AI-123", 1569290498, 1569298178),
      FlightSchedule("TK-006", 1567318178, 1567351838)
    ).toDS()

    // Print the input Dataset
    schedules.show()

    // Transform Dataset
    val flightInfo: Dataset[FlightInfo] = schedules.transform(makeFlightInfo)

    // Print the output Dataset
    flightInfo.show()
  }


  def makeFlightInfo(schedules: Dataset[FlightSchedule]): Dataset[FlightInfo] = {
    implicit val enc: ExpressionEncoder[FlightInfo] = ExpressionEncoder[FlightInfo]()
    schedules.map(s => existingTransformationFunction(s))
  }

  def existingTransformationFunction(flightSchedule: FlightSchedule): FlightInfo = {
    val duration = (flightSchedule.arrival - flightSchedule.departure) / 60 / 60
    FlightInfo(flightSchedule.flightNo, s"$duration hrs")
  }

}
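
One nice side effect of transform() is that it chains, since it takes any function from one Dataset to another. Here is a minimal sketch of that idea. The dropShortFlights step, its two-hour threshold and the TransformChainSketch object are hypothetical additions for illustration, and the case classes are re-declared so the example stands alone.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Re-declared here only so the sketch is self-contained.
case class FlightSchedule(flightNo: String, departure: Long, arrival: Long)
case class FlightInfo(flight: String, duration: String)

object TransformChainSketch {

  // Hypothetical extra step: keep only flights lasting at least minSeconds.
  // The curried signature lets the step be configured before it is passed to transform().
  def dropShortFlights(minSeconds: Long)(schedules: Dataset[FlightSchedule]): Dataset[FlightSchedule] =
    schedules.filter(s => s.arrival - s.departure >= minSeconds)

  // Same conversion as in the post, with the encoder passed explicitly to map().
  def toFlightInfo(schedules: Dataset[FlightSchedule]): Dataset[FlightInfo] =
    schedules.map { s =>
      FlightInfo(s.flightNo, s"${(s.arrival - s.departure) / 60 / 60} hrs")
    }(Encoders.product[FlightInfo])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("transform-chain").getOrCreate()
    import spark.implicits._

    val schedules: Dataset[FlightSchedule] = Seq(
      FlightSchedule("GOI-134", 1569315038L, 1569319183L),
      FlightSchedule("AI-123", 1569290498L, 1569298178L),
      FlightSchedule("TK-006", 1567318178L, 1567351838L)
    ).toDS()

    // Each transform() call feeds its result into the next one.
    // With a two-hour threshold, GOI-134 (4145 seconds) is filtered out.
    val result: Dataset[FlightInfo] = schedules
      .transform(dropShortFlights(2 * 60 * 60))
      .transform(toFlightInfo)

    result.show()
    spark.stop()
  }
}
```

Keeping each step as a small Dataset-to-Dataset function means the steps can be unit tested on their own and reordered without touching the rest of the pipeline.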
