我们遇到了完全相同的问题。正如 @daggett 指出的那样:您如何判断哪个引号才是字段的结束引号?我们甚至为此咨询过 Cloudera,结论是问题归根结底在于数据本身不符合 CSV 标准规则。
因此,我们编写了一个由 ExecuteScript 处理器调用的小型 Python 脚本。它能够转义几乎所有特殊字符,唯一的例外是双引号和分隔符同时作为数据一部分出现的情况,例如:"field_1","field_2 this is very invalid", data","field_3"
欢迎试用;如果它对您有效,请留言告知,以便我们把这段逻辑整合到自定义处理器中!
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
import re
def _escape_data_row(row):
    """Return *row* with stray quotes/commas escaped and backslashes removed.

    The row is stripped of surrounding whitespace (including its line
    terminator) first. The first and last characters are never touched,
    because they are assumed to be the row's opening/closing field quotes.
    Every interior character is classified by looking at its ORIGINAL
    neighbours:

    * ``"`` next to a ``,`` is treated as a field quote and kept; any
      other ``"`` is data and is escaped as ``\\"``.
    * ``,`` next to a digit is kept, because unquoted integer fields are
      delimited by bare commas. A ``,`` next to a ``"`` is a delimiter and
      is kept. Any other ``,`` is data and is escaped as ``\\,``.
    * ``\\`` already present in the data is dropped, so the only
      backslashes in the output are the escapes added here.

    Known limitation: a quote that is immediately followed by a comma
    inside a field (e.g. ``invalid", data``) cannot be told apart from a
    real field boundary and is left unescaped.
    """
    chars = list(row.strip())
    # Write into a copy so every decision is based on the unmodified row.
    escaped = list(chars)
    # range(1, len - 1) skips the first and last character; it is empty
    # for rows shorter than three characters.
    for pos in range(1, len(chars) - 1):
        char = chars[pos]
        prev_char = chars[pos - 1]
        next_char = chars[pos + 1]
        if char == '"':
            if prev_char != ',' and next_char != ',':
                # Not adjacent to a delimiter, so it is a quote inside the data.
                escaped[pos] = '\\' + char
        elif char == ',':
            if prev_char.isdigit() or next_char.isdigit():
                # Integer values are unquoted; this comma delimits them.
                continue
            if prev_char != '"' and next_char != '"':
                # Not adjacent to a field quote, so it is a comma inside the data.
                escaped[pos] = '\\' + char
        elif char == '\\':
            escaped[pos] = ''
    return ''.join(escaped)


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    """Rewrite FlowFile CSV content so stray quotes and commas are escaped.

    The first line is treated as a header and copied through unchanged.
    Every following line is passed through ``_escape_data_row`` and written
    back terminated with ``\\r\\n``.
    """

    def process(self, inputStream, outputStream):
        """Read all lines from *inputStream* and write escaped lines to *outputStream*."""
        with wrap(inputStream) as f:
            lines = f.readlines()
        output_lines = []
        for index, row in enumerate(lines):
            if index == 0:
                # Header row: keep it exactly as read, including its line ending.
                output_lines.append(row)
            else:
                output_lines.append(_escape_data_row(row) + '\r\n')
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(output_lines)
# Script entry point: ExecuteScript injects `session` into the script scope.
# session.get() returns None when no FlowFile is queued, in which case the
# script does nothing.
flowFile = session.get()
if flowFile is not None:
    # session.write returns the updated FlowFile reference; that new
    # reference must be the one transferred.
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end