Time for action – running WordCount on EMR

We will now show you how to run this same JAR file on EMR. Remember, as always, that this costs money!

  1. Go to the AWS console at http://aws.amazon.com/console, sign in, and select S3.
  2. You'll need two buckets: one to hold the JAR file and another for the job output. You can use existing buckets or create new ones.
  3. Open the bucket where you will store the job file, click on Upload, and add the wc1.jar file created earlier.
  4. Return to the main console home page, and then go to the EMR portion of the console by selecting Elastic MapReduce.
  5. Click on the Create a New Job Flow button and you'll see a familiar screen as shown in the following screenshot:
  6. Previously, we used a sample application; to run our own code, the steps are slightly different. First, select the Run your own application radio button.
  7. In the Select a Job Type combobox, select Custom JAR.
  8. Click on the Continue button and you'll see a new form, as shown in the following screenshot:

We now specify the arguments to the job. Within our uploaded JAR file, our code—particularly the driver class—specifies aspects such as the Mapper and Reducer classes.

What we need to provide is the path to the JAR file and the input and output paths for the job. In the JAR Location field, put the location where you uploaded the JAR file. If the JAR file is called wc1.jar and you uploaded it into a bucket called mybucket, the path would be mybucket/wc1.jar.

In the JAR Arguments field, you need to enter the name of the main class and the input and output locations for the job. For files on S3, we can use URLs of the form s3://bucketname/objectname. Click on Continue and the familiar screen to specify the virtual machines for the job flow appears, as shown in the following screenshot:


Now continue through the job flow setup and execution as we did in Chapter 2, Getting Hadoop Up and Running.
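
As a concrete example of the JAR Location and JAR Arguments fields, assume the JAR was uploaded to a bucket called mybucket, the input data lives under s3://mybucket/input, the results are to be written to s3://mybucket/output (the output location must not already exist), and the driver class inside wc1.jar is called WordCount1. All of these names are illustrative, so substitute your own; the two fields would then be filled in roughly as follows:

JAR Location: mybucket/wc1.jar
JAR Arguments: WordCount1 s3://mybucket/input s3://mybucket/output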

What just happened?

The important lesson here is that code written on and for a local Hadoop cluster can be reused unchanged on EMR. Apart from these first few steps, the rest of the EMR console workflow is the same regardless of where the job code comes from.

Through the remainder of this chapter, we will not explicitly show code being executed on EMR and will instead focus more on the local cluster, because running a JAR file on EMR is very easy.

The pre-0.20 Java MapReduce API

Our preference in this book is for the 0.20 and above versions of the MapReduce Java API, but we'll need to take a quick look at the older APIs for two reasons:

  1. Many online examples and other reference materials are written for the older APIs.
  2. Several areas within the MapReduce framework are not yet ported to the new API, and we will need to use the older APIs to explore them.

The older API's classes are found primarily in the org.apache.hadoop.mapred package.

The new API classes use concrete Mapper and Reducer classes, while the older API had this responsibility split across abstract classes and interfaces.

An implementation of a Mapper class will subclass the abstract MapReduceBase class and implement the Mapper interface, while a custom Reducer class will subclass the same MapReduceBase abstract class but implement the Reducer interface.

We'll not explore MapReduceBase in much detail as its functionality deals with job setup and configuration, which aren't really core to understanding the MapReduce model. But the interfaces of pre-0.20 Mapper and Reducer are worth showing:

public interface Mapper<K1, V1, K2, V2>
{
    void map(K1 key, V1 value, OutputCollector<K2, V2> output,
        Reporter reporter) throws IOException;
}

public interface Reducer<K2, V2, K3, V3>
{
    void reduce(K2 key, Iterator<V2> values,
        OutputCollector<K3, V3> output, Reporter reporter)
        throws IOException;
}

There are a few points to understand here:

  • The generic parameters to the OutputCollector class show more explicitly how the result of the methods is presented as output.
  • The old API used the OutputCollector class for this purpose, and the Reporter class to write status and metrics information to the Hadoop framework. The 0.20 API combines these responsibilities in the Context class.
  • The Reducer interface uses an Iterator object instead of an Iterable object; this was changed in the new API because an Iterable works with the Java for-each syntax and makes for cleaner code.
  • Neither the map nor the reduce method could throw InterruptedException in the old API.
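
Putting these pieces together, a WordCount mapper and reducer written against the old API might look something like the following sketch; the class names (OldApiWordCount, WordCountMapper, and WordCountReducer) are our own, and the two implementations are nested in a single class purely for brevity:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class OldApiWordCount
{
    // Old-API mapper: extend MapReduceBase and implement the Mapper interface
    public static class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable>
    {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException
        {
            // Emit (word, 1) pairs through the OutputCollector rather than a Context
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens())
            {
                word.set(tokens.nextToken());
                output.collect(word, ONE);
            }
        }
    }

    // Old-API reducer: note the Iterator (not Iterable) of values
    public static class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}

A job built from these classes would be configured with the old JobConf class rather than the Job class we have used so far.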

As you can see, the changes between the APIs alter how MapReduce programs are written but don't change the purpose or responsibilities of mappers or reducers. Don't feel obliged to become an expert in both APIs unless you need to; familiarity with either should allow you to follow the rest of this book.

Hadoop-provided mapper and reducer implementations

We don't always have to write our own Mapper and Reducer classes from scratch. Hadoop provides several common Mapper and Reducer implementations that can be used in our jobs. If we don't override any of the methods in the Mapper and Reducer classes in the new API, the default implementations are the identity Mapper and Reducer classes, which simply output the input unchanged.

Note that more such prewritten Mapper and Reducer implementations may be added over time, and currently the new API does not have as many as the older one.

The mappers are found in the org.apache.hadoop.mapreduce.lib.map package and include the following:

  • InverseMapper: This outputs (value, key)
  • TokenCounterMapper: This counts the number of discrete tokens in each line of input

The reducers are found in the org.apache.hadoop.mapreduce.lib.reduce package and currently include the following:

  • IntSumReducer: This outputs the sum of the list of integer values per key
  • LongSumReducer: This outputs the sum of the list of long values per key
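
For example, a complete word-count job can be assembled from these prewritten classes alone. The following driver is a minimal sketch of the idea; the class name PrebuiltWordCount, the job name, and the use of command-line arguments for the input and output paths are our own choices:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class PrebuiltWordCount
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "prebuilt wordcount");
        job.setJarByClass(PrebuiltWordCount.class);

        // Use Hadoop's prewritten classes instead of writing our own
        job.setMapperClass(TokenCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because TokenCounterMapper already splits each line into tokens and emits (token, 1) pairs, and IntSumReducer sums the counts per key, this job produces word counts without a single custom Mapper or Reducer of our own.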