【发布时间】:2015-05-17 06:31:36
【问题描述】:
我有一个使用 Cassandra 作为输入和输出的非常简单的 Hadoop 作业。这是作业配置代码(没什么特别的):
Job job = new Job(getConf(), JOB_NAME);
job.setJarByClass(getClass());
job.setMapperClass(CassandraHadoopCounterMapper.class);
job.setReducerClass(CassandraHadoopCounterReducer.class);
job.setCombinerClass(CassandraHadoopCounterCombiner.class);
job.setInputFormatClass(CqlInputFormat.class);
job.setOutputFormatClass(CqlOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY, WIDE_ROWS);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
ConfigHelper.setOutputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET c = ?";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
//aditional properties:
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "2000");
ConfigHelper.setInputSplitSize(job.getConfiguration(), 4 * 64 * 1024);
我的输入 cassandra 表有 10k 行。
在 hadoop 中,我设置了 max mappers = 2 和 max reducers = 2
在工作计数器中,我可以看到以下内容:
Map input records=4000
InputCQLPageRowSize * mappers
如果未设置InputCQLPageRowSize,则Map input records 等于2000(因为默认InputCQLPageRowSize 为1000)
我的问题: 如何让我的 hadoop 作业读取输入表中的所有行?
作业完全在我的 PC 上本地运行。
我正在使用 Cassandra v2.0.11 和 Hadoop v1.0.4
【问题讨论】:
标签: hadoop cassandra cassandra-2.0