【发布时间】:2015-10-27 13:33:03
【问题描述】:
我是自定义文件格式的新手。我正在尝试通过
MapReduce 执行 WordCount 程序。我正在使用以下类
WordKey.class(以单词为键)
MyInputFormat.class
MyRecordReader.class
MyOutputFormat.class
MyRecordWriter.class
MyMapper.class
MyReducer.class
我已经以如下所示的方式实现了映射器
public class MyMapper extends Mapper<<strong>WordKey</strong>,<strong>WordValue</strong>,Text,IntWritable>{//rest of the business logic }
我的问题是:
能否像上面代码中那样,把映射器的输入键和输入值定义为自定义类型?如果不行,有没有其他方法可以达到同样的目的?
请给些建议。
提前致谢 :)
以下是 Mapper 和 Reducer 代码
**Mapper**
public class MyMapper extends Mapper<StudentKey, PassValue,PassValue,IntWritable> {
public void map(StudentKey key,PassValue value,Context context)throws IOException,InterruptedException
{
context.write(new PassValue(value.getResStatus()),new IntWritable(1));
}
}
这是 reduce 方法的代码
public class MyReducer extends
Reducer<PassValue, IntWritable, Text, IntWritable> {
int sum=0;
public void reduce(PassValue key,Iterable<IntWritable>value,Context context)throws IOException,InterruptedException{
for(IntWritable x:value)
{
sum+=x.get();
}
context.write(new Text(key.toString()), new IntWritable(sum));
}
}
RecordReader 的代码
/**
 * Reads comma-separated lines of the form {@code name,roll,status} and turns each into a
 * {@code (StudentKey, PassValue)} pair, delegating splitting and line reading to
 * {@link LineRecordReader}.
 */
public class MyRecordReader extends RecordReader<StudentKey, PassValue> {
    /** Number of comma-separated fields a record line must contain. */
    private static final int EXPECTED_FIELDS = 3;

    private StudentKey key;
    private PassValue value;
    private final LineRecordReader reader = new LineRecordReader();

    @Override
    public void close() throws IOException {
        reader.close();
    }

    @Override
    public StudentKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public PassValue getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        reader.initialize(is, tac);
    }

    /**
     * Advances to the next non-blank line and parses it into the current key and value.
     *
     * @return {@code true} if a record was read, {@code false} at the end of the split
     * @throws IOException if a line has fewer than three fields or a non-numeric roll number
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (reader.nextKeyValue()) {
            String line = reader.getCurrentValue().toString();
            if (line.trim().isEmpty()) {
                continue; // blank lines carry no record; skip instead of failing on tokens[1]
            }
            String[] tokens = line.split(",");
            if (tokens.length < EXPECTED_FIELDS) {
                throw new IOException(
                        "Malformed record (expected name,roll,status): \"" + line + "\"");
            }
            if (key == null) {
                key = new StudentKey();
            }
            if (value == null) {
                value = new PassValue();
            }
            key.setName(new Text(tokens[0]));
            try {
                // StudentKey.setRoll(String) parses the text into its IntWritable roll field.
                key.setRoll(tokens[1]);
            } catch (NumberFormatException e) {
                throw new IOException("Invalid roll number in record: \"" + line + "\"", e);
            }
            value.setResStatus(tokens[2]);
            return true;
        }
        return false;
    }
}
RecordWriter 代码
/**
 * Writes each {@code (status, count)} pair as one {@code status,count} line terminated by CRLF,
 * encoded as UTF-8.
 */
public class MyRecordWriter extends RecordWriter<PassValue, IntWritable> {
    private final DataOutputStream out;

    /**
     * @param out the stream records are written to; closed by {@link #close}
     */
    public MyRecordWriter(DataOutputStream out) {
        super();
        this.out = out;
    }

    /**
     * @param fileOut the file stream records are written to; closed by {@link #close}
     */
    public MyRecordWriter(FSDataOutputStream fileOut) {
        this.out = fileOut;
    }

    @Override
    public void close(TaskAttemptContext tac) throws IOException, InterruptedException {
        out.close();
    }

