【问题标题】:Insert data from pyspark dataframe to another cassandra table using pyspark使用 pyspark 将数据从 pyspark 数据帧插入到另一个 cassandra 表
【发布时间】:2020-08-04 04:30:01
【问题描述】:

我有一个 cassandra 表 - 测试

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  C |      IQ |       1 |
+----+---------+---------+
|  D |      RU |       3 |
+----+---------+---------+

此外,我在同一空间中有一个表 ma​​in,其中包含“country_main”和“main_id”列。 在 main_id 列中,我有与测试表中相同的 id,而且我有一些唯一的 id。 country_main 具有空值,与测试中的相同。例如:

+---------+--------------+---------+
| main_id | country_main |      ...|
+=========+==============+=========+
|  A      |              |      ...|
+---------+--------------+---------+
|  B      |      EN      |      ...|
+---------+--------------+---------+
|  Y      |      IQ      |      ...|
+---------+--------------+---------+
|  Z      |      RU      |      ...|
+---------+--------------+---------+

如何使用pyspark将test表中的数据插入到main中,根据ids填充country_main中的空值?

【问题讨论】:

    标签: apache-spark pyspark cassandra spark-cassandra-connector


    【解决方案1】:

    具有以下架构和数据:

    create table test.ct1 (
      id text primary key,
      country text,
      cnt int);
    
    insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
    insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
    insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
    insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
    
    
    create table test.ct2 (
      main_id text primary key,
      country_main text,
      cnt int);
    
    insert into test.ct2(main_id, cnt) values('A', 1);
    insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
    insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
    insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
    

    应该是这样的:

    from pyspark.sql.functions import *
    
    ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
       .option("table", "ct1").option("keyspace", "test").load()
    
    ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
      .option("table", "ct2").option("keyspace", "test").load()\
      .where(col("country_main").isNull())
    
    res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"), 
      col("country").alias("country_main"))
    res.write.format("org.apache.spark.sql.cassandra")\
       .option("table", "ct2").option("keyspace", "test")\
       .mode("append").save()
    

    代码的作用:

    1. ct2(对应于您的main 表)中选择所有行,其中country_mainnull
    2. 执行与ct1 的连接(对应于您的test 表)以从中获取国家/地区的值(优化可能是从两个表中仅选择必要的列)。另外,请注意连接是由 Spark 完成的,而不是在 Cassandra 级别上 - 只有即将发布的 Spark Cassandra 连接器版本(3.0,但 alpha 版本已经发布)才支持 Cassandra 级别的连接;
    3. 重命名列以匹配ct2 表的结构;
    4. 写回数据。

    结果:

    cqlsh> select * from test.ct2;
    
     main_id | cnt | country_main
    ---------+-----+--------------
           C |   1 |           IQ
           B |   2 |           EN
           A |   1 |           RU
           D |   3 |           RU
    

    对于源数据:

    cqlsh> select * from test.ct2;
    main_id | cnt | country_main
    ---------+-----+--------------                                       
           C |   1 |           IQ                                  
           B |   2 |           EN                                                                                         
           A |   1 |         null                                      
           D |   3 |           RU
    

    【讨论】:

    • 我在 res. 中得到空表。
    • 该代码已经过测试......我认为有一些表结构等。或者你有country_main的空字符串,而不是我的例子中的null......只要做ct2.show() 检查您是否选择了一些要加入的数据
    • 是的,我发现这是什么问题。 ct1 中的数据未与 ct2 中的数据相交。谢谢!
    猜你喜欢
    • 2019-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-23
    相关资源
    最近更新 更多