A Look at How Storm Builds a TridentTopology
Published: 2019-06-24


This article mainly looks into how a storm TridentTopology is built.

Example

@Test
public void testDebugTopologyBuild(){
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
            new Values("nickt1", 4),
            new Values("nickt2", 7),
            new Values("nickt3", 8),
            new Values("nickt4", 9),
            new Values("nickt5", 7),
            new Values("nickt6", 11),
            new Values("nickt7", 5)
    );
    spout.setCycle(false);
    TridentTopology topology = new TridentTopology();
    Stream stream1 = topology.newStream("spout1",spout)
            .each(new Fields("user", "score"), new BaseFunction() {
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                    System.out.println("tuple:"+tuple);
                }
            },new Fields());
    topology.build();
}
  • For simplicity, much of the analysis that follows is based on this example

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream newStream(String txId, IRichSpout spout) {
    return newStream(txId, new RichSpoutBatchExecutor(spout));
}

public Stream newStream(String txId, IPartitionedTridentSpout spout) {
    return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
    return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, ITridentDataSource dataSource) {
    if (dataSource instanceof IBatchSpout) {
        return newStream(txId, (IBatchSpout) dataSource);
    } else if (dataSource instanceof ITridentSpout) {
        return newStream(txId, (ITridentSpout) dataSource);
    } else if (dataSource instanceof IPartitionedTridentSpout) {
        return newStream(txId, (IPartitionedTridentSpout) dataSource);
    } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
        return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
    } else {
        throw new UnsupportedOperationException("Unsupported stream");
    }
}

public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

public Stream newStream(String txId, ITridentSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

protected Stream addNode(Node n) {
    registerNode(n);
    return new Stream(this, n.name, n);
}

protected void registerNode(Node n) {
    _graph.addVertex(n);
    if(n.stateInfo!=null) {
        String id = n.stateInfo.id;
        if(!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}
  • The first parameter of newStream is the txId; the second is an ITridentDataSource
  • ITridentDataSource comes in several flavors: IBatchSpout, ITridentSpout, IPartitionedTridentSpout and IOpaquePartitionedTridentSpout (the sketch after this list shows which overload the example ends up in)
  • All of them end up creating a SpoutNode, which registerNode adds to _graph (a node whose stateInfo is not null would also be added to _colocate, but for a SpoutNode that value is null); note that the SpoutNode is created with SpoutNode.SpoutType.BATCH
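
To make the dispatch concrete, here is a minimal sketch of mine (not from the storm sources; it only assumes storm-core 1.2.2 on the classpath): FixedBatchSpout from the example implements IBatchSpout, so newStream(txId, ITridentDataSource) routes it to the IBatchSpout overload, which builds a SpoutNode of SpoutType.BATCH.

// Minimal sketch: FixedBatchSpout implements IBatchSpout, so the dispatching
// newStream(String, ITridentDataSource) overload takes the IBatchSpout branch.
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class DispatchSketch {
    public static void main(String[] args) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4));
        System.out.println(spout instanceof IBatchSpout); // true -> SpoutNode of SpoutType.BATCH
    }
}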

Node

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/Node.java

public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
    private static final AtomicInteger INDEX = new AtomicInteger(0);
    private String nodeId;

    public String name = null;
    public Fields allOutputFields;
    public String streamId;
    public Integer parallelismHint = null;
    public NodeStateInfo stateInfo = null;
    public int creationIndex;

    public Node(String streamId, String name, Fields allOutputFields) {
        this.nodeId = UUID.randomUUID().toString();
        this.allOutputFields = allOutputFields;
        this.streamId = streamId;
        this.name = name;
        this.creationIndex = INDEX.incrementAndGet();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return nodeId.equals(((Node) o).nodeId);
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
    }

    public String shortString() {
        return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
    }
}
  • Node extends DefaultResourceDeclarer, which implements the resource-related interfaces ResourceDeclarer and ITridentResource
  • Node has several subclasses: SpoutNode, ProcessorNode and PartitionNode (node identity semantics are illustrated in the sketch after this list)
  • A SpoutNode is the node descriptor for a spout; a ProcessorNode generally describes Trident operations such as each, map, aggregate, reduce and project; a PartitionNode describes partitioning
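
One detail worth noting in the source above: equals and hashCode are based solely on the randomly generated nodeId, so two Nodes built from identical arguments are still distinct vertices in _graph. A small sketch of mine makes this concrete:

// Sketch: Node identity is the random UUID nodeId, not the field values.
import org.apache.storm.trident.planner.Node;
import org.apache.storm.tuple.Fields;

public class NodeIdentitySketch {
    public static void main(String[] args) {
        Node a = new Node("s1", null, new Fields("user", "score"));
        Node b = new Node("s1", null, new Fields("user", "score"));
        System.out.println(a.equals(b));                      // false: each Node gets its own UUID
        System.out.println(a.creationIndex < b.creationIndex); // true: INDEX increments monotonically
    }
}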

TridentTopology.build

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public StormTopology build() {
    DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
    //......
    List<SpoutNode> spoutNodes = new ArrayList<>();

    // can be regular nodes (static state) or processor nodes
    Set<Node> boltNodes = new LinkedHashSet<>();
    for(Node n: graph.vertexSet()) {
        if(n instanceof SpoutNode) {
            spoutNodes.add((SpoutNode) n);
        } else if(!(n instanceof PartitionNode)) {
            boltNodes.add(n);
        }
    }

    Set<Group> initialGroups = new LinkedHashSet<>();
    //......
    for(Node n: boltNodes) {
        initialGroups.add(new Group(graph, n));
    }

    GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
    grouper.mergeFully();
    Collection<Group> mergedGroups = grouper.getAllGroups();

    // add identity partitions between groups
    for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
        if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
            Group g1 = grouper.nodeGroup(e.source);
            Group g2 = grouper.nodeGroup(e.target);
            // g1 being null means the source is a spout node
            if(g1==null && !(e.source instanceof SpoutNode))
                throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
            if(g1==null || !g1.equals(g2)) {
                graph.removeEdge(e);
                PartitionNode pNode = makeIdentityPartition(e.source);
                graph.addVertex(pNode);
                graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
            }
        }
    }
    //......
    // add in spouts as groups so we can get parallelisms
    for(Node n: spoutNodes) {
        grouper.addGroup(new Group(graph, n));
    }

    grouper.reindex();
    mergedGroups = grouper.getAllGroups();

    Map<Node, String> batchGroupMap = new HashMap<>();
    List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
    for(int i=0; i<connectedComponents.size(); i++) {
        String groupId = "bg" + i;
        for(Node n: connectedComponents.get(i)) {
            batchGroupMap.put(n, groupId);
        }
    }

    Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

    TridentTopologyBuilder builder = new TridentTopologyBuilder();

    Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
    Map<Group, String> boltIds = genBoltIds(mergedGroups);
    for(SpoutNode sn: spoutNodes) {
        Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

        Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
        spoutRes.putAll(sn.getResources());

        Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        SpoutDeclarer spoutDeclarer = null;

        if(sn.type == SpoutNode.SpoutType.DRPC) {
            spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                    (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
        } else {
            ITridentSpout s;
            if(sn.spout instanceof IBatchSpout) {
                s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
            } else if(sn.spout instanceof ITridentSpout) {
                s = (ITridentSpout) sn.spout;
            } else {
                throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
            }
            spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
        }

        if(onHeap != null) {
            if(offHeap != null) {
                spoutDeclarer.setMemoryLoad(onHeap, offHeap);
            } else {
                spoutDeclarer.setMemoryLoad(onHeap);
            }
        }
        if(cpuLoad != null) {
            spoutDeclarer.setCPULoad(cpuLoad);
        }
    }

    for(Group g: mergedGroups) {
        if(!isSpoutGroup(g)) {
            Integer p = parallelisms.get(g);
            Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
            Map<String, Number> groupRes = g.getResources(_resourceDefaults);

            Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

            BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                    committerBatches(g, batchGroupMap), streamToGroup);

            if(onHeap != null) {
                if(offHeap != null) {
                    d.setMemoryLoad(onHeap, offHeap);
                } else {
                    d.setMemoryLoad(onHeap);
                }
            }
            if(cpuLoad != null) {
                d.setCPULoad(cpuLoad);
            }

            Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
            for(PartitionNode n: inputs) {
                Node parent = TridentUtils.getParent(graph, n);
                String componentId = parent instanceof SpoutNode ?
                        spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
            }
        }
    }
    HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
    combinedMasterCoordResources.putAll(_masterCoordResources);
    return builder.buildTopology(combinedMasterCoordResources);
}
  • A TridentTopologyBuilder is created here; for each SpoutNode, TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) is called to add the spout
  • A spout of type IBatchSpout is wrapped into an ITridentSpout by BatchSpoutExecutor
  • The streamName here is the streamId, generated by UniqueIdGen.getUniqueStreamId: the prefix s plus the _streamCounter count (e.g. 1), giving s1; txStateId is the txId passed in by the user; batchGroup is the prefix bg plus the index of the element in connectedComponents (e.g. 0), giving bg0; parallelism is simply what the user set when building the topology (these conventions are summarized in the sketch after this list)
  • After the spout is set, its resource configuration is applied, e.g. memoryLoad and cpuLoad; then the bolt is set (a SubtopologyBolt is used here) along with the bolt's resource configuration
  • Finally TridentTopologyBuilder.buildTopology is called
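
To keep the later component names straight, here is a tiny standalone sketch (a hypothetical helper of mine, not storm code) summarizing the id conventions described above:

// Hypothetical sketch of the id conventions used during planning:
//   stream ids   -> "s"  + stream counter            (s1, s2, ...)
//   batch groups -> "bg" + connected-component index (bg0, bg1, ...)
//   bolt ids     -> "b-" + group counter             (b-0, b-1, ...)
public class IdConventionSketch {
    public static void main(String[] args) {
        int streamCounter = 0;
        String streamId = "s" + (++streamCounter); // first stream -> "s1"
        String batchGroup = "bg" + 0;              // first connected component -> "bg0"
        String boltId = "b-" + 0;                  // first merged group -> "b-0"
        System.out.println(streamId + " " + batchGroup + " " + boltId);
    }
}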

TridentTopologyBuilder.setSpout

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, TransactionalSpoutComponent> _spouts = new HashMap();

public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
    Map<String, String> batchGroups = new HashMap();
    batchGroups.put(streamName, batchGroup);
    markBatchGroups(id, batchGroups);

    TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
    _spouts.put(id, c);
    return new SpoutDeclarerImpl(c);
}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
  • markBatchGroups is called here, recording the new component in _batchIds; the component is also added to _spouts (the sketch after this bullet shows the resulting state for the running example)
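
For the running example, the builder state after setSpout should look roughly as follows; this is a sketch inferred from the source above (the concrete ids assume the naming conventions discussed earlier), not output from an actual run:

// Sketch: what markBatchGroups records for setSpout("spout-spout1", "s1", "spout1", s, 1, "bg0").
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;

public class BatchIdsSketch {
    public static void main(String[] args) {
        Map<GlobalStreamId, String> batchIds = new HashMap<>();
        batchIds.put(new GlobalStreamId("spout-spout1", "s1"), "bg0");
        System.out.println(batchIds); // mirrors _batchIds after setSpout
    }
}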

TridentTopologyBuilder.setBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, Component> _bolts = new HashMap();

// map from stream name to batch id
public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
    markBatchGroups(id, batchGroups);

    Component c = new Component(bolt, parallelism, committerBatches);
    _bolts.put(id, c);
    return new BoltDeclarerImpl(c);
}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
  • markBatchGroups is called here as well, recording the new component in _batchIds; the component is also added to _bolts. For Trident, such a bolt corresponds to a series of ProcessorNodes (possibly with PartitionNodes)

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
    TopologyBuilder builder = new TopologyBuilder();
    Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
    Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

    Map<String, List<String>> batchesToCommitIds = new HashMap<>();
    Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();

    for(String id: _spouts.keySet()) {
        TransactionalSpoutComponent c = _spouts.get(id);
        if(c.spout instanceof IRichSpout) {
            //TODO: wrap this to set the stream name
            builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
        } else {
            String batchGroup = c.batchGroupId;
            if(!batchesToCommitIds.containsKey(batchGroup)) {
                batchesToCommitIds.put(batchGroup, new ArrayList<String>());
            }
            batchesToCommitIds.get(batchGroup).add(c.commitStateId);

            if(!batchesToSpouts.containsKey(batchGroup)) {
                batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
            }
            batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

            BoltDeclarer scd =
                    builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

            for(Map m: c.componentConfs) {
                scd.addConfigurations(m);
            }

            Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
            specs.put(c.batchGroupId, new CoordSpec());

            BoltDeclarer bd = builder.setBolt(id,
                    new TridentBoltExecutor(
                            new TridentSpoutExecutor(
                                    c.commitStateId,
                                    c.streamName,
                                    ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                    c.parallelism);
            bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
            bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
            if(c.spout instanceof ICommitterTridentSpout) {
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            for(Map m: c.componentConfs) {
                bd.addConfigurations(m);
            }
        }
    }
    //......
    Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

    for(String batch: batchesToCommitIds.keySet()) {
        List<String> commitIds = batchesToCommitIds.get(batch);
        SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
        if(onHeap != null) {
            if(offHeap != null) {
                masterCoord.setMemoryLoad(onHeap, offHeap);
            } else {
                masterCoord.setMemoryLoad(onHeap);
            }
        }
        if(cpuLoad != null) {
            masterCoord.setCPULoad(cpuLoad);
        }
    }

    for(String id: _bolts.keySet()) {
        Component c = _bolts.get(id);

        Map<String, CoordSpec> specs = new HashMap<>();

        for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
            String batch = batchIdsForBolts.get(s);
            if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
            CoordSpec spec = specs.get(batch);
            CoordType ct;
            if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
                ct = CoordType.single();
            } else {
                ct = CoordType.all();
            }
            spec.coords.put(s.get_componentId(), ct);
        }

        for(String b: c.committerBatches) {
            specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
        }

        BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
        for(Map conf: c.componentConfs) {
            d.addConfigurations(conf);
        }

        for(InputDeclaration inputDecl: c.declarations) {
            inputDecl.declare(d);
        }

        Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
        for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
            for(String comp: entry.getValue()) {
                d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
            }
        }

        for(String b: c.committerBatches) {
            d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
        }
    }

    return builder.createTopology();
}
  • For a spout that is not an IRichSpout, buildTopology creates a TridentSpoutCoordinator bolt in the topology, which globalGroupings the MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) streams. It also creates a TridentBoltExecutor bolt, which allGroupings MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and MasterBatchCoordinator.SUCCESS_STREAM_ID ($success); when the spout is an ICommitterTridentSpout, it additionally allGroupings MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). Note that the non-IRichSpout spout is converted into a bolt here
  • Then, for each batch in batchesToCommitIds, a MasterBatchCoordinator spout is created, which ties together the TridentSpoutCoordinator and TridentBoltExecutor created above
  • For the bolt side (the SubtopologyBolt wrapping the ProcessorNodes), a TridentBoltExecutor is set up, which directGroupings TridentBoltExecutor.COORD_STREAM ($coord-) and also allGroupings MasterBatchCoordinator.COMMIT_STREAM_ID ($commit); the stand-in sketch after this list reproduces this wiring
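
The following runnable stand-in of mine reproduces the grouping shape described above with trivial placeholder components, so the wiring is easier to see; NoopSpout/NoopBolt are hypothetical, and the real components are MasterBatchCoordinator, TridentSpoutCoordinator and the two TridentBoltExecutors:

// Hypothetical stand-in for the wiring buildTopology sets up for the example.
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class WiringSketch {
    static class NoopSpout extends BaseRichSpout {
        @Override public void open(Map conf, TopologyContext ctx, SpoutOutputCollector c) {}
        @Override public void nextTuple() {}
        @Override public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declareStream("$batch", new Fields("tx"));
            d.declareStream("$success", new Fields("tx"));
            d.declareStream("$commit", new Fields("tx"));
        }
    }
    static class NoopBolt extends BaseBasicBolt {
        @Override public void execute(Tuple t, BasicOutputCollector c) {}
        @Override public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declareStream("$batch", new Fields("tx", "metadata"));
            d.declareStream("$coord-bg0", new Fields("id", "count"));
            d.declareStream("s1", new Fields("user", "score"));
        }
    }

    public static void main(String[] args) {
        TopologyBuilder b = new TopologyBuilder();
        b.setSpout("$mastercoord-bg0", new NoopSpout());        // plays MasterBatchCoordinator
        b.setBolt("$spoutcoord-spout-spout1", new NoopBolt())   // plays TridentSpoutCoordinator
                .globalGrouping("$mastercoord-bg0", "$batch")
                .globalGrouping("$mastercoord-bg0", "$success");
        b.setBolt("spout-spout1", new NoopBolt())               // TridentBoltExecutor wrapping the spout
                .allGrouping("$spoutcoord-spout-spout1", "$batch")
                .allGrouping("$mastercoord-bg0", "$success");
        b.setBolt("b-0", new NoopBolt())                        // TridentBoltExecutor wrapping the ProcessorNode
                .directGrouping("spout-spout1", "$coord-bg0")
                .shuffleGrouping("spout-spout1", "s1");          // illustrative; the real grouping comes from the PartitionNode
        b.createTopology();
    }
}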

TridentTopologyBuilder.createTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
    maybeAddCheckpointSpout();
    for(String boltId: _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        try{
            maybeAddCheckpointInputs(common);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        }catch(RuntimeException wrapperCause){
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    for(String spoutId: _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        try{
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        }catch(RuntimeException wrapperCause){
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }

    StormTopology stormTopology = new StormTopology(spoutSpecs,
            boltSpecs,
            new HashMap<String, StateSpoutSpec>());

    stormTopology.set_worker_hooks(_workerHooks);

    return Utils.addVersions(stormTopology);
}

/**
 * If the topology has at least one stateful bolt
 * add a {@link CheckpointSpout} component to the topology.
 */
private void maybeAddCheckpointSpout() {
    if (hasStatefulBolt) {
        setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
    }
}
  • When createTopology runs, it adds a CheckpointSpout if the topology contains at least one stateful bolt; similarly, for every bolt that is stateful but not a StatefulBoltExecutor, a CheckpointTupleForwarder is added
  • After all the setup done in buildTopology, by the time createTopology runs there are three bolts: the TridentBoltExecutor wrapping the ProcessorNode, the TridentSpoutCoordinator, and the TridentBoltExecutor wrapping the original spout
  • There is only one spout, the MasterBatchCoordinator; during buildTopology the non-IRichSpout spout was converted into the TridentBoltExecutor bolt mentioned above, with the TridentSpoutCoordinator bolt added alongside (the serialization check in the source above is illustrated in the sketch after this list)
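
As an aside, the wrapped NotSerializableException above is easy to reproduce; the sketch below (my own illustration, using storm's Utils.javaSerialize, which is what the boltSpecs construction calls) shows the pitfall the error message guards against:

// Hypothetical illustration: a field instantiated prior to topology creation
// must be Serializable, or createTopology fails exactly as described above.
import org.apache.storm.utils.Utils;

public class SerializationPitfall {
    static class BadBolt implements java.io.Serializable {
        private final Thread worker = new Thread(); // Thread is not Serializable
    }
    public static void main(String[] args) {
        Utils.javaSerialize(new BadBolt()); // throws RuntimeException(NotSerializableException)
    }
}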

Topology structure

(The original post includes a diagram of the resulting topology here.)

  • Taking the earlier example, after TridentTopologyBuilder.createTopology the final topology has one spout, MasterBatchCoordinator ($mastercoord-bg0), and three bolts: TridentSpoutCoordinator ($spoutcoord-spout-spout1), the TridentBoltExecutor wrapping the non-IRichSpout spout (spout-spout1), and the TridentBoltExecutor wrapping the ProcessorNode (b-0). Several streams are involved: MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), MasterBatchCoordinator.BATCH_STREAM_ID ($batch), TridentBoltExecutor.COORD_STREAM ($coord-bg0), s1 and s2 (the snippet after this list shows one way to verify these component ids)
  • $mastercoord-bg0 declares the $success, $commit and $batch streams, each with the single output field tx
  • $spoutcoord-spout-spout1 receives the $success and $batch streams from $mastercoord-bg0, and itself declares the $batch stream with output fields [tx, metadata]
  • spout-spout1 receives, via allGrouping, $mastercoord-bg0's $success stream and $spoutcoord-spout-spout1's $batch stream; it emits [id, count] data to $coord-bg0 and emits its data tuples to stream s1
  • b-0 receives spout-spout1's $coord-bg0 and s1 streams; it emits to stream s2 (output fields: [$batchId, user, score]) and also emits [id, count] data to $coord-bg0
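
If you want to confirm this layout yourself, the component ids can be read straight off the thrift StormTopology returned by topology.build(); a hedged snippet, meant to replace the bare topology.build() call at the end of the test method from the example:

// Sketch: inspecting the thrift StormTopology produced by topology.build();
// get_spouts()/get_bolts() are the thrift-generated accessors on StormTopology.
StormTopology st = topology.build();
System.out.println(st.get_spouts().keySet()); // e.g. [$mastercoord-bg0]
System.out.println(st.get_bolts().keySet());  // e.g. [$spoutcoord-spout-spout1, spout-spout1, b-0]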

Summary

  • When TridentTopologyBuilder builds the topology, a spout that is not an IRichSpout is converted into a TridentBoltExecutor bolt and an extra TridentSpoutCoordinator bolt is added; a ProcessorNode is likewise wrapped into a TridentBoltExecutor bolt. In other words, for ease of management, TridentTopology wraps the user-supplied spout as a bolt and creates MasterBatchCoordinator as the real spout
  • The TridentBoltExecutor.COORD_STREAM ($coord-) stream carries [id, count] data between components, which guarantees that tuples are fully transferred through each component; both the spout and the bolts emit [id, count] data to this stream
  • The relationship among MasterBatchCoordinator, TridentSpoutCoordinator and the TridentBoltExecutor wrapping the original spout (spout-spout1) is as follows: the master sends success data (tuples/commands) to spout-spout1, and sends success and batch data (tuples/commands) to the coordinator; the coordinator sends batch data (tuples/commands) to spout-spout1


Reprinted from: https://my.oschina.net/go4it/blog/2873974
