Cloud Data Access

Chapter 7. Accessing Cloud Data in Spark

Datasets stored in S3, ADLS or WASB can be made available in Spark.

S3, ADLS, and WASB are viewed by Spark as filesystems, allowing them to be used as the source and destination of data for batch, SQL, DataFrame, and Spark Streaming workloads. To load and save data in the cloud, Spark uses the same APIs that are used to load and save data in HDFS or other filesystems.

Provided the relevant libraries are on the classpath, a file stored in S3, ADLS or WASB can be referenced simply via a URL:

sparkContext.textFile("s3a://landsat-pds/scene_list.gz").count()

Similarly, an RDD can be saved to an object store via saveAsTextFile():

val numbers = sparkContext.parallelize(1 to 1000)
// save to Amazon S3 (or compatible implementation)
numbers.saveAsTextFile("s3a://bucket1/counts")
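
If credentials are not already supplied through the cluster configuration, they can also be set programmatically. The following is an illustrative sketch only: the property names are the standard Hadoop S3A settings, and the key values are placeholders.

// Illustrative only: supply S3A credentials at runtime if they are not
// provided through the cluster configuration. Replace the placeholder values.
sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY_PLACEHOLDER")
sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY_PLACEHOLDER")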

Example 1: DataFrames

DataFrames can read from and write to object stores using their read() and write() methods:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

// configuration for the SparkSession
val sparkConf = new SparkConf()
val spark = SparkSession
    .builder
    .appName("DataFrames")
    .config(sparkConf)
    .getOrCreate()
import spark.implicits._
val numRows = 1000

// generate test data
val sourceData = spark.range(0, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))

// define the destination
val dest = "s3a://bucket1/dataframes"

// write the data
val orcFile = dest + "/data.orc"
sourceData.write.format("orc").save(orcFile)

// now read it back
val orcData = spark.read.format("orc").load(orcFile)

// finally, write the data as Parquet
orcData.write.format("parquet").save(dest + "/data.parquet")

spark.stop()
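
The same data can also be queried with Spark SQL. The following is an illustrative sketch rather than part of the example above; it assumes an active SparkSession, the ORC file written above, and a placeholder view name.

// Illustrative only: register the ORC data as a temporary view and query it with SQL
val orcRows = spark.read.format("orc").load("s3a://bucket1/dataframes/data.orc")
orcRows.createOrReplaceTempView("orc_rows")
spark.sql("SELECT COUNT(*) AS row_count FROM orc_rows").show()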
Note

Checkpointing streaming data to an S3 bucket is very slow, as the stream data is (potentially) recalculated, uploaded to S3, and then renamed into the checkpoint file (the rename being a slow copy operation). If S3 is used for checkpointing, the interval between checkpoints must be long enough to allow for this slow checkpoint.
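
As an illustrative sketch, assuming a StreamingContext named ssc and a DStream named lines (as in the example below), checkpointing can be directed at an S3A path with a deliberately long checkpoint interval:

// Illustrative only: checkpoint to S3 with a long checkpoint interval so the
// slow upload-and-rename cycle can complete between checkpoints.
ssc.checkpoint("s3a://bucket1/checkpoints")   // checkpoint directory in S3
lines.checkpoint(Minutes(10))                 // interval much longer than the batch interval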

Example 2: Spark Streaming and Cloud Storage

Spark Streaming can monitor files added to object stores by creating a FileInputDStream that watches a path under a bucket:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(5000))
try {
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  val matches = lines.filter(_.endsWith("3"))
  matches.print()
  ssc.start()
  ssc.awaitTermination()
} finally {
  ssc.stop(true)
}
Note

The time to scan for new files is proportional to the number of files under the path — not the number of new files — so this can become a slow operation.

Related Links

Committing Output to S3

Improving Spark Performance with S3/ADLS/WASB