Micro-Service Fun – Node.js + Messaging + Clustering Combo

Micro-Services in Silly Walk, Monty Python

A: I told you, I’m not allowed to argue unless you’ve paid.
M: I just paid!
A: No you didn’t.
M: I DID!
A: No you didn’t.
M: Look, I don’t want to argue about that.
A: Well, you didn’t pay.
M: Aha. If I didn’t pay, why are you arguing? I got you!
A: No you haven’t.
M: Yes I have. If you’re arguing, I must have paid.
A: Not necessarily. I could be arguing in my spare time.

Monty Python, ‘The Argument Clinic’

And now for something completely different. I decided to get off the soap box I kept climbing recently and give you some useful code for a change. I cannot stray too far from the topic of micro-services (because that is our reality now), or Node.js (because it is the technology with which we implement that reality). But I can zero in on a very practical problem we unearthed this week.

First, a refresher. When creating a complex distributed system using micro-services, one of the key problems to solve is inter-service communication. Micro-services normally provide REST APIs to get the baseline state, but the system is in constant flux and it does not take long until the state you received from the REST API is out of date. Of course, you can keep refreshing your copy by polling, but this is not scalable and adds a lot of stress to the system, because many of these repeated requests do not result in new state (I can imagine an annoyed version of Scarlett Johansson in ‘Her’, replying with ‘No, you still have no new messages. Stop asking.’).

A better pattern is to reverse the flow of information and let the service that owns the data tell you when there are changes. This is where messaging comes in – modern micro-service based systems are hard to imagine without a message broker of some kind.

Another important property of a modern system is scalability. In a world where you may need to quickly ramp up your ability to handle the demand (lucky you!), the ability to vertically or horizontally scale your micro-services at a moment’s notice is crucial.

Vertical and Horizontal Scalability

An important Node.js quirk explainer: people migrating from Java may not understand the ‘vertical scalability’ part of the equation. Due to the auto-magical handling of thread pools in a JEE container, increasing the real or virtual machine specs requires no effort on the software side. For example, if you add more CPU cores to your VM, the JEE container will spread out to take advantage of them. If you add more memory, you may need to tweak the JVM settings, but otherwise everything will just work. Of course, at some point you will need to resort to multiple VMs (horizontal scalability), at which point you may discover that your JEE app is actually not written with clustering in mind. Bummer.

In Node.js land, adding more cores will help you squat unless you make a very concrete effort to fork more processes. In practice, this is not hard to do with utilities such as PM2 – it may be as easy as running the following command:

pm2 start app.js -i max

Notice, however, that for Node.js, vertical and horizontal scalability are the same as far as the way you write your code is concerned. You need to cluster just to take advantage of all the CPU cores on the same machine, never mind load balancing multiple separate servers or VMs.

I actually LOVE this characteristic of Node.js – it forces you to think about clustering from the get-go, discouraging you from holding onto data between requests and forcing you to store any state in a shared DB where it can be accessed by all the running instances. This makes the switch from vertical to horizontal scalability a non-event for you, which is a good thing to discover when you need to scale out in a hurry. Nothing new here, just basic share-nothing goodness (see 12factors for a good explainer).

However, there is one important difference between launching multiple Node.js processes using PM2 or the Node ‘cluster’ module, and load-balancing multiple Node servers using something like Nginx. With load balancing using a proxy, we have standalone servers binding to a port on their own machines, and balancing and URL proxying are done at the same time. You will write something like this:

http {
    upstream myapp1 {
        server srv1.example.com;
        server srv2.example.com;
        server srv3.example.com;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://myapp1;
        }
    }
}

If you try to launch multiple Node servers on a single machine like this, all except the first one will fail because they cannot bind to the same port. However, if you use Node’s ‘cluster’ module (or PM2, which uses the same module under the covers), a little bit of white magic happens – the master process has a bit of code that enables socket sharing between the workers using a policy (either OS-defined, or ‘round-robin’ as of Node 0.12). This is very similar to what Nginx does to your Node instances running on separate servers, albeit with a few more load balancing options (round-robin, least connected, IP-hash, weight-directed).
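
To make this concrete, here is a minimal sketch of the ‘cluster’ module at work (the standard pattern from the Node documentation, not our actual service; the port number is arbitrary). The master forks one worker per CPU core, and all the workers listen on the same port while the master hands out the connections:

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork one worker per CPU core
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  // Replace workers that die
  cluster.on('exit', function (worker) {
    console.log('Worker ' + worker.process.pid + ' died, forking a new one');
    cluster.fork();
  });
} else {
  // Every worker binds to the same port - the master shares
  // the socket and balances incoming connections between workers
  http.createServer(function (req, res) {
    res.end('Served by worker ' + process.pid + '\n');
  }).listen(8080);
}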

Now Add Messaging to the Mix

So far, so good. Now, as is usual in life, the fun starts when you combine the two concepts I just talked about (messaging and clustering).

To make things more concrete, let’s take a real-world example (one we had to solve ourselves this week). We were writing an activity stream micro-service. Its job is to collect activities expressed using the Activity Streams 2.0 draft spec and store them in Cloudant DB, so that they can later be retrieved as an activity stream. This service does one thing, and does it well – it aggregates activities that can originate anywhere in the system – any micro-service can fire an activity by publishing into a dedicated MQTT topic.

On the surface, it sounds clear cut – we will use the well-behaved mqtt module as an MQTT client, RabbitMQ as our polyglot message broker, and Node.js for our activity micro-service. This is not the first time we have used this kind of system.

However, things become murky when clustering is added to the mix. This is what happens: MQTT is a pub/sub protocol, and in order to allow each subscriber to read messages from the queue at its own pace, RabbitMQ implements the protocol by hooking up a separate queue for each Node instance in the cluster.

[Diagram: mqtt-cluster2 – with MQTT subscribers, RabbitMQ creates a separate queue for each Node instance in the cluster]

This is not what we want. Each instance will receive a ‘new activity’ message and attempt to write it to the DB, requiring contention avoidance. Even if the DB can prevent all but one node from succeeding in writing the activity record, this is wasteful because all the nodes are attempting the same task.

The problem here is that the ‘white magic’ the cluster module uses to share http/https server sockets between workers does not extend to the mqtt module.
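
For reference, this is roughly what each worker was doing – a sketch assuming a recent version of the mqtt module and a broker on localhost. Because every worker runs this same code, every worker opens its own subscription, and therefore gets its own queue:

var mqtt = require('mqtt');

// Each clustered worker opens its own connection and subscription,
// so RabbitMQ creates a separate queue per worker
var client = mqtt.connect('mqtt://localhost:1883');

client.on('connect', function () {
  client.subscribe('activities');
});

client.on('message', function (topic, message) {
  // Every worker receives every activity - not what we want
  console.log('Worker ' + process.pid + ' received: ' + message.toString());
});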

Our initial thoughts on solving this problem went like this: if we move the message client to the master instance, it can react to incoming messages and pass them on to the forked workers in some kind of ‘round-robin’ fashion. It seemed plausible, but had a moderate ‘ick’ factor because implementing our own load balancing felt like re-solving a solved problem. In addition, it would prevent us from using PM2 (because we would have to be in control of forking the workers), and if we used multiple VMs with Nginx load balancing, we would be back to square one.

Fortunately, we realized that RabbitMQ can already handle this if we partially give up the pretense and acknowledge we are running AMQP under the MQTT abstraction. The way RabbitMQ works for pub/sub topologies is that publishers post to ‘topic’ exchanges, which are bound to queues using routing keys (in fact, there is a direct mapping between AMQP routing keys and MQTT topics – it is trivial to map back and forth).
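
The mapping is mechanical – the MQTT plug-in swaps topic separators for routing key separators (and wildcards, if you use them). A rough sketch of the translation, ignoring corner cases such as dots inside MQTT topic segments:

// MQTT 'a/b/c' maps to AMQP routing key 'a.b.c';
// MQTT '+' maps to AMQP '*', and '#' stays '#'
function mqttToAmqp(topic) {
  return topic.replace(/\//g, '.').replace(/\+/g, '*');
}

function amqpToMqtt(key) {
  return key.replace(/\./g, '/').replace(/\*/g, '+');
}

console.log(mqttToAmqp('activities/new')); // 'activities.new'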

The problem we were having by using an MQTT client on the consumer side was that each cluster instance received its own queue. By dropping down to an AMQP client and making all the instances bind to the same queue, we let RabbitMQ load balance the consumers using a ‘round-robin’ policy. In fact, this way of working is listed in the RabbitMQ documentation as a work queue, which is exactly what we want.

[Diagram: amqp-cluster – all Node instances bind to the same AMQP queue, and RabbitMQ round-robins messages between them]

OK, Show Us Some Code Now

Just for variety, I will publish MQTT messages to the topic using the Eclipse Paho Java client. Publishing using clients in Node.js or Ruby will be almost identical, modulo syntax differences:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public static final String TOPIC = "activities";

try {
    // 'host' is the machine running RabbitMQ; 1883 is the default MQTT port
    MqttClient client = new MqttClient("tcp://" + host + ":1883", "mqtt-client1");
    client.connect();
    MqttMessage message = new MqttMessage(messageText.getBytes());
    client.publish(TOPIC, message);
    System.out.println(" [x] Sent to MQTT topic '" + TOPIC + "': '" + message + "'");
    client.disconnect();
} catch (MqttException e) {
    e.printStackTrace();
}

The client above will publish to the ‘activities’ topic. What we now need to do on the receiving end is set up a single queue and bind it to the default AMQP topic exchange (“amq.topic”) using the matching routing key (again, ‘activities’). The name of the queue does not matter as long as all the Node workers use it (and they will, by virtue of being clones of each other).

var amqp = require('amqp');

var connection = amqp.createConnection({ host: 'localhost' });

// Wait for the connection to become established
connection.on('ready', function () {
  // Bind to the default 'amq.topic' exchange
  connection.queue('worker-queue', { durable: true }, function (q) {
    // Routing key identical to the MQTT topic
    q.bind('activities');

    // Receive messages
    q.subscribe(function (message, headers, deliveryInfo, messageObject) {
      // Print messages to stdout
      console.log('Node AMQP(' + process.pid + '): received topic "' +
          deliveryInfo.routingKey + '", message: "' +
          message.data.toString() + '"');
    });
  });
});
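
If you start this app with ‘pm2 start app.js -i max’ and publish a few messages, you should see the log lines report different worker pids, confirming that RabbitMQ is round-robining deliveries between the instances.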

Implementation detail: the RabbitMQ MQTT plug-in uses the default topic exchange “amq.topic”. If we set up a different exchange for the MQTT traffic, we will need to explicitly name the exchange when binding the queue to it.
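
In node-amqp, that means passing the exchange name as the first argument to bind. A one-line sketch (the exchange name here is hypothetical):

// Bind to a custom exchange instead of the default 'amq.topic'
q.bind('my-mqtt-exchange', 'activities');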

Any Downsides?

There are scenarios in which it is actually beneficial for all the workers to receive an identical message. When the workers are managing Socket.io chat rooms with clients, a message may affect multiple clients, so all the workers need to receive it and decide whether it applies to them. The single-queue topology used here is limited to cases where workers exist strictly for scalability and any one worker can do the job.
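
Should we ever need that fan-out behavior, the fix is topological rather than architectural: give each worker its own exclusive, server-named queue bound to the same routing key. A sketch, again with node-amqp (passing an empty queue name asks RabbitMQ to generate a unique one):

var amqp = require('amqp');

var connection = amqp.createConnection({ host: 'localhost' });

connection.on('ready', function () {
  // Empty name = server-generated; exclusive = private to this worker
  connection.queue('', { exclusive: true }, function (q) {
    q.bind('activities');

    // Now every worker receives its own copy of each message
    q.subscribe(function (message, headers, deliveryInfo) {
      console.log('Worker ' + process.pid + ' got a copy of "' +
          deliveryInfo.routingKey + '"');
    });
  });
});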

From a more philosophical point of view, by resorting to AMQP we have broken through the abstraction layer and given up the option to swap in another MQTT broker in the future. We looked around and noticed that other people have had to do the same in this situation. MQTT is a pub/sub protocol, and many pub/sub protocols have a ‘unicast’ feature that delivers a message to only one of the subscribers using some kind of selection policy (which would work splendidly in our case). Unfortunately, there is no ‘unicast’ in MQTT right now.

Nevertheless, by retaining the ability for front ends and devices to publish over MQTT, we preserved the nice features of the MQTT protocol (simple, lightweight, runs on the smallest of devices). We continue to express the messaging side of our APIs using MQTT. At the same time, we were able to tap into the ‘unicast’ behavior by using RabbitMQ features.

That seems like a good compromise to me. Here is hoping that unicast will become part of the MQTT protocol at some point in the future. Then we can go back to pretending we are running a ‘mystery MQTT broker’ rather than RabbitMQ for our messaging needs.

Nudge, nudge. Wink wink. Say no more.

© Dejan Glozic, 2014
