【问题标题】:Problems with setting up and accessing Distributed Cache设置和访问分布式缓存的问题
【发布时间】:2014-01-21 00:46:19
【问题描述】:

由于某种原因,我在网上找不到任何好的资源来让分布式缓存与新 API 一起工作。希望这里有人可以解释我做错了什么。我目前的尝试是我在网上找到的各种东西的混搭。

此程序尝试运行 k-最近邻算法。输入文件是测试数据集,而分布式缓存保存训练数据集和训练标签。映射器应该取一行测试数据,与分布式缓存数据中的每一行进行比较,并返回最相似的行的标签。

import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KNNDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <input dir> <output dir>\n", getClass().getSimpleName());
            return -1;
        }

        Configuration conf = new Configuration();
        // conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "^");

        conf.setInt ("train_rows",1000);
        conf.setInt ("test_rows",1000);
        conf.setInt ("cols",612);
        DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv"),conf);
        DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv"),conf);

        Job job = new Job(conf);
        job.setJarByClass(KNNDriver.class); 
        job.setJobName("KNN");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(KNNMapper.class);
        job.setReducerClass(KNNReducer.class);
        // job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new KNNDriver(), args);
        System.exit(exitCode);
    }
}

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KNNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  int[][] train_vals;
  int[] train_label_vals;
  int train_rows;
  int test_rows;
  int cols;

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();

      // Path[] cacheFiles = context.getLocalCacheFiles();

      int train_rows = conf.getInt("train_rows", 0);
      int test_rows = conf.getInt("test_rows", 0);
      int cols = conf.getInt("cols", 0);

      train_vals = new int[train_rows][cols];
      train_label_vals = new int[train_rows];

      // read train csv, parse, and store into 2d int array
      Scanner myScan;
        try {
            myScan = new Scanner(new File("train_sample.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");

            for(int row = 0; row < train_rows; row++) {
                for(int col = 0; col < cols; col++) {
                    train_vals[row][col] = Integer.parseInt(myScan.next().toString());

                }
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train file not found.");
        }

    // read train_labels csv, parse, and store into 2d int array
        try {
            myScan = new Scanner(new File("train_labels.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");


            for(int row = 0; row < train_rows; row++) {
                    train_label_vals[row] = Integer.parseInt(myScan.next().toString());
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train Labels file not found.");
        }
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

        // setup() gave us train_vals & train_label_vals.
        // Each line in map() represents a test observation.  We iterate 
        // through every train_val row to find nearest L2 match, then
        // return a key/value pair of <observation #, 

        // convert from Text to String
        String line = value.toString();
        long distance;
        double best_distance = Double.POSITIVE_INFINITY;
        int col_num;

        int best_digit = -1;
        IntWritable rowId = null;
        int i;
        IntWritable rowNum;
        String[] pixels;

        // comma delimited files, split on commas
        // first we find the # of rows
        for (i = 0; i < train_rows; i++) {
            distance = 0;
            col_num = 0;
            pixels = line.split(",");
            rowId = new IntWritable(Integer.parseInt(pixels[0]));

            for (int j = 1; j < cols; j++) {
                distance += (Integer.parseInt(pixels[j]) - train_vals[i][j-1])^2;
            }
            if (distance < best_distance) {
                best_distance = distance;
                best_digit = train_label_vals[i];
            }
        }
        context.write(rowId, new IntWritable(best_digit));
  }
}

我注释掉了 Path... 语句,因为我不明白它的作用,或者它如何将文件数据发送到映射器,但我注意到它在几个网站上列出。目前,即使将分布式缓存数据集上传到 HDFS,程序也找不到。

【问题讨论】:

  • 你为什么要评论 // Path[] cacheFiles = context.getLocalCacheFiles(); cacheFiles 是你需要的

标签: hadoop


【解决方案1】:

尝试使用符号链接:

DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"),conf);
DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"),conf);

这将使映射器的本地目录中的文件以您实际尝试访问的名称可用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-02
    • 2021-10-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多