Monday, April 18, 2016

A simple Apache Storm tutorial [Part 2: Implementing failsafes]


Continued from Part 1


If you really want to understand what the Values class is, what the Tuple interface is and so on, the best place to look is not the tutorials on the internet but the actual Storm source code.
It's available here: https://github.com/apache/storm
Go into the "storm-core/src/jvm/org/apache/storm" folder and have a look at those Java files. The code is easy to follow, and I promise you it will be an enlightening experience.

Now, onto the ack and fail aspects of Storm.

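Below is the same program as in Part 1 of this tutorial, extended with acking and failing. The parts that need your attention are the two config.put lines in BasicStorm, the message ID passed to emit and the ack and fail methods in DataSpout, and the collector.ack(tuple) call in ProcessingBolt.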


BasicStorm.java:

package com.sdint.basicstorm;

import org.apache.storm.Config;

import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

    public static void main(String[] cmdArgs) {
       
        Config config = new Config();
        //config.put(Config.TOPOLOGY_DEBUG, false);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);//at most 1 un-acked tuple in flight per spout task
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//lowers the tuple timeout from the default 30 seconds to 10 seconds
       
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("myDataSpout", new DataSpout());
       
        builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");
       
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());
       
        System.out.println("\n\n\nTopology submitted\n\n\n");
        pause(120);//pause for 120 seconds during which the emitting of tuples will happen
       
        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }//main


    public static void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {
            System.out.println("pause interrupted: " + e);
            Thread.currentThread().interrupt();//restore the interrupt flag
        }
    }
 }//class


DataSpout.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
   
    //---logger
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
    private boolean tupleAck = true;
    private Long oldTupleValue;
   
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
       
        System.out.println("\n\n\nopen of DataSpout\n\n\n");      
    }
   
    public DataSpout() {
        System.out.println("\n\n\nDataSpout ctor called\n\n\n");
    }//ctor

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");
       
        ofd.declare(new Fields("line"));
    }

    @Override
    public void nextTuple() {
        System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");
       
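        // Emit a fresh value only if the previous tuple was acked;
        // otherwise replay the value that failed.
        Long newTupleValue;
        if (tupleAck) {
            newTupleValue = System.currentTimeMillis() % 1000;
            oldTupleValue = newTupleValue;
        }
        else {newTupleValue = oldTupleValue;}

        // The second argument is the message ID. Storm only calls ack() or fail()
        // for tuples that are emitted with one.
        this.collector.emit(new Values(newTupleValue), newTupleValue);
        System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
        pause(1);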
    }
   
    @Override
    public void ack(Object msgId) {
        System.out.println("\n\n\nAck received in DataSpout for msgID: "+msgId+"\n\n\n");
        tupleAck = true;
    }   
   
    @Override
    public void fail(Object msgId) {
        System.out.println("\n\n\nFailed tuple msgID: "+msgId+"\n\n\n");
        //replay logic: while tupleAck is false, nextTuple() re-emits oldTupleValue
        tupleAck = false;
    }

 

    public void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {
            System.out.println("pause interrupted: " + e);
            Thread.currentThread().interrupt();//restore the interrupt flag
        }
    }
    
}//class



ProcessingBolt.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
        collector = oc;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
        collector.ack(tuple);
    }

   
}



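Notice that this time when you run the program, the ack function in the Spout gets called whenever the Bolt executes the collector.ack(tuple); statement. This only works because the Spout passes a message ID as the second argument to emit; tuples emitted without a message ID are not tracked, so neither ack nor fail is called for them.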

But suppose you comment out collector.ack(tuple);. The tuple is then never acknowledged, and once the message timeout expires (30 seconds by default, but in our program we made it 10 seconds), the fail function of the Spout gets called instead.
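To make the timeout behaviour easier to picture, here is a minimal plain-Java sketch of the idea. It is only a mental model and not how Storm implements tracking internally; the class TimeoutModel and its methods are hypothetical names made up for this illustration. It records when each message ID was emitted and reports the IDs whose ack did not arrive within the timeout, which are the ones that would end up in fail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Storm.
class TimeoutModel {
    private final long timeoutMillis;
    // message ID -> time (in ms) at which the tuple was emitted
    private final Map<Object, Long> emittedAt = new LinkedHashMap<>();

    TimeoutModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record that a tuple with this message ID was emitted at the given time.
    void emitted(Object msgId, long nowMillis) {
        emittedAt.put(msgId, nowMillis);
    }

    // An ack arrived in time, so the tuple no longer needs tracking.
    void acked(Object msgId) {
        emittedAt.remove(msgId);
    }

    // Return (and stop tracking) every message ID that has been waiting
    // for at least the timeout. These are the tuples that would be failed.
    List<Object> expire(long nowMillis) {
        List<Object> failed = new ArrayList<>();
        Iterator<Map.Entry<Object, Long>> it = emittedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Object, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
        return failed;
    }
}
```

With a 10 second timeout, a tuple that is acked is simply forgotten, while a tuple whose ack never arrives shows up in the result of expire once 10 seconds have passed.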

This is how the Spout (and we) know whether a tuple has been received and acknowledged by the Bolt or not. The above program uses the current system time in milliseconds, modulo 1000, as the tuple value. If the Bolt does not acknowledge the tuple before the timeout, fail sets tupleAck to false and the next call to nextTuple sends the same old value to the Bolt again.
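The single tupleAck flag works here because only one tuple is pending at a time. If you allow more pending tuples, one way to structure the replay logic is to remember every emitted value by its message ID. The following is a minimal sketch under that assumption; ReplayTracker and its method names are hypothetical and not part of Storm. A spout could call next and emitted from nextTuple, and forward its own ack and fail calls to the tracker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Storm.
class ReplayTracker {
    // Values that were emitted and are still waiting for ack or fail.
    private final Map<Object, Long> pending = new HashMap<>();
    // Values whose tuples failed and should be emitted again.
    private final Deque<Long> replayQueue = new ArrayDeque<>();

    // Pick the value to emit: a failed value if one is waiting,
    // otherwise the fresh value supplied by the caller.
    long next(long freshValue) {
        Long replay = replayQueue.poll();
        return replay != null ? replay : freshValue;
    }

    // Remember the value under the message ID it was emitted with.
    void emitted(Object msgId, long value) {
        pending.put(msgId, value);
    }

    // The tuple was fully processed, so forget it.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // The tuple failed or timed out, so queue its value for replay.
    void fail(Object msgId) {
        Long value = pending.remove(msgId);
        if (value != null) {
            replayQueue.add(value);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Note that this sketch assumes each emit uses a unique message ID. The program above uses the value itself as the message ID, which is fine with one pending tuple but could collide once several are in flight.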




And before getting into hardcore Storm programming, there is this important thing:

Apache Storm concepts you really need to know.


