【问题标题】:How to generate csv response in elasticsearch?如何在弹性搜索中生成 csv 响应?
【发布时间】:2017-01-20 12:30:20
【问题描述】:

我们知道,Elasticsearchrest apis 返回 json response。但是,我需要来自这些 api 的 CSV responses

我正在寻找与 Solr 中可用的类似功能。Solr 提供 CSV response writer 使用它,我们可以轻松地在 csv form 中获得响应。

Elasticsearch 如何实现这一点?

注意我不打算将 ElasticSearch 集群的所有内容导出为 csv 格式。我想query elasticsearch rest apis 并在csv format 而不是json 中获得响应。

更新

我一直在尝试使用@Val 在答案中推荐的方法来使用logstash

下面是logstash-plain.log的内容

    [2017-01-23T18:28:35,762][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-23T18:28:35,783][INFO ][logstash.pipeline        ] Pipeline main started
[2017-01-23T18:28:35,827][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:35,881][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-01-23T18:28:36,838][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:37,848][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"_index:megacorp AND first_name:Jane", id=>"9a67b0421108afd201382b21693e2173243dd144-1", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>"json_60457023-6344-4af7-a2c5-1e89d1fe08aa", enable_metric=>true, charset=>"UTF-8">, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive xcontent"}],"type":"parse_exception","reason":"Failed to derive xcontent"},"status":400}
[2017-01-23T18:28:38,865][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.

下面是elasticsearch.log的内容

    2017-01-23T19:06:38,633][INFO ][o.e.n.Node               ] [] initializing ...
[2017-01-23T19:06:38,751][INFO ][o.e.e.NodeEnvironment    ] [TgbIozs] using [1] data paths, mounts [[/ (/dev/sda8)]], net usable_space [36.9gb], net total_space [139.6gb], spins? [possibly], types [ext4]
[2017-01-23T19:06:38,752][INFO ][o.e.e.NodeEnvironment    ] [TgbIozs] heap size [1.9gb], compressed ordinary object pointers [true]
[2017-01-23T19:06:38,760][INFO ][o.e.n.Node               ] node name [TgbIozs] derived from node ID [TgbIozsCR5WWSm_8iU-Rdw]; set [node.name] to override
[2017-01-23T19:06:38,761][INFO ][o.e.n.Node               ] version[5.1.2], pid[7239], build[c8c4c16/2017-01-11T20:18:39.146Z], OS[Linux/3.16.0-70-generic/amd64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_77/25.77-b03]
[2017-01-23T19:06:39,764][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [aggs-matrix-stats]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [ingest-common]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-expression]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-groovy]
[2017-01-23T19:06:39,765][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-mustache]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [lang-painless]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [percolator]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [reindex]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [transport-netty3]
[2017-01-23T19:06:39,766][INFO ][o.e.p.PluginsService     ] [TgbIozs] loaded module [transport-netty4]
[2017-01-23T19:06:39,767][INFO ][o.e.p.PluginsService     ] [TgbIozs] no plugins loaded
[2017-01-23T19:06:42,342][INFO ][o.e.n.Node               ] initialized
[2017-01-23T19:06:42,342][INFO ][o.e.n.Node               ] [TgbIozs] starting ...
[2017-01-23T19:06:42,595][INFO ][o.e.t.TransportService   ] [TgbIozs] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2017-01-23T19:06:42,610][WARN ][o.e.b.BootstrapCheck     ] [TgbIozs] max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2017-01-23T19:06:42,611][WARN ][o.e.b.BootstrapCheck     ] [TgbIozs] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[2017-01-23T19:06:45,816][INFO ][o.e.c.s.ClusterService   ] [TgbIozs] new_master {TgbIozs}{TgbIozsCR5WWSm_8iU-Rdw}{U2MjduBXTcOYx50aXsY-CQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2017-01-23T19:06:45,860][INFO ][o.e.h.HttpServer         ] [TgbIozs] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2017-01-23T19:06:45,861][INFO ][o.e.n.Node               ] [TgbIozs] started
[2017-01-23T19:06:46,211][INFO ][o.e.g.GatewayService     ] [TgbIozs] recovered [1] indices into cluster_state
[2017-01-23T19:06:47,046][INFO ][o.e.c.r.a.AllocationService] [TgbIozs] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[megacorp][0]] ...]).
[2017-01-23T19:07:35,357][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [cluster_update_settings]: took [18ms] done applying updated cluster_state (version: 7, uuid: Wc1Xm4H5SSOcJ6lIM--Stg)
[2017-01-23T19:07:35,357][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [reroute_after_cluster_update_settings]: execute
[2017-01-23T19:07:35,363][DEBUG][o.e.c.s.ClusterService   ] [TgbIozs] processing [reroute_after_cluster_update_settings]: took [4ms] no change in cluster_state
[2017-01-23T19:07:35,370][DEBUG][i.n.h.c.c.ZlibCodecFactory] -Dio.netty.noJdkZlibDecoder: false
[2017-01-23T19:07:35,372][DEBUG][i.n.h.c.c.ZlibCodecFactory] -Dio.netty.noJdkZlibEncoder: false
[2017-01-23T19:07:35,674][DEBUG][r.suppressed             ] path: /megacorp/_search, params: {size=1000, scroll=1m, index=megacorp}
org.elasticsearch.ElasticsearchParseException: Failed to derive xcontent
    at org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:239) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest(RestSearchAction.java:103) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.prepareRequest(RestSearchAction.java:81) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:66) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.executeHandler(RestController.java:243) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:200) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.HttpServer.dispatchRequest(HttpServer.java:113) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:507) [transport-netty4-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:69) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:66) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

