Accessing Cloud Data

Chapter 8. Accessing Cloud Data in Spark

Datasets stored in cloud object stores can be used in Spark as if they were stored in HDFS.

All these object stores are viewed by Spark as filesystems, allowing them to be used as the source and destination of data: be it batch, SQL, DataFrame, or Spark Streaming. To load and save data in the cloud, Spark uses the same APIs that are used to load and save data in HDFS or other filesystems.

Provided the relevant libraries are on the classpath, a file stored in a Cloud Object Store can be referenced simply via a URL:

sparkContext.textFile("s3a://landsat-pds/scene_list.gz").count()

Similarly, an RDD can be saved to an object store via saveAsTextFile():

val numbers = sparkContext.parallelize(1 to 1000)
// save to Amazon S3 (or compatible implementation)
numbers.saveAsTextFile("s3a://bucket1/counts")

Example 1: DataFrames

DataFrames can read from and write to object stores using their read() and write() methods:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
val sparkConf = new SparkConf()
val spark = SparkSession
    .builder
    .appName("DataFrames")
    .config(sparkConf)
    .getOrCreate()
import spark.implicits._
val numRows = 1000

// generate test data
val sourceData = spark.range(0, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))

// define the destination
val dest = "adl://hortonworks-eu.azuredatalakestore.net/dataframes"

// write the data
val orcFile = dest + "/data.orc"
sourceData.write.format("orc").save(orcFile)

// now read it back
val orcData = spark.read.format("orc").load(orcFile)

// finally, write the data as Parquet
orcData.write.format("parquet").save(dest + "/data.parquet")

spark.stop()
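
Spark SQL queries can run against the same object store data. A minimal sketch, assuming the SparkSession and dest value from the example above are still in scope (that is, before spark.stop() is called); parquet_data is an illustrative view name:

// register the Parquet output written above as a temporary view
val parquetData = spark.read.format("parquet").load(dest + "/data.parquet")
parquetData.createOrReplaceTempView("parquet_data")

// run a SQL query against the data stored in the object store
spark.sql("SELECT COUNT(*) FROM parquet_data").show()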

Example 2: Spark Streaming and Cloud Storage

Spark Streaming can monitor files added to object stores by creating a FileInputDStream that monitors a path under a bucket:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(5000))
try {
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  val matches = lines.filter(_.endsWith("3"))
  matches.print()
  ssc.start()
  ssc.awaitTermination()
} finally {
  ssc.stop(true)
}
Note

The time to scan for new files is proportional to the number of files under the path — not the number of new files — so this can become a slow operation.

Checkpointing streaming data to an S3 bucket is very slow, as the stream data is (potentially) recalculated, uploaded to S3, and then renamed into the checkpoint file (the rename being a slow copy operation). If S3 is used for checkpointing, the interval between checkpoints must be long enough to allow for this slow checkpoint. WASB, ADL, and GCS all have faster rename operations, so they do not suffer from this problem.
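
If checkpointing is required, one option is to keep the checkpoint directory on a filesystem with fast renames while still reading the input from the object store. A minimal sketch, assuming HDFS is available on the cluster and hdfs:///checkpoints/incoming is a writable path (both are illustrative assumptions, not part of the example above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

val sparkConf = new SparkConf()
// a longer batch interval gives a slow checkpoint store time to keep up
val ssc = new StreamingContext(sparkConf, Seconds(30))
// keep checkpoint data on a filesystem with fast renames (HDFS here)
ssc.checkpoint("hdfs:///checkpoints/incoming")
try {
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  lines.count().print()
  ssc.start()
  ssc.awaitTermination()
} finally {
  ssc.stop(true)
}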

Related Links

Using S3 as a Safe and Fast Destination of Work

Improving Spark Performance with Cloud Storage