[Posted]: 2014-07-27 02:20:55
[Question]:
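I am trying to read data from Cassandra into a JavaRDD. Here is my code:
```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkWCassandra {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "spark Cassandra");
        String keySpace = "retail";
        String inputColumnFamily = "ordercf";
        try {
            // The Job mainly serves as a holder for the Hadoop Configuration.
            Job job = new Job();
            job.setInputFormatClass(CqlPagingInputFormat.class);
            // Cassandra contact point, Thrift RPC port, source table, and partitioner.
            ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), keySpace, inputColumnFamily);
            ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        } catch (IOException ex) {
            Logger.getLogger(SparkWCassandra.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
```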
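The next step should be to call `jsc.newAPIHadoopRDD()`, but I don't understand what its parameters mean or what I should pass to it.
The tables created in Cassandra (in the `retail` keyspace) are as follows: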
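To my understanding, `JavaSparkContext.newAPIHadoopRDD(conf, fClass, kClass, vClass)` takes the Hadoop `Configuration` carrying the Cassandra settings (here `job.getConfiguration()`), the `InputFormat` class (`CqlPagingInputFormat.class`), and the key and value classes that input format emits, and returns a `JavaPairRDD<K, V>`. For `CqlPagingInputFormat` both key and value are assumed to be `Map<String, ByteBuffer>`. The awkward part is that `Map.class` is only a raw `Class<Map>`, which does not satisfy the `F extends InputFormat<K, V>` bound. The sketch below is stdlib-only so it compiles without Spark: `InputFormatLike` and `CqlPagingInputFormatLike` are hypothetical stand-ins that mirror the generic shape of the real signature, and the double cast shows one common way to obtain a `Class<Map<String, ByteBuffer>>` token.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.InputFormat<K, V>,
// used only so this sketch compiles without Hadoop on the classpath.
interface InputFormatLike<K, V> {}

// Hypothetical stand-in for CqlPagingInputFormat, assumed to emit
// Map<String, ByteBuffer> keys and Map<String, ByteBuffer> values.
class CqlPagingInputFormatLike
        implements InputFormatLike<Map<String, ByteBuffer>, Map<String, ByteBuffer>> {}

public class HadoopRddTypeTokens {

    // Same type-parameter shape as JavaSparkContext.newAPIHadoopRDD:
    // F must be an input format producing exactly K keys and V values.
    public static <K, V, F extends InputFormatLike<K, V>> String describe(
            Class<F> fClass, Class<K> kClass, Class<V> vClass) {
        return fClass.getSimpleName() + " -> (" + kClass.getSimpleName()
                + ", " + vClass.getSimpleName() + ")";
    }

    // Map.class is a raw Class<Map>; casting through Class<?> yields a
    // Class<Map<String, ByteBuffer>> that satisfies the generic bound above.
    @SuppressWarnings("unchecked")
    public static Class<Map<String, ByteBuffer>> mapOfByteBuffers() {
        return (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;
    }

    public static void main(String[] args) {
        Class<Map<String, ByteBuffer>> rowClass = mapOfByteBuffers();
        // With Spark on the classpath the analogous call would be:
        // jsc.newAPIHadoopRDD(job.getConfiguration(),
        //         CqlPagingInputFormat.class, rowClass, rowClass);
        // prints "CqlPagingInputFormatLike -> (Map, Map)"
        System.out.println(describe(CqlPagingInputFormatLike.class, rowClass, rowClass));
    }
}
```

The cast is unchecked but harmless here: at runtime there is only one `Map.class`, and the generic type exists purely to let the compiler match `K` and `V` against the input format's declared types.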
```sql
CREATE TABLE salecount (
    product_id text,
    sale_count int,
    PRIMARY KEY (product_id)
);

CREATE TABLE ordercf (
    user_id text,
    time timestamp,
    product_id text,
    quantity int,
    PRIMARY KEY (user_id, time)
);

INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
```
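Assuming `CqlPagingInputFormat` delivers each `ordercf` row as a pair of `Map<String, ByteBuffer>` (key columns and value columns), the raw bytes still have to be decoded into Java types. The stdlib-only sketch below decodes one hand-built row using Cassandra's serialization for these column types: `text` as UTF-8, `int` as a 4-byte big-endian integer, and `timestamp` as an 8-byte big-endian count of milliseconds. Which map each column lands in is an assumption, so the hypothetical `column` helper checks the key map first and then the value map.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class OrderRowDecoder {

    // Encoders used only to hand-build a sample row for this sketch.
    public static ByteBuffer textBuf(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    public static ByteBuffer intBuf(int v) {
        return ByteBuffer.allocate(4).putInt(0, v);
    }

    public static ByteBuffer longBuf(long v) {
        return ByteBuffer.allocate(8).putLong(0, v);
    }

    // Hypothetical helper: looks a column up in the key map first, then in the
    // value map, so the sketch does not depend on which map holds the column.
    static ByteBuffer column(Map<String, ByteBuffer> keys,
                             Map<String, ByteBuffer> values, String name) {
        ByteBuffer buf = keys.containsKey(name) ? keys.get(name) : values.get(name);
        if (buf == null) {
            throw new IllegalArgumentException("missing column: " + name);
        }
        return buf.duplicate(); // independent position; caller's buffer is untouched
    }

    // CQL text: UTF-8 bytes.
    public static String text(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // CQL int: 4-byte big-endian (ByteBuffer's default byte order).
    public static int int32(ByteBuffer buf) {
        return buf.getInt(buf.position());
    }

    // CQL timestamp: 8-byte big-endian milliseconds since the epoch.
    public static long timestampMillis(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }

    public static String describe(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> values) {
        return text(column(keys, values, "user_id")) + " "
                + timestampMillis(column(keys, values, "time")) + " "
                + text(column(keys, values, "product_id")) + " "
                + int32(column(keys, values, "quantity"));
    }

    public static void main(String[] args) {
        // Mimics the row ('bob', 1385983646000, 'iphone', 1) from ordercf.
        Map<String, ByteBuffer> keys = new HashMap<>();
        keys.put("user_id", textBuf("bob"));
        keys.put("time", longBuf(1385983646000L));
        Map<String, ByteBuffer> values = new HashMap<>();
        values.put("product_id", textBuf("iphone"));
        values.put("quantity", intBuf(1));
        System.out.println(describe(keys, values)); // prints "bob 1385983646000 iphone 1"
    }
}
```

In a Spark job, the same decoding would typically run inside a `map` over the pair RDD so that later stages work with plain strings and numbers instead of buffers.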
Could anyone give an example of using `newAPIHadoopRDD`? Thanks!
[Discussion]:
Tags: java hadoop cassandra apache-spark