logstash console 中生成的响应:

输入的命令:logstash_csv.sh "first_name:Jane" "first_name,last_name"

STARTING logstash_csv script......
Sending Logstash's logs to /home/sagarhp/installations/logstash-5.1.2/logs which is now configured via log4j2.properties
[2017-01-23T19:49:25,103][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-23T19:49:25,131][INFO ][logstash.pipeline        ] Pipeline main started
[2017-01-23T19:49:25,239][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-01-23T19:49:25,386][INFO ][logstash.outputs.csv     ] Opening file {:path=>"/home/sagarhp/mybin/test.csv"}
{
    "@timestamp" => 2017-01-23T14:04:25.361Z,
         "about" => "I like to collect rock albums",
      "@version" => "1",
     "last_name" => "Smith",
     "interests" => [
        [0] "music"
    ],
    "first_name" => "Jane",
           "age" => 32
}
[2017-01-23T19:49:28,159][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

更新:替换logstash 5.1.2 with 2.4.1。 elasticsearch版本和以前一样是5.1.2。

下面是elasticsearch.log的内容:

[2017-01-24T11:35:18,909][INFO ][o.e.n.Node               ] [] initializing ...
[2017-01-24T11:35:19,101][INFO ][o.e.e.NodeEnvironment    ] [T7CEo0J] using [1] data paths, mounts [[/ (/dev/sda8)]], net usable_space [35.7gb], net total_space [139.6gb], spins? [possibly], types [ext4]
[2017-01-24T11:35:19,102][INFO ][o.e.e.NodeEnvironment    ] [T7CEo0J] heap size [1.9gb], compressed ordinary object pointers [true]
[2017-01-24T11:35:19,111][INFO ][o.e.n.Node               ] node name [T7CEo0J] derived from node ID [T7CEo0J8SOqX13kNEAPAvg]; set [node.name] to override
[2017-01-24T11:35:19,122][INFO ][o.e.n.Node               ] version[5.1.2], pid[8973], build[c8c4c16/2017-01-11T20:18:39.146Z], OS[Linux/3.16.0-70-generic/amd64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_77/25.77-b03]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [aggs-matrix-stats]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [ingest-common]
[2017-01-24T11:35:20,209][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-expression]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-groovy]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-mustache]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [lang-painless]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [percolator]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [reindex]
[2017-01-24T11:35:20,210][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [transport-netty3]
[2017-01-24T11:35:20,211][INFO ][o.e.p.PluginsService     ] [T7CEo0J] loaded module [transport-netty4]
[2017-01-24T11:35:20,211][INFO ][o.e.p.PluginsService     ] [T7CEo0J] no plugins loaded
[2017-01-24T11:35:22,810][INFO ][o.e.n.Node               ] initialized
[2017-01-24T11:35:22,811][INFO ][o.e.n.Node               ] [T7CEo0J] starting ...
[2017-01-24T11:35:23,039][INFO ][o.e.t.TransportService   ] [T7CEo0J] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2017-01-24T11:35:23,054][WARN ][o.e.b.BootstrapCheck     ] [T7CEo0J] max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2017-01-24T11:35:23,055][WARN ][o.e.b.BootstrapCheck     ] [T7CEo0J] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[2017-01-24T11:35:26,258][INFO ][o.e.c.s.ClusterService   ] [T7CEo0J] new_master {T7CEo0J}{T7CEo0J8SOqX13kNEAPAvg}{rOR6BRP9S6CqXOChtboGLA}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2017-01-24T11:35:26,319][INFO ][o.e.h.HttpServer         ] [T7CEo0J] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2017-01-24T11:35:26,320][INFO ][o.e.n.Node               ] [T7CEo0J] started
[2017-01-24T11:35:26,616][INFO ][o.e.g.GatewayService     ] [T7CEo0J] recovered [1] indices into cluster_state
[2017-01-24T11:35:27,494][INFO ][o.e.c.r.a.AllocationService] [T7CEo0J] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[megacorp][1]] ...]).
[2017-01-24T11:35:55,245][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [cluster_update_settings]: took [31ms] done applying updated cluster_state (version: 7, uuid: RYMpMgAlT1yXJu8Wkdf-pg)
[2017-01-24T11:35:55,245][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [reroute_after_cluster_update_settings]: execute
[2017-01-24T11:35:55,253][DEBUG][o.e.c.s.ClusterService   ] [T7CEo0J] processing [reroute_after_cluster_update_settings]: took [7ms] no change in cluster_state
[2017-01-24T11:36:12,203][DEBUG][r.suppressed             ] path: /megacorp/_search, params: {size=1000, scroll=1m, index=megacorp, search_type=scan}
java.lang.IllegalArgumentException: No search type for [scan]
    at org.elasticsearch.action.search.SearchType.fromString(SearchType.java:107) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest(RestSearchAction.java:114) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.action.search.RestSearchAction.prepareRequest(RestSearchAction.java:81) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:66) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.executeHandler(RestController.java:243) ~[elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:200) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.HttpServer.dispatchRequest(HttpServer.java:113) [elasticsearch-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:507) [transport-netty4-5.1.2.jar:5.1.2]
    at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:69) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:66) [transport-netty4-5.1.2.jar:5.1.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-codec-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

以下是我在logstash console 中得到的内容:

STARTING logstash_csv script......
Settings: Default pipeline workers: 4
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"{\"query\":{\"query_string\": {\"query\": \"first_name:Jane\"}}}", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, scan=>true, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No search type for [scan]"}],"type":"illegal_argument_exception","reason":"No search type for [scan]"},"status":400} {:level=>:error}
Pipeline main started
A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Elasticsearch hosts=>["localhost:9200"], index=>"megacorp", query=>"{\"query\":{\"query_string\": {\"query\": \"first_name:Jane\"}}}", codec=><LogStash::Codecs::JSON charset=>"UTF-8">, scan=>true, size=>1000, scroll=>"1m", docinfo=>false, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>
  Error: [400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No search type for [scan]"}],"type":"illegal_argument_exception","reason":"No search type for [scan]"},"status":400} {:level=>:error}

【问题讨论】:

    标签: csv elasticsearch


    【解决方案1】:

    如果您愿意使用 Logstash,那么您可以非常轻松地使用 elasticsearch input 进行查询,然后使用 csv output 将数据转储到 CSV 文件中。它看起来像这样:

    input {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "your_index"
        query => '{"query": {"match_all": {}}}'
      }
    }
    output {
      csv {
        fields => ["field1", "field2", "field3"]
        path => "/path/to/file.csv"
      }
    }
    

    更新

    如果您需要动态调用它,您可以根据您作为 shell 脚本输入的查询动态生成此 logstash 配置:

    #!/bin/sh
    
    if [ -z "$LOGSTASH_HOME" ]; then
        echo "WARNING: The LOGSTASH_HOME environment variable is not set!"
        exit 0
    fi
    
    LS_CONF="input {
       elasticsearch {
         hosts => [\"localhost:9200\"]
         index => 'megacorp'
         query => '{\"query\":{\"query_string\": {\"query\": \"$1\"}}}'
       }
    }
    output {
       csv {
         fields => [$2]
         path => \"/path/to/file.csv\"
       }
    }"
    
    $LOGSTASH_HOME/bin/logstash -e "$LS_CONF"
    

    然后您可以像这样使用查询my_field:123456 调用该脚本

    ./es_to_csv.sh "my_field:123456" "field1,field2,field3"
    

    这与调用{{elasticUrl}}/_search?q=my_field:123456 的效果相同,并生成一个包含field1,field2,field3 列的CSV 文件

    【讨论】:

    • 感谢您的回复。据我了解,我必须在 Logstash 配置中提供这个并运行 logstash。因此,如果我向 elasticsearch 提供自定义查询,例如:{{elasticUrl}}/_search?q=....,则所有满足给定查询的文档都将导出到 file.csv。我是否正确理解这一点?我要求确保根据查询过滤文档,然后仅写入 csv。
    • 是的,查询部分看起来像 query =&gt; '...',其中 ... 是您在 URL 中的 q=... 参数后面放置的任何内容
    • 运行logstash时没有请求URL,直接运行bin/logstash -f yourconfig.conf
    • 好的,谢谢。那么fields 配置呢?他们是否也尊重我们在 URL 中传递的内容?或者,它们是静态的并且不会改变?
    • 是的,这就是document_type =&gt; 'supervisors',您需要将其添加到elasticsearch 输入中
    【解决方案2】:

    这有点困难,因为本质上 - JSON 是一种分层数据结构,而 CSV 不是。

    因此,没有一种简单的方法可以将一个减少到另一个 - 您所做的任何事情都将是定制的。

    但是您可以执行以下操作:

    #!/usr/bin/env perl
    
    use strict;
    use warnings;
    
    use LWP;
    use JSON;
    
    my $url =
      'http://localhost:9200/index-name/path/AVm7dsU_mwKGPn0NRXkK';
    
    my $agent    = LWP::UserAgent->new;
    my $response = $agent->get($url);
    
    if ( $response->code ) {
       my $json = from_json( $response->content );
       my @fields = sort keys %{ $json->{_source} };
    
       #column headings
       print join ",", @fields, "\n";
       #column values
       print join ",", @{ $json->{_source} }{@fields}, "\n";
    }
    

    这有点粗略,并假设_source 存在平坦的键值关系。对于多条记录,您需要将其包装在一个循环中以打印多条记录 - 这只是单个文档的示例。

    最好 - 如果可能的话 - 更改任何需要 CSV 的内容,首先处理多维数据格式。

    【讨论】:

    • 感谢您的回复。实际上,我不仅有扁平的键值关系。我有json data in nested form。我目前正在使用ElasticSearch。之前,我使用Apache Solr 能够从嵌套的 json 生成 csv 数据。 Solr 以扁平形式存储深度嵌套的 json,只需在请求参数中指定 wt=csv 即可返回 csv。与 Solr 一样,ElasticSearch 也建立在 Lucene 之上。而且,我希望 ElasticSearch 也能够产生类似的 csv 响应。
    • 好吧,以上将允许您展平 json,但您必须决定如何做。例如如果它是一个数组,你想将键连接到一个字段中吗?或者展开数据结构,使其成为多行(请记住 - 这意味着如果您有很多多值字段,您可能会有很多行)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-26
    • 2019-03-28
    • 1970-01-01
    • 2019-05-13
    相关资源
    最近更新 更多