创建并提交Topology到Storm集群的完整代码

//创建TopologyBuilder对象

TopologyBuilder builder=new TopologyBuilder();

//添加一个id为1,并行度为5的TestWordSpout对象

builder.setSpout("1",new TestWordSpount(true),5);

//添加一个id为2,并行度为3的TestWordSpout对象

builder.setSpout("2",new TestWordSpount(true),3);

//添加一个id为3,并行度为3的TestWordCounter对象

 //对id为1的组件按照“word”字段进行分组

//对id为2的组件按照“word”字段进行分组

builder.setBolt("3",new TestWordCounter(),3).fieldsGrounping("1",new Fields("word")).fieldsGrounping("2",new Fields("word"));

//添加一个id为4,并行度为1的TestGlobalCount对象

//对id为1的组件按照全局分组

builder.setBolt("4",new TestGlobalCont(),1).globalGrounoinhg("1");

Map conf=new HashMap();

conf.put(Config.TOPOLOGY_WORKERS,4);

//提交拓扑

StormSubmitter.submitTopology("mytopology",conf,builder.creatTology);

 

相关文章:

猜你喜欢
  • 2021-09-04
  • 2021-06-19
相关资源
相似解决方案