May 25, 2011

k-Means Clustering with MapReduce

Hi all,

just finished the MapReduce side implementation of k-Means clustering. Notice that this is a series that contains this post and a follow-up one which implements the same algorithm using BSP and Apache Hama.

Note that this is just an example to explain you k-means clustering and how it can be easily solved and implemented with MapReduce.
If you want to use a more generic version of k-means, you should head over to Apache Mahout. Mahout provides k-means clustering and other fancy things on top of Hadoop MapReduce. This code is also not thought for production usage, you can cluster quite small datasets from 300m to 10g very well with it, for lager sets please take the Mahout implementation.

The clustering itself

We need some vectors (which dimension doesn't matter, hopefully they have all the same dimension). These vectors representing our data, and then we need k-centers. These centers are vectors too, sometimes they are just a subset of the input vectors, but sometimes they are random points or points-of-interest to which we are going to cluster them.

Since this is a MapReduce version I tell you what keys and values we are using. This is really simple, because we are just using a vector, a vector can be a clustercenter as well. So we treat our clustercenter-vectors always like keys, and the input vectors are simple values.

The clustering itself works like this then:
  • In the map step
    • Read the cluster centers into memory from a sequencefile
    • Iterate over each cluster center for each input key/value pair. 
    • Measure the distances and save the nearest center which has the lowest distance to the vector
    • Write the clustercenter with its vector to the filesystem.
  • In the reduce step (we get associated vectors for each center)
    • Iterate over each value vector and calculate the average vector. (Sum each vector and devide each part by the number of vectors we received).
    • This is the new center, save it into a SequenceFile.
    • Check the convergence between the clustercenter that is stored in the key object and the new center.
      • If it they are not equal, increment an update counter
  • Run this whole thing until nothing was updated anymore.
Pretty easy isn't it?:D


Model

Let's have a look at the involved models:

Vector class:

public class Vector implements WritableComparable<Vector> {

 private double[] vector;

 public Vector() {
  super();
 }

 public Vector(Vector v) {
  super();
  int l = v.vector.length;
  this.vector = new double[l];
  System.arraycopy(v.vector, 0, this.vector, 0, l);
 }

 public Vector(double x, double y) {
  super();
  this.vector = new double[] { x, y };
 }

 @Override
 public void write(DataOutput out) throws IOException {
  out.writeInt(vector.length);
  for (int i = 0; i < vector.length; i++)
   out.writeDouble(vector[i]);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  int size = in.readInt();
  vector = new double[size];
  for (int i = 0; i < size; i++)
   vector[i] = in.readDouble();
 }

 @Override
 public int compareTo(Vector o) {

  boolean equals = true;
  for (int i = 0; i < vector.length; i++) {
     int c = vector[i] - o.vector[i];
     if (c != 0.0d) {
     return c;
  }
  return 0;
 }
   // get and set omitted

}

You see everything is pretty standard. The compareTo method is just checking equality, just because we don't need an inner ordering- but we want the same keys to get in the same chunk. Be aware that we are returning 1 if they are not equal. Hadoop's quicksort is only swapping the element if it is greater than the other one. <- This is a great tip ;)

If you are not sure aware about this hack, please reimplement this correctly.

The cluster center is basically just an "has-a-vector" class that just delegates the read/write/compareTo method to the vector. It is just devided so we can exactly differ between a center and a vector, altough it is the same.

The distance measurement

I've spoken in the algorithm-description about a distance measuring. But I left this open. Let's declare some things:

We need a measurement of a distance between two vectors, especially between a center and a vector.
I've came up with the manhattan distance because it doesn't require much computation overhead like square-rooting (Euclidian distance) and it is not too complex.
Let's have a look:

public static final double measureDistance(ClusterCenter center, Vector v) {
  double sum = 0;
  int length = v.getVector().length;
  for (int i = 0; i < length; i++) {
   sum += Math.abs(center.getCenter().getVector()[i]
     - v.getVector()[i]);
  }

  return sum;
 }

As you can see, just a sum of each part of the vectors difference. So easy!!! Let's head to the map implementation...

The Mapper

Let's assume that there is a list or a list-like sequencefile-iterating interface that is called centers. It contains ClusterCenter objects that represent the current centers. The DistanceMeasurer class contains the static method we defined in the last part.
// setup and cleanup stuffz omitted
@Override
 protected void map(ClusterCenter key, Vector value, Context context)
   throws IOException, InterruptedException {

  ClusterCenter nearest = null;
  double nearestDistance = Double.MAX_VALUE;
  for (ClusterCenter c : centers) {
   double dist = DistanceMeasurer.measureDistance(c, value);
   if (nearest == null) {
    nearest = c;
    nearestDistance = dist;
   } else {
    if (nearestDistance > dist) {
     nearest = c;
     nearestDistance = dist;
    }
   }
  }
  context.write(nearest, value);
 }

Like told in the introduction, it's just a looping and a measuring. Always keeping a reference to the nearest center. Afterwards we are writing it out.  

The Reducer

Once again let's have a list or a list-like sequencefile-iterating interface that is called centers. Here we need it for storage reasons.
// setup and cleanup stuffz omitted once again
@Override
 protected void reduce(ClusterCenter key, Iterable<Vector> values,
   Context context) throws IOException, InterruptedException {

  Vector newCenter = new Vector();
  List<Vector> vectorList = new LinkedList<Vector>();
  int vectorSize = key.getCenter().getVector().length;
  newCenter.setVector(new double[vectorSize]);
  for (Vector value : values) {
   vectorList.add(new Vector(value));
   for (int i = 0; i < value.getVector().length; i++) {
    newCenter.getVector()[i] += value.getVector()[i];
   }
  }

  for (int i = 0; i < newCenter.getVector().length; i++) {
   newCenter.getVector()[i] = newCenter.getVector()[i]
     / vectorList.size();
  }

  ClusterCenter center = new ClusterCenter(newCenter);
  centers.add(center);
  for (Vector vector : vectorList) {
   context.write(center, vector);
  }

  if (center.converged(key))
   context.getCounter(Counter.CONVERGED).increment(1);

 }

So sorry, but this got a bit more bulky than I initially thought it could be. Let me explain: The first loop only dumps the values in the iterable into a list and sums up each component of the vector in a newly created center. Then we are averaging it in another loop and we are writing the new center along with each vector we held in memory the whole time. Afterwards we are just checking if the vector has changed, this method is just a delegating to the underlying vectors compareTo. If the centers are not equal it returns true. And therefore it updates an counter. Actually the name of the counter is misleading, it should be named "updated". If you are now asking how we are controlling the recursion part, head over here and look how it should work: Controlling Hadoop MapReduce recursion.


Example

I don't want anyone to leave without a working example ;) SO here is our 2-dimensional input: k-Centers:
(1,1);(5,5)
Input vectors:
Vector [vector=[16.0, 3.0]]
Vector [vector=[7.0, 6.0]]
Vector [vector=[6.0, 5.0]]
Vector [vector=[25.0, 1.0]]
Vector [vector=[1.0, 2.0]]
Vector [vector=[3.0, 3.0]]
Vector [vector=[2.0, 2.0]]
Vector [vector=[2.0, 3.0]]
Vector [vector=[-1.0, -23.0]]
Now the jobs getting scheduled over and over again and the output looks like this:
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[16.0, 3.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[7.0, 6.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[6.0, 5.0]]
ClusterCenter [center=Vector [vector=[13.5, 3.75]]] / Vector [vector=[25.0, 1.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[1.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[3.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 2.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[2.0, 3.0]]
ClusterCenter [center=Vector [vector=[1.4, -2.6]]] / Vector [vector=[-1.0, -23.0]]


So we see that the two initial centers were moved to (1.4,-2.6) and to (13.5,3.75). Cool thing :D


Here is the code:
https://github.com/thomasjungblut/mapreduce-kmeans

The code is located in the de.jungblut.clustering.mapreduce package, if you click run on the KMeansClusteringJob the example data is getting loaded and you can step through the code if you are interested. If you want to run it on your cluster, I assume that you're using 2.2, if not, then you have to take care of the up/downgrade for yourself.

Note that if you are submitting this to a real cluster files like _logs or _SUCCESS may be in the directory of your job. This will break the outputter at the end of the Job.
Either remove the files or modify the method.
Also note that if you run this with a large file, the number of reducers should be set to 1, otherwise there will be file collisions (See the reducer's cleanup method). This can be done better, but I'll leave this to you ;)

Thank you very much.

May 21, 2011

Series: K-Means Clustering (MapReduce | BSP)

Hi all,

I was a bit busy last time so I hadn't that much time to blog.
Several days ago after PageRank I had an idea to implement k-means clustering with Apache Hama and BSP.
Now I've decided to first implement a MapReduce implementation of it, since this is very simple: Reading the centers in setup's method and calculate the distance from each vector to the centers in map phase. In the reduce phase we are going to calculate new cluster centers.

This is very very straightforward. So this will be a series about a MapReduce implementation and a better one with BSP.

'till then!

Greetzz

May 7, 2011

Shortest Path Finding with Apache Hama

Hi guys,

I've finished my Google Summer of Code task. Really! Remember today is the 7th of may. And the actualy coding period goes until mid September.

Okay obviously I've just finished the task itself, developing a new example with BSP. Since Hama does not require HBase anymore I have decided to split the tasks.
One example (which I have submitted) is a straight single source shortest path implementation described in Google Pregel's paper.
The second one will be a HBase version using Dijkstra and its extend A*. The second won't be committed to the codebase of Hama, just because I don't want to add the old HBase dependency once again.

So in the end everyone won: I used HBase to get more familiar with BigTable, Hama has a shortest path example and I can code the whole summer long knowing that I've finished my task ;D

Okay 'nuff talked, let's dive into the algorithm!

Like in PageRank you should be familiar withthe idea behind the partitioning, read the Pregel paper and this time you should be familiar with (single source) shortest path finding.


Short summary of the algorithm

First off I just briefly describe how it should work, and then how I solved it.

  • Initialize all vertices' cost to reach it to INFINITY, just the start vertex will have cost 0
    • initially send the new cost to all adjacent vertex containing the new cost plus the edge weight between them
  • Reviewing messages: if the cost coming from a message is lower than the actual cost, update the cost and send a message to the adjacent vertices, containing the new cost plus the edge weight between them (similar to the last step)
  • Repeat the last step until no updates can be made anymore.
That is pretty much it.

How we do it!

First we need a model class that represents a shortest path vertex. It has a name/ID, a weight and a cost. The cost is the cost with the vertex can be reached from our starting vertex.
A vertex will have an ID, that is just the hashcode of the name. I wanted a common way to partition a vertex so I've just set this based on the name called it ID. Watch out, when adding e.G. cities with the same name.

I will skip the whole partitioning step, you can read the other posts to learn more about it, shortly described it is just a modulo function that will spread the vertices to different sequencefiles. These sequencefiles will get read during job initilization and mapped into memory.

So let's step into the code...

Fields we need

Because we store this time the cost and weights into a modelling vertex we just need a adjacency list and a lookup map.
This looks like this:

private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();


Basically we are storing a vertex to its adjacents and the name to the vertex itself. I'll tell you later why we need a lookup map.

Initialization

In the init phase we need to map our adjacency list into ram, get our start vertex (just create it, we need it for equality check in the following loop which will just check the name)
The following loop will just init the costs and send it to the adjacents.

@Override
  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
      InterruptedException {
    // map our input into ram
    mapAdjacencyList(conf, peer);
    // get the start vertex
    ShortestPathVertex start = new ShortestPathVertex(0,
        conf.get("shortest.paths.start.vertex.id"));
    // get our master groom
    String master = conf.get(MASTER_TASK);
    // init the vertices
    for (ShortestPathVertex v : adjacencyList.keySet()) {
      if (v.equals(start)) {
        v.setCost(0);
      } else {
        // INF
        v.setCost(Integer.MAX_VALUE);
      }
      // initial message bypass
      sendMessageToNeighbors(peer, v);
    }


The send method

The send method takes advantage of the partitioning, to get the target groom where the vertex is actually stored.
It will bascially send a message containing the name of the vertex it targets and the cost it can be reached through the vertex in the parameter line.

private void sendMessageToNeighbors(BSPPeerProtocol peer,
      ShortestPathVertex id) throws IOException {

    List outgoingEdges = adjacencyList.get(id);
    for (ShortestPathVertex adjacent : outgoingEdges) {
      int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
      peer.send(peer.getAllPeerNames()[mod],
          new IntegerMessage(adjacent.getName(),
              id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
                  + adjacent.getWeight()));
    }
  }


Main Loop

Very simple is the main loop, it is a while(true) loop that will break if no updates can be made anymore.
So we are just parsing incoming messages, comparing the cost with the current cost. If the new cost is lower, then update it, put it into a queue and increment a local update counter.

Now we need the lookup map, to get fast access to the actual cost in the vertex.

boolean updated = true;
    while (updated) {
      int updatesMade = 0;
      peer.sync();

      IntegerMessage msg = null;
      Deque<ShortestPathVertex> updatedQueue = new LinkedList<ShortestPathVertex>();
      while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) {
        ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag());
        // check if we need an distance update
        if (vertex.getCost() > msg.getData()) {
          updatesMade++;
          updatedQueue.add(vertex);
          vertex.setCost(msg.getData());
        }
      }
      // synchonize with all grooms if there were updates
      updated = broadcastUpdatesMade(peer, master, updatesMade);
      // send updates to the adjacents of the updated vertices
      for (ShortestPathVertex vertex : updatedQueue) {
        sendMessageToNeighbors(peer, vertex);
      }
    }


Afterwards we are sending the updatecounter to a master groom that will evaluate and check if updates can be applied. I leave this method out, you can check out the pagerank error method. It is roughly the same.

If we have updates to apply, we just send them to the neighbor edges again.
Then we are just repeating until the master says: no updates can occur anymore.

Submit your own SequenceFile adjacency list

This is of course an example, so you can submit this to your own cluster and give it the input you like. I have designed the input like this:

The adjacencylist contains two text fields on each line. The key
component is the name of a vertex, the value is a ":" separated Text
field that contains the name of the adjacent vertex leftmost and the
weight on the rightmost side.
K         /                V 
Vertex[Text] / AdjacentVertex : Weight [Text]
So you can setup a sequencefile like this (obviously I won't write any binary code here :p ):
Berlin /  Paris : 25
Berlin / London : 40
London / Paris : 10
etc.

The basic usage of the command line arguments are:
<name of the start vertex> <optional: output path> <optional: path of your own sequencefile>

So you can run this with:
hama/bin/hama jar ../hama-0.x.0-examples.jar sssp Berlin /srv/tmp/ /home/user/myOwnGraph.seq

I've submitted this as a patch here: https://issues.apache.org/jira/browse/HAMA-359
So feel free to check it out, I hope it will get comitted soon. Never the less, it is contained also in my trunk on google code: http://code.google.com/p/hama-shortest-paths/
Class is called: de.jungblut.hama.bsp.ShortestPaths
Just run the main method ;)

Have fun with it! 

May 1, 2011

Google Summer of Code

Hey all,

maybe you've read it already on Twitter, I got picked for GSoC2k11 :D
Of course I'm very proud and want to thank everybody who helped me across the way. Especially my mentor: Edward J. Yoon.

If you don't know what my project actually was, have a look at Development of Shortest Path Finding Algorithm.

Basically I want to create a scalable solution, that takes a HUGE graph :) and and provides a fast solution of SSSP (single source shortest path) problems.
Like I already told in the Apache's Jira issue, I focus on a distributed Dijkstra algorithm that uses an adjacency list. So at first I'll focus on sparse graphs. The adjacency list is stored into a BigTable structure, in my case this will be HBase.

After I finished this, I will focus on a refinement of partitioning and if I have enough time I extend the Dijkstra with an A* heuristic.

These steps are basically my plan for this summer :)

Right now I started my first commit of Dijkstra's algorithm, it currently contains the old BSP version which I started with weeks ago to get familiar with Apache Hama. It is actually a working example, so feel free to check out the code and run its main method. This will start a local multithreaded BSP with some sample data of the german wikipedia page of Dijkstra's algorithm.
The class is located here: de.jungblut.hama.bsp.BSPDijkstra.
The repository can be found here: http://code.google.com/p/hama-shortest-paths/

Please star this project and the jira issue :D

I'll keep you updated on every change and thoughts I made. 
Thank you very much: Edward, Apache Software Foundation and Google!