【发布时间】:2013-10-16 17:22:43
【问题描述】:
我的 main() 方法中有两个独立的拓扑。我正在使用“for loop”来设置其相应的 spout 和 bolts 并提交该拓扑。 每个拓扑都有单独的一组 spout 和 bolts,其中包含不同的逻辑。
现在我想动态调用不同的 spout 和 bolt。我该怎么做??
main方法中的代码:
public static void main(String[] args) throws InterruptedException,Exception
{
LocalCluster cluster = new LocalCluster();
try{
BufferedReader in=new BufferedReader(new FileReader("/home/praveen /workspace/OfferEngine/OfferListJSON.json"));
//ArrayList<String> content = new ArrayList<String>();
String str="";
String str1="";
while((str1=in.readLine())!=null)
{
str = str + str1;
}
JsonParser parser = new JsonParser();
JsonObject o = (JsonObject)parser.parse(str);
JsonObject o2 = (JsonObject)o.get("OfferData");
ArrayList<String> ol = new ArrayList<String>();
ol.add(o2.get("strOfferId").toString().replaceAll("^\"|\"$", ""));
ol.add(o2.get("strOfferId1").toString().replaceAll("^\"|\"$", ""));
TopologyBuilder bu = null;
Config config = new Config();
config.setDebug(false);
for(String x : ol) {
System.out.println(x); //prints element x
String y="", z="";
bu = new TopologyBuilder();
bu.setSpout(x, new Off1Spout(x).ks(), 2);
y = x+"2";
bu.setBolt(y, new main.java.bolts.Off1Bolt()).shuffleGrouping(x);
z = x+"offerlimit";
bu.setBolt(z, new OfferLimit()).shuffleGrouping(y);
cluster.submitTopology(x,config,bu.createTopology());
}
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
任何建议都会更有帮助...谢谢问候
【问题讨论】:
-
只是一个想法,如果为 Spouts 创建一个工厂,它采用 String 类型并基于该返回所需的 Spout .. 它就像一个非常简单的工厂。但话又说回来,如果您的病情增加,您需要在简单工厂中进行更改。
标签: java apache-storm