Hadoop: Optimizing MapReduce Jobs

Hadoop MapReduce

I have had the opportunity to work on several projects that used Hadoop MapReduce to process data on a large scale. (Of course, large is a relative term. While I think that a workload of several thousand MapReduce jobs, some of which include multi-trillion row joins, all of which result in over 60 TB of output, is large, Google, Facebook, and Amazon might disagree...)

While MapReduce can be tricky to grasp and harness, it is a very powerful paradigm for processing large amounts of data. It can also be one of the few times when you, as a Java developer, actually need to care about the low level details of the JVM such as memory management, garbage collection, and object instantiation!

My initial goal in writing this blog was to write about how to index into Lucene using MapReduce. As I started to work on that blog, though, I realized there was enough introductory material needed to get there that it warranted its own blog.

Before going into the details on how to optimize your MapReduce jobs, I want to say a quick word about the wrappers that are available and why you would choose to write MapReduce jobs in the first place.

Apache Hive and Apache Pig are widely used tools that allow you to organize your HDFS data in such a way that you can write and execute simple scripts to manipulate your data without the need to write custom code. Apache Hive uses SQL; Apache Pig uses its own DSL. The scripts you write are automatically translated into custom MapReduce job bytecode and executed for you.

These tools definitely have their merits and are indispensable on a large-scale project. In general, the trade-off between these tools and writing your own MapReduce jobs is between ease of use on one hand and performance and custom business logic on the other. Apache Hive uses SQL, which is very familiar to most developers. It is easy to write and easy to maintain. And in some cases (where you have many, many joins) it is the only real option you have. (One of the projects I worked on had a data join that resulted in over 168 MapReduce jobs. Not a code base I want to write and maintain by hand! A SQL script is vastly preferable...)

On the other hand, SQL (or whatever DSL your tool uses) is not completely general purpose and does not know your data as intimately as you do. There are some cases where the business logic you need to perform is so complex, or your particular data can be exploited in a vastly more performant way, that you would want to use a custom MapReduce job.

Of course, a hybrid of these tools and custom MapReduce is common. You can use a tool like Apache Hive to join all your data, and then dump the results into a bucket that is then processed by your custom MapReduce code.

With all that on the table, let's talk about the kinds of constraints your system is bound by as a context for the different ways we can optimize our MapReduce jobs.

Bounds

When writing your own MapReduce code, you need to think about how your system is designed and what is bounding your cluster. This will vary wildly based on your cluster design and your data, but typically, you are bound in one of three ways:

  • Network / IO
  • Memory
  • CPU

You will always be bound by something. Usually it is the Network / IO. Sometimes it is Memory. Rarely in my (limited) experience is it CPU. But you will always be bound by something.

Before you try to optimize anything, you should figure out what is bounding your system and focus your optimization there. Optimizing your memory usage will not help you if your system is bottlenecked by your network. You will just be hurrying up so you can wait.

Memory Management

The largest mindset change for me as a Java developer was learning to think rigorously about memory. I have a garbage collector, right? Memory management is for those C guys and the device driver people!

But when you are executing a piece of code over a trillion times in less than an hour, it really, really matters how you use memory. What do you allocate? When do you allocate it? These are critical decisions in MapReduce code that will see a lot of iterations!

The most common problem is wasted object instantiation. When you create an object, it consumes space on the heap, and it keeps consuming that space even after you lose all references to it, until the garbage collector reclaims it.

for (int i = 0; i < 10; ++i) {  
    StringBuilder buffer = new StringBuilder();
    buffer.append("Text");
    ...
}

How many StringBuilder objects did you create? One? Afraid not. When this loop completes, there will be ten StringBuilder objects sitting on the heap. Each iteration of the loop instantiated a new one, and even though the reference falls out of scope on the next iteration, that object remains on the heap until the garbage collector comes along to clean it up.

If your code is executed very frequently, the object turnover can rapidly fill the heap, forcing the garbage collector to run more and more often to keep the heap usable. Depending on your circumstances, this can cause a measurable slowdown.

Whenever possible, especially in code that will execute repeatedly, you should avoid object instantiation and instead attempt to reuse objects by clearing their state between uses. For example:

StringBuilder buffer = new StringBuilder();  
for (int i = 0; i < 10; ++i) {  
    buffer.setLength(0);
    buffer.append("Text");
    ...
}

In this case, only a single StringBuilder is created. At the start of each loop iteration, its internal buffer is cleared, allowing it to be safely reused.

This is a simple case. In reality, it may require a significant amount of redesign to allow your domain objects to fit into this Clear / Reuse pattern. But in some cases, the performance gains can be significant. On two different projects, we increased total throughput by 5%-10% in one case, and over 50% in the other, just by reusing objects and avoiding "instantiation storms" that cause the garbage collector to run every few seconds.
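As a rough illustration of the Clear / Reuse pattern applied to a domain object, here is a minimal sketch (the PageView class and its fields are hypothetical, not from any of the projects mentioned above):

public class PageView {
    // Mutable fields that are reused across records instead of allocating a new object per record.
    private final StringBuilder url = new StringBuilder();
    private long timestamp;
    private int statusCode;

    // Reset all state so the same instance can be safely reused for the next record.
    public void clear() {
        url.setLength(0);
        timestamp = 0L;
        statusCode = 0;
    }

    public void set(String url, long timestamp, int statusCode) {
        clear();
        this.url.append(url);
        this.timestamp = timestamp;
        this.statusCode = statusCode;
    }
}

The idea is to instantiate one PageView as a field of your Mapper and call set() for each record inside map(), rather than allocating a fresh object per record.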

The performance gains you see are directly related to the degree to which you are memory bound. In a MapReduce environment it is often hard to use a profiler, so you may need to experiment to see whether or not your particular job or set of jobs would benefit.
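If you cannot attach a profiler, one crude way to experiment is to log how much time the JVM spends in garbage collection over the life of a task, using the standard management beans. A minimal sketch (where and how you log the value is up to you):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcTimer {
    // Total time, in milliseconds, that this JVM has spent in garbage collection so far.
    public static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime();
            if (time > 0) {
                total += time; // getCollectionTime() returns -1 if unsupported
            }
        }
        return total;
    }
}

Calling this in a Mapper's setup() and again in its cleanup(), then comparing the two numbers, gives a rough sense of how much of the task's wall-clock time went to garbage collection.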

Sometimes optimizations can be more subtle. As a Java developer I am very much accustomed to using the Java Collections API without much thought to the internal implementation. (In most cases, that is a good thing!) But on one project I increased total performance by 2% merely by initializing my HashSets differently.

I was constructing a very large List of HashSets. Each Set, however, contained only a small number of items. For most, it was just a single item, but for some, two or three. By default, however, a HashSet is initialized with a backing Map that has a capacity of 16. Simply initializing the HashSets like this:

Set<String> set = new HashSet<String>(1);  

saved me enough memory overhead to improve performance measurably! Admittedly by a small measure... But it was a small gain for a very small investment.

Note: This is one case where I would recommend using a comment to indicate your thought process so that another developer coming behind you does not accidentally erase your performance gain!
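For example, the initialization above might carry a comment along these lines (the wording is just a suggestion):

// Sized to 1 on purpose: most of these sets hold a single element, and the
// default capacity of 16 wastes a measurable amount of memory across a very
// large List of HashSets. Please do not "clean this up".
Set<String> set = new HashSet<String>(1);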

Custom Data Objects

Managing memory wisely is often a large part of improving performance for your MapReduce jobs. But another area that you can control involves creating custom data objects to be used in transferring data to and from jobs and between Mappers and Reducers.

By default, Hadoop provides you with a number of primitive Writable objects: Text, IntWritable, and so forth. These are used to define the interface between Mappers and Reducers. They can also be written to HDFS and read out again using another job.
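For example, a word-count style Mapper (a generic sketch, not taken from any particular project) declares these primitives as its input and output types:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across calls to map() -- see the section on object reuse above.
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            word.set(token);
            context.write(word, one);
        }
    }
}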

While the primitives Hadoop provides are useful, they are somewhat limiting. If you have a sufficiently complex data structure it can be hard to pass it from job to job in a single Text object.

(One project I worked on had a job that packaged a data structure with over 30 fields into a pipe-delimited string, wrote it out as a Text object from the Mapper, and then reparsed it in the Reducer. Efficient with the network, but the object creation was killing performance!)

Using your own data objects can also be useful if you are network bound, because it gives you more fine-grained control over what and how much data is passed, and lets you pack it as efficiently as possible.

The basic interface that you must implement is Writable.

There are two methods:

public void readFields(DataInput input) throws IOException  
public void write(DataOutput output) throws IOException  

A simple interface, but one with a lot of flexibility, especially when you nest objects. A simple example:

import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;

public class Person implements Writable {  
    public Text firstName = new Text();
    public Text lastName = new Text();

    @Override
    public void readFields(DataInput input) throws IOException {
        firstName.readFields(input);
        lastName.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException  {
        firstName.write(output);
        lastName.write(output);
    }
}

The Person class has two fields: firstName and lastName. Both of these are Writable and have the same interface as their container.

The marshalling of the fields is very simple. When a Person object is written, it writes the firstName first and the lastName second. When a Person object is read, it reads the firstName first and the lastName second. Since Text is a Writable too, you do not need to write the underlying code to serialize the string to a byte stream. You just dictate the order and number of your fields and the API does the rest!

This paradigm is very flexible, because as you see above, you can nest these objects inside one another.

Another more complex example:

import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import java.util.HashSet;  
import java.util.Set;

import org.apache.commons.lang.StringUtils;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;

public class TextSet implements Writable {  
    public final Set<Text> values = new HashSet<Text>();

    @Override
    public void readFields(DataInput input) throws IOException {
        values.clear(); // Note the reuse!
        int size = input.readInt();
        for (int i = 0; i < size; ++i) {
            Text value = new Text();
            value.readFields(input);
            values.add(value);
        }
    }

    @Override
    public void write(DataOutput output) throws IOException  {
        output.writeInt(values.size());
        for (Text value : values) {
            value.write(output);
        }
    }

    public void add(String value) {
        if (StringUtils.isNotBlank(value)) {
            values.add(new Text(value));
        }
    }
}

Note: The source code can be downloaded from Github.

In this case we have a wrinkle. We want a set of Text objects, but the size can vary. Since we need to know how many to read off the byte stream when this object is reconstituted, we have to write out the size of the set first, so that it can be read first and used to read the elements of the set.

Understanding how to create your own data objects gives you a lot more control over the data you are passing around in your cluster and can help you tune bottlenecks.
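To actually use a custom Writable like Person as the value passed between your Mapper and Reducer, you register it when you set up the job. A rough driver sketch (PersonMapper and PersonReducer are hypothetical classes that emit and consume Person values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PersonDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "person example"); // Job.getInstance(conf, ...) on newer Hadoop versions
        job.setJarByClass(PersonDriver.class);

        job.setMapperClass(PersonMapper.class);   // hypothetical Mapper emitting Person values
        job.setReducerClass(PersonReducer.class); // hypothetical Reducer consuming Person values

        // Register the custom Writable as the intermediate (map output) value type.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Person.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}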

There is much, much more that could be written on performance and optimization. These two sections are just starting points into a very broad topic. But hopefully they will introduce you to the right mindset, and as you apply these ideas to your code you will be inspired to make similar optimizations.

Questions? Comments? Email me at: [email protected]!