【问题标题】:Create New Rows from Oracle CLOB and Write to HDFS从 Oracle CLOB 创建新行并写入 HDFS
【发布时间】:2020-11-10 11:34:39
【问题描述】:

在 Oracle 数据库中,我可以读取包含 CLOB 类型的表(注意换行符):

ID         MY_CLOB
001        500,aaa,bbb
           500,ccc,ddd
           480,1,2,bad
           500,eee,fff
002        777,0,0,bad
003        500,yyy,zzz

我需要处理这个,并导入一个 HDFS 表,其中每个 MY_CLOB 行以“500”开头的新行。在这种情况下,hive 表应如下所示:

ID     C_1    C_2    C_3
001    500    aaa    bbb
001    500    ccc    ddd
001    500    eee    fff
003    500    yyy    zzz

这个solution 我之前的问题成功地在Oracle 上产生了这个。但是使用 Python 驱动程序将结果写入 HDFS 非常慢,或者永远不会成功。

solution 之后,我测试了一个类似的正则表达式 + pyspark 解决方案,它可能适用于我的目的:

<!-- begin snippet: js hide: true -->
import cx_Oracle
#... query = """SELECT ID, MY_CLOB FROM oracle_table"""
#... cx_oracle_results <--- fetchmany results (batches) from query

import re
from pyspark.sql import Row
from pyspark.sql.functions import col
def clob_to_table(clob_lines):
    m = re.findall(r"^(500),(.*),(.*)", 
                   clob_lines, re.MULTILINE)
    return Row(C_1 = m.group(1), C_2 = m.group(2), C_3 = m.group(3))

# Process each batch of results and write to hive as parquet
for batch in cx_oracle_results():
    # batch is like [(1,<cx_oracle object>), (2,<cx_oracle object>), (3,<cx_oracle object>)]
    # When `.read()` looks like [(1,"500,a,b\n500c,d"), (2,"500,e,e"), (3,"500,z,y\n480,-1,-1")]
    df = sc.parallelize(batch).toDF(["ID", "MY_CLOB"])\
           .withColumn("clob_as_text", col("MY_CLOB")\
           .read()\  # Converts cx_oracle CLOB object to text.
           .map(clob_to_table)
    df.write.mode("append").parquet("myschema.pfile")

但是以这种方式读取 oracle 游标结果并将其输入 pyspark 效果不佳。

我正在尝试运行由另一个工具生成的 sqoop 作业,将 CLOB 作为文本导入,并希望我可以在合理的时间内将 sqooped 表处理成一个新的 hive 表。也许使用 pyspark 的解决方案类似于上述。

很遗憾,这个 sqoop 作业不起作用。

sqoop import -Doraoop.timestamp.string=false -Doracle.sessionTimeZone=America/Chicago 
-Doraoop.import.hint=" " -Doraoop.oracle.session.initialization.statements="alter session disable parallel query;" 
-Dkite.hive.tmp.root=/user/hive/kite_tmp/wassadamo --verbose 
--connect jdbc:oracle:thin:@ldap://connection/string/to/oracle 
--num-mappers 8 --split-by date_column 
--query "SELECT * FROM (
    SELECT ID, MY_CLOB
    FROM oracle_table
    WHERE ROWNUM <= 1000
    ) WHERE \$CONDITIONS" 
--create-hive-table --hive-import --hive-overwrite --hive-database my_db 
--hive-table output_table --as-parquetfile --fields-terminated-by \| 
--delete-target-dir --target-dir $HIVE_WAREHOUSE --map-column-java=MY_CLOB=String 
--username wassadamo --password-file /user/wassadamo/.oracle_password

但是我得到一个错误(下面的sn-p):

20/07/13 17:04:08 INFO mapreduce.Job:  map 0% reduce 0%
20/07/13 17:05:08 INFO mapreduce.Job: Task Id : attempt_1594629724936_3157_m_000001_0, Status : FAILED
Error: java.io.IOException: SQLException in nextKeyValue
...
Caused by: java.sql.SQLDataException: ORA-01861: literal does not match format string

这似乎是由将 CLOB 列映射到字符串引起的。我是根据这个answer做的。

我该如何解决这个问题?我也对不同的 pyspark 解决方案持开放态度

【问题讨论】:

  • 你有没有想过在数据库中拆分 clob 而不是在 pyspark 上进行拆分?如果您在数据库中进行艰苦的工作,它会运行得更快。
  • 我没有 UPDATE/CREATE 权限,只有 SELECT @RobertoHernandez。我尝试通过 Python 驱动程序首先运行 solution SQL,然后写入本地 csv,但正如我所说,它非常慢,或者永远不会终止。
  • 而且查询对于 sqoop 来说似乎太复杂了。但如果是这样,我应该何时以及如何解析 clob?
  • 我同意 sqoop 的查询非常复杂,但如果您没有创建视图的选项,那么您唯一的机会是。无论如何,我不认为 ORA-01861 是由于 map-column-java=clob:string 。这实际上是在 Hive 中导入 clob 的唯一方法。
  • 如果查询对于 sqoop 来说过于复杂,也许我可以在 Hive 中创建视图来代替?

标签: oracle pyspark hive sqoop clob


【解决方案1】:

部分答案:oracle 错误似乎是由于

--split-by date_column

这个date_column 是一个Oracle 日期类型。事实证明,从 Oracle 获取时它不起作用。能够在这个问题上分裂会很好。但拆分 ID (varchar2) 似乎有效。

高效解析文本 MY_CLOB 字段并为每行创建新行的问题仍然存在。

【讨论】:

    猜你喜欢
    • 2021-10-19
    • 2018-10-29
    • 2018-02-01
    • 2018-10-24
    • 1970-01-01
    • 2018-01-31
    • 2013-12-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多