Showing posts with label mapreduce. Show all posts

May 6, 2012

Distributed DBSCAN (Intuition)

Hey all,

it has been quite a long time since my last blog post. Thanks to my job for keeping me busy all day and leaving me no time to research cool new things. However, over the few holidays and weekends of the last weeks I came across a very interesting algorithm called DBSCAN.
DBSCAN stands for "density-based spatial clustering of applications with noise". It is an unsupervised clustering algorithm just like k-means, except that it is much smarter in many respects.
Another problem I'd like to solve is the parallelization of this algorithm. I've only seen a few ancient papers, and what baffles me is that I've seen no implementation in Mahout (for MapReduce) or other distributed frameworks.

As you may know, I'm working for Apache Hama. It is a framework for distributed computing with the BSP (bulk synchronous parallel) model. I'm always searching for new algorithms that could fit into the model of BSP computing, e.g. graph algorithms of all sorts, strongly iterative algorithms, and real-time algorithms.
I think that DBSCAN also fits into the BSP model; I'll tell you why a bit later in this post.
First off, just a little introduction to the DBSCAN algorithm itself...

The algorithm

The algorithm is very easy to understand. You have a bunch of points (or vectors in higher dimensions) as input, two parameters, and some fancy output.
The two parameters are called "epsilon" and "minpoints". Epsilon is the maximum distance between two vectors for them to count as strongly connected, and minpoints is the minimum number of points needed to build a cluster out of strongly connected vectors.
Now you go through the graph point by point, marking visited vectors and adding points to a cluster as long as they don't violate the rules defined by epsilon and minpoints.

You can read on Wikipedia about how the sequential version works in detail; however, I am going to propose a version of the algorithm that is much easier to understand.

Distributed algorithm steps

Instead of defining a big distributed algorithm that translates the sequential version into some distributed programming model, I have assembled three main steps to get the same result as the sequential version.
Each of these steps is highly parallelizable in every major programming model (at least I know how it works in MapReduce, BSP and MPI).

Here are the three steps:
  1. compute a distance matrix between the vectors with a given distance measurement
    1. trivial step to parallelize, can also be merged with the next point
  2. extract adjacent points via the epsilon threshold and the minpoints restriction
    1. This step creates an adjacency list/matrix representing a graph
    2. Noise is filtered at this step of the algorithm
  3. run a connected component algorithm on the resulting graph of the previous step
    1. Already done that in MapReduce and BSP, the last BSP version will be updated shortly after Apache Hama 0.5.0 comes out.
These three simple steps will give you the same result as DBSCAN. Normally you can merge step 1 with step 2: you can simply extract the adjacent points while computing the distances.
In the end, you will receive n connected components, each of which represents a cluster.
The points of your original input that end up in none of these components form the noise cluster.

Note that the initial step is O(n²) which is obviously pretty bad and not scalable. So think about techniques like Similarity Hashing to speed this step up.

Pretty easy, right? I think it is even easier than the pseudocode on Wikipedia.
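
The three steps can be sketched sequentially in plain Java. This is only a minimal illustration under a few assumptions of mine: the class name DbscanSketch, the Euclidean distance, and the convention that a point needs at least minPoints neighbours (not counting itself) to stay in the graph are all made up for this example. Points that fail the minpoints restriction get the label -1 (noise).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class DbscanSketch {

  // Steps 1 and 2 merged: for every point, collect all other points within epsilon.
  static List<List<Integer>> neighbours(double[][] points, double epsilon) {
    List<List<Integer>> adjacency = new ArrayList<>();
    for (int i = 0; i < points.length; i++) {
      List<Integer> near = new ArrayList<>();
      for (int j = 0; j < points.length; j++) {
        if (i != j && distance(points[i], points[j]) <= epsilon) {
          near.add(j);
        }
      }
      adjacency.add(near);
    }
    return adjacency;
  }

  // Euclidean distance (an assumption, any distance measurement would do).
  static double distance(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++) {
      sum += (a[i] - b[i]) * (a[i] - b[i]);
    }
    return Math.sqrt(sum);
  }

  // Returns one label per point: a component id, or -1 for noise.
  static int[] cluster(double[][] points, double epsilon, int minPoints) {
    List<List<Integer>> adjacency = neighbours(points, epsilon);
    int[] labels = new int[points.length];
    Arrays.fill(labels, -1);
    int nextLabel = 0;
    for (int start = 0; start < points.length; start++) {
      if (labels[start] != -1 || adjacency.get(start).size() < minPoints) {
        continue; // already labelled, or filtered out as noise
      }
      // Step 3: breadth-first search for the connected component.
      Deque<Integer> queue = new ArrayDeque<>();
      labels[start] = nextLabel;
      queue.add(start);
      while (!queue.isEmpty()) {
        int current = queue.poll();
        for (int next : adjacency.get(current)) {
          if (labels[next] == -1 && adjacency.get(next).size() >= minPoints) {
            labels[next] = nextLabel;
            queue.add(next);
          }
        }
      }
      nextLabel++;
    }
    return labels;
  }

  public static void main(String[] args) {
    double[][] points = {
      {0, 0}, {0, 1}, {1, 0}, {1, 1},
      {10, 10}, {10, 11}, {11, 10}, {11, 11},
      {50, 50}
    };
    // prints [0, 0, 0, 0, 1, 1, 1, 1, -1]
    System.out.println(Arrays.toString(cluster(points, 1.5, 3)));
  }
}
```

The two squares become two clusters and the lonely point at (50, 50) is noise. The nested loop in neighbours() is exactly the O(n²) step mentioned above.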

Of course I put up a sample version (although sequential) on my github:

There is a nice plot I received when running it:


To make the noise easier to spot, I have drawn horrible yellow circles around the noise points with Paint, please forgive me ;)

Stay tuned for an implementation with Apache Hama!

Update:

So far I haven't found the time to implement this whole system with Apache Hama. However, if you want to use this in practice, here is some advice:


  • For the distance matrix to compute, better use a heuristic to find close vectors
    • Mahout has a MinHashing implementation of such a clustering
  • Once you have obtained "mini" clusters, you can compute more expensive distance measurements and extract your graph (step two in the above list)

Jun 25, 2011

Set-intersection and -difference with Hadoop

Hi there,

first off a short message:
I am currently a bit busy, so I need a bit more time for the BSP k-means clustering I promised in one of the previous posts. I am going to contribute it as an example for Apache Hama and not for my GSoC trunk (you can now find a link to it on the right, below the Twitter box!). I am also going to set up a GitHub repository to store my new code. I'll link this on the sidebar, too.

The real beginning

I came across a question on Stack Overflow some days ago. It was about computing the set intersection and set difference with Hadoop's MapReduce. I simply wanted to share my solution with you; this is the main reason for this blog post.

The question was about only two files; we are going to scale this up to "n" files, since this is why we need Hadoop ;)

The input

Let's assume (like in the question) that we have files that look like this:

File 1 contains the following lines:
b
c
d
File 2 contains the following lines:
d
e

It is pretty simple: we just have to make sure that these lines are stored in different text files.

The Mapper

The main focus and trick in the mapper is how to get the name of the file the task is currently working on.
Let's have a look at my mapper implementation, called FileMapper:

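import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {
  // name of the file this task's input split belongs to
  private Text fileName;

  @Override
  protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
       // emit each line as the key and the file name as the value
       context.write(value, fileName);
  }

  @Override
  protected void setup(Context context) throws IOException,
          InterruptedException {
       // text input arrives as FileSplits, which expose the path of the file
       String name = ((FileSplit) context.getInputSplit()).getPath().getName();
       fileName = new Text(name);
       // the "hack": announce this file under the marker key "a"
       context.write(new Text("a"), fileName);
  }
}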

The trick is that when we import text files, the input split is of the type FileSplit, which lets you get the path and the name of the file.

The main problem here is knowing how many files you've got at all. A nifty hack is to emit one extra record per mapper with a key that is guaranteed to be the first input of our reducer, like a plain "a". This is the last line in the setup method. Keep in mind that this only holds with a single reducer and as long as no real line sorts before "a" or is equal to it.

Now we are going to emit each text line as the key and the filename as the value.
Then we get a bunch of key/value pairs that look like this (key and value are separated by a space, and the file names are assumed to be File1 and File2):

    a File1 // nifty haxx
    b File1
    c File1
    d File1

    a File2 // nifty haxx
    d File2
    e File2

After the shuffle, the reducer obviously gets an input like this:
    a File1,File2 // our nifty hack :))
    b File1
    c File1
    d File1,File2
    e File2

Once again pretty straightforward. Now we can see a clear structure and know what we have to do in our reducer.

The Reducer

Now let's have a look at our reducer:

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    // names of all input files, filled from the marker key "a"
    private final HashSet<String> fileNameSet = new HashSet<String>();

    enum Counter {
        LINES_IN_COMMON
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // add for our first key every file to our set
        // make sure that this action is the first of the entire reduce step
        if (key.toString().equals("a")) {
            for (Text t : values) {
                fileNameSet.add(t.toString());
            }
        } else {
            // now add every incoming value to a temp set
            HashSet<String> set = new HashSet<String>();
            for (Text t : values) {
                set.add(t.toString());
            }

            // perform checks
            if (set.size() == fileNameSet.size()) {
                // we know that this must be an intersection of all files
                context.getCounter(Counter.LINES_IN_COMMON).increment(1);
            } else {
                // do whatever you want with the difference
            }
        }
    }
}

As you can see, we are just using our "hack" to build a set of the files we had in our input. Then we check for every line whether it occurs in all files, and increment a counter if it does.
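
If you want to play with the idea without a cluster, here is a small in-memory sketch in plain Java. It is not the Hadoop job: the class SetIntersectionSketch and its method names are invented for this example, and because all file names are known up front it does not need the "a" hack. It uses the lines from the mapper output listing above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class SetIntersectionSketch {

  // "Map + shuffle": group the file names by line, like the reducer input.
  static Map<String, Set<String>> groupByLine(Map<String, List<String>> files) {
    Map<String, Set<String>> grouped = new TreeMap<>();
    for (Map.Entry<String, List<String>> file : files.entrySet()) {
      for (String line : file.getValue()) {
        grouped.computeIfAbsent(line, k -> new TreeSet<>()).add(file.getKey());
      }
    }
    return grouped;
  }

  // "Reduce": a line belongs to the intersection if every file contains it.
  static Set<String> intersection(Map<String, List<String>> files) {
    Set<String> common = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : groupByLine(files).entrySet()) {
      if (entry.getValue().size() == files.size()) {
        common.add(entry.getKey());
      }
    }
    return common;
  }

  public static void main(String[] args) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("File1", Arrays.asList("b", "c", "d"));
    files.put("File2", Arrays.asList("d", "e"));
    // prints {b=[File1], c=[File1], d=[File1, File2], e=[File2]}
    System.out.println(groupByLine(files));
    // prints [d]
    System.out.println(intersection(files));
  }
}
```

Every line whose file set is smaller than the number of files is part of the set difference.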

What you do with the set difference is left up to you; maybe you want to ignore it and do something with the intersecting lines instead.
Have a try with some files from Project Gutenberg! I would be pretty interested in how many lines are equal in different books.

Here in Germany there were some discussions about plagiarism; maybe this can be helpful for finding the intersection of many books / papers very fast.

I just wanted to point out a possible solution for dealing with such a problem in MapReduce.

Thanks for your attention and support :)

May 25, 2011

k-Means Clustering with MapReduce

Hi all,

I just finished the MapReduce implementation of k-means clustering. Note that this is a series consisting of this post and a follow-up one which implements the same algorithm using BSP and Apache Hama.

Note that this is just an example to explain k-means clustering to you and to show how easily it can be solved and implemented with MapReduce.
If you want to use a more generic version of k-means, you should head over to Apache Mahout. Mahout provides k-means clustering and other fancy things on top of Hadoop MapReduce. This code is also not meant for production usage; you can cluster quite small datasets from 300m to 10g very well with it, for larger sets please take the Mahout implementation.

The clustering itself

We need some vectors (the dimension doesn't matter, as long as they all have the same dimension). These vectors represent our data. Then we need k centers. These centers are vectors too; sometimes they are just a subset of the input vectors, but sometimes they are random points or points of interest around which we are going to cluster the data.

Since this is a MapReduce version, I'll tell you what keys and values we are using. This is really simple, because we are just using vectors, and a vector can be a cluster center as well. So we always treat our cluster-center vectors as keys, and the input vectors are simple values.

The clustering itself then works like this:
  • In the map step
    • Read the cluster centers into memory from a SequenceFile.
    • Iterate over each cluster center for each input key/value pair.
    • Measure the distances and keep the nearest center, i.e. the one with the lowest distance to the vector.
    • Write the cluster center with its vector to the filesystem.
  • In the reduce step (we get the associated vectors for each center)
    • Iterate over each value vector and calculate the average vector (sum up the vectors and divide each component by the number of vectors we received).
    • This is the new center; save it into a SequenceFile.
    • Check the convergence between the cluster center that is stored in the key object and the new center.
      • If they are not equal, increment an update counter.
  • Run this whole thing until nothing is updated anymore.
Pretty easy, isn't it? :D


Model

Let's have a look at the involved models:

Vector class:

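import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Vector implements WritableComparable<Vector> {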

 private double[] vector;

 public Vector() {
  super();
 }

 public Vector(Vector v) {
  super();
  int l = v.vector.length;
  this.vector = new double[l];
  System.arraycopy(v.vector, 0, this.vector, 0, l);
 }

 public Vector(double x, double y) {
  super();
  this.vector = new double[] { x, y };
 }

 @Override
 public void write(DataOutput out) throws IOException {
  out.writeInt(vector.length);
  for (int i = 0; i < vector.length; i++)
   out.writeDouble(vector[i]);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  int size = in.readInt();
  vector = new double[size];
  for (int i = 0; i < size; i++)
   vector[i] = in.readDouble();
 }

 @Override
 public int compareTo(Vector o) {
  // only checks equality: 0 if all components match, 1 otherwise
  for (int i = 0; i < vector.length; i++) {
   if (vector[i] != o.vector[i]) {
    return 1;
   }
  }
  return 0;
 }

 // get and set omitted

}

You see everything is pretty standard. The compareTo method just checks equality, because we don't need an inner ordering, but we want equal keys to end up in the same chunk. Be aware that we are returning 1 if they are not equal. Hadoop's quicksort only swaps an element if it is greater than the other one. <- This is a great tip ;)

If you are not aware of this hack, please reimplement this correctly.
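
For reference, a "correct" reimplementation could look roughly like the following sketch. It is plain Java without the Hadoop Writable parts, and the class name OrderedVector is made up for this example. It compares component by component with Double.compare, so the result is negative, zero or positive as the Comparable contract expects.

```java
class OrderedVector implements Comparable<OrderedVector> {

  private final double[] values;

  OrderedVector(double... values) {
    this.values = values.clone();
  }

  @Override
  public int compareTo(OrderedVector other) {
    // lexicographic order: the first differing component decides
    int shared = Math.min(values.length, other.values.length);
    for (int i = 0; i < shared; i++) {
      int c = Double.compare(values[i], other.values[i]);
      if (c != 0) {
        return c;
      }
    }
    // all shared components are equal: the shorter vector sorts first
    return Integer.compare(values.length, other.values.length);
  }

  public static void main(String[] args) {
    OrderedVector a = new OrderedVector(1.0, 2.0);
    OrderedVector b = new OrderedVector(1.0, 3.0);
    System.out.println(a.compareTo(b) < 0);  // prints true
    System.out.println(b.compareTo(a) > 0);  // prints true
    System.out.println(a.compareTo(new OrderedVector(1.0, 2.0)));  // prints 0
  }
}
```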

The cluster center is basically just a "has-a-vector" class that delegates the read/write/compareTo methods to the vector. It is only separated so we can clearly distinguish between a center and a vector, although they are the same.

The distance measurement

In the algorithm description I talked about measuring a distance, but I left open how. Let's define it:

We need a measure of the distance between two vectors, especially between a center and a vector.
I went with the Manhattan distance because it doesn't require as much computation as the Euclidean distance (no square root) and it is not too complex.
Let's have a look:

public static final double measureDistance(ClusterCenter center, Vector v) {
  double sum = 0;
  int length = v.getVector().length;
  for (int i = 0; i < length; i++) {
   sum += Math.abs(center.getCenter().getVector()[i]
     - v.getVector()[i]);
  }

  return sum;
 }

As you can see, it is just the sum of the absolute differences of each component of the two vectors. So easy!!! Let's head to the map implementation...
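
To make the difference between the two measures concrete, here is a tiny standalone sketch on plain double arrays. The class DistanceSketch is only for illustration and is not part of the clustering code.

```java
class DistanceSketch {

  // Manhattan distance: sum of the absolute component differences.
  static double manhattan(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++) {
      sum += Math.abs(a[i] - b[i]);
    }
    return sum;
  }

  // Euclidean distance: squared differences plus one square root at the end.
  static double euclidean(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++) {
      sum += (a[i] - b[i]) * (a[i] - b[i]);
    }
    return Math.sqrt(sum);
  }

  public static void main(String[] args) {
    double[] center = {1, 1};
    double[] vector = {16, 3};
    System.out.println(manhattan(center, vector)); // prints 17.0
    System.out.println(euclidean(new double[] {0, 0}, new double[] {3, 4})); // prints 5.0
  }
}
```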

The Mapper

Let's assume that there is a list or a list-like SequenceFile-iterating interface called centers. It contains ClusterCenter objects that represent the current centers. The DistanceMeasurer class contains the static method we defined in the last part.

// setup and cleanup stuffz omitted
@Override
 protected void map(ClusterCenter key, Vector value, Context context)
   throws IOException, InterruptedException {

  ClusterCenter nearest = null;
  double nearestDistance = Double.MAX_VALUE;
  for (ClusterCenter c : centers) {
   double dist = DistanceMeasurer.measureDistance(c, value);
   if (nearest == null) {
    nearest = c;
    nearestDistance = dist;
   } else {
    if (nearestDistance > dist) {
     nearest = c;
     nearestDistance = dist;
    }
   }
  }
  context.write(nearest, value);
 }

As described in the introduction, it is just looping and measuring, always keeping a reference to the nearest center. Afterwards we write it out.

The Reducer

Once again, let's assume a list or a list-like SequenceFile-iterating interface called centers. Here we need it for storage.

// setup and cleanup stuffz omitted once again
@Override
 protected void reduce(ClusterCenter key, Iterable<Vector> values,
   Context context) throws IOException, InterruptedException {

  Vector newCenter = new Vector();
  List<Vector> vectorList = new LinkedList<Vector>();
  int vectorSize = key.getCenter().getVector().length;
  newCenter.setVector(new double[vectorSize]);
  for (Vector value : values) {
   vectorList.add(new Vector(value));
   for (int i = 0; i < value.getVector().length; i++) {
    newCenter.getVector()[i] += value.getVector()[i];
   }
  }

  for (int i = 0; i < newCenter.getVector().length; i++) {
   newCenter.getVector()[i] = newCenter.getVector()[i]
     / vectorList.size();
  }

  ClusterCenter center = new ClusterCenter(newCenter);
  centers.add(center);
  for (Vector vector : vectorList) {
   context.write(center, vector);
  }

  if (center.converged(key))
   context.getCounter(Counter.CONVERGED).increment(1);

 }

So sorry, but this got a bit more bulky than I initially thought it would be. Let me explain: The first loop only dumps the values of the iterable into a list and sums up each component of the vectors in a newly created center. Then we average it in another loop and write the new center along with each vector we held in memory the whole time.

Afterwards we just check whether the center has changed; this method just delegates to the underlying vector's compareTo. If the centers are not equal it returns true, and therefore it increments a counter. Actually the name of the counter is misleading, it should be named "updated". If you are now asking how we control the recursion part, head over here and look at how it should work: Controlling Hadoop MapReduce recursion.


Example

I don't want anyone to leave without a working example ;) So here is our 2-dimensional input:
k-Centers:
(1,1);(5,5)
Input vectors:
Vector [vector=[16.0, 3.0]]
Vector [vector=[7.0, 6.0]]
Vector [vector=[6.0, 5.0]]
Vector [vector=[25.0, 1.0]]
Vector [vector=[1.0, 2.0]]
Vector [vector=[3.0, 3.0]]
Vector [vector=[2.0, 2.0]]
Vector [vector=[2.0, 3.0]]
Vector [vector=[-1.0, -23.0]]
Now the jobs get scheduled over and over again, and the output looks like this:
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[16.0, 3.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[7.0, 6.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[6.0, 5.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[25.0, 1.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[1.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[3.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[-1.0, -23.0]]


So we see that the two initial centers were moved to (1.4,-2.6) and to (13.5,3.75). Cool thing :D
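
If you want to check these numbers without Hadoop, the following in-memory sketch repeats the assign/average loop on the example input. It is not the MapReduce job from the repository: the class KMeansSketch is invented for this example, ties go to the first center, and a center without any vectors simply stays where it is (both are assumptions of mine).

```java
import java.util.Arrays;

class KMeansSketch {

  static double manhattan(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++) {
      sum += Math.abs(a[i] - b[i]);
    }
    return sum;
  }

  static double[][] cluster(double[][] initialCenters, double[][] vectors, int maxIterations) {
    int k = initialCenters.length;
    int dim = initialCenters[0].length;
    double[][] centers = new double[k][];
    for (int c = 0; c < k; c++) {
      centers[c] = initialCenters[c].clone();
    }
    for (int iteration = 0; iteration < maxIterations; iteration++) {
      double[][] sums = new double[k][dim];
      int[] counts = new int[k];
      // "map": assign every vector to its nearest center
      for (double[] v : vectors) {
        int nearest = 0;
        for (int c = 1; c < k; c++) {
          if (manhattan(centers[c], v) < manhattan(centers[nearest], v)) {
            nearest = c;
          }
        }
        counts[nearest]++;
        for (int d = 0; d < dim; d++) {
          sums[nearest][d] += v[d];
        }
      }
      // "reduce": the average of the assigned vectors is the new center
      boolean updated = false;
      for (int c = 0; c < k; c++) {
        if (counts[c] == 0) {
          continue; // assumption: an empty center is left untouched
        }
        double[] mean = new double[dim];
        for (int d = 0; d < dim; d++) {
          mean[d] = sums[c][d] / counts[c];
        }
        if (!Arrays.equals(mean, centers[c])) {
          updated = true;
        }
        centers[c] = mean;
      }
      if (!updated) {
        break; // nothing changed, we are done
      }
    }
    return centers;
  }

  public static void main(String[] args) {
    double[][] centers = {{1, 1}, {5, 5}};
    double[][] vectors = {
      {16, 3}, {7, 6}, {6, 5}, {25, 1}, {1, 2}, {3, 3}, {2, 2}, {2, 3}, {-1, -23}
    };
    // prints [[1.4, -2.6], [13.5, 3.75]]
    System.out.println(Arrays.deepToString(cluster(centers, vectors, 100)));
  }
}
```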


Here is the code:
https://github.com/thomasjungblut/mapreduce-kmeans

The code is located in the de.jungblut.clustering.mapreduce package. If you click run on the KMeansClusteringJob, the example data gets loaded and you can step through the code if you are interested. If you want to run it on your cluster, I assume that you're using version 2.2; if not, you have to take care of the up/downgrade yourself.

Note that if you submit this to a real cluster, files like _logs or _SUCCESS may be in the directory of your job. This will break the outputter at the end of the job.
Either remove the files or modify the method.
Also note that if you run this with a large file, the number of reducers should be set to 1, otherwise there will be file collisions (see the reducer's cleanup method). This can be done better, but I'll leave this to you ;)

Thank you very much.

May 21, 2011

Series: K-Means Clustering (MapReduce | BSP)

Hi all,

I was a bit busy lately, so I didn't have that much time to blog.
Several days ago, after PageRank, I had the idea to implement k-means clustering with Apache Hama and BSP.
Now I've decided to first write a MapReduce implementation of it, since this is very simple: read the centers in the setup method and calculate the distance from each vector to the centers in the map phase. In the reduce phase we calculate the new cluster centers.

This is very, very straightforward. So this will be a series about a MapReduce implementation and a better one with BSP.

'till then!

Greetzz

Apr 16, 2011

PageRank with Apache Hama

Hey,

some days ago I read on the Hama-dev mailing list that the Nutch project wants a PageRank implementation:

Hi all,
Anyone interested in PageRank and collaborating w/ Nutch project? :-)
Source

So I thought that I could do this. I have already implemented PageRank with MapReduce. Why don't we go for a BSP version? :D
This is basically what this blog post is about.

Let's make a few assumptions:
  • We have an adjacency list (web-graphs are sparse) of webpages and their unweighted edges
  • A working partitioning like here. (You don't have to implement it, but you should know how it works)
  • We have read the Pregel Paper (or at least the summary)
  • Familiarity with PageRank

Web Graph Layout

This is the adjacency list. On the leftmost side is the vertex ID of the webpage, followed by its outlinks, separated by tabs.

1 2 3
2
3 1 2 5
4 5 6
5 4 6
6 4
7 2 4

Pretty-printed, this results in a graph like this:

I have colored the vertices by their incoming links: the vertex with the most in-links is the brightest, and vertices with few or no in-links are darker.
We will see that vertex 2 should get a high PageRank, 4, 5 and 6 should get a more or less equal rank, and so on.

Short summary of the algorithm

I am now referring to the Google Pregel paper. At first we need a model class that represents a vertex and holds its own tentative PageRank. In my case we store the tentative PageRank along with the id of a vertex in a HashMap.
In the first superstep we set the tentative PageRank to 1 / n, where n is the number of vertices in the whole graph.
In each of the steps we send, for every vertex, its PageRank divided by the number of its outgoing edges to all adjacent vertices in the graph.
So from the second step on we receive messages with the tentative PageRank of an adjacent vertex. Now we sum up these messages for each vertex "i" and use this formula:
P(i) = 0.15/NumVertices() + 0.85 * sum
This is the new tentative PageRank for a vertex "i".
I'm not sure whether NumVertices() returns the number of all vertices or just the number of adjacent vertices. I'll assume that it is the count of all vertices in the graph, so the first term is a constant. To this constant we add the damping factor multiplied by the sum of the received tentative PageRanks of the adjacent vertices.

We loop over these steps until the error has dropped below a given threshold. This error is just the sum of the absolute differences between the old tentative PageRank and the new one of each vertex.
Or we can break once we reach an iteration count that is high enough.

We store the old PageRank as a copy of the current PageRank (a simple HashMap).
The error is thus a local variable that we sync with a master task; it averages the errors and broadcasts the result back to all the slaves.
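
Before we look at the BSP code, here is how the summary reads as a single-process loop in plain Java. Treat it as a rough sketch under assumptions of my own and not as the Hama job: the class PageRankSketch is invented for this example, vertices that receive no message are set to the constant term, and pages without outlinks (vertex 2 in the example graph) simply send nothing. Because of that last point rank mass is lost, so the numbers are not meant to match the results table further down.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PageRankSketch {

  static final double DAMPING_FACTOR = 0.85;

  // the adjacency list from the "Web Graph Layout" section
  static Map<Integer, List<Integer>> exampleGraph() {
    Map<Integer, List<Integer>> graph = new TreeMap<>();
    graph.put(1, Arrays.asList(2, 3));
    graph.put(2, Collections.<Integer>emptyList());
    graph.put(3, Arrays.asList(1, 2, 5));
    graph.put(4, Arrays.asList(5, 6));
    graph.put(5, Arrays.asList(4, 6));
    graph.put(6, Arrays.asList(4));
    graph.put(7, Arrays.asList(2, 4));
    return graph;
  }

  static Map<Integer, Double> pageRank(Map<Integer, List<Integer>> adjacency,
      double epsilon, int maxIterations) {
    int n = adjacency.size();
    double alpha = 0.15 / n;
    Map<Integer, Double> rank = new TreeMap<>();
    for (int vertex : adjacency.keySet()) {
      rank.put(vertex, 1.0 / n); // first superstep: 1 / n
    }
    double error = Double.MAX_VALUE;
    for (int i = 0; i < maxIterations && error >= epsilon; i++) {
      // "send": every vertex passes rank / outdegree to its neighbours
      Map<Integer, Double> sums = new HashMap<>();
      for (Map.Entry<Integer, List<Integer>> entry : adjacency.entrySet()) {
        List<Integer> outlinks = entry.getValue();
        if (outlinks.isEmpty()) {
          continue; // assumption: dangling vertices send nothing
        }
        double share = rank.get(entry.getKey()) / outlinks.size();
        for (int target : outlinks) {
          sums.merge(target, share, Double::sum);
        }
      }
      // "receive": P(i) = 0.15 / n + 0.85 * sum, and track the error
      Map<Integer, Double> next = new TreeMap<>();
      error = 0.0;
      for (int vertex : adjacency.keySet()) {
        double updated = alpha + DAMPING_FACTOR * sums.getOrDefault(vertex, 0.0);
        error += Math.abs(updated - rank.get(vertex));
        next.put(vertex, updated);
      }
      rank = next;
    }
    return rank;
  }

  public static void main(String[] args) {
    System.out.println(pageRank(exampleGraph(), 0.000001, 100));
  }
}
```

A vertex without in-links, like vertex 7, ends up with exactly the constant term 0.15 / n.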

Code

Let's look at the fields we need:

 private static int MAX_ITERATIONS = 30;
 // alpha is 0.15/NumVertices()
 private static double ALPHA;
 private static int numOfVertices;
 private static double DAMPING_FACTOR = 0.85;
 // this is the error we want to achieve
 private static double EPSILON = 0.001;

 HashMap<Integer, List<Integer>> adjacencyList = new HashMap<Integer, List<Integer>>();
 // normally this is stored by a vertex, but I don't want to create a new
 // model for it
 HashMap<Integer, Double> tentativePagerank = new HashMap<Integer, Double>();
 // backup of the last pagerank to determine the error
 HashMap<Integer, Double> lastTentativePagerank = new HashMap<Integer, Double>();

Keep in mind that every task just has a subgraph of the graph. So these structures will hold just a chunk of PageRank.

Let's get into the init phase of the BSP:

@Override
 public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
   InterruptedException {
  fs = FileSystem.get(getConf());
  String master = conf.get(MASTER_TASK);
  // setup the datasets
  adjacencyList = mapAdjacencyList(getConf(), peer);
  // init the pageranks to 1/n where n is the number of all vertices
  for (int vertexId : adjacencyList.keySet())
   tentativePagerank
     .put(vertexId, Double.valueOf(1.0 / numOfVertices));

...

As we said, we read our chunk of the data from HDFS and set the tentative PageRank to 1/n.

Main Loop

// repeat the pagerank stuff until the error drops below epsilon
  double error = 1.0;
  int iteration = 0;
  // if MAX_ITERATIONS is set to 0, ignore the iterations and just go
  // with the error
  while ((MAX_ITERATIONS == 0 || iteration < MAX_ITERATIONS)
    && error >= EPSILON) {

   peer.sync();

   if (iteration >= 1) {
    // copy the old pagerank to the backup
    copyTentativePageRankToBackup();
    // sum up all incoming messages for a vertex
    HashMap<Integer, Double> sumMap = new HashMap<Integer, Double>();
    IntegerDoubleMessage msg = null;
    while ((msg = (IntegerDoubleMessage) peer.getCurrentMessage()) != null) {
     if (!sumMap.containsKey(msg.getTag())) {
      sumMap.put(msg.getTag(), msg.getData());
     } else {
      sumMap.put(msg.getTag(),
        msg.getData() + sumMap.get(msg.getTag()));
     }
    }
    // pregel formula:
    // ALPHA = 0.15 / NumVertices()
    // P(i) = ALPHA + 0.85 * sum
    for (Entry<Integer, Double> entry : sumMap.entrySet()) {
     tentativePagerank.put(entry.getKey(),
       ALPHA + (entry.getValue() * DAMPING_FACTOR));
    }

    // determine the error and send this to the master
    double err = determineError();
    error = broadcastError(peer, master, err);
   }
   // in every step send the tentative pagerank of a vertex to its
   // adjacent vertices
   for (int vertexId : adjacencyList.keySet())
    sendMessageToNeighbors(peer, vertexId);

   iteration++;
  }

I guess this is self-explanatory. The function broadcastError() sends the determined error to a master task, which averages all incoming errors and broadcasts the result back to the slaves (similar to aggregators in the Pregel paper).
Let's take a quick look at the determineError() function:

private double determineError() {
  double error = 0.0;
  for (Entry<Integer, Double> entry : tentativePagerank.entrySet()) {
   error += Math.abs(lastTentativePagerank.get(entry.getKey())
     - entry.getValue());
  }
  return error;
 }

As I described in the summary, we are just summing up the errors, i.e. the differences between the old and the new rank of each vertex.

Output

Finally we are able to run this and receive a fraction between 0 and 1 that will represent the PageRank of each site.
I was running this with a convergence error of 0.000001 and a damping factor of 0.85. This took about 17 iterations.

------------------- RESULTS --------------------
2 | 0.33983048615390526
4 | 0.21342628110369394
6 | 0.20495452025114747
5 | 0.1268811487940641
3 | 0.0425036157080356
1 | 0.0425036157080356
7 | 0.02990033228111791

The ranks sum up to about 1.0, which is correct.
Note that the output of this job is not guaranteed to be sorted; I sorted it to give you a better view.

We can see that our guess about the PageRank at the beginning was quite good.

I think this is all. If you are interested in testing / running this, feel free to do so.
This class and the test data are located in my Summer of Code repository under: http://code.google.com/p/hama-shortest-paths/
The class name is de.jungblut.hama.bsp.PageRank.
Just execute the main method; it will run a local multithreaded BSP on your machine.

Star this project and vote for my GSoC task. :)

Thank you.

Apr 8, 2011

Graph Exploration with Apache Hadoop and MapReduce

Hi all,
sometimes you will have data where you don't know how its elements are connected. This is a common use case for graphs, because they are really abstract.
So if you don't know what your data looks like, or if you know what it looks like and just want to determine its various graph components, this post is a good chance for you to get the "MapReduce way" of graph exploration. In my previous post I ranted about message passing through DFS and how much overhead it has in comparison to BSP.
Just let this be a competition between Apache Hama BSP and Apache Hadoop MapReduce, both sharing HDFS as a distributed file system.
Looking at the title you know that this post is about the MapReduce implementation; I will write a BSP implementation later and compare it with this MapReduce implementation.
Let's introduce the prerequisites now.

Prerequisites
We have a graph in a format of an adjacency list looking like this:
0
1    4    7
2    3    8
3    5
4    1
5    6
6
7    
8    3
9    0

So the first entry on the left side is always the vertex, therefore all vertices are listed on the leftmost side. Each vertex is described by a number: its id.
Separated by tabs are the vertex ids that are adjacent to the vertex on the leftmost side.

This is quite abstract so let's take a look at this pretty picture:
graph with multiple components
This is what the graph looks like. As you can see there are three components: [1,4,7];[2,3,5,6,8];[0,9].
In some datasets you want to classify each component by a common key that is unique. In this case the most common solution is to classify a component by its lowest id. E.g. the component [1,4,7] has the lowest id 1, which is the classifier for this component.

How do we deal with this in MapReduce?
First of all I recommend that you read this paper. It describes a technique named "message passing".
Simply put: the idea is that you let the vertices pass messages whenever a new local minimum has been found. Afterwards you merge the messages with the real vertices and apply updates to the vertices that had a higher minimum.

So our first task is to write the value class that is representing a vertex AND a message at the same time.
public class VertexWritable implements Writable, Cloneable {

 LongWritable minimalVertexId;
 TreeSet<LongWritable> pointsTo;
 boolean activated;

 // a message carries no adjacency information
 public boolean isMessage() {
  return pointsTo == null;
 }

}

Plus the typical read/write stuff that comes with Writable.
So let me explain: this class represents the vertex. It has a pointsTo tree that maintains the adjacent vertex ids, and the current minimalVertexId. There is also a boolean field called "activated".
There is also a method that determines whether the object represents a message or a vertex.

The whole thing is just working like this:
  1. Import the vertices from the adjacency list to the ID mapped to Vertex form.
  2. In the first iteration flag every vertex as activated and write it again.
  3. If a vertex is activated, loop through the pointsTo tree and write a message with the (for this vertex) minimal vertex to every element of the tree.
  4. Merge messages with the related vertex and if we found a new minimum activate the vertex. If nothing was updated then deactivate it.
Then repeat from point 3 until no vertex can be updated anymore.
Parts 1 and 3 are inside the map task, parts 2 and 4 are reduce tasks.
Look here to see how you can implement a job recursion using Apache Hadoop.
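
To get a feeling for the four steps, here is an in-memory sketch in plain Java that plays the same game on the example graph. It is not the MapReduce job: the class MinLabelSketch and its methods are invented for this example, and a simple while loop stands in for the job recursion.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class MinLabelSketch {

  // the adjacency list from the prerequisites
  static Map<Integer, List<Integer>> exampleGraph() {
    Map<Integer, List<Integer>> graph = new TreeMap<>();
    graph.put(0, Collections.<Integer>emptyList());
    graph.put(1, Arrays.asList(4, 7));
    graph.put(2, Arrays.asList(3, 8));
    graph.put(3, Arrays.asList(5));
    graph.put(4, Arrays.asList(1));
    graph.put(5, Arrays.asList(6));
    graph.put(6, Collections.<Integer>emptyList());
    graph.put(7, Collections.<Integer>emptyList());
    graph.put(8, Arrays.asList(3));
    graph.put(9, Arrays.asList(0));
    return graph;
  }

  static Map<Integer, Integer> minimalIds(Map<Integer, List<Integer>> adjacency) {
    Map<Integer, Integer> minimal = new TreeMap<>();
    Set<Integer> activated = new TreeSet<>();
    // steps 1 and 2: start with the lowest id among the vertex and its
    // adjacent vertices, and flag every vertex as activated
    for (Map.Entry<Integer, List<Integer>> entry : adjacency.entrySet()) {
      int min = entry.getKey();
      for (int target : entry.getValue()) {
        min = Math.min(min, target);
      }
      minimal.put(entry.getKey(), min);
      activated.add(entry.getKey());
    }
    while (!activated.isEmpty()) {
      // step 3: activated vertices send their minimum to all adjacent vertices
      Map<Integer, Integer> messages = new HashMap<>();
      for (int vertex : activated) {
        for (int target : adjacency.get(vertex)) {
          messages.merge(target, minimal.get(vertex), Integer::min);
        }
      }
      // step 4: merge the messages, only improved vertices stay activated
      Set<Integer> next = new TreeSet<>();
      for (Map.Entry<Integer, Integer> message : messages.entrySet()) {
        if (message.getValue() < minimal.get(message.getKey())) {
          minimal.put(message.getKey(), message.getValue());
          next.add(message.getKey());
        }
      }
      activated = next;
    }
    return minimal;
  }

  public static void main(String[] args) {
    // prints {0=0, 1=1, 2=2, 3=2, 4=1, 5=2, 6=2, 7=1, 8=2, 9=0}
    System.out.println(minimalIds(exampleGraph()));
  }
}
```

Like the job itself, the sketch only passes messages along the direction of the edges.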

After all iterations are done you'll have the following output:
0 | VertexWritable [minimalVertexId=0, pointsTo=[0]]
1 | VertexWritable [minimalVertexId=1, pointsTo=[1, 4, 7]]
2 | VertexWritable [minimalVertexId=2, pointsTo=[2, 3, 8]]
3 | VertexWritable [minimalVertexId=2, pointsTo=[3, 5]]
4 | VertexWritable [minimalVertexId=1, pointsTo=[1, 4]]
5 | VertexWritable [minimalVertexId=2, pointsTo=[5, 6]]
6 | VertexWritable [minimalVertexId=2, pointsTo=[6]]
7 | VertexWritable [minimalVertexId=1, pointsTo=[7]]
8 | VertexWritable [minimalVertexId=2, pointsTo=[3, 8]]
9 | VertexWritable [minimalVertexId=0, pointsTo=[0, 9]]

So you see that we still have every vertex on the left side, but now the minimalVertexId is the classifier for the component. And we have found the three lowest component identifiers: 0, 1 and 2!

So the graph now looks like this:
classified graph with multiple components


If you now want to collect all vertices per component identifier, you can write a new mapper that emits the minimalVertexId as the key and the pointsTo elements as the value. In the reduce step they will be merged together, and you can persist the result.
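
As a rough illustration of that follow-up step, here is a plain Java sketch that does the grouping in memory instead of in a real mapper and reducer. The names ComponentGrouping, Vertex and group are hypothetical, and the Vertex class is just a stand-in that carries the two fields needed here.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// In-memory sketch only: these names are made up and are not part of the project.
class ComponentGrouping {

  // Hypothetical stand-in for the vertex value, reduced to the two fields used here.
  static final class Vertex {
    final int minimalVertexId;
    final List<Integer> pointsTo;

    Vertex(int minimalVertexId, Integer... pointsTo) {
      this.minimalVertexId = minimalVertexId;
      this.pointsTo = List.of(pointsTo);
    }
  }

  // "Map": use minimalVertexId as the key and the pointsTo elements as the value.
  // "Reduce": merge all values that share a key into one sorted set.
  static Map<Integer, TreeSet<Integer>> group(List<Vertex> vertices) {
    Map<Integer, TreeSet<Integer>> components = new TreeMap<>();
    for (Vertex v : vertices) {
      components.computeIfAbsent(v.minimalVertexId, k -> new TreeSet<>()).addAll(v.pointsTo);
    }
    return components;
  }

  // The ten vertices from the output shown in this post.
  static List<Vertex> exampleOutput() {
    return List.of(
        new Vertex(0, 0),
        new Vertex(1, 1, 4, 7),
        new Vertex(2, 2, 3, 8),
        new Vertex(2, 3, 5),
        new Vertex(1, 1, 4),
        new Vertex(2, 5, 6),
        new Vertex(2, 6),
        new Vertex(1, 7),
        new Vertex(2, 3, 8),
        new Vertex(0, 0, 9));
  }

  public static void main(String[] args) {
    System.out.println(group(exampleOutput()));
  }
}
```

In a real job the TreeMap would be replaced by the shuffle between mapper and reducer, but the key/value choice stays the same.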

And if you are interested in more source code you are free to look into my Summer of Code project under: http://code.google.com/p/hama-shortest-paths/
You'll find a working algorithm inside of the package called "de.jungblut.mapreduce.graph". The main function to call is inside the class "DatasetImporter.java".
The example input used in this post is also in the trunk. So check this out and you are welcome to use it for your own problems ;)

So next time I will write a BSP that does the same.

Back to Blogsphere and how BSP works

Hey guys,

I'm back! For those of you that do not remember me: I did some cool Robots for Google Wave.
Maybe you remember the times when Google Wave had no export function; I tackled that with my bot called "PorteXy". It was a Java port of the existing Google example called "Exporty". From the count in the AppEngine database table I know a lot of you used it, and I want to thank you all for your support!
Now that we all know Google Wave has come to an end and is becoming an Apache Incubator project, I have to focus on other topics that could be interesting for you. A lot has happened since last time.
I am no longer a pupil: I started working at testberichte.de, recently got my OCP in Java, and in the meantime I am studying Computer Science at HWR Berlin.
You must be thinking: "Oh he must be really busy" - in fact I am not. School kinda sucks, because it is too boring and I have to do something useful. So you've probably heard that Google is offering a new Summer of Code this year: GSoC2011. I decided to apply, and my first thought was to apply to the Apache Software Foundation.
Mainly because I have worked a lot with Hadoop and distributed systems in general, and loved using the Apache2 server to run my PHP applications or Tomcat to run my Java applications. But Hadoop wasn't a project eligible for GSoC (Why? I know it has a really large codebase, but there are so many cool tasks that could be done by students too!), so I had to look for another project. Then I saw Mahout. I had used Mahout for k-means clustering recently and looked at the task on offer: Support more data import mechanisms. Are they serious? Just importing raw data into SequenceFiles? Seriously, you can't spend 12(!) weeks on that task coding 40h/week. This would be a task for a weekend, if at all. So I was looking for something cooler and more complex, but it should be in Java and somehow related to distributed systems.

So I came across Apache Hama and this task smiled at me: Development of Shortest Path Finding Algorithm. BÄM! This was the task I had been searching for: I love distributed computing and I love graphs.
You probably know that Hama uses BSP (bulk synchronous parallel) for its computing.
BSP is actually a more general model than MapReduce. Have a look at this picture on wikipedia:

BSP


If you translate MapReduce into BSP, your map phase is the local computation phase. After that the map output is merged and sorted; that is the communication phase. Then comes the barrier synchronisation: you know that no reducer can run before all map tasks have completed. So this step is somewhat merged with the communication phase, but after it a new local computation phase begins: the reduce phase.
So you see, BSP is not the same as MapReduce, but you can actually describe MapReduce with BSP.
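
Here is a toy sketch of that translation in plain Java, using a word count as the MapReduce job. It does not use the Hama API: plain threads stand in for the peers, a CyclicBarrier stands in for the barrier synchronisation, and the names MapReduceAsBsp and wordCount are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;

// Toy sketch only: threads play the role of peers, this is not Hama code.
class MapReduceAsBsp {

  // Counts words; every input split is handled by one peer.
  static Map<String, Integer> wordCount(List<String> splits) {
    Map<String, Integer> counts = new ConcurrentSkipListMap<>();
    int peers = splits.size();
    if (peers == 0) {
      return counts;
    }
    List<Queue<String>> inboxes = new ArrayList<>();
    for (int i = 0; i < peers; i++) {
      inboxes.add(new ConcurrentLinkedQueue<>());
    }
    CyclicBarrier barrier = new CyclicBarrier(peers);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < peers; i++) {
      final int peer = i;
      Thread thread = new Thread(() -> {
        // Local computation ("map"): split the input into words.
        for (String word : splits.get(peer).split("\\s+")) {
          if (word.isEmpty()) {
            continue;
          }
          // Communication: send each word to the peer that owns its key.
          inboxes.get(Math.floorMod(word.hashCode(), peers)).add(word);
        }
        // Barrier synchronisation: wait until every peer has sent its messages.
        try {
          barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new IllegalStateException(e);
        }
        // Local computation ("reduce"): sum up the words this peer received.
        for (String word : inboxes.get(peer)) {
          counts.merge(word, 1, Integer::sum);
        }
      });
      threads.add(thread);
      thread.start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(wordCount(List.of("a b a", "b c")));
  }
}
```

The important part is the order inside each thread: compute, send, wait at the barrier, compute again. No peer starts its "reduce" before all peers have finished their "map".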

What is the advantage of using BSP instead of MapReduce in graph processing?
Those of you who have implemented graph algorithms with MapReduce probably know what a pain it is to pass messages through HDFS and process them in several chained jobs. I believe it was this paper that described very well how this works: http://www.umiacs.umd.edu/~jimmylin/publications/Lin_Schatz_MLG2010.pdf

Even if you don't know how to implement them (maybe I'll write a tutorial later on), believe me: this is not optimal!
Why should we pass messages through the DFS? Well, just communicate the messages directly!
And that is the real advantage of BSP: you have less synchronisation and more "real-time" messaging than in MapReduce. (In MapReduce you have 3 major sync steps in a job: the beginning, the reducer and the end. The end, because you can't run a follow-up job before the last one finishes, and therefore the beginning too.)

BSP is actually used at Google too: http://googleresearch.blogspot.com/2009/06/large-scale-graph-computing-at-google.html, you can google for the Pregel paper for further reading.

In my opinion Apache Hama has great prospects, because there is no other open source alternative for BSP processing, and the best thing of all: I'll be part of it.

Back to the shortest path finding. As you can see, I have already started to implement a basic Dijkstra and given some thought to how this is going to work within a large cluster, mainly in terms of scalability.
If you are interested in shortest paths in large graphs you should definitely bookmark this issue or at least vote it up.

The application deadline of GSoC is in approx. 10 hours and I hope ASF will pick me :-)

This was a really long first post, and I especially want to thank Edward J. Yoon. He is my mentor at Apache Hama and told me that I should blog again. This is part of his "young open source stars" support ;) Visit his blog or follow him on twitter, he is a real genius!

Obviously you can also follow me on twitter, look at the sidebar on the right side.

Greetings from Germany!