It’s finally time to implement that new personalization service: the one you’ve spent months pushing for.

With it, your app will be serving up relevant, personalized content to every user. But the further you look into it, the more you furrow your brow: a lot of processing has to happen to prepare each user’s data properly, and the prospect of running it for a much larger user base raises performance concerns.

Fear not!

This is a problem humans have seen before, and they’ve figured out a model that maps to it well: pipelining.

Pipelining is the decomposition of a larger task into smaller, distinct tasks so that they can be executed in parallel. The idea proliferated in computing as CPU technology blossomed and a fast-but-general model for tackling large problems was needed. Like most good ideas, it maps just as well today: our processing resources are now nodes in a cluster, and our datasets are orders of magnitude larger, but the fundamental idea remains the same.

[Animation: pipelining.gif]

A pipeline consists of a few components:

  1. A source that pulls and joins / groups data together
  2. Any number of transform stages (functions), each operating on one datum
  3. One or more sinks that save or send the data somewhere

None of these components knows about the others, and none of them needs to. This isolates each component’s scope, lets stages be added, shuffled, or removed transparently, and leaves a great deal of freedom in deciding how the computation is carried out. In plain English, it makes it easy to move processing out of your core application and onto systems like Spark, Storm, or Hadoop. Having each transform operate on only one item at a time also simplifies the code we write and lets it be applied any way we want.

One key benefit of pipelines is that their built-in separation of concerns between sourcing, transforming, and saving data makes refactoring a breeze for very little up-front investment. And because a pipeline doesn’t dictate a processing strategy, it leaves an open-ended path for scaling up: the same transforms can be run using streaming, batch, or even mini-batch strategies.
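As a rough sketch (with a hypothetical transform function standing in for whatever work a stage does, and generic records as input), the same transform can be driven one record at a time or in fixed-size chunks:

from itertools import islice

def stream(records, transform):
    # Streaming: handle each record as soon as it arrives.
    for record in records:
        yield transform(record)

def mini_batches(records, transform, batch_size=100):
    # Mini-batch: pull fixed-size chunks and process each as a unit.
    records = iter(records)
    while True:
        batch = list(islice(records, batch_size))
        if not batch:
            return
        yield [transform(record) for record in batch]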

Pipelining produces performant, well-factored, and extendable code by default.

Pipelines can be implemented in any language, but they’re quite easy to represent in Python. Let’s walk through an example that adds post information to a user’s account:

def pipeline():
    user_posts = get_users_and_posts()  # Source
    updated = map(process, user_posts)  # Transforms
    save_results(updated)               # Sink

The source stage pulls users and their posts, and associates them together:

from join import join, group

def get_users_and_posts():
    users = db.get_new_users()
    posts = db.get_user_posts(users)
    return join(users, group(posts, 'uid'), left_key='uid',
                right_key=lambda posts: posts[0].uid)
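
The join module isn’t shown here; a minimal sketch of what its group and join helpers might look like (purely an assumption, to keep the example self-contained):

from collections import defaultdict

def group(items, attr):
    # Bucket items into lists keyed by the given attribute.
    grouped = defaultdict(list)
    for item in items:
        grouped[getattr(item, attr)].append(item)
    return list(grouped.values())

def join(left, right, left_key, right_key):
    # Pair each left item with the right item whose key matches.
    # Keys may be attribute names (strings) or callables.
    def key_of(item, key):
        return getattr(item, key) if isinstance(key, str) else key(item)

    right_by_key = {key_of(item, right_key): item for item in right}
    return [(item, right_by_key[key_of(item, left_key)])
            for item in left
            if key_of(item, left_key) in right_by_key]

With helpers along these lines, get_users_and_posts returns a list of (user, posts) tuples, which is exactly the shape the transforms below expect.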

The transform stage passes each user and their posts through each data-processing function:

def process(user_and_posts):
    transforms = [post_count, post_sentiment, post_reach]
    results = user_and_posts
    for transform in transforms:
        results = transform(*results)
    return results

def post_count(user, posts):
    user.post_count = len(posts)
    return user, posts

def post_sentiment(user, posts):
    user.post_sentiment = 0.0
    for post in posts:
        user.post_sentiment += sentiment(post.text)
    return user, posts

def post_reach(user, posts):
    user.post_reach = sum(post.reach for post in posts)
    return user, posts

Since each function returns the same shape of data it is passed (a user and their posts), the transforms compose (chain together) easily.
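In fact, the loop in process is just a fold; an equivalent sketch using functools.reduce:

from functools import reduce

def process(user_and_posts):
    transforms = [post_count, post_sentiment, post_reach]
    # Thread the (user, posts) tuple through each transform in turn.
    return reduce(lambda results, transform: transform(*results),
                  transforms, user_and_posts)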

The sink stage is the simplest, and consumes the lazy iterator created by map:

def save_results(results):
    for user, posts in results:
        db.save(user)

This layout gives us the freedom to change the implementation of the source, the transforms, or the sink without worrying about breaking the others. It also lets us change execution strategies, for example using a ProcessPoolExecutor to run the transforms in parallel.
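A minimal sketch of that parallel variant, assuming the user and post objects can be pickled for transport between processes:

from concurrent.futures import ProcessPoolExecutor

def pipeline():
    user_posts = get_users_and_posts()               # Source
    with ProcessPoolExecutor() as executor:
        # executor.map mirrors the built-in map, but fans the
        # transforms out across a pool of worker processes.
        updated = executor.map(process, user_posts)  # Transforms
        save_results(updated)                        # Sink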

Even converting the pipeline to run on Spark is effortless - only the pipeline and get_users_and_posts functions must change:

def pipeline():
    user_posts = get_users_and_posts()
    # join yields (uid, (user, posts)) pairs; .values() drops the uid.
    updated = user_posts.values().map(process).collect()  # Updated
    save_results(updated)

def get_users_and_posts():
    new_users = db.get_new_users()
    users = sc.parallelize(new_users) \
        .keyBy(lambda u: u.uid)                      # Updated
    posts = sc.parallelize(db.get_user_posts(new_users)) \
        .keyBy(lambda p: p.uid).groupByKey()         # Updated
    return users.join(posts)                         # Updated

Et voilà - you’ve just scaled your personalization service to cluster level!

Overall, pipelining is a very good way to describe large data-processing tasks, and using it gives you clean, scalable, and adaptable code from the start.

Know of any other great and simple pipeline implementations? I’d love to hear about them!