Bringing relational joins to Rhipe
Relational operations are a common way to express map-reduce computations at a higher level, but Rhipe, an R package for map-reduce, doesn't offer any. Let's start to fix this with a basic join function.
This is going to be a little dry and technical, in preparation for better things to come.
As I was working on implementing a non-trivial map-reduce algorithm in Rhipe, I realized I needed joins, in the relational sense — Rhipe has an rhjoin call, but it is unrelated. The not-so-deep reason I needed them is that that's how I described the algorithm in my pseudo-code, but is that the only, or the best, way to do it? I don't know the answer, but several Hadoop derivatives that try to offer higher level alternatives have joins and a number of other SQL-like features: Hive of course, but also Cascading, Cascalog, and Pig. Dumbo has a sort of lower-level support for joins in the form of "join keys". While these are hardly independent validations of the use of relational operations for mapreduce, they give me some comfort that other people see them as important. In fact, I don't think we have seen enough examples of mapreduce algorithms yet to know which techniques are going to be the most important.
Before we get to the actual code, please bear in mind that this is a very basic join: it doesn't use any of the techniques that have been devised to make joins faster and less memory-intensive, from secondary keys to map-side joins. Implementing them right away seemed like premature optimization, and Rhipe also poses some restrictions on the partitioner, I suspect inherited from Hadoop Streaming, that would have made using them in this context hard. So, without further ado, the code (syntax highlighting is approximate at best, my apologies):
function(
  ileftfolder = NULL,
  irightfolder = NULL,
  ifolder = NULL,
  ofolder,
  leftouter = F,
  rightouter = F,
  fullouter = F,
  map.left.key,
  map.right.key,
  map.right.value = function(k, v) v,
  map.left.value = function(k, v) v,
  reduce.key = function(k, vl, vr) k,
  reduce.value = function(k, vl, vr) list(left = vl, right = vr)) {
  if (is.null(ileftfolder)) ileftfolder = ifolder
  mapcollect = function(k, v, kfun, vfun, left) rhcollect(kfun(k, v), list(value = vfun(k, v), left = left))
  rhex(
    rhmr(
      map = eval(
        substitute(
          if (is.null(ifolder)) {
            expression({
              lapply(seq_along(map.keys), function(i) {
                if (length(grep(paste("^file:", ileftfolder, sep = ""),
                                Sys.getenv("mapred.input.file")))) {
                  mapcollect(map.keys[[i]], map.values[[i]], map.left.key, map.left.value, T)}
                else {
                  mapcollect(map.keys[[i]], map.values[[i]], map.right.key, map.right.value, F)}
              })})}
          else {
            expression({
              lapply(seq_along(map.keys), function(i) {
                mapcollect(map.keys[[i]], map.values[[i]], map.left.key, map.left.value, T)
                mapcollect(map.keys[[i]], map.values[[i]], map.right.key, map.right.value, F)
              })})},
          list(map.left.key = map.left.key,
               map.right.key = map.right.key,
               map.right.value = map.right.value,
               map.left.value = map.left.value,
               ileftfolder = ileftfolder,
               mapcollect = mapcollect))),
      reduce = eval(
        substitute(
          expression(
            pre = {
              reduce.split =
                function(values, left) lapply(values[which(lapply(values,
                                                                  function(x) x$left) == left)],
                                              function(x) x$value)
              all.values = c()},
            reduce = {
              all.values = c(all.values, reduce.values)},
            post = {
              reduce.values.left = reduce.split(all.values, T)
              if (length(reduce.values.left) == 0 && (rightouter || fullouter)) {
                reduce.values.left = c(NA)}
              reduce.values.right = reduce.split(all.values, F)
              if (length(reduce.values.right) == 0 && (leftouter || fullouter)) {
                reduce.values.right = c(NA)}
              lapply(reduce.values.left,
                     function(x) lapply(reduce.values.right,
                                        function(y) rhcollect(reduce.key.fun(reduce.key, x, y),
                                                              reduce.value(reduce.key, x, y))))
            }),
          list(reduce.key.fun = reduce.key,
               reduce.value = reduce.value,
               leftouter = leftouter,
               rightouter = rightouter,
               fullouter = fullouter
               ))),
      combiner = F,
      inout = c('sequence', 'sequence'),
      ifolder = c(ileftfolder, irightfolder),
      ofolder = ofolder
    )
  )
}
Lines 1–15
This is the signature. There are left and right input folders for regular joins, or a single input folder for self joins. There's an output folder, and then three flags to determine whether it's going to be a left, right or full outer join. Then we have two functions, one for the left argument and one for the right argument of the join, to specify the join key (we need both even for a self-join). These functions are called for each key, value pair in the mapper, and their output becomes the key for the shuffle and reduce steps. We can also specify which values will be sent into the reduce step from the left input and the right input, but these are optional and default to letting the values through unmodified. Then we have two more functions that determine the key, value pairs emitted by the reduce phase. They take as arguments the value of the join key, a value from the left input and a value from the right input, record by record. The key defaults to simply the join key, and the value defaults to a named pair whose left and right sides contain the value from each respective side, sort of an equivalent of SELECT *.
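To make the key functions concrete, imagine joining an employees input to a departments input on a department id; the record fields and names below are invented for this example, not part of the code above:

```r
# Hypothetical key functions for an equi-join of employee records (left
# input) with department records (right input) on a department id; the
# field names are made up for this sketch.
map.left.key  = function(k, v) v$dept.id   # shuffle key from a left record
map.right.key = function(k, v) v$id        # shuffle key from a right record

# Each is called on every key, value pair seen by the mapper; whatever it
# returns becomes the join key for the shuffle and reduce steps.
map.left.key(NULL, list(name = "ada", dept.id = 3))    # 3
map.right.key(NULL, list(id = 3, name = "dev"))        # 3
```

Records with the same returned key end up at the same reducer, which is what makes the join possible.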
Lines 17–18
rhex executes a job, and rhmr creates the specs for a job. The latter takes several arguments, which we are going to describe in detail.
Lines 19–41
This is the map phase of the job, provided as an R expression. Expressions in R are unevaluated chunks of code: you get a value out of one by using eval, and you can modify one using substitute, which we use here in lieu of passing arguments. For added confusion, substitute returns an expression generator, so you need eval to get back an unevaluated expression. It's as if you replaced the spark plugs in a car and were left with a car generator — sounds too good to be true. Passing an expression instead of a function is a choice that the author of Rhipe himself explains. More precisely, there are two expressions, one for the join of two inputs and one for the self join. In the first case it is enough, for each record, to decide which input the current record came from, apply the appropriate key and value functions, and emit a key, value pair. In the self-join case one has to emit both a left and a right key, value pair for each record.
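The eval/substitute dance can be seen in miniature outside of any Rhipe job; this tiny sketch is mine, not Rhipe's:

```r
# substitute() replaces the named symbols inside the unevaluated call
# expression(x + k); the result is still a call to expression(), so a
# further eval() is needed to obtain the expression object itself.
templ = substitute(expression(x + k), list(k = 10))
e = eval(templ)      # e is now expression(x + 10)
x = 5
eval(e[[1]])         # 15
```

Rhipe does the same thing at job-building time: the expressions for map and reduce get their parameters baked in via substitute before being shipped to the cluster.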
Lines 42–70
This is the reduce phase of the job. The drill of passing an expression and substituting arguments is exactly the same as for map. The expression this time is composed of three sub-expressions. For each key, pre is evaluated before the first record, reduce several times, once per block of records, and post after the last one. Here pre simply defines a function and a variable for later use, reduce accumulates all the records, and post splits the left and right sides of the join, adds NA entries when necessary for outer joins, and emits all the output records with two nested loops, applying the functions passed as arguments to create them.
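To see that logic in isolation, here is a pure-R simulation of what happens for a single join key, outside Hadoop; join.one.key and its packaging are invented for this sketch:

```r
# 'values' mimics the records accumulated for one key: each element is a
# list holding the original value and a flag saying which side it came from.
join.one.key = function(values, leftouter = FALSE, rightouter = FALSE,
                        fullouter = FALSE) {
  # keep only the values that came from the requested side
  reduce.split = function(values, left)
    lapply(values[which(lapply(values, function(x) x$left) == left)],
           function(x) x$value)
  lefts = reduce.split(values, TRUE)
  if (length(lefts) == 0 && (rightouter || fullouter)) lefts = c(NA)
  rights = reduce.split(values, FALSE)
  if (length(rights) == 0 && (leftouter || fullouter)) rights = c(NA)
  # emit the cross product of left and right values sharing this key
  unlist(lapply(lefts,
                function(x) lapply(rights,
                                   function(y) list(left = x, right = y))),
         recursive = FALSE)
}

# one left record and one matching right record produce one joined pair
join.one.key(list(list(value = 1, left = TRUE),
                  list(value = 2, left = FALSE)))
```

With no matching right record and leftouter = TRUE, the same call pads the right side with NA instead of dropping the row, which is exactly what the NA padding in post achieves.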
Lines 71–74
Some additional rhmr arguments specify whether to use a combiner, and the format and names of inputs and outputs. The returned value is the one returned by rhex.
And that's it: unless you are developing in Rhipe, this is not so interesting. But in all of 77 lines we have built a missing component for Rhipe, and we'll put it to good use in the next installment.