【Posted】: 2016-11-04 05:24:48
【Question】:
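I'm writing a simple data pipeline in Spark Streaming, using Java, that pulls JSON data from Kafka, parses the JSON into a custom class (Transaction), and then inserts that data into a Cassandra table, but I can't get the mapToRow() function to work.
I've seen plenty of examples suggesting that all you have to do is something like this: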
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    streamingContext,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);
JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);
javaFunctions(lines)
    .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class))
    .saveToCassandra();
But when I do this, I get the error:
The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions
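For reference, this wording is what the Eclipse compiler prints when an unqualified method call cannot be resolved in the enclosing class (SaveTransactions here). In the connector's Java API, javaFunctions and mapToRow are, as far as I know, static methods on com.datastax.spark.connector.japi.CassandraJavaUtil, so they only resolve unqualified when they are statically imported. Whether a missing static import is the actual cause in this code is an assumption. The sketch below shows the same name-resolution rule using only a standard-library method; StaticImportSketch and its two methods are hypothetical names.

```java
// Without this static import, the unqualified emptyList() call below does not compile.
import static java.util.Collections.emptyList;

import java.util.List;

public class StaticImportSketch {
    // Unqualified call: resolves only because of the static import above.
    static List<String> viaStaticImport() {
        return emptyList();
    }

    // Fully qualified call: resolves without any static import.
    static List<String> viaQualifiedName() {
        return java.util.Collections.emptyList();
    }

    public static void main(String[] args) {
        // Both calls reach the same method, so the results are equal.
        System.out.println(viaStaticImport().equals(viaQualifiedName()));
    }
}
```

By analogy, either a static import of the connector's helper methods or a qualified call such as CassandraJavaUtil.mapToRow(Transaction.class) should let the compiler resolve the name, assuming the connector's Java API is on the classpath.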
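I think all I'm missing is some kind of decoration on the class, but I haven't managed to work out which one. I've tried this, which basically turns the class into a property bag: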
public class Transaction implements java.io.Serializable{
    public int TransactionId;
    ...
    public Transaction(){}
}
I've also tried all of the DataStax mapping annotations:
@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{
    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...
    public Transaction(){}
}
I've also tried making the fields private and adding public get/set methods for each one:
public class Transaction implements java.io.Serializable{
    private int transactionId;
    ...
    public Transaction(){}
    public int getTransactionId() {
        return transactionId;
    }
    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
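To check what a bean-based mapper could see on a class shaped like this, the standard java.beans.Introspector can list its read/write properties. This is only a sketch: it assumes (not confirmed in this post) that the connector's mapToRow relies on JavaBean-style getters and setters and matches camelCase property names to underscore-separated column names. BeanPropertySketch, readWriteProperties, and toSnakeCase are hypothetical names, and the Transaction here is reduced to the single field shown above.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class BeanPropertySketch {
    // Same shape as the getter/setter variant of Transaction, reduced to one field.
    public static class Transaction implements Serializable {
        private int transactionId;
        public Transaction() {}
        public int getTransactionId() { return transactionId; }
        public void setTransactionId(int transactionId) { this.transactionId = transactionId; }
    }

    // Hypothetical helper: converts a camelCase property name to snake_case.
    static String toSnakeCase(String name) {
        return name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase(Locale.ROOT);
    }

    // Lists the properties that have both a getter and a setter.
    static List<String> readWriteProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            // Stopping at Object.class leaves out the inherited "class" property.
            for (PropertyDescriptor pd
                    : Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : readWriteProperties(Transaction.class)) {
            System.out.println(name + " -> " + toSnakeCase(name));
        }
    }
}
```

Under that assumption, a class exposing only public fields and no accessors, like the property-bag variant earlier, would report no bean properties at all, while the getter/setter variant reports transactionId.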
I have been able to parse the DStream into an RDD of Transactions using the class below:
public class Transaction implements java.io.Serializable{
    ...
    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped:" + e);
                }
            }
            return transactions;
        }
    }
}
combined with the following code, which operates on the lines object from above:
JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());
However, once I have that, it still doesn't work with the writerBuilder().saveToCassandra() chain.
Any help here is greatly appreciated.
【Discussion】:
Tags: java json cassandra apache-kafka spark-streaming