In this post we will walk through the process of writing a Spark DataFrame to an Elasticsearch index.

Spark DataFrames to ES Index

Elastic provides Apache Spark support via elasticsearch-hadoop, which offers native integration between Elasticsearch and Apache Spark.

Note: All examples are written in Scala 2.11 with Spark SQL 2.3.x. Prior experience with Apache Spark is a prerequisite.

Breakdown:

  • Maven Dependencies.
  • Spark-ES Configurations.
  • writeToIndex() Code.

Maven Dependencies

The dependencies listed below should be present in your classpath. elasticsearch-spark-20 provides the native Elasticsearch support for Spark, and commons-httpclient is needed to make RESTful calls to the Elasticsearch APIs. For some strange reason this version of elasticsearch-spark-20 omits the HTTP client dependency, so it has to be added manually.

-------------------
Snippet of the pom.xml
-------------------
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.4.2</version>
</dependency>
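
If you build with sbt instead of Maven, the equivalent coordinates would look roughly like this (a sketch, assuming scalaVersion is set to a 2.11.x release so that %% resolves the _2.11 artifacts):

// build.sbt (sketch) -- same coordinates as the pom.xml snippet above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"              % "2.3.0",
  "commons-httpclient"  % "commons-httpclient"     % "3.1",
  "org.elasticsearch"  %% "elasticsearch-spark-20" % "6.4.2"
)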

Spark-ES Configurations

In order for Spark to communicate with Elasticsearch, we'll need to know where the ES node(s) are located, as well as the port to communicate on. These are provided to the SparkSession by setting the spark.es.nodes and spark.es.port configurations.

Note: The example here uses Elasticsearch hosted on AWS and hence needs the additional configuration spark.es.nodes.wan.only set to true.

Let’s see some code to create the SparkSession.

val spark = SparkSession
     .builder()
     .appName("WriteToES")
     .master("local[*]")
     .config("spark.es.nodes","<IP-OF-ES-NODE>")
     .config("spark.es.port","<ES-PORT>")
     .config("spark.es.nodes.wan.only","true") // Needed for ES hosted on AWS
     .getOrCreate()

writeToIndex() Code

First we’ll define a case class to represent our index structure.

case class AlbumIndex(artist:String, yearOfRelease:Int, albumName: String)

Next we’ll create a Seq of AlbumIndex objects and convert it to a DataFrame using the handy .toDF() method, which becomes available by importing spark.implicits._.

import spark.implicits._

val indexDocuments = Seq(
  AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
  AlbumIndex("Boston", 1976, "Boston"),
  AlbumIndex("Fleetwood Mac", 1979, "Tusk")
).toDF

Note: spark here represents the SparkSession object.
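
Each row of this DataFrame will be indexed as one JSON document, with the column names (i.e. the case class field names) as the document fields. If you want a quick sanity check of the structure before writing, printSchema() shows what will be sent:

indexDocuments.printSchema()
// root
//  |-- artist: string (nullable = true)
//  |-- yearOfRelease: integer (nullable = false)
//  |-- albumName: string (nullable = true)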

Once we have our DataFrame ready, all we need to do is import org.elasticsearch.spark.sql._ and invoke the .saveToEs() method on it.

import org.elasticsearch.spark.sql._

indexDocuments.saveToEs("demoindex/albumindex")
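
By default Elasticsearch assigns an auto-generated _id to each document. saveToEs() also has an overload that takes a map of connector settings, which is handy for write-time options; the sketch below (assuming you want the albumName column to act as the document ID) uses es.mapping.id for that:

indexDocuments.saveToEs(
  "demoindex/albumindex",
  Map("es.mapping.id" -> "albumName") // use the albumName column as the document _id
)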

Here is the entire source code.

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object WriteToElasticSearch {

  def main(args: Array[String]): Unit = {
    WriteToElasticSearch.writeToIndex()
  }

  def writeToIndex(): Unit = {

    val spark = SparkSession
      .builder()
      .appName("WriteToES")
      .master("local[*]")
      .config("spark.es.nodes", "<IP-OF-ES-NODE>")
      .config("spark.es.port", "<ES-PORT>")
      .config("spark.es.nodes.wan.only", "true") // Needed for ES hosted on AWS
      .getOrCreate()

    import spark.implicits._

    val indexDocuments = Seq(
      AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
      AlbumIndex("Boston", 1976, "Boston"),
      AlbumIndex("Fleetwood Mac", 1979, "Tusk")
    ).toDF

    indexDocuments.saveToEs("demoindex/albumindex")
  }
}

case class AlbumIndex(artist: String, yearOfRelease: Int, albumName: String)
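
To verify the write, the same connector can read the index back into a DataFrame. A minimal sketch, assuming the demoindex/albumindex resource created above and the same SparkSession configuration:

// Read the indexed documents back into a DataFrame to verify the write
val fromEs = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("demoindex/albumindex")

fromEs.show()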

References

  1. Elasticsearch Spark Support.