【问题标题】:Kafka Spark Scala Cassandra Compatible Versions [closed]Kafka Spark Scala Cassandra 兼容版本 [关闭]
【发布时间】:2018-03-11 05:12:09
【问题描述】:

我正在尝试使用 Apache Kafka、Saprk、Scala 和 Cassandra 创建一个应用程序。 但是我在获得这些工具的正确兼容版本时遇到了很多问题。

谁能告诉我应该使用哪个版本?

提前致谢..

【问题讨论】:

  • 请添加您正在使用的版本以及您的pom.xml 的外观。
  • 我使用的是 Spark 2.2、Kafka 0.11、scala 2.11,但在创建 API 时遇到了很多问题。这就是为什么我要问我现在应该从哪个版本开始?

标签: scala apache-spark cassandra apache-kafka


【解决方案1】:

以下是我们使用过的库的版本列表:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.cassandra</groupId>
        <artifactId>apache-cassandra</artifactId>
        <version>3.10</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

您将面临的主要兼容性问题是不同的 scala 版本(2.10.* 或 2.11.*)。您将不得不照顾它,并看到所有依赖项都使用相同的 scala 版本。我认为您可以毫无疑问地将所有版本更新到最新版本,只要您关注所有地方的相同 scala 版本。

这里还有代码示例,可以帮助您开始:

       public static void main(String[] args) throws   InterruptedException {
           JavaStreamingContext jssc = new JavaStreamingContext(getSparkConfiguration(), Durations.seconds(5));

           JavaInputDStream<ConsumerRecord<String, LoggingEvent>> messages  =
            KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, LoggingEvent>Subscribe(Arrays.asList("some_topic"), getKafkaParams("localhost:9092", "some_logging_group))
            );

           JavaDStream<LoggingEvent> loggingRecords = messages.map(
            (Function<ConsumerRecord<String, LoggingEvent>, LoggingEvent>) message -> message.value()
    );

             CassandraStreamingJavaUtil.javaFunctions(loggingRecords).writerBuilder("some_space", "some_table",
                  CassandraJavaUtil.mapToRow(LoggingEvent.class)).saveToCassandra();

          jssc.start();
          jssc.awaitTermination();
}

连接器中的映射是通过将类中的字段与表列映射来完成的。

对于设置,我们使用了 ansible,并且存档的分发版本与库依赖项列表中的相同。

【讨论】:

  • scala :-).
【解决方案2】:

如果你对 sbt 版本感兴趣

libraryDependencies ++= {

  val sparkV = "2.1.0"
  val cassandraV = "2.0.0-M3"

  Seq(
    "org.apache.spark"      %% "spark-core" % sparkV,
    "org.apache.spark"      %% "spark-streaming" % sparkV,
    "org.apache.spark"      %% "spark-streaming-kafka-0-10" % sparkV,
    "org.apache.spark"      %% "spark-sql-kafka-0-10" % sparkV,
    "org.apache.spark"      %% "spark-sql" % sparkV,
    "org.apache.spark"      %% "spark-hive" % sparkV,
    "com.datastax.spark"    %% "spark-cassandra-connector" % cassandraV,
    "com.datastax.cassandra" % "cassandra-driver-core" % "3.2.0",
    "org.rogach"            %% "scallop" % "2.1.2"
  )

}

【讨论】:

  • 上述依赖的scala版本是2.11.8。
猜你喜欢
  • 2018-08-31
  • 2017-10-08
  • 2018-03-11
  • 1970-01-01
  • 2019-01-28
  • 2013-05-25
  • 2019-08-16
  • 2017-06-04
  • 2021-11-25
相关资源
最近更新 更多