Vert.x Master Worker Architecture


Today I am going to explain how Vert.x can be used to build a distributed master-worker paradigm. In large-scale systems it's applicable to a wide variety of problems.

First – Just to refresh our memories about what Vert.x is

Vert.x, as we know, is a lightweight framework for creating distributed microservices. It can scale up and scale out depending on your needs. It also takes away the pain of dealing with the complexity of heavily multithreaded environments, race conditions, and so on.

The primary unit of work in Vert.x is a verticle. Verticles are thread-safe and can run locally or remotely. One verticle interacts with another by sending events over the event bus, and those events carry data with them.

Now, let's take a day-to-day scenario.

We are getting a lot of requests. Each request is independent of the others, but we cannot process them all on the commodity hardware we have. How do we serve all these requests coming to our cool high-traffic website?

Well, one answer is to serve each request in a new "thread", keep increasing the CPU cores (scale up), and hope it works. This is what your web server does. The problem is you can only increase the number of cores up to a limit (how high can you go?).

Once you reach that limit you add more such machines and leave it to a load balancer to divide the requests equally between them. Sounds familiar?

Well, you will have a problem relying on a load balancer when every service in the system faces the same issue. Every time you scale these services you have to keep re-configuring the load balancer. What if this were possible dynamically, in the application layer? What if we could scale up and out without the pain of a load balancer? The good news is that it can be achieved using Vert.x, except the load balancing happens inside your application layer. There are a lot of benefits to this approach which I will discuss some other time, but for now let's just focus on how this can be achieved using Vert.x.

So this problem has two major challenges:

a. How to divide the work between different machines so that we can keep up with the load.

b. How to combine the results from all this processing so that we can return them to the client (the master), which needs answers from all workers before proceeding further (can a load balancer achieve this?).

So the master is like management, whose only job is to distribute all the work to developers (like you and me) and, when the work is done, combine all the statuses, create a report, notify the boss, and hopefully get a fat pay hike (sounds familiar?).

In terms of Vert.x, we have a master verticle which gets a lot of work to do. But the master does not want to do any work. Why? Because it's a "master". So the master wants to assign all this work to workers. Workers are also verticles in Vert.x. But then a problem arises: the master needs to know when all the work is completed so that it can make the right decision about what to do next. Right?

So here is the high-level architecture we are going to follow.

[Diagram: Vert.x master-worker architecture]

OK, so in order to simulate this, let's first create a lot of work.

import io.vertx.core.Vertx;

/**
 * Created by marutsingh on 2/20/17.
 */
public class Application {

    public static void main(String[] args){

        final Vertx vertx = Vertx.vertx();
        //vertx.deployVerticle(new HttpVerticle());

        // Deploy the master verticle which will distribute the work
        vertx.deployVerticle(new MasterWorker());

        // Deploy five worker verticles
        //DeploymentOptions options = new DeploymentOptions().setInstances(10);
        //vertx.deployVerticle("WorkVerticle",options);
        for (int i = 0; i < 5; i++){
            vertx.deployVerticle("WorkVerticle");
        }

        // Send a comma-separated list of jobs to the master over the event bus
        vertx.eventBus().send("vrp", "Job1,Job2,Job3,Job4,Job5,Job6,Job7,Job8,Job9,Job10");

        System.out.println("Deployment done");
    }
}
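The commented-out DeploymentOptions lines hint at the more idiomatic way to scale the workers. Here is a minimal sketch, assuming Vert.x 3.x; the instance count of 10 and the worker flag are illustrative, not taken from the original code:

// Deploy 10 instances of WorkVerticle; the event bus spreads "work"
// messages across them. setWorker(true) runs them on the worker thread
// pool, which helps if the job does blocking work.
DeploymentOptions options = new DeploymentOptions()
        .setInstances(10)
        .setWorker(true);
vertx.deployVerticle("WorkVerticle", options);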

Great, we created our own master. Let's see what it looks like.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class MasterWorker extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        // Listen for incoming jobs on the "vrp" address
        vertx.eventBus().<String>localConsumer("vrp", objectMessage -> {
            String data = objectMessage.body();
            String[] work = data.split(",");
            String jobId = UUID.randomUUID().toString();
            List<Future> futureList = new ArrayList<>();

            // Fan out: send each piece of work to the workers and keep a
            // future that completes when the corresponding worker replies
            for (String w : work) {
                Future<Message<Object>> f1 = Future.future();
                futureList.add(f1);
                vertx.eventBus().send("work", w + ":" + jobId, f1.completer());
            }
        });

        // Signal that the verticle has started successfully
        fut.complete();
    }
}

Great, so our master is sending work over the event bus and hoping some worker will start working on it.

Let's see what our worker is doing.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;

public class WorkVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        final String verticleId = super.deploymentID();

        // Listen for work on the "work" address
        vertx.eventBus().<String>localConsumer("work", objectMessage -> {
            // Message format is "<work>:<jobId>"
            String[] data = objectMessage.body().split(":");
            String work = data[0];
            String jobId = data[1];

            // Do the actual processing here, then reply with the result
            String result = work + "Completed***";
            objectMessage.reply(result);
        });

        // Signal that the verticle has started successfully
        fut.complete();
    }
}

So the worker does the work and replies on the event bus with the result.

Now the master needs to combine all these results. This is where a very cool feature introduced in Vert.x 3 comes in: composable futures. It makes this really easy. The following snippet goes inside the master's handler, right after the loop that sends out the work.

StringBuilder resultSet = new StringBuilder();

CompositeFuture.all(futureList).setHandler(ar -> {
    if (ar.succeeded()) {
        // All workers replied; collect the individual results
        ar.result().list().forEach(result ->
                resultSet.append(((Message<?>) result).body().toString()));
        System.out.println(resultSet.toString());
    } else {
        // All completed and at least one failed
    }
});
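One caveat not covered above: if a worker dies or never replies, the corresponding future only fails once the event bus reply timeout fires (30 seconds by default in Vert.x 3), which then fails the composite. If you want the master to give up faster, you can shorten the timeout per send. A minimal sketch, assuming Vert.x 3.x; the 5-second value is just an illustration:

// Shorter reply timeout so a dead worker fails the job quickly
DeliveryOptions options = new DeliveryOptions().setSendTimeout(5000);

Future<Message<Object>> f1 = Future.future();
futureList.add(f1);
vertx.eventBus().send("work", w + ":" + jobId, options, f1.completer());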

That's all! I hope this will be useful in some of your scenarios.

Source code is available at

https://github.com/singhmarut/vertx-master-worker-simulator

Happy coding !!


MicroService Architecture

Here is a microservice architecture deployment diagram. All services are Docker containers which are registered to the Consul server via Registrator. The client (external – mobile, UI) makes a call to Bouncer, which is our API proxy. Bouncer has all permissions configured on API URLs. It makes a call to Auth Server, which authenticates the request, and if successful it passes the service URL to HAProxy. HAProxy then has rules configured which route the URL to the exact service.

Services always follow a naming convention, so when a service is registered in Consul, consul-template refreshes the HAProxy configuration in the background.

[Diagram: microservice architecture deployment]

Bouncer – API proxy gateway. All calls come to Bouncer to make sure that only authenticated requests are passed to the actual services.

Auth Server – Single point of authentication and authorization. All applications create permissions and save them in Auth Server.

External ELB – All public API calls go to the External ELB, which in turn passes them on to the HAProxy cluster.

Internal ELB – All internal API calls are routed through the Internal ELB. There will be URLs which are only exposed to internal services.

HAProxy (Cluster) – The load balancer cum service router.

Consul Server (Cluster) – Centralized Service Registry

Registrator – Sidecar application running alongside each service, which updates the Consul cluster about service health.

Consul Template – Background application which updates HAProxy whenever there is a change in service configurations

ECS Cluster – AWS ECS, where all Docker containers are registered. It takes care of dynamically launching new Docker containers based on parameters; autoscaling is handled automatically.

Those are the major parts of the deployment. Please share your comments. Happy coding!

A friend posted these questions on my FB account.

 

Question 1.

I am gathering from the diagram that all microservice API calls will be proxied through the API gateway. Is that true for internal cluster communication as well? If yes, then wouldn't it be too much load on the gateway server when there are many microservices in the infrastructure? Or will it be as lightweight as a load balancer? Can I assume this gateway is a simple reverse proxy server which just passes through the request/response, and if so, why not use Apache or Nginx?

Answer: First, internal cluster communication may or may not happen through Bouncer, depending on what your security requirements are. If you want all API calls to be authenticated, then yes, it will go through Bouncer. The point to note is that Bouncer is a very lightweight Node.js application, so it will have extremely high throughput, and because it sits behind HAProxy you can always add new nodes.

The API proxy is a reverse proxy, but with some logic. Most of the time, when you want to expose an API which interacts with multiple microservices, you will have to aggregate data; that logic resides in the API proxy. It's a common pattern in large-scale architecture.

Question 2.
As per your description, the Auth Server is also responsible for authorisation here? Then how are you ensuring isolation of a microservice if its authorisation logic is shared with the Auth Server, and how are you ensuring data integrity, i.e. the security measure which protects against the disclosure of information to parties other than the intended recipient?

Answer:

The Auth Server is like "Google auth server". All resource permissions reside in the Auth Server, and the authorization server has permissions for each application. These permissions can be added either via an API by the app or through an admin UI, so each app can have different permissions. A single user will be given different permissions for different apps, so isolation is guaranteed. E.g. I may have the "user-create" permission in UserService but not the "account-create" permission in AccountService.

Who creates permissions, who grants them to users, and when, depends on your design.

 


Apache Kafka – Simple Tutorial


In this post I want to highlight my fascination with Kafka and its usage.

Kafka is a message broker, just like RabbitMQ or a JMS broker. So what's the difference?

The differences are:

  • It is distributed
  • It is fault tolerant, because messages are replicated across the cluster
  • It does one thing and one thing only, i.e. transferring your messages, and does it really well
  • It is highly scalable due to its distributed nature
  • It offers tunable consistency
  • It processes messages in parallel, unlike others which process them sequentially
  • It guarantees ordering per partition

How do you set it up?

Kafka is inherently distributed, which means you are going to have multiple machines forming a Kafka cluster.

Kafka uses ZooKeeper for leader election, among other things, so you need to have a ZooKeeper cluster already running somewhere. If you don't, you can follow

https://www.tutorialspoint.com/zookeeper/zookeeper_installation.htm

You install Kafka on all the machines which will participate in the Kafka cluster and open the ports Kafka runs on. Then point every broker at the same ZooKeeper ensemble; the brokers discover each other through ZooKeeper. E.g. if Kafka is running on machines K1, K2 and K3, all three register themselves in ZooKeeper and form a single cluster.

Yes, it's that simple.

How does it work?

The way Kafka works is: you create a topic, send a message to it, and read the message at the other end.

So if there are multiple machines, how do you send a message to Kafka? Well, you keep a list of the machines inside your code and send messages via the high-level Kafka producer (which is a helper class in the Kafka driver). A high-level Kafka consumer class is available for reading messages.

Before you send a message, create a topic first with a "replication factor", which tells Kafka how many brokers will keep a copy of this data.
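To make this concrete, here is a minimal producer sketch. It assumes the topic "demo" already exists and uses the newer org.apache.kafka.clients.producer API (the post links to the 0.8 docs, where the older producer class is used instead); the broker names K1–K3, the topic name and the key/value are illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // List of brokers to bootstrap from; the client discovers the rest of the cluster
        props.put("bootstrap.servers", "K1:9092,K2:9092,K3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one message to the "demo" topic (assumed to be created already)
            producer.send(new ProducerRecord<>("demo", "key1", "hello kafka"));
        }
    }
}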

Some important terminologies related to Kafka are:

Topic – Where you publish messages. You need to create it beforehand.

Partition – The unit of parallelism; the number of partitions determines how many consumers in a group can read a topic in parallel. The default is 1, but you can create hundreds.

Ordering of Messages – Guaranteed for single partition

TTL – Time to live for messages on the disk – default 7 days

Group – Kafka guarantees that a message is only ever read by a single consumer in the group. So if you want a message to be delivered only once, just put all the consumers in the same group (see the consumer sketch below).
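A minimal consumer sketch to go with the producer above, again assuming the newer org.apache.kafka.clients.consumer API; the group id "demo-group" and topic "demo" are illustrative. Every consumer started with the same group.id shares the partitions of the topic, so each message is handled by only one of them.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "K1:9092");
        props.put("group.id", "demo-group"); // consumers in the same group share the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo"));
            while (true) {
                // Poll for new messages and print them
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}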

If you want to go deeper, here are some useful links:

https://kafka.apache.org/08/design.html

http://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm


Kafka Cluster Setup

From my own experience, while setting up a Kafka cluster on AWS we face some issues, so I just want to highlight them.

a. First set up the ZooKeeper cluster, let's say a 3-node cluster. Modify each node's zoo.cfg to publish the internal AWS IP addresses:

server.1=<internal aws ip1>:2888:3888

server.2=<internal aws ip2>:2888:3888

server.3=<internal aws ip3>:2888:3888

b. Go to Kafka's server.properties on each node and give every broker a unique broker.id:

node1 – broker.id=0

node2 – broker.id=1

node3 – broker.id=2

Change the bind address (host.name) to <internal ip> so that the broker is not accessible from outside.

Change advertised.host.name to <internal ip>

List all the ZooKeeper nodes of your cluster under the zookeeper.connect setting.

Start all Kafka nodes and they should form a cluster.

Create a topic, publish a message using kafka-console-producer, and check that there are no errors.
