以下是我们使用过的库的版本列表:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>apache-cassandra</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.2</version>
</dependency>
您将面临的主要兼容性问题是不同的 scala 版本(2.10.* 或 2.11.*)。您将不得不照顾它,并看到所有依赖项都使用相同的 scala 版本。我认为您可以毫无疑问地将所有版本更新到最新版本,只要您关注所有地方的相同 scala 版本。
这里还有代码示例,可以帮助您开始:
public static void main(String[] args) throws InterruptedException {
JavaStreamingContext jssc = new JavaStreamingContext(getSparkConfiguration(), Durations.seconds(5));
JavaInputDStream<ConsumerRecord<String, LoggingEvent>> messages =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, LoggingEvent>Subscribe(Arrays.asList("some_topic"), getKafkaParams("localhost:9092", "some_logging_group))
);
JavaDStream<LoggingEvent> loggingRecords = messages.map(
(Function<ConsumerRecord<String, LoggingEvent>, LoggingEvent>) message -> message.value()
);
CassandraStreamingJavaUtil.javaFunctions(loggingRecords).writerBuilder("some_space", "some_table",
CassandraJavaUtil.mapToRow(LoggingEvent.class)).saveToCassandra();
jssc.start();
jssc.awaitTermination();
}
连接器中的映射是通过将类中的字段与表列映射来完成的。
对于设置,我们使用了 ansible,并且存档的分发版本与库依赖项列表中的相同。