Asynchronous, event-driven Systems: Introductory Patterns and a Case Study

By: on June 29, 2020

For some reason, and probably rather late to the party, I was thinking the other day about conceptual patterns for designing an asynchronous, event-driven, distributed system. If the system is event-based, and every action results in a message passed through the system asynchronously, how, for example, might you implement a simple login? It seemed peculiar to assume that the client fires the event but doesn’t get an immediate answer. Would we then poll for a status? Or wait for a response message to find its way back to us? Wouldn’t that require a bi-directional connection to every client? Does that scale?

In this post, I want to explore some of the conceptual patterns suitable for building that asynchronous, event-driven distributed system. In order to do so, we’ll cover some basics of RabbitMQ—a message broker that could be used to implement the system—before going over common patterns when using a message broker. I’ll end with a case study, analysing how Wunderlist built version 3 of their app as an event-based system using RabbitMQ.

But first some lightweight (and likely not 100% academically accurate) definitions:

  • ‘asynchrony’ here means that we don’t block the execution of our program to wait for a response when firing off an event. The response, if there is one at all, will arrive at a later time and will be handled only then.
  • ‘event-driven’ here means that actions by users, or almost anything else that can happen within the system or outside of it, are considered events to which the system then reacts. Such events are propagated through the system as messages.
  • ‘distributed’ here means that we communicate across network boundaries. Whether the individual processes run on different machines or not is not relevant.

Why that kind of system? The relevant answer for this blog post is simply that such systems seem interesting. They are often deployed for their ability to scale and are, due to their loose coupling, easily extensible. So, why not?

Why RabbitMQ? RabbitMQ is one of the most widely used open-source message brokers, and has proven its capabilities at scale in numerous enterprises and its ease of use in many startups. Since this post only deals with high-level conceptual patterns, probably any message broker could be used. Yet the case study uses RabbitMQ, so it seems fitting.

Introduction to RabbitMQ / Message Broker

In an ideal world—in which nothing ever needs to be restarted, no network or hardware failures occur and there is always a subscriber to a message—the following topics should be enough to configure a system based on RabbitMQ.

Exchanges

A message broker receives messages from a publisher and routes them to consumers. In RabbitMQ, the publisher sends a message to an exchange, which forwards it to one or more queues based on rules called bindings. The broker then delivers the message from the queue to a consumer that is subscribed to that queue.

The exchange types differ in how they match the routing key of a message against the bindings that connect the exchange to its queues.

Direct Exchange

A direct exchange delivers messages to queues based on a routing key. It is ideal for publishing a message onto just one queue, but several queues can be bound with the same key. A message is routed to a queue if the routing key of the message exactly matches the routing key of the binding to that queue.
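
As a rough illustration of that matching rule, here is a small in-memory model of a direct exchange. It is only a sketch and not RabbitMQ code: the class `DirectExchange`, its methods, and the use of plain arrays as queues are all made up for this example.

```javascript
// In-memory model of a direct exchange (illustration only, not RabbitMQ code).
class DirectExchange {
  constructor() {
    this.bindings = []; // each binding: { routingKey, queue }
  }

  // Bind a queue (here: a plain array) to the exchange under a routing key.
  bind(queue, routingKey) {
    this.bindings.push({ routingKey, queue });
  }

  // Deliver the message to every queue whose binding key equals the
  // routing key. Returns the number of queues that received the message.
  publish(routingKey, message) {
    const targets = this.bindings.filter((b) => b.routingKey === routingKey);
    targets.forEach((b) => b.queue.push(message));
    return targets.length;
  }
}

const listQueue = [];
const auditQueue = [];
const commentQueue = [];

const exchange = new DirectExchange();
exchange.bind(listQueue, 'list');
exchange.bind(auditQueue, 'list'); // a second queue bound with the same key
exchange.bind(commentQueue, 'comment');

const delivered = exchange.publish('list', 'groceries');
const unroutable = exchange.publish('task', 'orphan'); // no binding matches
```

Here `delivered` counts the two queues bound with the key `list`, while the message published with the key `task` reaches no queue at all.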

Fanout Exchange

A fanout exchange routes messages to all queues that are bound to it. The routing key is ignored, so each queue receives every message. Fanout exchanges are ideal for broadcasting messages.

Topic Exchange

Topic exchanges are similar to direct exchanges, but they allow for wildcards on the routing keys of the bindings. They route messages to queues based on matches between the message’s routing key and the pattern used to bind the queue to the exchange. The topic exchange is often used to implement various publish-subscribe variations.
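
In AMQP topic exchanges, routing keys are dot-separated words, and a binding pattern may use `*` for exactly one word and `#` for zero or more words. The matcher below is a minimal sketch of that rule. The function name `topicMatches` is hypothetical and the code is not taken from RabbitMQ.

```javascript
// Returns true if a dot-separated routing key matches a binding pattern.
// '*' stands for exactly one word, '#' for zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // match(i, j): does p[i..] match k[j..]?
  function match(i, j) {
    if (i === p.length) {
      return j === k.length; // pattern used up: key must be used up too
    }
    if (p[i] === '#') {
      // '#' may consume zero or more of the remaining words
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) {
          return true;
        }
      }
      return false;
    }
    if (j === k.length) {
      return false; // pattern expects another word, but the key has none
    }
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  }

  return match(0, 0);
}

console.log(topicMatches('list.*', 'list.created'));    // one word after 'list'
console.log(topicMatches('list.#', 'list.item.added')); // any number of words
```

A binding with the pattern `list.*` would therefore receive `list.created` but not `list.item.added`, whereas `list.#` would receive both.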

Headers Exchange

A headers exchange is used for routing via multiple attributes expressed in headers. The routing key is ignored for this type of exchange.
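
A headers binding carries a set of header values plus an `x-match` argument that says whether all of them or any one of them must match. The following sketch models that comparison in memory. The function `headersMatch` is hypothetical, treating `all` as the default is an assumption of this example, and the real broker handles further details that are left out here.

```javascript
// Decide whether a message's headers satisfy a headers-exchange binding.
// bindingArgs: the header values to look for, plus the 'x-match' mode.
function headersMatch(bindingArgs, messageHeaders) {
  // Split the mode from the header values the binding asks for.
  const { 'x-match': mode = 'all', ...required } = bindingArgs;

  const results = Object.entries(required).map(
    ([key, value]) => messageHeaders[key] === value
  );

  // 'any': one matching header is enough; 'all': every header must match.
  return mode === 'any' ? results.some(Boolean) : results.every(Boolean);
}

const binding = { 'x-match': 'all', type: 'list', format: 'json' };
const jsonList = { type: 'list', format: 'json', sender: 'client-1' };
const xmlList = { type: 'list', format: 'xml' };

console.log(headersMatch(binding, jsonList)); // both headers match
console.log(headersMatch(binding, xmlList));  // 'format' differs
```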

Queues

Messages are passed through queues, which work on a first-in-first-out basis. More than one subscriber can be registered to a queue.
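
When several subscribers consume from the same queue, each message is handed to only one of them, and RabbitMQ by default takes the subscribers in turn. The sketch below models that behaviour in memory. The class `Queue` and its methods are made up for this illustration, and acknowledgements and prefetch limits are ignored.

```javascript
// In-memory model of a FIFO queue with round-robin dispatch (illustration only).
class Queue {
  constructor() {
    this.messages = [];  // waiting messages, oldest first
    this.consumers = []; // subscriber callbacks
    this.next = 0;       // index of the consumer whose turn it is
  }

  subscribe(consumer) {
    this.consumers.push(consumer);
    this.drain();
  }

  enqueue(message) {
    this.messages.push(message);
    this.drain();
  }

  // Hand waiting messages to the consumers, one message per consumer in turn.
  drain() {
    while (this.messages.length > 0 && this.consumers.length > 0) {
      const message = this.messages.shift(); // first in, first out
      const consumer = this.consumers[this.next % this.consumers.length];
      this.next += 1;
      consumer(message);
    }
  }
}

const seenByA = [];
const seenByB = [];
const queue = new Queue();

queue.enqueue('m1'); // nobody is subscribed yet, so the message waits
queue.subscribe((m) => seenByA.push(m));
queue.subscribe((m) => seenByB.push(m));
queue.enqueue('m2');
queue.enqueue('m3');
queue.enqueue('m4');

console.log(seenByA, seenByB);
```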

Bindings

Bindings are the rules that specify how messages are routed from exchanges to queues. They may have a defined routing key that is used by some exchange types. If a message can’t be routed to any queue, it is either dropped or returned to the publisher.

Routing Key

The routing key is used by a given exchange to find the corresponding queue. If no binding matches, the message can be discarded or returned to the sender.

Simple Patterns

With the basics covered, let’s move on to some intuitive patterns when using a message broker.

When building control flow, two things come to mind: Sequences and branches. Different patterns can easily be implemented by chaining exchanges together appropriately.

Sequential pipeline

Each subscriber to a queue can, in turn, send a message to a different exchange.
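
A pipeline of this kind might look like the following sketch. Node’s built-in `EventEmitter` stands in for the broker, with an event name playing the role of an exchange. The stage names and the message shape are invented for the example, and unlike a real broker the emitter delivers messages synchronously.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the broker: an event name plays the role of an exchange.
const broker = new EventEmitter();
const stored = [];

// Stage 1: normalize the list name, then pass the message on.
broker.on('list.requested', (msg) => {
  broker.emit('list.normalized', { ...msg, list: msg.list.trim().toLowerCase() });
});

// Stage 2: assign an id, then pass the message on again.
let nextId = 0;
broker.on('list.normalized', (msg) => {
  nextId += 1;
  broker.emit('list.stored', { ...msg, id: nextId });
});

// Final stage: the end of the pipeline just records the result.
broker.on('list.stored', (msg) => stored.push(msg));

broker.emit('list.requested', { user: 'user1', list: '  Groceries ' });
console.log(stored);
```

Each stage only knows the name of the next exchange, so a further stage can be added without touching the earlier ones.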

Branching

Depending on the outcome of an operation, a message can be sent to the associated exchange.
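
A branch could be sketched as follows, again with Node’s `EventEmitter` as a synchronous stand-in for the broker. The exchange names `login.succeeded` and `login.failed` and the set of known users are assumptions made for this example.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the broker: an event name plays the role of an exchange.
const broker = new EventEmitter();

const knownUsers = new Set(['user1']);
const accepted = [];
const rejected = [];

// The handler picks the exchange to publish to based on its own outcome.
broker.on('login.requested', (msg) => {
  const target = knownUsers.has(msg.user) ? 'login.succeeded' : 'login.failed';
  broker.emit(target, msg);
});

// Each branch has its own subscribers.
broker.on('login.succeeded', (msg) => accepted.push(msg.user));
broker.on('login.failed', (msg) => rejected.push(msg.user));

broker.emit('login.requested', { user: 'user1' });
broker.emit('login.requested', { user: 'mallory' });
console.log(accepted, rejected);
```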

Broadcasting

Broadcasting is achieved by publishing to a fanout exchange and binding a queue to it for every subscriber that should receive the message.

Remote Procedure Call (RPC) / Request-Reply pattern

A more complex pattern is the Remote Procedure Call. When communicating asynchronously, the communication is one-way. Usually, we send the message then forget about it. But sometimes a direct answer is desired.

To achieve this, the client sends a message containing a ‘callback’ exchange (this would just be a string). The receiver of the message will then publish the response to that callback.

If the exchange is not uniquely assigned to the client, a correlation id can be used to identify the original request.

When thinking about asynchronous designs, I stumbled upon this pattern when wondering how to implement a simple login. It felt weird to assume that the login message went out and then the client either needs to poll the status in a synchronous way or that the response needs to find its way back in some way. The Remote Procedure Call solves that ‘in some way’ problem.
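
The request-reply flow described in this section could be sketched as below. Node’s `EventEmitter` again stands in for the broker, so replies arrive synchronously here, which would not be the case with a real broker. The names `rpc.login`, `replyTo` and `correlationId`, the password table, and the counter used to generate ids are all assumptions of this example. A real system would more likely use a UUID as the correlation id.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the broker: an event name plays the role of an exchange.
const broker = new EventEmitter();

// Server side: answers on whatever reply address the request names
// and copies the correlation id into the response.
const passwords = { user1: 'secret' };
broker.on('rpc.login', ({ replyTo, correlationId, user, password }) => {
  broker.emit(replyTo, { correlationId, ok: passwords[user] === password });
});

// Client side: one reply address for the client, many requests in flight.
const replyTo = 'reply.client-1';
const pending = new Map(); // correlationId -> callback
let lastId = 0;

broker.on(replyTo, ({ correlationId, ok }) => {
  const callback = pending.get(correlationId);
  if (callback === undefined) {
    return; // unknown or already answered request
  }
  pending.delete(correlationId);
  callback(ok);
});

function login(user, password, callback) {
  lastId += 1;
  const correlationId = String(lastId);
  pending.set(correlationId, callback);
  broker.emit('rpc.login', { replyTo, correlationId, user, password });
}

const outcomes = [];
login('user1', 'secret', (ok) => outcomes.push(['first', ok]));
login('user1', 'wrong', (ok) => outcomes.push(['second', ok]));
console.log(outcomes);
```

The `pending` map is what ties each response back to its request. Without the correlation id, the client could not tell which of its two login attempts a given answer belongs to.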

Multiple Preconditions

Certain user stories require multiple preconditions to be fulfilled before the request can be processed further. And this seems to be one of the more complicated things to implement in an asynchronous system:

(A node here is an instance that can publish to exchanges and subscribe to queues.)

Assume Node_A emits two messages, M_1 and M_2, which will be handled somewhere unimportant. Assume further that Node_B requires positive responses to M_1 and M_2 (called M_1_p, M_2_p) to continue the workflow.

If there is only a single instance of Node_B, then this instance can simply wait for the messages, storing whichever response arrives first in memory so that it can be looked up when the second one arrives.
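
For the single-instance case, such an in-memory join might look like this sketch. The helper `createJoin` and the workflow id used to tell concurrent workflows apart are assumptions of this example. The message names M_1_p and M_2_p are the ones used above.

```javascript
// Collects partial results per workflow until all required ones have arrived.
// required: names of the messages that must be seen before continuing.
function createJoin(required, onComplete) {
  const pending = new Map(); // workflowId -> Map(messageName -> payload)

  return function receive(workflowId, name, payload) {
    if (!required.includes(name)) {
      return; // not one of the preconditions
    }
    const seen = pending.get(workflowId) || new Map();
    seen.set(name, payload);
    pending.set(workflowId, seen);

    if (required.every((r) => seen.has(r))) {
      pending.delete(workflowId); // free the memory once the join fires
      onComplete(workflowId, Object.fromEntries(seen));
    }
  };
}

const completed = [];
const receive = createJoin(['M_1_p', 'M_2_p'], (id, parts) => {
  completed.push({ id, parts });
});

receive('wf-1', 'M_2_p', 'second ok');      // arrives first, is kept in memory
receive('wf-2', 'M_1_p', 'other workflow'); // belongs to a different workflow
const completedBeforeJoin = completed.length;
receive('wf-1', 'M_1_p', 'first ok');       // completes workflow wf-1
console.log(completed);
```

The order in which the two responses arrive does not matter, and the half-finished workflow `wf-2` simply stays in memory until its second response shows up.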

If there are multiple instances of Node_B, then synchronization via some kind of storage becomes necessary. All instances need a centralized place to store and look up the messages. In any distributed system, a central storage like this can lead to problems with performance and consistency, which need to be addressed accordingly (how you do that is a topic outside of the scope of this post).

A different approach would be to introduce some kind of ‘mediator’: Instances of Node_A could be aware of the necessary steps of the workflow and use the Remote Procedure Call approach to execute each step.

An opinion: If the topology evolves into many mediator nodes making RPC calls to nodes that are only ever called by that mediator, then a message broker topology might not be the best choice for the system.

Case Study: Wunderlist

The following case study is based on the description at CloudAMQP. Since there is some ambiguity in the talk, the following outline needs to be regarded as my interpretation of the design of their system.

Wunderlist implemented its todo app with the following goals:

  1. The servers should not be intelligent
  2. Each change should be propagated within five seconds
  3. Building new features would be easier (compared to previous versions of Wunderlist)
  4. The system should be event-based (which should result in better scalability)

And they ended up with a system that is roughly summarized by the following:

  1. Four main exchanges: create, update, destroy, touch
  2. The clients address the main exchanges, and the routing key is the object type (like list, comment, …). A client that wants to write a comment would therefore publish to the create exchange with the routing key comment.
    1. Everything else in the system happens through subscribers consuming from queues bound to those exchanges
    2. The data of the system is readily available, just by creating queues and bindings. The speaker of the talk called this the ‘river of data’. This way, new parts can be added to the system with minimal disturbance.
  3. They have a queue called ‘events’, which sends slimmed-down messages to ‘apps’.
    1. Those ‘apps’ seem to be Scala applications, which in turn notify the connected clients, which can be a smartphone, a browser, or similar.
    2. Each of those Scala apps receives every message from the ‘events’ queue, and then sends them along if the right client is connected. Wunderlist manages three billion messages per day, and this design still holds up.
  4. They wrote Cerebro/Cerebra, which contains all information regarding exchanges, bindings and queues in a central spot, which they deem very important to have
  5. There are two connections per app
  6. They don’t cheat. A create event in the app goes all the way around
  7. They recommend not using Remote Procedure Calls
  8. RabbitMQ bindings are expensive, so don’t use too many. Allegedly 500,000 is too many.
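
The broadcast-and-filter idea from item 3 can be sketched in a few lines. This is my own simplified model and not Wunderlist’s code: the class `App`, the function `broadcast`, and the event shape are hypothetical, and a plain loop over the apps stands in for the broker delivering every event to every app.

```javascript
// Each "app" holds the connections of the clients attached to it.
class App {
  constructor(name) {
    this.name = name;
    this.connections = new Map(); // user -> function that delivers to the client
  }

  connect(user, deliver) {
    this.connections.set(user, deliver);
  }

  // Called for every event; forwards it only if the user is connected here.
  // Returns true if this app delivered the event.
  onEvent(event) {
    const deliver = this.connections.get(event.user);
    if (deliver === undefined) {
      return false;
    }
    deliver(event);
    return true;
  }
}

const apps = [new App('app-8070'), new App('app-8071')];
const inboxUser1 = [];
const inboxUser2 = [];
apps[0].connect('user1', (event) => inboxUser1.push(event.type));
apps[1].connect('user2', (event) => inboxUser2.push(event.type));

// Every app sees every event; returns the names of the apps that delivered it.
function broadcast(event) {
  return apps.filter((app) => app.onEvent(event)).map((app) => app.name);
}

const handledBy = broadcast({ type: 'created', user: 'user2', val: 'groceries' });
const handledByNobody = broadcast({ type: 'created', user: 'user3', val: 'chores' });
console.log(handledBy, handledByNobody);
```

Every app pays the cost of inspecting every event, but no component needs to know which app a given user is connected to.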

Conclusion

What struck me the most was the fact that Wunderlist simply broadcasts the events that are supposed to go to the clients to all Scala apps, which maintain the connections to the clients, and those then notify the correct end user. As mentioned in the first paragraph of this post, I wondered whether that approach was feasible. It turns out that it is. The recording mentions that they aren’t sure to what extent that approach scales, but it seems feasible to restrict the broadcasting to a smaller set of ‘apps’.

Furthermore, according to the Erlang Factory 2014 talk ‘That’s “Billion” with a “B”: Scaling to the Next Level at WhatsApp’, WhatsApp was running an Erlang system on their servers, each of which managed to hold roughly one million connections. So, I should have some more trust in computers…

Some code

Here is a simple approximation of the Wunderlist system, using the three JavaScript files client.js, app.js and server.js. To run the code, ws and amqplib need to be installed using npm (readline ships with Node.js).

Example usage:

  • In its own terminal start server.js: ./server.js
    • Make sure a RabbitMQ instance is running, for example via Docker: docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq
  • In its own terminal start app.js on some port: ./app.js 8070
  • In its own terminal start client.js and connect to the port of app.js: ./client.js 8070
    • client.js will then prompt for user input. Log in with a unique user name, create a list and add to that list.
    • The show data option will show the created lists
  • In its own terminal start another client.js and connect to the port of app.js: ./client.js 8070
    • Log in with a different user name. Creating and adding to lists with the different clients will only affect their own data (the list names need to be unique)
    • When the second user joins a list created by the first user, that list is added to the second user’s data, and further additions will be shared
  • The topology can be further extended:
    • In yet another terminal, a second (or nth…) app can be started on a new port: ./app.js 8071
    • This app, in turn, can accept clients and propagate data

 

client.js


#!/usr/bin/env node

/**
 * A very simple client application.
 *
 * Usage: ./client.js <PORT_NUMBER>
 *
 * It creates a websocket connection on the provided port to the app.js process.
 *
 * The user then needs to `login` and can then create lists and add items to them.
 * The names of the lists need to be unique, since server.js stores them by name
 * in a hash map
 *
 * Example:
 *
 *   ./client.js 8070
 *   0 user1
 *   1 foo
 *   2 foo yay :)
 *   4
 *
 * The logging will be a bit ugly ¯\_(ツ)_/¯
 */

// Create the websocket connection

const WebSocket = require('ws');
const readline = require('readline');

let [port, ...argvRest] = process.argv.slice(2);

const ws = new WebSocket(`ws://127.0.0.1:${parseInt(port, 10)}`);

// Log only once the connection has actually been established
ws.on('open', () => {
  console.log(`Connected to port ${port}`);
});

let store = {};

ws.on('message', function incoming(message) {
  console.log(`Received message ${message}`);
  message = JSON.parse(message);

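  switch (message.type) {
    case "login": {
      store["_user"] = message.user;
      break;
    }
    case "created": {
      store[message.val] = [];
      break;
    }
    case "added": {
      // Ignore additions to lists this client has neither created nor joined,
      // for example after a restart with an empty local store
      if (store[message.list] !== undefined) {
        store[message.list].push(message.val);
      }
      break;
    }
    case "joined": {
      // The server sends the complete current content of the joined list
      store[message.list] = message.val;
      break;
    }
  }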
});


// Prints the action loop

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

rl.on('SIGINT', () => {
    process.exit()
});


(function prompt() {
  rl.question('What do you want to do:\n[0] login <user>\n[1] create list <list_name>\n[2] add to list <list_name> <content>\n[3] join list <list_name>\n[4] show data\n? ', (line) => {
    let [action, ...msg] = line.split(' ');
    msg = msg.join(' ').trim();
    switch (action) {
      case '0': {
        ws.send(JSON.stringify({type: 'login', user: msg}))
        break
      }
      case '1': {
        let user = store['_user'];
        ws.send(JSON.stringify({type: 'createList', user: user, list: msg}))
        break
      }
      case '2': {
        let user = store['_user'];
        let [list, ...val] = msg.split(' ');
        val = val.join(' ');
        ws.send(JSON.stringify({type: 'addToList', user: user, list: list, val: val}))
        break
      }
      case '3': {
        let user = store['_user'];
        ws.send(JSON.stringify({type: 'joinList', user: user, list: msg}))
        break
      }
      case '4': {
        console.log(JSON.stringify(store));
        break
      }
    }

    prompt();
  });
})();

app.js


#!/usr/bin/env node

/**
 * Forwards client messages to RabbitMQ and returns the responses
 *
 * Usage: ./app.js <PORT_NUMBER>
 *
 * A client only ever logs in to this process
 *
 */

const WebSocket = require('ws');
const amqp = require('amqplib/callback_api');

let ch;

// Functions to publish to RabbitMQ
function publish(channel, exchange, exchangeType, payload) {
  channel.assertExchange(exchange, exchangeType, {
    durable: false
  });

  channel.publish(exchange, '', payload);
}

function createList(channel, user, list) {
  publish(
    channel, 'createList', 'fanout',
    Buffer.from(JSON.stringify({'user': user, 'list': list}))
  );

  console.log(" [x] Published 'create list' %s", list);
}

function addToList(channel, user, list, val) {
  publish(
    channel, 'addToList', 'fanout',
    Buffer.from(JSON.stringify({user: user, list: list, val: val}))
  );

  console.log(" [x] Published 'add to list' %s", list);
}

function joinList(channel, user, list) {
  publish(
    channel, 'joinList', 'fanout',
    Buffer.from(JSON.stringify({user: user, list: list}))
  );

  console.log(" [x] Published 'join list' %s", list);
}

/**
 * When receiving message from RabbitMQ, identify the corresponding user and
 * forward them.
 */
function eventsCallback(message) {
  console.log('Rabbit:');
  let msg = JSON.parse(message.content.toString());

  if (msg.type === 'added') {
    // This is a little more elaborate, since users can join lists:
    // every participant of the list is notified, not just the author
    for (let user of msg.participants || []) {
      let ws = connections[user];
      if (ws !== undefined) {
        console.log(' [x] Sending to user %s', user);
        ws.send(JSON.stringify(msg));
      } else {
        // The user is not connected to this app instance
        console.log(' [x] Unknown user %s', user);
      }
    }
  } else {
    let ws = connections[msg.user];
    if (ws !== undefined) {
      console.log(' [x] Sending to user %s', msg.user);
      ws.send(JSON.stringify(msg));
    } else {
      // The user is not connected to this app instance
      console.log(' [x] Unknown user %s', msg.user);
    }
  }
}

// Connect to RabbitMQ
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }

  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    ch = channel;

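    // Make sure the 'events' exchange exists before binding to it, so that
    // app.js does not depend on server.js having declared it first
    channel.assertExchange('events', 'fanout', {
      durable: false
    });

    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      channel.bindQueue(q.queue, 'events', '');
      // noAck: this demo never acknowledges messages explicitly
      channel.consume(q.queue, eventsCallback, { noAck: true });
    });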
  });

});


// Listen to websocket connections on the provided port
let [port, ...argvRest] = process.argv.slice(2);

const wss = new WebSocket.Server({ port: parseInt(port) });
console.log(`Listening on port ${port}`);

let connections = {}
 
wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('Client: %s', message);
    message = JSON.parse(message);
    switch (message.type) {
      case "login": {
        // logging in just associates the websocket with the user name
        connections[message.user] = ws;
        ws._user = message.user;
        ws.send(JSON.stringify(message));
        break;
      }
      case "createList": {
        let user = ws._user;
        createList(ch, user, message.list);
        break;
      }
      case "addToList": {
        let user = ws._user;
        addToList(ch, user, message.list, message.val);
        break;
      }
      case "joinList": {
        let user = ws._user;
        joinList(ch, user, message.list);
        break;
      }
    };
  });
});

server.js


#!/usr/bin/env node

/**
 * An approximation to a lot of rabbit functionality.
 *
 * Usage: ./server.js
 *
 * Fills a `store` based on the received messages
 *
 * Requires a RabbitMQ instance running
 */

const amqp = require('amqplib/callback_api');


let store = {};
let listParticipants = {};

function assertExchange(channel, exchange, callback) {
  channel.assertExchange(exchange, 'fanout', {
    durable: false
  });

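  // Every exchange gets its own exclusive, server-named queue
  channel.assertQueue('', {
    exclusive: true
  }, function(error2, q) {
    if (error2) {
      throw error2;
    }

    channel.bindQueue(q.queue, exchange, '');
    // noAck: this demo never acknowledges messages explicitly
    channel.consume(q.queue, callback, { noAck: true });
  });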
}

const createList = (channel) => (msg) => {
  msg = JSON.parse(msg.content.toString());
  console.log(" [x] Created list %s", msg.list);

  store[msg.list] = [];
  listParticipants[msg.list] = [msg.user];

  channel.assertExchange('events', 'fanout', {
    durable: false
  });

  channel.publish('events', msg.list, Buffer.from(JSON.stringify({type: 'created', user: msg.user, val: msg.list})));
}

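const addToList = (channel) => (msg) => {
  msg = JSON.parse(msg.content.toString());

  // Guard against lists that were never created; otherwise the push below
  // would throw and take the whole server down
  if (store[msg.list] === undefined) {
    console.log(" [x] Unknown list %s", msg.list);
    return;
  }

  store[msg.list].push(msg.val);

  console.log(" [x] Added to list %s", msg.list);

  channel.assertExchange('events', 'fanout', {
    durable: false
  });

  channel.publish('events', msg.list, Buffer.from(JSON.stringify({type: 'added', participants: listParticipants[msg.list], ...msg})));
}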

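const joinList = (channel) => (msg) => {
  msg = JSON.parse(msg.content.toString());

  // Guard against lists that were never created; otherwise the push below
  // would throw and take the whole server down
  if (listParticipants[msg.list] === undefined) {
    console.log(" [x] Unknown list %s", msg.list);
    return;
  }

  console.log(" [x] Joined list %s", msg.list);

  listParticipants[msg.list].push(msg.user);

  channel.assertExchange('events', 'fanout', {
    durable: false
  });

  channel.publish('events', msg.list, Buffer.from(JSON.stringify({'type': 'joined', 'val': store[msg.list], ...msg})));
}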

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }

    // The 'events' exchange carries all outgoing notifications to the apps
    channel.assertExchange('events', 'fanout', {
      durable: false
    });

    assertExchange(channel, 'createList', createList(channel));
    assertExchange(channel, 'addToList', addToList(channel));
    assertExchange(channel, 'joinList', joinList(channel));
  });

});