    /**
     * Writes exactly one line for this pair.
     *
     * <p>Earlier pairs are not retained. Previously, values were appended to a field list, so every
     * line repeated all earlier counts.
     *
     * @param arg0 the key, written via its {@code toString()}
     * @param arg1 the count
     * @throws IOException if the underlying stream fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public void write(PassValue arg0, IntWritable arg1) throws IOException, InterruptedException {
        writeUtf8(arg0.toString());
        writeUtf8(",");
        writeUtf8(String.valueOf(arg1.get()));
        writeUtf8("\r\n");
    }

    /** Writes {@code s} as UTF-8; DataOutputStream.writeBytes would drop each char's high byte. */
    private void writeUtf8(String s) throws IOException {
        out.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
OutputFormat 的代码
/**
 * Output format that writes reducer output through {@link MyRecordWriter} into a file named
 * {@code result.txt}.
 */
public class MyOutputFormat extends FileOutputFormat<PassValue, IntWritable> {
    /** Name of the file each task writes. */
    private static final String RESULT_FILE_NAME = "result.txt";

    /**
     * Creates the record writer for one task attempt.
     *
     * @param tac the task attempt context, also used as the progress reporter for the stream
     * @return a writer backed by a newly created {@code result.txt}
     * @throws IOException if the file cannot be created
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public RecordWriter<PassValue, IntWritable> getRecordWriter(TaskAttemptContext tac)
            throws IOException, InterruptedException {
        // getDefaultWorkFile() points into the task attempt's work directory managed by the output
        // committer. Only its parent directory is used, so the file keeps its "result.txt" name but
        // is no longer written straight into the final output dir by every (retried or
        // speculative) attempt.
        Path workDir = getDefaultWorkFile(tac, "").getParent();
        Path fullPath = new Path(workDir, RESULT_FILE_NAME);
        FileSystem fs = fullPath.getFileSystem(tac.getConfiguration());
        // NOTE(review): every reduce task uses the same file name, so with more than one reducer
        // the files collide at job commit; use getDefaultWorkFile(tac, ".txt") directly if unique
        // per-task names are acceptable.
        FSDataOutputStream fileOut = fs.create(fullPath, tac);
        return new MyRecordWriter(fileOut);
    }
}
自定义键代码
/**
 * Composite key made of a student's name and roll number, ordered by name and then by roll.
 *
 * <p>The no-arg and single-argument constructors allocate every field, so {@link #write} and
 * {@link #readFields} work on instances created reflectively through the no-arg constructor.
 */
public class StudentKey implements WritableComparable<StudentKey> {
    private Text name;
    private IntWritable roll;

    /** Creates an empty key (empty name, roll 0) that can be deserialized into. */
    public StudentKey() {
        this(new Text(), new IntWritable());
    }

    /**
     * Creates a key holding the given objects by reference (they are not copied).
     *
     * @param name the student's name
     * @param roll the student's roll number
     */
    public StudentKey(Text name, IntWritable roll) {
        this.name = name;
        this.roll = roll;
    }

    /**
     * Creates a key with the given name and roll number 0.
     *
     * @param name the student's name
     */
    public StudentKey(Text name) {
        this(name, new IntWritable());
    }

    /** @return the name */
    public Text getName() {
        return name;
    }

    /** @param name the name to set */
    public void setName(Text name) {
        this.name = name;
    }

    /** @return the roll number */
    public IntWritable getRoll() {
        return roll;
    }

    /** @param roll the roll number to set */
    public void setRoll(IntWritable roll) {
        this.roll = roll;
    }

    /**
     * Sets the roll number from its decimal text form.
     *
     * @param roll the roll number as text
     * @throws NumberFormatException if {@code roll} is not a valid int
     */
    public void setRoll(String roll) {
        this.roll = new IntWritable(Integer.parseInt(roll));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Allocate lazily in case the two-argument constructor or a setter left a field null.
        if (name == null) {
            name = new Text();
        }
        if (roll == null) {
            roll = new IntWritable();
        }
        name.readFields(in);
        roll.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        name.write(out);
        roll.write(out);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((roll == null) ? 0 : roll.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        StudentKey other = (StudentKey) obj;
        boolean sameName = (name == null) ? other.name == null : name.equals(other.name);
        boolean sameRoll = (roll == null) ? other.roll == null : roll.equals(other.roll);
        return sameName && sameRoll;
    }

    /** Orders by name, then by roll number; both fields must be non-null. */
    @Override
    public int compareTo(StudentKey arg0) {
        int comp = this.name.compareTo(arg0.name);
        if (comp != 0) {
            return comp;
        }
        return this.roll.compareTo(arg0.roll);
    }

    /** @return {@code name,roll}, mirroring the input line layout */
    @Override
    public String toString() {
        return name + "," + roll;
    }
}
自定义值类
public class PassValue implements Writable {
private Text resStatus;
public PassValue()
{
resStatus=new Text();
}
public PassValue(Text resStatus)
{
super();
this.resStatus=resStatus;
}
public PassValue(PassValue status)
{
super();
this.resStatus=status.resStatus;
}
/**
* @return the resStatus
*/
public Text getResStatus() {
return resStatus;
}
/**
* @param resStatus the resStatus to set
*/
public void setResStatus(Text resStatus) {
this.resStatus = resStatus;
}
//String implementation
public void setResStatus(String resStatus) {
this.resStatus = new Text(resStatus);
}
@Override
public void readFields(DataInput arg0) throws IOException {
this.resStatus.readFields(arg0);
}
@Override
public void write(DataOutput arg0) throws IOException {
this.resStatus.write(arg0);
}
/* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((resStatus == null) ? 0 : resStatus.hashCode());
return result;
}
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PassValue other = (PassValue) obj;
if (resStatus == null) {
if (other.resStatus != null)
return false;
} else if (!resStatus.equals(other.resStatus))
return false;
return true;
}
}
驱动程序代码
/**
 * Configures and runs the status-count job: arguments are {@code <input path> <output path>}.
 * Any existing output directory is deleted before the job starts.
 */
public class DriverCode extends Configured implements Tool {
    /** Configuration used by the most recent {@link #run}; kept for existing references. */
    static Configuration cf;

    /**
     * Builds and submits the job, waiting for it to finish.
     *
     * @param arg0 {@code [inputPath, outputPath]}
     * @return 0 on success, 1 if the job failed, 2 on missing arguments
     */
    @Override
    public int run(String[] arg0) throws IOException, InterruptedException, ClassNotFoundException {
        if (arg0 == null || arg0.length < 2) {
            System.err.println("Usage: DriverCode <input path> <output path>");
            return 2;
        }
        // Use the configuration ToolRunner filled in (including -D options) instead of a new one.
        cf = (getConf() != null) ? getConf() : new Configuration();
        Job j = Job.getInstance(cf);
        j.setJarByClass(DriverCode.class);

        j.setMapperClass(MyMapper.class);
        j.setMapOutputKeyClass(PassValue.class);
        j.setMapOutputValueClass(IntWritable.class);

        j.setReducerClass(MyReducer.class);
        // Must match what MyOutputFormat / MyRecordWriter accept: <PassValue, IntWritable>.
        j.setOutputKeyClass(PassValue.class);
        j.setOutputValueClass(IntWritable.class);

        j.setInputFormatClass(MyInputFormat.class);
        j.setOutputFormatClass(MyOutputFormat.class);

        Path op = new Path(arg0[1]);
        FileInputFormat.addInputPath(j, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(j, op);
        // Destructive: removes any previous output so the job does not fail on an existing dir.
        op.getFileSystem(j.getConfiguration()).delete(op, true);
        return j.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DriverCode(), args);
        System.exit(res);
    }
}
【问题讨论】:
-
能否补充说明一下你对 WordKey 和 WordValue 的具体需求?这样我们能更好地帮助你。
-
实际上我正在练习自定义文件格式的用法。我使用的是一个文本文件,其全部内容都在一行中,没有换行符。用框架自带的包装(box)类型(如 Text、IntWritable)来统计单词出现次数可能很简单,但我觉得自己定制这些类很有意思,因为我在 RecordReader 部分遇到了分隔符的问题。希望我把需求说清楚了。非常感谢您的提问 :)
-
好的。既然您打算实现自己的输入格式,我想您应该知道需要自己管理输入拆分(InputSplit)和记录读取器(RecordReader)。祝编码愉快!