【Question title】: Spark Streaming - Java - Insert JSON from Kafka into Cassandra
【Posted】: 2016-11-04 05:24:48
【Question】:

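I'm writing a simple data pipeline in Spark Streaming, using Java, to pull JSON data from Kafka, parse the JSON into a custom class (Transaction), and then insert that data into a Cassandra table, but I can't get the mapToRow() function to work.

I've seen plenty of examples that say all you have to do is something along these lines: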

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();

However, when I do this, I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions

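I think all I'm missing is some kind of decoration on the class, but I haven't managed to figure out which one. I've tried going bare bones, essentially making the class a property bag: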

public class Transaction implements java.io.Serializable{

    public int TransactionId;
    ...

    public Transaction(){}
}

I've tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{

    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...

    public Transaction(){}
}

I've also tried adding public get/set methods for each property and making the properties private:

public class Transaction implements java.io.Serializable{

    private int transactionId;
    ...

    public Transaction(){}

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
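
As a side check on the getter/setter variant above, here is a minimal sketch that uses only the JDK's java.beans.Introspector to list the read/write properties a bean-style mapper could discover on such a class. It is not the connector's own mapping logic. The names BeanPropertyCheck and readWriteProperties are hypothetical, and the nested Transaction fills in only the one field shown above.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    /** Hypothetical stand-in for the getter/setter version of Transaction. */
    public static class Transaction implements java.io.Serializable {
        private int transactionId;

        public Transaction() {}

        public int getTransactionId() {
            return transactionId;
        }

        public void setTransactionId(int transactionId) {
            this.transactionId = transactionId;
        }
    }

    /** Returns the names of properties that expose both a getter and a setter. */
    public static List<String> readWriteProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the inherited "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // prints [transactionId]
        System.out.println(readWriteProperties(Transaction.class));
    }
}
```

If the list comes back empty for a class, its accessors probably do not follow the getX/setX naming convention.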

I have been able to parse the DStream into an RDD of Transactions using the class below:

public class Transaction implements java.io.Serializable{

    ...

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped:" + e);
                }
            }

            return transactions;
        }
    }
}

Combined with the following code, which acts on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());
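
The skip-on-failure loop in ParseJSON can be exercised without Spark or Jackson. Below is a minimal sketch, assuming a hypothetical helper parseAll that takes the parser as a java.util.function.Function. Integer::parseInt stands in for mapper.readValue(line, Transaction.class) purely so the example runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class SkipBadRecords {

    /**
     * Applies the parser to every line and keeps the successful results.
     * Lines whose parse throws are logged and skipped, as in ParseJSON.
     */
    public static <T> List<T> parseAll(Iterator<String> lines, Function<String, T> parser) {
        List<T> parsed = new ArrayList<>();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                parsed.add(parser.apply(line));
            } catch (RuntimeException e) {
                System.err.println("Skipped: " + e);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1", "oops", "3");
        // "oops" fails to parse and is skipped; prints [1, 3]
        System.out.println(parseAll(input.iterator(), Integer::parseInt));
    }
}
```

Separating the loop from the Spark function type makes it easy to confirm that one malformed record does not discard the rest of a partition.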

However, once I have it in that form, it still doesn't work with the writerBuilder().saveToCassandra() chain.

Any help here is greatly appreciated.

【Comments】:

    Tags: java json cassandra apache-kafka spark-streaming


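    【Solution 1】:

    It turns out the problem was just an import issue. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*, thinking it would give me everything I needed, but I also needed to import com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.

    Once I resolved this, I began getting the following error: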
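
    For reference, here is a sketch of what the corrected file might look like, assuming the class name SaveTransactions from the compiler error and the Transaction bean from the question. It uses explicit static imports of the two methods rather than the wildcard imports described above, and the method name save is hypothetical. It compiles only with the Spark Streaming and spark-cassandra-connector jars on the classpath.

```java
// javaFunctions(JavaDStream) comes from the streaming utility class...
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
// ...while mapToRow(Class) lives in the core utility class.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.streaming.api.java.JavaDStream;

public class SaveTransactions {

    // Assumes `events` is the JavaDStream<Transaction> built in the question.
    static void save(JavaDStream<Transaction> events) {
        javaFunctions(events)
            .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class))
            .saveToCassandra();
    }
}
```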

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
        at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
        at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
        at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
        at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
        at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
        at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
        at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
        at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
        at globalTransactions.Process.main(Process.java:77)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 9 more
    

    This was resolved by pulling in the spark-sql project:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
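
    A quick way to confirm that the class named in the NoClassDefFoundError is visible after adding the dependency is to probe for it by name. Below is a minimal sketch using only the JDK. ClasspathCheck and isOnClasspath are hypothetical names, and the result is meaningful only when the program runs with the same classpath as the application.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's loader. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary class name taken from the ClassNotFoundException in the stack trace.
        String name = "org.apache.spark.sql.catalyst.package$ScalaReflectionLock$";
        System.out.println(name + " present: " + isOnClasspath(name));
    }
}
```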
    

    Hope this helps the next guy/gal.
