On a quest for an elegant and effective map reduce language, I went through a number of options and put together some considerations. And the winner is …

Update: since writing this post, I was approached by Revolution Analytics to write yet another map reduce library, this time for R, and it is, of course, my new favorite. Check it out!

In a couple of entries on my personal blog I described some map-reduce algorithms for statistical and graph problems and sketched their implementation using pseudo-code. Pseudo-code has two problems: not everybody agrees on what a statement means, and it doesn't run, so you can't test it or use it. Real programming languages, on the other hand, tend to obscure the logic of a program with unnecessary detail and have other issues that hinder readability, which is why people resort to pseudo-code in the first place. But there is more to it than aesthetics. Conciseness of code is related to programming abstractions, constructs that achieve higher generality and remove unnecessary detail; to reuse, whereby the same code is used in different contexts, reducing total program size; and even to testing, in that concise programs can be tested more easily. In short, short is better. The elegance of less is hardly my own discovery, or even a software engineering one. As Antoine de Saint-Exupéry, French writer and aviator, so eloquently put it:
Perfection is achieved, not when there is nothing more to add, but when there is nothing left to take away.
Unfortunately, in some circles dull, predictable, repetitive code is considered simpler than short, to-the-point code, or at least tolerable. From java.util.Arrays:
/*
 * The code for each of the seven primitive types is largely identical.
 * C'est la vie.
 */
In this case, repetition gets a free pass in exchange for efficiency. Very expressive languages tend to exact a higher toll on resources, and the different map-reduce environments we will look into are no exception.
I will present for each language or library the implementation of a word count program, lifted from its documentation, since this has become a sort of "Hello World" of map reduce. I don't think such a simple program is the ultimate test of the quality of a language; this is just to give a taste of each. What I am most interested in is:
  • Can I write reasonably concise, abstract programs in this language or library?
  • Can I write the "inside" of map reduce, that is, the code for the mapper and the reducer, as well as the "outside", the logic that decides which map-reduce jobs to run?
  • Is it general? Can I write any map-reduce program, including programs that require multiple map-reduce jobs, even when the number and type of jobs depend on the data? (A sketch of this "outside" logic follows the list.)
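To make the last two criteria concrete, here is a minimal, purely illustrative sketch in Python of the "outside" logic with a data-dependent number of jobs. Everything in it is invented for illustration: run_job stands in for whatever job-submission call a real library provides (stubbed in-process here so the sketch actually runs), and the driver computes a naive transitive closure, launching one job per iteration until a fixed point is reached:

def run_job(records, mapper, reducer):
    # Toy in-process stand-in for a map-reduce job: apply the mapper,
    # group its output by key, then apply the reducer to each group.
    groups = {}
    for record in records:
        for key, value in mapper(record):
            groups.setdefault(key, []).append(value)
    return [reducer(key, values) for key, values in groups.items()]

def closure_step(edges):
    # One job of a naive transitive closure: join edges on the shared node.
    def mapper(edge):
        a, b = edge
        return [(a, ("out", b)), (b, ("in", a))]
    def reducer(node, tagged):
        ins = [v for tag, v in tagged if tag == "in"]
        outs = [v for tag, v in tagged if tag == "out"]
        return [(i, o) for i in ins for o in outs]
    new_edges = [e for pairs in run_job(edges, mapper, reducer) for e in pairs]
    return edges | set(new_edges)

edges = {(1, 2), (2, 3), (3, 4)}
while True:  # how many jobs run depends on the data
    bigger = closure_step(edges)
    if bigger == edges:
        break
    edges = bigger
print(sorted(edges))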
Java Hadoop
This is the original, the real thing, the current performance champion and what "real men" write in. It is also the most mature of the different options. But take a look:

public class WordCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));

    JobClient.runJob(conf);
  }
}

48 lines to write a word count program (and I stripped the import statements at the top out of mercy)! My favorite is the line devoted to redefining the number one: private final static IntWritable one = new IntWritable(1);. This makes sense in a world where programmer productivity is measured by the number of lines of code written, or for a production job that runs on a 1,000 node cluster for 5 hours every night, in which case efficiency may trump other considerations. But for a blog, for discussing and enjoying code, anything remotely more interesting than a word count program would not fit the size of an entry and would have to be an attachment, as John Mount did with his painstaking Java/Hadoop implementation of logistic regression. I wonder how many people opened that tar file and read it through and through.
Hadoop was started by Doug Cutting and is developed by a large community with a significant group employed at Yahoo.
Cascading
Cascading is a Java library written on top of Hadoop. It enables programming in a dataflow style, with some primitives inspired by SQL (like GroupBy). But, according to a person closely involved with the project, "it's still Java, it's still boilerplate code". My favorite is the line devoted to creating an empty Properties object. Remarkably, Cascading trims the word count program down to half as many lines as plain Hadoop. I don't have first-hand experience with Cascading, but since there is little or no performance penalty compared to the real thing (depending on programmer skill, it could actually be faster), it's worth a try for production work.
Cascading is developed by Chris Wensel at Concurrent.

Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

Pipe assembly = new Pipe( "wordcount" );

String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

assembly = new GroupBy( assembly, new Fields( "word" ) );

Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

flow.complete();
Pipes — C++
C++ fits into the Java environment not without some effort, which is encapsulated in a library called Pipes. The word count program looks more compact than in Java/Hadoop. I have read conflicting comments on the efficiency of Pipes/C++ versus Hadoop/Java, and I suspect it may depend on the specific problem being tackled. Even though I used to be quite proficient in C++, I do not remember fondly its 8000-character, template-induced error messages, and I don't think it is the type of language I would want to use to discuss algorithms or for prototyping.
Pipes is developed as part of the Hadoop project.

class WordCountMap: public HadoopPipes::Mapper {
public:
  WordCountMap(HadoopPipes::TaskContext& context){}
  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for(unsigned int i=0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
  }
};

class WordCountReduce: public HadoopPipes::Reducer {
public:
  WordCountReduce(HadoopPipes::TaskContext& context){}
  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
  }
};

int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
                                                           WordCountReduce>());
}
Hive
Hive is a SQL-like language interpreted on top of Hadoop. It can be combined with small programs written in a variety of languages, to make up for the fact that the language itself is not general purpose. For what it does, it is very concise and expressive, but outside of that you need to supplement it with other languages. Case in point: the word count example, where two additional scripts are left as an exercise for the reader (a sketch of them follows the query).
Hive was developed at Facebook and became a subproject of the Hadoop project.

FROM (
  MAP docs.contents USING 'tokenizer_script' AS word, cnt
  FROM docs
  CLUSTER BY word
) map_output
REDUCE map_output.word, map_output.cnt USING 'count_script' AS word, cnt;
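The tokenizer_script and count_script above are not Hive; they are external executables that read tab-separated lines on standard input and write tab-separated lines on standard output, in whatever language you like. As a taste of what the exercise for the reader involves, here is a minimal sketch of the pair in Python (my own illustration, not from the Hive documentation):

#!/usr/bin/env python
# tokenizer_script (sketch): emit one "word <tab> 1" line per token
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print("%s\t1" % word)

#!/usr/bin/env python
# count_script (sketch): input arrives clustered by word, so it suffices
# to keep a running sum and emit a total every time the word changes
import sys

current, total = None, 0
for line in sys.stdin:
    word, cnt = line.rstrip("\n").split("\t")
    if word != current:
        if current is not None:
            print("%s\t%d" % (current, total))
        current, total = word, 0
    total += int(cnt)
if current is not None:
    print("%s\t%d" % (current, total))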
Pig
Pig adds to the limitations of Hive the hubris of creating a brand new language, as if creating a new programming language were easy. As you can see, it is inspired by SQL to a degree. It is not a general purpose language, as clearly explained here. It interfaces with any JVM-based language for custom extensions.


A = load '/tmp/bible+shakes.nopunc';
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C) as count, group as word;
F = order E by count desc;
store F into '/tmp/wc';
Pig development was started at Yahoo.

Rhipe
Rhipe is an R package to describe and execute map-reduce jobs. It is reasonably high level and satisfies all the criteria I listed above. It is not a speed demon, because of R itself; there are some quirks in the API; and it is still at an early stage of development. But it is interesting.


rhinit()
m <- expression({
  y <- strsplit(unlist(map.values), " ")
  lapply(y, function(r) rhcollect(r, T))
})
r <- expression(
  pre = {
    count <- 0
  },
  reduce = {
    count <- sum(as.numeric(unlist(reduce.values)), count)
  },
  post = {
    rhcollect(reduce.key, count)
  })
z <- rhmr(map = m, reduce = r, comb = T, inout = c("text", "sequence"),
          ifolder = "/tmp/50mil", ofolder = "/tmp/tof")
rhex(z)
Rhipe is developed by Saptarshi Guha at Mozilla.
Dumbo
Dumbo is a Hadoop library for Python, but it also imposes a set of tools needed to run Dumbo programs. If you look at the word count program in Dumbo, below, it almost looks like pseudo-code! Finally! But there is a serious catch: there can be only one run statement per Dumbo-powered program (I asked the author himself after seeing some outlandish-looking errors). To coordinate two runs, for instance a second one that starts from the output of the first, one has to run separate Python programs and go through the Unix shell (a sketch of this workaround follows the example). This is different from static composition of jobs, which is well supported, but it is not general enough for my purposes. Other options for Python include mrjob and Pydoop, but I haven't had time to look into those yet.

def mapper(key,value):
    for word in value.split(): yield word,1

def reducer(key,values):
    yield key,sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper,reducer)
Dumbo is developed by Klaas Bosteels at last.fm.
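Back to the coordination issue: here is the kind of shell-mediated glue I mean, a hypothetical driver that chains two single-run Dumbo programs by shelling out to the dumbo command line tool. The program names and HDFS paths are made up for illustration, and the exact flags may vary with your setup:

# Hypothetical glue script. Each Dumbo program may contain only one run
# statement, so the coordination between two jobs lives out here, in a
# plain Python script that shells out to the dumbo command line tool.
import subprocess

def run_dumbo(program, input_path, output_path):
    subprocess.check_call([
        "dumbo", "start", program,
        "-input", input_path,
        "-output", output_path,
        "-hadoop", "/usr/local/hadoop",
    ])

run_dumbo("wordcount.py", "/data/docs", "/tmp/counts")
# the second job consumes the output of the first
run_dumbo("filter_rare.py", "/tmp/counts", "/tmp/frequent")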
Cascalog
Built on top of the already powerful Cascading as a domain-specific language within Clojure, Cascalog wins the word count conciseness contest with a one-liner. Indeed, word counting is simple enough that a line is all it should take. But what a line it is:

(?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))
It probably looks cryptic to anybody not already familiar with it. Conciseness can become terseness, but once some domain-specific concepts have been grasped, a terse program such as this one might become perfectly clear. It was to me at some point. My misgivings here are more about the JVM-powered revival of LISP in the form of Clojure. LISP has been around for some 50-odd years without taking off, despite several attempts at its revival (Common LISP, Scheme, Arc and now Clojure). I suspect something is wrong with it, even if popularity is not an accurate gauge of language quality, as BASIC has long proved. Personally, I dislike LISP's odd syntax, the widespread use of side effects in a functional language and the poor abstraction, from a performance point of view, that lists provide over RAM; indeed, LISP variants often add additional data structures, somehow negating the "LIS" part of the language. In the specific case of Clojure, the fact that a compiled language is compiled into an interpreted one, JVM bytecode, combining a slow dev cycle with suboptimal performance, makes me think Clojure users must be gluttons for punishment.
Cascalog is developed by Nathan Marz at Backtype.
Final thoughts
At the end of this necessarily incomplete and unscientific comparison of languages and libraries, there is a winner and there isn't. There isn't, because language comparison is always multidimensional and subjective, but also because the intended applications are very different. On the other hand, if one is looking for a general purpose, moderately elegant, not necessarily most efficient, not necessarily mature language for exploration purposes, Rhipe seems to fit the bill pretty nicely. First, it is just a library, which means one can continue to use the tools one is already familiar with. I found it particularly useful to run map-reduce jobs from the interpreter, inspecting the inputs and outputs of each, an invaluable debugging help (no, you cannot step into a mapper or reducer; I use counters instead to trace what's going on in there). I also like that one can read and write sequence files with a single call, to examine the output of previous jobs and decide what to do next. Additionally, since R is a statistical language and Hadoop is the tool of choice for big data analytics, the two seem like a natural fit. Personally, I am familiar with both, which helps, and I have used R in combination with Hive or Hadoop to do analytics in the past, but not at this level of integration.

Since there is nothing like trying a more substantial example than word count to figure out a language's pros and cons, stay tuned for a fairly complex example. After that is published, I plan to pose a friendly challenge to experts in the languages and libraries above, or other Hadoop-related languages, to see what an implementation of the same algorithm would look like in their language of choice, and to learn something from the comparison. Maybe among my "25 readers" there is someone who will take it up.

Comments

Jeremy
Not considering Hadoop Streaming seems like a pretty big omission.
Antonio Piccolboni
dieswaytoofast, that makes Erlang a good candidate for implementing Hadoop itself. In fact Disco (discoproject.org) is a Hadoop equivalent in Erlang. I am not aware of Hadoop libraries for Erlang.

Brian, good point, and add Scrunch (https://github.com/cloudera/crunch/tree/master/scrunch) to the mix. This post is getting obsolete pretty quickly, a sign of the frantic activity around Hadoop.
Brian Schlining
Scala? See https://github.com/NICTA/scoobi

val lines = fromTextFile("hdfs://in/...")

val counts = lines.flatMap(_.split(" "))
                  .map(word => (word, 1))
                  .groupByKey
                  .combine(_+_)

persist(toTextFile(counts, "hdfs://out/..."))
dieswaytoofast
Errr, Erlang??? The semantics and the inbuilt parallelism would seem to fit the bill quite well, no?
Anonymous
I prefer ruby with streaming. Personally, I don't see the need for any higher-level framework on top of it as it's already extremely easy to deal with STDIN/STDOUT on its own.
Fatal Error
I think the important thing about UDFs is what the user is expected to write versus what is already available.

From an end user's perspective there is no difference between invoking SENTENCES and any other keyword in the Hive syntax. Under the covers it's all Java anyway, but the point is to abstract away the details for people who want them abstracted, while still providing a powerful tool.

So, yes, I consider it part of the core language.
Antonio Piccolboni
Thanks Fatal Error, I wasn't aware of the sentences UDF, nor did I write this Hive snippet, which comes straight from the documentation. Since this UDF is itself written in a different language, not in Hive, it looks to me like you are still supporting the point that you need other languages to integrate with and extend Hive, unless you consider the sentences UDF part of the core language, which seems like a stretch to me. Anyway, there is nothing like implementing a more complex algorithm to test a language: can you implement this (http://piccolboni.info/2011/04/looking-for-map-reduce-language.html) with Hive and nothing else? If you can, I will gladly concede that I had underestimated its power, and I think we'll learn a lot from the implementation.
Antonio Piccolboni
Thanks riffraff, the first line didn't belong in there, not sure how that happened. Now it's as Klaas himself wrote it; if it doesn't work, we'll have to ask him!
riffraff
I have the feeling you have some kind of typo in the python code: it's invalid syntax in at least three different ways :)
Fatal Error
I think something like this would work:

SELECT count(*), word
FROM (
  SELECT explode(sentences(lower(contents))) AS word
  FROM docs
) A
GROUP BY word
Antonio
Updated based on comments by Michael Driscoll before this entry was syndicated on the dataspora blog. Thanks to both!