为了实现这一点,我使用 NamedVectors。
如您所知,在对数据进行任何聚类之前,您必须对其进行矢量化。
这意味着您必须将数据转换为 Mahout 向量,因为那是
聚类算法使用的数据类型。
矢量化过程将取决于数据的性质,例如,矢量化文本的方式与
矢量化数值的方式不同。
您的数据似乎很容易矢量化,因为它只有一个 ID 和 4 个数值。
您可以编写一个 Hadoop 作业来读取您的输入数据(例如 CSV 文件),
并输出一个序列文件,其中的数据已经矢量化。
然后,您将 Mahout 聚类算法应用于此输入,并将每个矢量的 ID(矢量名称)保留在聚类结果中。
可以使用以下类实现对数据进行矢量化的示例作业:
/**
 * Driver that configures and launches the MapReduce job which converts delimited
 * text input into a sequence file of {@code VectorWritable} records.
 *
 * <p>Exactly two arguments are expected: the input path and the output path.
 */
public class DenseVectorizationDriver extends Configured implements Tool {

    /**
     * Sets up the vectorization job and blocks until it has finished.
     *
     * @param args expected to be {@code <input> <output>}
     * @return 0 if the job completed successfully, 1 if it failed,
     *         -1 if the number of arguments is wrong
     * @throws Exception if the job cannot be configured or run
     */
    @Override
    public int run(String[] args) throws Exception {
        final boolean hasInputAndOutput = args.length == 2;
        if (!hasInputAndOutput) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job vectorizationJob = new Job(getConf(), "Create Dense Vectors from CSV input");
        vectorizationJob.setJarByClass(DenseVectorizationDriver.class);

        // Where the job reads from and writes to.
        final Path inputPath = new Path(args[0]);
        final Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(vectorizationJob, inputPath);
        FileOutputFormat.setOutputPath(vectorizationJob, outputPath);

        // Processing stages.
        vectorizationJob.setMapperClass(DenseVectorizationMapper.class);
        vectorizationJob.setReducerClass(DenseVectorizationReducer.class);

        // Output records are (LongWritable, VectorWritable) pairs stored in a sequence file.
        vectorizationJob.setOutputKeyClass(LongWritable.class);
        vectorizationJob.setOutputValueClass(VectorWritable.class);
        vectorizationJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        final boolean succeeded = vectorizationJob.waitForCompletion(true);
        if (succeeded) {
            return 0;
        }
        return 1;
    }
}
/**
 * Mapper that turns each TAB-separated input line into a {@code NamedVector}.
 *
 * <p>The first field of a line is the row ID and becomes the name of the vector.
 * Every following field is parsed as a {@code double} and stored, in order, in a
 * {@code DenseVector} (first value at index 0). The input key is emitted unchanged;
 * it carries no meaning for the vectorized data.
 */
public class DenseVectorizationMapper extends Mapper<LongWritable, Text, LongWritable, VectorWritable> {

    /** Separator between the fields of an input line. */
    private static final String FIELD_SEPARATOR = "\t";

    /**
     * Converts one input line into a named dense vector and emits it.
     *
     * @param key     input key, passed through to the output untouched
     * @param value   one line of input: {@code id TAB v1 TAB v2 ...}
     * @param context job context used to emit the resulting record
     * @throws IOException           if the record cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if a value field is not a valid double
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (line.isEmpty()) {
            // Nothing to vectorize on a blank line.
            return;
        }

        // Limit -1 keeps empty fields so that field positions are never shifted.
        String[] lineParts = line.split(FIELD_SEPARATOR, -1);
        String id = lineParts[0];
        // NOTE(review): further validation of the row (e.g. expected field count) could go here.

        // A line ending with a separator yields one trailing empty field; it is not a value.
        int fieldCount = lineParts.length;
        if (fieldCount > 1 && lineParts[fieldCount - 1].isEmpty()) {
            fieldCount--;
        }

        // Field 0 is the ID, so the vector holds fieldCount - 1 values and
        // field i is stored at vector index i - 1.
        Vector vector = new DenseVector(fieldCount - 1);
        for (int i = 1; i < fieldCount; i++) {
            vector.set(i - 1, Double.parseDouble(lineParts[i]));
        }

        context.write(key, new VectorWritable(new NamedVector(vector, id)));
    }
}
/**
 * Pass-through reducer: writes every incoming vector to the output without
 * performing any computation.
 *
 * <p>A map-only job (no reduce phase) may be a better fit for this work.
 */
public class DenseVectorizationReducer extends Reducer<LongWritable, VectorWritable, LongWritable, VectorWritable> {

    /**
     * Emits each value received for {@code key} unchanged.
     *
     * @param key     key produced by the mapper
     * @param values  all vectors that were emitted under {@code key}
     * @param context job context used to emit the records
     * @throws IOException          if a record cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(LongWritable key, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException {
        // Several rows can share a key (presumably when the keys are per-file offsets and
        // there is more than one input file - verify against the input format in use),
        // so every value must be written, not just the first one.
        for (VectorWritable value : values) {
            context.write(key, value);
        }
    }
}