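[Posted]: 2015-12-15 12:13:26
[Question]:
Is there a way to import a CSV into Cassandra through Spark's Java API without creating a POJO class for the CSV? I can currently insert the CSV by creating a POJO class, as shown below, but I would like to do it programmatically with the Spark Java API without writing a POJO class for each CSV.
My CSV looks like this: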
Name,Age,bg,sex
ammar,67,ab+,M
nehan,88,b+,M
moin,99,m+,M
arbaaz,67,a+,M
...
The program is as follows.
import java.io.Serializable;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

public class InsertCsv {
    static JavaSparkContext ctx = null;

    public static void main(String[] args) {
        try {
            ctx = new JavaSparkContext(new SparkConf().setMaster("local[4]")
                    .setAppName("TestCsvInserion"));
            insertCsv(ctx);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void insertCsv(JavaSparkContext ctx) {
        JavaRDD<String> testfileRdd = ctx
                .textFile("/home/syedammar/Pilot Project /test.csv");
        // Read the header once on the driver. A mutable static flag flipped
        // inside map() is unreliable because partitions run in parallel.
        final String header = testfileRdd.first();
        JavaRDD<Bats> batsclassRdd = testfileRdd
                .filter(new Function<String, Boolean>() {
                    @Override
                    public Boolean call(String line) throws Exception {
                        // Drop the header line; keep only data lines
                        return !line.equals(header);
                    }
                }).map(new Function<String, Bats>() {
                    @Override
                    public Bats call(String line) throws Exception {
                        // Column order follows the header: Name,Age,bg,sex
                        String[] words = StringUtils.split(line, ",");
                        String name = words[0];
                        String age = words[1];
                        String bg = words[2];
                        String sex = words[3];
                        return new Bats(name, age, bg, sex);
                    }
                }).coalesce(1);
        // Map the bean properties of Bats to the columns of test.bats and write
        javaFunctions(batsclassRdd).writerBuilder("test", "bats", mapToRow(Bats.class)).saveToCassandra();
    }

    // The POJO that the question would like to avoid writing by hand
    public static class Bats implements Serializable {
        private String name;
        private String age;
        private String bg;
        private String sex;

        public Bats() {
        }

        public Bats(String name, String age, String bg, String sex) {
            this.name = name;
            this.age = age;
            this.bg = bg;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getAge() {
            return age;
        }

        public void setAge(String age) {
            this.age = age;
        }

        public String getBg() {
            return bg;
        }

        public void setBg(String bg) {
            this.bg = bg;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }
    }
}
[Comments]:
-
I think what you want is spark-csv: github.com/databricks/spark-csv
-
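After looking at the link you sent, it is still not clear to me. Could you give me an example (code), using the CSV template from the question above, that inserts it into a Cassandra table whose columns match the CSV header, without creating a POJO class, through the Spark Java API? Thanks.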
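The POJO-free idea asked about here can be illustrated without any Spark or connector calls: take the column names from the CSV header instead of from a hand-written class. The sketch below is plain Java only and is not the spark-csv or Cassandra connector API. The class `HeaderDrivenCsv` and its methods `columns`, `toRow` and `insertCql` are hypothetical names introduced for illustration. It assumes a simple CSV with no quoted fields or embedded commas, and it lower-cases the header names because Cassandra folds unquoted identifiers to lower case. The keyspace `test` and table `bats` come from the question's program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper (not part of Spark or the Cassandra connector):
// derives column names from the CSV header so no per-file POJO is needed.
final class HeaderDrivenCsv {

    // Splits the header line into lower-cased column names.
    public static List<String> columns(String headerLine) {
        List<String> cols = new ArrayList<>();
        for (String c : headerLine.split(",")) {
            cols.add(c.trim().toLowerCase(Locale.ROOT));
        }
        return cols;
    }

    // Pairs each field of a data line with its column name, in header order.
    // Assumption: fields contain no quotes or embedded commas.
    public static Map<String, String> toRow(List<String> cols, String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != cols.size()) {
            throw new IllegalArgumentException(
                    "Expected " + cols.size() + " fields but got " + fields.length + ": " + line);
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < fields.length; i++) {
            row.put(cols.get(i), fields[i].trim());
        }
        return row;
    }

    // Builds a parameterised CQL INSERT statement from the column names.
    public static String insertCql(String keyspace, String table, List<String> cols) {
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < cols.size(); i++) {
            marks.append(i == 0 ? "?" : ", ?");
        }
        return "INSERT INTO " + keyspace + "." + table
                + " (" + String.join(", ", cols) + ") VALUES (" + marks + ")";
    }

    public static void main(String[] args) {
        // Sample lines taken from the CSV in the question
        List<String> lines = Arrays.asList(
                "Name,Age,bg,sex", "ammar,67,ab+,M", "nehan,88,b+,M");
        List<String> cols = columns(lines.get(0));
        System.out.println(insertCql("test", "bats", cols));
        for (String line : lines.subList(1, lines.size())) {
            System.out.println(toRow(cols, line));
        }
    }
}
```

In a Spark job, a function like `toRow` could presumably be applied to each data line once the header has been read on the driver and filtered out. How the resulting rows are then handed to Cassandra depends on the connector version in use, so that step is left out of this sketch.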
Tags: java csv apache-spark cassandra-2.0 datastax