The connected components example, rewritten using RHadoop/rmr
My new implementation of random mate for mapreduce, using the rmr package from Revolution Analytics' open source project RHadoop.
This story now has three episodes. First, I got interested in how to compute connected components in mapreduce in a way that works even for large-diameter graphs and proposed an algorithm, in fact a port to mapreduce of an old PRAM algorithm. Then I learned RHIPE, a mapreduce package for R, and implemented that algorithm using it. Then Revolution Analytics knocked at my door and asked me to work on a similar package and give my best stab at an API that's both easy and powerful. You can read a bit more about the ideas behind the package, but keep in mind that this is a young project and there is more work to do to turn those lofty goals into reality. You can help by just trying it out and providing feedback, or by contributing to it: from documentation to unit tests, to minor features, to very significant ones, there's plenty to do.
Granted, clarity and simplicity are in the eye of the beholder and I am hardly unbiased, being the author of the code, the main committer to rmr and having Revolution as a client. But I invite you to compare this example with its previous incarnation using RHIPE: how the mapreduce jobs are described and invoked, how much less boilerplate code there is, and how completely devoid of uncommon R constructs it is, replacing unevaluated expressions and expression substitution with regular functions. Another interesting comparison is with this alternate algorithm for the same problem, implemented in a mix of Pig and Python. A number of additional examples can be found in the rmr tutorial, including k-means, logistic regression and linear least squares. Joshua Bloch of Effective Java fame recommends that you "write to your API early and often" (p. 9), and in my mind I had been writing to something like this for a long time, in some respects even before I had heard of mapreduce. I just needed the opportunity and environment to make it happen, and I am grateful to Revolution for providing them. But let the code (git repo) speak:
library(rmr)
connected.components = function(graph, forest = NULL, i = 0) {
  key.from = function(k,v) keyval(v[['from']], v)
  key.to = function(k,v) keyval(v[['to']], v)
  if(is.null(forest)) {
    ##create trivial forest
    forest = mapreduce(
      input = graph,
      map = function(k, v) list(keyval(v[['from']], NULL),
                                keyval(v[['to']], NULL)),
      reduce = function(k, vv) keyval(NULL, list(from = k, to = k)))}
  ##merge graph and forest
  graph.forest = equijoin(leftinput =
                            equijoin(leftinput = graph,
                                     rightinput = forest,
                                     map.left = key.from,
                                     map.right = key.from,
                                     reduceall =
                                       function(k, vl, vr)
                                         keyval(NULL,
                                                list(parentfrom = vr[['to']],
                                                     from = vl[['from']],
                                                     to = vl[['to']]))),
                          rightinput = forest,
                          map.left = key.to,
                          map.right = key.from,
                          reduceall = function(k, vl, vr) keyval(NULL,
                            list(parentfrom = vl[['parentfrom']],
                                 parentto = vr[['to']],
                                 from = vl[['from']],
                                 to = vl[['to']])))
  ##find component merge candidates
  if(active.edges.count(graph.forest) > 0) {
    forest.update = update.parent(graph.forest, i)
    ##depth-reducing step
    forest.new = equijoin(input =
                            ##forest update
                            equijoin(leftinput = forest,
                                     rightinput = forest.update,
                                     outer = "left",
                                     map.left = key.from,
                                     map.right = key.from,
                                     reduceall = function(k,vl,vr) keyval(NULL,
                                       if(all(is.na(vr))) {vl}
                                       else {vr})),
                          outer = "left",
                          map.left = key.to,
                          map.right = key.from,
                          reduceall = function(k,vl,vr) keyval(NULL,
                            list(from = vl[['from']],
                                 to = if(all(is.na(vr))) {vl[['to']]}
                                      else {vr[['to']]})))
    ##recursion
    connected.components(graph, forest.new, i+1)}
  else forest}
##count edges connecting nodes in different components
active.edges.count = function(gf) {
  from.dfs(mapreduce(input = gf,
                     map = function(k,v)
                       if(v[['parentfrom']] != v[['parentto']] )
                         keyval(NULL,1) else keyval(NULL, 0),
                     reduce = function(k, vv) keyval(NULL, sum(unlist(vv))),
                     combine = T))[[1]]$val}
##decide which components to merge: elect donors and acceptors with a hash-based coin flip
update.parent = function(input, iter) {
  library(digest)
  symmetry.break = function(x,r) {
    as.integer(paste("0x", substr(digest(c(x,r)), 1,1), sep = ""))%%2 == 1}
  mapreduce(input = input,
            map = function(k, v) {
              if(v[['parentfrom']] != v[['parentto']]) {
                symmetry.from = symmetry.break(v[['parentfrom']], iter);
                symmetry.to = symmetry.break(v[['parentto']], iter);
                if(symmetry.from != symmetry.to) {
                  if(symmetry.from) {
                    keyval(v[['parentfrom']], list(from = v[['parentfrom']],
                                                   to = v[['parentto']]))}
                  else {
                    keyval(v[['parentto']], list(from = v[['parentto']],
                                                 to = v[['parentfrom']]))}}}},
            reduce = function(k, vv) keyval(NULL, vv[[1]]),
            combine = F)}
##create some test data: up to 10 random edges on nodes 1 through 10
random.graph = to.dfs(unique(lapply(1:10,
  function(i) keyval(NULL,
    c(from = sample(1:10,1),
      to = sample(1:10,1))))))
One remark to better understand how this code works (you may need to take the rmr tutorial to really get all the details): the objects returned by mapreduce calls do not directly contain the data but point to it. Since we are potentially working on big data, the data stays on HDFS unless the programmer explicitly requests otherwise. These objects take care of creating temp filenames, cleaning them up when they are no longer referenced, and can be passed around from mapreduce call to mapreduce call. It is a simple but recurring concern that rmr hopefully takes off the programmer's plate once and for all. The line-by-line description may not be so interesting to you if you are familiar with the old RHIPE-based implementation, but here it is for completeness' sake.

Line 1: Load the library.
Line 3: The main function. It takes the graph to analyze, optionally a forest representing a partition into connected components (the components start as individual nodes and end up as maximal connected components by the time the algorithm terminates), and an integer. The forest shares its nodes with the input graph and the roots are marked by a self-loop. Forest elements are trees and in this algorithm they have depth 1 most of the time, that is, it's a forest of stars. The integer argument represents the iteration and is used to generate correlated pseudorandomness across all nodes; details later.
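To make the invariant concrete, here is what the forest records for a single component would look like (a sketch with made-up node numbers, showing only the record values): a star rooted at node 9 containing nodes 3, 7 and 9, the root recognizable by its self-loop.

list(list(from = 3, to = 9),
     list(from = 7, to = 9),
     list(from = 9, to = 9)) ## the self-loop marks node 9 as the root of this star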
Lines 4–5: Nodes are represented by integers and edges by a list with a from and a to element, one per record. These two functions, used over and over again as map functions, set the key to the from and to node of an edge respectively, while leaving the value alone.
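For instance, on a single record (a sketch that can be run outside Hadoop, since keyval just builds a key-value pair; the edge is made up):

e = list(from = 3, to = 7)
key.from(NULL, e) ## keyval(3, list(from = 3, to = 7)): the key becomes the from node
key.to(NULL, e)   ## keyval(7, list(from = 3, to = 7)): the key becomes the to node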
Lines 6–12: If a forest is not provided as an argument, initialize it to the set of all isolated nodes. Add a self-loop to mark the root.
Lines 14–32: Merge the graph and the forest so that every node in the graph is labelled with its parent in the forest, that is, the component it belongs to. This is implemented with two successive join-like operations, which in turn are implemented as map-reduce jobs. rmr takes care of some of the ugly details of performing joins in mapreduce by providing equijoins as part of the package (build from the dev branch to get this feature).
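Record by record, the result of the two joins looks something like this (a sketch with made-up node ids, showing only the record values): each graph edge carries the forest parent of both of its endpoints, so edges whose endpoints have different parents are exactly the ones spanning two components.

list(parentfrom = 5, parentto = 9, from = 3, to = 7) ## endpoints in different components
list(parentfrom = 9, parentto = 9, from = 7, to = 8) ## both endpoints already in component 9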
Lines 34–53: Active edges are edges connecting nodes belonging to different components. If none exists, then the components are maximal and the algorithm terminates. The test for active edges is implemented as a separate map-reduce job; it could be folded into the forest update job described shortly, but we went for modularity here. The call to update.parent (whose result is stored in forest.update) decides, for each active edge, which components should be merged and how, and expresses that as additional edges to be merged into the forest. The next step is formed by two chained equijoins: the inner one, which executes first, merges the new edges into the forest while keeping the trees trees (invariant: each node has one parent). The outer one transforms each tree, now potentially of depth two after a merger with another tree, into a star.
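On paper, the depth-reducing step does something like the following (a sketch with made-up node ids). Suppose component 5 has just been merged into component 9: node 3 then points at 5, which points at 9, a tree of depth two; the outer join rewrites node 3's parent to 9, restoring a star.

before = list(list(from = 3, to = 5),  ## node 3 still points at the old root 5
              list(from = 5, to = 9),  ## old root 5 now points at the new root 9
              list(from = 9, to = 9))  ## new root, marked by its self-loop
after  = list(list(from = 3, to = 9),  ## after the join, 3 points straight at 9
              list(from = 5, to = 9),
              list(from = 9, to = 9))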
Lines 55–56: The recursive call. Keep growing that forest, or return it when done.
Lines 58–64: Count active edges. This works on the merged graph-forest data structure, so it's pretty trivial to check for components that should be merged, that is, edges incident on different components. The interesting bit is that this is a potentially very large computation that ends with a single number. That's why we can simply grab that data from HDFS with from.dfs and bring it into memory, where it can be used for the termination condition.
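Concretely, the value returned by mapreduce is only a handle to data on HDFS; from.dfs materializes it in memory as a list of key-value pairs, and since this job boils everything down to a single pair, [[1]]$val extracts the count. That one number is all that crosses the HDFS/memory boundary, and it drives the termination test (a sketch reusing objects defined above):

n.active = active.edges.count(graph.forest) ## a single integer comes back from HDFS
if(n.active > 0) {
  ## some edge still spans two components: keep merging and recurse
} else {
  ## no active edges: the forest already describes the maximal components
}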
Lines 66–83: This is the logic that decides which components to actually merge, and I think the most technically sophisticated bit (not my idea). If we just tried to merge every pair of components connected by an active edge, mayhem would ensue: the forest would not be a forest anymore, and cycles would be possible. We need to elect "donor" components and "acceptor" components and exclusively merge the former into the latter. That will leave the forest a forest after the merger step is performed. The smart part is the very simple and distributed mechanism employed to decide donors and acceptors. It's a pseudorandom boolean computed as a hash of the component root node name, which works as a component id, and the iteration number (the somewhat convoluted line 70 does just that; I would love to hear of a simpler way). The latter is included so that the "donor" and "acceptor" assignments change at each iteration and eventually all active edges have a chance to be used for a merger, thus becoming active no more.
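For what it's worth, the same digest-based coin flip can be written a little more directly with strtoi; this is an untested sketch of the identical idea (first hex digit of the hash, taken modulo 2), not a change to the algorithm:

library(digest)
symmetry.break = function(x, r)
  strtoi(substr(digest(c(x, r)), 1, 1), base = 16L) %% 2 == 1
symmetry.break(5, 0) ## TRUE or FALSE, stable for a given (component root, iteration) pair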
Lines 85–88: How to create some test data.
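Putting it all together, a run on that test data would look something like this (a sketch; the returned forest is itself an HDFS-backed object, pulled into memory here only because the toy example is tiny):

components = connected.components(random.graph)
from.dfs(components) ## one record per node, pairing it (from) with its component root (to)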
If you liked this, or anything else piqued your interest, please give rmr a spin and let me know about any problems, comments, or just new applications and examples.
Very nice write-up(s).
I'm contemplating 2 options for connected-components MR: yours, and Chaser's (http://chasebradford.wordpress.com/2010/10/23/mapreduce-implementation-for-union-find).
I'm trying to understand some tech details, mainly: how come you had to come up with the gender (donator/acceptor) notion, while in the other algorithm we simply use the natural order of the root node IDs?