You can do this by combining the derived column, rank, and pivot transformations (with a cast along the way).
- Assume the given sample data (an array of strings) is in a column named mycol.
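- Next, I used a rank transformation. I named the rank column id and used the mycol column as the sort condition (ascending). This adds an id column holding each row's rank.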
- Then I used a derived column to create a column named new with the dynamic expression unfold(mycol).
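- For some reason, the type of this new column was not detected correctly, so I used a cast transformation to make it a complex type with the complex type definition string[].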
- I created 2 new columns, key and value, with the following dynamic content:
key: split(new[1],':')[1]
value: split(new[1],':')[2]
- Next, I used a pivot transformation. I grouped by id, set the pivot key to key, and set the pivoted column to max(value) (an aggregate is required here).
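- This gives the required result. Below is the complete data flow JSON. The actual transformations start at the rank step, since you already have the array column; derivedColumn1 only turns my CSV string into an array.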
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "csv1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "dest",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "derivedColumn1"
},
{
"name": "rank1"
},
{
"name": "derivedColumn2"
},
{
"name": "cast1"
},
{
"name": "derivedColumn3"
},
{
"name": "pivot1"
}
],
"scriptLines": [
"source(output(",
" mycol as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"source1 derive(mycol = split(replace(replace(replace(mycol,'[',''),']',''),'\"',''),',')) ~> derivedColumn1",
"derivedColumn1 rank(asc(mycol, true),",
" output(id as long)) ~> rank1",
"rank1 derive(new = unfold(mycol)) ~> derivedColumn2",
"derivedColumn2 cast(output(",
" new as string[]",
" ),",
" errors: true) ~> cast1",
"cast1 derive(key = split(new[1],':')[1],",
" value = split(new[1],':')[2]) ~> derivedColumn3",
"derivedColumn3 pivot(groupBy(id),",
" pivotBy(key),",
" {} = max(value),",
" columnNaming: '$N$V',",
" lateral: true) ~> pivot1",
"pivot1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" partitionFileNames:['op.csv'],",
" umask: 0022,",
" preCommands: [],",
" postCommands: [],",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" saveOrder: 1,",
" partitionBy('hash', 1)) ~> sink1"
]
}
}
}
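To sanity-check the logic outside ADF, here is a minimal Python sketch that mirrors each step of the data flow script above on in-memory rows. It is not ADF code, and several details are assumptions. The sample rows are hypothetical. Python's list ordering stands in for the rank sort order. Every element is assumed to be a well-formed key:value string. The function names are purely illustrative.

```python
# Hypothetical Python stand-in for the data flow above; not ADF code.


def parse_array_string(raw: str) -> list[str]:
    # derivedColumn1: drop '[', ']' and '"', then split on ','
    cleaned = raw.replace("[", "").replace("]", "").replace('"', "")
    return cleaned.split(",")


def rank_rows(arrays: list[list[str]]) -> list[tuple[int, list[str]]]:
    # rank1: ascending, non-dense rank starting at 1; equal arrays share a rank.
    # Python's list ordering is an assumed stand-in for the data flow sort order.
    ranked: list[tuple[int, list[str]]] = []
    for position, arr in enumerate(sorted(arrays), start=1):
        if ranked and ranked[-1][1] == arr:
            ranked.append((ranked[-1][0], arr))
        else:
            ranked.append((position, arr))
    return ranked


def unfold_rows(ranked):
    # derivedColumn2 with unfold(mycol): one output row per array element
    return [(row_id, element) for row_id, arr in ranked for element in arr]


def split_key_value(rows):
    # derivedColumn3: data flow arrays are 1-based, so [1]/[2] become [0]/[1] here.
    # Assumes every element looks like "key:value".
    result = []
    for row_id, element in rows:
        parts = element.split(":")
        result.append((row_id, parts[0], parts[1]))
    return result


def pivot_rows(rows):
    # pivot1: group by id, one column per distinct key, max(value) as the aggregate
    # (max on strings compares lexicographically)
    table: dict[int, dict[str, str]] = {}
    for row_id, key, value in rows:
        cells = table.setdefault(row_id, {})
        cells[key] = max(cells[key], value) if key in cells else value
    return table


def run_pipeline(raw_rows: list[str]) -> dict[int, dict[str, str]]:
    arrays = [parse_array_string(raw) for raw in raw_rows]
    return pivot_rows(split_key_value(unfold_rows(rank_rows(arrays))))


if __name__ == "__main__":
    sample = ['["name:bob","city:paris"]', '["name:alice","city:oslo"]']
    print(run_pipeline(sample))
```

With the two hypothetical rows this prints `{1: {'name': 'alice', 'city': 'oslo'}, 2: {'name': 'bob', 'city': 'paris'}}`. Note that rows with identical arrays get the same rank and are therefore merged by the pivot's group by; if that matters for your data, the id column needs to be unique per row.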