【Question title】: Cassandra pagination
【Posted】: 2017-10-30 11:13:11
【Question】:

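I have a table in Cassandra with 1 million records. I want to fetch 100 records at a time, so after fetching the first 100, the next fetch should start at item 101. How can I get this kind of pagination? I also tried PagingState, but it did not work.

My code is as follows: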

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;

/**
 * Skips rows by using the paging state rather than iterating over the rows
 * one by one.
 */
public class CassandraPaging {

    private Session session;

    public CassandraPaging(Session session) {
        this.session = session;
    }

    /**
     * Retrieve rows for the specified page offset.
     * 
     * @param statement
     * @param start
     *            starting row (>= 1), inclusive
     * @param size
     *            the maximum number of rows to retrieve
     * @return List<Row>
     */
    public List<Row> fetchRowsWithPage(Statement statement, int start, int size) {
        ResultSet result = skipRows(statement, start, size);
        return getRows(result, start, size);
    }

    private ResultSet skipRows(Statement statement, int start, int size) {
        ResultSet result = null;
        int skippingPages = getPageNumber(start, size);
        String savingPageState = null;
        statement.setFetchSize(size);
        boolean isEnd = false;
        for (int i = 0; i < skippingPages; i++) {
            if (null != savingPageState) {
                statement = statement.setPagingState(PagingState
                        .fromString(savingPageState));
            }
            result = session.execute(statement);
            PagingState pagingState = result.getExecutionInfo()
                    .getPagingState();
            if (null != pagingState) {
                savingPageState = result.getExecutionInfo().getPagingState()
                        .toString();
            }

            if (result.isFullyFetched() && null == pagingState) {
                // if hit the end more than once, then nothing to return,
                // otherwise, mark the isEnd to 'true'
                if (true == isEnd) {
                    return null;
                } else {
                    isEnd = true;
                }
            }
        }
        return result;
    }

    private int getPageNumber(int start, int size) {
        if (start < 1) {
            throw new IllegalArgumentException(
                    "Starting row must be at least 1");
        }
        int page = 1;
        if (start > size) {
            page = (start - 1) / size + 1;
        }
        return page;
    }

    private List<Row> getRows(ResultSet result, int start, int size) {
        List<Row> rows = new ArrayList<>(size);
        if (null == result) {
            return rows;
        }
        int skippingRows = (start - 1) % size;
        int index = 0;
        for (Iterator<Row> iter = result.iterator(); iter.hasNext()
                && rows.size() < size;) {
            Row row = iter.next();
            if (index >= skippingRows) {
                rows.add(row);
            }
            index++;
        }
        return rows;
    }
}

This is the main method:

public static void main(String[] args) {
    Cluster cluster = null;
    Session session = null;

    try {
        cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
        session = cluster.connect("mykeyspace");

        Statement select = QueryBuilder.select().all().from("mykeyspace", "Mytable");

        CassandraPaging cassandraPaging = new CassandraPaging(session);
        System.out.println("*************First Page1 **************");
        List<Row> firstPageRows = cassandraPaging.fetchRowsWithPage(select, 1, 5);
        printUser(firstPageRows);

        System.out.println("*************Second Page2 **************");
        List<Row> secondPageRows = cassandraPaging.fetchRowsWithPage(select, 6, 5);
        printUser(secondPageRows);

        System.out.println("*************Third Page3 **************");
        List<Row> thirdPageRows = cassandraPaging.fetchRowsWithPage(select, 11, 5);
        printUser(thirdPageRows);

    } catch (Exception exp) {
        exp.printStackTrace();
    } finally {
        // Close the session before the cluster; either may be null if connecting failed.
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

private static void printUser(final List<Row> inRows) {
    for (Row row : inRows) {
        System.out.println("Id is:" + row.getUUID("id"));
        System.out.println("Name is:" + row.getInt("name"));
        System.out.println("account is:" + row.getString("account"));
    }
}

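【Comments】:

    Tags: java cassandra pagination bigdata database


    【Solution 1】:

    To use the solution below, you need the spring-data dependency on your classpath.

    Spring provides PageRequest, an implementation of Pageable, which accepts pageNo and size (the number of records shown per page).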

    import org.springframework.data.domain.PageRequest;
    import org.springframework.data.domain.Pageable;
    
    PageRequest(int page, int size)
    

    Example

    Create the repo.
    To create a repo, use org.springframework.data.repository.PagingAndSortingRepository

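    // MyEntity and UUID are placeholders for your own entity and primary-key types.
    interface CasandraRepo extends PagingAndSortingRepository<MyEntity, UUID> {
    
    }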
    

    // Use this pageReq in repository.findAll, as shown below:

    Pageable pageReq = new PageRequest(0, 10);
    CasandraRepo  repo;
    repo.findAll(pageReq);
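
As a rough illustration of what the page and size arguments mean, the sketch below maps a zero-based page index (the convention PageRequest uses) to the 1-based row numbers it covers. PageWindow and its methods are hypothetical helpers written for this example, not part of Spring Data.

```java
// Hypothetical helper for illustration only; not a Spring Data class.
final class PageWindow {

    // 1-based number of the first row on a zero-based page.
    static long firstRow(int page, int size) {
        return (long) page * size + 1;
    }

    // 1-based number of the last row on a zero-based page
    // (assuming the table has at least that many rows).
    static long lastRow(int page, int size) {
        return ((long) page + 1) * size;
    }

    public static void main(String[] args) {
        int size = 100;
        for (int page = 0; page < 3; page++) {
            System.out.println("page " + page + ": rows "
                    + firstRow(page, size) + " to " + lastRow(page, size));
        }
    }
}
```

With a page size of 100, page 1 starts at row 101, which is the second fetch described in the question.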
    

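    【Comments】:

    • Actually, I am using import com.datastax.driver.core.PagingState; it works fine when I fetch all the records, but I do not want the complete data at once, so I used setFetchSize(), and then it does not work.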
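
One likely source of this confusion: in the DataStax driver the fetch size only sets how many rows arrive per round trip, and iterating the ResultSet keeps requesting further pages transparently. The sketch below imitates that behavior with plain Java so it can run without a cluster. AutoFetchingResult and all of its members are hypothetical stand-ins; only the idea of reading just the rows that are available without fetching mirrors the driver.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a paged result set: rows arrive in batches of fetchSize,
// and the iterator silently requests the next batch when the current one runs out.
final class AutoFetchingResult implements Iterable<Integer> {
    private final List<Integer> table;
    private final int fetchSize;
    private int fetchedUpTo = 0; // rows [0, fetchedUpTo) have been fetched
    private int consumed = 0;    // rows [0, consumed) have been handed out
    int roundTrips = 0;

    AutoFetchingResult(List<Integer> table, int fetchSize) {
        this.table = table;
        this.fetchSize = fetchSize;
        fetchNext(); // executing the query brings back the first batch
    }

    private void fetchNext() {
        fetchedUpTo = Math.min(fetchedUpTo + fetchSize, table.size());
        roundTrips++;
    }

    // Rows that can be read without another round trip.
    int availableWithoutFetching() {
        return fetchedUpTo - consumed;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            @Override
            public boolean hasNext() {
                if (consumed == fetchedUpTo && fetchedUpTo < table.size()) {
                    fetchNext(); // transparent fetch of the next batch
                }
                return consumed < fetchedUpTo;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return table.get(consumed++);
            }
        };
    }

    // Reads only the batch that is already in memory, then stops.
    List<Integer> currentPageOnly() {
        List<Integer> page = new ArrayList<>();
        Iterator<Integer> it = iterator();
        for (int remaining = availableWithoutFetching(); remaining > 0; remaining--) {
            page.add(it.next());
        }
        return page;
    }
}
```

A plain for-each over the result walks the whole table regardless of the fetch size, while currentPageOnly() stops after one batch; with the real driver, the paging state would be saved at that point to resume later.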
    【Solution 2】:
      /* First, collect the paging state of every page, using a fixed page size (25 in my case): */
    
     int n = 0;
     PagingState pageStates = null;
     Map<Integer, PagingState> stringMap = new HashMap<Integer, PagingState>();
     do {
        Statement select = QueryBuilder.select().all().from("keyspace", "tablename").setFetchSize(25).setPagingState(pageStates);
        ResultSet resultSet = session.execute(select);
        pageStates = resultSet.getExecutionInfo().getPagingState();
        // Key n holds the state returned after page n, i.e. the state that opens page n + 1.
        stringMap.put(++n, pageStates);
     } while (pageStates != null);
    
    
     /* Then, find the page index -> get the matching page state -> pass it in the query
     ========================================================================
     1. Get the page number
     2. Calculate the offset from the page limit (25 in my case)
     3. Get the page index
     4. Pass the page state stored under that page index in the query */
    
    
     int pagenumber = 4;                    // 1-based page to display (example value)
     int offset = (pagenumber * 25) - 25;   // number of rows before the requested page
     int pageindex = offset / 25;           // key of the state that opens this page; 0 (no entry, null) means page 1
     Statement selectq = QueryBuilder.select().all().from("keyspace", "tablename").setFetchSize(25).setPagingState(stringMap.get(pageindex));
     ResultSet resultSet = session.execute(selectq);
     // getRows is the method from the question's CassandraPaging (it must be made accessible); start is 1-based.
     List<Row> fourthPageRows = cassandraPaging.getRows(resultSet, offset + 1, 25);
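
The index arithmetic in the steps above is easy to get off by one, so here is a minimal sketch that checks it against an in-memory list instead of a cluster. It assumes a stand-in for PagingState: an Integer holding the offset of the next unread row, or null once the last page has been read. PageStateIndex, collectStates and fetchPage are hypothetical names for this example only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: an Integer offset plays the role of the driver's PagingState.
final class PageStateIndex {

    // Walks all pages once and stores, under key n, the state returned after page n.
    static Map<Integer, Integer> collectStates(int totalRows, int pageSize) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be at least 1");
        }
        Map<Integer, Integer> states = new HashMap<>();
        int n = 0;
        Integer state = null;
        do {
            int from = (state == null) ? 0 : state;
            int to = Math.min(from + pageSize, totalRows);
            state = (to < totalRows) ? Integer.valueOf(to) : null;
            states.put(++n, state);
        } while (state != null);
        return states;
    }

    // Page p (1-based) is opened by the state stored under key p - 1; page 1 needs no state.
    static List<Integer> fetchPage(List<Integer> rows, Map<Integer, Integer> states,
                                   int pageNumber, int pageSize) {
        if (pageNumber < 1 || pageNumber > states.size()) {
            return Collections.emptyList(); // no such page
        }
        Integer state = states.get(pageNumber - 1);
        int from = (state == null) ? 0 : state;
        int to = Math.min(from + pageSize, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }
}
```

Note that collecting every state up front costs one query per page, so for a table with a million rows it may be cheaper to cache states lazily as pages are visited.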
    

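    【Comments】:

      【Solution 3】:

      For this, you need to add the spring-data-cassandra dependency to your project.

      A plain PageRequest cannot be used to obtain the Pageable here, because requesting any page other than 0 (the first page) throws an exception: "Cannot create a Cassandra page request for an indexed page other than the first page (0)."

      Use CassandraPageRequest like this: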

      private static final int PAGE = 0;
      private static final String DEFAULT_CURSOR_MARK = "-1";
      private static final String SORT_FIELD = "test_name";
      
      public TestResponse getData(int pageSize, String cursorMark) {
      
          Pageable pageable = CassandraPageRequest.of(PageRequest.of(PAGE, pageSize, Sort.by(Sort.Direction.DESC, SORT_FIELD)), DEFAULT_CURSOR_MARK.equalsIgnoreCase(
                  cursorMark) ? null : PagingState.fromString(cursorMark));
      
          Slice<Test> testSlice = testRepository.findAll(pageable);
      
          TestResponse testResponse = new TestResponse();
          testResponse.setRecords(testSlice.getContent());
      
          if(!testSlice.isLast()) {
              testResponse.setNextCursorMark(((CassandraPageRequest)testSlice.getPageable()).getPagingState().toString());
          } else {
              testResponse.setNextCursorMark(DEFAULT_CURSOR_MARK);
          }
      
          return testResponse;
      }
      

      For all subsequent requests, PAGE stays 0, because the page index has no meaning once a cursorMark (the PagingState in Cassandra) is passed.
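
To show how a client would drive this cursor-mark contract, here is a minimal sketch that runs against an in-memory list rather than a repository. It assumes the cursor is simply the index of the next row encoded as a string, which stands in for the opaque PagingState string; CursorPagingSketch, PageResult and readAll are hypothetical names, and only the "-1" sentinel is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: an in-memory stand-in for the cursor-mark round trip.
final class CursorPagingSketch {
    static final String DEFAULT_CURSOR_MARK = "-1";

    static final class PageResult {
        final List<String> records;
        final String nextCursorMark;

        PageResult(List<String> records, String nextCursorMark) {
            this.records = records;
            this.nextCursorMark = nextCursorMark;
        }
    }

    // Returns one page plus the cursor for the next one, or "-1" when this was the last page.
    static PageResult getData(List<String> table, int pageSize, String cursorMark) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be at least 1");
        }
        int from = DEFAULT_CURSOR_MARK.equals(cursorMark) ? 0 : Integer.parseInt(cursorMark);
        int to = Math.min(from + pageSize, table.size());
        String next = (to < table.size()) ? String.valueOf(to) : DEFAULT_CURSOR_MARK;
        return new PageResult(new ArrayList<>(table.subList(from, to)), next);
    }

    // Client loop: start with "-1" and keep passing back the returned cursor until "-1" comes back.
    static List<String> readAll(List<String> table, int pageSize) {
        List<String> all = new ArrayList<>();
        String cursor = DEFAULT_CURSOR_MARK;
        do {
            PageResult page = getData(table, pageSize, cursor);
            all.addAll(page.records);
            cursor = page.nextCursorMark;
        } while (!DEFAULT_CURSOR_MARK.equals(cursor));
        return all;
    }
}
```

The client never computes a page number; it only echoes the last cursor it received, which is why this style cannot jump straight to an arbitrary page.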

      【Comments】:
