I haven't used the Dataproc version of Druid, but I run a small cluster on Google Compute VMs. I ingest data from GCS using the Google Cloud Storage Druid extension: https://druid.apache.org/docs/latest/development/extensions-core/google.html
To enable the extension, add it to the extension list in Druid's common.runtime.properties file:
druid.extensions.loadList=["druid-google-extensions", "postgresql-metadata-storage"]
To ingest data from GCS, I send an HTTP POST request to http://druid-overlord-host:8081/druid/indexer/v1/task
The POST request body contains the JSON ingestion spec (see the ["ioConfig"]["firehose"] section):
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "daily_xport_test",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "queryGranularity": "NONE",
        "rollup": false
      },
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "dateday",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              {
                "type": "string",
                "name": "id",
                "createBitmapIndex": true
              },
              {
                "type": "long",
                "name": "clicks_count_total"
              },
              {
                "type": "long",
                "name": "ctr"
              },
              "deleted",
              "device_type",
              "target_url"
            ]
          }
        }
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "firehose": {
        "type": "static-google-blobstore",
        "blobs": [
          {
            "bucket": "data-test",
            "path": "/sample_data/daily_export_18092019/000000000000.json.gz"
          }
        ],
        "filter": "*.json.gz$"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumSubTasks": 1,
      "maxRowsInMemory": 1000000,
      "pushTimeout": 0,
      "maxRetry": 3,
      "taskStatusCheckPeriodMs": 1000,
      "chatHandlerTimeout": "PT10S",
      "chatHandlerNumRetries": 5
    }
  }
}
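If more than one file has to be ingested, the "blobs" list in the firehose section can be generated instead of written by hand. Here is a minimal sketch in Python, assuming all objects live in the same bucket. The helper name make_firehose and the second object path are made up for illustration and are not part of Druid or of my setup:

```python
import json


def make_firehose(bucket, paths):
    """Build a static-google-blobstore firehose section for the given objects.

    Hypothetical helper: it only assembles the same JSON structure that is
    used in the "ioConfig" section of the spec.
    """
    return {
        "type": "static-google-blobstore",
        "blobs": [{"bucket": bucket, "path": path} for path in paths],
    }


# The first path comes from the spec; the second one is an assumed example.
paths = [
    "/sample_data/daily_export_18092019/000000000000.json.gz",
    "/sample_data/daily_export_18092019/000000000001.json.gz",
]

firehose = make_firehose("data-test", paths)
print(json.dumps(firehose, indent=2))
```

The resulting dictionary can replace the "firehose" value in the spec before it is serialized and sent to the Overlord.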
Example cURL command to start the ingestion task in Druid (spec.json contains the JSON from the previous section):
curl -X 'POST' -H 'Content-Type:application/json' -d @spec.json http://druid-overlord-host:8081/druid/indexer/v1/task
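The same request can be sent from a script. Below is a rough Python equivalent of the cURL command, using only the standard library. The function names build_task_request and submit_task are my own, and the sketch assumes the Overlord answers with a JSON object containing a "task" field holding the task ID:

```python
import json
import urllib.request


def build_task_request(overlord_url, spec):
    """Prepare the POST request for the task endpoint without sending it."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/task",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_task(overlord_url, spec, timeout=30):
    """Send the ingestion spec and return the task ID from the response.

    Assumes a response body of the form {"task": "<task id>"}.
    """
    request = build_task_request(overlord_url, spec)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)["task"]


# Usage (needs a reachable Overlord, so it is left commented out):
# with open("spec.json") as f:
#     spec = json.load(f)
# print(submit_task("http://druid-overlord-host:8081", spec))
```

Building the request separately from sending it makes it easy to inspect the URL, headers and body before anything reaches the cluster.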