【发布时间】:2017-11-22 18:27:32
【问题描述】:
我想在 flink 上使用弹性生产者,但我在身份验证方面遇到了一些麻烦: 我的弹性搜索集群前面有 Nginx,我在 nginx 中使用基本身份验证。
但是使用弹性搜索连接器,我无法在我的网址中添加基本身份验证(因为 InetSocketAddress)
您是否有使用带有基本身份验证的弹性搜索连接器的想法?
感谢您的宝贵时间。
这是我的代码:
val configur = new java.util.HashMap[String, String]
configur.put("cluster.name", "cluster")
configur.put("bulk.flush.max.actions", "1000")
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300))
jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur,
transportAddresses,
new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]]
return Requests.indexRequest()
.index("flinkTest")
.source(jsonMap);
}
override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
}
}))
【问题讨论】:
标签: elasticsearch apache-flink flink-streaming basic-authentication