The software industry's shift to stream processing is well underway. Open source systems like Kafka handle huge throughput with surprisingly few resources, and help greatly in decomposing monoliths into microservices.

When developers and engineers first step into this world of stream processing, though, there can be some uncertainty: How do you create succinct, resilient, and performant components of this system? How do they come together to form the larger system? How do you get answers without querying a database?

Reducers answer these questions.

Reducers provide a simple, performant, and fault-tolerant strategy for any log processing system.

What is a reducer? At its simplest, a reducer is a function that accepts a state and an item (of a sequence), and produces a new state. It’s the function you pass to reduce() (functools.reduce in Python, Array.prototype.reduce in JavaScript). It is a simple but very powerful perspective on stream processing.

In [1]: from functools import reduce

In [2]: def reducer(state, item):
   ...:     return [item] + state  # prepend, so the result is reversed
   ...:
In [3]: reduce(reducer, [0, 1, 2, 3], [])
Out[3]: [3, 2, 1, 0]
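
On a live stream you never see the whole list at once, so it can help to watch the state after every item. A small sketch using itertools.accumulate from the standard library with the same reducer (the `initial` keyword assumes Python 3.8 or newer):

```python
from itertools import accumulate

def reducer(state, item):
    # Prepend each item, so the final state is the input reversed.
    return [item] + state

# accumulate yields the state after every item, not just the final one.
states = list(accumulate([0, 1, 2, 3], reducer, initial=[]))
for state in states:
    print(state)
```

The last element of `states` is the same value reduce() returns; every earlier element is what a stream processor would be holding at that point in the log.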

Because every reducer has an explicit and limited scope, its input can be safely partitioned, allowing your stream processors to scale horizontally. Dealing with failure is also easy: reducers handle input incrementally and hold limited, easily recreated state. Kafka will redeliver un-acked messages (those whose offsets were never committed) when a reducer disappears and is subsequently reconstructed. And best of all, they are dirt simple: “do this when you are created, then do this whenever you get something.”
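
Here is a rough in-memory sketch of both ideas, partitioning and recovery by replay. Nothing in it talks to Kafka: the `count_by_key` reducer, the message shape, and the crc32-based `partition_for` helper are all made up for illustration, and the hash simply stands in for whatever partitioner the broker actually uses.

```python
from functools import reduce
from zlib import crc32

def count_by_key(state, message):
    # Reducer: return a new state with this key's count incremented.
    key = message["key"]
    return {**state, key: state.get(key, 0) + 1}

def partition_for(key, num_partitions):
    # Stable hash so a given key always lands on the same partition.
    return crc32(key.encode("utf-8")) % num_partitions

messages = [{"key": k} for k in ["a", "b", "a", "c", "b", "a"]]

# Split the log into partitions; each could be reduced by a separate worker.
num_partitions = 2
partitions = [[] for _ in range(num_partitions)]
for m in messages:
    partitions[partition_for(m["key"], num_partitions)].append(m)

partial_states = [reduce(count_by_key, p, {}) for p in partitions]

# Keys never span partitions, so merging the partial states is a plain union.
merged = {}
for s in partial_states:
    merged.update(s)

# Recovery: a replacement worker rebuilds its state by replaying its partition.
recovered = reduce(count_by_key, partitions[0], {})
```

Because each key lives in exactly one partition, the merged result matches what a single reducer would produce over the whole log, and a restarted worker ends up with the same state as the one it replaced.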

A diagram showing fold/reduce, thanks to Wikimedia

As an example, let’s take a component that must efficiently add data to a stream: adding an organization ID to events emitted from an app. Traditionally, you might be tempted to do a memoized query to your database for each event you receive, and this is mostly fine. But what about when an app changes orgs? How are the event handlers notified? Is the database used as the central notifier of this change? If so, our caching works against us, because the memoized lookup keeps returning the old org.

Stream processing shines here: simply build a map from app ID to organization ID when the reducer is constructed, and listen for new events and apps (and app changes) and keep the map updated - Kafka will handle distributing app changes to all interested parties.

def org_labler(app_org_ids, message):
    if message['type'] == 'app':  # new or changed app: update the map
        app_org_ids[message['app_id']] = message['org_id']
    elif message['type'] == 'event':  # label the event and re-emit it
        message['org_id'] = app_org_ids[message['app_id']]
        emit('event-orgd', message)
    return app_org_ids  # the state passed to the next call

def fetch_app_org_map():
    apps = db.apps.get()  # one database read, at construction time
    return {app['app_id']: app['org_id'] for app in apps}

app_event_stream = Stream(configs).subscribe(['app', 'event'])
app_event_stream.reduce(org_labler, fetch_app_org_map())

In a scant 14 lines, we’ve defined a simple and scalable solution for our problem. Making changes is easy too: want to listen for another piece of data to be added? Simply subscribe to it and add it to the persisted state object. Some things may be difficult in this model, but that difficulty usually stops you from shooting yourself in the foot with a complex or non-scalable solution.
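
The example above leans on a stream client, a database handle, and an emit function that are not shown. To try the idea without any of them, here is a self-contained sketch that swaps in stand-ins: a plain list plays the part of the subscribed topics, a hard-coded dict replaces fetch_app_org_map(), and a hypothetical `emit` appends to a list instead of publishing. It also departs from the original in two small ways, both assumptions on my part: it copies the event rather than mutating it, and it labels events from unknown apps with None instead of raising.

```python
from functools import reduce

def make_org_labeler(emit):
    # Build a reducer that tags events with the org of their app.
    def org_labeler(app_org_ids, message):
        if message["type"] == "app":
            # A new or changed app: update the lookup table.
            app_org_ids[message["app_id"]] = message["org_id"]
        elif message["type"] == "event":
            # Copy the event so the incoming message is not mutated.
            org_id = app_org_ids.get(message["app_id"])
            emit("event-orgd", {**message, "org_id": org_id})
        return app_org_ids
    return org_labeler

emitted = []  # hypothetical sink standing in for an output topic

def emit(topic, message):
    emitted.append((topic, message))

initial_state = {"app-1": "org-A"}  # stands in for fetch_app_org_map()
log = [
    {"type": "event", "app_id": "app-1", "name": "login"},
    {"type": "app", "app_id": "app-1", "org_id": "org-B"},  # app changes orgs
    {"type": "event", "app_id": "app-1", "name": "logout"},
]

final_state = reduce(make_org_labeler(emit), log, initial_state)
print([m["org_id"] for _, m in emitted])  # ['org-A', 'org-B']
```

The second event picks up the new org with no cache invalidation step: the app change arrived on the same log, so the map was already up to date by the time the event was handled.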

Generally, reducers allow you to build large, simple topic networks that are easy to update. Know about other effective stream processing patterns? I’d love to hear about them!
