46665

Apache Storm spout stops emitting messages from spout

Question:

We have been struggling with this issue for a long time now. In short, our storm topology stops emitting messages from spout after some time in a random fashion. We have an automated script which re-deploys the topology at 06:00 UTC everyday after the master data refresh activity is complete.

In the last 2 weeks, our topology stopped emitting the messages for 3 times in late UTC hours (between 22:00 and 02:00). It only comes online when we restart it which is around 06:00 UTC.

I've searched for many answers & blogs but couldn't find out what's happening here. We have an un-anchored topology which is a choice we have made like 3-4 years ago. We started with 0.9.2 and now we are on 1.1.0.

I've checked all kind of logs and I'm 100% sure that the nextTuple() method for the controller is not getting called and there are no exceptions happening in the system which may cause this. I've also checked all kind of logs we accumulate and there is not even a single ERROR or WARN logs explaining the abrupt stoppage. The INFO logs are also not that helpful. There is nothing which can be connected to this issue in worker logs or supervisor logs or nimbus logs.

This is how our spout class looks: <strong>Controller.java</strong>

public class Controller implements IRichSpout { SpoutOutputCollector _collector; Calendar LAST_RUN = null; List<ControllerMessage> msgList; /** * It is to open the spout */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; msgList= new ArrayList<ControllerMessage>(); MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler(); mongoIndexingHandler.createMongoIndexes(); } /** * It executes the next tuple */ @Override public void nextTuple() { Map<String, Object> logMap = new HashMap<>(); logMap.put("BEGIN", new Date()); try { TriggerHandler thandler = new TriggerHandler(); if (msgList.size() == 0) { List<ControllerMessage> mList = thandler.getControllerMessage(new Date()); msgList = mList; } if (msgList.size() > 0) { ControllerMessage message = msgList.get(0); if(thandler.fire(message.getFireTime())) { Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date()); msgList.remove(0); _collector.emit(new Values(message)); } } else{ Utils.sleep(1000); } } catch (Exception e) { _collector.reportError(e); Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class); } } /** * It acknowledges the messages */ @Override public void ack(Object id) { } /** * It tells failed messages */ @Override public void fail(Object id) { } /** * It declares the message name */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("SPOUT_MESSAGE")); } @Override public void activate() { } @Override public void close() { } @Override public void deactivate() { } @Override public Map<String, Object> getComponentConfiguration() { return null; } }

and this is the topology class: <strong>DiagnosticTopology.java</strong>

public class DiagnosticTopology { public static void main(String[] args) throws Exception { int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2; int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128; int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16; int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16; int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64; int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16; int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8; int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16; String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC"; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("controller", new Controller(), 1); builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller"); builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator"); builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping"); builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping"); builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo"); builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule"); builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule"); builder.setSpout("trigger", new TriggerSpout(), 1); builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger"); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(wSize); StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); } }

We have fairly good servers (Xeon, 8 core, 32 GB and flash drives) in place for the production as well as testing environment and there are not external factors which can cause this issue as exception handling is everywhere in the code.

When this thing happens, it seems like everything stopped all of a sudden and there are no traces of why it happened.

Any help is highly appreciated!

Answer1:

I don't know what is causing your issue, but I'd recommend that you start by checking if upgrading to the latest Storm version resolves the issue. I know of at least two issues related to worker threads dying and not coming back up <a href="https://issues.apache.org/jira/browse/STORM-1750" rel="nofollow">https://issues.apache.org/jira/browse/STORM-1750</a> <a href="https://issues.apache.org/jira/browse/STORM-2194" rel="nofollow">https://issues.apache.org/jira/browse/STORM-2194</a>. 1750 is fixed in 1.1.0, but 2194 is not fixed until 1.1.1.

In case upgrading doesn't fix the issue for you, you might be able to debug it by doing the following.

Next time your topology is hanging, go open Storm UI and find your spout. It'll show the list of executors running that spout, along with which workers are responsible for running them. Pick one of the workers where the spout executor isn't emitting anything. Open a shell on the machine running that worker, and find the worker JVM's process id. You can do this easily with jps -m.

Example output showing the worker JVM with port 6701 on my local machine, which has pid 7592:

<blockquote>

7592 Worker test-2-1520361882 d24dc55d-76c7-4cc6-93fa-2663fcdcb1ba-10.0.75.1 6701 f7b6f8e4-6c87-47ca-a7b7-655009b6c62a

</blockquote>

Trigger a thread dump by doing kill -3 <pid>, or use jstack <pid> if you prefer.

In the thread dump, you should be able to find the executor thread that's hanging. For instance, when I do a thread dump for a topology with a spout called "word", where one of the spout executors has number 13, I see

edit: Stack overflow won't let me post the stack trace because the heuristic looking for unformatted code is bad. I've spent probably as long trying to post the stack trace as writing the original answer, so I can't be bothered to keep trying. Here's the trace that should have been here <a href="https://pastebin.com/2Sz5kkQ1" rel="nofollow">https://pastebin.com/2Sz5kkQ1</a>

which shows me what executor 13 is currently doing. In this case it's sleeping during a call to nextTuple.

If you can find out what your hanging executor is doing, you should be much better equipped to solve the issue, or report a bug to Storm.

Answer2:

We have observed this with our application where we had very busy CPU and all other threads were waiting for their turn. When we tried to find root cause using JVisualVM to check resource usage, we found that some function in some bolts were causing lot of overhead and CPU time. Please check via. any profiling tool if there are blocked threads in CPU critical path of nextTuple() method or are you receiving any data for the same from upstream.

Recommend

  • ListBox with overriden CreateParams doesn't raise item events
  • found scala.Int(0) required Int 0
  • Dynamics CRM 2015 Online: SubGrid's control.SetParameter method is not available
  • Merge Multiple Lists of Lists Based on Template
  • TFS edit build log using custom activity
  • Chrome breakpoint on radio doesn't fire
  • Android onKey w/ virtual keyboard
  • How do I Dispose a HttpResponseMessage in my Web Api Method?
  • Trying to string.Join an IList
  • HTML5 video only works in IE. The other browsers shows the black screen
  • Android Database Error - getWriteableDatabase
  • MeeGo Handset Emulator not starting on Windows 7
  • C++ friend class std::vector
  • Using Generics on right hand side in Java 6?
  • Knockout custom binding handler
  • Using $compile in a directive triggers AngularJS infinite digest error
  • How to make Twilio api Post request with the help of AFNetworking?
  • Unable to send e-mail through Java
  • didUpdatePushCredentials not get called
  • UIAlertController button function not working
  • SetWindowsHookEx does not react on media keys
  • Element.tagName for python not working
  • Graphics.CopyFromScreen [Web application] + The handle is invalid
  • What is the purpose of TaskExecutor in spring?
  • How solve “Qt: Untested Windows version 10.0 detected!”
  • how to find common suffix in java by using method
  • Install PHP intl extension on MacOS
  • Retrieve list of sent friend requests from friend_request FQL table
  • Change multiple background-images with jQuery
  • NHibernate Validation Localization with S#arp Architecture
  • How can I send an e-mail from a vbs script
  • Android screen density dpi vs ppi
  • Accessing IRQ description array within a module and displaying action names
  • DirectX11 ClearRenderTargetViewback with transparent buffer?
  • Can a Chrome extension content script make an jQuery AJAX request for an html file that is itself a
  • Change an a tag attribute in JavaScript based on screen width
  • Upload files with Ajax and Jquery
  • AngularJs get employee from factory
  • Proper way to use connect-multiparty with express.js?
  • Getting Messege Twice Using IMvxMessenger