Apache > Hadoop > Pig
 

Zebra and MapReduce

Overview

MapReduce allows you to take full advantage of Zebra's capabilities.

Hadoop MapReduce APIs

This release of Zebra supports the "new" jobContext-style MapReduce APIs.

  • org.apache.hadoop.mapreduce.* - supported ("new" jobContext-style mapreduce API)
  • org.apache.hadoop.mapred.* - supported, but deprecated ("old" jobConf-style mapreduce API)

Zebra MapReduce APIs

Zebra includes several classes for use in MapReduce programs. The main entry point into Zebra are the two classes for reading and writing tables, namely TableInputFormat and BasicTableOutputFormat.

Zebra MapReduce Examples

Table Output Format

This MapReduce example demonstrates the Zebra table output format. The Zebra table in this example has two unsorted columns groups, each of which has one column. The output format is specified as follows:

BasicTableOutputFormat.setStorageInfo(jobContext, 
    ZebraSchema.createZebraSchema("word:string, count:int"), 
    ZebraStorageHint.createZebraStorageHint("[word];[count]"), 
    null);


The input file for this example should contain rows of word and count, separated by a space. For example:

this 2
is 1
a 5
test 2
hello 1
world 3

The example works like this. The first job is in Zebra format. The second job reads output from the first job, where Count is specified as a projection column. The table input format projects an input row which has both Word and Count into a row containing only the Count column and hands it to map. The reducer sums the counts and produces a sum of counts which should match total number of words in original text.

 package org.apache.hadoop.zebra.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.data.Tuple;

import java.io.IOException;
import java.util.Iterator;



public class TableMapReduceExample extends Configured implements Tool {

	static class Map extends Mapper<LongWritable, Text, BytesWritable, Tuple> {
		private BytesWritable bytesKey;
		private Tuple tupleRow;

		/**
		 * Map method for reading input.
		 */
		@Override
		public void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {

			// value should contain "word count"
			String[] wordCount = value.toString().split(" ");
			if (wordCount.length != 2) {
				// LOG the error
				throw new IOException("Value does not contain two fields:" + value);
			}

			byte[] word = wordCount[0].getBytes();
			bytesKey.set(word, 0, word.length);
			tupleRow.set(0, new String(word));
			tupleRow.set(1, Integer.parseInt(wordCount[1]));

			context.write( bytesKey, tupleRow );
		}

		/**
		 * Configuration of the job. Here we create an empty Tuple Row.
		 */
		@Override
		public void setup(Context context) {
			bytesKey = new BytesWritable();
			try {
				Schema outSchema = BasicTableOutputFormat.getSchema( context );
				tupleRow = TypesUtils.createTuple(outSchema);
			} catch (IOException e) {
				throw new RuntimeException(e);
			} catch (ParseException e) {
				throw new RuntimeException(e);
			}
		}

	}

	static class ProjectionMap extends
	Mapper<BytesWritable, Tuple, Text, IntWritable> {
		private final static Text all = new Text("All");

		/**
		 * Map method which gets count column after projection.
		 * 
		 * @throws IOException
		 */
		@Override
		public void map(BytesWritable key, Tuple value, Context context)
		throws IOException, InterruptedException {
			context.write( all, new IntWritable((Integer) value.get(0)) );
		}
	}

