【问题标题】:Spark - Can a MultiMap be converted to a DataFrame in JAVASpark - 可以将 MultiMap 转换为 JAVA 中的 DataFrame
【发布时间】:2016-02-20 05:14:46
【问题描述】:

我正在尝试将包含数十亿数据值的 MultiMap 转换为 Spark DataFrame 以运行计算,然后将结果写入 cassandra 表。

我从以下 cassandra 查询和循环生成多图。如果有更好的方法来获取这些数据并将其操作到 DataFrame 中,我很乐意接受建议,就像我使用循环一样。

代码已更新为答案:

//Build ResultSet from cassandra query for data manipulation.
        Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        //Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
        stmt.setFetchSize(1000);
        ResultSet results = session.execute(stmt);

// Get the Variables from each Row of Cassandra Data        
 Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results){       
           // Column Names in Cassandra (Case Sensitive)
           start_frequency = row.getDouble("Start_Frequency");
           power = row.getFloat("Power");
           bandwidth = row.getDouble("Bandwidth"); 

// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.            
                for(channel = 1.6000E8; channel <= channel_end;  ){ 
                    if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {     
                     data.put(channel, power);
                    }  // end if
                    channel+=increment;
                }  // end for      
        } // end "row" for

// Create Spark List for DataFrame        
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

// Create DataFrame and Calculate Results
    sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)      
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();

    } // end session
} // End Compute 

public class Value implements Serializable {
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }
    Double channel;
    Float power;

    public void setChannel(Double channel) {
        this.channel = channel;
    }
    public void setPower(Float power) {
        this.power = power;
    }
    public Double getChannel() {
        return channel;
    }
    public Float getPower() {
        return power;
    }

    @Override
    public String toString() {
        return "[" +channel +","+power+"]";
    }
}

示例多图具有 {Double}=[Float] 类型,其中每个 Double 可能有多个 Float 项

示例

{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]

我需要使用 spark 来获得这些的最小值、最大值和平均值。例如对于第一个 1.50ED 将是最小 10,最大 20,平均 15。

我已经有了可以使用的代码,一旦我可以将它放在一个临时表中并作为数据框进行操作:

queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)      
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();

我将不胜感激有关如何使用 JAVA 将多图转换为 DataFrame 的一些提示。我还没有找到任何关于使用带火花的多图的文档。

我目前使用的解决方案是执行初始查询并使用 for 循环将原始数据写入一个新表,我可以反过来直接映射到一个临时表/数据帧,但这需要很长时间,因为我必须写数十亿在计算之前到 cassandra 的行数。我想使用 multimap 或类似的东西,直接转换为 spark 进行计算。

【问题讨论】:

  • 我想使用 spark,因为这个计算需要处理超过十亿个不同的值。该表将如下所示:'key: value, value, value' 我需要获取键并获取值的最小值、最大值和平均值。例如,如果我的键是 1.50E8,我的值是 10, 20,我的输出应该是 1.50E8 Min 10, Max 20, Avg 15

标签: java apache-spark guava multimap


【解决方案1】:

唉,Java parallelize 方法采用T 的列表或parallelizePairsTuple&lt;K, V&gt; 列表。所以你需要转换。而createDataFrame 仅适用于 RDD 和 Scala Seq 并且需要架构(bean 或 StructType)。

为了让它更有趣com.google.common.collect.ImmutableEntry 是不可序列化的,因此您需要在 Java 中进行转换,因此除非您将转换逻辑移到 Java 中,否则 @Pankaj Arora 解决方案的 Java 版本将无法工作。 IE。

public class Value implements Serializable {
    public Value(Double a, Float b) {
        this.a = a;
        this.b = b;
    }
    Double a;
    Float b;

    public void setA(Double a) {
        this.a = a;
    }
    public void setB(Float b) {
        this.b = b;
    }
    public Double getA() {
        return a;
    }
    public Float getB() {
        return b;
    }

    public String toString() {
        return "[" +a +","+b+"]";
    }
}


    Multimap<Double, Float> data = LinkedListMultimap.create();
    data.put(1d, 1f);
    data.put(1d, 2f);
    data.put(2d, 3f);

    List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

    sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();

鉴于您的编辑,我会考虑从一开始就创建对象(而不是多图)。

【讨论】:

  • 你能提供一个循环的例子吗?
  • 完成。但是你用过那个多重地图吗?我建议您跳过该步骤并直接在加载步骤中创建(更好地命名)Value 对象。
  • 我只使用多重映射,因为我不确定在将所有数据发送到数据帧之前如何正确编译所有数据。如果您知道更好的方法,我会喜欢这个建议,因为它有效地对数十亿个对象进行操作是至关重要的。顺便说一下,我的多地图版本的解决方案效果很好。
  • 如果数据量很大,您可能无法使用并行化加载它。因为必须将整个数据加载到驱动程序内存中,而且如果驱动程序有足够的内存,它会太慢。你在从 hdfs 读取数据吗?
  • 如果它是一个大型数据集,那么也许寻找一个 spark cassandra 连接器是要走的路。
【解决方案2】:
case class Output(a : Double ,b : Int )
val input = Map(1.50E8-> List(10, 20) ,  1.51E8-> List( -10, -13, -14, -15 ), 1.52E8-> List(-10, -11)).toArray
val inputRdd = sc.parallelize(input)
val queryMV = inputRdd.flatMap(x=> x._2.map(y=> Output(x._1, y))).toDF

【讨论】:

  • 这看起来很有希望。不过,我需要做的一件事是在不知道值是什么的情况下使用 Map。这似乎是 SCALA 除了没有 val 前缀之外,它在 JAVA 中是否相同?
猜你喜欢
  • 2017-03-25
  • 2017-05-09
  • 2018-03-31
  • 1970-01-01
  • 2022-01-08
  • 2017-03-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多