Vert.x is a very powerful set of libraries that helps to write reliable concurrent applications, typically but not only clients and servers, in several languages. Besides the documentation, the web site provides many short examples that help to get up and running. I have recently been trying my hand at clustering, but I couldn't find a detailed end-to-end description of how to set up a clustered system. So I decided to implement a toy in-memory registry and explore Vert.x internals. This post describes that process.

Introduction

Reactive systems

A system is said to be reactive if it continues to reply while facing some kind of pressure or failure. Reactiveness has been made popular by the Reactive Manifesto. Basically, such a system still replies after an exception has happened, after a server or a database has crashed, or during high workloads. There is no magic in there: a system is reactive because its architecture has been designed to be reactive and because it runs on an infrastructure that provides such capabilities. To give a simple idea, consider a web service that runs on a single (physical or virtual) server. If that server crashes, the web service obviously dies with it and stops responding to incoming requests. Yet, if that service runs on a couple of servers behind a load balancer and if those servers can exchange some information, for example about the sessions, losing a node should not be an issue: the load balancer will detect the failure and stop sending requests to that server, and if the session has been properly shared with the other servers, any server will be able to handle any request, so the end users will not even notice the outage of one node.

Those architectures are often associated with terms like high availability, horizontal scalability, or clustering. The concepts are not really new, but nowadays there are more and more tools available to ease the design, development, deployment and operation of such systems. Solutions range from languages (Erlang, Elixir, Clojure, Pony, …) to libraries (Akka or Vert.x for JVM-based languages) to infrastructures (Kubernetes, OpenStack, …). Real-life applications are often mixes of those.

What is Vert.x

If you’ve never heard of Vert.x, you should definitely have an in-depth look at its home page and its documentation first.

Basically, Vert.x is a set of libraries that runs on the JVM but is not tied to Java alone. One can actually develop Vert.x applications in Java, Kotlin, JavaScript, Scala and a few others. These libraries cover a very large scope of functionality, the main guideline being to expose it through non-blocking APIs. Examples of functionalities are:

  • creation of software servers and clients for various protocols
  • creation of web sites with an integrated web toolkit
  • clients to various SQL and non-SQL databases
  • clients to various messaging brokers
  • authentication and authorisation
  • clustering
  • and there are more…

Vert.x achieves this by providing an asynchronous, event-driven, concurrent programming model based on very light threads (called verticles in Vert.x) that communicate by exchanging messages. The clustering libraries expose an API that eases the distribution of an application across multiple Vert.x instances running on separate servers (later, we will speak about Vert.x nodes). All this with the goal of making it easier to build reactive applications on the JVM.
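
Just to give a flavour of this model, here is a minimal, hypothetical verticle (it is not part of the registry we will build): it registers a consumer on the event bus and replies to every message it receives.

import io.vertx.core.AbstractVerticle;

public class EchoVerticle extends AbstractVerticle {
  @Override
  public void start() {
    // register a consumer on the event bus; nothing here blocks the event loop
    vertx.eventBus().consumer("echo.address", msg -> {
      msg.reply("echo: " + msg.body());
    });
  }
}

Such a verticle would be deployed with vertx.deployVerticle(new EchoVerticle()), and Vert.x takes care of dispatching the messages to it without any explicit thread management on our side.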

The objective of this post is to dig into the distribution of an application across multiple Vert.x nodes and to see how such an application can be made highly available. As an illustration, we will build a basic distributed Key:Value (String:String) registry and experiment with how it withstands failures.

The registry

Registries are basically key/value stores. In a clustered environment with more than one node, a registry helps to pass information across the different modules of the application: if a module running on one node stores some data under the key K, then another module running on another node can retrieve that data by requesting the data associated with K. Of course, the registry has to be known by all nodes or be embedded in them.

The registry we will build is a very simple in-memory String/String store which exposes a few APIs through HTTP GET requests:

  • /put/key/value : will store the value value under the key key. If a value for the key already exists, it will be replaced by the new value. This API returns 200:Ok which means that the HTTP return code is 200 with the reply payload being Ok.
  • /get/key : will retrieve the value for the key key. The API replies 200:<value> if the key exists and 200:Ko otherwise.
  • /list : lists all the n mappings key -> value from the registry. It replies with:

     200:n
       <key_1> -> <val_1>
       ...
       <key_n> -> <val_n>
    

    where n is the number of records.

  • Any other type of request will result in 404:Error.

Tooling and coding iterations

If you are happy with the command line, the strict minimum is a build tool, a decent text editor and, of course, a JDK. Personally I use Maven and Emacs. If you prefer Gradle you’ll have to adapt the build file. If you dislike the command line and only use an IDE, then you are on your own, but I believe you know what you are doing.

The best way to learn is to experiment by yourself and write the code. Yet, for the impatient ones, all the code is organized into branches, master-00 being the starting point, master-01 being the evolution of the starting point, master-02 being the evolution of master-01 and so on until the final version master-final.

To get the source code, first clone the GitLab repository, then switch to branch master-00:

git clone git@gitlab.com:mszmurlo/vx-registry.git
cd vx-registry
git checkout master-00

Once done, you can build the first executable with mvn package, which will produce (among others) the file target/registry-0.1-fat.jar. This very first version is a self-contained HTTP server which you can run with the command java -jar target/registry-0.1-fat.jar. You can query it from the command line with curl or wget on the http://localhost:8080/hello endpoint.
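
For example (the exact reply depends on the generated skeleton):

$ curl http://localhost:8080/hello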

This application is just a skeleton that was generated by vep, a quick and dirty Groovy script of mine that is quite handy to begin a new application or to build one quickly just for testing. If you are interested in vep, see this repository for details. The master-00 branch was generated with the following command: groovy vep.groovy -pn vx-registry -pkg org.buguigny.vertx_registry -mfunc.

A standalone registry

As a first step, we will modify the master-00 code to create a registry that conforms to the API specified above. First, let’s see what our starting point is.

The starting point

The master-00 directory tree is as follows:

├── pom.xml
├── README.md
├── src
│   ├── main
│   │   ├── java
│   │   │   └── org
│   │   │       └── buguigny
│   │   │           └── vertxcluster
│   │   │               └── registry
│   │   │                   └── Main.java
│   │   └── resources
│   │       └── logback.xml
│   └── test
│       └── ...
└── target
    ├── ...

We see two main directories under src: main contains the source code of the application, while test holds the unit tests.

resources will host all the resources, like configuration files such as logback.xml which contains the configuration of the logger. We will later put the cluster configuration file in resources as well.

If we now look at the content of the Main.java file, we see it has three main methods:

  • startup(String[] args) defines the router of the HTTP server along with the handlers to be called and finally starts that server on port 8080. It is called by main().
  • rootHandler(RoutingContext rc) handles GET requests of the form /<some_string>.
  • defHandler(RoutingContext rc) handles all other requests as errors and returns 404:Error\n.
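
To fix ideas, here is a rough sketch of what the two handlers might look like; the actual generated code on master-00 may differ in its details (the parameter name is an assumption):

private void rootHandler(RoutingContext rc) {
    // the /<some_string> part of the URL is available as a route parameter
    String param = rc.request().getParam("param");
    rc.response()
      .setStatusCode(200)
      .putHeader("content-type", "text/plain")
      .end("Ok: " + param + "\n");
}

private void defHandler(RoutingContext rc) {
    // any request that matches no other route ends up here
    rc.response().setStatusCode(404).end("Error\n");
}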

To implement our registry we need to define a data structure to hold the key / value pairs and implement some additional handlers that will manipulate that structure.

The registry data structure

To hold information in the form of key/value pairs we’ll need some kind of map. As Vert.x is designed for concurrent applications, it implements its own data structures in order to avoid race conditions as much as possible. The package io.vertx.core.shareddata contains some classes (actually interfaces) specifically designed for that usage. Among those, AsyncMap and LocalMap look promising. According to the documentation, a LocalMap allows sharing data between different parts of an application running in the same Vert.x node, while an AsyncMap allows sharing data across nodes. For the standalone registry, we will use the LocalMap.

LocalMap behaves the same way regular Java maps do (it actually extends the java.util.Map interface) and adds some Vert.x dedicated methods (see the javadoc). The usage pattern is as follows:

LocalMap<String, String> map = vertx.sharedData().getLocalMap("a_map");
map.put("the_key", "the value");

The string a_map is just the identifier of the LocalMap in the system.

Then, elsewhere in the code we can retrieve the value of the key the_key with:

LocalMap<String, String> my_map = vertx.sharedData().getLocalMap("a_map");
String val = my_map.get("the_key");
// val holds "the value"

Now that we know how to store our key/value pairs, let’s define the HTTP API that will manipulate that structure.

The HTTP server

We will modify the HTTP server to handle the routes for the registry API. Basically, each API will have its own route:

HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);

router.get("/put/:k/:v").handler(this::putHandler);
router.get("/get/:k").handler(this::getHandler);
router.get("/list").handler(this::listHandler);

router.route().handler(this::defHandler);

Every route is associated with a handler that will handle the corresponding request. The last line defines a catch-all handler that handles any request that doesn’t match any route.

The API handlers will mostly all look the same. As an example, putHandler(), which will be triggered by a URL of the form /put/key/value, looks like:

private void putHandler(RoutingContext rc) {
    String key = rc.request().getParam("k");
    String val = rc.request().getParam("v");
    LocalMap<String, String> map = vertx.sharedData().getLocalMap("the_map");
    log.info("put(k={}, v={})", key, val);
    map.put(key, val);
    send(rc, 200, "Ok");
}

The only parameter to the handler is an instance of the RoutingContext class. Such an object is created for every HTTP request received by the HTTP server and holds the context of that request: cookies, parameters, session, etc.

We first get the parameters from the URL (actually from the context): the key in key and its value in val. Then, we get the shared map. The first call to vertx.sharedData().getLocalMap("the_map") creates an empty map and returns it, while further calls return the populated map. Later on we will define a member to avoid all those calls. Finally, we execute the operation to be implemented by this handler, here a put, and on the last line we send a reply back to the client.

As explained before, the full source code for this version of the registry is on the branch master-01. You’ll find a getHandler() that retrieves values from the map, a listHandler() that lists the content of the map, a defHandler() which replies with an error to any request that doesn’t comply with our specification, and the send() method that sends the reply back to the client. The port number to listen on is also retrieved from the command line.
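
As an illustration, here is a minimal sketch of what getHandler() and send() could look like; check the branch for the exact code:

private void getHandler(RoutingContext rc) {
    String key = rc.request().getParam("k");
    LocalMap<String, String> map = vertx.sharedData().getLocalMap("the_map");
    String val = map.get(key);
    log.info("get(k={}) -> {}", key, val);
    send(rc, 200, val == null ? "Ko" : val);
}

private void send(RoutingContext rc, int code, String payload) {
    rc.response()
      .setStatusCode(code)
      .putHeader("content-type", "text/plain")
      .end(payload + "\n");
}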

An example of interactive session

Now that we have a registry server with HTTP APIs, let’s have a live session. First, we start the server with the command java -jar target/registry-0.1-fat.jar 8080. Then, in another terminal, we send some requests:

$ curl http://localhost:8080/list
Ok: #0
$ curl http://localhost:8080/put/a/1
Ok
$ curl http://localhost:8080/put/b/2
Ok
$ curl http://localhost:8080/list
Ok: #2
a -> 1
b -> 2
$ curl http://localhost:8080/get/b
2
$ curl http://localhost:8080/put/b/42
Ok
$ curl http://localhost:8080/list
Ok: #2
b -> 42
a -> 1
$

First we check that the registry is empty. Then we put the mappings a:1 and b:2 and list the content of the registry, which is as expected. Then we map 42 to b, which already has a value, and we verify that it has been changed. Fortunately, it is the case.

Not bad!

Not bad, but not sufficient!

If the node crashes for some reason, sending requests to it will obviously fail. We would basically like to have another node that keeps the same registry information as the node that crashed. In other words, with a clustered registry composed of two nodes, listening on ports 8081 and 8082, the following should work:

$ curl http://localhost:8081/put/a/1
  Ok
$ curl http://localhost:8082/get/a
  1
$

We are sending the put request to the node that listens on port 8081, but we want to retrieve that value by sending a request to the second node, listening on port 8082. Let’s test that on our current implementation. We start two nodes in two terminals (the port to listen on is passed as the first command-line argument, as seen earlier):
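
$ java -jar target/registry-0.1-fat.jar 8081    # node 1, first terminal
$ java -jar target/registry-0.1-fat.jar 8082    # node 2, second terminal

Then, from a third terminal, we fire the following requests: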

$ curl http://localhost:8081/put/a/1
  Ok
$ curl http://localhost:8082/get/a
  Ko 
$ 

The Ko shows that the value for the key a does not exist on node 8082. Obviously there is no magic and our application is not clustered. We didn’t do much to help it either… So we’ll change that in the next section!

Clustered registry

Changing the data structure

In the section about data structures we saw there were two candidates to hold the registry structure, AsyncMap and LocalMap. We chose the latter because it shares data within one node, which was fine for a standalone registry. So let’s try the AsyncMap, as according to the documentation it allows sharing data across several nodes.

There are important differences between the AsyncMap and LocalMap APIs. AsyncMap is asynchronous, and in Vert.x that means handling the results of method calls in handlers.

To obtain an AsyncMap we can’t simply write AsyncMap<String, String> map = vertx.sharedData().getAsyncMap("the_map"); instead, we have to write:

vertx.sharedData().<String, String>getAsyncMap("the_map", ar -> {
  if(ar.succeeded()) {
    // do some operation on the map (get, put, ...)
  }
  else {
    // handle the failure to obtain the map 
  }
});

To avoid the hassle of having that block copied over and over again in every map manipulation method, we’ll move it into the startup() method and define map as a member. startup() will thus be redefined as follows:

// member definition
...
  private AsyncMap<String, String> map = null;
...

  private void startup(String[] args) {
     
    // initialize the vertx object (no change)
    // get the port for the HTTP Server (no change)
    // initialize the HTTP server with the routes (no change)
    
    vertx.sharedData().<String, String>getAsyncMap("the_map", ar_map -> {
      if(ar_map.succeeded()) {
        // get the map
        map = (AsyncMap<String, String>)ar_map.result();
        // start the HTTP server
        server.requestHandler(router::accept).listen(port, ar_http -> {
          if(ar_http.succeeded()) {
            // ...
          }
          else {
            log.error("Could not start the HTTP server on port '{}'", port);
            System.exit(-1);
          }
        }); // server listen
      }
      else {
        log.error("Could not obtain the AsyncMap 'the_map'");
        System.exit(-1);
      }
    }); // obtaining the map
  ...
  } // end of class

Another change relates to how the map manipulation methods are called: because the calls are asynchronous, we again need handlers that get called when the result of the method becomes available. The general pattern is:

map.<method>(<parameters for that method>, ar -> {
    if(ar.succeeded()) {
      // Code to execute upon success
    }
    else {
      // code to execute upon failure
    }
  });

For example, the putHandler() with an AsyncMap will be rewritten as:

private void putHandler(RoutingContext rc) {
  String key = rc.request().getParam("k");
  String val = rc.request().getParam("v");
  map.put(key, val, ar -> {
      if(ar.succeeded()) {
        log.info("put(k={}, v={}) -> Ok", key, val);
        send(rc, 200, "Ok");
      }
      else {
        log.error("put(k={}, v={}) -> Ko. Cause: '{}'", key, val, ar.cause().getMessage());
        send(rc, 200, "Ko");
      }
    });
}
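
For completeness, here is a sketch of how getHandler() could look with the AsyncMap; the code on branch master-02 may differ in its details:

private void getHandler(RoutingContext rc) {
  String key = rc.request().getParam("k");
  map.get(key, ar -> {
      if(ar.succeeded() && ar.result() != null) {
        log.info("get(k={}) -> {}", key, ar.result());
        send(rc, 200, ar.result());
      }
      else {
        log.info("get(k={}) -> Ko", key);
        send(rc, 200, "Ko");
      }
    });
}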

The source code for this stage is on branch master-02.

As previously, let’s start two nodes and let’s query them:

$ curl localhost:8081/list
#0
$ curl localhost:8082/list
#0
$ curl localhost:8081/put/a/1
Ok
$ curl localhost:8082/list
#0
$ 

First we make sure both nodes have empty registries. Then we add the a:1 mapping to the registry on node 8081 and query the registry on node 8082, which happens to be empty again! Obviously we are missing something.

This something is the cluster manager.

Including a cluster manager

Vert.x provides integrations for four cluster managers, namely Hazelcast, Infinispan, Apache Ignite and Apache Zookeeper. They all expose a similar set of features to Vert.x:

  • Discovery and group membership of Vert.x nodes in a cluster: basically, a newly started node will be detected automatically; it will join the cluster and benefit from the features below. The same holds for a node leaving the cluster, either suddenly if it crashed, or gracefully upon shutdown.

  • Maintaining cluster-wide topic subscriber lists: this mainly provides the ability to send messages to, and receive messages from, other nodes.

  • Distributed Map support: allows maps to span across the nodes in the cluster. This is the feature that interests us.

  • Distributed Locks: same for locks

  • Distributed Counters: same for counters

Each manager also provides its own specific set of features, which should drive the choice. In our project we will use Hazelcast, as it is the default cluster manager in Vert.x and fits our basic requirements well.

To enable Hazelcast we need to do two things: first, add the vertx-hazelcast dependency to the pom.xml file so that it will get downloaded and installed by Maven, and second, instantiate vertx as a clustered instance.

To add the dependency, add the following in the pom.xml file:

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-hazelcast</artifactId>
<!--
    <version>3.8.5</version>
-->	
</dependency>

then rebuild the whole project with mvn clean package. Notice that Maven will pick the appropriate version of the package by itself.

The second point is to change how vertx is instantiated so that it becomes a cluster-aware instance. As previously, this change implies some code reorganization in the startup() method. First, we tell Vert.x to run in clustered mode:

// ...
VertxOptions opts = new VertxOptions()
  .setClustered(true)
  ;

Then, getting a Vert.x instance, vertx, is now an asynchronous operation, so the code has to be updated as follows:

...
// start vertx in cluster mode
Vertx.clusteredVertx(opts, ar -> {
    if (ar.failed()) {
      log.error("Failed to start Vertx cluster. Cause: {}", ar.cause().getMessage());
      System.exit(-1);
    }
    else {
      vertx = ar.result();
      // chain here the creation of the AsyncMap 
      // and upon success the launch of the HTTP server
      // as previously
    }
});

The source code for this stage is on the branch master-03.

Playing around with the cluster

First of all, when node 8081 is started, the output is much more verbose: we see, for example, the startup of Hazelcast, the creation of partitions, and so on. When the second node, 8082, is started, we also see that node 8081 emits some logs about a new cluster connection: Initialized new cluster connection. Eventually, we see that a cluster with both nodes has been formed:

Members {size:2, ver:2} [
	Member [10.147.18.222]:5701 - cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3 this
	Member [10.147.18.222]:5702 - 4745846e-bb74-42e9-9475-8d95fc0e2c54
]

cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3 and 4745846e-bb74-42e9-9475-8d95fc0e2c54 are node identifiers. In the terminal window where we started node 8082, we see the same list, with this marking the other member:

Members {size:2, ver:2} [
	Member [10.147.18.222]:5701 - cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3
	Member [10.147.18.222]:5702 - 4745846e-bb74-42e9-9475-8d95fc0e2c54 this
]

That looks good!

Now, let’s redo the previous example: create a mapping on node 8081 and query node 8082 for that key:

$ curl localhost:8081/put/c/3
Ok
$ curl localhost:8082/get/c
3
$

Great! It looks like the map is seen on both nodes.

Let’s do a last experiment: on node 8081 we create some mappings (a:1, b:2, etc.). Then we’ll kill that very node and see what happens on node 8082.

$ curl localhost:8081/put/a/1
Ok
...
$ curl localhost:8081/put/d/4
Ok

## Here we kill node 8081 with Ctrl-C

$ curl localhost:8082/list
#4
a -> 1
b -> 2
c -> 3
d -> 4
$

Basically, information written to the first node is available from the second node, even after the first node has crashed. That looks pretty much like what we were looking for: a distributed and fault-tolerant registry.

Configuring the application

So, eventually, we have a distributed registry running on a cluster of Vert.x nodes. The cluster configuration, that is, the Hazelcast configuration, can be fine-tuned by putting a configuration file in src/main/resources/. Let’s go through some of the main configuration items.

Cluster configuration file

So far, the cluster manager we have been using had the default configuration, which is taken from the file default-cluster.xml bundled inside the Vert.x Hazelcast library. This configuration can be overridden if a file named cluster.xml is provided on the class path or at the root of the fat jar, or if the configuration file name is given as a system property with -Dvertx.hazelcast.config=./my-cluster-conf.xml. The system property takes precedence over the class path, which takes precedence over the default configuration file.
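
For example, to start a node with an external configuration file (the file name below is just an illustration):

$ java -Dvertx.hazelcast.config=./my-cluster-conf.xml -jar target/registry-0.1-fat.jar 8081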

To get a correct cluster.xml, just extract default-cluster.xml from the jar and copy it to src/main/resources/cluster.xml:

$ unzip ~/.m2/repository/io/vertx/vertx-hazelcast/<version>/vertx-hazelcast-<version>.jar \
  default-cluster.xml
$ mv default-cluster.xml src/main/resources/cluster.xml

Vert.x documentation for the configuration of the Hazelcast cluster manager can be found here.

The branch that corresponds to this stage is master-04.

Joining the cluster

One very handy feature provided by Hazelcast is the ability to discover an already running cluster and join it automatically. Basically, a newly started node has to:

  1. discover that there is a cluster around and join it
  2. or, if it’s “alone”, initiate a new, one-node, cluster.

Hazelcast proposes several discovery mechanisms, two of them being multicast and tcp-ip; both are configured in the <join>...</join> section of cluster.xml.

Discovering a cluster the easy way is done with the multicast method. The newly started node broadcasts the fact that it exists and the cluster invites it to join. This is the default behaviour and the configuration is as follows:

<network>
...
    <multicast enabled="true">
        <multicast-group>224.2.2.3</multicast-group>
        <multicast-port>54327</multicast-port>
    </multicast>
    <tcp-ip enabled="false"></tcp-ip>
...
</network>

On the other hand, if multicast is not available (as is the case on some cloud infrastructures or on VPN-defined virtual interfaces), there is another method called tcp-ip. To use it, we need to define a subset of servers known to belong to the cluster. This is quite a strong assumption, as all those known servers may be down… The configuration would be as follows:

<network>
...
    <multicast enabled="false"></multicast>
    <tcp-ip enabled="true">
        <hostname>machine1</hostname>
        <interface>192.168.1.0-7</interface>
        <interface>192.168.1.21</interface> 
    </tcp-ip>
...
</network>

Selecting the network interface

Servers in production environments typically have several interfaces with access to different LANs: one for administration, one for backups, one for production, etc. Hazelcast allows specifying on which network interface a specific node will communicate with the cluster. This is configured in the <interfaces>...</interfaces> section:

<network>
...
    <interfaces enabled="true">
        <interface>192.168.1.*</interface> 
        <interface>192.168.2.4-18</interface> 
        <interface>192.168.3.3</interface>         
    </interfaces>    
</network>
...

Wildcards are used to specify ranges. In the above declarations, 192.168.1.* stands for addresses between 192.168.1.0 and 192.168.1.255, while 192.168.2.4-18 stands for addresses between 192.168.2.4 and 192.168.2.18, inclusive.

Security

Hazelcast allows the use of SSL or symmetric encryption to secure socket-level communications. However, these features require the enterprise edition.

Creating Separate Clusters

It is possible to create different clusters on the same network by specifying different group names. Each node is allowed to belong to one single group and thus to one single cluster. Group information is defined in the <group>...</group> section:

<group>
    <name>dev</name>
    <!-- <password>dev-pass</password> -->
</group>

Notice that since Hazelcast 3.8.2, <password> is ignored and will be removed in a future release.

Homogeneous logs

By default Hazelcast uses JUL, the default Java logging framework. I prefer using slf4j; a matter of personal taste… Fortunately, Hazelcast allows selecting the logging framework with the hazelcast.logging.type property:

...
    <properties>
        ...
        <property name="hazelcast.logging.type">slf4j</property>
    </properties>
...

Then the log levels can be managed in the src/main/resources/logback.xml file. For example, to make Hazelcast less noisy:

<logger name="com.hazelcast" level="warn"/>

Of course, the file src/main/resources/logback.xml also defines the logging behaviour of all the other components: Vert.x and our own code.
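
For reference, a minimal logback.xml could look like the following; the file in the repository may of course differ:

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- make Hazelcast less noisy -->
  <logger name="com.hazelcast" level="warn"/>

  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>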

The branch master-05 reflects the current state of the application. This is also the final state and is on the branch master-final.

Conclusion on configuration

We have just scratched the surface of configuration, as there are many more items available to fine-tune. The full Hazelcast configuration documentation is available here.

Conclusion

In this post we have built a very simple distributed registry. It was a pretext to explore some aspects of Vert.x clustering: libraries, data structures, configuration, etc. I believe this has given you basic knowledge of the concepts and principles and that you will now be able to explore by yourself.