【问题标题】:Cassandra slow performance on AWSCassandra 在 AWS 上的性能下降
【发布时间】:2019-05-13 23:08:24
【问题描述】:

我们的一位 DBA 使用相同的 Python 代码(如下)在 AWS EC2 上将 Cassandra 与 Oracle 进行了基准测试,以获取插入性能(100 万条记录),并获得了以下令人惊讶的结果:

Oracle 12.2,单节点,64 核/256GB,EC2 EBS 存储,38 秒

Cassandra 5.1.13 (DDAC),单节点,2核/4GB,EC2 EBS 存储,464 秒

Cassandra 3.11.4,四个节点,16 核/64GB(每个节点),EC2 EBS 存储,486 秒

所以 - 我们做错了什么?
Cassandra 怎么表现这么慢?
* 没有足够的节点? (为什么 4 个节点比单个节点慢?)
* 配置问题?
* 还有什么?

谢谢!

以下是 Python 代码:

import logging
import time
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, BatchStatement
from cassandra.query import SimpleStatement
from cassandra.auth import PlainTextAuthProvider

class PythonCassandraExample:

    def __init__(self):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        self.cluster = Cluster(['10.220.151.138'],auth_provider = auth_provider)
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def createkeyspace(self, keyspace):
        """
        :param keyspace:  The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
        if keyspace in [row[0] for row in rows]:
            self.log.info("dropping existing keyspace...")
            self.session.execute("DROP KEYSPACE " + keyspace)

        self.log.info("creating keyspace...")
        self.session.execute("""
                CREATE KEYSPACE %s
                WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                """ % keyspace)

        self.log.info("setting keyspace...")
        self.session.set_keyspace(keyspace)

    def create_table(self):
        c_sql = """
                CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                              ename varchar,
                                              sal double,
                                              city varchar);
                 """
        self.session.execute(c_sql)
        self.log.info("Employee Table Created !!!")

    # lets do some batch insert
    def insert_data(self):
        i = 1
        while i < 1000000:
          insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
          batch = BatchStatement()
          batch.add(insert_sql, (i, 'Danny', 2555, 'De-vito'))
          self.session.execute(batch)
          # self.log.info('Batch Insert Completed for ' + str(i))
          i += 1

    # def select_data(self):
    #    rows = self.session.execute('select count(*) from perftest.employee limit 5;')
    #    for row in rows:
    #        print(row.ename, row.sal)

    def update_data(self):
        pass

    def delete_data(self):
        pass


if __name__ == '__main__':
    example1 = PythonCassandraExample()
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('perftest')
    example1.create_table()

    # Populate perftest.employee table
    start = time.time()
    example1.insert_data()
    end = time.time()
    print ('Duration: ' + str(end-start) + ' sec.')

    # example1.select_data()

【问题讨论】:

  • 对于这种基准测试,您应该使用相同的硬件。
  • 感谢您的回复!我们目前必须使用现有的硬件。问题是,Cassandra,它应该是一个专门为快速随机插入和获取优化的 NoSQL 数据库,为什么在这样的插入负载中执行如此缓慢......
  • 这看起来 DBA 正在创建一个不公平的基准,也许是为了阻止公司远离 Oracle。虽然您可以制定一个突出 Cassandra 优势的基准测试,但您不太可能达到简单的插入速度:关系数据库已针对该用例进行了高度调整。此外,Cassandra 不一定比小规模的替代品表现更好(而且 4 个节点很小:如果您查看 Cassandra 页面,它们会突出显示具有数千个节点的安装)。
  • 1.你会推荐什么测试来正确测试 Cassandra? 2. 据我所知,Cassandra 确实针对高速到达的单次插入进行了优化……我错了吗?谢谢
  • 我不能推荐一个基准,因为我不知道你的工作量。这是迄今为止开发(或批评)基准时最重要的考虑因素。如果您正在编写的实际应用程序的行为方式与您的 DBA 的基准测试相同,那么使用传统的 RDMS 可能会更好。

标签: performance amazon-web-services cassandra


【解决方案1】:

这里有多个问题:

  • 对于第二次测试,您没有为 DDAC 分配足够的内存和内核,因此 Cassandra 仅获得 1Gb 堆 - 默认情况下,Cassandra 占用所有可用内存的 1/4。第三次测试也是如此 - 它只会获得 16Gb 的堆内存,您可能需要将其提高到更高的值,例如 24Gb 甚至更高。
  • 不清楚每次测试中有多少 IOP - EBS 具有不同的吞吐量,具体取决于卷的大小及其类型
  • 您正在使用同步 API 来执行命令 - 基本上,您在确认已插入前一项后插入下一项。 using asynchronous API可以达到最佳吞吐量;
  • 您在每次迭代中都在准备语句 - 这会导致每次都将 CQL 字符串发送到服务器,因此会减慢一切速度 - 只需将行 insert_sql = self.session.prepare( 移出循环即可;
  • (不完全相关)您正在使用批处理语句来写入数据 - 它是anti-pattern in Cassandra,因为数据只发送到一个节点,然后应该将数据分发到真正拥有数据的节点。这解释了为什么 4 节点集群比 1 节点集群差。

附:现实的负载测试是相当困难的。对此有专门的工具,例如,您可以在this blog post 中找到更多信息。

【讨论】:

    【解决方案2】:

    下面的更新代码将每 100 条记录批处理一次:

    """
    Python  by Techfossguru
    Copyright (C) 2017  Satish Prasad
    
    """
    import logging
    import warnings
    import time
    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster, BatchStatement
    from cassandra.query import SimpleStatement
    from cassandra.auth import PlainTextAuthProvider
    
    class PythonCassandraExample:
    
        def __init__(self):
            self.cluster = None
            self.session = None
            self.keyspace = None
            self.log = None
    
        def __del__(self):
            self.cluster.shutdown()
    
        def createsession(self):
            auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
            self.cluster = Cluster(['10.220.151.138'],auth_provider = auth_provider)
            self.session = self.cluster.connect(self.keyspace)
    
        def getsession(self):
            return self.session
    
        # How about Adding some log info to see what went wrong
        def setlogger(self):
            log = logging.getLogger()
            log.setLevel('INFO')
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
            log.addHandler(handler)
            self.log = log
    
        # Create Keyspace based on Given Name
        def createkeyspace(self, keyspace):
            """
            :param keyspace:  The Name of Keyspace to be created
            :return:
            """
            # Before we create new lets check if exiting keyspace; we will drop that and create new
            rows = self.session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
            if keyspace in [row[0] for row in rows]:
                self.log.info("dropping existing keyspace...")
                self.session.execute("DROP KEYSPACE " + keyspace)
    
            self.log.info("creating keyspace...")
            self.session.execute("""
                    CREATE KEYSPACE %s
                    WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
                    """ % keyspace)
    
            self.log.info("setting keyspace...")
            self.session.set_keyspace(keyspace)
    
        def create_table(self):
            c_sql = """
                    CREATE TABLE IF NOT EXISTS employee (emp_id int PRIMARY KEY,
                                                  ename varchar,
                                                  sal double,
                                                  city varchar);
                     """
            self.session.execute(c_sql)
            self.log.info("Employee Table Created !!!")
    
        # lets do some batch insert
        def insert_data(self):
            i = 1
            insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
            batch = BatchStatement()
            warnings.filterwarnings("ignore", category=FutureWarning)
    
            while i < 1000001:
              # insert_sql = self.session.prepare("INSERT INTO  employee (emp_id, ename , sal,city) VALUES (?,?,?,?)")
              # batch = BatchStatement()
              batch.add(insert_sql, (i, 'Danny', 2555, 'De-vito'))
    
              # Commit every 100 records
              if (i % 100 == 0):
                 self.session.execute(batch)
                 batch = BatchStatement()
                 # self.log.info('Batch Insert Completed for ' + str(i))
              i += 1
            self.session.execute(batch)
    
        # def select_data(self):
        #    rows = self.session.execute('select count(*) from actimize.employee limit 5;')
        #    for row in rows:
        #        print(row.ename, row.sal)
    
        def update_data(self):
            pass
    
        def delete_data(self):
            pass
    
    
    if __name__ == '__main__':
        example1 = PythonCassandraExample()
        example1.createsession()
        example1.setlogger()
        example1.createkeyspace('actimize')
        example1.create_table()
    
        # Populate actimize.employee table
        start = time.time()
        example1.insert_data()
        end = time.time()
        print ('Duration: ' + str(end-start) + ' sec.')
    
        # example1.select_data()
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-06-12
    • 1970-01-01
    • 2023-02-08
    • 2017-10-07
    • 2014-05-19
    • 2013-12-24
    • 2014-07-27
    • 2011-04-08
    相关资源
    最近更新 更多