	public static class ProjectionReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
		/**
		 * Reduce method which implements summation. Acts as both reducer and
		 * combiner.
		 * 
		 * @throws IOException
		 */
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
		throws IOException, InterruptedException {
			int sum = 0;
			Iterator<IntWritable> iterator = values.iterator();
			while (iterator.hasNext()) {
				sum += iterator.next().get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

	/**
	 * Where jobs and their settings and sequence is set.
	 * 
	 * @param args
	 *          arguments with exception of Tools understandable ones.
	 */
	public int run(String[] args) throws Exception {
		if (args == null || args.length != 3) {
			System.out
			.println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
			System.exit(-1);
		}

		/*
		 * First MR Job creating a Table with two columns
		 */
		Job job = new Job();
		job.setJobName("TableMapReduceExample");
		Configuration conf = job.getConfiguration();
		conf.set("table.output.tfile.compression", "none");

		// Input settings
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(Map.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Output settings
		job.setOutputFormatClass(BasicTableOutputFormat.class);
		BasicTableOutputFormat.setOutputPath( job, new Path(args[1]) );

		// set the logical schema with 2 columns
		BasicTableOutputFormat.setSchema( job, "word:string, count:int" );

		// for demo purposes, create 2 physical column groups
		BasicTableOutputFormat.setStorageHint( job, "[word];[count]" );

		// set map-only job.
		job.setNumReduceTasks(0);

		// Run Job
		job.submit();

		/*
		 * Second MR Job for Table Projection of count column
		 */
		Job projectionJob = new Job();
		projectionJob.setJobName("TableProjectionMapReduceExample");
		conf = projectionJob.getConfiguration();

		// Input settings
		projectionJob.setMapperClass(ProjectionMap.class);
		projectionJob.setInputFormatClass(TableInputFormat.class);
		TableInputFormat.setProjection(job, "count");
		TableInputFormat.setInputPaths(job, new Path(args[1]));
		projectionJob.setMapOutputKeyClass(Text.class);
		projectionJob.setMapOutputValueClass(IntWritable.class);

		// Output settings
		projectionJob.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(projectionJob, new Path(args[2]));
		projectionJob.setReducerClass(ProjectionReduce.class);
		projectionJob.setCombinerClass(ProjectionReduce.class);

		// Run Job
		projectionJob.submit();

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(),
				args);
		System.exit(res);
	}
}

   

Table Input/Output Formats

This MapReduce examples demonstrates how to perform a simple union. To run this program, we need two basic tables that contain the data as in the example above (word, count). In this example they are: /user/mapredu/t1 and /user/mapredu/t2. The resulting table is /user/mapredu2/t.

package org.apache.hadoop.zebra.mapreduce;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.data.Tuple;

public class TableMRSample2 {
	static class MapClass extends
	Mapper<BytesWritable, Tuple, BytesWritable, Tuple> {
		private BytesWritable bytesKey;
		private Tuple tupleRow;

		@Override
		public void map(BytesWritable key, Tuple value, Context context)
		throws IOException, InterruptedException {
			System.out.println(key.toString() + value.toString());
			context.write(key, value);
		}

		@Override
		public void setup(Context context) {
			bytesKey = new BytesWritable();
			try {
				Schema outSchema = BasicTableOutputFormat.getSchema(context);
				tupleRow = TypesUtils.createTuple(outSchema);
			} catch (IOException e) {
				throw new RuntimeException(e);
			} catch (ParseException e) {
				throw new RuntimeException(e);
			}
		}

		public static void main(String[] args) throws ParseException, IOException, 
		InterruptedException, ClassNotFoundException {
			Job job = new Job();
			job.setJobName("tableMRSample");
			Configuration conf = job.getConfiguration();
			conf.set("table.output.tfile.compression", "gz");

			// input settings
			job.setInputFormatClass(TableInputFormat.class);
			job.setOutputFormatClass(BasicTableOutputFormat.class);
			job.setMapperClass(TableMRSample2.MapClass.class);

			List<Path> paths = new ArrayList<Path>(2);
			Path p = new Path("/homes/chaow/mapredu/t1");
			System.out.println("path = " + p);
			paths.add(p);
			p = new Path("/homes/chaow/mapredu/t2");
			paths.add(p);

			TableInputFormat.setInputPaths(job, paths.toArray(new Path[2]));
			TableInputFormat.setProjection(job, "word");
			BasicTableOutputFormat.setOutputPath(job, new Path(
			"/homes/chaow/mapredu2/t1"));

			BasicTableOutputFormat.setSchema(job, "word:string");
			BasicTableOutputFormat.setStorageHint(job, "[word]");

			// set map-only job.
			job.setNumReduceTasks(0);
			// TODO: need to find a replacement
			//job.setNumMapTasks(2);
			job.submit();
		}
	}
}

Sort Columns

This MapReduce code snippet demonstrates how to sort Zebra columns.

/* user provides a Comparator Class for creating ZebraSortInfo */
    
public static final class MemcmpRawComparator implements
    RawComparator<Object>, Serializable {
      @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
          return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
      }

      @Override
      public int compare(Object o1, Object o2) {
        throw new RuntimeException("Object comparison not supported");
      }
}

…
… 

ZebraSchema zSchema = ZebraSchema.createZebraSchema(schemaString);
    
ZebraStorageHint zStorageHint = ZebraStorageHint.createZebraStorageHint(storageHintString);
    
/* Here we can use above Comparator Class to create a ZebraSortInfo object */
    
ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortColumnsString, MemcmpRawComparator.class); 
    
BasicTableOutputFormat.setStorageInfo(jobContext, zSchema, zStorageHint, zSortInfo);

Drop Column Groups

This example illustrates how to drop column groups (CG) in Zebra tables. This is not a MapReduce program since the API for deleting columg groups is a simple Java API.

How to compile:

# this command requires pig.jar and latest zebra jar. 
$ javac -cp pig.jar:zebra-0.1.0.jar DropColumnGroupExample.java
 
# create a jar file
$ jar cvf dropcg.jar DropColumnGroupExample.class

How to run:

# run the example.
$ java -cp pig.jar:zebra-0.1.0.jar:dropcg.jar DropColumnGroupExample
 
