Saturday, May 7, 2016

More concepts of Apache Storm you need to know

As my mentor tells me -  
"To be able to program in Storm, you first need to think in Storm".


That's very true. Storm is designed so that you can structure your application in a way that lets it scale. But before you begin that journey, there are a few concepts you need to know that the official documentation either doesn't explain or doesn't explain clearly enough.




The Constructor concept
[Thanks to Matthias J. Sax on the Storm mailing list for explaining this]

When you create a Spout or a Bolt, its constructor runs only once, at the point where your code builds the topology. After that, the instance gets serialized, and from then on, whenever Storm needs a new instance of the Spout or Bolt, it deserializes a copy instead of calling the constructor again.

But for every new instance Storm creates, Storm will call the open() function for Spouts and the prepare() function for Bolts.
So open() and prepare() are like constructors: per-instance initialization, especially of anything that can't be serialized, belongs there.
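
To see the effect without a cluster, here is a minimal sketch using plain Java serialization. FakeBolt and its prepare() method are hypothetical stand-ins that do not use the Storm API; the sketch only assumes that Storm's copies behave like ordinary deserialized Java objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorDemo {
    // Counters shared by all instances, so we can see what ran and how often.
    static int constructorCalls = 0;
    static int prepareCalls = 0;

    // Hypothetical stand-in for a Bolt (not a Storm class).
    static class FakeBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;               // serialized along with the instance
        transient StringBuilder buffer;  // not serialized, so null in every copy

        FakeBolt(String name) {
            this.name = name;
            constructorCalls++;
        }

        // Plays the role of prepare(): per-instance initialization.
        void prepare() {
            buffer = new StringBuilder();
            prepareCalls++;
        }
    }

    static byte[] serialize(FakeBolt bolt) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt);
        }
        return bytes.toByteArray();
    }

    static FakeBolt deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (FakeBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new FakeBolt("boltA")); // the constructor runs here, once
        for (int i = 0; i < 3; i++) {
            FakeBolt copy = deserialize(data);          // no constructor call for the copy
            System.out.println("buffer before prepare: " + copy.buffer); // prints null
            copy.prepare();
        }
        System.out.println("constructor calls: " + constructorCalls); // prints 1
        System.out.println("prepare calls: " + prepareCalls);         // prints 3
    }
}
```

The transient field shows why initialization in the constructor alone is not enough: each copy starts with that field set to null until its own prepare() runs.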



Making your Spouts or Bolts do things at specified intervals

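Use tick tuples. Set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS for the component (for example in the Bolt's getComponentConfiguration()), and Storm will send that Bolt a system-generated tick tuple at that interval. Tick tuples arrive through execute(), so they apply to Bolts; in a Spout, which has no execute(), checking the elapsed time inside nextTuple() is one way to get a similar effect.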



The reason you should exit the nextTuple() function ASAP
[Thanks to Spico Florin on the Storm mailing list for explaining this]

I had created a while loop in nextTuple() of my Spout to emit multiple tuples, but I didn't receive any acks at all.
It turns out that the framework calls nextTuple() and ack() on the same thread. So if nextTuple() loops or does heavy computation, ack() can't be called until nextTuple() returns, and the buffers that receive the ack messages are not emptied. nextTuple() acts as the producer for these buffers and ack() as the consumer.

So remember to emit a tuple and exit the nextTuple() function immediately.
For those who don't know about the ack() method, you can override it (and the fail() method) in your Spout like this:

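    @Override
    public void ack(Object msgId) {
        // Called once the tuple emitted with this msgId has been fully processed
        System.out.println("Ack received for msgID: " + msgId);
        tupleAck = true; // tupleAck is assumed to be a boolean field of this Spout
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("Failed tuple msgID: " + msgId);
        //---tuple replay logic should be here
    }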


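This helps you know whether your tuple was received and fully processed by the Bolts, or whether the transmission or processing failed. Note that Storm calls ack() and fail() only for tuples that the Spout emitted with a message ID.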
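
The single-thread behaviour is easier to see in a toy model. The sketch below is not Storm's executor code. FakeSpout and runLoop are hypothetical names, and the model assumes that Bolts process each tuple instantly. It only shows that ack() gets its turn after nextTuple() has returned.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SingleThreadSpoutDemo {
    // Hypothetical stand-in for a Spout (not a Storm class).
    static class FakeSpout {
        final Queue<Integer> pendingAcks;          // ack messages waiting to be delivered
        final List<String> log = new ArrayList<>();
        int nextId = 0;

        FakeSpout(Queue<Integer> pendingAcks) {
            this.pendingAcks = pendingAcks;
        }

        // Emits exactly one tuple and returns right away.
        void nextTuple() {
            int id = nextId++;
            log.add("emit " + id);
            pendingAcks.add(id); // assumption: the bolts finish this tuple instantly
        }

        void ack(int msgId) {
            log.add("ack " + msgId);
        }
    }

    // One thread alternates between calling nextTuple() and delivering acks.
    static List<String> runLoop(int iterations) {
        Queue<Integer> pendingAcks = new ArrayDeque<>();
        FakeSpout spout = new FakeSpout(pendingAcks);
        for (int i = 0; i < iterations; i++) {
            spout.nextTuple();                 // nothing else runs while this call is busy
            while (!pendingAcks.isEmpty()) {
                spout.ack(pendingAcks.poll()); // reached only after nextTuple() returned
            }
        }
        return spout.log;
    }

    public static void main(String[] args) {
        // prints [emit 0, ack 0, emit 1, ack 1, emit 2, ack 2]
        System.out.println(runLoop(3));
    }
}
```

If nextTuple() in this model contained an endless while loop, the inner ack loop would never be reached, which matches the symptom described above.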



More creative topology structures
[Thanks to Matthias J. Sax on the Storm mailing list for the idea]

When learning Storm, we come across simple examples and are conditioned into thinking that way.



It doesn't have to be that way though. When you use streams and the various techniques of grouping, you'll find a whole new world open up.

Example:
If you want to create a topology where the spout notifies the end bolts that it has no more input, you can do it this way:

Just specify a separate stream in the spout and emit the notification tuples. When creating the topology, specify an allGrouping for the receiving bolts. What happens is that no matter how many instances of the bolt are created, the spout will send the tuple to all of them. It's like a broadcast.

So the topology would be created like this:

TopologyBuilder b = new TopologyBuilder();

b.setSpout("SpoutA_name", new SpoutA(), 1)
    .setNumTasks(1);

b.setBolt("boltA_name", new boltA(), 2)
    .shuffleGrouping("SpoutA_name");

b.setBolt("boltB_name", new boltB(), 5)
    .fieldsGrouping("boltA_name", new Fields("someID"))//boltA must declare an output field with this name
    .allGrouping("SpoutA_name", "StreamName");//every boltB instance receives this stream



This is how the spout sends a stream to the bolts at the end:

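@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declare(new Fields("theNormalTuple"));
    ofd.declareStream("StreamName", new Fields("someID"));//this specifies the stream that reaches the end bolt B
}

@Override
public void nextTuple() {

    if (nothingMoreToProcess) {
        //nextTuple() is called repeatedly, so guard the notification to send it only once
        //(endNotified is assumed to be a boolean field of this spout, initially false)
        if (!endNotified) {
            collector.emit("StreamName", new Values(someID));//this emits the stream to bolts B
            endNotified = true;
        }
    }
    else {
        collector.emit(new Values(someTuples), someTuples);//this emits to bolts A, using someTuples as the message ID
    }
}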



...and this is how the bolt receives it:

@Override
public void execute(Tuple tuple) {
    
    if (("StreamName").equals(tuple.getSourceStreamId())) {//recognizes the stream from the spout
        //do whatever you do when there is nothing more to process
    }
    else {  
        //do your usual processing
    }
}
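
To make the difference between the groupings used above concrete, here is a rough model of which Bolt tasks receive a given tuple. It is a sketch of the idea only, not Storm's routing code: the method names are made up for illustration, and the hash-modulo rule for fieldsGrouping is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class GroupingDemo {
    // shuffleGrouping: the tuple goes to one task, picked at random.
    static List<Integer> shuffleTargets(int numTasks, Random random) {
        List<Integer> targets = new ArrayList<>();
        targets.add(random.nextInt(numTasks));
        return targets;
    }

    // fieldsGrouping: the tuple goes to one task, picked from the hash of the
    // grouping field, so equal field values always land on the same task.
    static List<Integer> fieldsTargets(int numTasks, Object fieldValue) {
        List<Integer> targets = new ArrayList<>();
        targets.add(Math.floorMod(fieldValue.hashCode(), numTasks));
        return targets;
    }

    // allGrouping: every task receives its own copy of the tuple.
    static List<Integer> allTargets(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    public static void main(String[] args) {
        int numTasks = 5; // matches the 5 instances of boltB in the topology above
        System.out.println("shuffle: " + shuffleTargets(numTasks, new Random()));
        System.out.println("fields:  " + fieldsTargets(numTasks, "user-42"));
        System.out.println("all:     " + allTargets(numTasks)); // prints all:     [0, 1, 2, 3, 4]
    }
}
```

This is why the end-of-input notification uses allGrouping: with either of the other two groupings, only one of the five boltB instances would ever hear about it.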


Don't stop here. There are plenty more ways to emit streams and direct the flow of tuples. Remember that a Storm topology is meant to be a Directed Acyclic Graph of spouts and bolts, so you can design your topology as any such graph, not just as a straight line.




In the code, which are the tasks and executors?

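Tasks and executors are easy to confuse. In this case:

builder.setBolt("bolt1", new BoltA(), 3)
                .setNumTasks(2);

you ask Storm for 3 executors (the parallelism hint) and 2 tasks. Since an executor needs at least one task to run, only 2 of those executors can end up with any work to do.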

But if you don't specify setNumTasks(), as in this case:

builder.setBolt("bolt1", new BoltA(), 2);

Storm creates 2 executors and 2 tasks, which is one task per executor.

Remember that a task is one deserialized instance of the BoltA class (see the constructor concept at the top of this page). An executor is just a thread that runs one or more tasks of the same component. If an executor has two tasks, it runs them one after the other on its single thread, never in parallel.
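
As a way to picture the relationship, here is a small sketch that splits task IDs across executor threads. It is a simplified model, not Storm's scheduler: the assign() method is hypothetical, the even split is an assumption, and the model simply leaves out any executor that would have no task to run.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskAssignmentDemo {
    // Splits task ids 0..numTasks-1 into at most numExecutors groups of near-equal size.
    // Each inner list is the set of tasks one executor thread runs serially.
    static List<List<Integer>> assign(int numExecutors, int numTasks) {
        int groups = Math.min(numExecutors, numTasks); // an executor without a task is left out
        List<List<Integer>> result = new ArrayList<>();
        int nextTask = 0;
        for (int g = 0; g < groups; g++) {
            // The first (numTasks % groups) executors take one extra task.
            int size = numTasks / groups + (g < numTasks % groups ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(nextTask++);
            }
            result.add(tasks);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 2)); // prints [[0], [1]]
        System.out.println(assign(2, 4)); // prints [[0, 1], [2, 3]]
        System.out.println(assign(3, 2)); // prints [[0], [1]]
    }
}
```

The second call uses made-up numbers (2 executors, 4 tasks) to show the case where one thread serves two task instances in turn.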





Additional links:

If you are looking for a simple tutorial or code sample for Storm, they are here:
