Sunday, October 12, 2014

How to execute a singleton task in a cluster?

Have you come across a situation where you wanted a singleton to execute a task in a cluster of servers (such as running a clean up task every day, sending a report everyday etc)?

You know that these type of tasks should be executed by exactly only one of many servers you have.

Here you go. Following simple program will let you do that exactly.
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.springframework.stereotype.Component;

/**
 *
 * @author Pragalathan M
 */
@Component
public class ClusteredSingletonManager {

    private JChannel channel;
    private Address ownAddress;
    private static final Logger logger = Logger.getLogger(ClusteredSingletonManager.class.getName());

    @PostConstruct
    private void init() {
        try {
            logger.info("Initializing jgroups...");
            channel = new JChannel();
            channel.connect("MySampleCluster");
            ownAddress = channel.getAddress();
            channel.setReceiver(new ReceiverAdapter() {
                @Override
                public void viewAccepted(View view) {
                    logger.log(Level.INFO, "View has changed. New view: {0}", view);
                }

            });
        } catch (Exception ex) {
            logger.log(Level.SEVERE, null, ex);
        }
    }

    @PreDestroy
    private void cleanup() {
        channel.close();
    }

    public boolean isMaster() {
        View views = channel.getView();
        int index = 0;
        for (Address address : views) {
            if (ownAddress.equals(address)) {
                break;
            }
            index++;
        }
        return index == 0;
    }
}

Add the following to your pom.xml file
    <dependency>
        <groupId>org.jgroups</groupId>
        <artifactId>jgroups</artifactId>
    </dependency>
You can use Spring or CDI to call the init() and the cleanup() lifecycle methods automatically.  Whenever you want to execute a task, you will have to do the following.

if (clusteredSingletonManager.isMaster()) {
    // execute your task
} else {
    // ignore it as some other node will execute
}

This uses UDP and multicast. If you are in a situation where you can't use multicast (such as you company network administrator has disabled multicast or you are using cloud), then you have to fallback to TCP.

Here is the TCP version. Copy and paste the following content into a file called jgroups-tcp.xml and place this file in your class path. Remember to replace myip1 and myip2 with your server IP addresses. It is recommended to add all your servers' (that are part of the same cluster) IP addresses here.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="urn:org:jgroups"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <TCP bind_port="7800"
         recv_buf_size="${tcp.recv_buf_size:5M}"
         send_buf_size="${tcp.send_buf_size:5M}"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         use_send_queues="true"
         sock_conn_timeout="300"

         timer_type="new3"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"
         
         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"/>
                         
    <TCPPING async_discovery="true"
             initial_hosts="${jgroups.tcpping.initial_hosts:myip1[7800],myip2[7800]}"
             port_range="1"/>
    <MERGE3  min_interval="10000"
             max_interval="30000"/>
    <FD_SOCK/>
    <FD timeout="3000" max_tries="3" />
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST3 />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="2000"
                view_bundling="true"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <!--RSVP resend_interval="2000" timeout="10000"/-->
    <pbcast.STATE_TRANSFER/>
</config>

Then change the channel creation line in the above class to
    channel = new JChannel("jgroups-tcp.xml");

That all. Your clustered singleton manager is ready. You can enhance this to add retry mechanism, etc.