2. Run the WordCount Example

WordCount is a simple program that counts how often a word occurs in a text file.
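
The program reads a text file, splits each line into words, and counts how many times each word appears, producing (word, count) pairs. As a plain-Scala illustration of the idea (no Spark involved; the sample string is made up):

    val text = "to be or not to be"
    // Group identical words together, then count each group
    val counts = text.split(" ").groupBy(identity).mapValues(_.length)
    counts.foreach(println)   // prints pairs such as (to,2), (be,2), (or,1), (not,1)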

  1. Select an input file for the Spark WordCount example. You can use any text file as input.

  2. Upload the input file to HDFS. The following example uses log4j.properties as the input file:

    su hdfs

    cd /usr/hdp/current/spark-client/

    hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data

  3. Run the Spark shell:

    ./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

    You should see output similar to the following:

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    15/03/30 17:42:41 INFO SecurityManager: Changing view acls to: root
    15/03/30 17:42:41 INFO SecurityManager: Changing modify acls to: root
    15/03/30 17:42:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    15/03/30 17:42:41 INFO HttpServer: Starting HTTP Server
    15/03/30 17:42:41 INFO Utils: Successfully started service 'HTTP class server' on port 55958.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
          /_/
    
    Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
    Type in expressions to have them evaluated.
    Type :help for more information.
    15/03/30 17:42:47 INFO SecurityManager: Changing view acls to: root
    15/03/30 17:42:47 INFO SecurityManager: Changing modify acls to: root
    15/03/30 17:42:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    15/03/30 17:42:48 INFO Slf4jLogger: Slf4jLogger started
    15/03/30 17:42:48 INFO Remoting: Starting remoting
    15/03/30 17:42:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@green4:33452]
    15/03/30 17:42:48 INFO Utils: Successfully started service 'sparkDriver' on port 33452.
    15/03/30 17:42:48 INFO SparkEnv: Registering MapOutputTracker
    15/03/30 17:42:48 INFO SparkEnv: Registering BlockManagerMaster
    15/03/30 17:42:48 INFO DiskBlockManager: Created local directory at /tmp/spark-a0fdb1ce-d395-497d-bf6f-1cf00ae253b7/spark-52dfe754-7f19-4b5b-bd73-0745a1f6d158
    15/03/30 17:42:48 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
    15/03/30 17:42:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/03/30 17:42:49 INFO HttpFileServer: HTTP File server directory is /tmp/spark-817944df-07d2-4205-972c-e1b877ca4869/spark-280ea9dd-e40d-4ec0-8ecf-8c4b159dafaf
    15/03/30 17:42:49 INFO HttpServer: Starting HTTP Server
    15/03/30 17:42:49 INFO Utils: Successfully started service 'HTTP file server' on port 56174.
    15/03/30 17:42:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    15/03/30 17:42:49 INFO SparkUI: Started SparkUI at http://green4:4040
    15/03/30 17:42:49 INFO Executor: Starting executor ID <driver> on host localhost
    15/03/30 17:42:49 INFO Executor: Using REPL class URI: http://172.23.160.52:55958
    15/03/30 17:42:49 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@green4:33452/user/HeartbeatReceiver
    15/03/30 17:42:49 INFO NettyBlockTransferService: Server created on 47704
    15/03/30 17:42:49 INFO BlockManagerMaster: Trying to register BlockManager
    15/03/30 17:42:49 INFO BlockManagerMasterActor: Registering block manager localhost:47704 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 47704)
    15/03/30 17:42:49 INFO BlockManagerMaster: Registered BlockManager
    15/03/30 17:42:49 INFO SparkILoop: Created spark context..
    Spark context available as sc.
    
    scala> 
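
    Optionally, before running the job, you can confirm from the Scala prompt that the input file uploaded in step 2 is readable. This quick sanity check prints the first line of /tmp/data:

    sc.textFile("/tmp/data").first()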

  4. Submit the job. At the Scala prompt, type the following commands, replacing file names and locations with your own values:

    val file = sc.textFile("/tmp/data")

    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    counts.saveAsTextFile("/tmp/wordcount")
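
    In this pipeline, flatMap splits each line into words, map pairs each word with a count of 1, and reduceByKey adds up the counts for each distinct word. If you want counts that ignore case and punctuation, a variant like the following also works (a sketch; the regular expression and the empty-token filter are assumptions, not part of the original example):

    val counts2 = file.flatMap(line => line.toLowerCase.split("\\W+"))
                      .filter(word => word.nonEmpty)
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    counts2.saveAsTextFile("/tmp/wordcount-normalized")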

  5. To view the output from within the Scala shell:

    counts.collect().foreach(println)
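
    For large input files, printing every pair can flood the console; you can print just a sample instead:

    counts.take(10).foreach(println)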

    To view the output using HDFS:

    1. Exit the Scala shell:

      scala> :quit

    2. View WordCount job results:

      hadoop fs -ls /tmp/wordcount

      You should see output similar to the following:

      /tmp/wordcount/_SUCCESS
      /tmp/wordcount/part-00000
      /tmp/wordcount/part-00001

    3. Use the HDFS cat command to view the WordCount output. For example:

      hadoop fs -cat /tmp/wordcount/part*
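
      Each line in the part files is the string form of a Scala pair, such as (word,count). If you later want to reload the saved results from a new Spark shell session, the following is a minimal sketch (the parsing assumes the default pair format shown above):

      val saved = sc.textFile("/tmp/wordcount")
      val parsed = saved.map { line =>
        // Each line looks like "(word,count)"; split at the last comma so
        // that words which themselves contain commas parse correctly
        val i = line.lastIndexOf(',')
        (line.substring(1, i), line.substring(i + 1, line.length - 1).toInt)
      }
      parsed.take(5).foreach(println)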