Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort

Introduction

The pluggable shuffle and pluggable sort capabilities allow replacing the built-in shuffle and sort logic with alternate implementations. Example use cases include using an application protocol other than HTTP, such as RDMA, to shuffle data from the Map nodes to the Reducer nodes, or replacing the sort logic with custom algorithms that enable hash aggregation and Limit-N queries.

IMPORTANT: The pluggable shuffle and pluggable sort capabilities are experimental and unstable. This means the provided APIs may change and break compatibility in future versions of Hadoop.

Implementing a Custom Shuffle and a Custom Sort

A custom shuffle implementation requires an org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService implementation class running in the NodeManagers and an org.apache.hadoop.mapred.ShuffleConsumerPlugin implementation class running in the Reducer tasks.

The default implementations provided by Hadoop can be used as references:

  • org.apache.hadoop.mapred.ShuffleHandler
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

A custom sort implementation requires an org.apache.hadoop.mapred.MapOutputCollector implementation class running in the Mapper tasks and (optionally, depending on the sort implementation) an org.apache.hadoop.mapred.ShuffleConsumerPlugin implementation class running in the Reducer tasks.

The default implementations provided by Hadoop can be used as references:

  • org.apache.hadoop.mapred.MapTask$MapOutputBuffer
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

Configuration

Except for the auxiliary service running in the NodeManagers to serve the shuffle (by default the ShuffleHandler), all the pluggable components run in the job tasks. This means they can be configured on a per-job basis. The auxiliary service serving the shuffle must be configured in the NodeManagers' configuration.

Job configuration properties (set on a per-job basis):

Property | Default Value | Explanation
mapreduce.job.reduce.shuffle.consumer.plugin.class | org.apache.hadoop.mapreduce.task.reduce.Shuffle | The ShuffleConsumerPlugin implementation to use
mapreduce.job.map.output.collector.class | org.apache.hadoop.mapred.MapTask$MapOutputBuffer | The MapOutputCollector implementation to use

These properties can also be set in the mapred-site.xml to change the default values for all jobs.
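As a rough illustration, a mapred-site.xml fragment that overrides both defaults for all jobs might look like the sketch below. The class names com.example.CustomShuffleConsumer and com.example.CustomMapOutputCollector are hypothetical placeholders, not classes shipped with Hadoop; they stand in for your own ShuffleConsumerPlugin and MapOutputCollector implementations.

```xml
<configuration>
  <!-- Hypothetical class: replace with your ShuffleConsumerPlugin implementation -->
  <property>
    <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
    <value>com.example.CustomShuffleConsumer</value>
  </property>
  <!-- Hypothetical class: replace with your MapOutputCollector implementation -->
  <property>
    <name>mapreduce.job.map.output.collector.class</name>
    <value>com.example.CustomMapOutputCollector</value>
  </property>
</configuration>
```

The same two property names can instead be set in an individual job's configuration, in which case they apply to that job only.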

NodeManager configuration properties (set in yarn-site.xml on all nodes):

Property | Default Value | Explanation
yarn.nodemanager.aux-services | ...,mapreduce.shuffle | The auxiliary service name
yarn.nodemanager.aux-services.mapreduce.shuffle.class | org.apache.hadoop.mapred.ShuffleHandler | The auxiliary service class to use

IMPORTANT: When configuring an auxiliary service in addition to the default mapreduce.shuffle service, add a new service key to the yarn.nodemanager.aux-services property, for example mapreduce.shufflex. The property defining the corresponding class must then use the same key: yarn.nodemanager.aux-services.mapreduce.shufflex.class.
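
A minimal yarn-site.xml sketch of registering a second shuffle service alongside the default one is shown below. It assumes the new service key is mapreduce.shufflex; the key listed in yarn.nodemanager.aux-services and the key embedded in the class property name are the same. The class com.example.CustomShuffleHandler is a hypothetical placeholder for your own AuxiliaryService implementation, and any other auxiliary services already configured on your cluster would also need to stay in the list.

```xml
<configuration>
  <!-- Comma-separated list of auxiliary service keys: the default plus the new one -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce.shuffle,mapreduce.shufflex</value>
  </property>
  <!-- Class for the default shuffle service -->
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <!-- Class for the additional service; the key in the property name matches the list entry.
       com.example.CustomShuffleHandler is a hypothetical class name. -->
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shufflex.class</name>
    <value>com.example.CustomShuffleHandler</value>
  </property>
</configuration>
```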