It is actually doable. You can perform a select as you described, but only for specific formats: JSON, CSV, and Parquet.
Imagine a data.json file in the so67315601 bucket in eu-central-1:
{
  "a": "dataA",
  "b": "dataB",
  "c": "dataC",
  "d": "dataD",
  "e": "dataE"
}
First, see how to select the fields through the S3 console. Use "Object actions" → "Query with S3 Select".
AWS Java SDK 1.x
Here is the code to perform the select with AWS Java SDK 1.x:
@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private AmazonS3 client;

    @Test
    void test() throws IOException {
        // LINES: Each line in the input data contains a single JSON object
        // DOCUMENT: A single JSON object can span multiple lines in the input
        final JSONInput input = new JSONInput();
        input.setType(JSONType.DOCUMENT);

        // Configure input format and compression
        final InputSerialization inputSerialization = new InputSerialization();
        inputSerialization.setJson(input);
        inputSerialization.setCompressionType(CompressionType.NONE);

        // Configure output format
        final OutputSerialization outputSerialization = new OutputSerialization();
        outputSerialization.setJson(new JSONOutput());

        // Build the request
        final SelectObjectContentRequest request = new SelectObjectContentRequest();
        request.setBucketName("so67315601");
        request.setKey("data.json");
        request.setExpression("SELECT s.a, s.b FROM s3object s LIMIT 5");
        request.setExpressionType(ExpressionType.SQL);
        request.setInputSerialization(inputSerialization);
        request.setOutputSerialization(outputSerialization);

        // Run the query
        final SelectObjectContentResult result = client.selectObjectContent(request);

        // Parse the results
        final InputStream stream = result.getPayload().getRecordsInputStream();
        IOUtils.copy(stream, System.out);
    }
}
The output is:
{"a":"dataA","b":"dataB"}
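The expression string is the only part of the request that decides which fields come back. As a rough sketch (the `SelectExpressions` class and its `forFields` method are hypothetical names, not part of any AWS SDK), it could be built from a list of field names like this. The sketch assumes plain top-level field names and rejects anything else rather than trying to escape it:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SelectExpressions {
    // Assumption: only simple identifiers are accepted, to keep the SQL well-formed
    private static final Pattern SIMPLE_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Hypothetical helper: builds "SELECT s.x, s.y FROM s3object s LIMIT n"
    public static String forFields(List<String> fields, int limit) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        for (String field : fields) {
            if (!SIMPLE_NAME.matcher(field).matches()) {
                throw new IllegalArgumentException("unsupported field name: " + field);
            }
        }
        // Prefix every field with the "s" alias used in the FROM clause
        String projection = fields.stream()
                .map(field -> "s." + field)
                .collect(Collectors.joining(", "));
        return "SELECT " + projection + " FROM s3object s LIMIT " + limit;
    }

    public static void main(String[] args) {
        // Prints the same expression that is used in the tests in this answer
        System.out.println(forFields(List.of("a", "b"), 5));
    }
}
```

The resulting string can be passed to `setExpression(...)` in SDK 1.x or to `.expression(...)` in SDK 2.x.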
AWS Java SDK 2.x
The code for AWS Java SDK 2.x is more involved. Refer to this ticket for more information.
@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private S3AsyncClient client;

    @Test
    void test() throws Exception {
        final InputSerialization inputSerialization = InputSerialization
            .builder()
            .json(JSONInput.builder().type(JSONType.DOCUMENT).build())
            .compressionType(CompressionType.NONE)
            .build();

        final OutputSerialization outputSerialization = OutputSerialization.builder()
            .json(JSONOutput.builder().build())
            .build();

        final SelectObjectContentRequest select = SelectObjectContentRequest.builder()
            .bucket("so67315601")
            .key("data.json")
            .expression("SELECT s.a, s.b FROM s3object s LIMIT 5")
            .expressionType(ExpressionType.SQL)
            .inputSerialization(inputSerialization)
            .outputSerialization(outputSerialization)
            .build();

        final TestHandler handler = new TestHandler();

        client.selectObjectContent(select, handler).get();

        RecordsEvent response = (RecordsEvent) handler.receivedEvents.stream()
            .filter(e -> e.sdkEventType() == SelectObjectContentEventStream.EventType.RECORDS)
            .findFirst()
            .orElse(null);

        System.out.println(response.payload().asUtf8String());
    }

    private static class TestHandler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
}
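Note that the test above prints only the first RECORDS event. As far as I know, S3 Select may deliver a larger result in several RECORDS events, and a record can be split between them, so for real data it is probably safer to join all payloads before decoding. Here is a minimal sketch of that idea in plain Java; `RecordChunks` and `join` are hypothetical names, and the byte arrays stand in for what `payload().asByteArray()` would return for each RECORDS event:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class RecordChunks {
    // Joins the raw chunks first and decodes once, so a multi-byte
    // character split across two chunks is not corrupted
    public static String join(List<byte[]> chunks) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            buffer.write(chunk, 0, chunk.length);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] all = "{\"a\":\"dataA\",\"b\":\"dataB\"}\n".getBytes(StandardCharsets.UTF_8);
        // Simulate one result delivered in two pieces
        byte[] first = Arrays.copyOfRange(all, 0, 10);
        byte[] second = Arrays.copyOfRange(all, 10, all.length);
        System.out.print(join(List.of(first, second)));
    }
}
```

In the handler, this would mean filtering `receivedEvents` for all RECORDS events instead of calling `findFirst()`, and passing their payloads to `join` in arrival order.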
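As you can see, S3 selects can be performed programmatically!
You may be wondering what @AWSClient and @ExtendWith(S3.class) are.
They come from a small library for injecting AWS clients into your tests, named aws-junit5. It will simplify your tests significantly. I am the author. The usage is really simple. Try it in your next project!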