【问题标题】:get Data from DB for each row DataFrame Pyspark从数据库中获取每一行 DataFrame Pyspark 的数据
【发布时间】:2018-01-11 09:43:46
【问题描述】:

我在流式传输上下文中使用 Pyspark Dataframe API,我已在我的 spark 流式传输应用程序中将 RDD 转换为 DF foreach DStream(我使用的是 kafka 接收器)这是我在流程 RDD 函数中所做的:

rowRdd = data_lined_parameters.map(
        lambda x: Row(SYS=x[0], METRIC='temp', SEN=x[1], OCCURENCE=x[2], THRESHOLD_HIGH=x[3], OSH=x[4], OSM=x[5], OEH=x[6], OEM=x[7],OSD=x[8],OED=x[9],REMOVE_HOLIDAYS=x[10],TS=x[11],VALUE=x[12],DAY=x[13],WEEKDAY=x[14],HOLIDAY=x[15]))
rawDataDF = sqlContext.createDataFrame(rowRdd)

rawDataRequirementsCheckedDF = rawDataDF.filter("WEEKDAY <= OED AND WEEKDAY >=OSD AND HOLIDAY = false  VALUE > THRESHOLD_HIGH  ")

我的下一步是使用 hbase 表中的新列来丰富我 rawDataRequirementsCheckedDF 中的每一行,我的问题是从 hbase (phoenix) 获取数据并将其加入我的原始数据框的最有效方法是什么:

--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
|                 DAY|HOLIDAY|METRIC|OCCURENCE|OED|OEH|OEM|OSD|OSH|OSM|REMOVE_HOLIDAYS|SEN|             SYS|THRESHOLD_HIGH|                  TS|  VALUE|WEEKDAY|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+
|2017-08-03 00:00:...|  false|  temp|        3|  4| 19| 59|  0|  8|  0|           TRUE|  1|0201|            26|2017-08-03 16:22:...|28.4375|      3|
|2017-08-03 00:00:...|  false|  temp|        3|  4| 19| 59|  0|  8|  0|           TRUE|  1|0201|            26|2017-08-03 16:22:...|29.4375|      3|
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+

hbase 表的主键是 DAY,SYS,SEN ,所以会产生相同格式的数据框。

编辑:

这是我迄今为止尝试过的:

sysList = rawDataRequirementsCheckedDF.map(lambda x : "'"+x['SYS']+"'").collect()
df_sensor = sqlContext.read.format("jdbc").option("dbtable","(select DATE,SYSTEMUID,SENSORUID,OCCURENCE from ANOMALY where SYSTEMUID in ("+','.join(sysList)+") )").option("url", "jdbc:phoenix:clustdev1:2181:/hbase-unsecure").option("driver", "org.apache.phoenix.jdbc.PhoenixDriver").load()
df_anomaly = rawDataRequirementsCheckedDF.join(df_sensor, col("SYS") == col("SYSTEMUID"), 'outer')

【问题讨论】:

    标签: python apache-spark pyspark spark-streaming


    【解决方案1】:

    我从 HBase 获取数据的一种简单方法是将表创建到 phoenix 中,然后加载到 spark 中。这是 Apache Phoenix 页面的 Apache Spark 插件部分

    df = sqlContext.read \
    .format("org.apache.phoenix.spark") \
    .option("table", "TABLE1") \
    .option("zkUrl", "localhost:2181") \
    .load()
    

    Apache Spark 插件链接:https://phoenix.apache.org/phoenix_spark.html

    【讨论】:

    • 感谢您的回答,我的问题可能不清楚,但我的问题是我的 sql 请求中使用的参数是从我的 rawDataRequirementsCheckedDF 发出的,我需要从我的 hbase 获取 foreach SYS 数据表
    • 我正在尝试做类似的事情: sysList = rawDataRequirementsCheckedDF.map(lambda x : "'"+x['SYS']+"'").collect() df_sensor = sqlContext.read.format ("jdbc").option("dbtable","(select OCCURENCE from ANOMALY where SYSTEMUID in ("+','.join(sysList)+"))").option("url", "jdbc:phoenix: dev1:2181:/hbase-unsecure").option("driver", "org.apache.phoenix.jdbc.PhoenixDriver").load() 但我不确定这是否是最好的方法
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-09-02
    • 2015-07-23
    • 1970-01-01
    • 2021-10-28
    • 1970-01-01
    • 2023-01-12
    • 1970-01-01
    相关资源
    最近更新 更多