【问题标题】:Pull from Cassandra database whenever any new rows or any new update is there?每当有任何新行或任何新更新时从 Cassandra 数据库中提取?
【发布时间】:2016-09-15 02:03:26
【问题描述】:

我正在开发一个需要在 Cassandra 数据库中存储 Avro 模式的系统。所以在 Cassandra 中,我们将存储这样的东西

SchemaId            AvroSchema

1                   some schema
2                   another schema

现在假设我在 Cassandra 的上表中插入另一行,现在表是这样的 -

SchemaId            AvroSchema

1                   some schema
2                   another schema
3                   another new schema

只要我在上表中插入新行 - 我需要告诉我的 Java 程序去提取新的架构 ID 和相应的架构..

解决这类问题的正确方法是什么?

我知道,一种方法是每隔几分钟进行一次轮询,假设每 5 分钟我们会从上表中提取数据,但这不是解决此问题的正确方法,因为每 5 分钟,无论是否有任何新架构,我都在拉动..

但除此之外还有其他解决方案吗?

我们可以使用 Apache Zookeeper 吗?还是 Zookeeper 不适合这个问题? 还是有其他解决方案?

我正在运行 Apache Cassandra 1.2.9

【问题讨论】:

    标签: java triggers cassandra apache-zookeeper


    【解决方案1】:

    一些解决方案:

    • 使用数据库触发器:Cassandra 2.0 有一些触发器支持,但它看起来不是最终版本,根据本文:http://www.datastax.com/dev/blog/whats-new-in-cassandra-2-0-prototype-triggers-support 可能会在 2.1 中稍作更改。触发器是一种常见的解决方案。
    • 您提出了轮询,但这并不总是一个糟糕的选择。特别是如果您有一些东西将该行标记为尚未被拉出,那么您可以将新行从 Cassandra 中拉出。如果查询成本不高,那么每 5 分钟拉一次对于 Cassandra 或任何数据库来说都不是明智之举。如果很少插入新行,则此选项可能不太好。

    Zookeeper 不是一个完美的解决方案,请参阅以下引用:

    因为手表是一次性触发器,并且之间存在延迟 获取事件并发送新请求以获取您无法获得的手表 可靠地看到 ZooKeeper 中节点发生的每一个变化。是 准备处理znode多次变化的情况 在获取事件和再次设置手表之间。 (你不可以 关心,但至少意识到它可能会发生。)

    引用来源:http://zookeeper.apache.org/doc/r3.4.2/zookeeperProgrammers.html#sc_WatchRememberThese

    【讨论】:

    • 感谢您的建议.. 所以根据您的说法,轮询选项在我的用例中要好得多?我没有其他选择可以使用吗?感谢您的帮助...
    • 我不知道您的用例中的所有因素,但根据我目前听到的情况,轮询不是一个应该取消的选项。
    【解决方案2】:

    卡桑德拉 3.0

    您可以使用它,它会将插入中的所有内容作为 json 对象获取。

    public class HelloWorld implements ITrigger
    {
        private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);
    
        public Collection<Mutation> augment(Partition partition)
        {
            String tableName = partition.metadata().cfName;
            logger.info("Table: " + tableName);
    
            JSONObject obj = new JSONObject();
            obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
    
            try {
                UnfilteredRowIterator it = partition.unfilteredIterator();
                while (it.hasNext()) {
                    Unfiltered un = it.next();
                    Clustering clt = (Clustering) un.clustering();  
                    Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
                    Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
    
                    while(columns.hasNext()){
                        ColumnDefinition columnDef = columns.next();
                        Cell cell = cells.next();
                        String data = new String(cell.value().array()); // If cell type is text
                        obj.put(columnDef.toString(), data);
                    }
                }
            } catch (Exception e) {
    
            }
            logger.debug(obj.toString());
    
            return Collections.emptyList();
        }
    }
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-20
    • 2021-09-08
    • 1970-01-01
    • 2019-07-22
    相关资源
    最近更新 更多