【问题标题】:Unable to consume kafka messages using apache storm(无法使用 Apache Storm 消费 Kafka 消息)
【发布时间】:2018-11-26 10:19:37
【问题描述】:

我开发了一个使用 Apache Storm 消费 Kafka 消息的应用程序。在 Eclipse 中使用 LocalCluster 运行拓扑时一切正常,消息可以被正常消费;但是当我使用 storm 命令运行它时(bin\storm jar ..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.Topology storm-kafka-topology),拓扑可以启动,却无法消费任何消息。请问我做错了什么?或者请指导我可以通过哪些方法来定位问题。

拓扑代码

/**
 * Wires a Kafka spout, a {@link SinkTypeBolt} and a {@link MongoDBBolt} into a
 * Storm topology and submits it to an in-process {@code LocalCluster}.
 *
 * <p>All settings are read from the classpath resource
 * {@code /application.properties}.
 */
public class Topology {

    public Properties configs;
    public BoltBuilder boltBuilder;
    public SpoutBuilder spoutBuilder;

    /**
     * Loads {@code /application.properties} from the classpath and creates the
     * spout and bolt builders from it.
     *
     * <p>On any failure the stack trace is printed and the JVM exits with a
     * non-zero status so that calling scripts can detect the error.
     *
     * @param configFile currently ignored; the configuration is always read
     *                   from {@code /application.properties} on the classpath
     * @throws Exception declared for backward compatibility with callers
     */
    public Topology(String configFile) throws Exception {
        configs = new Properties();

        // try-with-resources closes the stream even when loading fails.
        try (InputStream is = this.getClass().getResourceAsStream("/application.properties")) {
            if (is == null) {
                // Fail with a readable message instead of a bare NullPointerException.
                throw new IllegalStateException("/application.properties not found on the classpath");
            }
            configs.load(is);
            boltBuilder = new BoltBuilder(configs);
            spoutBuilder = new SpoutBuilder(configs);
        } catch (Exception ex) {
            ex.printStackTrace();
            // Non-zero status: start-up did not succeed.
            System.exit(1);
        }
    }

    /**
     * Reads a required integer property.
     *
     * @param key property name
     * @return the parsed value
     * @throws IllegalArgumentException if the property is missing or is not an integer
     */
    private int intProperty(String key) {
        String raw = configs.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("Property " + key + " is not an integer: " + raw, ex);
        }
    }

    /**
     * Builds the spout -> sink-type bolt -> MongoDB bolt pipeline and submits
     * it to a {@code LocalCluster}.
     *
     * @throws Exception if a component cannot be built or submission fails
     */
    private void submitTopology() throws Exception {
        System.out.println("Entered in submitTopology");
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout<?, ?> kafkaSpout = spoutBuilder.buildKafkaSpout();
        SinkTypeBolt sinkTypeBolt = boltBuilder.buildSinkTypeBolt();
        MongoDBBolt mongoBolt = boltBuilder.buildMongoDBBolt();

        String spoutId = configs.getProperty(Keys.KAFKA_SPOUT_ID);
        String sinkBoltId = configs.getProperty(Keys.SINK_TYPE_BOLT_ID);
        String mongoBoltId = configs.getProperty(Keys.MONGO_BOLT_ID);

        // The third argument of setSpout/setBolt is the parallelism hint,
        // i.e. the number of executors (threads) to spawn for the component.
        builder.setSpout(spoutId, kafkaSpout, intProperty(Keys.KAFKA_SPOUT_COUNT));

        // The sink-type bolt consumes the spout's default stream.
        builder.setBolt(sinkBoltId, sinkTypeBolt, intProperty(Keys.SINK_BOLT_COUNT))
            .shuffleGrouping(spoutId);

        // The MongoDB bolt consumes the sink-type bolt's MONGODB_STREAM stream.
        builder.setBolt(mongoBoltId, mongoBolt, intProperty(Keys.MONGO_BOLT_COUNT))
            .shuffleGrouping(sinkBoltId, Keys.MONGODB_STREAM);

        String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);

        Config conf = new Config();
        // Number of worker processes to create for the topology in the cluster.
        conf.setNumWorkers(1);

        System.out.println("Submitting Topology");
        // NOTE: LocalCluster runs the topology inside this JVM. When deploying
        // with `storm jar`, submit to the real cluster instead:
        //   StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, builder.createTopology());
        // Report success only after the submission call has returned.
        System.out.println("Topology submitted");
    }

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]} is passed to the constructor as the
     *             config file name (which the constructor currently ignores)
     * @throws Exception if the topology cannot be built or submitted
     */
    public static void main(String[] args) throws Exception {
        String configFile;
        if (args.length == 0) {
            System.out.println("Missing input : config file location, using default");
            configFile = "application.properties";
        } else {
            configFile = args[0];
        }

        Topology ingestionTopology = new Topology(configFile);
        ingestionTopology.submitTopology();
    }

}

