A simple but often occurring problem is computing sample quantiles, sometimes named top $k$ elements, in a large data set. Here I show a solution for the MapReduce model of computation.

The standard in memory algorithm for this problem is similar to quicksort, with the main difference that only one branch of the recursion is followed. An important detail is that if we are looking for the top $k$ element, after the pivoting phase we might still be looking for it among the $m$ elements higher than the pivot, if $m \geq k$, but if that's not the case then we need to look for the top $k-m$ element among the elements smaller than the pivot. This is slightly more complicated to state in terms of quantiles, but the idea is the same. To port this algorithm to the map reduce model, we need to avoid pivoting, which requires random access, and adopt a style more similar to that of streaming algorithms. Pivoting is replaced by a combination of counting and filtering. In each iteration, we maintain an upper and lower bound for the quantile of interest. To refine it, we split that interval in half at a threshold $t$. We count how many data points are above the threshold. If that indicates that the target quantile is higher than the threshold, it becomes our new lower bound, otherwise we update the upper bound. Now we could simply go for a new pass, until upper and lower bound identify only one value in the sample (it could be repeated multiple times though), but if writing is not too costly and we have spare disk space, we can write out only the elements that are in between the upper and lower bound, bringing the complexity from $O(n \log(n))$ to $O(n)$. In the filtered sample we will be looking for a different quantile.

To add some detail, this is how the counting happens (using a combiner is mandatory for this algorithm, or the only two reducers will have to process the whole dataset):
def map(v):  if (v <= t):    return [(T, 1)]  else:    return [(F, 1)]
def reduce(list):  sum = 0  for (is_low, count) in list:    sum += count  return[(is_low, sum)]
This is the filtering, just for completeness sake.
def map(v):  if (lower <= v < upper)    return [v]  else:    return []
And this is the update rule for the bounds and quantile values:
def bounds(u, l, t, ca, cb, q):  # u, l: quantile upper and lower bounds  # t: threshold  # ca, cb: count of values above and below threshold  # q: desired quantile  n = ca + cb  topk = (1 - q) * n  if topk <= ca:    l = t    q =  1 - topk/ca  else:    u = t    q = 1 - (topk - ca)/cb  t = floor(u + l / 2)  return (u, l, t, q)
There is a little more work to glue everything together, but it should be clear at this point. The algorithm terminates when the max and min of the current filtered sample are equal. An optimization is to use more than one threshold during each iteration. This allows to trade iterations with memory usage. More precisely, we could maintain as many bins as memory allows and perform the counting bin by bin. Once this is done, we pick the bin that contains the quantile, split its range into equal sized bins and repeat. Filtering works exactly the same way as above.

A couple of observations about the applications of this algorithm. Often we are trying to estimate a quantile from a distribution from which the data is sampled. In that case, calculating a sample quantile is only the start. As estimates of distribution quantiles, sample quantiles are biased towards the median and without confidence intervals or other measure of variability we don't know anything about their variability. To obtain confidence intervals for any quantile one can use the technique explained here and for extreme quantiles special techniques might give better results. In both cases we need to extract sample quantiles as an intermediate step, so the algorithm described here can be useful, but not necessarily from the whole dataset. We might opt for a single pass map reduce subsampling step followed by in memory analysis and get a tight enough confidence interval faster and using fewer resources. The best trade off between precision, speed and resources is application dependent.

This post was inspired by a job interview question, where we covered in memory algorithms and only hinted at the mapreduce solution, but I thought it would be interesting to write it down.