3.2.2. MRv2 Terminology

Resource Model

YARN supports a very general resource model for applications. An application (via the Application Master) can request resources with highly specific requirements such as:

  • Resource name (host name or rack name; support for more complex network topologies is currently under development)

  • Memory (in MB)

  • CPU (cores, for now)

Container Specification During Launch:

While a Container is merely a right to use a specified amount of resources on a specific machine (Node Manager) in the cluster, the Application Master has to provide considerably more information to the Node Manager to actually launch the Container. This information can include .jar files, memory requested, input data, and number of CPUs. 
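
As an illustration only (not part of the original specification), the sketch below shows roughly how an Application Master might assemble this launch information using the YARN API. The task class, heap size, and environment shown are hypothetical placeholders.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;

public class LaunchContextSketch {
    // Builds the specification the Application Master sends to the Node Manager:
    // local resources (for example, the job's .jar files), environment variables,
    // and the command used to start the process inside the Container.
    public static ContainerLaunchContext build(Map<String, LocalResource> jarFiles) {
        Map<String, String> env = Collections.singletonMap("CLASSPATH", "./*");
        List<String> commands = Collections.singletonList(
            "java -Xmx512m com.example.MyTask 1>stdout 2>stderr");  // hypothetical task class
        // serviceData, tokens, and ACLs are omitted (null) in this sketch
        return ContainerLaunchContext.newInstance(jarFiles, env, commands, null, null, null);
    }
}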

Resource Request

YARN is designed to allow individual applications (via the Application Master) to utilize cluster resources in a shared, secure, and multi-tenant manner. YARN also remains aware of cluster topology in order to efficiently schedule and optimize data access (i.e. reduce data motion) for applications to the greatest possible extent.

In order to meet those goals, the central Scheduler (in the Resource Manager) has extensive information about an application’s resource needs, which allows it to make better scheduling decisions across all applications in the cluster. This leads to the Resource Request and the resulting Container.

Resource-requirement

These are the resources required to start a Container, such as memory, CPU, etc.

Number-of-containers

The number of Containers of this specification that the application requires. Keep in mind that the Application Master itself runs in a Container; by default, up to 10% of a queue's capacity can be allocated to Application Master Containers.
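
A minimal sketch of how these pieces fit together when an Application Master asks YARN for Containers is shown below. It uses the AMRMClient API; the memory, core, and container counts are illustrative values, not values prescribed by this document.

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class ResourceRequestSketch {
    public static void requestContainers(AMRMClient<ContainerRequest> amRmClient) {
        // Resource-requirement: 1024 MB of memory and 1 virtual core per Container
        Resource capability = Resource.newInstance(1024, 1);
        Priority priority = Priority.newInstance(0);
        // Resource name: pass specific hosts/racks here, or null to accept any node
        String[] nodes = null;
        String[] racks = null;
        // Number-of-containers: one addContainerRequest call per Container needed
        for (int i = 0; i < 4; i++) {
            amRmClient.addContainerRequest(
                new ContainerRequest(capability, nodes, racks, priority));
        }
    }
}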

Map Phase

The first phase of an MRv2 job, which runs locally with respect to the data. It takes the input data, processes it into key-value pairs, and passes the result to the Reduce phase. The data is passed into the Mapper as a <key, value> pair generated by an InputFormat instance. The InputFormat determines where the input data needs to be split between the Mappers, and then generates an InputSplit instance for each split. The Partitioner assigns each record to a partition, which determines the Reducer that will process the record.

After processing this data, the map method of the Mapper class outputs a <key, value> pair that is stored in an unsorted buffer in memory. When the buffer fills up, or when the map task is complete, the <key, value> pairs in the buffer are sorted and then spilled to disk. If more than one spill file was created, these files are merged into a single file of sorted <key, value> pairs. The sorted records in the spill file wait to be retrieved by a Reducer.

Mapper

The individual task of the map phase as run by the Containers. MRv2 spawns a map task for each InputSplit generated by the InputFormat.
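
For illustration, a minimal word-count style Mapper is sketched below (this example is not part of the original text). It assumes a TextInputFormat, so each <key, value> pair is a byte offset and a line of input.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With TextInputFormat, the key is the byte offset of the line and the
        // value is the line of text delivered by the InputSplit.
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            // Emit <word, 1>; the Partitioner decides which Reducer receives it.
            context.write(word, ONE);
        }
    }
}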

Reduce Phase

The final phase of an MRv2 job, the Reduce phase gathers and combines the output of all of the Mappers. The Reducer fetches the records from the Mappers. The Reduce phase can be broken down into three steps:

  • Shuffle: This occurs when MapReduce retrieves the output of the Mappers (via HTTP) and sends the data to the Reducers. All records with the same key are combined and sent to the same Reducer.

  • Sort: This step happens simultaneously with the shuffle step. As the records are fetched and merged, they are sorted by key.

  • Reduce: The reduce method is invoked for each key.

Reducer

The individual task of the Reduce phase, run by the Containers once the map phase has completed for at least one of the Mappers.
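
A matching Reducer for the word-count Mapper sketched earlier might look like the following (again, an illustrative example rather than anything prescribed by this document):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // reduce() is called once per key; the values arrive already grouped
        // and sorted by key by the shuffle and sort steps.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}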

Task

The partition of a job that will be run on a Container. The Container keeps track of the status of the MRv2 tasks that have been assigned to it.

Container Executor

Controls initialization, finalization, clean up, launching, and killing of the task’s JVM. This is the equivalent of the Task Tracker controller in MRv1.

The Container Executor is set up in two files:

container-executor.cfg -- a file with the following configuration by default:

  • yarn.nodemanager.linux-container-executor.group = hadoop

  • banned.users = hdfs,yarn,mapred,bin

  • min.user.id = 1000

yarn-site.xml -- a file with the following configuration:

  • yarn.nodemanager.container-executor.class = org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor

  • yarn.nodemanager.linux-container-executor.group = hadoop

Capacity Scheduler

The Capacity Scheduler is designed to run Hadoop MRv2 as a shared, multi-tenant cluster in an operator-friendly manner, while also maximizing the throughput and utilization of the cluster when running MRv2 applications.

Users submit jobs to Queues. Queues, as collections of jobs, allow the system to provide specific functionality. HDP comes configured with a single mandatory queue named "default".

For more information about the Scheduler, see the Capacity Scheduler guide.

ACL

The Access Control List (ACL) can be enabled on the cluster for job-level and queue-level authorization. When enabled, access control checks are performed by:

  • The Resource Manager before allowing users to submit jobs to queues and to administer those jobs.

  • The Resource Manager and the Node Manager before allowing users to view job details or to modify a job using MRv2 APIs, the CLI, or the web user interfaces.

There is no need for a mapred-queue-acls.xml file, since queue ACLs are configured in the capacity-scheduler.xml file.

Data Compression

MapReduceV2 provides facilities for the application writer to specify compression for both intermediate map outputs and the job outputs (i.e., the output of the Reducers). Compression is configured through a CompressionCodec implementation; a codec for the zlib compression algorithm is provided, and the gzip file format is also supported.
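
The sketch below shows one way a job driver might enable both kinds of compression with the gzip codec. The class and job names are illustrative; the property names are the standard MRv2 settings for map-output compression.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionSetupSketch {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Compress intermediate map outputs
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                      GzipCodec.class, CompressionCodec.class);
        Job job = Job.getInstance(conf, "compressed-output-example");
        // Compress the job (Reducer) output as well
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}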

Concatenation

MapReduceV2 can concatenate multiple small files into a single file of roughly one block in size, which is more efficient for storage and data movement.

Distributed Cache

DistributedCache efficiently distributes large, application-specific, read-only files. The framework will copy the necessary files to a slave node before any tasks for the job are executed on that node. It is also very common to use the DistributedCache through the GenericOptionsParser. For example:

$ bin/hadoop jar myapp.jar MyMainClass -libjars testlib.jar -files file.txt args

The above command copies file.txt to the cluster (myapp.jar and MyMainClass are placeholders for the application's .jar file and main class).
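
Files can also be registered programmatically from the job driver; a minimal sketch is shown below (the HDFS path is hypothetical). Inside a task, the registered URIs can then be retrieved with context.getCacheFiles().

import java.net.URI;
import org.apache.hadoop.mapreduce.Job;

public class CacheFileSketch {
    public static void addCacheFile(Job job) throws Exception {
        // file.txt must already exist in a shared filesystem such as HDFS
        job.addCacheFile(new URI("/user/hadoop/file.txt"));
    }
}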

Hadoop Pipes

Hadoop Pipes is the name of the C++ interface to Hadoop MRv2. Hadoop Pipes uses sockets as the channel over which the Node Manager communicates with the process running the C++ map or reduce function. JNI is not used.

Hadoop Streaming

This is a Hadoop API to MRv2 that allows the user to write map and reduce functions in languages other than Java (Perl, Python, etc.). Hadoop Streaming uses UNIX standard streams as the interface between Hadoop and the program, so the user can write a MapReduce program in any language that can read standard input and write to standard output. Streaming is naturally suited for text processing.
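
For example, a streaming job that uses standard UNIX utilities as the map and reduce functions might be submitted as follows (the input and output directories are placeholders, and the location of the streaming .jar file varies by installation):

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input indir -output outdir -mapper /bin/cat -reducer /usr/bin/wc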

Security

MRv2 supports Kerberos security and can run on Kerberos-secured clusters. Extra configuration may be required for MRv2 to function properly on a secured cluster.

Job .jar Files

MRv2 sets a replication level for submitted job .jar files (the mapreduce.client.submit.file.replication property). As a best practice, set this value to approximately the square root of the number of nodes. The default value is 10.
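
As an illustration of the square-root guideline (not from the original text), a driver could set the submit-file replication programmatically:

import org.apache.hadoop.conf.Configuration;

public class SubmitReplicationSketch {
    public static void tune(Configuration conf, int clusterNodes) {
        // Approximately the square root of the number of nodes, but at least 1
        int replication = Math.max(1, (int) Math.round(Math.sqrt(clusterNodes)));
        conf.setInt("mapreduce.client.submit.file.replication", replication);
    }
}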

Note: The path for the Hadoop MapReduce .jar files is different from MRv1:

/usr/lib/hadoop-mapreduce/