【问题标题】:Results pagination in Cassandra (CQL)Cassandra (CQL) 中的结果分页
【发布时间】:2015-01-01 15:36:12
【问题描述】:

我想知道如何使用 Cassandra 实现分页。

假设我有一个博客。该博客每页最多列出 10 个帖子。要访问下一篇文章,用户必须点击分页菜单以访问第 2 页(第 11-20 篇文章)、第 3 页(第 21-30 篇文章)等。

在 MySQL 下使用 SQL,我可以做到以下几点:

SELECT * FROM posts LIMIT 20,10;

LIMIT 的第一个参数是从结果集开头的偏移量,第二个参数是要获取的行数。上面的示例返回从第 20 行开始的 10 行。

如何在 CQL 中达到同样的效果?

我在 Google 上找到了一些解决方案,但它们都需要“上次查询的最后结果”。它适用于让“下一步”按钮分页到另一个 10 个结果集,但是如果我想从第 1 页跳转到第 5 页怎么办?

【问题讨论】:

    标签: cassandra cql cql3 datastax-enterprise


    【解决方案1】:

    如果您使用的是 Cassandra 2.0+,则不需要使用令牌。

    Cassandra 2.0 具有自动分页功能。 不再使用令牌函数来创建分页,而是现在是一个内置功能。

    现在开发人员可以遍历整个结果集,而不必担心它的大小是否大于内存。当客户端代码对结果进行迭代时,可以获取一些额外的行,同时删除旧的行。

    在Java中看这个,注意SELECT语句返回所有行,检索的行数设置为100。

    我在这里展示了一个简单的语句,但是同样的代码可以用准备好的语句和绑定语句来编写。如果不需要,可以禁用自动分页。测试各种获取大小设置也很重要,因为您希望保持足够小的内存,但又不能小到需要过多往返数据库。查看 this 博客文章,了解分页在服务器端的工作原理。

    Statement stmt = new SimpleStatement(
                      "SELECT * FROM raw_weather_data"
                      + " WHERE wsid= '725474:99999'"
                        + " AND year = 2005 AND month = 6");
    stmt.setFetchSize(24);
    ResultSet rs = session.execute(stmt);
    Iterator<Row> iter = rs.iterator();
    while (!rs.isFullyFetched()) {
       rs.fetchMoreResults();
       Row row = iter.next();
       System.out.println(row);
    }
    

    【讨论】:

    • 如果您要为网页提供分页结果,此示例并没有真正的帮助。用户可能会为页面添加书签并稍后返回,期望在几天或几周后找到相同的结果。他们需要能够将令牌传回以在结果中完全相同的位置恢复。
    • @user3170530:查看我关于手动分页的新答案。
    • 好吧,我在表中有 6 行,获取大小设置为 200,我从不在 while 中。所以这个例子行不通。
    • 博客链接已损坏。
    【解决方案2】:

    尝试在 CQL 中使用 token 函数: https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useToken.html

    另外一个建议,如果你使用的是 DSE,solr 支持深度分页: https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results

    【讨论】:

    • DSE 是否已经支持深度分页?据我所知,这个功能是在 Solr 4.7 中引入的,但 DSE 4.5(最新)仍在使用 Solr 4.6。会尝试这个艰难的
    • 在 4.7 中对深度分页进行了改进——光标功能 issues.apache.org/jira/browse/SOLR-5463 但是分页在 4.6 中存在
    • 还可以查看我们在 github 中的示例代码,这里有一个不错的分页示例 github.com/DataStaxCodeSamples/datastax-paging-demo
    • 链接已损坏。
    【解决方案3】:

    手动分页

    驱动程序公开了一个 PagingState 对象,该对象表示我们在获取最后一页时在结果集中的位置:

    ResultSet resultSet = session.execute("your query");
    // iterate the result set...
    PagingState pagingState = resultSet.getExecutionInfo().getPagingState();
    

    这个对象可以序列化为字符串或者字节数组:

    String string = pagingState.toString();
    byte[] bytes = pagingState.toBytes();
    

    这个序列化的表单可以保存在某种形式的持久存储中,以便以后重用。当稍后检索到该值时,我们可以将其反序列化并在语句中重新注入:

    PagingState pagingState = PagingState.fromString(string);
    Statement st = new SimpleStatement("your query");
    st.setPagingState(pagingState);
    ResultSet rs = session.execute(st);
    

    请注意,分页状态只能用于完全相同的语句(相同的查询字符串、相同的参数)。此外,它是一个不透明的值,仅用于收集、存储和重复使用。如果您尝试修改其内容或使用不同的语句重用它,驱动程序将引发错误。

    源:https://docs.datastax.com/en/cql-oss/3.3/cql/cql_reference/cqlshPaging.html

    【讨论】:

    • 如果数据库在与PagingState 的调用之间发生变化怎么办?它仍然有效吗?我知道它可能会在这里或那里错过一个页面,这很好,但它仍然会找到它的方式吗?如果索引所在的页面被删除了怎么办?
    • 如何验证您是否位于最后一页的末尾?那是什么方法呢?
    • 我们有什么 pagingState 存储在某个地方,在下一次调用之前,一些记录被删除了?然后会发生什么。
    【解决方案4】:

    如果您阅读此文档“使用分页状态令牌获取下一个结果”,

    https://datastax.github.io/php-driver/features/result_paging/

    我们可以使用“分页状态令牌”在应用程序级别进行分页。 所以 PHP 逻辑应该是这样的,

    <?php
    $limit = 10;
    $offset = 20;
    
    $cluster   = Cassandra::cluster()->withContactPoints('127.0.0.1')->build();
    $session   = $cluster->connect("simplex");
    $statement = new Cassandra\SimpleStatement("SELECT * FROM paging_entries Limit ".($limit+$offset));
    
    $result = $session->execute($statement, new Cassandra\ExecutionOptions(array('page_size' => $offset)));
    // Now $result has all rows till "$offset" which we can skip and jump to next page to fetch "$limit" rows.
    
    while ($result->pagingStateToken()) {
        $result = $session->execute($statement, new Cassandra\ExecutionOptions($options = array('page_size' => $limit,'paging_state_token' => $result->pagingStateToken())));
        foreach ($result as $row) {
          printf("key: '%s' value: %d\n", $row['key'], $row['value']);
        }
    }
    ?>
    

    【讨论】:

      【解决方案5】:

      虽然 count 在 CQL 中可用,但到目前为止我还没有看到 offset 部分的好的解决方案...

      所以...我一直在考虑的一个解决方案是使用后台进程创建页面集。

      在某个表中,我会将博客页面 A 创建为对页面 1、2、... 10 的一组引用。然后博客页面 B 的另一个条目指向页面 11 到 20,等等。

      换句话说,我将使用设置为页码的行键来构建自己的索引。您仍然可以使其具有一定的灵活性,因为您可以让用户选择每页查看 10、20 或 30 个引用。例如,当设置为 30 时,将集合 1、2 和 3 显示为页面 A,将集合 4、5、6 显示为页面 B,等等)

      如果您有一个后端进程来处理所有这些,您可以在添加新页面和从博客中删除旧页面时更新您的列表。该过程应该非常快(如果即使那么慢,也应该是 1 分钟,对于 1,000,000 行......),然后您可以立即找到要在列表中显示的页面。 (显然,如果您要让成千上万的用户每人发布数百页……这个数字会迅速增长。)

      如果您想提供一个复杂的 WHERE 子句,情况就会变得更加复杂。默认情况下,博客会向您显示从最新到最旧的所有帖子的列表。您还可以提供带有标签 Cassandra 的帖子列表。也许您想反转顺序等。除非您有某种形式的高级方法来创建索引,否则这很困难。在我这边,我有一种类似 C 的语言,它可以偷看和戳到一行中的值,以 (a) 选择它们,如果选择 (b) 对它们进行排序。换句话说,就我而言,我已经可以拥有与 SQL 中的一样复杂的 WHERE 子句。但是,我还没有将我的列表分解为页面。我想下一步...

      【讨论】:

        【解决方案6】:

        对节点 js (koa js,marko js) 使用 cassandra-node 驱动程序:分页 问题

        由于缺少跳过功能,我们需要解决。下面是node app手动分页的实现,大家可以理解一下。

        • 简单用户列表代码
        • 在下一页和上一页状态之间导航
        • 易于复制

        我将在这里说明两种解决方案,但仅给出下面解决方案 1 的代码,

        解决方案 1:维护 nextprevious 记录的页面状态(维护堆栈或任何最适合的数据结构)

        解决方案 2:循环遍历所有有限制的记录,并将所有可能的页面状态保存在变量中,并相对于它们的 pageStates 生成页面

        在模型中使用这个注释代码,我们可以获得页面的所有状态

                    //for the next flow
                    //if (result.nextPage) {
                    // Retrieve the following pages:
                    // the same row handler from above will be used
                    // result.nextPage();
                    //}
        

        路由器功能

            var userModel = require('/models/users');
                  public.get('/users', users);
                  public.post('/users', filterUsers);
        
            var users = function* () {//get request
                var data = {};
                var pageState = { "next": "", "previous": "" };
                try {
                    var userCount = yield userModel.Count();//count all users with basic count query
        
                    var currentPage = 1;
                    var pager = yield generatePaging(currentPage, userCount, pagingMaxLimit);
                    var userList = yield userModel.List(pager);
                    data.pageNumber = currentPage;
                    data.TotalPages = pager.TotalPages;
                    console.log('--------------what now--------------');
                    data.pageState_next = userList.pageStates.next;
                    data.pageState_previous = userList.pageStates.previous;
                    console.log("next ", data.pageState_next);
                    console.log("previous ", data.pageState_previous);
        
                    data.previousStates = null;
        
                    data.isPrevious = false;
                    if ((userCount / pagingMaxLimit) > 1) {
                        data.isNext = true;
                    }
        
                    data.userList = userList;
                    data.totalRecords = userCount;
                    console.log('--------------------userList--------------------', data.userList);
                    //pass to html template
                }
                catch (e) {
                    console.log("err ", e);
                    log.info("userList error : ", e);
                }
           this.body = this.stream('./views/userList.marko', data);
           this.type = 'text/html';
            };
        
            //post filter and get list
            var filterUsers = function* () {
                console.log("<------------------Form Post Started----------------->");
                var data = {};
                var totalCount;
                data.isPrevious = true;
                data.isNext = true;
        
                var form = this.request.body;
                console.log("----------------formdata--------------------", form);
                var currentPage = parseInt(form.hdpagenumber);//page number hidden in html
                console.log("-------before current page------", currentPage);
                var pageState = null;
                try {
                    var statesArray = [];
                    if (form.hdallpageStates && form.hdallpageStates !== '') {
                        statesArray = form.hdallpageStates.split(',');
                    }
                    console.log(statesArray);
        
                    //develop stack to track paging states
                    if (form.hdpagestateRequest === 'next') {
                        console.log('--------------------------next---------------------');
                        currentPage = currentPage + 1;
                        statesArray.push(form.hdpageState_next);
                        pageState = form.hdpageState_next;
                    }
                    else if (form.hdpagestateRequest === 'previous') {
                        console.log('--------------------------pre---------------------');
                        currentPage = currentPage - 1;
                        var p_st = statesArray.length - 2;//second last index
                        console.log('this index of array to be removed ', p_st);
                        pageState = statesArray[p_st];
                        statesArray.splice(p_st, 1);
                        //pageState = statesArray.pop();
                    }
                    else if (form.hdispaging === 'false') {
                        currentPage = 1;
                        pageState = null;
                        statesArray = [];
                    }
        
        
                    data.previousStates = statesArray;
                    console.log("paging true");
        
                    totalCount = yield userModel.Count();
        
                    var pager = yield generatePaging(form.hdpagenumber, totalCount, pagingMaxLimit);
                    data.pageNumber = currentPage;
                    data.TotalPages = pager.TotalPages;
        
                    //filter function - not yet constructed
                    var searchUsers = yield userModel.searchList(pager, pageState);
                    data.usersList = searchUsers;
                    if (searchUsers.pageStates) {
                        data.pageStates = searchUsers.pageStates;
                        data.next = searchUsers.nextPage;
                        data.pageState_next = searchUsers.pageStates.next;
                        data.pageState_previous = searchUsers.pageStates.previous;
        
                        //show previous and next buttons accordingly
                        if (currentPage == 1 && pager.TotalPages > 1) {
                            data.isPrevious = false;
                            data.isNext = true;
                        }
                        else if (currentPage == 1 && pager.TotalPages <= 1) {
                            data.isPrevious = false;
                            data.isNext = false;
                        }
                        else if (currentPage >= pager.TotalPages) {
                            data.isPrevious = true;
                            data.isNext = false;
                        }
                        else {
                            data.isPrevious = true;
                            data.isNext = true;
                        }
                    }
                    else {
                        data.isPrevious = false;
                        data.isNext = false;
                    }
                    console.log("response ", searchUsers);
                    data.totalRecords = totalCount;
        
                   //pass to html template
                }
                catch (e) {
                    console.log("err ", e);
                    log.info("user list error : ", e);
                }
                console.log("<------------------Form Post Ended----------------->");
           this.body = this.stream('./views/userList.marko', data);
           this.type = 'text/html';
            };
        
            //Paging function
            var generatePaging = function* (currentpage, count, pageSizeTemp) {
                var paging = new Object();
                var pagesize = pageSizeTemp;
                var totalPages = 0;
                var pageNo = currentpage == null ? null : currentpage;
                var skip = pageNo == null ? 0 : parseInt(pageNo - 1) * pagesize;
                var pageNumber = pageNo != null ? pageNo : 1;
                totalPages = pagesize == null ? 0 : Math.ceil(count / pagesize);
                paging.skip = skip;
                paging.limit = pagesize;
                paging.pageNumber = pageNumber;
                paging.TotalPages = totalPages;
                return paging;
            };
        

        模型函数

            var clientdb = require('../utils/cassandradb')();
            var Users = function (options) {
              //this.init();
              _.assign(this, options);
            };
        
            Users.List = function* (limit) {//first time
                    var myresult; var res = [];
                    res.pageStates = { "next": "", "previous": "" };
        
                    const options = { prepare: true, fetchSize: limit };
                    console.log('----------did i appeared first?-----------');
        
                    yield new Promise(function (resolve, reject) {
                        clientdb.eachRow('SELECT * FROM users_lookup_history', [], options, function (n, row) {
                            console.log('----paging----rows');
                            res.push(row);
                        }, function (err, result) {
                            if (err) {
                                console.log("error ", err);
                            }
                            else {
                                res.pageStates.next = result.pageState;
                                res.nextPage = result.nextPage;//next page function
                            }
                            resolve(result);
                        });
                    }).catch(function (e) {
                        console.log("error ", e);
                    }); //promise ends
        
                    console.log('page state ', res.pageStates);
                    return res;
                };
        
                Users.searchList = function* (pager, pageState) {//paging filtering
                    console.log("|------------Query Started-------------|");
                    console.log("pageState if any ", pageState);
                    var res = [], myresult;
                    res.pageStates = { "next": "" };
                    var query = "SELECT * FROM users_lookup_history ";
                    var params = [];
        
                    console.log('current pageState ', pageState);
                    const options = { pageState: pageState, prepare: true, fetchSize: pager.limit };
                    console.log('----------------did i appeared first?------------------');
        
                    yield new Promise(function (resolve, reject) {
                        clientdb.eachRow(query, [], options, function (n, row) {
                            console.log('----Users paging----rows');
                            res.push(row);
                        }, function (err, result) {
                            if (err) {
                                console.log("error ", err);
                            }
                            else {
                                res.pageStates.next = result.pageState;
                                res.nextPage = result.nextPage;
                            }
                            //for the next flow
                            //if (result.nextPage) {
                            // Retrieve the following pages:
                            // the same row handler from above will be used
                            // result.nextPage();
                            //}
                            resolve(result);
                        });
                    }).catch(function (e) {
                        console.log("error ", e);
                        info.log('something');
                    }); //promise ends
        
                    console.log('page state ', pageState);
        
                    console.log("|------------Query Ended-------------|");
                    return res;
                };
        

        HTML端

                <div class="box-footer clearfix">
                <ul class="pagination pagination-sm no-margin pull-left">
                     <if test="data.isPrevious == true">
                     <li><a class='submitform_previous' href="">Previous</a></li>
                     </if>
                     <if test="data.isNext == true">
                        <li><a class="submitform_next" href="">Next</a></li>
                     </if>
                 </ul>
                 <ul class="pagination pagination-sm no-margin pull-right">
                            <li>Total Records : $data.totalRecords</li>&nbsp;&nbsp;
                            <li> | Total Pages : $data.TotalPages</li>&nbsp;&nbsp;
                            <li> | Current Page : $data.pageNumber</li>&nbsp;&nbsp;
                 </ul>
                 </div>
        

        我对node js和cassandra db不是很熟悉,这个解决方案肯定可以改进。解决方案 1 是从分页想法开始的工作示例代码。干杯

        【讨论】:

          【解决方案7】:

          a detailed blog.

          我们的用例类似。从 Cassandra 表中提取所有内容(cassandra 巧妙地通过一次获取约 5000 个并返回一个游标),对每一行进行大量个性化处理,然后继续。一旦我们的迭代达到接近 5000 次,它就会再次在内部获取下一个 5000 行块并将其添加到结果游标中。它做得如此出色,以至于我们甚至感觉不到幕后发生的这种神奇。

          但是它成为了我们的瓶颈。由于迭代块需要一些时间,直到它到达块的末尾,Cassandra 认为连接没有被使用并自动关闭连接并大喊大叫,它的超时。所以我们用页面状态来实现。

          from cassandra.cluster import Cluster
          from cassandra.auth import PlainTextAuthProvider
          from cassandra.query import SimpleStatement
          
          
          # connection with cassandra
          cluster = Cluster(["127.0.0.1"], auth_provider=PlainTextAuthProvider(username="pankaj", password="pankaj"))
          
          
          session = cluster.connect()
          
          
          # setting keyspace
          session.set_keyspace("my_keyspace")
          
          
          # set fetch size
          fetch_size = 100
          
          
          # It will print first 100 records
          next_page_available = True
          paging_state        = None
          data_count          = 0
          
          
          while next_page_available is True:
              # fetches a new chunk with given page state
              result = fetch_a_fresh_chunk(paging_state)
              paging_state = results.paging_state
          
          
              for result in results:
                  # process payload here.....
          
          
                  # payload processed 
                  data_count += 1
          
          
                  # once we reach fetch size, we stop cassandra to fetch more chunk, internally
                  if data_count == fetch_size:
                      i = 0
                      break
          
          
          # fetches a fresh chunk with given page state 
          def fetch_a_fresh_chunk(paging_state = None)
              query       = "SELECT * FROM my_cute_cassandra_table;"
              
              statement   = SimpleStatement(query, fetch_size = fetch_size)
              results     = session.execute(statement, paging_state=paging_state)
          

          【讨论】:

            猜你喜欢
            • 2015-08-08
            • 2015-03-24
            • 2021-04-19
            • 2017-10-18
            • 2016-04-08
            • 2017-06-25
            • 2015-03-25
            • 2013-03-16
            • 2012-08-03
            相关资源
            最近更新 更多