Consumer framework

We do our recharges through third-party APIs. Previously, whenever we received a recharge request, we invoked the recharge API, waited for the response and, based on that response, told the end user whether the recharge had succeeded. Any technically astute reader will spot the problem with this approach: we were making blocking third-party requests within an HTTP request-response cycle. One of the major consequences is that the response time of our site was held hostage to the response time of the third-party APIs. To alleviate this, we recently moved to a queue-based system, where we put the request into a queue and a consumer picks it up and does the recharge.
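
To make the change concrete, here is a rough sketch of the queue-based flow, with an in-memory BlockingQueue standing in for Kestrel. Everything in it is illustrative rather than our production code: the class and method names, the "PENDING" status and the "recharged ..." result string are all made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: an in-memory queue stands in for Kestrel.
class RechargeHandoffDemo {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Called inside the HTTP request: enqueue and return at once.
    // No third-party call happens here, so the request is never blocked on it.
    String acceptRecharge(String requestId) {
        queue.add(requestId);
        return "PENDING"; // hypothetical status shown to the user
    }

    // Called by a consumer outside the request cycle. The slow third-party
    // recharge call would happen here. Returns null if nothing arrived in time.
    String consumeOne(long waitMillis) {
        try {
            String requestId = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            return requestId == null ? null : "recharged " + requestId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The point of the split is that acceptRecharge never waits on anything slower than the queue itself.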

We have been using Kestrel for quite some time for other asynchronous processes, and it has worked very well for us, so we moved our recharges onto Kestrel queues as well. On the consumer side, we were using Camel, which is an EAI (enterprise application integration) framework. We ran into some rough weather with Camel:

  • While queuing, if there was an exception, Camel would silently swallow it. We tried the different mechanisms described in the Camel documentation, but none worked for us.
  • We could not figure out a way to shut down a Camel consumer gracefully. This was critical for us, given the number of deployments we do in a day.
  • We could not find a good way to monitor consumers so that we could plug our Nagios alerting system into them.
  • Kestrel has a nice mechanism whereby we can claim ownership of an item once we take it out of the queue. Camel did not expose this functionality.

We tried to fix these issues in Camel itself, but it was turning into an effort with diminishing returns. We wanted a simple consumer; we were not after any of Camel's enterprise capabilities. So, after giving it a good deal of thought, we decided to build a simple consumer framework instead of spending time and energy making Camel work for us.

All our consumers implement an interface as follows:


public interface Consumer {
  public void setConsumerWaitPeriod(long consumerWaitPeriod);
  public long getConsumerWaitPeriod();
  public void setNoOfConsumers(int noOfConsumers);
  public int getNoOfConsumers();
  public void setQueueName(String queueName);
  public String getQueueName();
  public void consume(Object object);
}
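
As an illustration, a minimal implementation of this interface might look like the sketch below. The class name RecordingConsumer, its default values and the queue name "recharge" are hypothetical and not taken from our codebase, and treating the wait period as milliseconds is an assumption. The interface is repeated so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Repeated from above so the sketch is self-contained.
interface Consumer {
    void setConsumerWaitPeriod(long consumerWaitPeriod);
    long getConsumerWaitPeriod();
    void setNoOfConsumers(int noOfConsumers);
    int getNoOfConsumers();
    void setQueueName(String queueName);
    String getQueueName();
    void consume(Object object);
}

// Hypothetical consumer that simply records every payload it is handed.
class RecordingConsumer implements Consumer {
    private long consumerWaitPeriod = 1000; // assumed default, assumed to be milliseconds
    private int noOfConsumers = 2;          // assumed default concurrency
    private String queueName = "recharge";  // hypothetical queue name

    // consume() may be called from several worker threads at once,
    // hence the synchronized list.
    private final List<Object> seen = Collections.synchronizedList(new ArrayList<Object>());

    @Override public void setConsumerWaitPeriod(long consumerWaitPeriod) { this.consumerWaitPeriod = consumerWaitPeriod; }
    @Override public long getConsumerWaitPeriod() { return consumerWaitPeriod; }
    @Override public void setNoOfConsumers(int noOfConsumers) { this.noOfConsumers = noOfConsumers; }
    @Override public int getNoOfConsumers() { return noOfConsumers; }
    @Override public void setQueueName(String queueName) { this.queueName = queueName; }
    @Override public String getQueueName() { return queueName; }

    // A real consumer would do the actual work (for example, the recharge) here.
    @Override public void consume(Object object) { seen.add(object); }

    public List<Object> getSeen() { return seen; }
}
```

In a Spring setup such a class would be declared as a bean, with the three properties set in the application context.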

We run our code inside a Spring container, so the first step in the consumer framework is to pick up all the beans that implement the Consumer interface.

ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
Map<String, Consumer> map = ctx.getBeansOfType(Consumer.class);

The backbone of the whole consumer framework is the class below:

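import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import net.rubyeye.xmemcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Autowired;

public class KestrelConsumer implements Runnable {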
    @Autowired
    private MemcachedClient memcachedClient;

    private Consumer consumer;

    private ConsumerFramework.StopSwitch stopSwitch;

    private ExecutorService executorService;

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public ConsumerFramework.StopSwitch getStopSwitch() {
        return stopSwitch;
    }

    public void setStopSwitch(ConsumerFramework.StopSwitch stopSwitch) {
        this.stopSwitch = stopSwitch;
    }

    @Override
    public void run() {
        //We use semaphores to block until a thread is available in the executor pool
        final Semaphore semaphore = new Semaphore(consumer.getNoOfConsumers());
        this.executorService = Executors.newFixedThreadPool(consumer.getNoOfConsumers());

        while (!stopSwitch.isStop()) { //Check for the global stop command
            try {
                semaphore.acquire();
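            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop: retrying acquire()
                // with the flag still set would only throw again immediately.
                Thread.currentThread().interrupt();
                break;
            }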

            final Object payLoad;
            try {
                // Blocking read: waits up to the configured period for an item.
                payLoad = memcachedClient.get(KestrelUtil.getKestrelBlockingReadCommand(consumer.getQueueName(),
                        consumer.getConsumerWaitPeriod()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                semaphore.release();
                continue;
            } catch (Exception e) {
                semaphore.release();
                continue;
            }

            if (payLoad == null) {
                semaphore.release();
                continue;
            }

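            // Hand the item to the pool; the permit is released once processing ends.
            executorService.submit(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                consumer.consume(payLoad);
                            } catch (Exception e) {
                                // Error logging removed for this post.
                            } finally {
                                semaphore.release();
                            }
                        }
                    }
            );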
        }
    }

    public void shutDown() {
        executorService.shutdown();
    }

    public void awaitShutdown(long timeOut, TimeUnit timeUnit) throws InterruptedException {
        executorService.awaitTermination(timeOut, timeUnit);
    }
}

The above code has been sanitized (error logging and the like removed) for this post.

Let me walk you through the above code. Kestrel speaks the memcached protocol, so we use the xmemcached library to talk to it; xmemcached has native support for Kestrel.
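
Over the memcached protocol, a Kestrel read is a plain get whose key carries options after the queue name: /t=<milliseconds> for a blocking read, and /open, /close and /abort for the reliable reads that let a client claim ownership of an item. The helper below is a sketch of how such keys could be built. It is not our KestrelUtil class, and the class and method names are made up for illustration.

```java
// Hypothetical helper for building Kestrel read keys; not the KestrelUtil used above.
final class KestrelKeys {
    private KestrelKeys() {}

    // Blocking read: wait up to waitMillis for an item before returning nothing.
    static String blockingRead(String queue, long waitMillis) {
        return queue + "/t=" + waitMillis;
    }

    // Reliable read: the item is handed out tentatively and stays owned by
    // this client until it is closed or aborted.
    static String openRead(String queue, long waitMillis) {
        return queue + "/t=" + waitMillis + "/open";
    }

    // Confirms the previously opened item, so Kestrel can discard it for good.
    static String close(String queue) {
        return queue + "/close";
    }

    // Gives the previously opened item back to the queue.
    static String abort(String queue) {
        return queue + "/abort";
    }
}
```

A key built this way would be passed to memcachedClient.get(...), as in the read step of KestrelConsumer.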

For every consumer bean found in the Spring context, we create a new KestrelConsumer object, set the consumer bean on it and spawn it off. We need the executor service to block, because we do not want to pick up items from the queue while all the executor threads are busy. This is where the semaphore comes in: we instantiate it with the concurrency count configured in the consumer bean, acquire a permit before each read from the queue, and release the permit once the item has been processed.
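
The semaphore idea can be tried out in isolation. The sketch below is a standalone demo, not our framework code: an in-memory queue stands in for Kestrel, a short sleep stands in for consume(), and the class and method names are hypothetical. It measures the highest number of tasks that were in flight at once, which should never exceed the number of permits.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedDispatchDemo {
    // Dispatches all items and returns the peak number of tasks running at once.
    static int peakConcurrency(int workers, int items) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) queue.add(i);

        final Semaphore permits = new Semaphore(workers);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            while (true) {
                permits.acquire();            // blocks while every worker is busy
                Integer item = queue.poll();  // only now take an item off the queue
                if (item == null) {           // queue drained: give the permit back
                    permits.release();
                    break;
                }
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        int now = running.incrementAndGet();
                        int prev = peak.get();
                        while (now > prev && !peak.compareAndSet(prev, now)) {
                            prev = peak.get(); // another task raised the peak; re-check
                        }
                        try {
                            Thread.sleep(5);   // stand-in for consumer.consume(item)
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            running.decrementAndGet();
                            permits.release(); // frees the dispatcher to read again
                        }
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

Calling BoundedDispatchDemo.peakConcurrency(3, 20) processes twenty items with at most three in flight. Without the semaphore, the dispatcher would drain the queue into the executor's internal work queue, and those items would be lost to other consumers if the process died.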

The StopSwitch object gives us the ability to do a graceful shutdown. Once a shutdown signal is received, we set the StopSwitch to true and once this is true, KestrelConsumer stops processing more items.

StopSwitch class looks as below:

public class StopSwitch {
   private boolean stop = false;

   public synchronized boolean isStop() {
       return stop;
   }

   public synchronized void setStop(boolean stop) {
       this.stop = stop;
   }
}

It is very important for the getter and setter to be synchronized, as this object is shared between multiple threads. Without that, a consumer thread is not guaranteed to see the change made by the shutdown thread.
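
Since the flag is a single boolean, synchronized is one of several ways to get the cross-thread visibility we need. As a sketch of an alternative, and not what we run in production, the same switch could be built on AtomicBoolean. The class names below are made up, and the small demo only shows that a worker loop notices the flag flipped by another thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical alternative to StopSwitch, backed by an AtomicBoolean.
class AtomicStopSwitch {
    private final AtomicBoolean stop = new AtomicBoolean(false);

    public boolean isStop() {
        return stop.get();
    }

    public void setStop(boolean value) {
        stop.set(value);
    }
}

class StopSwitchDemo {
    // Starts a worker that loops until the switch is flipped by the calling
    // thread. Returns true if the worker stopped within the timeout.
    static boolean workerStops(long timeoutMillis) {
        final AtomicStopSwitch stopSwitch = new AtomicStopSwitch();
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!stopSwitch.isStop()) {
                    Thread.yield(); // stand-in for "fetch and process one item"
                }
            }
        });
        worker.setDaemon(true); // never keep the JVM alive because of the demo
        worker.start();

        stopSwitch.setStop(true); // what the shutdown hook does
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

A volatile boolean field would give the same visibility guarantee for this simple set-and-check usage.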

Since we want a neat shutdown, when the whole consumer framework is bootstrapped, we add a shutdown hook as below:

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        final CountDownLatch countDownLatch = new CountDownLatch(consumers.size());

        stopSwitch.setStop(true);

        for (final KestrelConsumer consumer : consumers) {
            new Thread() {
                public void run() {
                    consumer.shutDown();
                    try {
                        consumer.awaitShutdown(5, TimeUnit.MINUTES);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            }.start();
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

The shutdown hook is invoked when the JVM receives a termination signal (for example a plain kill, which sends SIGTERM; kill -9 does not run shutdown hooks). As soon as it is invoked, we set the StopSwitch to true, so that all running KestrelConsumer threads stop picking up new items. We then iterate over the KestrelConsumer objects and shut each of them down. The CountDownLatch makes sure that every consumer has either shut down or hit the five-minute timeout before the JVM exits.

Along with this, the consumer framework understands the telnet protocol: when queried with a consumer name, it tells the user whether that consumer is running. We used Netty to implement this.
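
To give a feel for the idea, here is a rough sketch of such a status endpoint. It deliberately uses plain java.net sockets instead of Netty to stay dependency-free, so it is not how our implementation is written. The class name and the reply format (RUNNING, STOPPED, UNKNOWN) are invented for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical line-based status endpoint; plain sockets stand in for Netty.
class ConsumerStatusServer {
    // Consumer name -> whether that consumer is currently running.
    private final Map<String, Boolean> running = new ConcurrentHashMap<>();

    void setRunning(String name, boolean isRunning) {
        running.put(name, isRunning);
    }

    // Builds the one-line reply for a query. The reply format is made up.
    String reply(String line) {
        String name = line == null ? "" : line.trim();
        Boolean state = running.get(name);
        if (state == null) {
            return "UNKNOWN " + name;
        }
        return (state ? "RUNNING " : "STOPPED ") + name;
    }

    // Answers one query per connection. accept() throws once the server
    // socket is closed, which ends the loop with an IOException.
    void serve(ServerSocket server) throws IOException {
        while (!server.isClosed()) {
            Socket socket = server.accept();
            try {
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(socket.getInputStream(), "UTF-8"));
                PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                out.println(reply(in.readLine()));
            } finally {
                socket.close();
            }
        }
    }
}
```

With something like this listening on a port, a telnet session that sends a consumer name gets a one-line answer back, which is easy for a monitoring check to parse.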

We have had this running in production for around two weeks now, and things are looking good. If you too are interested in hacking on things like this, let us know at devgigs@freecharge.com. We are always looking for curious people who want to build beautiful products.
