Map-reduce is a way to summarize and run aggregation functions on large data sets, potentially stored across many servers, in an efficient fashion. It works by processing the data on each server in parallel and then combining those results into one set. It was originally designed by Google and has since been implemented in many systems, including Apache Hadoop and MongoDB.
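In RethinkDB, map-reduce queries operate on sequences and are composed of two or three parts:
- An optional group step, which partitions the elements of the sequence into groups.
- A map step, which transforms the elements of the sequence (or of each group) into a new sequence.
- A reduce step, which aggregates the mapped values into a single value (or a single value per group).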
Some other map-reduce implementations, like Hadoop’s, use the mapping step to perform grouping as well; RethinkDB’s implementation explicitly separates them. This is sometimes referred to as “group-map-reduce,” or GMR. RethinkDB distributes GMR queries over tables and shards efficiently. You write GMR queries with the group, map, and reduce commands. As our examples will show, though, many ReQL commands compile to GMR queries behind the scenes, so many common map-reduce cases can be accomplished in one or two lines of ReQL.
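The following is a minimal in-memory sketch of the three phases in plain Python. It models only the semantics, under simplifying assumptions: a single process, data held in a list, and a non-empty input. It does not show how RethinkDB executes queries. `group_map_reduce` is a hypothetical helper, not part of the ReQL driver.

```python
from functools import reduce
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional


def group_map_reduce(
    seq: Iterable[Any],
    map_fn: Callable[[Any], Any],
    reduce_fn: Callable[[Any, Any], Any],
    group_fn: Optional[Callable[[Any], Hashable]] = None,
) -> Any:
    """Toy single-process model of group-map-reduce (hypothetical helper)."""
    if group_fn is None:
        # No group step: map every element, then reduce to one value.
        return reduce(reduce_fn, (map_fn(x) for x in seq))

    # Group step: partition the elements by the key group_fn returns.
    groups: Dict[Hashable, List[Any]] = {}
    for x in seq:
        groups.setdefault(group_fn(x), []).append(x)

    # Map and reduce run independently inside each group.
    return {
        key: reduce(reduce_fn, map(map_fn, members))
        for key, members in groups.items()
    }
```

For example, with `posts = [{'category': 'Fiction'}, {'category': 'Fiction'}, {'category': 'News'}]`, calling `group_map_reduce(posts, lambda p: 1, lambda a, b: a + b)` gives `3`. Adding `group_fn=lambda p: p['category']` gives `{'Fiction': 2, 'News': 1}`.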
Suppose you are running a blog and would like to retrieve the number of posts. A map-reduce query to perform this operation would consist of the following steps:
- A map step that returns 1 for every post (since we’re counting each post once).
- A reduce step that sums the values produced by the map step.
We won’t need a group step for this example.
For our blog, we have a table posts that contains blog posts. Here’s an example document from the table. (We’ll use Python for this example, but other ReQL drivers are very similar.)
{
  "id": "7644aaf2-9928-4231-aa68-4e65e31bf219",
  "title": "The line must be drawn here",
  "content": "This far, no further! ...",
  "category": "Fiction"
}
First, we’ll map each post to the number 1:
r.table('posts').map(lambda post: 1)
Then we sum the posts with reduce:
r.table('posts').map(lambda post: 1).reduce(lambda a, b: a + b).run(conn)
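To see what this query computes without a server, you can mimic the same two steps on an in-memory list using Python's `functools.reduce`. The `posts` list below is hypothetical sample data. Treat this as an analogy for the query's result, not a description of how RethinkDB evaluates it.

```python
from functools import reduce

# Hypothetical in-memory stand-in for the posts table.
posts = [
    {"id": "a", "title": "First", "category": "Fiction"},
    {"id": "b", "title": "Second", "category": "News"},
    {"id": "c", "title": "Third", "category": "Fiction"},
]

# Map step: every post becomes the number 1.
ones = [1 for _post in posts]

# Reduce step: combine the 1s pairwise with addition.
post_count = reduce(lambda a, b: a + b, ones)
print(post_count)  # 3
```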
For many cases where a GMR query might be used, ReQL provides even simpler aggregation functions. This example is more easily written using count:
r.table('posts').count().run(conn)
RethinkDB has shortcuts for five common aggregation operations: count, sum, avg, min, and max. In practice, you’ll often be able to use these with group in place of writing your own map and reduce functions.
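As a rough illustration of what these shortcuts compute, each one can be phrased as a map plus a reduce. The plain-Python sketch below uses a hypothetical list of numbers to stand in for one field from every document. It models the results only, not RethinkDB's internal implementation. Note that an average cannot be built by reducing averages directly. Instead, the sketch carries (sum, count) pairs through the reduction and divides once at the end.

```python
from functools import reduce

# Hypothetical values, e.g. one numeric field taken from each document.
values = [4, 8, 15, 16, 23, 42]

# count: map each value to 1, then add.
count = reduce(lambda a, b: a + b, map(lambda v: 1, values))

# sum, min, max: reduce the values themselves.
total = reduce(lambda a, b: a + b, values)
smallest = reduce(lambda a, b: a if a <= b else b, values)
largest = reduce(lambda a, b: a if a >= b else b, values)

# avg: reduce (sum, count) pairs, which combine correctly in any order,
# then divide once at the end.
s, n = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), map(lambda v: (v, 1), values))
average = s / n
```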
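Suppose that on the blog from the last example, you’d like to retrieve the number of posts per category. A map-reduce query to perform this operation would consist of the following steps:
- A group step that groups the posts by their category.
- A map step that returns 1 for every post.
- A reduce step that sums the values produced by the map step, separately for each group.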
First, we’ll group the posts:
r.table('posts').group(lambda post: post['category'])
Then, as before, we map each post to the number 1. Commands after the group command will be applied to each grouped set.
r.table('posts').group(lambda post: post['category']).map(
lambda post: 1)
And again, we sum the posts with reduce, which this time produces a total for each group:
r.table('posts').group(lambda post: post['category']).map(
lambda post: 1).reduce(lambda a, b: a + b).run(conn)
Of course, we can use count to shorten that. We can shorten it even further: ReQL lets you pass group the name of a field rather than a lambda function. So the simplified query is:
r.table('posts').group('category').count().run(conn)
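The grouped count can also be modeled in plain Python. Partition the documents by category, then map each post to 1 and sum within each group. The `posts` list is hypothetical sample data. The `Counter` comparison at the end is only a sanity check on the sketch.

```python
from collections import Counter
from functools import reduce

# Hypothetical sample posts.
posts = [
    {"id": "a", "category": "Fiction"},
    {"id": "b", "category": "News"},
    {"id": "c", "category": "Fiction"},
]

# Group step: partition posts by their "category" field.
groups = {}
for post in posts:
    groups.setdefault(post["category"], []).append(post)

# Map + reduce within each group: 1 per post, summed.
counts = {
    category: reduce(lambda a, b: a + b, (1 for _ in members))
    for category, members in groups.items()
}
print(counts)  # {'Fiction': 2, 'News': 1}

# Sanity check against the standard library's Counter.
assert counts == dict(Counter(p["category"] for p in posts))
```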
This is based on an example from MongoDB. Imagine a table of orders, with each document in the table structured like this:
{
  "customer_id": "cs11072",
  "date": r.time(2014, 2, 27, 12, 13, 9, '-07:00'),
  "id": 103,
  "items": [
    {
      "price": 91,
      "quantity": 1,
      "item_id": "sku10491"
    },
    {
      "price": 9,
      "quantity": 3,
      "item_id": "sku14667"
    },
    {
      "price": 37,
      "quantity": 3,
      "item_id": "sku16857"
    }
  ],
  "total": 229
}
First, let’s return the total price per customer. Since this is pre-computed per order in the total field, this is easily done with one of RethinkDB’s aggregation functions:
r.table('orders').group('customer_id').sum('total').run(conn)
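In plain-Python terms, `group('customer_id').sum('total')` amounts to keeping one running total per customer ID. The orders below are hypothetical and include only the fields this aggregation reads. The sketch models the result only.

```python
from collections import defaultdict

# Hypothetical orders, trimmed to the fields the aggregation uses.
orders = [
    {"id": 101, "customer_id": "cs11072", "total": 229},
    {"id": 102, "customer_id": "cs20001", "total": 50},
    {"id": 104, "customer_id": "cs11072", "total": 71},
]

# One running sum of "total" per customer_id.
totals = defaultdict(int)
for order in orders:
    totals[order["customer_id"]] += order["total"]

print(dict(totals))  # {'cs11072': 300, 'cs20001': 50}
```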
Now for something more complicated: calculating the total and average quantities sold per item. For this, we’ll use the concat_map function, which combines mapping and concatenation. In this case, we want to produce a sequence of all the items sold across all the orders, with their item IDs and quantities. We’ll also add a “count” field set to 1, which we’ll use the same way we mapped each post to 1 in the blog example.
r.table('orders').concat_map(lambda order:
order['items'].map(lambda item:
{'item_id': item['item_id'], 'quantity': item['quantity'], 'count': 1}
))
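If concat_map is unfamiliar, here is a plain-Python sketch of the behavior used here. Each order is mapped to a list of items, and those lists are concatenated into one flat sequence with `itertools.chain`. The `concat_map` function below is a hypothetical stand-in that borrows the ReQL command's name, and the orders are made-up sample data.

```python
from itertools import chain

# Hypothetical sample orders.
orders = [
    {"id": 1, "items": [{"item_id": "sku1", "quantity": 2, "price": 5},
                        {"item_id": "sku2", "quantity": 1, "price": 9}]},
    {"id": 2, "items": [{"item_id": "sku1", "quantity": 4, "price": 5}]},
]


def concat_map(fn, seq):
    """Apply fn (which returns a list) to each element and concatenate the results."""
    return list(chain.from_iterable(fn(x) for x in seq))


# One flat row per item sold, each carrying count = 1.
rows = concat_map(
    lambda order: [
        {"item_id": item["item_id"], "quantity": item["quantity"], "count": 1}
        for item in order["items"]
    ],
    orders,
)
```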
The inner map function is just being used to iterate through the items in each order. At this point, our query will return a list of objects, each with three fields: item_id, quantity, and count.
Now, we’ll group by the item_id field and use a custom reduce function to sum the quantities and counts:
r.table('orders').concat_map(lambda order:
order['items'].map(lambda item:
{'item_id': item['item_id'], 'quantity': item['quantity'], 'count': 1}
)).group('item_id').reduce(lambda left, right: {
'item_id': left['item_id'],
'quantity': left['quantity'] + right['quantity'],
'count': left['count'] + right['count']
})
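The reduce function returns an object with the same three fields it receives because its output may later be passed back in as one of its inputs. The plain-Python sketch below groups hypothetical rows, shaped like the concat_map output, by `item_id` and reduces them with the same combining logic. `combine` is a hypothetical name for that function.

```python
from functools import reduce

# Hypothetical rows shaped like the concat_map output.
rows = [
    {"item_id": "sku1", "quantity": 2, "count": 1},
    {"item_id": "sku2", "quantity": 1, "count": 1},
    {"item_id": "sku1", "quantity": 4, "count": 1},
    {"item_id": "sku1", "quantity": 3, "count": 1},
]


def combine(left, right):
    # Same shape in, same shape out, so it can also combine earlier results.
    return {
        "item_id": left["item_id"],
        "quantity": left["quantity"] + right["quantity"],
        "count": left["count"] + right["count"],
    }


# Group by item_id, then reduce each group with combine.
groups = {}
for row in rows:
    groups.setdefault(row["item_id"], []).append(row)

reduced = {key: reduce(combine, members) for key, members in groups.items()}
```

For instance, `reduce(combine, groups['sku1'])` equals `combine(reduce(combine, groups['sku1'][2:]), reduce(combine, groups['sku1'][:2]))`. Both give a quantity of 9 and a count of 3, and a distributed reduction relies on exactly this property.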
Finally, we’ll use ungroup to turn this grouped data into an array of objects with group and reduction keys. The group field will be the item ID for each group. The reduction field will hold the output of our reduce function for that group: an object with the summed quantity and count. Then we’ll use map once more to iterate through that array, computing the average on this pass:
r.table('orders').concat_map(lambda order:
order['items'].map(lambda item:
{'item_id': item['item_id'], 'quantity': item['quantity'], 'count': 1}
)).group('item_id').reduce(lambda left, right: {
'item_id': left['item_id'],
'quantity': left['quantity'] + right['quantity'],
'count': left['count'] + right['count']
}).ungroup().map(lambda group: {
'item_id': group['group'],
'quantity': group['reduction']['quantity'],
'avg': group['reduction']['quantity'] / group['reduction']['count']
}).run(conn)
The output will be in this format:
[
{
"avg": 3.3333333333333,
"quantity": 20,
"item_id": "sku10023"
},
{
"avg": 2.2142857142857,
"quantity": 31,
"item_id": "sku10042"
},
...
]
(Note that in JavaScript, or any other language where the + and / operators aren’t overloaded to work with ReQL, you’ll need to use add and div instead.)
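The last ungroup/map stage can also be modeled in plain Python, which makes the shape of ungroup's output explicit. The `ungrouped` list below is hypothetical data written in the group/reduction format described above. Python 3's `/` operator performs true division, so averages can be fractional.

```python
# Hypothetical data shaped like ungroup() output:
# a list of {"group": key, "reduction": reduced_value} objects.
ungrouped = [
    {"group": "sku1", "reduction": {"item_id": "sku1", "quantity": 9, "count": 3}},
    {"group": "sku2", "reduction": {"item_id": "sku2", "quantity": 5, "count": 2}},
]

# Map over the array, computing the average quantity per item.
result = [
    {
        "item_id": g["group"],
        "quantity": g["reduction"]["quantity"],
        "avg": g["reduction"]["quantity"] / g["reduction"]["count"],
    }
    for g in ungrouped
]
```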
RethinkDB’s GMR queries are distributed and parallelized across shards and CPU cores whenever possible. While this allows them to execute efficiently, it’s important to keep in mind that the reduce function is not called on the elements of its input stream from left to right. It’s called on the elements of the stream in any order, or on the output of previous calls to the function. In other words, your reduce function must produce the same result regardless of how its inputs are ordered or grouped.
Here is an example of an incorrect way to write the previous grouped map-reduce query, simply incrementing the first value passed to the reduction function:
# Incorrect!
r.table('posts').group(lambda post: post['category']).map(
lambda post: 1).reduce(lambda a, b: a + 1).run(conn)
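Suppose we have ten documents in a single category in a sharded table. Four of the documents are on shard 1; six are on shard 2. When the incorrect query is executed, this is its path:
- The reduce step is executed on shard 1, returning 4 for the shard.
- The reduce step is executed on shard 2, returning 6 for the shard.
- The reduce step is executed again to combine the values produced by shards 1 and 2. Instead of executing 4 + 6, the query executes 4 + 1, returning 5 instead of 10.
Be careful! Make sure your reduction function doesn’t assume the reduction step executes from left to right!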
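The failure can be reproduced with a toy model of a two-shard reduction in plain Python: reduce each shard locally, then reduce the per-shard results. `sharded_reduce` is a hypothetical helper, and real query execution may split and combine work differently. The model is still enough to show why `a + 1` breaks while `a + b` does not.

```python
from functools import reduce


def sharded_reduce(shards, fn):
    """Toy model: reduce each shard locally, then reduce the partial results."""
    partials = [reduce(fn, shard) for shard in shards]
    return reduce(fn, partials)


# Ten posts in one category, already mapped to 1: four on shard 1, six on shard 2.
shards = [[1] * 4, [1] * 6]

correct = sharded_reduce(shards, lambda a, b: a + b)
incorrect = sharded_reduce(shards, lambda a, b: a + 1)

print(correct)    # 10
print(incorrect)  # 5
```

On a single unsharded list, `reduce(lambda a, b: a + 1, [1] * 10)` happens to return 10. That is why a bug like this can pass tests on a small, single-shard table.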
For more information about map-reduce in general, read the Wikipedia article. For more information about RethinkDB’s implementation, browse our API documentation.