Dec 10, 2011

k-Means Clustering with BSP (Intuition)

Hey all,

I had a bit of time to get my k-means clustering running with BSP and Apache Hama, and I wanted to share it with you.
Actually this is a sequel to a series I announced long ago: Series: K-Means Clustering (MapReduce | BSP).
You may remember that I already made a post about k-means and MapReduce; now I want to show you that k-means clustering with BSP is much faster than the MapReduce version.
Don't expect a benchmark in this post, but I will give you a real comparison a bit later, once Apache Hama 0.4.0 rolls out.
So this post is mainly about the algorithm itself and the ideas behind making it run and scale.
The next post will contain more code and explain more clearly how everything works. But the code is still changing, so that will take a bit more time.

Quick note right at the beginning: as mentioned, I am currently developing against an Apache Hama 0.4.0 SNAPSHOT, since it hasn't been released yet.

Let's step into it!

Model Classes

We need some kind of Vector and something that represents our Center.
These are very basic: the Vector is just a wrapper object around an array of doubles with some convenience methods. A Center is just like a Vector, but it adds a few more methods, e.g. to detect whether it has converged or to average two centers.

You can have a look at them on GitHub if you're interested.
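To make this concrete, here is a minimal sketch of what such model classes could look like (illustrative only; the actual classes on GitHub differ):

```java
// Illustrative model classes; the real ones on GitHub have more
// convenience methods (serialization, etc.).
class Vector {
  protected final double[] values;

  Vector(double... values) {
    this.values = values;
  }

  // Euclidean distance as one of the convenience methods
  double distance(Vector other) {
    double sum = 0d;
    for (int i = 0; i < values.length; i++) {
      double diff = values[i] - other.values[i];
      sum += diff * diff;
    }
    return Math.sqrt(sum);
  }
}

class Center extends Vector {
  Center(double... values) {
    super(values);
  }

  // converged = the center barely moved since the last iteration
  boolean converged(Center old, double epsilon) {
    return distance(old) < epsilon;
  }

  Center average(Center other) {
    double[] avg = new double[values.length];
    for (int i = 0; i < values.length; i++) {
      avg[i] = (values[i] + other.values[i]) / 2d;
    }
    return new Center(avg);
  }
}
```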

Idea behind k-means with Apache Hama's BSP

Before actually showing you the algorithm in source code, I'd like to tell you what my plan and intention were.
In a typical clustering scenario you will have many, many more points than centers/means. So n >> k (">>" means "much larger than"), where n is the number of vectors and k the number of centers.
Apache Hama 0.4.0 will provide a new I/O system that makes it really easy to iterate over a chunk of the input on disk over and over again. We can exploit this fact and keep all our centers in RAM.

The trick here is that, unlike in graph algorithms, we do not split the centers across the tasks; every task holds all k centers in memory.
So each task gets a part of the big input file, and every task has all centers.
Now the assignment step is easy: we just iterate over all input vectors and measure the distance against every center.

While iterating we find the nearest center for each of the n vectors. To save memory we average our new centers "on-the-fly". Have a look at the "Average in data streams" article on Wikipedia if you're not familiar with it.
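The "on-the-fly" averaging boils down to very few lines. Here is a sketch of the streaming mean (illustrative, not the actual repository code): after the i-th assigned vector, the mean is updated as mean += (x - mean) / i, so no per-center list of vectors has to be kept.

```java
// Streaming ("on-the-fly") mean of vectors, updated one vector at a time.
class RunningMean {
  private double[] mean;
  private long count;

  void add(double[] x) {
    count++;
    if (mean == null) {
      // first vector becomes the initial mean
      mean = x.clone();
      return;
    }
    for (int i = 0; i < mean.length; i++) {
      // incremental update: mean += (x - mean) / count
      mean[i] += (x[i] - mean[i]) / count;
    }
  }

  double[] getMean() {
    return mean;
  }

  long getCount() {
    return count;
  }
}
```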

At the end of the assignment step, each task holds the "on-the-fly" computed average of its new centers.
Now we broadcast each of these computed averages to the other tasks.
Then we sync so all messages get delivered.
Afterwards, each task iterates through the messages and averages all incoming centers that belong to the same "old" mean.

I know this is difficult to explain, so please consult this picture; it shows the exchange of the locally computed means for two tasks.
Message exchange with mean calculation

As you can see in the picture, we have two tasks which have calculated "their version" of a new mean. Since this isn't the "global mean", we have to calculate a new mean that is consistent across all tasks and is still the mathematically correct mean.
We apply the "average on data streams" strategy: each task receives the locally computed averages from every other task and reconstructs the global mean.

Since this calculation is the same on every task, the means stay globally consistent across the tasks, at the cost of just one global synchronization. Fine!
That is actually the whole intuition behind it. The algorithm runs this whole computation over and over again until the centers have converged, i.e. don't change anymore.
This is much faster than MapReduce because you don't have to submit a new job for each computation step.
In BSP a superstep is far less costly than running a MapReduce job, so it is faster for this kind of task.
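To merge the broadcast averages back into one correct global mean, each local mean has to be weighted by the number of vectors it was computed from, so in practice each task would broadcast its count along with its mean. A sketch under that assumption (class and method names are made up):

```java
// Hypothetical merge step: combine per-task local means into the global
// mean by weighting each local mean with its local vector count.
class MeanMerge {
  static double[] merge(double[][] localMeans, long[] localCounts) {
    int dim = localMeans[0].length;
    double[] global = new double[dim];
    long total = 0;
    for (long c : localCounts) {
      total += c;
    }
    for (int t = 0; t < localMeans.length; t++) {
      // weight of this task's mean = its share of all assigned vectors
      double weight = (double) localCounts[t] / total;
      for (int d = 0; d < dim; d++) {
        global[d] += localMeans[t][d] * weight;
      }
    }
    return global;
  }
}
```

Since every task runs this same merge over the same messages, all tasks end up with identical centers, which is exactly why only one sync is needed.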

When you plot the result, you come up with something that looks like this:

K-Means BSP Clustering with K=3

In one of the upcoming posts, I'm going to explain the code in detail.

If you are interested in the code and can't wait, you can have a look at it on my GitHub here:
K-Means BSP Code

The code will randomly create some vectors and assign the k initial centers to the first k created records. You can run it via the main method in your IDE or on your local-mode Hama cluster.

See you!

Oct 24, 2011

Apache Hama realtime processing

Hi there,

today's post is about realtime processing with Apache Hama.
One day, Edward told me about a guy who told him that he uses Hama for realtime processing.

At first this sounds quite strange/new, because BSP is inherently used (just like MapReduce) for batch processing. It has several advantages over MapReduce, especially in graph and mathematical use cases.

I think this new "feature" is the greatest advantage over MapReduce.
Let me clarify a bit how it works.

At first you will have some tasks which are going to be our so-called event collectors. In my example this will be a single master task.
Anyway, the trick is that the event collectors wait for new events to arrive, or even poll for events that happened, inside a while loop. (That alone would be possible in a MapReduce task, too.)

Now you can build a producer/consumer pattern on top of this: the event collectors message computation tasks to do some computation on the data that was just sent. This allows you to do more complex stream analysis in near-realtime.
We will see this in an example a bit later.

Why is this better than a MapReduce job?
If you run a MapReduce job, you can poll for available data inside a while loop, too. But without a messaging system between the tasks you are forced to write your data into HDFS to make it available to a broader set of tasks and parallelize your workload.
Since Hadoop has lots of job scheduling and setup overhead, this is not realtime anymore; it degenerates into batch processing.
For those of you who are familiar with Giraph: it is similar, a MapReduce job whose tasks message other MapReduce tasks. Sadly they focused only on graph computing and are strongly dependent on input from the filesystem.

Example: Realtime Twitter Message Processing 
Yes, we can analyse Twitter data streams in BSP in realtime!
What do we need?
  • Twitter Library, in this case Twitter4J
  • Latest Hama, in this case a 0.4.0 snapshot. You can use 0.3.0 as well, with minor code changes.
Let's dive directly into it and look how to setup the job.

HamaConfiguration conf = new HamaConfiguration();
// set the user we want to analyse
conf.set("", "tjungblut");
// I'm always testing in local mode, so I use 2 tasks.
conf.set("bsp.local.tasks.maximum", "2");
BSPJob bsp = new BSPJob(conf);
bsp.setJobName("Twitter stream processing");

I think this is pretty standard; the trick here is to set the desired username of the person you want to analyse.
In my case this is my Twitter nick, "tjungblut".

I omit the setup method and the fields for now; if you have questions about what I've done there, feel free to comment on this post.

The real (time) processing
Let's step directly to the processing and the mimic of the producer/consumer pattern.
The idea is simple: a master task polls for new "Tweets" and sends them directly to our computation tasks (the field name is otherPeers, which contains all tasks but the master task).
This happens while our computation tasks are waiting for new "food" to arrive.
Once a computation task gets a message, it can directly start with its computation.

Let's see how the master task does its job:

  public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
      InterruptedException {

    if (isMaster) {
      while (true) {
        // this should get us the latest 20 tweets of this user
        List<Status> statuses = twitter.getUserTimeline(userName);
        for (Status s : statuses) {
          // deduplicate
          if (alreadyProcessedStatusses.add(s)) {
            System.out.println("Got new status from: "
                + s.getUser().getName() + " with message " + s.getText());
            // we distribute messages to the other peers for
            // processing via user id partitioning,
            // so a task gets all messages for a user
            bspPeer.send(
                otherPeers[(int) (s.getUser().getId() % otherPeers.length)],
                new LongMessage(s.getUser().getId(), s.getText()));
          }
        }
        // sync before we get new statuses again.
        bspPeer.sync();
      }
    } else {
      // ... computation task stuff
    }
  }
Note: I've omitted a lot of details (try/catch blocks) and premature optimizations, which can be found in the code.

As you can see, the event collector (aka master task) polls the Twitter API to get the newest tweets of a given user.
Then the master sends the new messages to our computation tasks.
Note the simple trick used to distribute the work equally across the tasks. In our case we listen to just a single user and have two tasks, so this does nothing but send every message directly to the one other task.
You can change this behaviour by listening to the public timeline instead, or by distributing on the message id instead of the user id. I hope you get the gist ;)

In short: we listen to a specific user, so every message goes from the collector directly to the one computation task. In our case we have only 2 tasks, so increasing the number of tasks would just leave additional tasks idle the whole time.
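The distribution trick from the snippet above, in isolation (the helper name taskFor is made up):

```java
// Same user id -> same computation task, like
// otherPeers[(int) (userId % otherPeers.length)] in the master code.
class Partitioner {
  static int taskFor(long userId, int numTasks) {
    // floorMod guards against negative ids, unlike plain %
    return (int) Math.floorMod(userId, (long) numTasks);
  }
}
```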

Let's have a look at the slave task (aka computation task).

This is very simple:
// we are not the master task... so let's do this:
} else {
      while (true) {
        // wait for some work...
        LongMessage message = null;
        while ((message = (LongMessage) bspPeer.getCurrentMessage()) != null) {
          System.out.println("Got work in form of text: " + message.getData()
              + " for the user id: " + message.getTag().longValue());
        }
        // sync to receive the next batch of messages
        bspPeer.sync();
      }
}

As you can see, this is a pretty simple consumer.
You could now add some logic to it, for example to track the communication between one person and others: how often, how much, and what content.

In my case, this looks like this:

Console output of the realtime processing job

Note that it showed up directly after the tweet had been sent.
Now, this is a really cool thing!

If you had unlimited access to the public timeline (sadly it is capped at 150 requests/h) and enough computational power in your cluster, you could compute your own trending topics!

Twitters worldwide trending topics

Of course you can do everything else you want to.

I hope this has been quite "illuminating" for you and shows how to enable realtime processing with Hama.

Of course you can check out my source code on my GitHub. The class we just talked about is available here:

Have fun and good luck!

Apache Hama upcoming features

Hi all,

it is a pleasure for me to bring you a couple of new things and announcements in this blog post.

Apache Hama 0.4.0 is on its way, and I want to introduce several pieces of fanciness before we dive into realtime processing (that will be the follow-up blog post).
  1. Revised BSP execution flow
  2. Multiple Tasks per groom
  3. YARN integration 
Revised BSP execution flow

The first point is a very good improvement: writing a BSP is totally convenient now.
Let's take a look at the implementation of a BSP in Hama 0.3.0:

class OldBSP extends BSP {

  public void bsp(BSPPeerProtocol arg0) throws IOException, KeeperException,
      InterruptedException {
    // TODO Auto-generated method stub
  }

  public void setConf(Configuration conf) {
    // TODO Auto-generated method stub
  }

  public Configuration getConf() {
    // TODO Auto-generated method stub
    return null;
  }
}
You can see it in my Eclipse-generated subclass: you have to override plenty of methods.
Two of them (if you are not familiar with Hadoop) seem very strange. What is that Configuration? And why do I need to set it in my code?

Well, this is history now. We have revised the design and now ship with default implementations of every method in the BSP class.

Additionally, we have added a setup and a cleanup method. setup is called before the computation starts, cleanup after your computation has finished.

Let's see:

public class NewBSP extends BSP {

  @Override
  public void setup(BSPPeer peer) throws IOException, KeeperException,
      InterruptedException {
  }

  @Override
  public void bsp(BSPPeer peer) throws IOException, KeeperException,
      InterruptedException {
  }

  @Override
  public void cleanup(BSPPeer peer) {
  }
}

It is a lot more intuitive, isn't it? Now YOU control which methods you want to override, and it is fully transparent when they are called.

And the best side effect is that you can send messages and trigger a barrier sync while in setup!
This enables you to send initial messages to other tasks and distribute information which hasn't been set in the configuration.
BTW: your job's configuration can now be obtained via peer.getConfiguration().

Multiple Tasks per groom

Yes, we made the step to multitasking. In Hama 0.3.0 we only had a single task per groom.
This didn't really utilize the machines, because while executing a single BSP, the other cores might be unused.
Like in Hadoop, this is now configurable per host, so you can set the number of tasks that should be executed on a machine.

YARN integration

If you don't know what YARN actually is, let me clarify a bit. YARN stands for Yet Another Resource Negotiator and is Hadoop's new architecture. If you want to learn more, have a look at Arun's slides here.

Now that you know what the new Hadoop is, I'm proud to tell you that Apache Hama will be a part of it.
We implemented our own application to fit the new YARN module and bring Hama to your Hadoop 0.23.0 cluster.
No more setup and configuration of additional daemons!

We managed to get a first BSP (Serialize Printing) running on a YARN cluster.

Serialize Printing on YARN
That is soo great!

We are still in development, so please follow HAMA-431 if you are interested.

Thanks for your attention, and please follow us on the mailing list! We are happy to answer any questions you have.

Aug 30, 2011

Ant Colony Optimization for TSP Problems

Code can be found under (

Hi there,

I recently focused on playing around with ant colony optimization, an AI (artificial intelligence) method, as part of my second-semester undergraduate project in software engineering.

Wikipedia's definition for Ant Colony Optimization is the following:
In computer science and operations research, the ant colony optimization algorithm (ACO) is a probabilistic technique for solving computational problems which can be reduced to finding good paths through graphs.
Unlike previous posts, this is a multithreaded solution which runs best on a single machine rather than in a cluster, and this post is about the TSP (Traveling Salesman Problem), not about finding shortest paths in a graph.

The Input

At first, we have to decide which input to take. In my case this task was a university assignment, so I had to use the given one: the Berlin52 problem, which asks for the least costly roundtrip along the 52 best-known/famous places in Berlin.
You can download a Euclidean 2D coordinate version of it here; just look for the "berlin52.tsp.gz" file. There is also an already calculated optimal solution here.

Plotting the file is going to show a cool roundtrip:

GnuPlot of the Berlin 52 Tour

The file content looks quite like this:
1 565.0 575.0
2 25.0 185.0
3 345.0 750.0
4 945.0 685.0
On the leftmost side is the ID of the vertex, in the middle the x-coordinate, and on the rightmost side the y-coordinate in the Euclidean plane.

The Algorithm

This algorithm mimics the real-life behaviour of ants. As you might know, ants are quite smart at finding their way between their colony and a food source. A lot of workers walk through the proximate environment, and if they find some food, they leave a pheromone trail.
Some of the other ants keep searching for other paths, but most of them follow the pheromone trail, making this path even more attractive. Over time, however, the pheromone trail starts to decay/evaporate, so it loses its attractiveness. Due to this time component, a long path has a very low pheromone density, because the pheromone along the longer path evaporates faster. Thus a very short path has a higher pheromone density and is more attractive to the workers, which converge onto an approximately optimal path for our TSP instance.

For the problem, we drop ants on random vertices of our graph. Each ant evaluates the next best destination vertex based on the following formula:
It gives the probability "p" for an ant "k" to move to the vertex described by "x" and "y".
The variable "tau" is the amount of pheromone deposited on the edge "xy". It gets raised to the power "alpha", a heuristic parameter describing how greedy the algorithm is in finding its path across the graph. This is multiplied by our a priori knowledge of how "good" the edge is. In our case this is the inverted distance (1 / distance) between x and y, raised to the power "beta", another heuristic parameter, which describes how fast the ants converge to a steady solution. To get a transition probability to a vertex, each term is divided by the sum of the numerators over all remaining possible destination vertices.

The next equation adjusts the pheromone matrix and is described by the following formula:
"Tau" is the absolute amount of pheromone that gets deposited for worker "k" on the edge "xy". "Rho" is a factor between 0 and 1 which represents the decay of the pheromone. It gets multiplied by the current amount of deposited pheromone, and we simply add the newly deposited pheromone (the delta "tau") to it. Delta "tau" is an equation too: it is just "Q" divided by the accumulated distance of the path so far.
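Since the formulas are easier to follow in code, here is a sketch of both equations using the variable names from the text. This is illustrative only: the class and method names are made up, and I use the common ACO convention that the old pheromone is scaled by (1 - rho):

```java
// The two ACO formulas written out; arrays are indexed by the candidate
// destination vertex y, seen from the ant's current vertex x.
class AcoFormulas {
  // transition probability p(x -> y) for one ant over all allowed vertices
  static double[] transitionProbabilities(double[] tau, double[] dist,
      double alpha, double beta) {
    double[] p = new double[tau.length];
    double sum = 0d;
    for (int y = 0; y < tau.length; y++) {
      // pheromone^alpha * (1 / distance)^beta
      p[y] = Math.pow(tau[y], alpha) * Math.pow(1d / dist[y], beta);
      sum += p[y];
    }
    for (int y = 0; y < p.length; y++) {
      p[y] /= sum; // normalize by the sum over all allowed vertices
    }
    return p;
  }

  // pheromone update: decay the old value, then deposit Q / pathLength
  static double updatePheromone(double tau, double rho, double q,
      double pathLength) {
    return (1d - rho) * tau + q / pathLength;
  }
}
```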

Finished! That is all we need to start the implementation!

The whole thing works like this:
  1. Initialize the best distance to infinity
  2. Choose a random vertex in the graph on which to plant your ant
  3. Let the ant work out its path using the formulas from above
    1. Let the ant update the pheromones on our graph
  4. If the worker returned with a new best distance, update our currently best distance
  5. Start from 2. until we have found our best path or reached the maximum number of workers
  6. Output the best distance
Each worker returns with the path it has taken and the corresponding distance. Now you can decide whether you are confident with the result, or let the workers calculate more paths.

So let's see how we can multithread this.

How to Multithread

In this case, multithreading is very easy. Each worker unit (called "Agent" in my repository) is a single thread. In Java we have a handy thread pool construct called ExecutorService, plus a CompletionService which tells us when workers have finished.

We submit a few workers to the pool; they work on the same resources, and once one completes we get a response from the completion service.

Woot? Shared resources? Yes, we need some sort of synchronization, in our case when writing to the pheromone matrix. In my latest implementation I use lock-free updates via Guava's AtomicDouble.
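The pool and completion-service pattern described above looks roughly like this; the Callable body is just a stand-in for the real Agent computation:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: submit agents to a fixed pool and drain their results as they
// finish, keeping track of the best (shortest) distance seen so far.
class AgentPool {
  static double runAgents(int numAgents, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<Double> completion =
        new ExecutorCompletionService<Double>(pool);
    for (int i = 0; i < numAgents; i++) {
      final int seed = i;
      // each "agent" returns the distance of the tour it found;
      // here a dummy value stands in for the real computation
      completion.submit(() -> 100d + seed);
    }
    double best = Double.POSITIVE_INFINITY;
    for (int i = 0; i < numAgents; i++) {
      // take() blocks until the next agent has finished
      best = Math.min(best, completion.take().get());
    }
    pool.shutdown();
    return best;
  }
}
```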

The whole algorithm does not change; we are just working in parallel.

The Result

After a lot of parameter testing you can find a very good solution:

Console output of my algorithm, finding a very close solution to the optimal distance for Berlin52

Parameter Testing

Because there are a hell of a lot of parameters (5) in there, we have to write a testing utility that calculates the best parameters for our problem (aka grid search).

We only care about the distance to the optimal solution: in the grid search we want the lowest possible mean distance to the optimal solution (measured over multiple repetitions) together with a very low variance.

For Berlin52, the best parameters I found using 200k agents are:

Alpha = 0.1
Beta = 9
Q = 1e-4
P = 0.8

So feel free to check out the code in the repository and let it run.
And please feel free to use this as a hint for your own implementation, or even improve it via a pull request.

Thanks for your interest and have fun :-)


Aug 20, 2011

Apache Hama Partitioning Improved

Hey guys,

my work phase is over and I'm back at university next week, so I have a bit more time to write here.

My first free time yesterday was spent generating random data for Apache Hama / Apache Whirr and for my PageRank algorithm. HAMA-420 is the related issue on Jira.

After a lot of stress with the broken DMOZ file, I noticed that partitioning takes far too long for a few (1-3) million vertices. I already knew about this issue, because it always took nearly 10 minutes to partition the sequence file (1.26 GB) of my SSSP (Single Source Shortest Paths) algorithm.

Where were the problems?

Actually, there were two major problems I saw:
  •  the structure of the in- and output files
  •  zlib compression

So first the structure; for SSSP it looked like:

K / V
Vertex [Text] / AdjacentVertex : Weight [Text]

Every line contained one(!) vertex and one of its adjacent vertices. Some of you might notice that this is quite verbose: yes, it is!
We used this structure for both the input and the partitioned files.

The second thing is the default compression that comes with SequenceFiles: the zlib codec, which you have to explicitly turn off. Otherwise compression takes nearly 80% of the writing time without any real gain in file size.

How to deal with it?

These two problems were quite easy to solve. The first step was to change the structure and put all adjacent vertices along with their origin vertex into one line of the input text file.

To stay general about inputs I stuck with text files (this is very common and readable in every editor) and write directly into a sequence file whose key/value pair is composed of a vertex and an array of vertices.


The new architecture works on the input text file, reads each line, and passes it to an abstract method.
So everyone can implement their own input-text-file parser and fill the vertex the BSP needs. The vertex then gets partitioned via hashing and written into the sequence file of its groom.

Later on we have to extend this with various kinds of compression, input formats, record readers, and other partitioning strategies.

The groom will then just read the sequence file objects and put them into RAM. So no more text deserialization has to be done: CHEERS!
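The abstract-parser idea could be sketched like this (the names are illustrative and not the actual Hama code):

```java
// Each input line is parsed by a user-supplied method; the resulting
// vertex is then routed to a groom partition by its hash.
abstract class TextPartitioner<V> {

  // everyone implements their own input-text-file parser
  abstract V parseLine(String line);

  // hash partitioning: the same vertex always lands in the same partition
  int partitionFor(V vertex, int numPartitions) {
    // floorMod keeps the result non-negative even for negative hash codes
    return Math.floorMod(vertex.hashCode(), numPartitions);
  }
}
```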

Comparison: new vs. old

The result is overwhelming! See it in my Calc Sheet:

Improvement of Apache Hamas Example Partitioning

This is just a totally sick improvement. It got committed as HAMA-423.

Thanks and Bye!

Jul 9, 2011

Dealing with "OutOfMemoryError" in Hadoop

Hey all,

some of you may have seen some kind of OutOfMemoryError in your Hadoop jobs, which looks like this:

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(

This is the theme of today's blog post: I'll write about what causes this to happen and how to solve it.

As it is currently July 2011, I refer to Hadoop 0.20.2, since this is the latest stable release.

The Cause

I've seen many OutOfMemoryErrors in my short life, but this error isn't a "my JVM ran out of heap" error. You'll usually see the real cause deeper in the stack trace; in my case it was FileSystem.create(). So this error causes an OOM (I'll use that for OutOfMemory from now on), but not in your JVM: the process started by Hadoop to execute your task can't allocate more memory on your host system.

What does FileSystem.create() do to make your host system run out of memory?
Well, I laughed about it at first, too: it was setting the file permissions with CHMOD.

The first time I saw this error in the logs, I googled it and came across a blog post which rants about several concerns with Hadoop that I run into every day (e.g. zombie tasks, xcievers), but it tells the truth.

Let's see what they wrote on this kinds of error:

Terrible with memory usage

We used to have problems with Hadoop running out of memory in various contexts. Sometimes our tasks would randomly die with out of memory exceptions, and sometimes reducers would error during the shuffle phase with "Cannot allocate memory" errors. These errors didn't make a lot of sense to us, since the memory we allocated for the TaskTrackers, DataNodes, and tasks was well under the total memory for each machine.

We traced the problem to a sloppy implementation detail of Hadoop. It turns out that Hadoop sometimes shells out to do various things with the local filesystem.

What I've seen is that the RawLocalFileSystem creates a file on disk and sets its permissions from a given FsPermission object, which corresponds to an equivalent CHMOD "XXX" call on your system's shell.
In fact, Hadoop launches a process via the ProcessBuilder just to CHMOD the file it has just created.

I guess you are now asking yourself why this causes an OOM error. If you haven't already followed the link I provided above, I recommend doing so.
But let me clarify this a bit more.

Under Linux, Java's ProcessBuilder forks a child process. This child process initially accounts for the same amount of memory as its parent. So if, for example, you configured your Hadoop job to allocate 1G of heap per task, you'll temporarily use 2G on your host system when calling FileSystem.create().

Let me explain a bit more about fork() in Linux.
When calling fork(), Linux sets up a new task structure which is a full copy of the parent process. The child gets a new process id but initially uses the same memory as its parent. Everything seems fine, but as soon as one of them modifies or writes something in that memory, the affected memory gets duplicated. This is called copy-on-write.

If you can read German, I recommend the book "C und Linux" by Martin Gräfe. It is explained very well there (although in a C context).

I had a job which downloads lots (millions) of files and creates a new file for each of them inside a mapper. This is parallelized, so we have multiple tasks per host machine. The funny thing is that each task shells out to CHMOD the file it has just created; if other jobs run at the same time, they simply fail because they cannot allocate enough memory for their JVMs, and so do the tasks themselves.

The worst thing is to confuse this with JVM OOMs, which depend on what your task is doing. If you use a 2GB HashMap to compute things faster in RAM but your JVM only has 1GB of heap, that is clearly a JVM OOM, since no more heap can be allocated INSIDE the JVM.
The usual fix for that is to increase the heap size of the JVM with -Xmx2G or -Xmx2048M. Don't do this in the case of host OOMs! It will make the problem even worse: in this specific scenario, a child process will then account for even more RAM, which probably leads to even faster failure.

Now let's see how to solve this.

Solve the problem

The blog post linked above says that adding a lot of swap space solves the problem. I am not a fan of lots of swap space on Hadoop nodes, mainly because there is a high chance that tasks get swapped out (due to misconfiguration, other processes, etc.), and JVMs are not built to work correctly when swapped. So you can imagine really degraded performance once a task gets swapped. I was surprised that they reported everything became far more stable afterwards; hell, they must have had the worst problems on their cluster.

I've backtracked the problem to FileSystem.create(). Actually it was create(FileSystem fs, Path file, FsPermission permission).

This variant causes CHMOD to be forked twice, so the first fix was to use the non-static methods.
But even the methods that don't take an FsPermission, like create(Path path), will fork a process anyway.
So you have to call the abstract method in FileSystem directly. It has this signature:

create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress)

The way I prevent Hadoop from forking the process is to set the permission to null.
This causes a NullPointerException in the NameNode while setting the permission in the distributed FileSystem (the permissions you can see in the web frontend), because the DFSClient RPCs the NameNode to register the newly created file there.
So you have to patch your NameNode by adding a null check to the setPermission method, like this:
public void setPermission(String src, FsPermission permissions)
    throws IOException {
  if (permissions == null) {
    permissions = FsPermission.getDefault();
  }
  namesystem.setPermission(src, permissions);
}

Note that this will not work if you're running Hadoop with lots of different users. I have just one user, which is also the user on the host system, so this won't cause any problems in my case.

And be aware that adding this nifty "hack" to your code will prevent your job from running on the LocalFileSystem, since that isn't null-safe either.

Now no task forks any process at all.

Jun 25, 2011

Set-intersection and -difference with Hadoop

Hi there,

first off a short message:
I am currently a bit busy, so I need some more time for the BSP k-means clustering I promised in one of the previous posts. I am going to contribute it as an example for Apache Hama and not for my GSoC trunk (you can now find a link to it on the right, below the Twitter box!). I am also going to set up a GitHub repository to store my new code; I'll link it in the sidebar, too.

The real beginning

I came across a question on Stack Overflow some days ago about computing the set-intersection and -difference with Hadoop's MapReduce, and I simply wanted to share my solution with you. That is the main reason for this blog post.

The question was about only two files; we are going to scale this up to "n" files, since that is why we need Hadoop ;)

The input

Let's assume (like in the question) that we have files that look like this:

File 1 contains the following lines:
b
c
d

File 2 contains the following lines:
d
e

Actually it is pretty simple; we just have to make sure that these lines are stored in different text files.

The Mapper

The main trick in the mapper is getting the name of the file the task is currently working on.
Let's have a look at my mapper implementation, called FileMapper:

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {
  static Text fileName;

  protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
       context.write(value, fileName);
  }

  protected void setup(Context context) throws IOException,
          InterruptedException {
       String name = ((FileSplit) context.getInputSplit()).getPath().getName();
       fileName = new Text(name);
       context.write(new Text("a"), fileName);
  }
}

The trick is that when we import text files, the input split is of type FileSplit, which lets you get the path and the name of the file.

The main problem is knowing how many files there are at all. The nifty hack is to emit, per mapper, one record with a key that is guaranteed to be the first input to our reducer, like a plain "a". This is the last line in the setup method.

Now we are going to emit each text line as the key and the filename as the value.
Then we get a bunch of key/value pairs that look like this (key and value are separated by a space, assuming the file names are File1 and File2):

    a File1 // nifty haxx
    b File1
    c File1
    d File1

    a File2 // nifty haxx
    d File2
    e File2

Obviously reducing them will get you input like this:
    a File1,File2 // our nifty hack :))
    b File1
    c File1
    d File1,File2
    e File2

Once again pretty straightforward. Now we can see a clear structure and know what we have to do in our reducer.

The Reducer

Now let's have a look at our reducer:

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer&lt;Text, Text, Text, Text&gt; {

    private final HashSet&lt;String&gt; fileNameSet = new HashSet&lt;String&gt;();

    enum Counter {
        INTERSECTION_COUNT
    }

    @Override
    protected void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
            throws IOException, InterruptedException {
        // add for our first key every file to our set
        // make sure that this action is the first of the entire reduce step
        if (key.toString().equals("a")) {
            for (Text t : values) {
                fileNameSet.add(t.toString());
            }
        } else {
            // now add every incoming value to a temp set
            HashSet&lt;String&gt; set = new HashSet&lt;String&gt;();
            for (Text t : values) {
                set.add(t.toString());
            }
            // perform checks
            if (set.size() == fileNameSet.size()) {
                // we know that this must be an intersection of all files
                context.getCounter(Counter.INTERSECTION_COUNT).increment(1);
                context.write(key, new Text(set.toString()));
            } else {
                // do anything you want with the difference
            }
        }
    }
}
As you can see, we are just using our "hack" to build a set of the files we had in our input. Then we check whether a line occurred in all files (full intersection) and increment a counter if it did.

What you're doing with the set difference is left up to you; maybe you want to ignore it and do something with the intersecting lines.
Have a try with some files from Project Gutenberg! I would be pretty interested in how many lines are equal in different books.
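Outside of Hadoop, the reducer's core check boils down to plain set logic. Here is a hedged, standalone Java sketch (class and method names are mine, not part of the MapReduce job): a line belongs to the intersection exactly when it occurs in as many files as there are files overall.

```java
import java.util.*;

public class SetOperations {

  // returns the lines that occur in every one of the given files
  public static Set<String> intersection(List<Set<String>> files) {
    Map<String, Integer> counts = new HashMap<String, Integer>();
    for (Set<String> file : files) {
      for (String line : file) {
        Integer c = counts.get(line);
        counts.put(line, c == null ? 1 : c + 1);
      }
    }
    Set<String> result = new TreeSet<String>();
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      // seen in all files -> intersection, mirroring
      // the set.size() == fileNameSet.size() check in the reducer
      if (e.getValue() == files.size()) {
        result.add(e.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> file1 = new HashSet<String>(Arrays.asList("b", "c", "d"));
    Set<String> file2 = new HashSet<String>(Arrays.asList("d", "e"));
    System.out.println(intersection(Arrays.asList(file1, file2))); // [d]
  }
}
```

Everything not in the returned set is, by definition, part of some difference.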

Here in Germany there have been some discussions about plagiarism; maybe this can be helpful for finding the intersection of many books/papers very fast.

I just wanted to point out a possible solution for how to deal with such a problem in MapReduce.

Thanks for your attention and support :)

Jun 2, 2011

Redraw canvas in Android

Hey all,

this post is quite unrelated to every other in this blog.
I was playing around a bit with OpenGL and Android canvas drawing.

I faced a little problem while updating the canvas.

So I had a view like this:

class MyView extends View {
   protected void onDraw(Canvas canvas) {
       // draw your stuff here
   }
}

I wanted to update my canvas on the screen. Some of you may think: save the canvas from creation and recall the method from outside. That won't work.

You actually have to call invalidate(); on your view object.

class MyView extends View {
   protected void onDraw(Canvas canvas) {
       // draw your stuff here
   }

   protected void reDraw() {
       // tells the view to redraw itself
       invalidate();
   }
}

I didn't know that. So if you are reading this, you know what to do ;)

Greetz from Germany!

May 25, 2011

k-Means Clustering with MapReduce

Hi all,

just finished the MapReduce side implementation of k-Means clustering. Notice that this is a series that contains this post and a follow-up one which implements the same algorithm using BSP and Apache Hama.

Note that this is just an example to explain you k-means clustering and how it can be easily solved and implemented with MapReduce.
If you want to use a more generic version of k-means, you should head over to Apache Mahout. Mahout provides k-means clustering and other fancy things on top of Hadoop MapReduce. This code is also not meant for production usage; you can cluster quite small datasets from 300 MB to 10 GB very well with it, but for larger sets please take the Mahout implementation.

The clustering itself

We need some vectors (their dimension doesn't matter, as long as they all have the same one). These vectors represent our data, and then we need k centers. These centers are vectors too; sometimes they are just a subset of the input vectors, sometimes random points or points of interest toward which we are going to cluster.

Since this is a MapReduce version, I'll tell you what keys and values we are using. This is really simple: we are just using vectors, and a vector can be a cluster center as well. So we always treat our cluster-center vectors as keys, and the input vectors are plain values.

The clustering itself works like this then:
  • In the map step
    • Read the cluster centers into memory from a sequencefile
    • Iterate over each cluster center for each input key/value pair. 
    • Measure the distances and save the nearest center which has the lowest distance to the vector
    • Write the clustercenter with its vector to the filesystem.
  • In the reduce step (we get associated vectors for each center)
    • Iterate over each value vector and calculate the average vector. (Sum each vector and divide each component by the number of vectors we received.)
    • This is the new center, save it into a SequenceFile.
    • Check the convergence between the clustercenter that is stored in the key object and the new center.
      • If it they are not equal, increment an update counter
  • Run this whole thing until nothing was updated anymore.
Pretty easy, isn't it? :D
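The map/reduce steps above can be sketched without Hadoop at all. This standalone snippet (all names are mine, not from the job) runs one assign-and-average iteration using the Manhattan distance that the post uses for measuring:

```java
import java.util.*;

public class KMeansStep {

  // Manhattan distance, like in the post
  static double distance(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++)
      sum += Math.abs(a[i] - b[i]);
    return sum;
  }

  // one iteration: assign every vector to its nearest center ("map"),
  // then average each cluster into a new center ("reduce")
  static double[][] iterate(double[][] centers, double[][] vectors) {
    double[][] sums = new double[centers.length][centers[0].length];
    int[] counts = new int[centers.length];
    for (double[] v : vectors) {
      int nearest = 0;
      for (int c = 1; c < centers.length; c++)
        if (distance(centers[c], v) < distance(centers[nearest], v))
          nearest = c;
      counts[nearest]++;
      for (int i = 0; i < v.length; i++)
        sums[nearest][i] += v[i];
    }
    for (int c = 0; c < centers.length; c++)
      for (int i = 0; i < sums[c].length; i++)
        sums[c][i] = counts[c] == 0 ? centers[c][i] : sums[c][i] / counts[c];
    return sums;
  }

  public static void main(String[] args) {
    double[][] centers = { { 0, 0 }, { 10, 0 } };
    double[][] vectors = { { 1, 2 }, { 2, 2 }, { 16, 3 }, { 12, 1 } };
    System.out.println(Arrays.deepToString(iterate(centers, vectors)));
    // [[1.5, 2.0], [14.0, 2.0]]
  }
}
```

Running this in a loop until the centers stop moving is exactly the recursion the job controls via the counter.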


Let's have a look at the involved models:

Vector class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Vector implements WritableComparable&lt;Vector&gt; {

 private double[] vector;

 public Vector() {
 }

 public Vector(Vector v) {
  int l = v.vector.length;
  this.vector = new double[l];
  System.arraycopy(v.vector, 0, this.vector, 0, l);
 }

 public Vector(double x, double y) {
  this.vector = new double[] { x, y };
 }

 public void write(DataOutput out) throws IOException {
  out.writeInt(vector.length);
  for (int i = 0; i < vector.length; i++)
   out.writeDouble(vector[i]);
 }

 public void readFields(DataInput in) throws IOException {
  int size = in.readInt();
  vector = new double[size];
  for (int i = 0; i < size; i++)
   vector[i] = in.readDouble();
 }

 public int compareTo(Vector o) {
  for (int i = 0; i < vector.length; i++) {
   double c = vector[i] - o.vector[i];
   if (c != 0.0d) {
    return 1;
   }
  }
  return 0;
 }
 // get and set omitted
}

You see, everything is pretty standard. The compareTo method just checks equality, because we don't need an inner ordering, but we want the same keys to end up in the same chunk. Be aware that we return 1 if they are not equal; Hadoop's quicksort only swaps an element if it is greater than the other one. <- This is a great tip ;)

If you are not comfortable with this hack, please reimplement it correctly.

The cluster center is basically just a "has-a-vector" class that delegates the read/write/compareTo methods to the vector. It is separated only so we can distinguish between a center and a vector, although it is the same data.

The distance measurement

I've spoken about distance measurement in the algorithm description, but left it open. Let's declare some things:

We need a measurement of a distance between two vectors, especially between a center and a vector.
I came up with the Manhattan distance because it doesn't require much computational overhead, unlike the square roots of the Euclidean distance, and it is not too complex.
Let's have a look:

public static final double measureDistance(ClusterCenter center, Vector v) {
  double sum = 0;
  int length = v.getVector().length;
  for (int i = 0; i < length; i++) {
   sum += Math.abs(center.getCenter().getVector()[i]
     - v.getVector()[i]);
  }
  return sum;
}
As you can see, it's just the sum of the component-wise differences of the two vectors. Easy! Let's head to the map implementation...

The Mapper

Let's assume that there is a list, or a list-like SequenceFile-iterating interface, called centers. It contains ClusterCenter objects that represent the current centers. The DistanceMeasurer class contains the static method we defined in the last part.
// setup and cleanup stuffz omitted
 protected void map(ClusterCenter key, Vector value, Context context)
   throws IOException, InterruptedException {

  ClusterCenter nearest = null;
  double nearestDistance = Double.MAX_VALUE;
  for (ClusterCenter c : centers) {
   double dist = DistanceMeasurer.measureDistance(c, value);
   if (nearest == null) {
    nearest = c;
    nearestDistance = dist;
   } else {
    if (nearestDistance > dist) {
     nearest = c;
     nearestDistance = dist;
    }
   }
  }
  context.write(nearest, value);
 }
As told in the introduction, it's just looping and measuring, always keeping a reference to the nearest center. Afterwards we write it out.

The Reducer

Once again, let's assume a list or a list-like SequenceFile-iterating interface called centers. Here we need it for storage reasons.
// setup and cleanup stuffz omitted once again
 protected void reduce(ClusterCenter key, Iterable&lt;Vector&gt; values,
   Context context) throws IOException, InterruptedException {

  Vector newCenter = new Vector();
  List&lt;Vector&gt; vectorList = new LinkedList&lt;Vector&gt;();
  int vectorSize = key.getCenter().getVector().length;
  newCenter.setVector(new double[vectorSize]);
  for (Vector value : values) {
   vectorList.add(new Vector(value));
   for (int i = 0; i < value.getVector().length; i++) {
    newCenter.getVector()[i] += value.getVector()[i];
   }
  }

  for (int i = 0; i < newCenter.getVector().length; i++) {
   newCenter.getVector()[i] = newCenter.getVector()[i]
     / vectorList.size();
  }

  ClusterCenter center = new ClusterCenter(newCenter);
  for (Vector vector : vectorList) {
   context.write(center, vector);
  }

  if (center.converged(key))
   context.getCounter("KMeans", "CONVERGED").increment(1);
 }

Sorry, this got a bit more bulky than I initially thought it would. Let me explain: the first loop just dumps the values from the iterable into a list and sums up each component of the vectors into a newly created center. Then we average it in another loop, and we write the new center along with each vector we held in memory the whole time. Afterwards we just check whether the center has changed; this method just delegates to the underlying vector's compareTo. If the centers are not equal, it returns true and therefore updates a counter. Actually the name of the counter is misleading; it should be named "updated". If you are now asking how we control the recursion part, head over here and look how it should work: Controlling Hadoop MapReduce recursion.


I don't want anyone to leave without a working example ;) So here is our 2-dimensional input: k-Centers:
Input vectors:
Vector [vector=[16.0, 3.0]]
Vector [vector=[7.0, 6.0]]
Vector [vector=[6.0, 5.0]]
Vector [vector=[25.0, 1.0]]
Vector [vector=[1.0, 2.0]]
Vector [vector=[3.0, 3.0]]
Vector [vector=[2.0, 2.0]]
Vector [vector=[2.0, 3.0]]
Vector [vector=[-1.0, -23.0]]
Now the job gets scheduled over and over again, and the final output looks like this:
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[16.0, 3.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[7.0, 6.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[6.0, 5.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[25.0, 1.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[1.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[3.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[-1.0, -23.0]]

So we see that the two initial centers moved to (1.4, -2.6) and (13.5, 3.75). Cool thing :D

Here is the code:

The code is located in the de.jungblut.clustering.mapreduce package, if you click run on the KMeansClusteringJob the example data is getting loaded and you can step through the code if you are interested. If you want to run it on your cluster, I assume that you're using 2.2, if not, then you have to take care of the up/downgrade for yourself.

Note that if you submit this to a real cluster, files like _logs or _SUCCESS may be in the output directory of your job. This will break the output reading at the end of the job.
Either remove the files or modify the method.
Also note that if you run this with a large file, the number of reducers should be set to 1, otherwise there will be file collisions (See the reducer's cleanup method). This can be done better, but I'll leave this to you ;)

Thank you very much.

May 21, 2011

Series: K-Means Clustering (MapReduce | BSP)

Hi all,

I was a bit busy last time so I hadn't that much time to blog.
Several days ago after PageRank I had an idea to implement k-means clustering with Apache Hama and BSP.
Now I've decided to first implement a MapReduce version of it, since this is very simple: read the centers in the setup method, calculate the distance from each vector to the centers in the map phase, and calculate the new cluster centers in the reduce phase.

This is very very straightforward. So this will be a series about a MapReduce implementation and a better one with BSP.

'till then!


May 7, 2011

Shortest Path Finding with Apache Hama

Hi guys,

I've finished my Google Summer of Code task. Really! Remember, today is the 7th of May, and the actual coding period goes until mid-September.

Okay obviously I've just finished the task itself, developing a new example with BSP. Since Hama does not require HBase anymore I have decided to split the tasks.
One example (which I have submitted) is a straight single source shortest path implementation described in Google Pregel's paper.
The second one will be an HBase version using Dijkstra and its extension A*. The second won't be committed to the codebase of Hama, just because I don't want to add the old HBase dependency once again.

So in the end everyone won: I used HBase to get more familiar with BigTable, Hama has a shortest path example and I can code the whole summer long knowing that I've finished my task ;D

Okay 'nuff talked, let's dive into the algorithm!

Like in PageRank, you should be familiar with the idea behind the partitioning (read the Pregel paper), and this time you should also be familiar with (single-source) shortest path finding.

Short summary of the algorithm

First off I just briefly describe how it should work, and then how I solved it.

  • Initialize all vertices' cost to reach them to INFINITY; only the start vertex has cost 0
    • initially send the new cost to all adjacent vertices, containing the new cost plus the edge weight between them
  • Reviewing messages: if the cost coming from a message is lower than the actual cost, update the cost and send a message to the adjacent vertices, containing the new cost plus the edge weight between them (similar to the last step)
  • Repeat the last step until no updates can be made anymore.
That is pretty much it.
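Stripped of the BSP messaging, the relax-until-stable loop above is classic iterative relaxation. Here is a standalone sketch under that simplification (all class and method names are mine, not from the Hama example):

```java
import java.util.*;

public class ShortestPathSketch {

  // edges: vertex -> (adjacent vertex -> edge weight)
  public static Map<String, Integer> sssp(
      Map<String, Map<String, Integer>> edges, String start) {
    Map<String, Integer> cost = new HashMap<String, Integer>();
    for (String v : edges.keySet())
      cost.put(v, Integer.MAX_VALUE); // INFINITY
    cost.put(start, 0);
    boolean updated = true;
    // repeat until no "message" would lower a cost anymore
    while (updated) {
      updated = false;
      for (Map.Entry<String, Map<String, Integer>> e : edges.entrySet()) {
        int c = cost.get(e.getKey());
        if (c == Integer.MAX_VALUE)
          continue; // unreachable so far, nothing to propagate
        for (Map.Entry<String, Integer> adj : e.getValue().entrySet()) {
          if (c + adj.getValue() < cost.get(adj.getKey())) {
            cost.put(adj.getKey(), c + adj.getValue());
            updated = true;
          }
        }
      }
    }
    return cost;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Integer>> g = new HashMap<String, Map<String, Integer>>();
    g.put("Berlin", new HashMap<String, Integer>());
    g.put("London", new HashMap<String, Integer>());
    g.put("Paris", new HashMap<String, Integer>());
    g.get("Berlin").put("Paris", 25);
    g.get("Berlin").put("London", 40);
    g.get("London").put("Paris", 10);
    System.out.println(sssp(g, "Berlin")); // Berlin=0, London=40, Paris=25
  }
}
```

The BSP version distributes exactly this loop: each cost update becomes a message to the groom that owns the adjacent vertex.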

How we do it!

First we need a model class that represents a shortest path vertex. It has a name/ID, a weight and a cost. The cost is the cost at which the vertex can be reached from our starting vertex.
A vertex will have an ID that is just the hashcode of its name. I wanted a common way to partition a vertex, so I've just derived this from the name and called it ID. Watch out when adding e.g. cities with the same name.

I will skip the whole partitioning step; you can read the other posts to learn more about it. Shortly described, it is just a modulo function that spreads the vertices over different sequencefiles. These sequencefiles will be read during job initialization and mapped into memory.
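For illustration, that modulo function can be sketched like this (class and method names are mine; the real example derives the ID from the vertex name's hashcode, as described above):

```java
public class VertexPartitioner {

  // assigns a vertex to one of numTasks partitions based on its
  // name's hashcode, mirroring the ID-by-hashcode scheme of the post
  public static int partition(String vertexName, int numTasks) {
    return Math.abs(vertexName.hashCode() % numTasks);
  }

  public static void main(String[] args) {
    // the same name always lands in the same partition
    System.out.println(partition("Berlin", 4));
    System.out.println(partition("Paris", 4));
  }
}
```

Every task can then read exactly the sequencefile of its own partition during setup.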

So let's step into the code...

Fields we need

Because this time we store the cost and weight in a model vertex, we just need an adjacency list and a lookup map.
This looks like this:

private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();

Basically we are storing a vertex to its adjacents and the name to the vertex itself. I'll tell you later why we need a lookup map.


In the init phase we need to map our adjacency list into RAM and get our start vertex (we just create it; we need it for the equality check in the following loop, which compares only the name).
The following loop then inits the costs and sends them to the adjacents.

  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
      InterruptedException {
    // map our input into ram
    mapAdjacencyList(conf, peer);
    // get the start vertex; its name comes from the job configuration
    ShortestPathVertex start = new ShortestPathVertex(0,
        conf.get(START_VERTEX));
    // get our master groom
    String master = conf.get(MASTER_TASK);
    // init the vertices
    for (ShortestPathVertex v : adjacencyList.keySet()) {
      if (v.equals(start)) {
        v.setCost(0);
      } else {
        // INF
        v.setCost(Integer.MAX_VALUE);
      }
      // initial message bypass
      sendMessageToNeighbors(peer, v);
    }

The send method

The send method takes advantage of the partitioning to get the target groom where the vertex is actually stored.
It basically sends a message containing the name of the target vertex and the cost at which it can be reached through the vertex in the parameter line.

private void sendMessageToNeighbors(BSPPeerProtocol peer,
      ShortestPathVertex id) throws IOException {

    List&lt;ShortestPathVertex&gt; outgoingEdges = adjacencyList.get(id);
    for (ShortestPathVertex adjacent : outgoingEdges) {
      int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
      peer.send(peer.getAllPeerNames()[mod],
          new IntegerMessage(adjacent.getName(),
              id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
                  + adjacent.getWeight()));
    }
  }

Main Loop

The main loop is very simple: a while loop that breaks once no more updates can be made.
We just parse the incoming messages and compare the incoming cost with the current cost. If the new cost is lower, we update it, put the vertex into a queue and increment a local update counter.

Now we need the lookup map, to get fast access to the actual cost in the vertex.

boolean updated = true;
    while (updated) {
      int updatesMade = 0;

      IntegerMessage msg = null;
      Deque&lt;ShortestPathVertex&gt; updatedQueue = new LinkedList&lt;ShortestPathVertex&gt;();
      while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) {
        ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag());
        // check if we need a distance update
        if (vertex.getCost() > msg.getData()) {
          vertex.setCost(msg.getData());
          updatedQueue.add(vertex);
          updatesMade++;
        }
      }
      // synchronize with all grooms if there were updates
      updated = broadcastUpdatesMade(peer, master, updatesMade);
      // send updates to the adjacents of the updated vertices
      for (ShortestPathVertex vertex : updatedQueue) {
        sendMessageToNeighbors(peer, vertex);
      }
    }

Afterwards we send the update counter to a master groom that will evaluate and check whether updates can still be applied. I leave this method out; you can check out the PageRank error method, it is roughly the same.

If we have updates to apply, we just send them to the neighboring edges again.
Then we just repeat until the master says: no more updates can occur.

Submit your own SequenceFile adjacency list

This is of course an example, so you can submit this to your own cluster and give it the input you like. I have designed the input like this:

The adjacencylist contains two text fields on each line. The key
component is the name of a vertex, the value is a ":" separated Text
field that contains the name of the adjacent vertex leftmost and the
weight on the rightmost side.
K         /                V 
Vertex[Text] / AdjacentVertex : Weight [Text]
So you can setup a sequencefile like this (obviously I won't write any binary code here :p ):
Berlin /  Paris : 25
Berlin / London : 40
London / Paris : 10

The basic usage of the command line arguments are:
<name of the start vertex> <optional: output path> <optional: path of your own sequencefile>

So you can run this with:
hama/bin/hama jar ../hama-0.x.0-examples.jar sssp Berlin /srv/tmp/ /home/user/myOwnGraph.seq

I've submitted this as a patch here:
So feel free to check it out; I hope it will get committed soon. Nevertheless, it is also contained in my trunk on Google Code:
Class is called: de.jungblut.hama.bsp.ShortestPaths
Just run the main method ;)

Have fun with it! 

May 1, 2011

Google Summer of Code

Hey all,

maybe you've read it already on Twitter, I got picked for GSoC2k11 :D
Of course I'm very proud and want to thank everybody who helped me across the way. Especially my mentor: Edward J. Yoon.

If you don't know what my project actually was, have a look at Development of Shortest Path Finding Algorithm.

Basically I want to create a scalable solution that takes a HUGE graph :) and provides a fast solution to SSSP (single-source shortest path) problems.
As I already said in the Apache JIRA issue, I focus on a distributed Dijkstra algorithm that uses an adjacency list. So at first I'll focus on sparse graphs. The adjacency list is stored in a BigTable-like structure, in my case HBase.

After I finished this, I will focus on a refinement of partitioning and if I have enough time I extend the Dijkstra with an A* heuristic.

These steps are basically my plan for this summer :)

Right now I made my first commit of Dijkstra's algorithm; it currently contains the old BSP version which I started weeks ago to get familiar with Apache Hama. It is actually a working example, so feel free to check out the code and run its main method. This will start a local multithreaded BSP with some sample data from the German Wikipedia page on Dijkstra's algorithm.
The class is located here: de.jungblut.hama.bsp.BSPDijkstra.
The repository can be found here:

Please star this project and the jira issue :D

I'll keep you updated on every change and thoughts I made. 
Thank you very much: Edward, Apache Software Foundation and Google!

Apr 23, 2011

Profiling Apache Hama BSP's

Hi all,

sometimes you aren't aware of why your application is performing slowly, or slower than before, and you need a profiler to know in which methods the most time is spent.

Which profiler should I use?

That depends on your needs; I prefer YourKit. It has fairly good thread monitoring and cool inspections. Most of the time I'm searching for memory leaks or for why a thread has deadlocked, so this profiler is a very helpful tool for these cases.

This time we are focusing on profiling a BSP method/class. Or: how you can profile your own Hama application inside a cluster.

In my case I'm using VMs with Ubuntu, and Windows 7 on my host machine. This might be a common setup. YourKit itself is written in Java and can be executed on Unix as well.
We are going to start the application with the profiler agent and tunnel the profiler port with Putty to our host machine: in my case Windows.

Let's go!

  1. Download Yourkit here
  2. Install Yourkit and Putty (if you're running windows, otherwise you can just use your shell) on your host machine.
  3. Download Yourkit for the server machine and untar it
  4. Look for the shared object (libyjpagent.so) in the yjp-9.5.5/bin/linux-x86-32 directory. Note: make sure you pick the right version for your JVM. 32 bit if you aren't sure, 64 bit otherwise.
  5. Copy it under a path that is more readable like "/usr/local/yk/"
Now YourKit is set up properly, and we have to add the agent to the Hama configuration on the BSPMaster.
Look into your "hama-site.xml" and if the property "bsp.child.java.opts" isn't set yet, you have to override it with this:

    <property>
        <name>bsp.child.java.opts</name>
        <value>-Xmx1024m -agentpath:/usr/local/yk/libyjpagent.so</value>
    </property>

Sure, you'll have to replace the agentpath with the path you copied your shared object to. Make sure you have chown'd it to the user that runs Hama. Note: the heap limit is just a copy of the default value in hama-default.xml. Anything configured in hama-default.xml will be overridden by the hama-site.xml configuration parameters.

Now you have to make sure that this is set on EVERY machine in your cluster. Hama currently can't start just a few tasks with the profiler (like Hadoop can); you'll have to provide this for every groom.

If this is working, you can just start the DFS and the BSPMaster, and the agent will be started within. You can see this in your groom's log by an entry like this:
2011-04-22 17:45:53,104 INFO org.apache.hama.bsp.TaskRunner: attempt_201104221744_0001_000001_0 [YourKit Java Profiler 9.5.5] Loaded. Log file: /home/hadoop/.yjp/log/1864.log
Don't forget to take a look into the log file to determine on which port the agent is listening. This is necessary for the remote connection with Putty.
In any case YourKit will probe your ports starting with 10000; if it's blocked, it picks 10001, and so on.
In the log you should see something equal to this:
[YourKit Java Profiler 9.5.5] [0.539]: Profiler agent is listening on port 10001
Ok, now we have to set up Putty on our Windows system.
Enter all your information on the Session tab: like the hostname of the BSPMaster. In my case this is thomasjungblut@ and port 22.

Now switch to the Connection->SSH->Tunnels tab and add a new tunnel.
For port 10001 it will be:
Sourceport 10001, Destination raynor:10001, Local and Auto.
The destination host:port pair is the host:port of your BSPMaster.

Now open up the session, login and the profiling data will be tunneled over this port.

Inside of YourKit you'll have to click: "Connect to remote application" and enter localhost:10001 (or the other port if YourKit gave you another).
Congratulations, you connected to your running application inside of your cluster.

Just a tip if you are profiling Hama: you'll have to deactivate the filters under Settings->Filter, otherwise every framework method call will be filtered out. BUT if you just want to profile your own application, the framework calls are just noise, so you can keep filtering them.

Good luck and have fun!

Apr 17, 2011

Apache Hama network throughput

Hey, I've read about a new task: Improve output of RandBench example.
This is basically about some more output, but it actually caused me to measure the throughput of my virtual Hama cluster.

My cluster

Some months ago I decided to get rid of my real hardware server and just go along with one massive machine that hosts multiple virtual machines. I now have two different virtual hosts (VirtualBox ftw) with 2 cores and 1 GB of memory each.
The hosts are connected by a virtual Intel PRO/1000 MT Desktop card.
One server is configured as namenode, datanode, BSPMaster and groom. The other machine is just a datanode and a groom.


RandBench is just a simple looping utility that sends a constant-size data packet randomly across your cluster.
If you are familiar with BSP, you know that you can put messages into a local queue, which is nothing else than storing the packet in local RAM; this doesn't really need to be benchmarked. So we can assume that if we run this on a two-host cluster, there is a 50% probability that a message will be sent to the other host over the network. Since every host executes its BSP method, the amount of data sent is effectively doubled.
On a two-host cluster you can therefore assume that roughly the full size of the data we configure at startup will be sent to the other host.

Startup Options

In my little test we are going to use these parameters: bench 524288 100 2000.
The first number is the message size in bytes (so this is 512 KB), the second argument tells the benchmark how often to send this message per superstep, and the last argument is the number of supersteps.
This sums up to a total size of 97.65625 GB. Note that the benchmark also sends a string as the message tag which tells where the message comes from and goes to; this makes the data sent a bit higher.
In my case this is 30 bytes ("karrigan:61000 to raynor:61000", yeah, my vhosts are StarCraft characters :P) for each message sent. So 100 messages times 2k supersteps results in 200,000 messages with 30 bytes each, which sums up to 6,000,000 bytes, roughly 5.7 MB. That's no great deal compared to the 97 GB of byte arrays.


Afterwards the benchmark tells you how much time this test took:
11/04/17 13:07:57 INFO bsp.BSPJobClient: The total number of supersteps: 2000
Job Finished in 1502.107 seconds
Now we can calculate the average network throughput, because we know how much data we've sent in a given span of time:
97 GB / 1502.107 s ≈ 66 MB/s
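The arithmetic can be double-checked with a tiny standalone calculation (class and method names are mine):

```java
public class ThroughputCheck {

  // average throughput in MB/s for fixed-size messages
  static double mbPerSecond(long messageSize, long messagesPerSuperstep,
      long supersteps, double seconds) {
    long totalBytes = messageSize * messagesPerSuperstep * supersteps;
    return (totalBytes / (1024.0 * 1024.0)) / seconds;
  }

  public static void main(String[] args) {
    // 512 KB * 100 messages * 2000 supersteps = 97.65625 GB in total
    System.out.println(mbPerSecond(524288, 100, 2000, 1502.107));
    // roughly 66.6 MB/s, matching the figure above
  }
}
```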

Note that...
  • these timings are VM-based, so I won't draw a definite conclusion here
  • the data sent is a dummy array filled with zeros
  • we can therefore greatly run into caching

Apr 16, 2011

PageRank with Apache Hama


some days ago I read on the Hama-dev mailing list that the Nutch project wants a PageRank implementation:

Hi all,
Anyone interested in PageRank and collaborating w/ Nutch project? :-)

So I thought that I could do this. I already implemented PageRank with MapReduce, so why not go for a BSP version? :D
This is basically what this blog post is about.

Let's make a few assumptions:
  • We have an adjacency list (web-graphs are sparse) of webpages and their unweighted edges
  • A working partitioning like here. (You don't have to implement it, but you should know how it works)
  • We have read the Pregel Paper (or at least the summary)
  • Familiarity with PageRank
Web Graph Layout

This is the adjacency list. On the leftmost side is the vertex ID of the webpage, followed by its outlinks, separated by tabs.

1 2 3
3 1 2 5
4 5 6
5 4 6
6 4
7 2 4

This will be pretty-printed as a graph like this:

I have colored them by their incoming links: the vertex with the most in-links is the brightest, vertices with few or no in-links are darker.
We can see that vertex 2 should get a high PageRank, that 4, 5 and 6 should get more or less equal ranks, and so on.

Short summary of the algorithm

I am now referring to the Google Pregel paper. First we need a model class that represents a vertex and holds its own tentative PageRank; in my case we store the tentative PageRank along with the id of a vertex in a HashMap.
In the first superstep we set the tentative PageRank to 1 / n, where n is the number of vertices in the whole graph.
In each following step we send, for every vertex, its PageRank divided by the number of its outgoing edges to all adjacent vertices in the graph.
So from the second superstep on we receive messages containing the tentative PageRank of adjacent vertices. We sum up these messages for each vertex "i" and use this formula:
P(i) = 0.15/NumVertices() + 0.85 * sum
This is the new tentative PageRank for a vertex "i".
I'm not sure whether NumVertices() returns the number of all vertices or just the number of adjacent ones. I'll assume it is the count of all vertices in the graph, which would then be a constant. So we add this constant term to the damped sum of the received tentative ranks of the adjacent vertices.

We loop these steps until convergence to a given error is achieved. The error is just the sum of the absolute differences between the old tentative PageRank and the new one of each vertex.
Or we can break once a sufficiently high iteration count is reached.

We store the old PageRank as a copy of the current PageRank (a simple HashMap).
The error is thus a local variable that we sync with a master task; he averages the errors and broadcasts the result back to all the slaves.
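The update rule can also be sketched standalone. This snippet (all names are mine, not from the Hama example) assumes, as discussed, that NumVertices() is the total vertex count:

```java
import java.util.*;

public class PageRankStep {

  static final double DAMPING = 0.85;

  // one synchronous update: every vertex "sends" rank/outDegree to its
  // adjacents, then P(i) = 0.15/n + 0.85 * sum of incoming contributions
  static Map<Integer, Double> step(Map<Integer, List<Integer>> graph,
      Map<Integer, Double> rank) {
    int n = graph.size();
    Map<Integer, Double> sum = new HashMap<Integer, Double>();
    for (int v : graph.keySet())
      sum.put(v, 0.0);
    for (Map.Entry<Integer, List<Integer>> e : graph.entrySet()) {
      double share = rank.get(e.getKey()) / e.getValue().size();
      for (int adj : e.getValue())
        sum.put(adj, sum.get(adj) + share);
    }
    Map<Integer, Double> next = new HashMap<Integer, Double>();
    for (int v : graph.keySet())
      next.put(v, (1 - DAMPING) / n + DAMPING * sum.get(v));
    return next;
  }

  public static void main(String[] args) {
    // two vertices pointing at each other: 0.5/0.5 is a fixed point,
    // since 0.15/2 + 0.85 * 0.5 = 0.5
    Map<Integer, List<Integer>> graph = new HashMap<Integer, List<Integer>>();
    graph.put(1, Arrays.asList(2));
    graph.put(2, Arrays.asList(1));
    Map<Integer, Double> rank = new HashMap<Integer, Double>();
    rank.put(1, 0.5);
    rank.put(2, 0.5);
    System.out.println(step(graph, rank)); // both stay at 0.5
  }
}
```

The BSP version computes exactly this, except that the sums are built from messages and each task only owns a partition of the map.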


Let's look at the fields we need:

private static int MAX_ITERATIONS = 30;
 // alpha is 0.15/NumVertices()
 private static double ALPHA;
 private static int numOfVertices;
 private static double DAMPING_FACTOR = 0.85;
 // this is the error we want to achieve
 private static double EPSILON = 0.001;

 HashMap&lt;Integer, List&lt;Integer&gt;&gt; adjacencyList = new HashMap&lt;Integer, List&lt;Integer&gt;&gt;();
 // normally this is stored by a vertex, but I don't want to create a new
 // model for it
 HashMap&lt;Integer, Double&gt; tentativePagerank = new HashMap&lt;Integer, Double&gt;();
 // backup of the last pagerank to determine the error
 HashMap&lt;Integer, Double&gt; lastTentativePagerank = new HashMap&lt;Integer, Double&gt;();

Keep in mind that every task just has a subgraph of the graph. So these structures will hold just a chunk of PageRank.

Let's get into the init phase of the BSP:

public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
    InterruptedException {
  fs = FileSystem.get(getConf());
  String master = conf.get(MASTER_TASK);
  // setup the datasets
  adjacencyList = mapAdjacencyList(getConf(), peer);
  // init the pageranks to 1/n where n is the number of all vertices
  for (int vertexId : adjacencyList.keySet())
    tentativePagerank.put(vertexId, Double.valueOf(1.0 / numOfVertices));


As described above, we read the data chunk from HDFS and set the tentative PageRank to 1/n.

Main Loop

// while the error does not converge against epsilon, do the pagerank steps
  double error = 1.0;
  int iteration = 0;
  // if MAX_ITERATIONS is set to 0, ignore the iteration count and just go
  // with the error
  while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS)
      || error >= EPSILON) {

    // sync so we receive the messages sent in the previous superstep
    peer.sync();

    if (iteration >= 1) {
      // copy the old pagerank to the backup
      lastTentativePagerank = new HashMap<Integer, Double>(tentativePagerank);
      // sum up all incoming messages for a vertex
      HashMap<Integer, Double> sumMap = new HashMap<Integer, Double>();
      IntegerDoubleMessage msg = null;
      while ((msg = (IntegerDoubleMessage) peer.getCurrentMessage()) != null) {
        if (!sumMap.containsKey(msg.getTag())) {
          sumMap.put(msg.getTag(), msg.getData());
        } else {
          sumMap.put(msg.getTag(), msg.getData() + sumMap.get(msg.getTag()));
        }
      }
      // pregel formula:
      // ALPHA = 0.15 / NumVertices()
      // P(i) = ALPHA + 0.85 * sum
      for (Entry<Integer, Double> entry : sumMap.entrySet()) {
        tentativePagerank.put(entry.getKey(),
            ALPHA + (entry.getValue() * DAMPING_FACTOR));
      }
      // determine the error and send this to the master
      double err = determineError();
      error = broadcastError(peer, master, err);
    }
    // in every step send the tentative pagerank of a vertex to its
    // adjacent vertices
    for (int vertexId : adjacencyList.keySet())
      sendMessageToNeighbors(peer, vertexId);

    iteration++;
  }

I guess this is self-explanatory. The function broadcastError() sends the determined error to a master task, which averages all incoming errors and broadcasts the result back to the slaves (similar to aggregators in the Pregel paper).
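broadcastError() itself is not listed here; the averaging the master does is just the arithmetic mean of the local errors. A hedged sketch of that core (class and method names are mine, the send/sync plumbing around it is omitted):

```java
import java.util.Arrays;
import java.util.List;

public class ErrorAveraging {

  // the master receives one local error per task; it averages them and
  // broadcasts the average back as the global error
  static double averageErrors(List<Double> localErrors) {
    double sum = 0.0;
    for (double e : localErrors) {
      sum += e;
    }
    return sum / localErrors.size();
  }

  public static void main(String[] args) {
    // hypothetical local errors reported by three slave tasks
    System.out.println(averageErrors(Arrays.asList(0.01, 0.03, 0.02)));
  }
}
```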
Let's take a quick look at the determineError() function:

private double determineError() {
  double error = 0.0;
  for (Entry<Integer, Double> entry : tentativePagerank.entrySet()) {
    error += Math.abs(lastTentativePagerank.get(entry.getKey())
        - entry.getValue());
  }
  return error;
}

As described in the summary, we simply sum up the absolute differences between the old and the new rank of each vertex.


Finally we can run this and receive a fraction between 0 and 1 for each site, representing its PageRank.
I ran this with a convergence error of 0.000001 and a damping factor of 0.85; it took about 17 iterations.

------------------- RESULTS --------------------
2 | 0.33983048615390526
4 | 0.21342628110369394
6 | 0.20495452025114747
5 | 0.1268811487940641
3 | 0.0425036157080356
1 | 0.0425036157080356
7 | 0.02990033228111791

The ranks sum to about 1.0, which is correct.
Note that the output of the job is not guaranteed to be sorted; I sorted it here for readability.
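A quick sanity check on the numbers above: PageRanks form a probability distribution, so they should sum to (roughly) one. The class name below is mine:

```java
public class RankSumCheck {

  static double sum(double[] ranks) {
    double s = 0.0;
    for (double r : ranks) {
      s += r;
    }
    return s;
  }

  public static void main(String[] args) {
    // the ranks printed in the results above
    double[] ranks = { 0.33983048615390526, 0.21342628110369394,
        0.20495452025114747, 0.1268811487940641, 0.0425036157080356,
        0.0425036157080356, 0.02990033228111791 };
    // should print something very close to 1.0
    System.out.println(sum(ranks));
  }
}
```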

We'll see that our initial guess of the PageRank was quite good.

I think that's all. If you are interested in testing or running this, feel free to do so.
This class and its test data are located in my Summer of Code repository under:
The classes name is de.jungblut.hama.bsp.PageRank.
Just execute the main method, it will run a local multithreaded BSP on your machine.

Star this project and vote for my GSoC task. :)

Thank you.

Apr 13, 2011

HTC Sensation


my first not code-related blog post :D
I've read that HTC launches the new Sensation (codename Pyramid) here in Germany in May. I am very excited about this phone; I have to replace my iPhone 3GS because it sucks :D

I believe I can hold it in my hands in June (that's when my birthday is!) and I will code for Android, like apps that can help people with their daily problems. Actually I have a developer account, so I could've purchased a dev phone. But they are not cheap: $500 for a Nexus? :D Not really.

Starting in June I'll provide some tutorials on Android and multithreading. Multithreading is actually my love, and as you already heard: the HTC Sensation has a dual-core CPU!
Let's get the most out of android ;)

Apr 12, 2011

Apache Hama Partitioning

Hello there!

I was a bit busy today after reading the Google Pregel paper. It describes quite well how to "think like a vertex" and how they partition data across their cluster.
I wanted to share this new knowledge with you, so I've rewritten my graph exploration algorithm. It can be found in the post below or by following this link.

Just a short summary of what we are going to change:
  • Partition the vertices across the servers.
  • Messages that are related to a specific vertex will just go to the server the vertex was partitioned to.
  • The master task just keeps track of when the task is finished (no updates were made).
  • Each task has its own output.
I really felt dumb today after reading the Pregel paper. They just partition the data with a hash function, while I was running several MapReduce jobs to determine the optimal distribution of an adjacency list. So scratch that; we simply use a hash function that is nothing more than the vertex id modulo the number of machines. If you have, for example, vertex ids from 0-100 and you have to distribute them over 5 machines, just loop to 100 and take each id modulo 5. Life can be so simple!
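That hash partitioning really is that small. A minimal sketch (class and method names are mine) with the 0-100 over 5 machines example from above:

```java
public class ModuloPartitioning {

  // a vertex goes to the machine with index (vertexId % numMachines)
  static int partitionFor(int vertexId, int numMachines) {
    return vertexId % numMachines;
  }

  public static void main(String[] args) {
    int numMachines = 5;
    // vertex ids 0..100 distribute round-robin over the 5 machines
    for (int vId = 0; vId <= 100; vId++) {
      System.out.println("vertex " + vId + " goes to machine "
          + partitionFor(vId, numMachines));
    }
  }
}
```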

How to distribute?
First off, we share a common FileSystem across the servers. The idea is that every server gets its own SequenceFile where its data is stored. Let's rush into the code that parses my adjacency list textfile into the SequenceFiles for the servers:

public static List<Path> partitionAdjacencyTextFile(int sizeOfCluster,
    Map<String, String> groomNames, Path fileToPartition,
    Configuration conf) throws IOException {
  // setup the paths where the grooms can find their input
  List<Path> partPaths = new ArrayList<Path>(sizeOfCluster);
  List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
      sizeOfCluster);
  FileSystem fs = FileSystem.get(conf);
  for (Entry<String, String> entry : groomNames.entrySet()) {
    partPaths.add(new Path(fileToPartition.getParent().toString()
        + Path.SEPARATOR + "parted" + Path.SEPARATOR
        + entry.getValue()));
  }
  // create a seq writer for each partition
  for (Path p : partPaths) {
    fs.delete(p, true);
    writers.add(SequenceFile.createWriter(fs, conf, p,
        IntWritable.class, IntWritable.class));
  }

  // parse our file
  FSDataInputStream data =;
  BufferedReader br = new BufferedReader(new InputStreamReader(data));
  String line = null;
  while ((line = br.readLine()) != null) {
    String[] arr = line.split("\t");
    int vId = Integer.parseInt(arr[0]);
    LinkedList<Integer> list = new LinkedList<Integer>();
    for (int i = 0; i < arr.length; i++) {
      list.add(Integer.parseInt(arr[i]));
    }
    int mod = vId % sizeOfCluster;
    System.out.println(vId + " goes to " + partPaths.get(mod));
    for (int value : list) {
      writers.get(mod).append(new IntWritable(vId),
          new IntWritable(value));
    }
  }
  br.close();

  for (SequenceFile.Writer w : writers)
    w.close();

  return partPaths;
}

Basically we create a SequenceFile for each groom and write the data into the SequenceFiles using the modulo function. Note that the names of the SequenceFiles are related to the names of the peers, so each peer can simply find its own partition.

How to pass messages between vertices using partitioning?
If we know what kind of partition we used, this is very simple. Look at the layered send method.

private void send(BSPPeerProtocol peer, BSPMessage msg) throws IOException {
  int mod = ((Integer) msg.getTag()) % peer.getAllPeerNames().length;
  peer.send(peer.getAllPeerNames()[mod], msg);
}

The only requirement is that the indices of peer.getAllPeerNames() are the same ones we used in the partitioning phase; otherwise this will result in strange behaviour.

With the help of these two methods we can now make the main algorithm use less master-slave communication and more collective communication.
But keep in mind that we still need master-slave communication to control the main loop. The problem is that if a slave breaks out of its computation because no more updates can be made in a superstep, the other peers will deadlock in the next sync phase, because that slave has already finished.
So we have to sync the updates with a master task, which broadcasts whether we can break the main loop altogether (when no task can update anymore) or whether we need another superstep.

This is actually a bit hacky. In the Pregel paper this is called "voteToHalt". Have a look at my implementation of it:

private boolean voteForHalt(BSPPeerProtocol peer, String master)
    throws IOException, KeeperException, InterruptedException {
  peer.send(master, new IntegerMessage("", activeList.size()));
  // sync so the master receives every local active count
  peer.sync();
  if (peer.getPeerName().equals(master)) {
    boolean halt = true;
    IntegerMessage message;
    while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
      if (message.getData() != 0) {
        halt = false;
      }
    }
    for (String name : peer.getAllPeerNames()) {
      peer.send(name, new BooleanMessage("", halt));
    }
  }
  // sync so everyone receives the master's decision
  peer.sync();
  BooleanMessage message = (BooleanMessage) peer.getCurrentMessage();
  if (message.getData() == true) {
    return false;
  } else {
    return true;
  }
}
In more human-readable terms: every task sends how many vertices are in its activeList, and the master decides whether to break or not. This decision is broadcast back, and a false return value breaks the outer loop.

Keep in mind that this is far from optimal.

Now we have multiple files in HDFS that can now be read or merged to get the classified components of the graph.

This snippet is quite generic if you want to use it in your application: feel free to copy and paste ;)

private static void printOutput(FileSystem fs, Configuration conf)
    throws IOException {
  System.out.println("-------------------- RESULTS --------------------");
  FileStatus[] stati = fs.listStatus(new Path(conf.get("out.path")
      + Path.SEPARATOR + "temp"));
  for (FileStatus status : stati) {
    Path path = status.getPath();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    IntWritable value = new IntWritable();
    while (, value)) {
      System.out.println(key.get() + " | " + value.get());
    }
    reader.close();
  }
}
As you can see, it just fetches the parent directory of the output and loops over the files, writing the content to STDOUT.
Gladly this helped a guy on Stack Overflow today, and it got me to my 500 reputation :)

Finally it gives the same output as the other algorithms:
-------------------- RESULTS --------------------
0 | 0
1 | 1
2 | 2
3 | 2
4 | 1
5 | 2
6 | 2
7 | 1
8 | 2
9 | 0

The new class is called: de.jungblut.hama.graph.BSPGraphExplorationPartitioned.
Just execute the main method, it will run a local multithreaded BSP on your machine.
This class is located in my Summer of Code repository under:

Star this project or vote for my GSoC task.

Next week I'll go for a comparison between these three implementations: partitioned BSP, BSP and MapReduce.
But I guess before that I'll focus on PageRank. Some guys from Apache Nutch wanted a distributed PageRank in BSP, so I'll go for a working example next time.
The Pregel paper said something about 15 lines of code. Looks like a short task ;)


If you are interested in a faster way to partition, check out the new blog post here:


Apr 10, 2011

Graph Exploration with Apache Hama

Hey guys,
I've been busy for a bit, but I recently finished the graph exploration algorithm with Apache Hama. This post is the BSP port of this post.
I already explained in this post how BSP basically works. Now I'm going to show you what you can do with it in terms of graph exploration. Last time I did this with MapReduce, so let's get into Hama!

As of today this is outdated: since Hama 0.5.0 you can use the Pregel-like graph API and run the connected components example that ships with it. It does everything the code below did a year ago.
I keep this as a piece of history; some of the statements still hold true, though some parts need to be updated.

BSP Class
Like when coding a mapper, you need to inherit from a base class. In Hama this is the BSP class; it is abstract and implements an interface called BSPInterface. BSPInterface has a method that you implement yourself:
public void bsp(BSPPeerProtocol bspPeer)

What is BSPPeerProtocol and how is it getting executed?
The BSPPeerProtocol is nothing else than a reference to your locally running Groom server. A Groom is similar to Hadoop's tasktracker. This is also a hint on how your own BSP gets executed: Hama will launch several tasks inside a Groom and create a new instance of your BSP class per task.

In BSP it is necessary to synchronize the Grooms in order to introduce the messaging phase described here. You can simply call the sync() method on your BSPPeerProtocol reference.

Letting code be executed on only one Server / Groom / Task
This is actually pretty easy, you could take a look at the PiEstimator example.

Okay I guess this is enough, let's step into the code. The thoughts are the same like in the MapReduce example:
  1. Initialize the datasets: we have several maps storing the active vertices, the adjacency list and a map which holds the minimal vertex for each vertex.
  2. In the first iteration, send a message for each adjacent of a vertex, containing the id of the adjacent vertex and its currently minimal vertex, to a master task. If we are not in the first iteration, we loop through the active vertices and their adjacent vertices, broadcasting to every adjacent vertex what the new minimal id is.
  3. Sync, so that every task receives the messages.
  4. A master task fetches the results and updates the minimal vertex ids. If a vertex has been updated, we put its id into the "activeVertexMap" and send the active vertices to all existing peers.
  5. Sync again: if we received nothing, just break the main loop, exiting the bsp. If we receive new active vertices, increment the iteration and continue with point 2. 
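To make the idea behind these steps concrete, here is a plain sequential sketch of the same minimum-label propagation (no Hama, no messaging; class and method names are mine): every vertex repeatedly adopts the minimum id seen among itself and its neighbors until nothing changes, which labels each connected component with its smallest vertex id.

```java
import java.util.HashMap;
import java.util.Map;

public class MinLabelPropagation {

  // adjacency: vertex -> neighbors; returns vertex -> component minimum
  static Map<Integer, Integer> components(Map<Integer, int[]> adjacency) {
    Map<Integer, Integer> minimal = new HashMap<Integer, Integer>();
    for (int v : adjacency.keySet()) {
      minimal.put(v, v);
    }
    boolean updated = true;
    while (updated) {
      updated = false;
      for (Map.Entry<Integer, int[]> e : adjacency.entrySet()) {
        // minimum over the vertex itself and its neighbors
        int min = minimal.get(e.getKey());
        for (int adjacent : e.getValue()) {
          min = Math.min(min, minimal.get(adjacent));
        }
        // propagate the new minimum to the vertex and its neighbors
        if (min < minimal.get(e.getKey())) {
          minimal.put(e.getKey(), min);
          updated = true;
        }
        for (int adjacent : e.getValue()) {
          if (min < minimal.get(adjacent)) {
            minimal.put(adjacent, min);
            updated = true;
          }
        }
      }
    }
    return minimal;
  }

  public static void main(String[] args) {
    Map<Integer, int[]> adjacency = new HashMap<Integer, int[]>();
    // two small components: {0, 9} and {1, 4, 7}
    adjacency.put(0, new int[] { 9 });
    adjacency.put(9, new int[] { 0 });
    adjacency.put(1, new int[] { 4, 7 });
    adjacency.put(4, new int[] { 1 });
    adjacency.put(7, new int[] { 1 });
    System.out.println(components(adjacency));
  }
}
```

The BSP version below distributes exactly this: the per-vertex minimum updates run on the master, and the "updated" flag becomes the active vertex list that decides whether another superstep is needed.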
Input / Output System
Maybe you know that Hama currently has no real input and output system, so you have to take care of it yourself: manage the data the grooms need for their computation, partitioning and updating.
I hope that this issue will be resolved soon, so that this whole management happens inside Hama and does not blow up your BSP class.
For this example we need two major files: the actual adjacency list and the map that keeps track of the current minima. If you are wondering why: last time we stored this inside the vertex itself. That is still possible, but I don't want to add another model class for a vertex. This is just a personal preference, so feel free to fork and build your own ;)
Both files are in HDFS. A far more scalable solution would be to store them in an HBase table, but since Hama no longer requires HBase, I'll go for a FileSystem way of storing data.

Source Code
If you want to see the full implementation, check it out at

Let's start with the initialization phase in the bsp method:

String master = conf.get(MASTER_TASK);
  fs = FileSystem.get(getConf());

  // setup the datasets
  adjacencyList = mapAdjacencyList(getConf());
  // setup the local minimas
  if (peer.getPeerName().equals(master)) {
    // init the minimum map
    for (Entry<Integer, List<Integer>> row : adjacencyList.entrySet()) {
      int localAdjacentMinimum = row.getValue().get(0);
      for (int adjacent : row.getValue()) {
        if (adjacent < localAdjacentMinimum)
          localAdjacentMinimum = adjacent;
      }
      minimalMap.put(row.getKey(), localAdjacentMinimum);
    }
    // save our minimal map to HDFS so that every task can read it
  }

As you can see, we read from the configuration which groom is currently the master server; afterwards we initialize the FileSystem and map our adjacency list file into RAM. Then follows code that is only executed by the master: it simply loops through the list and sets up the currently minimal adjacent vertex for each vertex.
Only the master has write access to the minimalmap file and updates it after each iteration.
That's it. Let's step to the main loop.

// real start
  boolean updated = true;
  int iteration = 0;
  while (updated) {
    // sync so we can receive the new active messages
    peer.sync();
    List<Integer> activeQueue = new LinkedList<Integer>();
    if (peer.getNumCurrentMessages() > 0) {
      IntegerMessage message = (IntegerMessage) peer.getCurrentMessage();
      if (message.getTag().equals("size") && message.getData() == 0)
        break;
      BSPMessage msg = null;
      while ((msg = peer.getCurrentMessage()) != null) {
        message = (IntegerMessage) msg;
        activeQueue.add(message.getData());
      }
    }
    // apply updates on the minimal map
    applyMinimalMap();

First off, we sync at the top of this loop. In the first iteration obviously nobody will receive a message, but you can also use the sync to keep the grooms at the same line of code. Maybe you have already seen this: a server gets ahead in its computation while the master hasn't finished writing the map to HDFS, so that groom is no longer consistent with the rest of the cluster.
We prevent this using sync; in the following iterations it is used to receive the active vertices.
If the list of active vertices is empty, we break the while loop: the algorithm is done. Otherwise we fill the activeQueue with the vertex ids we got. Then we apply the changes the master could have made to the minimal map (method applyMinimalMap()).

Let's advance to the main algorithm.
// main algorithm
    if (iteration == 0) {
      for (Entry<Integer, List<Integer>> row : adjacencyList
          .entrySet()) {
        int min = minimalMap.get(row.getKey());
        for (int adjacent : row.getValue()) {
          peer.send(master, new FullIntegerMessage(adjacent, min));
        }
      }
    } else {
      for (int active : activeQueue) {
        int min = minimalMap.get(active);
        for (int l : adjacencyList.get(active)) {
          if (l != min)
            peer.send(master, new FullIntegerMessage(l, min));
        }
      }
    }

I guess this is pretty well described in the listing: if we are in the first iteration, we send messages to every adjacent of each vertex in the adjacency list. In the following iterations we just loop over the active vertices and send messages with the new minimum to every adjacent vertex, except the vertex itself.
> Sync step for sending and receiving messages
Don't worry, now comes the last part ;)

// only the master keeps track of the minimal
    if (peer.getPeerName().equals(master)) {
      FullIntegerMessage msg = null;
      List<Integer> activeList = new LinkedList<Integer>();
      while ((msg = (FullIntegerMessage) peer.getCurrentMessage()) != null) {
        if (minimalMap.get(msg.getTag()) > msg.getData()) {
          minimalMap.put(msg.getTag(), msg.getData());
          // flag as active
          activeList.add(msg.getTag());
        }
      }
      // save to hdfs for next iteration
      // send messages to all peers containing the size of the
      // activelist and the content
      for (String peerName : peer.getAllPeerNames()) {
        peer.send(peerName,
            new IntegerMessage("size", activeList.size()));
        for (int active : activeList)
          peer.send(peerName, new IntegerMessage("", active));
      }
    }
    // increment the iteration
    iteration++;
  }

This part is only executed by the master groom. We receive every message and update the minimalmap. If we updated a vertex, we put it into the list of active vertices. Afterwards we save our minimal map so the grooms have a fresh state of the minima in their RAM.
Then we send the size of this list along with its content; this is necessary for the breaking condition. And don't forget to increment the iteration variable.

That's it. It is the same algorithm we used in MapReduce- translated to BSP.
Wasn't that difficult, was it?

If you want to take a closer look at how this works, I already posted the project site of my GSoC project above. Feel free to check it out and play a little. The class we were talking about can be found here: de.jungblut.hama.graph.BSPGraphExploration
It builds against the latest Hama trunk and comes with a local BSP runner that will multithread grooms on your local machine. Just run the main method as a Java application and you'll see. Be aware when running this on your Hama cluster: there could be some compatibility problems with version 0.2.