# This creates a table under the directory "dropCGExample".
# If run the same command again, it fails since the destination
# directory still exists. Please remove the directory before running.

#This program takes one argument : directory for the example table
$ java -cp pig.jar:zebra-0.1.0.jar:dropcg.jar DropColumnGroupExample tableDir

Source code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.data.Tuple;

public class DropColumnGroupExample {

  private static String schema = "url, outcount:int, content:string";
  private static String storageHint = "[url, outcount]; [content] as data";  
  
  public static void main(String[] args) throws Exception {
    
    // Create configuration and process generaic Hadoop options like -D.
    Configuration conf = new Configuration();
    args = new GenericOptionsParser(conf, args).getRemainingArgs();
    
    Path path = new Path((args.length > 0 ? args[0] : "dropCGExample"));
    
    if (path.getFileSystem(conf).exists(path)) {
      throw new IOException("Destination path '" + path + "' already exists. " +
                            "Please remove the directory and run again.");
    }
    // create a simple table. 
    createTable(conf, path);
    
    //verify we can read the table.
    String content = getContentForUrl(path, conf, "http://www.yahoo.com/");
    if (content == null || !content.contains("yahoo")) {
      throw new IOException("Table read failed.");
    }
    
    /* Now drop the colum group named "data".
     * 
     * An exception is thrown if the CG could not be removed for some reason
     * (e.g. the user might not have enough permissions).
     */
    BasicTable.dropColumnGroup(path, conf, "data");
    
    // deleting an already deleted CG is a no-op.
    BasicTable.dropColumnGroup(path, conf, "data");
    
    // now try to read content column.
    // Note that NULL is returned for the third column.
    if (getContentForUrl(path, conf, "http://www.yahoo.com/") != null) {
      throw new IOException("Expected NULL for 3rd column");
    }
    // While reading this table, a warning is logged since the user
    // is trying to reading from a deleted CG.
    
    //clean up the test directory. 
    //for now we are not deleting the directory to let users check it.
    //BasicTable.drop(path, conf);
  }
  
  // Utility functions:
  
  /**
   * This is a simple table reader that iterates over the table to 
   * find a given url and returns its content.
   */
  private static String getContentForUrl(Path path, Configuration conf, 
                                         String url) throws Exception {
    
    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    TableScanner scanner = reader.getScanner(null, true);
    Tuple row = TypesUtils.createTuple(3);
    
    try {
      while (!scanner.atEnd()) {
        scanner.getValue(row);
        if (url.equals(row.get(0))) {
          return (String)row.get(2);
        }
        scanner.advance();
      }
    } finally {
      scanner.close();
    }
    throw new IOException(url + " is not found");
  }
  
  private static void createTable(Configuration conf, Path path) 
                                  throws Exception {
    /* NOTE: This creates a table using BasicTable API. This is not 
     * a committed public API yet. Typically tables are created
     * in M/R or through PIG.
     */
    BasicTable.Writer writer = new BasicTable.Writer(path, schema, 
                                                     storageHint, conf);
    TableInserter inserter = writer.getInserter("part-0", true);
    Tuple rowTuple = TypesUtils.createTuple(3);
    BytesWritable emptyKey = new BytesWritable();
    
    // add two rows:
    rowTuple.set(0, "http://www.cnn.com/");
    rowTuple.set(1, 10);
    rowTuple.set(2, "content for cnn.com");
    
    inserter.insert(emptyKey, rowTuple);
    
    rowTuple.set(0, "http://www.yahoo.com/");
    rowTuple.set(1, 20);
    rowTuple.set(2, "content for yahoo.com");
    
    inserter.insert(emptyKey, rowTuple);
    
    inserter.close();
  }
}

Multiple Table Outputs

This code snippet illustrates how to work with multiple table outputs.

In main()
 
    String multiLocs = "/user/multi/us" + "," + "/user/multi/india" + "," + "/user/multi/japan";
    
    job.setOutputFormatClass(BasicTableOutputFormat.class);
    BasicTableOutputFormat.setMultipleOutputPaths(job, multiLocs);
    BasicTableOutputFormat.setZebraOutputPartitionClass(job, MultipleOutputsTest.OutputPartitionerClass.class);
 

Implement a partition class:
 
  static class OutputPartitionerClass implements ZebraOutputPartition {
       @Override
       public int getOutputPartition(BytesWritable key, Tuple value, String commaSeparatedLocs) {                        
 
          String reg = null;  
          try {  
            reg = (String)(value.get(0));
          } catch ( ExecException e) {
             // do something about e              
          }
 
          if(reg.equals("us")) return 0;
          if(reg.equals("india")) return 1;
          if(reg.equals("japan")) return 2;
                        
          return 0;
      } 
  }