Apr 9, 2011

Controlling Hadoop MapReduce Job recursion

This post is related to the previous post.

Sometimes you come across problems that have to be solved in a recursive manner, for example the graph exploration algorithm in my previous post.
You have to chain the jobs and let each job work on the output of the previous one. And of course you need a breaking condition. This could either be a fixed limit on "how many recursions it should do" or a check on "how many recursions it really needs".
Let me focus on the second breaking condition, using my graph exploration example.

Counter
First off you should know that Hadoop has counters; you can see them after a job has run, or in the web interface of the JobTracker. "Famous" counters are "Map input records" and "Reduce output records".
Best of all, we can set up our own counters, simply by using enums.

How to set up a counter?
The simplest approach is to define an enum like this:

public enum UpdateCounter {
  UPDATED
}

Now you can manipulate the counter using:

context.getCounter(UpdateCounter.UPDATED).increment(1);

"context" is the context object you get in your mapper or reducer.
This line increments your UPDATED counter by 1.
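To make the pattern concrete, here is a minimal sketch of what such a reducer might look like. This is an assumption, not the post's actual source: the `VertexWritable` type and its `isUpdated()` method are hypothetical placeholders.

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reducer that increments the UPDATED counter once per
// changed vertex. VertexWritable and isUpdated() are assumed here.
public class ExplorationReducer
    extends Reducer<LongWritable, VertexWritable, LongWritable, VertexWritable> {

  public enum UpdateCounter {
    UPDATED
  }

  @Override
  protected void reduce(LongWritable key, Iterable<VertexWritable> values,
      Context context) throws IOException, InterruptedException {
    for (VertexWritable vertex : values) {
      if (vertex.isUpdated()) {
        // summed across all reduce tasks; read via job.getCounters() afterwards
        context.getCounter(UpdateCounter.UPDATED).increment(1);
      }
      context.write(key, vertex);
    }
  }
}
```

The sum over all map and reduce tasks is what you later read back in the driver.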

How to fetch the counter?

This is as easy as setting up the enum. You submit a job like this:
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("Graph explorer");

job.setMapperClass(DatasetImporter.class);
job.setReducerClass(ExplorationReducer.class);
// leave out the stuff with paths etc.
job.waitForCompletion(true);

Be sure that the job has finished; using waitForCompletion is recommended. Querying the counter while the job is still running can yield strange results ;)
You can access your counter like this:
long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
    .getValue();

How to get the recursion running?

Now that we know how to get the counter, setting up the recursion is quite simple. The only thing to watch out for is output paths left over from older job runs.
Look at this snippet:
// variable to keep track of the recursion depth
int depth = 0;
// counter from the previously run import job
long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
    .getValue();
// fs (FileSystem), in and out (Path) are assumed to be declared earlier

depth++;
while (counter > 0) {
  // reuse the conf reference with a fresh object
  conf = new Configuration();
  // make the current depth available to the tasks
  conf.set("recursion.depth", depth + "");
  job = new Job(conf);
  job.setJobName("Graph explorer " + depth);

  job.setMapperClass(ExplorationMapper.class);
  job.setReducerClass(ExplorationReducer.class);
  job.setJarByClass(ExplorationMapper.class);
  // always work on the output path of the previous depth
  in = new Path("files/graph-exploration/depth_" + (depth - 1) + "/");
  out = new Path("files/graph-exploration/depth_" + depth);

  SequenceFileInputFormat.addInputPath(job, in);
  // delete the output path if it already exists
  if (fs.exists(out))
    fs.delete(out, true);

  SequenceFileOutputFormat.setOutputPath(job, out);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(VertexWritable.class);
  // wait for completion and update the counter
  job.waitForCompletion(true);
  depth++;
  counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
      .getValue();
}


Note that if you never increment your counter, it will always be 0. It could also be null if you never used it in your mapper or reducer.

The full source code can always be found here:
http://code.google.com/p/hama-shortest-paths/

25 comments:

  1. Your blog posts are very helpful for me.
    One question. What if we wanted to use TextInputFormat?
    KeyValueTextInputFormat is the one designed for "iterative" MapReduce jobs, but unfortunately it is no longer supported in the new API.

  2. Hi,

    you can use TextInputFormat as well, but then you have to output text too. So you have to use TextOutputFormat, which should produce exactly the same formatting as your previous input.

    If you can't achieve that formatting, you'd better add a preprocessing job and stick with SequenceFiles.
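    For illustration, a sketch of the driver wired for text instead of sequence files, assuming the job classes from the post. The reducer's text output must then match the key<TAB>value lines the next iteration's mapper expects; the paths shown are the post's example paths.

```java
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("Graph explorer (text)");

job.setMapperClass(ExplorationMapper.class);
job.setReducerClass(ExplorationReducer.class);
// swap the sequence file formats for their text counterparts
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job, new Path("files/graph-exploration/depth_0/"));
TextOutputFormat.setOutputPath(job, new Path("files/graph-exploration/depth_1"));
job.waitForCompletion(true);
```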

  3. Hi,

    Your work is so helpful.
    Is there a way to cache the output of a job for the next job, i.e. skip writing the output to disk, or skip reading the input from disk?

    Regards,

  4. Hi,

    if you are in distributed mode, the distribution of the splits is "completely" random, so Hadoop itself won't benefit from caching.

    If you are searching for a full cached solution, you should take a look into Spark.

    http://www.spark-project.org

    Or you can take a look at Apache Hama, there you can use caching very well and the iteration is much faster than Hadoop MapReduce.

  5. This comment has been removed by a blog administrator.

  6. Where is the counter incremented in the code and when do we get out of the loop?

  7. The counter is incremented in the MapReduce job that runs inside the while loop, which has the breaking condition (counter > 0).

  8. Hi, thanks for this tutorial, it's really helpful.
    But what if some reducer tasks increment the counter but some do not?

  9. Hey,

    it checks the counters after the job has run, so it takes the sum over all reducers of a single job.
    If you have jobs where none of the reducers increment the counter, then this won't work and you have to find another metric.

  10. You mean that if some reducers increment the counter and some do not, this recursion mechanism is not suitable?

  11. Oh! I got the idea of your example!
    All I have to do is make sure that the non-converged reducer tasks increment the counter, and check whether the counter is still > 0.
    Thanks very much again!

  12. hi.. Is there any output type in Hadoop that can output a graph structure? Or how are graphs generally implemented in Hadoop?

  13. No, there is no built-in graph structure in Hadoop.
    Graphs are somewhat abstract, so you can express one as an adjacency list, using a key and a list of keys as the value.

    In Hadoop generics this would look something like:

    < Text, ArrayWritable >

    where the ArrayWritable consists of Text keys. This is then your adjacency list, and you can run fancy graph algorithms on it ;)
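    A minimal sketch of such an adjacency list value type; the class name TextArrayWritable is my own invention, not something from Hadoop or the post.

```java
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;

// ArrayWritable needs a subclass that pins the element type to Text,
// otherwise Hadoop cannot deserialize the array on the reduce side.
public class TextArrayWritable extends ArrayWritable {

  public TextArrayWritable() {
    super(Text.class);
  }

  public TextArrayWritable(Text[] neighbours) {
    super(Text.class, neighbours);
  }
}
```

    A vertex "A" with neighbours "B" and "C" would then be emitted as context.write(new Text("A"), new TextArrayWritable(new Text[] { new Text("B"), new Text("C") })).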

  14. hello, I want to ask you a question.
    >>> I know that in the reduce() function, if a vertex is updated (and set to active) the counter is incremented by 1; if a vertex is not updated (and set to inactive) the counter is not incremented.
    >>> In the main() function you use while (counter > 0) as the loop condition.
    ----- My question is: in the first iterations the counter is incremented in reduce(), so the while loop runs. If in some iteration the counter is not incremented in reduce(), that means no vertex was updated, so the loop should stop. But while (counter > 0) would still be true (I don't see the counter ever being decremented), so the loop would continue.
    I want to ask: how does the counter decrease, so that the loop condition eventually fails?

  15. Hi, the counter never decreases, but in every MapReduce job the counter starts from 0. So if the reducers don't increment it, it will return 0.

  16. Hi, I want to ask a question not directly related to this article. I am new to Hadoop and wanted to ask how I can submit Hadoop jobs from a machine that is not in the Hadoop cluster (not a namenode or a datanode). Is including the Hadoop libraries/jars and setting the configuration object with the HDFS and JobTracker URLs enough?

  17. Hi, exactly as you told. Or, if you're more the XML kind of guy, you can copy your cluster's hdfs-site.xml/core-site.xml, which contain this information, to the other server and use conf.addResource(...). The jars should reside on both sides.
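    For illustration, a hedged sketch of both variants; the host names, ports, and file paths are placeholders, and the property names are the pre-YARN, 0.20.x-era ones.

```java
Configuration conf = new Configuration();
// variant 1: set the cluster endpoints programmatically
conf.set("fs.default.name", "hdfs://namenode-host:9000");
conf.set("mapred.job.tracker", "jobtracker-host:9001");

// variant 2: reuse the cluster's own config files instead
// conf.addResource(new Path("/path/to/core-site.xml"));
// conf.addResource(new Path("/path/to/hdfs-site.xml"));

Job job = new Job(conf);
```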

    Replies
    1. I couldn't get what you mean by "The jars should reside on both sides." For example, if I'm running a program that calls Mahout code on my machine and I want it to start a job on an external Hadoop cluster (I don't have Hadoop installed on my machine), do you mean I have to copy my program's jar file to the Hadoop namenode, or what do you mean by both sides?
      Thanks a lot & best regards

    2. The Mahout jars must reside on your classpath and the jars that are in Hadoop's lib folder should be there too.

    3. Thanks Thomas, that's what I did, and it runs locally with the configuration set to localhost and the ports of the Hadoop installation on my machine. But I was afraid that with the actual setup this won't work by just replacing the URL with the URL of the namenode?

    4. That will work yes, then it writes/reads from the HDFS you configured.

  18. Hi

    Thanks for the info you have shared. I have couple of quick questions:

    1. Is the enum declared in the Mapper/Reducer class, since it's their count which we need to monitor?
    2. Is context.getCounter(...).increment(...) again called in the mapper or reducer?
    3. I am implementing this in a reducer, so even if only one reducer instance runs and increments the counter, will that drive the loop?

    Thanks
    M

  19. Hi M,

    to 1: The enum can be declared anywhere, but it must be accessible both from the controller class that submits the job and from the Mapper/Reducer class.

    to 2: Exactly, the counters are incremented in the mapper and/or in the reducer.

    to 3: Yes.

  20. This comment has been removed by the author.