Spout 代码

/**
 * Creates the {@link KafkaSpout} used by the topology from the application
 * properties.
 */
public class SpoutBuilder {

    public Properties configs;

    /**
     * @param configs application properties holding the Kafka broker, topic
     *                and consumer-group settings
     */
    public SpoutBuilder(Properties configs) {
        this.configs = configs;
    }

    /**
     * Builds a spout for the broker, topic and consumer group named in the
     * properties.
     *
     * @return the configured Kafka spout
     */
    public KafkaSpout<?, ?> buildKafkaSpout() {
        final String brokers = this.configs.getProperty(Keys.KAFKA_BROKER);
        final String topicName = this.configs.getProperty(Keys.KAFKA_TOPIC);
        final String consumerGroup = this.configs.getProperty(Keys.KAFKA_CONSUMERGROUP);

        final KafkaSpoutConfig<String, String> spoutConfig =
            getKafkaSpoutConfig(brokers, topicName, consumerGroup);
        return new KafkaSpout<>(spoutConfig);
    }

    /**
     * Assembles the spout configuration: at-least-once processing, offsets
     * committed every 10 000 ms, at most 250 uncommitted offsets, first poll
     * using {@code UNCOMMITTED_LATEST}, and null tuples not emitted.
     *
     * @param bootstrapServers Kafka bootstrap servers
     * @param topic            topic to subscribe to
     * @param group            consumer group id
     * @return the spout configuration
     */
    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic, String group) {
        final String[] topics = {topic};
        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, topics)
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, group)
            .setRetry(getRetryService())
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
            .setMaxUncommittedOffsets(250)
            .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
            .setTupleTrackingEnforced(true)
            .setEmitNullTuples(false)
            .setRecordTranslator(new DefaultRecordTranslator<String, String>())
            .build();
        return spoutConfig;
    }

    /**
     * Creates the exponential-backoff retry service handed to the spout
     * configuration.
     *
     * @return the retry service
     */
    protected KafkaSpoutRetryService getRetryService() {
        final TimeInterval first = TimeInterval.microSeconds(500);
        final TimeInterval second = TimeInterval.milliSeconds(2);
        final TimeInterval last = TimeInterval.seconds(10);
        return new KafkaSpoutRetryExponentialBackoff(first, second, Integer.MAX_VALUE, last);
    }

}

Bolt 构建器(BoltBuilder)代码

/**
 * Creates the bolts used by the topology from the application properties.
 */
public class BoltBuilder {

    public Properties configs;

    /**
     * @param configs application properties holding the MongoDB connection settings
     */
    public BoltBuilder(Properties configs) {
        this.configs = configs;
    }

    /**
     * @return a new {@link SinkTypeBolt}
     */
    public SinkTypeBolt buildSinkTypeBolt() {
        final SinkTypeBolt bolt = new SinkTypeBolt();
        return bolt;
    }

    /**
     * Builds the MongoDB bolt from the host, port, database and collection
     * properties.
     *
     * @return a new {@link MongoDBBolt}
     * @throws NumberFormatException if the port property is not an integer
     */
    public MongoDBBolt buildMongoDBBolt() {
        return new MongoDBBolt(
            this.configs.getProperty(Keys.MONGO_HOST),
            Integer.parseInt(this.configs.getProperty(Keys.MONGO_PORT)),
            this.configs.getProperty(Keys.MONGO_DATABASE),
            this.configs.getProperty(Keys.MONGO_COLLECTION));
    }

}

SinkTypeBolt 代码

public class SinkTypeBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;

public void execute(Tuple tuple) {
    String value = tuple.getString(4);
    System.out.println("Received in SinkType bolt : "+value);
    if (value != null && !value.isEmpty()){
        collector.emit(Keys.MONGODB_STREAM,new Values(value));
        System.out.println("Emitted : "+value);
    }
    collector.ack(tuple);   
}

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream(Keys.MONGODB_STREAM, new Fields("content"));
}

}

MongoDB Bolt 代码

/**
 * Terminal bolt that converts the {@code content} field of each tuple into a
 * MongoDB {@link Document} and inserts it into the configured collection.
 *
 * <p>The content is expected to be space-separated {@code name:value} tokens.
 */
