I’ll start off by saying that the following things are true:

  • This is not a post about processing massive amounts of data at a time.
  • I am not an expert in this area.
  • I am not a database expert.

This is a post about dealing with the perils of large Mongo datasets. This probably sounds eerily similar to what I’m saying this post is not, but hopefully you will see what I’m talking about quite soon.

If you can see that I’m going about this completely the wrong way, please pipe up and let me know by commenting on the post or shooting me an email.

Let’s talk about a hypothetical situation so we can see when we might start facing some troubles.

The situation

Okay, cool - you’re a back end developer who has just been tasked with the following:

Build a service which monitors a Mongo collection and, depending on the states of the items in the collection, updates an Amazon CloudSearch index accordingly. The three main operations we care about are: adding items, updating items and removing items.

Some assumptions we can make about this task are:

  • The CloudSearch index does not need to be updated the moment items change in our Mongo collection (i.e. it doesn’t need to be real time). As long as the search index is refreshed somewhere in the order of every 30 minutes to 2 hours, that is fine.
  • If possible, you are to keep the technology you use in your solution as basic as possible (i.e. we don’t want to overengineer this to the n-th degree - more about this later).
  • You are allowed to modify the document schema as much as you like whilst working on this task (i.e. if you need to add a flag to each document to track something, you can).
  • You are allowed to create other collections if you wish to solve this task (or make the task easier to manage).
  • Not every document in the collection will necessarily need to be included in the CloudSearch index.

Some information about the Mongo collection you will be working with:

  • The collection is 500 GB.
  • There have been no fancy optimisation techniques employed to speed up the Mongo database (i.e. it’s pretty vanilla - someone has a basic Mongo install with a collection created and documents are inserted as per normal).

You can also assume that the documents in question currently conform to the following schema:

  • name - String that represents the name of a person.
  • profession - String that represents the profession of a person.
  • modified - ISO date that represents the last date the document was modified.
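
For a concrete picture, a document might look something like this (shown roughly in mongo shell notation, with made-up values):

{
  "_id": ObjectId("56d8f1a2e4b0a1b2c3d4e5f6"),
  "name": "Jane Citizen",
  "profession": "Plumber",
  "modified": ISODate("2016-02-20T09:30:00Z")
}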

With this basic outline and information about the task at hand, we can begin to think how we might tackle it.

Let’s talk about ways we could determine whether a document needs to be added, updated or deleted in our CloudSearch index.

Adding an item to the index

Before we begin, let’s quickly assume that we will add a flag to every document called lastIndexed, which is an ISO date that tracks when the document was last added to or updated in our CloudSearch index. lastIndexed will be set to null initially.

We will also add a boolean flag to every document called includeInSearch which will represent whether a document should be included in our CloudSearch index or not. includeInSearch will initially be set to true, but keep in mind that documents that get added to the Mongo collection at a later point might have this flag set to false initially (i.e. it is arbitrary).
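
As a side note, backfilling these two new fields onto the existing documents is just a one-off update. A rough sketch of how that might look (assuming none of the documents have the fields yet):

var mongo = require('mongodb').MongoClient;

mongo.connect('mongodb://linktomydatabase.com/mydb', function(err, db) {
  // Give every existing document the default flag values.
  db.collection('people').updateMany(
    { lastIndexed: { $exists: false } },
    { $set: { lastIndexed: null, includeInSearch: true } },
    function(err, result) {
      if (err) {
        console.error('Backfill failed', err);
      }
      db.close();
    }
  );
});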

With this in mind, I think the logic here is quite simple. A document should be added to the CloudSearch index if the following criteria are met:

  1. includeInSearch is true.
  2. lastIndexed is null.

The way this reads to me is “if we want the document to be included in the search index and it hasn’t been indexed before, it must be new, so let’s add it”.

No dramas here.
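
In Mongo query terms, that’s nothing more than:

// Documents that should be added to the CloudSearch index for the first time.
var addQuery = {
  includeInSearch: true,
  lastIndexed: null
};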

Updating an item that is currently being indexed

Okay, so now we want to update something that is currently being indexed. The logic for that might look like this:

  1. includeInSearch is true.
  2. lastIndexed is not equal to null.
  3. The modified date is greater than the lastIndexed date.

Still not seeing any problems here.
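
The only slightly awkward bit is criterion 3, because it compares two fields of the same document. One way to express it (assuming a server new enough to support $expr; on older servers a $where clause or an aggregation would do the same job, just more slowly) is:

// Documents that are already indexed but have been modified since they
// were last pushed to CloudSearch.
var updateQuery = {
  includeInSearch: true,
  lastIndexed: { $ne: null },
  $expr: { $gt: ['$modified', '$lastIndexed'] }
};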

Removing an item from the index

So, for whatever reason, we don’t want an item to be indexed anymore, and we change its includeInSearch flag to false. This leaves us with the logic of:

  1. includeInSearch is false.
  2. lastIndexed is not equal to null.
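
Or, as a query:

// Documents that are currently in the index but shouldn't be anymore.
var removeQuery = {
  includeInSearch: false,
  lastIndexed: { $ne: null }
};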

Uh oh, we have nutted out our logic and it’s pretty sound…

The problem is in the pudding

But Dave - you promised me perils and frustration!

Right, so we’re at the point where we have worked out the logic for this indexing service but haven’t run into any problems yet. So let’s start to (sort of) code a solution. For the sake of brevity (and so you don’t have to look at too much of my terrible code), I’m not going to provide a complete solution; I think you’ll understand what I’m talking about regardless. We will also just focus on adding a document to the CloudSearch index, as the same problems exist for updating and deleting items.

Let’s assume our solution will be written using NodeJS. We will use the node-mongodb-native driver, version 2.1. You can find the documentation for that here.

var mongo = require('mongodb').MongoClient;

var mongoUrl = 'mongodb://linktomydatabase.com/mydb';

mongo.connect(mongoUrl, function(err, db) {
  var query = {
    includeInSearch: true,
    lastIndexed: null
  };

  var collection = db.collection('people');
  collection.find(query)
    .toArray(function(err, docs) {
      // process() updates our CloudSearch index and updates the db.
      process(docs);      
      db.close();
    });
});

Stop right there! This looks pretty straightforward, right? We connect to our database and grab all the documents that satisfy our criteria. This would be fine on a small collection, but remember that ours is pretty large: toArray() loads the entire result set into memory at once, and with individual documents already capped at the 16 MB BSON limit, 500 GB worth of them is far more than a single process can hold. This immediately puts us in some strife.

What could we do about this?

Why don’t we just change the BSON size limit? Unfortunately, you can’t. Bummer. If anyone knows why this is, please let me know.

How about we try to use streams? Streams are very handy when it comes to most Mongo-related processing. However, we don’t want to hit the CloudSearch API too often or we will risk being throttled or blocked, so let’s process our documents in batches of 1000 rather than one at a time. It might look something like this:

var mongo = require('mongodb').MongoClient;

var mongoUrl = 'mongodb://linktomydatabase.com/mydb';

mongo.connect(mongoUrl, function(err, db) {
  var query = {
    includeInSearch: true,
    lastIndexed: null
  };
  
  var collection = db.collection('people');
  var stream = collection.find(query).batchSize(1000).stream();
  var batch = [];

  stream.on('end', function() {
    console.log('Stream ended!');
    // Flush whatever is left over in the final, partial batch before closing.
    if (batch.length > 0) {
      process(batch).then(function() {
        db.close();
      });
    } else {
      db.close();
    }
  });

  stream.on('data', function(doc) {
    batch.push(doc);

    if (batch.length === 1000) {
      stream.pause();
      // process() updates our CloudSearch index and updates the db.
      var fullBatch = batch;
      batch = [];
      process(fullBatch)
        .then(function() {
          // Only resume the stream once we're done processing.
          stream.resume();
        });
    }
  });
});

This seems like it solves the problem, however there are still cases (willing to discuss this in more detail with people if they like - just let me know in the comments) where ‘data’ events can fire while a batch is being processed, which means documents can essentially get ‘lost’ in processing.

So now what are we left with? Let’s see if we can operate on the next available document from the cursor, calling a function for each document as it becomes available. Fortunately we can, with the cursor’s next() function, which takes a callback as a parameter.

We can assume here that processBatch() plays the same role as the process() helper from before: it takes care of uploading documents to the CloudSearch index and updating the database accordingly. In addition to this, it also controls when cursor.next() is called (i.e. once the documents have been uploaded and the database has been updated, next() is invoked again). I’ll show a rough sketch of what it might look like after the main loop below.

var mongo = require('mongodb').MongoClient;

var mongoUrl = 'mongodb://linktomydatabase.com/mydb';

mongo.connect(mongoUrl, function(err, db) {
  var query = {
    includeInSearch: true,
    lastIndexed: null
  };
  var currentBatch = [];
  var batchSize = 1000;

  var collection = db.collection('people');
  var cursor = collection.find(query);

  // Kickstart the processing.
  cursor.next(process);

  function process(err, doc) {
    // Indicates whether there are more documents to process after the current one.
    var hasMore = doc !== null;

    if (doc === null) {
      // No more documents to read - flush whatever is left in the final batch.
      if (currentBatch.length > 0) {
        processBatch(hasMore);
      } else {
        db.close();
      }
      console.log('Finished reading documents');
      return;
    }
    else {
      setTimeout(function() {
        currentBatch.push(doc);
        if (currentBatch.length % batchSize == 0) {
          processBatch(hasMore);
        }
        else if (hasMore){
          cursor.next(process);
        }
        else {
          // This shouldn't ever get hit.
          return;
        }
      });
    }
  }
});
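
For completeness, here’s a rough sketch of what processBatch() might look like under these assumptions. It would live inside the connect callback, next to process(), so it can see cursor, collection, currentBatch and db; uploadBatchToCloudSearch() is a hypothetical helper that pushes a batch of documents to the CloudSearch index and calls back when it’s done.

function processBatch(hasMore) {
  // Take ownership of the current batch and start a fresh one.
  var docs = currentBatch;
  currentBatch = [];

  // Hypothetical helper: uploads the documents to the CloudSearch index.
  uploadBatchToCloudSearch(docs, function(err) {
    if (err) {
      console.error('Failed to upload batch to CloudSearch', err);
      return;
    }

    var ids = docs.map(function(doc) { return doc._id; });

    // Record that these documents have now been indexed.
    collection.updateMany(
      { _id: { $in: ids } },
      { $set: { lastIndexed: new Date() } },
      function(err) {
        if (err) {
          console.error('Failed to mark documents as indexed', err);
          return;
        }

        if (hasMore) {
          // Only ask the cursor for another document once this batch is done.
          cursor.next(process);
        } else {
          db.close();
        }
      }
    );
  });
}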

Sweet - this works! However, the problem here is that it works in an essentially serial fashion - i.e. the next batch will not be processed until the previous one has finished uploading. This doesn’t bother me too much, as it seems to tick all the boxes, but I’m stumped as to how you can reliably speed this up (keyword: reliably) without getting any “fancier”. Just to clear the air here - I’m not against more complex solutions, but I’m definitely not for complex solutions for the sake of being complex. In my eyes it’s a reasonably unambiguous problem to solve, but there seem to be factors which get in the way of a simple, reliable, efficient solution.

Obviously we could thread this application, or completely change its architecture to a producer-consumer one (i.e. a message is created every time a document is updated, and consumer processes running across a series of hosts consume those messages and process them in a more real-time fashion).
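
Just to make that last idea a bit more concrete, the consumer side could look roughly like this. This is only a sketch: SQS is standing in for whatever queue you’d actually pick, the queue URL and region are made up, and process() is still the same hypothetical helper that uploads to CloudSearch and updates lastIndexed.

var AWS = require('aws-sdk');
var mongodb = require('mongodb');
var MongoClient = mongodb.MongoClient;
var ObjectID = mongodb.ObjectID;

var sqs = new AWS.SQS({ region: 'ap-southeast-2' });
var queueUrl = 'https://sqs.ap-southeast-2.amazonaws.com/123456789012/people-changes';

MongoClient.connect('mongodb://linktomydatabase.com/mydb', function(err, db) {
  var collection = db.collection('people');

  (function poll() {
    // Long-poll for up to 10 change messages at a time.
    sqs.receiveMessage({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20
    }, function(err, data) {
      if (err || !data.Messages || data.Messages.length === 0) {
        return poll();
      }

      // Each message body contains the _id of a document that changed.
      var ids = data.Messages.map(function(message) {
        return new ObjectID(JSON.parse(message.Body).id);
      });

      collection.find({ _id: { $in: ids } }).toArray(function(err, docs) {
        if (err) {
          return poll();
        }

        // process() is the same hypothetical helper as before: upload to
        // CloudSearch, then update lastIndexed in Mongo.
        process(docs).then(function() {
          data.Messages.forEach(function(message) {
            sqs.deleteMessage({
              QueueUrl: queueUrl,
              ReceiptHandle: message.ReceiptHandle
            }, function() {});
          });
          poll();
        });
      });
    });
  })();
});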

I’m interested in what you think might be a good approach to this problem domain. Let me know!

-Dave