Apr 23, 2011

Profiling Apache Hama BSP's

Hi all,

sometimes you aren't aware of why your application is performing slowly, or slower than before, and you need a profiler to find out in which methods the most time is spent.

Which profiler should I use?

That depends on your needs; I prefer YourKit. It has fairly good thread monitoring and cool inspections. Most of the time I'm searching for memory leaks or for the reason a thread has deadlocked, and this profiler is a very helpful tool in those cases.

This time we are focusing on profiling a BSP method / class. Or: how you can profile your own Hama application inside a cluster.

In my case I'm using VMs with Ubuntu and Windows 7 on my host machine, which might be a common setup. YourKit itself is written in Java, so it runs on Unix as well.
We are going to start the application with the profiler agent attached and tunnel the profiler port with Putty to our host machine, in my case Windows.

Let's go!

  1. Download Yourkit here
  2. Install Yourkit and Putty (if you're running windows, otherwise you can just use your shell) on your host machine.
  3. Download Yourkit for the server machine and untar it
  4. Look for the shared object (libyjpagent.so) in the yjp-9.5.5/bin/linux-x86-32 directory. Note: make sure you pick the right version for your JVM: 32 bit if you aren't sure, 64 bit otherwise.
  5. Copy it to a more readable path like "/usr/local/yk/libyjpagent.so"
Now YourKit is set up properly and we have to add the agent to the Hama configuration on the BSPMaster.
Look into your "hama-site.xml"; if the property key "bsp.child.java.opts" isn't set yet, you have to override it like this:

 <property>
    <name>bsp.child.java.opts</name>
    <value>-Xmx1024m -agentpath:/usr/local/yk/libyjpagent.so</value>
</property>
Of course you'll have to replace the agentpath with the path you copied your shared object to. Make sure you have chown'd it to the user that runs Hama. Note: the heap limit is just a copy of the default value in hama-default.xml; anything configured in hama-default.xml is overridden by the hama-site.xml configuration parameters.

Now you have to make sure that this libyjpagent.so is on EVERY machine in your cluster. Hama currently can't start just a few tasks with the profiler (like Hadoop can), so you'll have to provide it for every groom.

If this is working, you can just start the DFS and the BSPMaster, and the agent will be started along with it. You can see this in your groom's log by an entry like this:
2011-04-22 17:45:53,104 INFO org.apache.hama.bsp.TaskRunner: attempt_201104221744_0001_000001_0 [YourKit Java Profiler 9.5.5] Loaded. Log file: /home/hadoop/.yjp/log/1864.log
Don't forget to take a look into the log file to determine on which port the agent is listening. This is necessary for the remote connection with Putty.
In every case YourKit probes your ports starting with 10000; if a port is blocked, it picks 10001 and so on.
In the log you should see something like this:
[YourKit Java Profiler 9.5.5] [0.539]: Profiler agent is listening on port 10001
Ok, now we have to set up Putty on our Windows system.
Enter your connection information on the Session tab, like the hostname of the BSPMaster. In my case this is thomasjungblut@192.168.56.102 and port 22.

Now switch to the Connection->SSH->Tunnels tab and add a new tunnel.
For port 10001 it will be:
Source port 10001, destination raynor:10001, connection type Local and Auto.
The destination host:port pair is the host:port of your BSPMaster.

Now open the session and log in; the profiling data will be tunneled over this port.

Inside YourKit click "Connect to remote application" and enter localhost:10001 (or whichever port YourKit picked).
Congratulations, you have connected to your running application inside your cluster.

Just a tip if you are profiling Hama itself: you'll have to deactivate the apache.org filter under Settings->Filter, otherwise every framework method call will be filtered out. BUT if you just want to profile your own application, the framework is only noise, so you can leave the filter on.

Good luck and have fun!

Apr 17, 2011

Apache Hama network throughput

Hey, I've read about a new task: Improve output of RandBench example.
This is basically about adding some more output, but it actually caused me to measure the throughput of my virtual Hama cluster.

My cluster

Some months ago I decided to get rid of my real hardware server and go with one massive machine that hosts multiple virtual machines. I now have two virtual hosts (VirtualBox ftw) with 2 cores and 1 gig of memory each.
The hosts are connected by a virtual Intel PRO/1000 MT Desktop card.
One server is configured as a namenode, datanode, bspmaster and groom. The other machine is just a datanode and a groom.

RandBench

RandBench is just a simple looping utility that sends a constant-size data packet to randomly chosen peers across your cluster.
If you are familiar with BSP you know that you can put messages into a local queue, which is nothing else than storing the packet in local RAM. This doesn't really need to be benchmarked. So we can assume that if we run this on a two-host cluster there is a 50% probability that a message is sent to the other host over the network. And since both hosts execute their BSP method, the amount of data sent is effectively doubled again.
On a two-host cluster you can therefore assume that roughly the data size we configure at startup gets sent to the other host.

Startup Options

In my little test we are going to use these parameters: bench 524288 100 2000.
The first number is the message size in bytes (so 512 KB), the second argument tells the benchmark how often to send this message per superstep, and the last argument is the number of supersteps.
This sums up to a total size of 97.65625 GB. Note that the benchmark also sends a string as the message tag which tells where a message comes from and goes to, so the amount of data sent will be a bit higher.
In my case this tag is 30 bytes ("karrigan:61000 to raynor:61000", yea my vhosts are StarCraft characters :P) for each message that has been sent. So 100 messages * 2k supersteps results in 200,000 messages with 30 bytes each, which sums up to 6,000,000 bytes, roughly 5.7 MB. That is not a great deal compared to the 97 GB of byte arrays.

Result

Afterwards the benchmark tells you how much time this test took:
11/04/17 13:07:57 INFO bsp.BSPJobClient: The total number of supersteps: 2000
Job Finished in 1502.107 seconds
Now we can calculate the average network throughput, because we know how much data we've sent in which amount of time:
97 GB / 1502.107 s ≈ 66 MB/s
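If you want to verify the numbers, here is the arithmetic as a tiny Java sketch (it just reproduces the figures from above, nothing else):

public class RandBenchMath {
 public static void main(String[] args) {
  long messageSize = 524288L; // bytes per message, 512 KB
  long messagesPerSuperstep = 100L;
  long supersteps = 2000L;
  // total payload: 524288 * 100 * 2000 = 104,857,600,000 bytes ~ 97.66 GB
  long totalBytes = messageSize * messagesPerSuperstep * supersteps;
  double totalGigabytes = totalBytes / (1024.0 * 1024.0 * 1024.0);
  // tag overhead: 30 bytes for each of the 200,000 messages ~ 5.7 MB
  long tagBytes = 30L * messagesPerSuperstep * supersteps;
  // average throughput over the measured 1502.107 seconds ~ 66 MB/s
  double throughput = (totalBytes / (1024.0 * 1024.0)) / 1502.107;
  System.out.printf("%.2f GB payload, %d bytes of tags, %.1f MB/s%n",
    totalGigabytes, tagBytes, throughput);
 }
}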

Note that...
  • these timings are VM based, so I won't draw a strong conclusion here
  • the data sent is a dummy byte array filled with zeros
  • we can therefore easily run into caching effects

Apr 16, 2011

PageRank with Apache Hama

Hey,

some days ago I read on the Hama-dev mailing list that the Nutch project wants a PageRank implementation:

Hi all,
Anyone interested in PageRank and collaborating w/ Nutch project? :-)
Source

So I thought that I could do this. I have already implemented PageRank with MapReduce, so why don't we go for a BSP version? :D
This is basically what this blog post is about.

Let's make a few assumptions:
  • We have an adjacency list (web-graphs are sparse) of webpages and their unweighted edges
  • A working partitioning like here. (You don't have to implement it, but you should know how it works.)
  • We have read the Pregel Paper (or at least the summary)
  • Familiarity with PageRank
Web Graph Layout

This is the adjacency list. On the leftmost side is the vertexID of the webpage, followed by the outlinks that are separated by tabs.

1 2 3
2
3 1 2 5
4 5 6
5 4 6
6 4
7 2 4

Pretty printed, this gives a graph like this:

I have colored the vertices by their incoming links: the vertex with the most in-links is the brightest, vertices with few or no in-links are darker.
We can already see that vertex 2 should get a high PageRank, 4, 5 and 6 should get a more or less equal rank, and so on.

Short summary of the algorithm

I am referring to the Google Pregel paper here. First we need a model class that represents a vertex and holds its own tentative PageRank. In my case we store the tentative PageRank along with the id of a vertex in a HashMap.
In the first superstep we set the tentative PageRank to 1 / n, where n is the number of vertices in the whole graph.
In each step we send, for every vertex, its PageRank divided by the number of its outgoing edges to all adjacent vertices in the graph.
So from the second superstep on we receive messages containing the tentative PageRank of adjacent vertices. Now we sum up these messages for each vertex "i" and apply this formula:
P(i) = 0.15/NumVertices() + 0.85 * sum
This is the new tentative PageRank of a vertex "i".
I'm not sure whether NumVertices() returns the number of all vertices or just the number of adjacent vertices; I'll assume it is the count of all vertices in the graph, which would make it a constant. The new rank is then this constant term plus the damping factor multiplied by the sum of the received tentative ranks of the adjacent vertices.

We loop over these steps until convergence to a given error is achieved. This error is just the sum of the absolute differences between the old tentative PageRank and the new one of each vertex.
Or we can break when we reach an iteration count that is high enough.

We store the old PageRank as a copy of the current PageRank (a simple HashMap).
The error is thus a local variable that we sync with a master task, which averages the local errors and broadcasts the result back to all the slaves.

Code

Let's look at the fields we need:

private static int MAX_ITERATIONS = 30;
 // alpha is 0.15/NumVertices()
 private static double ALPHA;
 private static int numOfVertices;
 private static double DAMPING_FACTOR = 0.85;
 // this is the error we want to achieve
 private static double EPSILON = 0.001;

 HashMap<Integer, List<Integer>> adjacencyList = new HashMap<Integer, List<Integer>>();
 // normally this is stored by a vertex, but I don't want to create a new
 // model for it
 HashMap<Integer, Double> tentativePagerank = new HashMap<Integer, Double>();
 // backup of the last pagerank to determine the error
 HashMap<Integer, Double> lastTentativePagerank = new HashMap<Integer, Double>();

Keep in mind that every task only holds a subgraph of the whole graph, so these structures contain just their chunk of the PageRank data.

Let's get into the init phase of the BSP:

@Override
 public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
   InterruptedException {
  fs = FileSystem.get(getConf());
  String master = conf.get(MASTER_TASK);
  // setup the datasets
  adjacencyList = mapAdjacencyList(getConf(), peer);
  // init the pageranks to 1/n where n is the number of all vertices
  for (int vertexId : adjacencyList.keySet())
   tentativePagerank
     .put(vertexId, Double.valueOf(1.0 / numOfVertices));

...

As mentioned, we read the data chunk from HDFS and set the tentative PageRank to 1/n.
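The mapAdjacencyList() method isn't shown in this post; here is a minimal sketch of what it could look like, assuming the input was already partitioned into one SequenceFile of (vertex id, outlink) pairs per peer as in my partitioning post (the configuration key "partitioned.path" is made up for this example, the real code is in the repository):

private HashMap<Integer, List<Integer>> mapAdjacencyList(Configuration conf,
   BSPPeerProtocol peer) throws IOException {
  HashMap<Integer, List<Integer>> adjacency = new HashMap<Integer, List<Integer>>();
  FileSystem fs = FileSystem.get(conf);
  // every peer only reads the SequenceFile that carries its own name
  Path chunk = new Path(conf.get("partitioned.path") + Path.SEPARATOR
    + peer.getPeerName());
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, chunk, conf);
  IntWritable vertex = new IntWritable();
  IntWritable outlink = new IntWritable();
  while (reader.next(vertex, outlink)) {
   List<Integer> outlinks = adjacency.get(vertex.get());
   if (outlinks == null) {
    outlinks = new LinkedList<Integer>();
    adjacency.put(vertex.get(), outlinks);
   }
   outlinks.add(outlink.get());
  }
  reader.close();
  return adjacency;
 }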

Main Loop

// while the error does not converge to epsilon, do the pagerank stuff
  double error = 1.0;
  int iteration = 0;
  // if MAX_ITERATIONS is set to 0, ignore the iterations and just go
  // with the error
  while (error >= EPSILON
    && (MAX_ITERATIONS <= 0 || iteration < MAX_ITERATIONS)) {

   peer.sync();

   if (iteration >= 1) {
    // copy the old pagerank to the backup
    copyTentativePageRankToBackup();
    // sum up all incoming messages for a vertex
    HashMap<Integer, Double> sumMap = new HashMap<Integer, Double>();
    IntegerDoubleMessage msg = null;
    while ((msg = (IntegerDoubleMessage) peer.getCurrentMessage()) != null) {
     if (!sumMap.containsKey(msg.getTag())) {
      sumMap.put(msg.getTag(), msg.getData());
     } else {
      sumMap.put(msg.getTag(),
        msg.getData() + sumMap.get(msg.getTag()));
     }
    }
    // pregel formula:
    // ALPHA = 0.15 / NumVertices()
    // P(i) = ALPHA + 0.85 * sum
    for (Entry<Integer, Double> entry : sumMap.entrySet()) {
     tentativePagerank.put(entry.getKey(),
       ALPHA + (entry.getValue() * DAMPING_FACTOR));
    }

    // determine the error and send this to the master
    double err = determineError();
    error = broadcastError(peer, master, err);
   }
   // in every step send the tentative pagerank of a vertex to its
   // adjacent vertices
   for (int vertexId : adjacencyList.keySet())
    sendMessageToNeighbors(peer, vertexId);

   iteration++;
  }

I guess this is self-explanatory. The function broadcastError() sends the determined error to a master task, which averages all incoming errors and broadcasts the result back to the slaves (similar to aggregators in the Pregel paper).
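broadcastError() isn't listed in this post; here is a minimal sketch of how it could work, assuming an IntegerDoubleMessage(tag, value) constructor (the actual implementation is in the repository):

private double broadcastError(BSPPeerProtocol peer, String master, double error)
   throws IOException, KeeperException, InterruptedException {
  // every peer reports its local error to the master task
  peer.send(master, new IntegerDoubleMessage(0, error));
  peer.sync();
  if (peer.getPeerName().equals(master)) {
   // the master averages all reported errors...
   double sum = 0.0;
   int count = 0;
   IntegerDoubleMessage message;
   while ((message = (IntegerDoubleMessage) peer.getCurrentMessage()) != null) {
    sum += message.getData();
    count++;
   }
   // ...and broadcasts the average back to every peer
   for (String peerName : peer.getAllPeerNames()) {
    peer.send(peerName, new IntegerDoubleMessage(0, sum / count));
   }
  }
  peer.sync();
  return ((IntegerDoubleMessage) peer.getCurrentMessage()).getData();
 }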
Let's take a quick look at the determineError() function:

private double determineError() {
  double error = 0.0;
  for (Entry<Integer, Double> entry : tentativePagerank.entrySet()) {
   error += Math.abs(lastTentativePagerank.get(entry.getKey())
     - entry.getValue());
  }
  return error;
 }

As described in the summary, we just sum up the absolute differences between the old and the new rank of each vertex.
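The same goes for sendMessageToNeighbors(); a minimal sketch, assuming the modulo partitioning from my partitioning post, so the message for a vertex is sent to the peer that owns it:

private void sendMessageToNeighbors(BSPPeerProtocol peer, int vertexId)
   throws IOException {
  List<Integer> outlinks = adjacencyList.get(vertexId);
  if (outlinks == null || outlinks.isEmpty())
   return; // dangling vertices send nothing in this sketch
  // the tentative rank of this vertex is split evenly across its outgoing edges
  double contribution = tentativePagerank.get(vertexId) / outlinks.size();
  for (int neighbor : outlinks) {
   // modulo partitioning tells us which peer holds the adjacent vertex
   int mod = neighbor % peer.getAllPeerNames().length;
   peer.send(peer.getAllPeerNames()[mod],
     new IntegerDoubleMessage(neighbor, contribution));
  }
 }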

Output

Finally we are able to run this and receive a fraction between 0 and 1 that represents the PageRank of each site.
I ran this with a convergence error of 0.000001 and a damping factor of 0.85; it took about 17 iterations.

------------------- RESULTS --------------------
2 | 0.33983048615390526
4 | 0.21342628110369394
6 | 0.20495452025114747
5 | 0.1268811487940641
3 | 0.0425036157080356
1 | 0.0425036157080356
7 | 0.02990033228111791

The ranks sum up to about 1.0, which is correct.
Note that if you run this job yourself, the output is not guaranteed to be sorted; I sorted it here to give you a better view.

You can see that our initial guess about the PageRank distribution was quite good.

I think that's all; if you are interested in testing or running this, feel free to do so.
The class and test data are located in my Summer of Code repository under: http://code.google.com/p/hama-shortest-paths/
The class name is de.jungblut.hama.bsp.PageRank.
Just execute the main method; it will run a local multithreaded BSP on your machine.

Star this project and vote for my GSoC task. :)

Thank you.

Apr 13, 2011

HTC Sensation

Hey,

my first non-code-related blog post :D
I've read that HTC launches the new Sensation (codename Pyramid) here in Germany in May. I am very excited about this phone; I have to replace my iPhone 3GS because it sucks :D

I believe I'll be able to hold it in my hands in June (that's when my birthday is!) and I will code for Android, like Android apps that can help people with their daily problems. Actually I have a developer account, so I could have purchased a dev phone. But they are not cheap: $500 for a Nexus? :D Not really.

Starting in June I'll provide some tutorials on Android and multithreading. Multithreading is actually my love, and as you already heard: the HTC Sensation has a dual-core CPU!
Let's get the most out of Android ;)

Apr 12, 2011

Apache Hama Partitioning

Hello there!

I was a bit busy today after reading the Google Pregel paper. It describes quite well "how to think like a vertex" and how they partition data across their cluster.
I wanted to share this new knowledge with you, so I quickly rewrote my graph exploration algorithm. It can be found in the post below or by following this link.

Just a short summary of what we are going to change:
  • Partition the vertices across the servers.
  • Messages that are related to a specific vertex will just go to the server the vertex was partitioned to.
  • The master task just keeps track of whether the computation is finished (no more updates were made).
  • Each task has its own output.
Partitioning
I really felt dumb today after reading the Pregel paper. They just partition the data with a hash function, while I was running several MapReduce jobs to determine the optimal distribution of an adjacency list. So scratch that; we use a simple hash function that is nothing other than the id of the vertex modulo the number of machines. If you have, for example, vertex IDs from 0-100 and have to distribute them over 5 machines, just loop to 100 and take each id modulo 5. Life can be so simple!
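In code this "hash function" boils down to a few lines (a toy example, not taken from the repository):

// toy example: distribute vertex ids 0..100 over 5 machines by modulo
int numberOfMachines = 5;
for (int vertexId = 0; vertexId <= 100; vertexId++) {
 int machine = vertexId % numberOfMachines;
 System.out.println("vertex " + vertexId + " goes to machine " + machine);
}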

How to distribute?
First off, we share a common FileSystem across the servers. The idea is that every server gets its own SequenceFile where its data is stored. Let's rush into the code that parses my adjacency list text file into the SequenceFiles for the servers:

public static List<Path> partitionAdjacencyTextFile(int sizeOfCluster,
   Map<String, String> groomNames, Path fileToPartition,
   Configuration conf) throws IOException {
  // setup the paths where the grooms can find their input
  List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
  List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
    sizeOfCluster);
  FileSystem fs = FileSystem.get(conf);
  for (Entry<String, String> entry : groomNames.entrySet()) {
   partPaths.add(new Path(fileToPartition.getParent().toString()
     + Path.SEPARATOR + "parted" + Path.SEPARATOR
     + entry.getValue()));
  }
  // create a seq writer for that
  for (Path p : partPaths) {
   fs.delete(p, true);
   writers.add(SequenceFile.createWriter(fs, conf, p,
     IntWritable.class, IntWritable.class));
  }

  // parse our file
  FSDataInputStream data = fs.open(fileToPartition);
  BufferedReader br = new BufferedReader(new InputStreamReader(data));
  String line = null;
  while ((line = br.readLine()) != null) {
   String[] arr = line.split("\t");
   int vId = Integer.parseInt(arr[0]);
   LinkedList<Integer> list = new LinkedList<Integer>();
   for (int i = 0; i < arr.length; i++) {
    list.add(Integer.parseInt(arr[i]));
   }

   int mod = vId % sizeOfCluster;
   System.out.println(vId + " goes to " + partPaths.get(mod));
   for (int value : list) {
    writers.get(mod).append(new IntWritable(vId),
      new IntWritable(value));
   }

  }
  data.close();

  for (SequenceFile.Writer w : writers)
   w.close();

  return partPaths;
 }


Basically we create a SequenceFile for each groom and write the data into these SequenceFiles using the modulo function. Note that the names of the SequenceFiles correspond to the names of the peers; that way each peer can simply find its own partition.
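On the reading side this means a peer only has to open the file that carries its own name, roughly like this (the base path is illustrative):

// each peer opens the SequenceFile that was written for it during partitioning
Path myPart = new Path(basePath + Path.SEPARATOR + "parted" + Path.SEPARATOR
   + peer.getPeerName());
SequenceFile.Reader reader = new SequenceFile.Reader(fs, myPart, conf);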

How to pass messages between vertices using partitioning?
If we know what kind of partitioning we used, this is very simple. Look at this small wrapper around the send method:

private void send(BSPPeerProtocol peer, BSPMessage msg) throws IOException {
  int mod = ((Integer) msg.getTag()) % peer.getAllPeerNames().length;
  peer.send(peer.getAllPeerNames()[mod], msg);
 }

The only requirement is that the indices of peer.getAllPeerNames() are the same as the ones we used in the partitioning phase, otherwise this will result in strange behaviour.

With the help of these two methods we can now make the main algorithm use less master-slave communication and more collective communication instead.
But keep in mind that we still need master-slave communication to control the main loop. The problem is that if one slave stops its calculation because no more updates can be made in a superstep, the other peers will deadlock in the next sync phase, because that slave has already finished.
So we have to sync the updates with a master task and broadcast whether we can break the main loop altogether (if no task can update anymore) or whether we need another superstep.

This is actually a bit hacky. In the Pregel paper it is called "voteToHalt". Have a look at my implementation of the same:
private boolean voteForHalt(BSPPeerProtocol peer, String master)
   throws IOException, KeeperException, InterruptedException {
  peer.send(master, new IntegerMessage("", activeList.size()));
  peer.sync();
  if (peer.getPeerName().equals(master)) {
   boolean halt = true;
   IntegerMessage message;
   while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
    // a peer that still has active vertices prevents the halt
    if (message.getData() != 0) {
     halt = false;
     break;
    }
   }
   peer.clear();
   for (String name : peer.getAllPeerNames()) {
    peer.send(name, new BooleanMessage("", halt));
   }
  }

  peer.sync();
  BooleanMessage message = (BooleanMessage) peer.getCurrentMessage();
  // the master broadcasts true when we should halt, so the loop continues on false
  return !message.getData();
 }

In more human-readable terms: every task sends how many vertices are stored in its activeList, and a master decides whether to break or not. This decision is broadcast back to everyone, and a false return value breaks the outer loop.

Keep in mind that this is far from optimal.

Output
Now we have multiple output files in HDFS that can be read or merged to get the classified components of the graph.

This snippet is quite generic if you want to use it in your application: feel free to copy and paste ;)

private static void printOutput(FileSystem fs, Configuration conf)
   throws IOException {
  System.out.println("-------------------- RESULTS --------------------");
  FileStatus[] stati = fs.listStatus(new Path(conf.get("out.path")
    + Path.SEPARATOR + "temp"));
  for (FileStatus status : stati) {
   Path path = status.getPath();
   SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
   IntWritable key = new IntWritable();
   IntWritable value = new IntWritable();
   while (reader.next(key, value)) {
    System.out.println(key.get() + " | " + value.get());
   }
   reader.close();
  }
 }

As you can see, it just fetches the parent directory of the outputs and loops over the files, writing their content to STDOUT.
Gladly this helped a guy on Stack Overflow today, and it helped me reach my 500 reputation :)

Finally, it gives the same output as the other algorithms:
-------------------- RESULTS --------------------
0 | 0
1 | 1
2 | 2
3 | 2
4 | 1
5 | 2
6 | 2
7 | 1
8 | 2
9 | 0

The new class is called: de.jungblut.hama.graph.BSPGraphExplorationPartitioned.
Just execute the main method; it will run a local multithreaded BSP on your machine.
This class is located in my Summer of Code repository under: http://code.google.com/p/hama-shortest-paths/

Star this project or vote for my GSoC task.

Next week I'll go for a comparison between these three implementations: partitioned BSP, plain BSP and MapReduce.
But I guess before that I'll focus on PageRank. Some guys from Apache Nutch wanted a distributed PageRank in BSP, so I'll go for a working example next time.
The Pregel paper said something about 15 lines of code. Looks like a short task ;)

[UPDATE]

If you are interested in a faster way to partition, check out the new blog post here:
http://codingwiththomas.blogspot.com/2011/08/apache-hama-partitioning-improved.html

bye!

Apr 10, 2011

Graph Exploration with Apache Hama

Hey guys,
I've been busy for a little while, but I recently finished the graph exploration algorithm with Apache Hama. This post is about the BSP port of this post.
I already explained in this post how BSP basically works; now I'm going to tell you what you can do with it in terms of graph exploration. Last post I did this with MapReduce, so let's get into Hama!

As of today this is mostly outdated: since Hama 0.5.0 you can use the Pregel-like graph API and run the connected components example that ships with it, which does everything the code below did a year ago.
I just keep this as a part of history; some of the statements still hold true, though some parts would need to be updated.

BSP Class
Like when coding a mapper, you'll need to inherit from a base class. In Hama this is the BSP class; it is abstract and implements an interface called BSPInterface. BSPInterface has a method that you implement yourself:
public void bsp(BSPPeerProtocol bspPeer)

What is BSPPeerProtocol and how is it getting executed?
The BSPPeerProtocol is nothing else than a reference to your locally running groom server. A groom is similar to Hadoop's tasktracker. This is also a hint at how your own BSP gets executed: Hama launches several tasks inside a groom and creates a new instance of your BSP class per task.
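So the skeleton of your own class looks roughly like this (the class name is of course made up):

public class MyExplorationBSP extends BSP {

 @Override
 public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
   InterruptedException {
  // local computation goes here, e.g. reading this peer's chunk of the data

  // messaging: peer.send(otherPeerName, someMessage);

  // barrier synchronization: everybody waits here; afterwards the messages
  // sent before the sync can be read via peer.getCurrentMessage()
  peer.sync();
 }
}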

Syncing
In BSP it is necessary to synchronize the grooms in order to enter the messaging phase described here. You can simply call the sync() method on your BSPPeerProtocol reference.

Letting code be executed on only one Server / Groom / Task
This is actually pretty easy; you could take a look at the PiEstimator example.
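The pattern used there (and in the code below) is to put the name of one peer into the configuration as the master task and compare against it:

String master = conf.get(MASTER_TASK);
if (peer.getPeerName().equals(master)) {
 // this block runs on exactly one task: the one we declared to be the master
}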

Okay, I guess this is enough; let's step into the code. The idea is the same as in the MapReduce example:
  1. Initialize the datasets: we have several maps storing the active vertices, the adjacency list, and a map which holds the minimal vertex for each vertex.
  2. In the first iteration, send for each adjacent vertex a message to a master task containing the id of the adjacent vertex and its current minimal vertex. If we are not in the first iteration, we loop through the active vertices and their adjacent vertices, broadcasting to every adjacent vertex what the new minimal id is.
  3. Sync so that every task receives the messages.
  4. A master task fetches the results and updates the minimal vertex ids. If a vertex has been updated, we put its id into the "activeVertexMap" and send the active vertices to all existing peers.
  5. Sync again: if we received nothing, just break the main loop, which exits the bsp. If we received new active vertices, increment the iteration and continue with point 2.
Input / Output System
Maybe you know that Hama currently has no real input and output system, so you have to take care of this yourself: managing the data the grooms need for their computation, partitioning it and updating it.
I hope that this issue will be resolved soon, so that this whole management happens inside Hama and doesn't blow up your BSP class.
For this example we need two major files: the actual adjacency list and the map that keeps track of the current minima. If you are wondering why, last time we stored the minimum inside the vertex itself. That is true, but I don't want to add another model class for a vertex; this is just a personal preference, so feel free to fork and build your own ;)
Both files are in HDFS; a far more scalable solution would be to store them in an HBase table. But since Hama doesn't require HBase anymore, I'll go for the FileSystem way of storing data.

Source Code
If you want to see the full implementation, check it out at http://code.google.com/p/hama-shortest-paths/

Let's start with the initialization phase in the bsp method:

String master = conf.get(MASTER_TASK);
  fs = FileSystem.get(getConf());

  // setup the datasets
  adjacencyList = mapAdjacencyList(getConf());
  // setup the local minimas
  if (peer.getPeerName().equals(master)) {
   // init the minimum map
   for (Entry<Integer, List<Integer>> row : adjacencyList.entrySet()) {
    int localAdjacentMinimum = row.getValue().get(0);
    for (int adjacent : row.getValue()) {
     if (adjacent < localAdjacentMinimum)
      localAdjacentMinimum = adjacent;
    }
    minimalMap.put(row.getKey(), localAdjacentMinimum);
   }
   // save our minimal map to HDFS so that every task can read this
   saveMinimalMap();
  }


As you can see, we get from our configuration which groom is currently the master server, afterwards we initialize the FileSystem and map our adjacency list file into RAM. After that follows code that is only executed by the master.
It simply loops through the list and sets up the currently minimal adjacent vertex for each vertex.
Only the master has write access to the minimal map file and updates it after each iteration.
That's it. Let's step into the main loop.

// real start
  boolean updated = true;
  int iteration = 0;
  while (updated) {
   // sync so we can receive the new active messages
   peer.sync();
   List<Integer> activeQueue = new LinkedList<Integer>();
   if (peer.getNumCurrentMessages() > 0) {
    IntegerMessage message = (IntegerMessage) peer
      .getCurrentMessage();
    if (message.getTag().equals("size") && message.getData() == 0)
     break;
    BSPMessage msg = null;
    while ((msg = peer.getCurrentMessage()) != null) {
     message = (IntegerMessage) msg;
     activeQueue.add(message.getData());
    }
   }
   // apply updates on the minimal map
   applyMinimalMap();
...

First off we sync in this loop. In the first iteration it is obvious that nobody will receive a message, but you can also use the sync to keep the grooms at the same line of code. Maybe you have already seen this: a server is ahead in its computation while the master hasn't finished writing the map into HDFS, so this groom is no longer consistent with the rest of the cluster.
We prevent this using sync; in the following iterations the sync is used to receive the active vertices.
If the list of active vertices is empty we break the while loop: the algorithm is done. Otherwise we fill the activeQueue with the vertex ids we received, and then apply the changes the master could have made to the minimal map (method applyMinimalMap()).

Let's advance to the main algorithm.
// main algorithm
   if (iteration == 0) {
    for (Entry<Integer, List<Integer>> row : adjacencyList
      .entrySet()) {
     int min = minimalMap.get(row.getKey());
     for (int adjacent : row.getValue()) {
      peer.send(master, new FullIntegerMessage(adjacent, min));
     }
    }
   } else {
    for (int active : activeQueue) {
     int min = minimalMap.get(active);
     for (int l : adjacencyList.get(active)) {
      if (l != min)
       peer.send(master, new FullIntegerMessage(l, min));
     }
    }
   }

   peer.sync();

I guess this is described pretty well in the listing: if we are in the first iteration we send messages to every adjacent vertex in the adjacency list. In the following iterations we just loop over the active vertices and send the new minimum to every adjacent vertex, except when that vertex already is the minimum.
Then comes the sync step for sending and receiving the messages.
Don't worry, now comes the last part ;)

// only the master keeps track of the minimal
   if (peer.getPeerName().equals(master)) {
    FullIntegerMessage msg = null;
    List<Integer> activeList = new LinkedList<Integer>();
    while ((msg = (FullIntegerMessage) peer.getCurrentMessage()) != null) {
     if (minimalMap.get(msg.getTag()) > msg.getData()) {
      minimalMap.put(msg.getTag(), msg.getData());
      // flag as active
      activeList.add(msg.getTag());
     }
    }
    // save to hdfs for next iteration
    saveMinimalMap();
    // send messages to all peers containing the size of the
    // activelist and the content
    for (String peerName : peer.getAllPeerNames()) {
     peer.send(peerName,
       new IntegerMessage("size", activeList.size()));
     for (int active : activeList)
      peer.send(peerName, new IntegerMessage("", active));
    }
    // increment the iteration
    iteration++;
   }
  }

This part is only executed by the master groom. We receive every message and update the minimal map. If we updated a vertex, we put it into the list of active vertices. Afterwards we save our minimal map so the grooms can load a fresh state of the minima into their RAM.
Then we send the size of this list along with its content; this is necessary for the breaking condition. And don't forget to increment the iteration variable.

That's it. It is the same algorithm we used in MapReduce, translated to BSP.
That wasn't difficult, was it?

If you want to take a closer look at how this works, I already posted the project site of my GSoC project above. Feel free to check it out and play a little. The class we were talking about can be found here: de.jungblut.hama.graph.BSPGraphExploration
It comes along with the latest Hama build out of the trunk, which also has a local BSP runner that will multithread grooms on your local machine. Just run the main method as a Java application and you'll see. Be aware when running this on your Hama cluster that there could be some compatibility problems with version 0.2.

Apr 9, 2011

Controlling Hadoop MapReduce Job recursion

This post is related to the previous post.

Sometimes you come across problems that need to be solved in a recursive manner, for example the graph exploration algorithm in my previous post.
You have to chain the jobs and let the next job work on the output of the previous job. And of course you need a breaking condition. This could either be a fixed limit on "how many recursions it should do" or on "how many recursions it really needs".
Let me focus on the second breaking condition along with my graph exploration example.

Counter
First off you should know that in Hadoop you have counters; you may have seen them after a job has run, or in the web interface of the jobtracker. "Famous" counters are "Map input records" or "Reduce output records".
Best of all, we can set up our own counters, just by using enums.

How to set up a counter?
The simplest approach is to just define an enum like this:

public enum UpdateCounter {
  UPDATED
 }

Now you can manipulate the counter using:

context.getCounter(UpdateCounter.UPDATED).increment(1);

"context" is the context object you get from your mapper or your reducer.
This line will obviously increment your update counter by 1.

How to fetch the counter?

This is as easy as setting up the enum. You submit a job like this:
Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJobName("Graph explorer");

  job.setMapperClass(DatasetImporter.class);
  job.setReducerClass(ExplorationReducer.class);
  // leave out the stuff with paths etc.
  job.waitForCompletion(true);

Be sure that the job has finished before reading the counter; using waitForCompletion is recommended. Querying the counter during runtime can yield strange results ;)
You can access your counter like this:
long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
    .getValue();

How to get the recursion running?

Now we know how to get the counter, and setting up a recursion is quite simple. The only thing you should watch out for is paths that already exist from older job runs.
Look at this snippet:
// variable to keep track of the recursion depth
int depth = 0;
// counter from the previous running import job
long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
    .getValue();

  depth++;
  while (counter > 0) {
   // reuse the conf reference with a fresh object
   conf = new Configuration();
   // set the depth into the configuration
   conf.set("recursion.depth", depth + "");
   job = new Job(conf);
   job.setJobName("Graph explorer " + depth);

   job.setMapperClass(ExplorationMapper.class);
   job.setReducerClass(ExplorationReducer.class);
   job.setJarByClass(ExplorationMapper.class);
   // always work on the path of the previous depth
   in = new Path("files/graph-exploration/depth_" + (depth - 1) + "/");
   out = new Path("files/graph-exploration/depth_" + depth);

   SequenceFileInputFormat.addInputPath(job, in);
   // delete the outputpath if already exists
   if (fs.exists(out))
    fs.delete(out, true);

   SequenceFileOutputFormat.setOutputPath(job, out);
   job.setInputFormatClass(SequenceFileInputFormat.class);
   job.setOutputFormatClass(SequenceFileOutputFormat.class);
   job.setOutputKeyClass(LongWritable.class);
   job.setOutputValueClass(VertexWritable.class);
   // wait for completion and update the counter
   job.waitForCompletion(true);
   depth++;
   counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)
     .getValue();
  }


Note that if you never incremented your counter it will always be 0, or it could be null if you never used it in your mapper or reducer.

Full sourcecodes can always be found here:
http://code.google.com/p/hama-shortest-paths/

Apr 8, 2011

Graph Exploration with Apache Hadoop and MapReduce

Hi all,
sometimes you will have data where you don't know how its elements are connected. This is a common use case for graphs, because they are really abstract.
So if you don't know what your data looks like, or if you do know and you just want to determine the various graph components, this post is a good chance for you to get to know the "MapReduce way" of graph exploration. As already mentioned, in my previous post I ranted about passing messages through the DFS and how much overhead that is in comparison to BSP.
Just let this be a competition between Apache Hama BSP and Apache Hadoop MapReduce, both sharing HDFS as a distributed FileSystem.
Looking at the title you know that this post is about the MapReduce implementation; I'll write a BSP implementation later and compare it with this one.
Let's introduce the prerequisites now.

Prerequisites
We have a graph in the format of an adjacency list looking like this:
0
1    4    7
2    3    8
3    5
4    1
5    6
6
7    
8    3
9    0

So the first entry on the left side is always the vertex itself, therefore all vertices are listed on the leftmost side. Each vertex is described by a number: its id.
Separated by tabs follow the ids of the vertices that are adjacent to the vertex on the leftmost side.

This is quite abstract so let's take a look at this pretty picture:
graph with multiple components
This is how the graph looks. As you can see there are three components: [1,4,7]; [2,3,5,6,8]; [0,9].
In some datasets you want to classify each component by a common key that is unique. The most common solution is to let a component be classified by its lowest vertex id. E.g. the component [1,4,7] has the lowest id 1, so 1 is the classifier for this component.

How do we deal with this in MapReduce?
First of all I recommend you read this paper. It describes a technique named "message passing".
Put simply: the idea is that you let the vertices pass messages whenever a new local minimum has been found. Afterwards you merge the messages with the real vertices and apply updates to the vertices that had a higher minimum.

So our first task is to write the value class that represents a vertex AND a message at the same time.
public class VertexWritable implements Writable, Cloneable {

 LongWritable minimalVertexId;
 TreeSet<LongWritable> pointsTo;
 boolean activated;
        
public boolean isMessage() {
  // a message is a VertexWritable without an adjacency tree
  return pointsTo == null;
 }

}

Plus the typical read/write stuff that comes with Writable.
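If you are curious what that read/write stuff could look like, here is a sketch (my repository version may differ in details, e.g. in how a missing tree is encoded):

@Override
public void write(DataOutput out) throws IOException {
  minimalVertexId.write(out);
  out.writeBoolean(activated);
  if (pointsTo == null) {
   // a message carries no adjacency tree
   out.writeInt(-1);
  } else {
   out.writeInt(pointsTo.size());
   for (LongWritable vertex : pointsTo) {
    vertex.write(out);
   }
  }
 }

@Override
public void readFields(DataInput in) throws IOException {
  minimalVertexId = new LongWritable();
  minimalVertexId.readFields(in);
  activated = in.readBoolean();
  int size = in.readInt();
  if (size < 0) {
   pointsTo = null;
  } else {
   pointsTo = new TreeSet<LongWritable>();
   for (int i = 0; i < size; i++) {
    LongWritable adjacent = new LongWritable();
    adjacent.readFields(in);
    pointsTo.add(adjacent);
   }
  }
 }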
So let me explain: this class represents a vertex. It has a pointsTo tree that maintains the adjacent vertex ids, and the current minimalVertexId. There is also a boolean field called "activated", and a method that determines whether the instance represents a message or a vertex.

The whole thing works like this:
  1. Import the vertices from the adjacency list into an ID-to-Vertex mapping.
  2. In the first iteration flag every vertex as activated and write it out again.
  3. If a vertex is activated, loop through the pointsTo tree and write a message with the (for this vertex) minimal vertex id to every element of the tree.
  4. Merge the messages with the related vertex, and if we found a new minimum activate the vertex. If nothing was updated, deactivate it.
Then repeat from point 3 until no vertex can be updated anymore.
Parts 1 and 3 happen inside the map task, parts 2 and 4 in the reduce task; a sketch of that reducer follows below.
Look here to see how you can implement a job recursion using Apache Hadoop.
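To make step 4 a bit more concrete, here is a sketch of what the merging reducer could look like (simplified; the real version, including the counter that drives the recursion described in the linked post, is in the repository; it also assumes the mapper always passes the real vertex through as well):

public static class ExplorationReducer extends
   Reducer<LongWritable, VertexWritable, LongWritable, VertexWritable> {

 public enum UpdateCounter {
  UPDATED
 }

 @Override
 protected void reduce(LongWritable key, Iterable<VertexWritable> values,
   Context context) throws IOException, InterruptedException {
  VertexWritable vertex = null;
  long minimum = Long.MAX_VALUE;
  for (VertexWritable value : values) {
   if (!value.isMessage() && vertex == null) {
    // Hadoop reuses the value instance, so copy the real vertex
    vertex = new VertexWritable();
    vertex.minimalVertexId = new LongWritable(value.minimalVertexId.get());
    vertex.pointsTo = new TreeSet<LongWritable>(value.pointsTo);
   }
   minimum = Math.min(minimum, value.minimalVertexId.get());
  }
  // a message carried a smaller id: update, reactivate and count the update
  if (minimum < vertex.minimalVertexId.get()) {
   vertex.minimalVertexId = new LongWritable(minimum);
   vertex.activated = true;
   context.getCounter(UpdateCounter.UPDATED).increment(1);
  } else {
   vertex.activated = false;
  }
  context.write(key, vertex);
 }
}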

So after all iterations are done you'll have the following output:
0 | VertexWritable [minimalVertexId=0, pointsTo=[0]]
 1 | VertexWritable [minimalVertexId=1, pointsTo=[1, 4, 7]]
 2 | VertexWritable [minimalVertexId=2, pointsTo=[2, 3, 8]]
 3 | VertexWritable [minimalVertexId=2, pointsTo=[3, 5]]
 4 | VertexWritable [minimalVertexId=1, pointsTo=[1, 4]]
 5 | VertexWritable [minimalVertexId=2, pointsTo=[5, 6]]
 6 | VertexWritable [minimalVertexId=2, pointsTo=[6]]
 7 | VertexWritable [minimalVertexId=1, pointsTo=[7]]
 8 | VertexWritable [minimalVertexId=2, pointsTo=[3, 8]]
 9 | VertexWritable [minimalVertexId=0, pointsTo=[0, 9]]

So you see that we still have every vertex on the left side, but now the minimalVertexId is the classifier of its component. And we found the three lowest component identifiers: 0, 1 and 2!

So the graph now looks like this:
classified graph with multiple components


If you are now interested in grouping all vertices under their component identifier, you can write a new mapper that extracts the minimalVertexId as the key and the pointsTo elements as values, so that in the reduce step they get merged together and you can persist your data.
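Such a mapper is only a couple of lines; a sketch (assuming the output of the last depth as its input):

public static class ComponentExtractionMapper extends
   Mapper<LongWritable, VertexWritable, LongWritable, LongWritable> {

 @Override
 protected void map(LongWritable key, VertexWritable value, Context context)
   throws IOException, InterruptedException {
  // key by the component identifier and emit every vertex of the pointsTo tree
  for (LongWritable adjacent : value.pointsTo) {
   context.write(value.minimalVertexId, adjacent);
  }
 }
}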

And if you are interested in more source code you are free to look into my Summer of Code project under: http://code.google.com/p/hama-shortest-paths/
You'll find a working algorithm inside of the package called "de.jungblut.mapreduce.graph". The main function to call is inside the class "DatasetImporter.java".
The example input used in this post is also in the trunk. So check this out and you are welcome to use it for your own problems ;)

Next time I'll write a BSP that does the same.

Back to the Blogosphere and how BSP works

Hey guys,

I'm back! For those of you that do not remember me: I did some cool Robots for Google Wave.
Maybe you remember the times when Google Wave had no export function; I targeted that with my bot called "PorteXy". It was a Java port of the existing Google example called "Exporty". From the count in the AppEngine database table I know a lot of you used it, and I want to thank you all for your support!
Now we all know that Google Wave has passed on and is becoming an Apache Incubator project, so I have to focus on other topics that could be interesting for you. Since last time, a lot has happened.
I am no longer a pupil: I started working at testberichte.de, recently got my OCP in Java, and in the meantime I am studying Computer Science at HWR Berlin.
You must be thinking: "Oh, he must be really busy" - in fact I am not. School kinda sucks because it is too boring, and I have to do something useful. You've probably heard that Google is offering a new Summer of Code this year: GSoC 2011. I decided to apply, and my first thought was to apply with the Apache Software Foundation.
Mainly because I have worked a lot with Hadoop and distributed systems in general, and I loved using the Apache2 server to run my PHP applications or Tomcat to run my Java applications. But Hadoop wasn't a project eligible for GSoC (why? I know it has a really large codebase, but there are so many cool tasks that could be done by students too!), so I had to look for another project. Then I saw Mahout; I used Mahout for k-means clustering recently and looked at the task on offer: Support more data import mechanisms. Are they serious? Just importing raw data into SequenceFiles? Seriously, you can't spend 12(!) weeks on that task coding 40h/week. This would be a task for a weekend, if at all. So I was looking for something cooler and more complex, but it should be in Java and somehow related to distributed systems.

So I came across Apache Hama, and this task smiled at me: Development of Shortest Path Finding Algorithm. BÄM! This was the task I had been searching for; I love distributed computing and I love graphs.
You probably know that Hama uses BSP (Bulk Synchronous Parallel) for its computing.
You can actually see it as a generalization of MapReduce. Have a look at this picture on Wikipedia:

BSP


If you translate MapReduce into BSP, your map phase is the local computation phase. After that the map output is merged and sorted; that would be the communication phase. Now comes the barrier synchronisation: you know that no reducer can run before all map tasks have completed. So this step is somewhat merged with the communication phase, but after it a new local computation phase begins: the reduce phase.
So you see, BSP is not the same as MapReduce, but you can actually describe MapReduce in terms of BSP.

What is the advantage of using BSP instead of MapReduce in Graph Processing:
Those of you who have implemented graph algorithms with MapReduce probably know what a pain it is to pass messages through HDFS and process them in several chained jobs. I believe it was this paper that described very well how this works: http://www.umiacs.umd.edu/~jimmylin/publications/Lin_Schatz_MLG2010.pdf

Even if you don't know how to implement them (maybe I'll write a tutorial later on), believe me: this is not optimal!
Why should we pass messages through the DFS? Well, just send the messages directly!
And that is the real advantage of BSP: you have fewer sync steps (in MapReduce you have 3 major sync points in a job: the beginning, the reducer and the end - the end because you can't run a follow-up job before the last one finishes, and therefore the beginning of the next one too) and more "real-time" messaging than in MapReduce.

BSP is actually used at Google too: http://googleresearch.blogspot.com/2009/06/large-scale-graph-computing-at-google.html, you can google for the Pregel paper for further reading.

In my opinion Apache Hama has great prospects, because there is no other open-source alternative for BSP processing, and best of all: I'll be part of it.

Back to shortest path finding. As you can see, I have already started to implement a basic Dijkstra and have given some thought to how this will work in a large cluster, mainly in terms of scalability.
If you are interested in shortest paths in large graphs you should definitely bookmark this issue or at least vote it up.

The application deadline of GSoC is in approx. 10 hours and I hope ASF will pick me :-)

This was a really long first post, and I want to especially thank Edward J. Yoon; he is my mentor at Apache Hama and told me that I should blog again. This is part of his "young open source stars" support ;) Visit his blog or follow him on Twitter, he is a real genius!

Obviously you can also follow me on Twitter; look at the sidebar on the right.

Greetings from Germany!