public class MongoDBBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private MongoDatabase mongoDB;
    private MongoClient mongoClient;
    private String collection;

    public String host;
    public int port;
    public String db;

    /**
     * @param host       MongoDB host
     * @param port       MongoDB port
     * @param db         database name
     * @param collection collection that receives the documents
     */
    protected MongoDBBolt(String host, int port, String db, String collection) {
        this.host = host;
        this.port = port;
        this.db = db;
        this.collection = collection;
    }

    /**
     * Opens the MongoDB connection. The client is created here rather than in
     * the constructor, so it is built when the bolt is prepared for execution.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.mongoClient = new MongoClient(host, port);
        this.mongoDB = mongoClient.getDatabase(db);
    }

    /**
     * Inserts the document built from the tuple and acknowledges it; on any
     * error the tuple is failed instead.
     *
     * @param input tuple carrying a {@code content} field
     */
    @Override
    public void execute(Tuple input) {
        try {
            // Document construction is inside the try block so that an
            // unexpected tuple shape fails the tuple rather than escaping
            // from execute().
            Document mongoDoc = getMongoDocForInput(input);
            mongoDB.getCollection(collection).insertOne(mongoDoc);
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    /**
     * Closes the MongoDB client if one was created.
     */
    @Override
    public void cleanup() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    /**
     * Declares nothing: this bolt emits no tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

    /**
     * Parses the tuple's {@code content} field into a document.
     *
     * <p>Each space-separated token is split at its first {@code ':'} into a
     * field name and a value, so values may themselves contain colons. Tokens
     * without a colon are skipped (and reported on stderr when non-empty)
     * without affecting the remaining tokens. Null content yields an empty
     * document.
     *
     * @param input tuple carrying a {@code content} field
     * @return the document; empty when nothing could be parsed
     */
    public Document getMongoDocForInput(Tuple input) {
        Document doc = new Document();
        String content = (String) input.getValueByField("content");
        System.out.println("Received in MongoDB bolt "+content);
        if (content == null) {
            return doc;
        }
        for (String part : content.trim().split(" ")) {
            int separator = part.indexOf(':');
            if (separator < 0) {
                // Empty tokens come from repeated spaces; only report real ones.
                if (!part.isEmpty()) {
                    System.err.println("Skipping token without ':' separator: " + part);
                }
                continue;
            }
            doc.append(part.substring(0, separator), part.substring(separator + 1));
        }
        return doc;
    }

}

pom.xml 代码

<!-- Build settings: UTF-8 source encoding; compiler source and target level 1.7. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>

<!-- Project dependencies: Storm, the Kafka client, the MongoDB driver and json-simple. -->
<dependencies>
    <!-- Unit-test framework; test scope only. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <!-- Storm runtime, declared with "provided" scope; the slf4j-log4j12
         binding is excluded.
         NOTE(review): presumably "provided" because the Storm installation
         supplies storm-core at run time - confirm for local runs. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Kafka client library, pinned explicitly to 1.1.0; the slf4j-log4j12
         binding is excluded here as well. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Storm/Kafka integration; same version as storm-core. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
    </dependency>
    <!-- MongoDB Java driver. -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.0.4</version>
    </dependency>
    <!-- JSON library. NOTE(review): no use of it is visible in the classes
         shown here - confirm it is still needed. -->
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
</dependencies>

<!-- Build: shades the project into a single jar during the package phase and
     declares which resource files are copied into it. -->
<build>
    <plugins>
        <!-- maven-shade-plugin: runs the "shade" goal in the package phase. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- Services transformer for META-INF/services files
                                 coming from the shaded dependencies. -->
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            <!-- Sets the manifest main class to the topology entry point. -->
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.kafka_storm.util.Topology</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.4</version>
        </plugin>
    </plugins>
    <!-- Only *.properties files under src/main/java are declared as resources.
         NOTE(review): an explicit <resources> section replaces Maven's default
         src/main/resources directory - confirm application.properties really
         lives under src/main/java, otherwise it will be missing from the jar.
         NOTE(review): the include pattern below starts with a space - confirm
         it is trimmed as intended. -->
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include> **/*.properties</include>
            </includes>
        </resource>
    </resources>
</build>

Storm UI

【问题讨论】:

    标签: java apache-kafka apache-storm


    【解决方案1】:

    先确认一下:当您使用 storm jar 提交拓扑时,您在 Topology 中启用的是 StormSubmitter 那一行,而不是 LocalCluster,对吧?

    另外请检查您是否启动了所有必需的守护进程,即至少应该运行 storm nimbus 和 storm supervisor(再加上您安装的 Zookeeper)。

    接下来要查看的地方是您的日志文件。在 Storm 目录中有一个 logs 目录,请查看 logs/worker-artifacts/<your-topology-id>/<your-worker-port>/worker.log 文件。这些日志有望帮助您找到正确的方向,弄清楚究竟发生了什么。我会打开 Storm UI,找到您的 spout 并查看它运行在哪些 worker 端口上,这样就可以查看对应的日志文件。

    【讨论】:

    • 谢谢 Stig,我检查了你提到的日志,发现是 Java 版本的问题:我使用 Java 8 进行开发,但 Storm 部署环境使用的是 Java 10。现在可以正常使用了。
    猜你喜欢
    • 1970-01-01
    • 2019-09-23
    • 2018-11-29
    • 2021-11-09
    • 2019-01-11
    • 2018-07-17
    • 2019-04-18
    • 1970-01-01
    • 2021-03-28
    相关资源
    最近更新 更多