【问题标题】:Why can't the comma be used as the delimiter in Apache Flink为什么在 Apache Flink 中不能使用逗号作为分隔符
【发布时间】:2020-04-28 12:37:32
【问题描述】:

我正在学习如何使用Apache Flink和使用Python3.5编写一个简单的Word Count。

这是我的 Python 代码:

#!/usr/bin/python3.5

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
import os

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

input = './input'
output = './output'
try:
    os.remove(input)
    os.remove(output)
except OSError:
    pass
fd = os.open(input, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o666)

msg = 'aaa\nbbb\nccc'      # <------------- look at here
# msg = 'aaa,bbb,ccc'      # <------------- not working as expected
os.write(fd, msg.encode())

t_env.connect(FileSystem().path(input)) \
    .with_format(OldCsv()            # <-------------- look at here
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path(output)) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("python_job")

它按预期工作。执行此脚本后,我得到一个名为output的文件:

aaa     1
bbb     1
ccc     1

如您所见,我将msg的内容,也就是aaa\nbbb\nccc,写入到文件input中。

然后,我认为输入文件的格式似乎是CSV。所以我尝试将变量msg 更改为aaa,bbb,ccc

但是,现在输出变为aaa 1。第一个逗号后面的部分似乎丢失了。

我不明白为什么。既然是CSV文件,为什么不能用逗号作为分隔符?

【问题讨论】:

    标签: python python-3.x csv apache-flink


    【解决方案1】:

    逗号是默认的字段分隔符,所以如果你删除了这一行

    .field_delimiter('\t')
    

    你会得到你想要的,或者你可以通过

    .field_delimiter(',')
    

    OldCsv 表源描述为here in the documentation

    【讨论】:

      猜你喜欢
      • 2013-11-03
      • 1970-01-01
      • 2015-03-15
      • 2012-07-17
      • 2014-07-16
      • 2016-03-07
      • 1970-01-01
      相关资源
      最近更新 更多