Running Apache Spark Applications

Automating Spark Jobs with Oozie Spark Action

Use the following steps to automate Apache Spark jobs using Oozie Spark action.

About Oozie Spark Action

If you use Apache Spark as part of a complex workflow with multiple processing steps, triggers, and interdependencies, consider using Apache Oozie to automate jobs. Oozie is a workflow engine that executes sequences of actions structured as directed acyclic graphs (DAGs). Each action is an individual unit of work, such as a Spark job or Hive query.

The Oozie "Spark action" runs a Spark job as part of an Oozie workflow. The workflow waits until the Spark job completes before continuing to the next action.

For additional information about Spark action, see the Apache "Oozie Spark Action Extension" documentation. For general information about Oozie, see "Using HDP for Workflow and Scheduling with Oozie" in the HDP Data Movement and Integration guide. For general information about using Workflow Manager, see the HDP Workflow Management guide.

Note

Spark 2 support for Oozie Spark action is available as a technical preview; it is not ready for production deployment. Configuration must be performed through manual steps; it is not available through Ambari.

Support for yarn-client execution mode for Oozie Spark action will be removed in a future release. Oozie will continue to support yarn-cluster execution mode for Oozie Spark action.

Configure Oozie Spark Action for Spark 2

To use Oozie Spark action with Spark 2 jobs, create a spark2 ShareLib directory, copy the associated files into it, and then point Oozie to spark2. The Oozie ShareLib is a set of libraries that allows jobs to run on any node in a cluster. A consolidated sketch of the whole sequence follows the steps below.

  1. Create a spark2 ShareLib directory under the Oozie ShareLib directory associated with the oozie service user:
    hdfs dfs -mkdir /user/oozie/share/lib/lib_<ts>/spark2
  2. Copy spark2 jar files from the spark2 jar directory to the Oozie spark2 ShareLib:
    hdfs dfs -put \
        /usr/hdp/<version>/spark2/jars/* \
        /user/oozie/share/lib/lib_<ts>/spark2/
  3. Copy the oozie-sharelib-spark jar file from the spark ShareLib directory to the spark2 ShareLib directory:
    hdfs dfs -cp \
        /user/oozie/share/lib/lib_<ts>/spark/oozie-sharelib-spark-<version>.jar \
        /user/oozie/share/lib/lib_<ts>/spark2/
  4. Copy the hive-site.xml file from the current spark ShareLib to the spark2 ShareLib:
    hdfs dfs -cp \
        /user/oozie/share/lib/lib_<ts>/spark/hive-site.xml \
        /user/oozie/share/lib/lib_<ts>/spark2/
  5. Copy Python libraries to the spark2 ShareLib:
    hdfs dfs -put \
        /usr/hdp/<version>/spark2/python/lib/py* \
        /user/oozie/share/lib/lib_<ts>/spark2/
  6. Run the Oozie sharelibupdate command:
    oozie admin -sharelibupdate
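
Taken together, the steps above might look like the following sketch. The lib_<ts> timestamp and the HDP <version> are placeholders you must resolve for your cluster; you can find the timestamped ShareLib directory by listing its parent:

hdfs dfs -ls /user/oozie/share/lib

# Substitute the placeholder values for your cluster before running
SHARELIB=/user/oozie/share/lib/lib_<ts>
SPARK2_HOME=/usr/hdp/<version>/spark2

hdfs dfs -mkdir ${SHARELIB}/spark2
hdfs dfs -put ${SPARK2_HOME}/jars/* ${SHARELIB}/spark2/
hdfs dfs -cp ${SHARELIB}/spark/oozie-sharelib-spark-<version>.jar ${SHARELIB}/spark2/
hdfs dfs -cp ${SHARELIB}/spark/hive-site.xml ${SHARELIB}/spark2/
hdfs dfs -put ${SPARK2_HOME}/python/lib/py* ${SHARELIB}/spark2/
oozie admin -sharelibupdate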

To verify the configuration, run the Oozie shareliblist command. You should see spark2 in the results.

oozie admin -shareliblist spark2
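
As a further sanity check, you can list the spark2 ShareLib directory directly in HDFS and confirm that the Spark 2 .jar files, the oozie-sharelib-spark .jar file, hive-site.xml, and the Python libraries are all present:

hdfs dfs -ls /user/oozie/share/lib/lib_<ts>/spark2/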

To run a Spark job with the spark2 ShareLib, add the oozie.action.sharelib.for.spark property to the job.properties file and set its value to spark2:

oozie.action.sharelib.for.spark=spark2

The following examples show a workflow definition XML file, an Oozie job configuration file, and a Python script for running a Spark2-Pi job.

Sample workflow.xml file for spark2-Pi:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkPythonPi'>
    <start to='spark-node' />

    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <name>Python-Spark-Pi</name>
            <jar>pi.py</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>

    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>

Sample job.properties file for spark2-Pi:

nameNode=hdfs://host:8020
jobTracker=host:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/pyspark
master=yarn-cluster
oozie.action.sharelib.for.spark=spark2

Sample Python script, lib/pi.py:

import sys
from random import random
from operator import add
from pyspark import SparkContext
        
if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="Python-Spark-Pi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        # Sample a random point in the 2x2 square centered on the origin;
        # return 1 if it falls inside the unit circle, 0 otherwise.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    # The fraction of sampled points inside the circle approximates pi/4.
    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    sc.stop()
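
With these three files in place, you might stage the application in HDFS and submit the workflow as follows. This is a sketch: the HDFS paths match the oozie.wf.application.path value above, <username> is a placeholder, and the Oozie server URL is an assumption for your environment.

hdfs dfs -mkdir -p /user/<username>/examples/apps/pyspark/lib
hdfs dfs -put workflow.xml /user/<username>/examples/apps/pyspark/
hdfs dfs -put lib/pi.py /user/<username>/examples/apps/pyspark/lib/

oozie job -oozie http://<oozie-server-hostname>:11000/oozie \
    -config job.properties -run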

Troubleshooting .jar file conflicts with Oozie Spark action

When using Oozie Spark action, Oozie jobs may fail with the following error if there are .jar file conflicts between the "oozie" sharelib and the "spark2" sharelib.

2018-06-04 13:27:32,652 WARN SparkActionExecutor:523 - SERVER[XXXX] USER[XXXX] GROUP[-] TOKEN[] APP[XXXX] JOB[0000000-<XXXXX>-oozie-oozi-W] ACTION[0000000-<XXXXXX>-oozie-oozi-W@spark2] Launcher exception: Attempt to add (hdfs://XXXX/user/oozie/share/lib/lib_XXXXX/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache. 
java.lang.IllegalArgumentException: Attempt to add (hdfs://XXXXX/user/oozie/share/lib/lib_20170727191559/oozie/aws-java-sdk-kms-1.10.6.jar) multiple times to the distributed cache. 
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$13$$anonfun$apply$8.apply(Client.scala:632) 
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$13$$anonfun$apply$8.apply(Client.scala:623) 
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74) 
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$13.apply(Client.scala:623) 
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$13.apply(Client.scala:622) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:622) 
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895) 
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:171) 
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1231) 
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1290) 
at org.apache.spark.deploy.yarn.Client.main(Client.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:311) 
at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:232) 
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:58) 
at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:62) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:237) 
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) 
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) 
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164) 
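
The failure occurs because a file with the same name is distributed to the cache from both ShareLib directories. Before removing anything, you can check which .jar file names appear in both directories (a sketch using bash process substitution; adjust lib_<ts> for your cluster):

comm -12 \
    <(hdfs dfs -ls /user/oozie/share/lib/lib_<ts>/oozie | awk -F/ '/\.jar$/ {print $NF}' | sort) \
    <(hdfs dfs -ls /user/oozie/share/lib/lib_<ts>/spark2 | awk -F/ '/\.jar$/ {print $NF}' | sort)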

To resolve the issue, run the following commands to remove the conflicting .jar files from the spark2 ShareLib and move the duplicate jackson files out of the oozie ShareLib.

Note

You may want to back up these files before running the rm commands.

hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark2/aws* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark2/azure* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark2/hadoop-aws* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark2/hadoop-azure* 
hadoop fs -rm /user/oozie/share/lib/lib_<ts>/spark2/ok*
hadoop fs -mv /user/oozie/share/lib/lib_<ts>/oozie/jackson* /user/oozie/share/lib/lib_<ts>/oozie.old 
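
Note that if the jackson* glob matches more than one file, the hadoop fs -mv destination must be an existing directory; you may need to create the oozie.old backup directory first:

hadoop fs -mkdir -p /user/oozie/share/lib/lib_<ts>/oozie.old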

Next, run the following command to update the Oozie sharelib:

oozie admin -oozie http://<oozie-server-hostname>:11000/oozie -sharelibupdate
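
After the update completes, you can rerun the shareliblist check to confirm the spark2 ShareLib contents:

oozie admin -oozie http://<oozie-server-hostname>:11000/oozie -shareliblist spark2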