Accessing Cloud Data

Enabling the Directory Committer in Spark

Spark has its own internal output committer, which needs to be switched to the new committer mechanism. In addition, when writing Apache Parquet-formatted output, Spark expects the Parquet committer to be a subclass of ParquetOutputCommitter.

As a result, three lines need to be added to spark-defaults.conf to switch to the new committers:

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

This is all that is needed. Note that the S3A committer is only used for Spark SQL, Datasets, and DataFrames; some simple examples, such as the word count examples, do not use these APIs and so do not use the new committers.
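
If editing spark-defaults.conf is not convenient, the same three settings can also be supplied per job. The sketch below passes them on the spark-submit command line; my_app.py is a placeholder for your own application.

spark-submit \
  --conf spark.hadoop.fs.s3a.committer.name=directory \
  --conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
  --conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
  my_app.py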

Here is an example PySpark application that uses the committer. Apart from the committer configuration, there is no difference between this and any other application:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Select the S3A directory committer and the matching Spark commit protocol classes.
sconf = SparkConf()
sconf.set("spark.hadoop.fs.s3a.committer.name", "directory")
sconf.set("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
sconf.set("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

sc = SparkContext(appName="s3acommitter", conf=sconf)

spark = SparkSession(sc)

sourceDF = spark.range(0, 10000)
datasets = "s3a://guarded-bucket/datasets/"

sourceDF.write.format("orc").save(datasets + "orc")
sourceDF.write.format("parquet").save(datasets + "parquet")
sourceDF.write.format("csv").save(datasets + "csv")

sc.stop()
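
As a quick check that a job actually used the new committer, inspect the _SUCCESS file in one of the destination directories: the S3A committers write a JSON summary there, including the name of the committer used, whereas the classic file output committer leaves a zero-byte marker. For example:

hadoop fs -cat s3a://guarded-bucket/datasets/orc/_SUCCESS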