【问题标题】:CSV to elasticsearch with python SerializationErrorCSV到elasticsearch与python SerializationError
【发布时间】:2018-10-04 17:41:18
【问题描述】:

当我尝试将 bulk_data 发送到本地弹性搜索时,由于 SerializationError,我的数据没有加载。

我已经尝试填充 csv 文件中的空单元格,但这不是解决方案。

from elasticsearch import Elasticsearch

bulk_data = []
header = []
count = 0
for row in csv_file_object:
    if count > 0 : 
        data_dict = {}
        for i in range(len(row)):
            row = row.rstrip() 
            data_dict[header[i]] = row[i]
        op_dict = {
            "index": {
                "_index": INDEX_NAME, 
                "_type": TYPE_NAME, 
            }
        }
        bulk_data.append(op_dict)
        bulk_data.append(data_dict)
    else:
        header = row
    count = count+1

# create ES client, create index
es = Elasticsearch(hosts = [ES_HOST])
if es.indices.exists(INDEX_NAME):
    print("deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index = INDEX_NAME)
    
res = es.bulk(index = INDEX_NAME, body = bulk_data, refresh = True)

请参阅图片了解 SerializationError 和 bulk_data 值:

请注意:\n 是由序列化过程本身添加的。

【问题讨论】:

  • 不,添加“\n”是因为您不剥离行。应该是 row = row.rstrip()
  • 以字节为单位弹性转换您的行字符串?这种行为正确吗?或者你只会索引一个数字数组?
  • @Lupanoide,我想从 csv 添加 4 列。但目前我只得到索引到 120?查看我的编辑,我循环遍历行值以添加到 data_dict
  • 您的 csv 中有 5 列,所需的输出应该是 {"105": [0, 10262233 , 20 , 10262233, 1] , "110": ...} 对吗?跨度>
  • @Lupanoide 感谢您的帮助!是的,所需的输出应该是 {"105": [0, 10262233 , 20 , 10262233, 1] , "110": ...} 当前输出类似于:{44: 50 95:51 97:49 99: 51 100:}

标签: python csv elasticsearch


【解决方案1】:

我试图回复你,但我无法理解一件事。如何从数据中检索字段名称?在您的代码中,我看到您从名为header 的列表中检索它,该列表为空?我不明白你是怎么取这个值的。。检查我的答案,我不知道我是否理解得很好

from elasticsearch import Elasticsearch
from elasticsearch import helpers


index_name = "your_index_name"
doc_type = "your_doc_type"
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])
# change your ip here
count = 0

def generate_data(csv_file_object)
    with open(csv_file_object, "r") as f:
        for line in f:
           line = line.split(",").rstrip()
           data_dict = {header[count]: line} 
           obj={
            '_op_type': 'index',
            '_index': index_name,
            '_type': doc_type,
            '_id': count+1,
            '_source': data_dict
                }
            count +=1
            yield obj


for success, info in helpers.parallel_bulk(client=esConnector, actions=generate_data(csv_file_object), thread_count=4):
    if not success: 
        print 'Doc failed', info

【讨论】:

    猜你喜欢
    • 2017-01-11
    • 2017-05-25
    • 1970-01-01
    • 2017-08-17
    • 2021-10-07
    • 2019-04-26
    • 2015-06-23
    • 2015-03-23
    • 2017-01-26
    相关资源
    最近更新 更多