我实施的解决方案对我有用,因此我将其发布为答案。
下面的 sn-p 应该提供足够的信息,因为它涵盖了问题中的要求。值得注意的是,我创建了两个类,一个ResultSetTripleIterator 和一个QuerySolutionToTripleAdapter 接口。第一个负责将三元组流式传输给作者,而第二个负责从每个 QuerySolution 构建一个三元组的迭代器。
public class ResultSetTripleIterator implements Iterator<Triple> {
private ResultSet rs;
private QuerySolutionToTripleAdapter ad;
private Iterator<Triple> it = null;
public ResultSetTripleIterator(ResultSet resultSet, QuerySolutionToTripleAdapter adapter) {
this.rs = resultSet;
this.ad = adapter;
}
@Override
public boolean hasNext() {
if(it != null && it.hasNext()){
return true;
}
it = null;
return rs.hasNext();
}
@Override
public Triple next() {
if(it == null){
it = ad.adapt(rs.next());
}
return it.next();
}
}
public interface QuerySolutionToTripleAdapter {
public Iterator<Triple> adapt(QuerySolution qs);
}
以下是应用程序示例:
// Can be any OutputStream
OutputStream os = new ByteArrayOutputStream();
StreamRDF stream = StreamRDFWriter.getWriterStream(os, Lang.TRIG);
QueryExecution qe = QueryExecutionFactory.sparqlService(
"http://data.open.ac.uk/sparql", "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> SELECT ?A ?B ?C WHERE {?A a ?B . ?A rdf:type ?C} LIMIT 100");
Iterator<Triple> iter = new ResultSetTripleIterator(qe.execSelect(), new QuerySolutionToTripleAdapter() {
Integer rowIndex = 0;
@Override
public Iterator<Triple> adapt(QuerySolution qs) {
rowIndex++;
String ns = "http://www.example.org/test/row#";
String pns = "http://www.example.org/test/col#";
Resource subject = ResourceFactory.createResource(ns + Integer.toString(rowIndex));
Property property;
List<Triple> list = new ArrayList<Triple>();
Iterator<String> cn = qs.varNames();
while (cn.hasNext()) {
String c = cn.next();
property = ResourceFactory.createProperty(pns + c);
list.add(new Triple(subject.asNode(), property.asNode(), qs.get(c).asNode()));
}
return list.iterator();
}
});
stream.start();
StreamOps.sendTriplesToStream(iter, stream);
stream.finish();
但是,Jena 似乎不支持某些 RDF 序列化流式传输,即 XML 和 JSON 格式,例如导致 org.apache.jena.riot.RiotException: No serialization for language Lang:rdf/null。
编辑
感谢 Jena 用户邮件列表中的反馈,可以通过使用 Jena 的实用程序类来操作迭代器和执行转换来压缩上面的代码。
sn-p可以改写如下:
OutputStream os = new ByteArrayOutputStream();
StreamRDF stream = StreamRDFWriter.getWriterStream(os, Lang.RDFTHRIFT);
QueryExecution qe = QueryExecutionFactory.sparqlService(
"http://data.open.ac.uk/sparql", "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> SELECT ?A ?B ?C WHERE {?A a ?B . ?A rdf:type ?C} LIMIT 100");
Transform<QuerySolution, Iterator<Triple>> m = new Transform<QuerySolution, Iterator<Triple>>() {
Integer rowIndex = 0;
@Override
public Iterator<Triple> convert(QuerySolution qs) {
rowIndex++;
String ns = "http://www.example.org/test/row#";
String pns = "http://www.example.org/test/col#";
Resource subject = ResourceFactory.createResource(ns + Integer.toString(rowIndex));
Property property;
List<Triple> list = new ArrayList<Triple>();
Iterator<String> cn = qs.varNames();
while (cn.hasNext()) {
String c = cn.next();
property = ResourceFactory.createProperty(pns + c);
list.add(new Triple(subject.asNode(), property.asNode(), qs.get(c).asNode()));
}
return list.iterator();
}
};
Iterator<Triple> iter = WrappedIterator.createIteratorIterator( Iter.map( qe.execSelect(), m ));
stream.start();
StreamOps.sendTriplesToStream(iter, stream);
stream.finish();