It’s finally time to implement that new personalization service — the one you’ve spent months pushing for.
With it, your app will be serving up relevant, personalized content to every user. But the further you look into it, the more you furrow your brow — lots of processing must happen to properly prepare user data, and the prospect of expanding it to a much larger population creates performance concerns.
This is a problem humans have seen before, and they’ve figured out a model that maps to it well: pipelining.
Pipelining is the decomposition of a larger task into smaller, distinct tasks, so that they may be calculated in parallel. The idea proliferated in computing when CPU technology blossomed, and a fast-but-general model for computing large problems was needed. Like most good ideas, it maps equally well now: our processing resources are now nodes in a cluster, and our datasets are orders of magnitude larger, but the fundamental idea remains the same.
A pipeline consists of a few components:

- a source, which produces the items to be processed
- one or more transforms, which each take an item and return a modified item
- a sink, which consumes the results
None of these components know about each other, and none of them need to. This isolates each component’s scope, allows stages to be added, shuffled, or removed transparently, and leaves a large amount of freedom in delegating task computation. In plain English, it makes it effortless to move processing out of core to systems like Spark, Storm, or Hadoop. Having transforms operate on only one item at a time also simplifies the code we write, letting it be applied any way we want.
One key benefit of pipelines is that their innate separation of concerns between sourcing, transforming, and saving data makes refactoring a breeze, at little initial time investment. And because a pipeline doesn’t prescribe a processing strategy, it leaves an open-ended path for scaling up: pipeline transforms can be fulfilled using streaming, batch, or even mini-batch strategies.
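For example, a per-item transform can be driven in mini-batches without touching the transform itself. A minimal sketch (the `batched` helper, `run_in_batches`, and the batch size are illustrative, not part of the service above):

```python
from itertools import islice

def batched(iterable, size):
    """Yield successive lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

def run_in_batches(source, transform, sink, size=100):
    """Apply a per-item transform using a mini-batch strategy."""
    for batch in batched(source, size):
        sink([transform(item) for item in batch])

# Example: square numbers in batches of 3
out = []
run_in_batches(range(7), lambda x: x * x, out.extend, size=3)
# out == [0, 1, 4, 9, 16, 25, 36]
```

The transform still sees one item at a time; only the driver changed.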
Pipelining produces performant, well-factored, and extendable code by default.
Pipelines are implementable in any language, but are quite easy to represent in Python. Let’s take an example for adding post information to a user’s account:
def pipeline():
    user_posts = get_users_and_posts()  # Source
    updated = map(process, user_posts)  # Transforms
    save_results(updated)               # Sink
The source stage pulls users and their posts, and associates them together:
from join import join, group

def get_users_and_posts():
    users = db.get_new_users()
    posts = db.get_user_posts(users)
    return join(users, group(posts, 'uid'),
                left_key='uid', right_key=lambda posts: posts.uid)
The transform stage passes each user and their posts through each data-processing function:
def process(user_and_posts):
    transforms = [post_count, post_sentiment, post_reach]
    results = user_and_posts
    for transform in transforms:
        results = transform(*results)
    return results

def post_count(user, posts):
    user.post_count = len(posts)
    return user, posts

def post_sentiment(user, posts):
    for post in posts:
        user.post_sentiment += sentiment(post.text)
    return user, posts

def post_reach(user, posts):
    user.post_reach = sum(map(lambda p: p.reach, posts))
    return user, posts
Since each function returns the same things it is passed, they may be composed (chained together) easily.
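Since they share a signature, the chaining loop inside `process` can equally be written as a fold. A quick sketch with `functools.reduce`, using toy transforms in place of the real ones:

```python
from functools import reduce

def compose(transforms):
    """Fold a list of (a, b) -> (a, b) transforms into one function."""
    def process(pair):
        return reduce(lambda acc, t: t(*acc), transforms, pair)
    return process

# Toy stand-ins with the same shape as post_count and friends
def double(n, xs):
    return n * 2, xs

def add_len(n, xs):
    return n + len(xs), xs

process = compose([double, add_len])
print(process((3, [1, 2])))  # (8, [1, 2])
```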
The sink stage is the simplest; it consumes the generator created by `map`:
def save_results(results):
    for user, posts in results:
        db.save(user)
This layout gives us the freedom to change the implementation of the source, transforms, or sink without fear of breaking the others. It also lets us change execution strategies, for example using a `ProcessPoolExecutor` to run the transforms in parallel.
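Because the transform stage is just a `map`, that swap is a one-line change. An illustrative sketch (the names here are stand-ins; the per-item function must be a top-level, picklable function for a process pool):

```python
from concurrent.futures import ProcessPoolExecutor

def process(item):
    # Stand-in for the real per-user transform; must be defined at
    # top level so it can be pickled and sent to worker processes.
    return item * item

def pipeline(items):
    with ProcessPoolExecutor() as executor:
        # executor.map has the same shape as the built-in map, so
        # only this one line differs from the serial pipeline.
        return list(executor.map(process, items))

if __name__ == '__main__':
    print(pipeline([1, 2, 3, 4]))  # [1, 4, 9, 16]
```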
Even converting the process to use Spark is effortless - only the `pipeline` and `get_users_and_posts` functions must change:
def pipeline():
    user_posts = get_users_and_posts()
    updated = user_posts.map(process).collect()  # Updated
    save_results(updated)

def get_users_and_posts():
    users = sc.parallelize(db.get_new_users()) \
              .keyBy(lambda u: u.uid)  # Updated
    posts = sc.parallelize(db.get_user_posts(users)) \
              .keyBy(lambda u: u.uid).groupByKey()  # Updated
    return users.join(posts)  # Updated
Et voilà - you’ve just scaled your personalization service to cluster level!
Overall, pipelining is a very good way of describing large data-processing tasks, and using it yields clean, scalable, and changeable code from the start.
Know of any other great and simple pipeline implementations? I’d love to hear about them!
By Stuart Axelbrooke, who does data science and text analytics. You should follow him on Twitter