Hadoop: Lucene Indexing with MapReduce

Not too long ago I had the opportunity to work on a project where we indexed a significant amount of data into Lucene. Despite the improvements to Lucene's indexing speed over the years, we found that single-threaded indexing rapidly becomes a bottleneck as your data creeps into the multiple-terabyte range. Our system could support reading from multiple indexes and we had a reasonably large Hadoop cluster at our disposal, so we decided to try doing the indexing in Hadoop.

Note: This approach really only makes sense if you have a way of searching across index shards, such as Apache Blur or Apache Solr. Otherwise, the time required to merge the shards into a single index with Lucene is much greater than the time you save by indexing in parallel.

In our particular domain, we first needed to group our records into sets that shared a common key. That entire group was then indexed into Lucene as a single Lucene Document.

This paradigm was a natural fit for Hadoop. Our Mappers could parse and emit each row of data with the key to group on, and the Reducer could accept each group and index it into its Lucene index. Thus, each Reducer emits a "shard" of the overall index, containing only the groups that that reducer processed.

Writing the Mapper was easy. It simply needed to read data from HDFS, parse each row into our data structure, and emit that row with its appropriate key.

Note: I have a prior blog post about how to write custom MapReduce data structures if you want to use that as a reference.
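For illustration, our Mapper boiled down to something like the following (a minimal sketch only: the tab-separated input, the column positions, and the class names are assumptions, and it uses the Text-based version of DataStructure shown below):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IndexingMapper extends Mapper<LongWritable, Text, Text, DataStructure> {
    private final Text groupKey = new Text();
    private final DataStructure row = new DataStructure();

    @Override
    protected void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
        // Hypothetical parsing: tab-separated columns with the grouping key in column 0.
        String[] columns = line.toString().split("\t");
        groupKey.set(columns[0]);
        row.field1.set(columns[1]);
        row.field2.set(columns[2]);
        row.field3.set(columns[3]);

        // Emit the row keyed by its group; all rows with the same key meet in one Reducer.
        context.write(groupKey, row);
    }
}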

Our data structure had numerous fields, so at first we just wrote it something like this:

import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;

import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;

public class DataStructure implements Writable {  
    public Text field1 = new Text();
    public Text field2 = new Text();
    public Text field3 = new Text();
    ...

    @Override
    public void readFields(DataInput input) throws IOException {
        field1.readFields(input);
        field2.readFields(input);
        field3.readFields(input);
        ...
    }

    @Override
    public void write(DataOutput output) throws IOException  {
        field1.write(output);
        field2.write(output);
        field3.write(output);
        ...
    }
}

We then realized, though, that in the Reducer we wanted to aggregate multiple data structures together: Lucene would index all of the field1 values as a single Term in the Document, all of the field2 values, and so forth.

We wanted to avoid creating a second data structure for the aggregate (not to mention taking the performance hit of instantiating all new fields), so we changed the fields in our data structure from Text to TextSet (consult the prior blog or GitHub for the source code) and added the following method to TextSet:

public void addAll(Set<String> newValues) {  
    values.addAll(newValues);
}

And added a merge() method to our data structure that invoked addAll() on each of its fields.
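The merge() method looked roughly like this (a sketch; it assumes each TextSet exposes its backing Set<String> through a getValues() accessor, which is my naming, not necessarily that of the real class):

public void merge(DataStructure other) {
    // Union the other structure's values into this one's existing TextSet fields;
    // no new field objects are created.
    field1.addAll(other.field1.getValues());
    field2.addAll(other.field2.getValues());
    field3.addAll(other.field3.getValues());
    ...
}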

This allowed us, in the Reducer, to do the following for aggregation:

DataStructure aggregate = new DataStructure();  
for (DataStructure ds : reducerInput) {  
    aggregate.merge(ds);
}

Thus we avoided any additional object instantiations and were able to use a single data structure throughout our code!

HDFS Syncing Local Scratch

The most immediate problem we then encountered was that MapReduce jobs read and write to HDFS. Unfortunately, Lucene cannot index directly to an HDFS file system (and since Lucene needs lots of mutating writes, it would be vastly inefficient even if it could).

The solution we came up with was to create a custom OutputFormat for Hadoop that would, on the sly, create a Lucene index on the local file system of the node that was executing that Reducer. Once the Reducer task was committed, it would then copy that index into HDFS and erase the local copy.

Here is the output format:

import java.io.File;  
import java.io.IOException;

import org.apache.hadoop.mapreduce.OutputCommitter;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HdfsSyncingLocalFileOutputFormat<K, V> extends FileOutputFormat<K, V> {  
    public static final String PARAMETER_LOCAL_SCRATCH_PATH = "param.localScratchPath";

    private HdfsSyncingLocalFileOutputCommitter committer;

    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {

        if (committer == null) {
            // Create a temporary directory on the local file system and pass it to the committer.
            File localScratchPath = new File(context.getConfiguration().get(PARAMETER_LOCAL_SCRATCH_PATH) + File.separator + "scratch" + File.separator + context.getTaskAttemptID().toString() + File.separator);

            committer = new HdfsSyncingLocalFileOutputCommitter(localScratchPath, super.getOutputPath(context), context);
        }

        return committer;
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new RecordWriter<K, V>() {
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException { }

            @Override
            public void write(K key, V val) throws IOException, InterruptedException { }
        };
    }
}

Note: The code can be downloaded from Github.

The format relies on an OutputCommitter to handle the actual syncing. It reads from the Configuration the root directory on each node in which to store the Lucene index and passes that path to the committer. The configuration parameter is set by the job driver class like so:

getConf().set(HdfsSyncingLocalFileOutputFormat.PARAMETER_LOCAL_SCRATCH_PATH, localScratchPath);  

The localScratchPath variable can be initialized from anywhere in your driver class. In our case it was read as a command line parameter.

The format needs to be set as the output format of the job like so:

job.setOutputFormatClass(HdfsSyncingLocalFileOutputFormat.class);  

Note: The write() and close() methods on the RecordWriter in the OutputFormat are empty, because no actual data is written to HDFS from the OutputFormat. The data is side-loaded by the OutputCommitter.
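For completeness, the driver wiring ends up looking roughly like this (a sketch assuming a Hadoop 2, Tool-based driver; the class names, paths, and job name are made up for illustration):

// Inside the driver's run(String[] args) method.
getConf().set(HdfsSyncingLocalFileOutputFormat.PARAMETER_LOCAL_SCRATCH_PATH, localScratchPath);

Job job = Job.getInstance(getConf(), "lucene-shard-indexing");
job.setJarByClass(IndexingDriver.class);

job.setMapperClass(IndexingMapper.class);
job.setReducerClass(IndexingReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataStructure.class);

// The custom format side-loads the Lucene index through its committer;
// the RecordWriter itself writes nothing.
job.setOutputFormatClass(HdfsSyncingLocalFileOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));

return job.waitForCompletion(true) ? 0 : 1;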

Without further ado, here is the committer:

import java.io.File;  
import java.io.IOException;

import org.apache.commons.io.FileUtils;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.JobStatus.State;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class HdfsSyncingLocalFileOutputCommitter extends FileOutputCommitter {  
    public static final String PREFIX_LUCENE_INDEX_PART = "part-";

    private final FileSystem localFileSystem;
    private final File localScratchPath;
    private final FileSystem hdfsFileSystem;
    private final Path hdfsSyncPath;

    public HdfsSyncingLocalFileOutputCommitter(File localScratchPath, Path hdfsSyncPath, TaskAttemptContext context) throws IOException {
        super(hdfsSyncPath, context);
        Configuration conf = context.getConfiguration();
        this.localFileSystem = FileSystem.getLocal(conf);
        this.localScratchPath = localScratchPath;
        this.hdfsFileSystem = FileSystem.get(conf);
        this.hdfsSyncPath = hdfsSyncPath;
    }

    public File getLocalScratchPath() {
        return localScratchPath;
    }

    @Override
    public void abortJob(JobContext context, State state) throws IOException {
        deleteLocalScratchPath();
        super.abortJob(context, state);
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
        deleteLocalScratchPath();
        super.abortTask(context);
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        if (localScratchPath.exists()) {
            syncToHdfs(context);
        }
        super.commitTask(context);
        deleteLocalScratchPath();
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return localScratchPath.exists() || super.needsTaskCommit(context);
    }

    private void syncToHdfs(TaskAttemptContext context) throws IOException {
        if (!hdfsFileSystem.mkdirs(hdfsSyncPath)) {
            throw new IOException(String.format("Cannot create HDFS directory at [%s] to sync Lucene index!", hdfsSyncPath));
        }
        // Create subdirectory in HDFS for the Lucene index part from this particular reducer.
        Path indexPartHdfsFilePath = new Path(hdfsSyncPath, PREFIX_LUCENE_INDEX_PART + context.getTaskAttemptID().getTaskID().getId());

        if (!hdfsFileSystem.mkdirs(indexPartHdfsFilePath)) {
            throw new IOException(String.format("Cannot create HDFS directory at [%s] to sync Lucene index!", indexPartHdfsFilePath));
        }

        for (File localFile : localScratchPath.listFiles()) {
            context.progress();
            Path localFilePath = new Path("file://" + localFile.getPath());
            if (!localFileSystem.exists(localFilePath)) {
                throw new IOException(String.format("Cannot find local file [%s]!", localFilePath));
            }
            Path hdfsFilePath = new Path(indexPartHdfsFilePath, localFile.getName());
            if (hdfsFileSystem.exists(hdfsFilePath)) {
                throw new IOException(String.format("HDFS file [%s] already exists!", hdfsFilePath));
            }
            hdfsFileSystem.copyFromLocalFile(localFilePath, hdfsFilePath);
        }
    }

    private void deleteLocalScratchPath() {
        try {
            FileUtils.deleteDirectory(localScratchPath);
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
}

Note: The code can be downloaded from Github.

A lot to process there, so we will take it one step at a time.

  1. The constructor creates two FileSystem references, one for HDFS and one for the local file system.
  2. The local scratch path is surfaced through getLocalScratchPath(). This is so that the Reducer can create a Lucene index in the correct place and ensure that it is synced correctly. We will see that code in a moment.
  3. The overridden abortJob() and abortTask() merely ensure the scratch directory is cleaned up if the Hadoop job terminates prematurely.
  4. The overridden commitTask() calls syncToHdfs(), which copies all the files in the scratch path to a corresponding location in HDFS. The base directory in HDFS is determined by the output path of the job, which is configured through the normal means when you run the MapReduce job.
  5. The real magic is in syncToHdfs(). It traverses all the files in the local scratch path and copies them to HDFS using the copyFromLocalFile() method call. Note: It also invokes context.progress() so that Hadoop does not kill the task because it thinks it has stalled if the copy takes too long.

Now the actual Reducer needs to access the committer, and use the local scratch path (the contents of which will be synced automatically) to write its Lucene index.

In the Reducer class:

private IndexWriter indexWriter;

@Override
public void setup(Context context) throws IOException, InterruptedException {  
    File localScratchPath = ((HdfsSyncingLocalFileOutputCommitter)context.getOutputCommitter()).getLocalScratchPath();
    if (!localScratchPath.mkdirs()) {
        throw new IOException(String.format("Cannot create [%s] on local file system!", localScratchPath.getPath()));
    }

    indexWriter = new IndexWriter(FSDirectory.open(localScratchPath), ...);
}

The setup method gets the local scratch path from the committer, and creates a Lucene IndexWriter over that directory.
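The elided IndexWriter arguments depend on your Lucene version. With Lucene 4.x, for example, the configuration might look roughly like this (a sketch; the Version constant and the StandardAnalyzer are assumptions about your setup):

// Hypothetical Lucene 4.x configuration; adjust the Version constant and analyzer to taste.
IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_44, new StandardAnalyzer(Version.LUCENE_44));
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
indexWriter = new IndexWriter(FSDirectory.open(localScratchPath), config);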

The Reducer then adds Lucene Documents to the IndexWriter as normal in the reduce() method:

indexWriter.addDocument(doc);  

These Documents are created from the aggregate data structures discussed above, with one Term per field containing all the values from all the aggregated data structures.
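Putting the aggregation and indexing together, the reduce() method ends up looking something like this (a sketch; the field names, the getValues() accessor, and the choice of StringField for adding each value are assumptions, not necessarily how we built our actual Documents):

@Override
protected void reduce(Text key, Iterable<DataStructure> values, Context context) throws IOException, InterruptedException {
    // Merge every data structure in the group into a single aggregate.
    DataStructure aggregate = new DataStructure();
    for (DataStructure ds : values) {
        aggregate.merge(ds);
    }

    // One Lucene field per data structure field, carrying every aggregated value.
    Document doc = new Document();
    for (String value : aggregate.field1.getValues()) {
        doc.add(new StringField("field1", value, Field.Store.YES));
    }
    for (String value : aggregate.field2.getValues()) {
        doc.add(new StringField("field2", value, Field.Store.YES));
    }
    ...

    indexWriter.addDocument(doc);
}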

Finally, the Reducer then closes the IndexWriter upon completion:

@Override
public void cleanup(Context context) throws IOException, InterruptedException {  
    indexWriter.close();
}

Under the covers, the committer will then sync all the index files to HDFS. When all the Reducers are complete, each shard will be found in a "part-X" subdirectory of the HDFS output directory (where X is the Reducer number).

Questions? Comments? Email me at: [email protected]!