【问题标题】:How to get element by Index in Spark RDD (Java)如何在 Spark RDD(Java)中按索引获取元素
【发布时间】:2015-01-05 20:56:04
【问题描述】:

我知道 rdd.firstwfirst() 方法,它为我提供了 RDD 中的第一个元素。

还有一个方法 rdd.take(num) 它给了我第一个“num”个元素。

但是不是有可能通过索引获取元素吗?

谢谢.e

【问题讨论】:

    标签: java apache-spark rdd


    【解决方案1】:

    我也被困了一段时间,所以扩展 Maasg 的答案,但回答是按 Java 的索引查找一系列值(您需要在顶部定义 4 个变量):

    DataFrame df;
    SQLContext sqlContext;
    Long start;
    Long end;
    
    JavaPairRDD<Row, Long> indexedRDD = df.toJavaRDD().zipWithIndex();
    JavaRDD filteredRDD = indexedRDD.filter((Tuple2<Row,Long> v1) -> v1._2 >= start && v1._2 < end);
    DataFrame filteredDataFrame = sqlContext.createDataFrame(filteredRDD, df.schema());
    

    请记住,当您运行此代码时,您的集群需要有 Java 8(因为正在使用 lambda 表达式)。

    另外,zipWithIndex 可能很昂贵!

    【讨论】:

    • 嗨,你能指导 Java 7 的类似解决方案吗?
    • code rdd.filter(new Function, Boolean>(){ public Boolean call(Tuple2 v1){ return v1._2 >= start && v1._2 code
    【解决方案2】:

    这应该可以通过首先索引 RDD 来实现。转换zipWithIndex 提供了稳定的索引,按原始顺序对每个元素进行编号。

    给定:rdd = (a,b,c)

    val withIndex = rdd.zipWithIndex // ((a,0),(b,1),(c,2))
    

    要按索引查找元素,这种形式没有用处。首先我们需要使用索引作为键:

    val indexKey = withIndex.map{case (k,v) => (v,k)}  //((0,a),(1,b),(2,c))
    

    现在,可以在 PairRDD 中使用 lookup 操作按键查找元素:

    val b = indexKey.lookup(1) // Array(b)
    

    如果您希望经常在同一个 RDD 上使用 lookup,我建议缓存 indexKey RDD 以提高性能。

    如何使用Java API 完成此操作是留给读者的练习。

    【讨论】:

    • 还要记住,RDD 的排序是未定义的,除非它已排序。每次评估时它都可以改变。也可以map(_.swap)切换键值。
    • 你留下的“作为读者练习”是这个问题的主要复杂性。
    • @user1843507 您认为缺少什么? zipWithIndexlookup 在 Java 中是相同的,所以只有 map 函数需要一些小的努力来将 (key, value) 的值交换为 (value, key) 这不是这个问题的核心。跨度>
    • 旧线程,但我添加了一个基于 Java 的答案(尽管寻找的是一系列记录而不是单个记录)。
    • 你真的希望indexKey被分区。
    【解决方案3】:

    我试过这个类按索引获取一个项目。首先,当你构造new IndexedFetcher(rdd, itemClass)时,它会统计RDD的每个分区中的元素个数。然后,当您调用indexedFetcher.get(n) 时,它仅在包含该索引的分区上运行作业。

    请注意,我需要使用 Java 1.7 而不是 1.8 来编译它;从 Spark 1.1.0 开始,com.esotericsoftware.reflectasm 中捆绑的 org.objectweb.asm 还不能读取 Java 1.8 类(当您尝试运行 Java 1.8 函数时抛出 IllegalStateException)。

    import java.io.Serializable;
    
    import org.apache.spark.SparkContext;
    import org.apache.spark.TaskContext;
    import org.apache.spark.rdd.RDD;
    
    import scala.reflect.ClassTag;
    
    public static class IndexedFetcher<E> implements Serializable {
        private static final long serialVersionUID = 1L;
        public final RDD<E> rdd;
        public Integer[] elementsPerPartitions;
        private Class<?> clazz;
        public IndexedFetcher(RDD<E> rdd, Class<?> clazz){
            this.rdd = rdd;
            this.clazz = clazz;
            SparkContext context = this.rdd.context();
            ClassTag<Integer> intClassTag = scala.reflect.ClassTag$.MODULE$.<Integer>apply(Integer.class);
            elementsPerPartitions = (Integer[]) context.<E, Integer>runJob(rdd, IndexedFetcher.<E>countFunction(), intClassTag);
        }
        public static class IteratorCountFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, Integer> implements Serializable {
            private static final long serialVersionUID = 1L;
            @Override public Integer apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
                int count = 0;
                while (iterator.hasNext()) {
                    count++;
                    iterator.next();
                }
                return count;
            }
        }
        static <E> scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> countFunction() {
            scala.Function2<TaskContext, scala.collection.Iterator<E>, Integer> function = new IteratorCountFunction<E>();
            return function;
        }
        public E get(long index) {
            long remaining = index;
            long totalCount = 0;
            for (int partition = 0; partition < elementsPerPartitions.length; partition++) {
                if (remaining < elementsPerPartitions[partition]) {
                    return getWithinPartition(partition, remaining);
                }
                remaining -= elementsPerPartitions[partition];
                totalCount += elementsPerPartitions[partition];
            }
            throw new IllegalArgumentException(String.format("Get %d within RDD that has only %d elements", index, totalCount));
        }
        public static class FetchWithinPartitionFunction<E> extends scala.runtime.AbstractFunction2<TaskContext, scala.collection.Iterator<E>, E> implements Serializable {
            private static final long serialVersionUID = 1L;
            private final long indexWithinPartition;
            public FetchWithinPartitionFunction(long indexWithinPartition) {
                this.indexWithinPartition = indexWithinPartition;
            }
            @Override public E apply(TaskContext taskContext, scala.collection.Iterator<E> iterator) {
                int count = 0;
                while (iterator.hasNext()) {
                    E element = iterator.next();
                    if (count == indexWithinPartition)
                        return element;
                    count++;
                }
                throw new IllegalArgumentException(String.format("Fetch %d within partition that has only %d elements", indexWithinPartition, count));
            }
        }
        public E getWithinPartition(int partition, long indexWithinPartition) {
            System.out.format("getWithinPartition(%d, %d)%n", partition, indexWithinPartition);
            SparkContext context = rdd.context();
            scala.Function2<TaskContext, scala.collection.Iterator<E>, E> function = new FetchWithinPartitionFunction<E>(indexWithinPartition);
            scala.collection.Seq<Object> partitions = new scala.collection.mutable.WrappedArray.ofInt(new int[] {partition});
            ClassTag<E> classTag = scala.reflect.ClassTag$.MODULE$.<E>apply(this.clazz);
            E[] result = (E[]) context.<E, E>runJob(rdd, function, partitions, true, classTag);
            return result[0];
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-04-10
      • 1970-01-01
      • 2015-12-26
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多