【问题标题】:Streaming a ResultSet as RDF using a custom vocabulary使用自定义词汇将 ResultSet 流式传输为 RDF
【发布时间】:2023-03-12 19:10:01
【问题描述】:

我需要将从查询中获得的 Jena com.hp.hpl.jena.query.ResultSet 流式传输到转换为 RDF 输出格式的远程端点。 我知道 Jena 为此提供了ResultSetFormatter.toModel 工具,但是我有以下限制:

  • 我想使用不同的表示/词汇,而不是 Jena 提供的那种,并且
  • 我不想将数据加载到内存中。换句话说,我不想创建一个Model 并用ResultSet 填充它,而是在我对其进行迭代时流出三元组,以控制内存消耗。
  • 我仍然希望受益于 Jena 序列化程序

我见过StreamRDF这个接口,但是我对如何有效使用它不是很清楚。 在这种情况下,什么是正确的方法?

【问题讨论】:

  • 在耶拿用户列表中提问和回答。
  • 我集成了另一种更紧凑的方法,使用 Jena 实用程序,正如用户列表中的对话中所建议的那样。

标签: java stream rdf resultset jena


【解决方案1】:

我实施的解决方案对我有用,因此我将其发布为答案。 下面的 sn-p 应该提供足够的信息,因为它涵盖了问题中的要求。值得注意的是,我创建了两个类,一个ResultSetTripleIterator 和一个QuerySolutionToTripleAdapter 接口。第一个负责将三元组流式传输给作者,而第二个负责从每个 QuerySolution 构建一个三元组的迭代器。

public class ResultSetTripleIterator implements Iterator<Triple> {
   private ResultSet rs;
   private QuerySolutionToTripleAdapter ad;
   private Iterator<Triple> it = null;
   public ResultSetTripleIterator(ResultSet resultSet, QuerySolutionToTripleAdapter adapter) {
      this.rs = resultSet;
      this.ad = adapter;
   }
   @Override
   public boolean hasNext() {
      if(it != null && it.hasNext()){
         return true;
      }
      it = null;
      return rs.hasNext();
   }
   @Override
   public Triple next() {
       if(it == null){
           it = ad.adapt(rs.next());
       }
       return it.next();
   }
}
public interface QuerySolutionToTripleAdapter {
    public Iterator<Triple> adapt(QuerySolution qs);
}

以下是应用程序示例:

// Can be any OutputStream
OutputStream os = new ByteArrayOutputStream();
StreamRDF stream = StreamRDFWriter.getWriterStream(os, Lang.TRIG);
QueryExecution qe = QueryExecutionFactory.sparqlService(
        "http://data.open.ac.uk/sparql", "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> SELECT ?A ?B ?C WHERE {?A a ?B . ?A rdf:type ?C} LIMIT 100");

Iterator<Triple> iter = new ResultSetTripleIterator(qe.execSelect(), new QuerySolutionToTripleAdapter() {
    Integer rowIndex = 0;

    @Override
    public Iterator<Triple> adapt(QuerySolution qs) {
        rowIndex++;
        String ns = "http://www.example.org/test/row#";
        String pns = "http://www.example.org/test/col#";
        Resource subject = ResourceFactory.createResource(ns + Integer.toString(rowIndex));
        Property property;
        List<Triple> list = new ArrayList<Triple>();
        Iterator<String> cn = qs.varNames();
        while (cn.hasNext()) {
            String c = cn.next();
            property = ResourceFactory.createProperty(pns + c);
            list.add(new Triple(subject.asNode(), property.asNode(), qs.get(c).asNode()));
        }
        return list.iterator();
    }
});
stream.start();
StreamOps.sendTriplesToStream(iter, stream);
stream.finish();

但是,Jena 似乎不支持某些 RDF 序列化流式传输,即 XML 和 JSON 格式,例如导致 org.apache.jena.riot.RiotException: No serialization for language Lang:rdf/null

编辑

感谢 Jena 用户邮件列表中的反馈,可以通过使用 Jena 的实用程序类来操作迭代器和执行转换来压缩上面的代码。

sn-p可以改写如下:

OutputStream os = new ByteArrayOutputStream();
StreamRDF stream = StreamRDFWriter.getWriterStream(os, Lang.RDFTHRIFT);
QueryExecution qe = QueryExecutionFactory.sparqlService(
        "http://data.open.ac.uk/sparql", "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> SELECT ?A ?B ?C WHERE {?A a ?B . ?A rdf:type ?C} LIMIT 100");

Transform<QuerySolution, Iterator<Triple>> m = new Transform<QuerySolution, Iterator<Triple>>() {
    Integer rowIndex = 0;

    @Override
    public Iterator<Triple> convert(QuerySolution qs) {
        rowIndex++;
        String ns = "http://www.example.org/test/row#";
        String pns = "http://www.example.org/test/col#";
        Resource subject = ResourceFactory.createResource(ns + Integer.toString(rowIndex));
        Property property;
        List<Triple> list = new ArrayList<Triple>();
        Iterator<String> cn = qs.varNames();
        while (cn.hasNext()) {
            String c = cn.next();
            property = ResourceFactory.createProperty(pns + c);
            list.add(new Triple(subject.asNode(), property.asNode(), qs.get(c).asNode()));
        }
        return list.iterator();
    }
};
Iterator<Triple> iter = WrappedIterator.createIteratorIterator( Iter.map( qe.execSelect(), m ));
stream.start();
StreamOps.sendTriplesToStream(iter, stream);
stream.finish();

【讨论】:

  • 不是所有的序列化都可以写成流。 JSON-LD 和 RDF-XML 不能。 (可以使用一种形式的 RDF/XML,但它会很冗长,每个三元组上都有名称空间声明。)。为了完整起见,现在实现了语言 RDF/NULL。
  • 这部分正确,可以编写 JSON-LD 广告 RDF/XML 流,问题是您仍然需要在内存中放一些东西(例如在 json-ld 中标记上下文最后)或者结果可能过于冗长(你提到的情况)。但是因为语法而改变序列化方法也可能不是一个好主意,因为我们可能不知道客户端的用例(他们在非常大的查询结果上请求 rdf/xml,然后服务器得到 OOM - 我的情况)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-09-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多