【问题标题】:How to handle nested double quotes in Nifi?如何处理 Nifi 中的嵌套双引号?
【发布时间】:2021-08-14 16:08:31
【问题描述】:

我们有一个带有嵌套双引号列的 csv 文件。

例如:1,John,26,"你好吗"吉姆"。

在这个例子中,我们有 4 列 id、name、age 和 message。

这里的消息列有嵌套的双引号,这导致 convertRecord Nifi 处理器中的数据解析问题(无法解析传入数据错误)。有什么方法可以转义嵌套的双引号并正确读取数据?

如下图所示,我们在 CSVReader 和 CSVRecordSetWritter 控制器服务中都使用了以下属性。

【问题讨论】:

  • 不是有效的 csv 格式。通常双引号必须用另一个双引号转义......所以,最好在生成 csv 的地方解决这个问题。
  • 很遗憾,我们无法更改来源并更正数据。只需要在 Nifi 中找到方法。
  • 如何检测哪个引号是字段的结尾?如果您有最后一个字段 - 那么可以使用脚本来修复格式。
  • @dagget 现在我们将文件格式从 CSV 更改为制表符分隔文件,并将控制器服务中的引用字符属性从双引号更改为非英文字符。截至目前,数据已通过此更改正确解析。

标签: apache-nifi cloudera


【解决方案1】:

我们遇到了完全相同的问题,正如@daggett 强调的那样 - 您如何检测哪个引号是字段的结尾?我们甚至与 Cloudera 进行了交谈,一切都归结为数据不符合 CSV 标准规则。

因此编写了一个使用ExecuteScript 处理器调用的小型python 脚本,并且能够转义几乎所有特殊字符,除非双引号和分隔符是数据的一部分,例如。 "field_1","field_2 this is very invalid", data","field_3"

试一试,如果它有效,请发表评论,以便我们可以将逻辑包含到自定义处理器中!

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
import re


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()
            outer_new_value_list = []
            is_header_row = True
            for row in lines:
                if is_header_row:
                    is_header_row = False
                    outer_new_value_list.append(row)
                    continue

                char_list = list(row.strip())

                for position, char in enumerate(char_list):
                    #print(position, char)
                    # if position == 54:
                    #     print()
                    if (position + 1) == len(char_list):
                        continue
                    if position == 0:
                        continue
                    else:
                        if char == '"':
                            if char_list[position - 1] == ',' or char_list[position + 1] == ',':
                                # this double quote is Quote Character at start of field or end of field
                                continue
                            if char_list[position - 1] != ',' and char_list[position + 1] != ',':
                                # this double quote is inbetween and is not Quote Character, add  escape character to it
                                replace_char = '\\' + char
                                char_list[position] = replace_char
                        if char == ',':
                            # Int values are not in double quotes, so check previous and next char is of int type
                            previous_char_type = ''
                            next_char_type = ''
                            try:
                                previous_char = char_list[position - 1]
                                if isinstance(int(previous_char), int):
                                    previous_char_type = 'Int'
                            except:
                                pass
                                # print('previous_char : ' + str(previous_char))

                            try:
                                next_char = char_list[position + 1]
                                if isinstance(int(next_char), int):
                                    next_char_type = 'Int'
                            except:
                                pass
                            # print(" next_char: " + str(next_char))

                            if previous_char_type == 'Int' or next_char_type == 'Int':
                                print('No need to replace this instance of comma')
                                continue

                            if char_list[position - 1] == '"' or char_list[position + 1] == '"':
                                # delimited comma
                                continue
                            if char_list[position - 1] != '"' and char_list[position + 1] != '"':
                                # not delimited comma, inbetween comma, add with escape character to it
                                replace_char = '\\' + char
                                char_list[position] = replace_char
                        if char == '\\':
                            replace_char = ''
                            char_list[position] = replace_char

                new_data_line = ''.join([str(elem) for elem in char_list])
                outer_new_value_list.append(new_data_line + '\r\n')

            with wrap(outputStream, 'w') as filehandle:
                filehandle.writelines("%s" % line for line in outer_new_value_list)


# end class
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end

【讨论】:

  • 感谢您的回复。现在我们将文件格式从 CSV 更改为制表符分隔文件,并将控制器服务中的引用字符属性从双引号更改为非英文字符。截至目前,数据已通过此更改正确解析。
猜你喜欢
  • 2013-09-21
  • 2014-11-14
  • 2021-02-04
  • 2011-11-16
  • 2019-03-16
  • 2021-01-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多