In this post we will walk through the process of writing a Spark DataFrame
to an Elasticsearch index.
Elastic provides Apache Spark support via elasticsearch-hadoop, which offers native integration between Elasticsearch and Apache Spark.
Note: All examples are written in Scala 2.11 with Spark SQL 2.3.x. Prior experience with Apache Spark is a prerequisite.
Breakdown:
- Maven Dependencies.
- Spark-ES Configurations.
- writeToIndex() Code.
Maven Dependencies
The dependencies listed below should be present on your classpath. elasticsearch-spark-20 provides the native Elasticsearch support for Spark, and commons-httpclient is needed to make RESTful calls to the Elasticsearch APIs. For some strange reason this version of elasticsearch-spark-20 omits the HTTP client dependency, so it has to be added manually.
Snippet of the pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.4.2</version>
</dependency>
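If you build with sbt instead of Maven, the equivalent coordinates should look roughly like this (a sketch, assuming the same Scala 2.11 / Spark 2.3.0 / Elasticsearch 6.4.2 versions as above):

// build.sbt (sketch) -- same artifacts as the Maven snippet above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"              % "2.3.0",
  "commons-httpclient" %  "commons-httpclient"     % "3.1",   // HTTP client omitted by elasticsearch-spark-20
  "org.elasticsearch"  %% "elasticsearch-spark-20" % "6.4.2"  // resolves to elasticsearch-spark-20_2.11
)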
Spark-ES Configurations
In order for Spark to communicate with Elasticsearch, we’ll need to know where the ES node(s) are located as well as the port to communicate on. These are provided to the SparkSession by setting the spark.es.nodes and spark.es.port configurations.
Note: The example here uses Elasticsearch hosted on AWS, which requires the additional configuration spark.es.nodes.wan.only to be set to true.
Let’s see some code to create the SparkSession.
val spark = SparkSession
  .builder()
  .appName("WriteToES")
  .master("local[*]")
  .config("spark.es.nodes", "<IP-OF-ES-NODE>")
  .config("spark.es.port", "<ES-PORT>")
  .config("spark.es.nodes.wan.only", "true") // Needed for ES hosted on AWS
  .getOrCreate()
writeToIndex() Code
First we’ll define a case class to represent our index structure.
case class AlbumIndex(artist: String, yearOfRelease: Int, albumName: String)
Next we’ll create a Seq of AlbumIndex objects and convert them to a DataFrame using the handy .toDF() function, which can be invoked by importing spark.implicits._.
import spark.implicits._

val indexDocuments = Seq(
  AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
  AlbumIndex("Boston", 1976, "Boston"),
  AlbumIndex("Fleetwood Mac", 1979, "Tusk")
).toDF
Note: spark here represents the SparkSession object.
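If you want to sanity-check the DataFrame before writing it out, a quick printSchema()/show() does the job (plain Spark API, nothing connector-specific):

indexDocuments.printSchema() // artist: string, yearOfRelease: int, albumName: string
indexDocuments.show()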
Once we have our DataFrame ready, all we need to do is import org.elasticsearch.spark.sql._ and invoke the .saveToEs() method on it.
import org.elasticsearch.spark.sql._
indexDocuments.saveToEs("demoindex/albumindex")
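saveToEs() also accepts a per-write settings map, which is handy when you don’t want to bake every connector setting into the SparkSession. As a sketch, the snippet below uses the standard ES-Hadoop setting es.mapping.id to make the album name the document _id (the field choice is just for illustration):

indexDocuments.saveToEs(
  "demoindex/albumindex",
  Map("es.mapping.id" -> "albumName") // use this column as the document _id
)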
Here is the entire source code.
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object WriteToElasticSearch {

  def main(args: Array[String]): Unit = {
    WriteToElasticSearch.writeToIndex()
  }

  def writeToIndex(): Unit = {
    val spark = SparkSession
      .builder()
      .appName("WriteToES")
      .master("local[*]")
      .config("spark.es.nodes", "<IP-OF-ES-NODE>")
      .config("spark.es.port", "<ES-PORT>")
      .config("spark.es.nodes.wan.only", "true") // Needed for ES hosted on AWS
      .getOrCreate()

    import spark.implicits._

    val indexDocuments = Seq(
      AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
      AlbumIndex("Boston", 1976, "Boston"),
      AlbumIndex("Fleetwood Mac", 1979, "Tusk")
    ).toDF

    indexDocuments.saveToEs("demoindex/albumindex")
  }
}

case class AlbumIndex(artist: String, yearOfRelease: Int, albumName: String)
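To verify the write, you can read the index back into a DataFrame through the same connector. A minimal sketch, assuming the same SparkSession configuration as above:

val fromEs = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("demoindex/albumindex")

fromEs.show() // should list the three AlbumIndex documents written above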