这篇文章将为大家详细讲解有关storm中可靠性和非可靠性的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
1.Spout的可靠性保证
在Storm中,消息处理可靠性从Spout开始的。storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fail的处理, 如果一个tuple被处理成功,那么spout便会调用其ack方法,如果失败,则会调用fail方法。而topology中处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功。
我们知道spout必须能够追踪它发射的所有tuples或其子tuples,并且在这些tuples处理失败时能够重发。那么spout如何追踪tuple呢?storm是通过一个简单的anchor机制来实现的(在下面的bolt可靠性中会讲到)。
spout发射根tuple,根tuple产生子tuples。这就形成一个TupleTree。在这个tree中,所有的bolt都会ack或fail一个tuple,如果tree中所有的bolt都ack了经过它的tuple,那么Spout的ack方法就会被调用,表示整个消息被处理完成。如果tree中的任何一个bolt fail一个tuple,或者整个处理过程超时,则Spout的fail方法便会被调用。
另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录,因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理。这里其实有个问题, 就是每个bolt执行完之后要显式的调用ack/fail,否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候,为什么不将bolt的ack设置为默认调用。
Storm的ISpout接口定义了三个与可靠性有关的方法:nextTuple,ack和fail。
public interface ISpout extends Serializable { void open( Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
我们知道,当Storm的Spout发射一个Tuple后,他便会调用nextTuple()方法,在这个过程中,保证可靠性处理的第一步就是为发射出的Tuple分配一个唯一的ID,并把这个ID传给emit()方法:
collector.emit( new Values("value1" , "value2") , msgId );
为Tuple分配一个唯一ID的目的就是为了告诉Storm,Spout希望这个Tuple产生的Tuple tree在处理完成或失败后告知它,如果Tuple被处理成功,Spout的ack()方法就会被调用,相反如果处理失败,Spout的fail()方法就会被调用,Tuple的ID也都会传入这两个方法中。
需要注意的是,虽然spout有可靠性机制,但这个机制是否启用由我们控制的。IBasicBolt在emit一个tuple后自动调用ack()方法,用来实现比较简单的计算,这个是不可靠的。如果是IRichBolt的话,如果想要实现anchor,必须自己调用ack方法,这个保证可靠性。
2.Bolt中的可靠性
Bolt中的可靠性主要靠两步来实现:
-
发射衍生Tuple的同时anchor原Tuple
-
对各个Tuples做ack或fail处理
anchor一个Tuple就意味着在输入Tuple和其衍生Tuple之间建立了关联,关联之后的Tuple便加入了Tuple tree。我们可以通过如下方式anchor一个Tuple:
collector.emit( tuple, new Values( word));
如果我们发射新tuple的时候不同时发射元tuple,那么新发射的Tuple不会参与到整个可靠性机制中,它们的fail不会引起root tuple的重发,我们成为unanchor:
collector.emit( new Values( word));
ack和fail一个tuple的操作方法:
this .collector.ack(tuple); this .collector.fail(tuple);
上面讲过了,IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为。
在 IRichBolt实现类中, 如果OutputCollector.emit(oldTuple,newTuple)这样调用来发射tuple(anchoring), 那么后面的bolt的ack/fail会影响spout ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为anchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略.中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发. 所以IBasicBolt用来做filter或者简单的计算比较合适。
3.总结
storm的可靠性是由spout和bolt共同决定的,storm利用了anchor机制来保证处理的可靠性。如果spout发射的一个tuple被完全处理,那么spout的ack方法即会被调用,如果失败,则其fail方法便会被调用。在bolt中,通过在emit(oldTuple,newTuple)的方式来anchor一个tuple,如果处理成功,则需要调用bolt的ack方法,如果失败,则调用其fail方法。一个tuple及其子tuple共同构成了一个tupletree,当这个tree中所有tuple在指定时间内都完成时spout的ack才会被调用,但是当tree中任何一个tuple失败时,spout的fail方法则会被调用。
IBasicBolt类会自动调用ack/fail方法,而IRichBolt则需要我们手动调用ack/fail方法。我们可以通过TOPOLOGY_MESSAGE_TIMEOUT_SECS参数来指定一个tuple的处理完成时间,若这个时间未被处理完成,则spout也会调用fail方法。
4.一个可靠的WordCount例子
一个实现可靠性的spout:
public class ReliableSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" }; private int index = 0; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "sentence")); } public void open( Map config, TopologyContext context, SpoutOutputCollector collector) { this. collector = collector; this. pending = new ConcurrentHashMap<UUID, Values>(); } public void nextTuple() { Values values = new Values( sentences[ index]); UUID msgId = UUID. randomUUID(); this. pending.put(msgId, values); this. collector.emit(values, msgId); index++; if ( index >= sentences. length) { index = 0; } //Utils.waitForMillis(1); } public void ack(Object msgId) { this. pending.remove(msgId); } public void fail(Object msgId) { this. collector.emit( this. pending.get(msgId), msgId); } }
一个实现可靠性的bolt:
public class ReliableSplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; public void prepare( Map config, TopologyContext context, OutputCollector collector) { this. collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence" ); String[] words = sentence.split( " "); for (String word : words) { this. collector.emit(tuple, new Values(word)); } this. collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "word")); } }
这个例子中我们实现了storm的可靠性,tuple失败了将会重新发送,直到处理成功。这里pending是一个map,为了实现tuple的失败重发。storm里面topology.max.spout.pending属性解释:
1.同时活跃的batch数量,你必须设置同时处理的batch数量。你可以通过”topology.max.spout.pending” 来指定, 如果你不指定,默认是1。
2.topology.max.spout.pending 的意义在于 ,缓存spout发送出去的tuple,当下流的bolt还有topology.max.spout.pending 个 tuple 没有消费完时,spout会停下来,等待下游bolt去消费,当tuple 的个数少于topology.max.spout.pending个数时,spout 会继续从消息源读取消息。(这个属性仅对可靠消息处理)。
如果使用事务,则表示同时处理的batch数量,如果非事务,则理解成第二种。
总而言之,如果不需要保证可靠性,spout继承BaseRichSpout,bolt继承BaseBasicBolt,它们内部实现了一些方法,自动ack,我们不需要关心ack和fail;如果要保证可靠性,spout实现IRichSpout接口,发tuple的时候,带上msgId,自定义fail和ack方法,bolt继承BaseRichBolt,发送tuple的时候要带上原tuple,要手动ack。
关于“storm中可靠性和非可靠性的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
原创文章,作者:306829225,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/230390.html