【发布时间】:2020-04-28 12:37:32
【问题描述】:
我正在学习如何使用Apache Flink和使用Python3.5编写一个简单的Word Count。
这是我的 Python 代码:
#!/usr/bin/python3.5
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
import os
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
input = './input'
output = './output'
try:
os.remove(input)
os.remove(output)
except OSError:
pass
fd = os.open(input, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o666)
msg = 'aaa\nbbb\nccc' # <------------- look at here
# msg = 'aaa,bbb,ccc' # <------------- not working as expected
os.write(fd, msg.encode())
t_env.connect(FileSystem().path(input)) \
.with_format(OldCsv() # <-------------- look at here
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path(output)) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
t_env.execute("python_job")
它按预期工作。执行此脚本后,我得到一个名为output的文件:
aaa 1
bbb 1
ccc 1
如您所见,我将msg的内容,也就是aaa\nbbb\nccc,写入到文件input中。
然后,我认为输入文件的格式似乎是CSV。所以我尝试将变量msg 更改为aaa,bbb,ccc。
但是,现在输出变为aaa 1。第一个逗号后面的部分似乎丢失了。
我不明白为什么。既然是CSV文件,为什么不能用逗号作为分隔符?
【问题讨论】:
标签: python python-3.x csv apache-flink