Vert.x Master Worker Architecture

Today I am going to explain how Vert.x can be used to build a distributed master-worker system. In large-scale systems this pattern applies to a wide variety of problems.

First, let's refresh our memories about what Vert.x is.

Vert.x, as we know, is a lightweight framework for creating distributed microservices. It can scale up and scale out depending on your needs. It also takes away much of the pain of dealing with heavily multithreaded environments, race conditions and so on.

The primary unit of work in Vert.x is a verticle. The handlers of a standard verticle always run on the same event loop thread, so the code inside a verticle does not need its own locking, and verticles can run locally or remotely. One verticle interacts with another by sending messages over the event bus, and those messages carry data with them.

Now let's take a day-to-day scenario.

We are getting a lot of requests. Each request is independent of the others, but we are unable to process them all on the commodity hardware that we have. How do we serve all the requests coming to our cool high-traffic website?

Well, one answer is to serve each request in a new "thread", keep adding CPU cores (scale up) and hope it will work. This is what your web server does. The problem is that you can only increase the number of cores up to a limit (how high can you go?).

Once you reach that limit you will add more such machines and leave it to a load balancer to divide the requests equally between them. Sounds familiar?

Well, relying on a load balancer becomes a problem when every service in the system faces the same issue. Every time, you have to scale these services and keep reconfiguring the load balancer. What if this were possible dynamically in the application layer? What if we could scale up and out without the pain of a load balancer? The good news is that this can be achieved with Vert.x, with the load balancing happening inside your application layer. There can be a lot of benefits to this approach, which I will discuss some other time; for now let's just focus on how this can be achieved using Vert.x.

So this problem has 2 major challenges:

a. How to divide the work between different machines so that we can keep up with the load.

b. How to combine the results of all this processing so that we can return them to the client (the master), who needs answers from all workers before proceeding further (can this be achieved by a load balancer?).
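Before moving to Vert.x, the two challenges can be sketched with nothing but the JDK. This is only an analogy and not the Vert.x code of this post: a thread pool stands in for the worker machines, and the class name `ScatterGatherSketch` and the `" done"` suffix are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-JDK illustration; not part of the Vert.x sample.
final class ScatterGatherSketch {

    // Challenge (a): hand each job to whichever pool thread is free.
    // Challenge (b): wait for every partial result, then combine them in job order.
    static List<String> processAll(List<String> jobs, ExecutorService workers) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String job : jobs) {
            pending.add(CompletableFuture.supplyAsync(() -> job + " done", workers));
        }
        // allOf completes only once every submitted future has completed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            results.add(f.join()); // already complete, so this does not block
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(3);
        try {
            System.out.println(processAll(List.of("Job1", "Job2", "Job3"), workers));
        } finally {
            workers.shutdown();
        }
    }
}
```

The shape is the same one the Vert.x version uses: one future per piece of work, plus a single point where the master waits for all of them.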

So the master is like management, whose only job is to distribute all the work to developers (like you and me) and, when the work is done, combine all the statuses, create a report, notify the boss and hopefully get a fat pay hike (sounds familiar?).

In terms of Vert.x, we have a master verticle which gets a lot of work to do. But the master does not want to do any work. Why? Because it's a "master". So the master wants to assign all this work to workers. Workers are also verticles in Vert.x. But then the problem arises that the master needs to know when all the work is completed so that it can make the right decision about what to do next. Right?

So here is the high-level architecture we are going to follow.

[Figure: Vert.x Master Worker architecture]

OK, so in order to simulate this, first let's create a lot of work.

import io.vertx.core.Vertx;

/**
 * Created by marutsingh on 2/20/17.
 */
public class Application {

    public static void main(String[] args) {

        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MasterWorker());

        // Five workers share the "work" address; the event bus hands each
        // message to one of them in turn.
        // Alternative: new DeploymentOptions().setInstances(5) with a single deployVerticle call.
        for (int i = 0; i < 5; i++) {
            vertx.deployVerticle("WorkVerticle");
        }

        // The "lot of work": ten comma-separated jobs for the master to split.
        // Note: deployVerticle is asynchronous, so a production application should
        // send this from a deployment completion handler rather than right away.
        vertx.eventBus().send("vrp", "Job1,Job2,Job3,Job4,Job5,Job6,Job7,Job8,Job9,Job10");

        System.out.println("Deployment done");
    }
}

Great, we created our own master. Let's see how it looks.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class MasterWorker extends AbstractVerticle {

    @Override
    public void start() {
        vertx.eventBus().<String>localConsumer("vrp", message -> {
            String[] work = message.body().split(",");
            // One id for the whole batch so a worker can tell which job an item belongs to
            String jobId = UUID.randomUUID().toString();
            List<Future> futureList = new ArrayList<>();

            for (String w : work) {
                // Each future completes when a worker replies to this item
                Future<Message<String>> f1 = Future.future();
                futureList.add(f1);
                vertx.eventBus().send("work", w + ":" + jobId, f1.completer());
            }
        });
    }
}

Great, so our master is doing its job: sending work over the event bus and hoping some worker will start working on it.

Let's see what our worker is doing.

import io.vertx.core.AbstractVerticle;

public class WorkVerticle extends AbstractVerticle {

    @Override
    public void start() {
        vertx.eventBus().<String>localConsumer("work", message -> {
            // Payload format is "<work>:<jobId>", as built by the master
            String[] data = message.body().split(":");
            String work = data[0];
            String jobId = data[1];
            String result = work + "Completed***";
            // Reply directly to the sender; this completes the master's future
            message.reply(result);
        });
    }
}

So the worker does the work and replies to the message on the event bus with the result.
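One detail of the exchange is worth a closer look: the master builds each payload as the work item, a colon and the job id, and the worker splits it on ":" again. That works for names like Job1, but a work item that itself contains a colon would be cut in the wrong place. A minimal sketch of a more careful encoding follows; `WorkMessage` and its methods are hypothetical names, not part of the sample or of Vert.x, and it assumes the job id never contains a colon (true for the UUID string used here).

```java
// Hypothetical helper; the class and method names are not part of the sample.
final class WorkMessage {
    final String work;
    final String jobId;

    WorkMessage(String work, String jobId) {
        this.work = work;
        this.jobId = jobId;
    }

    // Same wire format as the master uses: "<work>:<jobId>".
    String encode() {
        return work + ":" + jobId;
    }

    // Split on the LAST colon so that a colon inside the work item survives.
    // This relies on the job id containing no colon.
    static WorkMessage decode(String payload) {
        int sep = payload.lastIndexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Expected <work>:<jobId> but got: " + payload);
        }
        return new WorkMessage(payload.substring(0, sep), payload.substring(sep + 1));
    }

    public static void main(String[] args) {
        WorkMessage m = decode(new WorkMessage("Job1", "abc-123").encode());
        System.out.println(m.work + " belongs to " + m.jobId);
    }
}
```

For anything richer than two strings, sending a structured body such as a JSON object would probably be the cleaner choice; the string format is kept here only because the sample uses it.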

Now the master needs to combine all these results. This is a very cool feature introduced in Vert.x 3: composable futures. It makes this so easy.

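// Goes inside the master's handler, right after the for loop
StringBuilder resultSet = new StringBuilder();
CompositeFuture.all(futureList).setHandler(ar -> {
    if (ar.succeeded()) {
        // All succeeded: every entry is the reply Message from one worker
        ar.result().list().forEach(result ->
            resultSet.append(((Message<?>) result).body().toString()));
        System.out.println(resultSet.toString());
    } else {
        // At least one failed; all() fails as soon as any future fails
    }
});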

That's all! I hope this will be useful in some of your scenarios.

Source code is available at

https://github.com/singhmarut/vertx-master-worker-simulator

Happy coding !!

Vert.x Cluster

Vert.x is an extremely simple, event-based, non-blocking library for distributed computing that can easily be embedded in any Java framework of your choice.

For some time now I have been exploring Vert.x. I was looking for a Vert.x cluster sample but could not find a decent one, so I decided to share my own with the community.

I modified one of the samples from the Vert.x examples and I will explain the core parts of it here.

First the cluster has to be configured.

Config hazelcastConfig = new Config();
hazelcastConfig.getNetworkConfig().getJoin().getTcpIpConfig().addMember("127.0.0.1").setEnabled(true);
hazelcastConfig.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);

ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
VertxOptions options = new VertxOptions().setClusterManager(mgr);
Vertx.clusteredVertx(options, res -> {
    if (res.succeeded()) {
        vertx = res.result();
        deploy(vertx,context,mode);
    }
});

This code can be part of your main function. You can also create a cluster config file called “cluster.xml” rather than creating the configuration programmatically. It should be on the classpath of your application, so it can be put inside src/main/resources.

I am going to test this application on my local machine, so I am using TCP discovery rather than multicast. Under the hood Vert.x uses Hazelcast (the default cluster manager) for all its clustering capabilities. Hazelcast also comes with a special configuration for AWS, so if you are going to deploy your application on AWS, that is the configuration you should use.

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.2.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <properties>
       .....
        <property name="hazelcast.wait.seconds.before.join">0</property>
    </properties>

    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
    <management-center enabled="false">http://localhost:8080/mancenter</management-center>
    <network>
        <port auto-increment="true" port-count="10000">5701</port>
        <outbound-ports>
            <!--
            Allowed port range when connecting to other nodes.
            0 or * means use system provided port.
            -->
            <ports>0</ports>
        </outbound-ports>
        <join>
            <!--<multicast enabled="false">-->
            <!--<multicast-group>224.2.2.3</multicast-group>-->
            <!--<multicast-port>54327</multicast-port>-->
            <!--</multicast>-->
            <tcp-ip enabled="true">
                <interface>192.168.1.8</interface>
            </tcp-ip>
            <aws enabled="false">
                .....
            </aws>
        </join>
        <interfaces enabled="false">
            <interface>10.10.1.*</interface>
        </interfaces>        
    </network>
    <partition-group enabled="false"/>
    <executor-service name="default">
        <pool-size>16</pool-size>
        <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
        <queue-capacity>0</queue-capacity>
    </executor-service>
    <map name="__vertx.subs">

        <!--
            Number of backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. 0 means no backup.
        -->
        <backup-count>1</backup-count>
      
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <!--
            Valid values are:
            NONE (no eviction),
            LRU (Least Recently Used),
            LFU (Least Frequently Used).
            NONE is the default.
        -->
        <eviction-policy>NONE</eviction-policy>
        <!--
            Maximum size of the map. When max size is reached,
            map is evicted based on the policy defined.
            Any integer between 0 and Integer.MAX_VALUE. 0 means
            Integer.MAX_VALUE. Default is 0.
        -->
        <max-size policy="PER_NODE">0</max-size>
        <!--
            When max. size is reached, specified percentage of
            the map will be evicted. Any integer between 0 and 100.
            If 25 is set for example, 25% of the entries will
            get evicted.
        -->
        <eviction-percentage>25</eviction-percentage>
        <merge-policy>com.hazelcast.map.merge.LatestUpdateMapMergePolicy</merge-policy>
    </map>

    <!-- Used internally in Vert.x to implement async locks -->
    <semaphore name="__vertx.*">
        <initial-permits>1</initial-permits>
    </semaphore>

</hazelcast>

I have included only the important parts of the configuration and left out the rest.

Once we are done with the cluster configuration, all we have to do is create a verticle and deploy it.

public class ServerVerticle extends AbstractVerticle {

    int port;
    public ServerVerticle(int port){
        super();
        this.port = port;
    }
    @Override
    public void start() throws Exception {
        super.start();
        HttpServer server = vertx.createHttpServer();
        server.requestHandler(req -> {
            if (req.method() == HttpMethod.GET) {
                req.response().setChunked(true);

                if (req.path().equals("/products")) {
                    vertx.eventBus().<String>send(SpringDemoVerticle.ALL_PRODUCTS_ADDRESS, "", result -> {
                        if (result.succeeded()) {
                            req.response().setStatusCode(200).write(result.result().body()).end();
                        } else {
                            req.response().setStatusCode(500).write(result.cause().toString()).end();
                        }
                    });
                } else {
                    req.response().setStatusCode(200).write("Hello from vert.x").end();
                }

            } else {
                // We only support GET for now
                req.response().setStatusCode(405).end();
            }
        });

        server.listen(port);
    }
}

Great, let's deploy this verticle. Once the configuration has been done, you need to deploy the verticles in the cluster.

Vertx.clusteredVertx(options, res -> {
    if (res.succeeded()) {
        Vertx vertx = res.result();
        // Deploy verticles only once the cluster has been initialized
        vertx.deployVerticle(new ServerVerticle(Integer.parseInt(args[0])));
    } else {
        // Joining the cluster failed; report why instead of failing silently
        res.cause().printStackTrace();
    }
});
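The deployment above reads the port straight from args[0], so starting the application without an argument ends in an ArrayIndexOutOfBoundsException and a non-numeric argument in a NumberFormatException. A small guard could look like the sketch below. `PortArgs`, `parsePort` and the default of 9000 are assumptions made for illustration and are not part of the sample.

```java
// Hypothetical helper; neither the class nor the default port comes from the sample.
final class PortArgs {

    static final int DEFAULT_PORT = 9000; // assumed default for illustration

    // Returns the port given as the first argument, or DEFAULT_PORT when none is given.
    static int parsePort(String[] args) {
        if (args.length == 0) {
            return DEFAULT_PORT;
        }
        int port;
        try {
            port = Integer.parseInt(args[0].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port must be a number: " + args[0], e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println("Would listen on port " + parsePort(args));
    }
}
```

With this in place the deployment line could pass `PortArgs.parsePort(args)` to the `ServerVerticle` constructor instead of parsing args[0] inline.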

 

You start the application by giving it a port, let's say 9000, and then start another instance on a different port, let's say 9005, and voila, you can see both of them start communicating.

Complete source code can be found at https://github.com/singhmarut/vertx-cluster.git

Happy Coding !!