Scaling MongoDB at Mailbox

By Cuong Do • Sep 12, 2013

Mailbox has grown unbelievably quickly. During that growth, one performance issue that impacted us was MongoDB’s database-level write lock. The time Mailbox’s backends spent waiting for the write lock was causing user-perceived latency. While MongoDB allows you to add shards to a MongoDB cluster easily, we wanted to spare ourselves potential long-term pain by moving one of the most frequently updated MongoDB collections, which stores email-related data, to its own cluster. We theorized that this would, at a minimum, cut the amount of write lock contention in half.

While we could have scaled by adding more shards, we wanted to be able to optimize and administer the different types of data independently.

I started by poring through the MongoDB documentation and quickly found the cloneCollection command. However, to quote the MongoDB 2.2 documentation: “cloneCollection cannot clone a collection through a mongos: you must connect directly to the mongod instance.” In other words, you can’t use this command with a sharded collection. You can’t use renameCollection on sharded collections either, which closed off other possibilities. There were other possible solutions, but they all would have hurt performance for Mailbox users or would simply have failed to work at Mailbox’s scale.

So, I wrote a quick Python script to copy the data, and another to compare the original versus the copy to ensure data integrity. Along the way, I encountered many surprises. For example, a single Python process using gevent and pymongo can copy a large MongoDB collection in half the time that mongodump (written in C++) takes, even when the MongoDB client and server are on the same machine.

Our experiences have culminated in Hydra, our newly open-sourced set of tools for MongoDB collection migration.

Creating the initial snapshot of a MongoDB collection

To copy all documents in a collection, I started with an intentionally naive implementation that didn’t have much more code than this: 

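# source_email_data and destination_email_data are pymongo Collection objects
for email_data in source_email_data.find():
    destination_email_data.insert_one(email_data)  # insert() in pymongo 2.x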

Issue #1: Slowness

It was obvious that such a naive approach wouldn’t perform well for larger amounts of data, so I quickly experimented with ways to make copying faster. I implemented various micro-optimizations, such as adjusting how many documents the MongoDB driver fetched at once. However, those yielded only marginal performance improvements. My goal was to finish the data migration in about a day, and I was still far from that goal.
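A minimal sketch of that kind of batching, assuming pymongo 3+ collection objects (`find()`, `Cursor.batch_size()`, and `insert_many()`). The helper name `copy_in_batches` and the default batch size of 1,000 are illustrative choices, not the values used for Mailbox.

```python
from itertools import islice


def copy_in_batches(source, destination, batch_size=1000):
    """Copy every document from source to destination in batches.

    Hypothetical helper: source and destination are assumed to be pymongo 3+
    Collection objects (or anything with the same find/insert_many methods).
    Returns the number of documents copied.
    """
    # Ask the driver to fetch batch_size documents per round trip to the server.
    cursor = source.find().batch_size(batch_size)
    copied = 0
    while True:
        # Pull the next batch_size documents off the cursor (an iterator).
        batch = list(islice(cursor, batch_size))
        if not batch:
            break
        # One insert request per batch instead of one per document.
        destination.insert_many(batch)
        copied += len(batch)
    return copied
```

Tweaks like this reduce the number of round trips, but each request still waits for the previous one to finish, which is why the gains stay marginal.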

An early experiment I did was to measure the “speed of light” for MongoDB API operations: the speed of a simple C++ implementation using the MongoDB C++ SDK. Because I was rusty at C++ and wanted my mostly Python-proficient colleagues to be able to use and adapt the code easily, I didn’t pursue the C++ implementation very far. I did find, however, that for simple cases a naive C++ implementation was typically 5 to 10 times as fast as a naive Python implementation of the same task.

So, I returned to Python, which is Dropbox’s default language of choice. Moreover, when a client performs a series of remote network requests, such as queries to mongod, it often spends much of its time waiting for the server to respond, and copy_collection.py (my MongoDB collection copying tool) didn’t seem to have many CPU-intensive parts. The very low CPU usage of the initial copy_collection.py corroborated this.
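One way to confirm that a copy loop is I/O-bound, as the low CPU usage suggested, is to compare the process’s CPU time with the elapsed wall-clock time. A minimal standard-library sketch; `cpu_fraction` is a hypothetical helper name, not part of Hydra.

```python
import time


def cpu_fraction(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, CPU time / wall-clock time).

    A ratio near 0 means the call spent most of its time waiting (for example,
    on network round trips to mongod); a ratio near 1 means it was CPU-bound.
    """
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # CPU time of this process, all threads
    result = fn(*args, **kwargs)
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return result, (cpu / wall if wall > 0 else 0.0)
```

For example, `cpu_fraction(time.sleep, 0.2)` reports a ratio close to zero, which is the signature of a workload where overlapping requests should help more than faster code.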

I then experimented with adding concurrent MongoDB requests to copy_collection.py. Initial experiments with worker threads were disappointing. Next, I tried worker processes communicating through a Python Queue object. Performance still wasn’t much better, because IPC overhead overwhelmed any potential concurrency benefit. Using Pipes and other IPC mechanisms didn’t help much either.
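For reference, a thread worker pool for the copy might look like the sketch below. It is an illustration under assumptions (Python’s `concurrent.futures` and pymongo 3+ `find_one`/`insert_one`, whose collections are safe to share across threads), not the code from these experiments, which isn’t shown; `copy_with_thread_pool` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


def copy_with_thread_pool(source, destination, _ids, workers=8):
    """Copy the documents with the given _ids using a pool of worker threads.

    Hypothetical helper; source and destination are assumed to be pymongo 3+
    Collection objects. Returns how many documents were actually copied.
    """
    def copy_one(_id):
        doc = source.find_one({'_id': _id})
        if doc is not None:  # the document may have been deleted since we listed it
            destination.insert_one(doc)
        return doc is not None

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields one bool per _id; sum() counts the documents copied.
        return sum(pool.map(copy_one, _ids))
```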

Next, I decided to see how much performance I could squeeze out of a single Python process using asynchronous MongoDB queries. One of the more popular libraries for this is gevent, so I decided to give it a try. gevent patches standard Python modules, such as socket, to execute asynchronously. The beauty of gevent is that you can write asynchronous code that reads simply, like synchronous code.

Traditionally, asynchronous code to copy documents between two collections might have looked like this:

import asynclib  # hypothetical callback-based MongoDB client, for illustration only

def copy_documents(source_collection, destination_collection, _ids, callback):
    """
    Given a list of _id's (MongoDB's unique identifier field for each document),
    copies the corresponding documents from the source collection to the destination
    collection, then calls callback(error) exactly once
    """
    state = {'pending': len(_ids), 'done': False}

    def _copy_documents_callback(error):
        # called once per document copy; reports the first error, or success
        # after the last copy has completed
        if state['done']:
            return
        state['pending'] -= 1
        if error is not None or state['pending'] == 0:
            state['done'] = True
            callback(error)

    if not _ids:
        callback(None)
        return

    # copy documents, passing a callback function that will handle errors and
    # other notifications
    for _id in _ids:
        copy_document(source_collection, destination_collection, _id,
                      _copy_documents_callback)

def copy_document(source_collection, destination_collection, _id, callback):
    """
    Copies document corresponding to the given _id from the source to the
    destination, then calls callback(error)
    """
    def _insert_doc(doc, error):
        """
        callback that takes the document read from the source collection
        and inserts it into destination collection
        """
        if error is not None:
            callback(error)
            return
        if doc is None:  # deleted in the meantime; nothing to copy
            callback(None)
            return
        # another asynchronous MongoDB operation; it calls callback(error) when done
        destination_collection.insert(doc, callback=callback)

    # find the specified document asynchronously, passing a callback to receive
    # the retrieved data
    source_collection.find_one({'_id': _id}, callback=_insert_doc)

With gevent, the code uses no callbacks and reads sequentially:

from gevent import monkey
monkey.patch_all()  # patch socket etc. before importing pymongo or other network code

import gevent

def copy_documents(source_collection, destination_collection, _ids):
    """
    Given a list of _id's (MongoDB's unique identifier field for each document),
    copies the corresponding documents from the source collection to the destination
    collection
    """

    # copies each document using a separate greenlet; optimizations are certainly
    # possible but omitted in this example
    greenlets = [gevent.spawn(copy_document, source_collection,
                              destination_collection, _id)
                 for _id in _ids]
    gevent.joinall(greenlets)  # wait until every copy has finished

def copy_document(source_collection, destination_collection, _id):
    """
    Copies document corresponding to the given _id from the source to the
    destination.
    """
    # both of the following function calls block without gevent; with gevent they
    # simply cede control to another greenlet while waiting for Mongo to respond
    source_doc = source_collection.find_one({'_id': _id})
    if source_doc is not None:  # the document may have been deleted meanwhile
        destination_collection.insert_one(source_doc)  # another MongoDB operation

This simple code will copy documents from a source MongoDB collection to a destination, based on their _id fields, which are the unique identifiers for each MongoDB document. copy_documents delegates the work of copying documents to greenlets (which are like threads but are cooperatively scheduled) that run copy_document(). When a greenlet performs a blocking operation, such as any request to MongoDB, it yields control to any other greenlet that is ready to execute. Since greenlets all execute in the same thread and process, you generally don’t need any kind of inter-greenlet locking.

With gevent, I was able to achieve much faster performance than either the thread worker pool or process worker pool approaches. Here’s a summary of the performance of each approach:

Approach                        Performance (documents/sec, higher is better)
single process, no gevent         520
thread worker pool                652
process worker pool               670
single process, with gevent     2,381
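To put the table in relative terms, here is the simple arithmetic on its numbers: each approach’s speedup over the single-process, no-gevent baseline, plus gevent against the best non-gevent approach.

```python
# Throughput from the table above, in documents/sec.
throughput = {
    'single process, no gevent': 520,
    'thread worker pool': 652,
    'process worker pool': 670,
    'single process, with gevent': 2381,
}

baseline = throughput['single process, no gevent']
# Speedup of each approach relative to the single-process, no-gevent baseline.
speedup = {name: round(rate / baseline, 2) for name, rate in throughput.items()}

# gevent versus the best non-gevent approach (the process worker pool).
gevent_vs_processes = round(throughput['single process, with gevent'] /
                            throughput['process worker pool'], 2)
```

The thread and process pools each gained only about 25 to 30 percent over the baseline (speedups of 1.25 and 1.29), while gevent was about 4.6 times as fast as the baseline and about 3.5 times as fast as the process pool.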

Combining gevent with worker processes, one for each shard, yielded performance that increased linearly with the number of processes. The key to using worker processes efficiently was to eliminate as much IPC as possible.

Somewhat surprisingly, using gevent in just a single process could produce a full copy of a collection in just under half the time mongodump took. mongodump is written in C++, but it queries synchronously and runs in a single process and thread.

Issue #2: Replicating updates after the snapshot

Because MongoDB is not transactional, when you read a large MongoDB collection while it is being updated, you receive a result set that reflects the collection’s state at different points in time. For example, suppose you start reading a whole collection using a MongoDB find() query. Your result set could look like this:

included: document saved before your find()
included: document saved before your find()
included: document saved before your find()
excluded: document deleted just after your find() began
included: document inserted after your find() began

Moreover, to minimize the downtime required to point the Mailbox backend to the new copy of the collection, it was necessary to figure out a way to stream changes from the source MongoDB cluster to the new MongoDB cluster with as little latency as possible.

Like most asynchronously replicating data stores, MongoDB uses a log of operations – its oplog – to record and distribute a record of the insert/update/remove operations executed on a mongod instance to other mongod replicas. Given a snapshot of the data, the oplog can be used to apply all changes performed since the snapshot was taken.

So, I decided to stream oplog entries from the source cluster and apply those changes at the destination cluster. Thanks to an informative post on Kristina Chodorow’s blog, I was quickly able to grasp the basics of the oplog format. Replicating inserts and removes was trivial, because their serialization format is straightforward. On the other hand, updates took more work.
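A minimal sketch of replaying those two simple op types, assuming pymongo 3+ (`replace_one(..., upsert=True)` and `delete_one`) and the oplog layout in which an insert entry (`op` = `'i'`) carries the full document in `o` and a remove entry (`op` = `'d'`) carries a selector containing the `_id` in `o`. `apply_insert_or_remove` is a hypothetical name. The insert is written as an upsert because a document inserted after the snapshot’s find() began may already have been copied by the snapshot, and a plain insert would then fail with a duplicate key error.

```python
def apply_insert_or_remove(op, destination):
    """Replay one insert ('i') or remove ('d') oplog entry on the destination.

    Sketch assuming pymongo 3+ collection methods. Upserting makes replaying
    an insert idempotent, so it is harmless if the snapshot already copied it.
    """
    if op['op'] == 'i':
        doc = op['o']
        destination.replace_one({'_id': doc['_id']}, doc, upsert=True)
    elif op['op'] == 'd':
        destination.delete_one({'_id': op['o']['_id']})
    else:
        raise ValueError('not an insert or remove: %r' % (op['op'],))
```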

The structure of update oplog entries was not immediately obvious, and in MongoDB 2.2.x they use duplicate keys that can’t be displayed by the Mongo shell, let alone handled by most MongoDB drivers. After some thought, I devised a workaround that simply used the _id embedded in the update to trigger another copy of the document from the source. While this doesn’t have the same semantics as applying just the specified update, it guarantees that the copied data is at least as recent as the op we’ve received. Here is a diagram showing how intermediate versions of documents (in this case, v2) are not necessarily copied, but the source and destination are still eventually consistent:

[Diagram: applying update ops]
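The re-copy workaround can be sketched as follows, assuming pymongo 3+ (`find_one`, `replace_one(..., upsert=True)`, `delete_one`) and the oplog layout in which an update entry carries the target document’s `_id` in its `o2` field. `apply_update_by_recopy` is a hypothetical name. Because it copies whatever version the source holds at that moment, an intermediate version such as v2 may never reach the destination, but the destination still converges to the source.

```python
def apply_update_by_recopy(op, source, destination):
    """Handle an update ('u') oplog entry by re-copying the whole document.

    The updated document's _id is read from op['o2']; the update spec in
    op['o'] is ignored. Sketch assuming pymongo 3+ collection methods.
    """
    _id = op['o2']['_id']
    current = source.find_one({'_id': _id})
    if current is None:
        # Deleted since the update; a remove op will follow in the oplog,
        # and mirroring the deletion now converges to the same state.
        destination.delete_one({'_id': _id})
    else:
        # The source's current version is at least as new as this op.
        destination.replace_one({'_id': _id}, current, upsert=True)
```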

I also ran into a performance issue replaying ops on the destination cluster. Though I had a separate process to replay ops for each shard, applying ops serially (my initial approach for prototyping and ensuring correctness) was far too slow to keep up with the onslaught of Mailbox queries.

Applying ops concurrently seemed to be the way to go, but the question was how to preserve correctness. Specifically, two operations affecting the same _id cannot execute out of order. A simple workaround I devised was to maintain, in a Python set, the set of _ids being modified by in-progress operations. When copy_collection.py encounters another update to an _id that is currently being updated, we block the later update and any other ops that come after it from being applied. We start applying new ops only when the older operation on the _id has finished. Here’s a diagram to illustrate op blocking:

[Diagram: concurrent op replay, with later ops blocked behind an in-progress op on the same _id]
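A minimal sketch of the blocking rule. It swaps in a thread pool from `concurrent.futures` for the greenlets copy_collection.py used, and tracks in-flight _ids in a dict mapping each _id to the future of the op modifying it (rather than a plain set), so the dispatcher can wait on that specific older op. `replay_concurrently` and `op_id` are hypothetical names; `apply_op` stands for whatever function replays a single oplog entry.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def op_id(op):
    """Return the _id an oplog entry touches ('o2' for updates, 'o' otherwise)."""
    return op['o2']['_id'] if op['op'] == 'u' else op['o']['_id']


def replay_concurrently(ops, apply_op, max_workers=16):
    """Apply ops concurrently without reordering two ops on the same _id."""
    in_flight = {}           # _id -> Future of the op currently modifying it
    lock = threading.Lock()  # guards in_flight; done-callbacks run in worker threads

    def forget(_id, fut):
        with lock:
            if in_flight.get(_id) is fut:  # don't drop a newer op's entry
                del in_flight[_id]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for op in ops:
            _id = op_id(op)
            with lock:
                older = in_flight.get(_id)
            if older is not None:
                # Block this op, and therefore every later op, until the
                # older op on the same _id has finished.
                older.result()
            fut = pool.submit(apply_op, op)
            with lock:
                in_flight[_id] = fut
            fut.add_done_callback(lambda f, _id=_id: forget(_id, f))
    # Leaving the with-block waits for all submitted ops to finish.
```

Ops on different _ids overlap freely; only a conflict stalls the dispatcher, which matches the rule of blocking the later update and everything after it.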

Verifying copied data

Comparing the copied data to the original is normally a straightforward operation. Doing it efficiently also isn’t particularly challenging when you use multiple processes and gevent.
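For a static pair of collections, the comparison itself can be as simple as the sequential sketch below, assuming pymongo-style `find()` and `find_one()`. `compare_collections` here is a hypothetical stand-in, not the actual compare_collections.py; it scans both sides so that documents missing from either collection are reported too.

```python
def compare_collections(source, destination):
    """Return the set of _ids whose documents differ between two collections.

    Sequential sketch assuming pymongo-style find()/find_one(). A real tool
    would split the work across processes and greenlets.
    """
    mismatched = set()
    # Documents that are missing or different in the destination.
    for doc in source.find():
        if destination.find_one({'_id': doc['_id']}) != doc:
            mismatched.add(doc['_id'])
    # Extra documents that exist only in the destination.
    for doc in destination.find():
        if doc['_id'] not in mismatched and source.find_one({'_id': doc['_id']}) is None:
            mismatched.add(doc['_id'])
    return mismatched
```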

However, doing it when the source and the copy are both being updated requires some thought. At first, I tried just logging warnings whenever compare_collections.py (the tool I wrote to compare two collections) found a data inconsistency in a recently updated document, so that I could repeat verification for those documents later. However, that doesn’t work for deleted documents, which no longer have a last-modified timestamp.

I started thinking about the term “eventual consistency,” which is often used when talking about asynchronously replicating systems such as MongoDB’s replica sets and MySQL’s master/slave replication. Given enough time (i.e., after some number of retries) and barring catastrophe, the source and the copy will eventually become consistent. So, I added comparison retries with an increasing backoff between successive attempts. Certain cases could still cause problems, such as data that oscillates between two values. However, the data being migrated didn’t have any problematic update patterns.
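A minimal sketch of comparing one document with retries and exponential backoff, assuming pymongo-style `find_one()`. The names and defaults (`compare_with_retries`, five retries, a 0.5 s initial delay that doubles) are illustrative assumptions, not the values compare_collections.py used. Because a document that is absent from both collections compares equal (`None == None`), deletions are handled without any last-modified timestamp.

```python
import time


def documents_match(_id, source, destination):
    """True if both collections hold the same version of the document (or neither has it)."""
    return source.find_one({'_id': _id}) == destination.find_one({'_id': _id})


def compare_with_retries(_id, source, destination, retries=5,
                         initial_delay=0.5, factor=2.0, sleep=time.sleep):
    """Re-compare one document, waiting longer before each successive attempt.

    Returns True as soon as the copies match, or False if they still differ
    after `retries` additional attempts. `sleep` is injectable for testing.
    """
    delay = initial_delay
    for attempt in range(retries + 1):
        if documents_match(_id, source, destination):
            return True
        if attempt < retries:
            sleep(delay)      # give replication time to catch up
            delay *= factor   # exponential backoff
    return False
```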

Before performing the final cutover from the original MongoDB cluster to the new MongoDB cluster, I wanted the ability to verify that the most recent ops had been applied. So, I added a command-line option to compare_collections.py to compare the documents modified by the most recent N ops. Running this for a sufficiently large set of ops during downtime would provide additional confidence that there weren’t undetected data inconsistencies. Running it for even hundreds of thousands of ops per shard only takes a few minutes. This also mitigates concerns regarding undetected data inconsistencies resulting from the compare/retry approach.
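Collecting the documents modified by the most recent N ops can be sketched as below, assuming a pymongo Collection for the shard’s `local.oplog.rs` (sorting on `$natural` descending walks the capped oplog from newest to oldest) and the usual entry layout: updates carry the `_id` in `o2`, inserts and removes in `o`. `ids_modified_by_recent_ops` is a hypothetical name; each returned _id would then be compared between the two clusters.

```python
def ids_modified_by_recent_ops(oplog, namespace, n):
    """Return the distinct _ids touched by the last n oplog entries for a namespace.

    Sketch assuming a pymongo Collection for local.oplog.rs. Only inserts ('i'),
    updates ('u') and removes ('d') are considered; newest _ids come first.
    """
    ids = []
    seen = set()
    cursor = oplog.find({'ns': namespace, 'op': {'$in': ['i', 'u', 'd']}})
    for op in cursor.sort('$natural', -1).limit(n):
        _id = op['o2']['_id'] if op['op'] == 'u' else op['o']['_id']
        if _id not in seen:
            seen.add(_id)
            ids.append(_id)
    return ids
```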

Handling the unexpected

Despite taking various precautions to handle errors (retries, catching possible exceptions, logging), an uncomfortable number of issues still arose during my final test runs leading up to the production migration. There were sporadic network issues, a specific set of documents that consistently caused mongos to sever its connection to copy_collection.py, and occasional connection resets from mongod.

Soon, I realized that I couldn’t identify all the relevant failure scenarios, so I shifted my focus to recovering quickly from failures. I added logging of the _ids of documents for which compare_collections.py had detected inconsistencies. Then, I created another tool whose sole job was to re-copy the documents with those _ids.

Migration time!

During the production migration, copy_collection.py created an initial snapshot of hundreds of millions of emails and replayed more than a hundred million MongoDB operations. Performing the initial snapshot, building indices, and catching up on replication took about 9 hours, well within the 24-hour goal I had set. I let copy_collection.py continue replaying ops from the source cluster’s oplogs for another day while I used compare_collections.py to verify all copied data three times (for additional safety).

The actual cutover to the new MongoDB cluster happened recently. The MongoDB-related work took only a few minutes. During a brief maintenance window, I ran compare_collections.py to compare documents modified by the last 500,000 operations in each shard. After detecting no inconsistencies in the most recently updated data, we ran some smoke tests, pointed the Mailbox backend code to the new cluster, and made the Mailbox service available to the public again. Our users haven’t reported any issues caused by the cutover. In my mind this was a success, as the best backend migrations are invisible to users.

In contrast, our backend monitoring showed us the true benefits of the migration:

[Graph: percentage of time the write lock was held, before and after the migration]

The decrease in the percentage of time the write lock was held was far better than the 50% reduction we had expected based on our MongoDB profiling. Great success!

Hello, world

We're open-sourcing Hydra, the suite of tools we developed to perform the aforementioned MongoDB collection migration. We hope this code will be useful for anyone who needs to perform a live re-partitioning of their MongoDB data.
