【问题标题】:Python: how to mock a kafka topic for unit tests?Python:如何模拟 kafka 主题以进行单元测试?
【发布时间】:2017-03-13 09:55:18
【问题描述】:

我们有一个消息调度程序,它在将消息属性放入带有该键的 Kafka 主题队列之前,会根据消息属性生成一个哈希键。

这样做是为了重复数据删除。但是,我不确定如何在不实际设置本地集群并检查其是否按预期执行的情况下测试这种重复数据删除。

在线搜索用于模拟 Kafka 主题队列的工具并没有帮助,我担心我可能以错误的方式思考这个问题。

最终,无论用于模拟 Kafka 队列的什么,都应该与本地集群的行为方式相同 - 即通过向主题队列插入密钥来提供重复数据删除。

有没有这样的工具?

【问题讨论】:

    标签: python unit-testing apache-kafka kafka-python


    【解决方案1】:

    如果您需要验证 Kafka 特定功能,或使用 Kafka 特定功能的实现,那么唯一的方法就是使用 Kafka!

    Kafka 是否对其重复数据删除逻辑进行了任何测试?如果是这样,以下组合可能足以减轻您的组织的失败风险:

    • 哈希逻辑的单元测试(确保相同的对象确实生成相同的哈希)
    • Kafka 主题重复数据删除测试(Kafka 项目内部)
    • 飞行前烟雾测试验证您的应用与 Kafka 的集成

    如果 Kafka 没有围绕其主题重复数据删除进行任何类型的测试,或者您担心破坏性更改,那么围绕 Kafka 特定功能进行自动检查非常重要。这可以通过集成测试来完成。我最近在基于 Docker 的集成测试管道上取得了很大的成功。在创建 Kafka docker 镜像(社区可能已经提供了一个)的初步工作之后,设置集成测试管道变得微不足道。管道可能如下所示:

    • 执行基于应用程序的单元测试(哈希逻辑)
    • 一旦通过,您的 CI 服务器就会启动 Kafka
    • 执行集成测试,验证重复写入仅向主题发出一条消息。

    我认为重要的是确保将 Kafka 集成测试最小化,以仅包含绝对依赖于 Kafka 特定功能的测试。即使使用 docker-compose,它们也可能比单元测试慢几个数量级,~1 毫秒 vs 1 秒?要考虑的另一件事是维护集成管道的开销可能值得冒险相信 Kakfa 将提供它声称的主题重复数据删除。

    【讨论】:

      【解决方案2】:

      这是一个用 Python 对 Kafka 相关功能进行自动化测试的示例:https://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py

      它使用http://mockintosh.io项目的“Kafka Mock”功能。

      免责声明:我隶属于该项目。

      【讨论】:

        【解决方案3】:

        为了在 Python 下模拟 Kafka 单元测试 使用 SBT 测试任务,我做了如下。 Pyspark 应该已安装。

        build.sbt 中定义应该与测试一起运行的任务:

        val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")
        
        val command = "python3 -m unittest app_test.py"
        val workingDirectory = new File("./project/src/main/python")
        
        testPythonTask := {
          val s: TaskStreams = streams.value
          s.log.info("Executing task testPython")
          Process(command,
            workingDirectory,
            // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
            "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
              // collect all jar paths from project
              .format((fullClasspath in Runtime value)
              .map(_.data.getCanonicalPath)
                .filter(_.contains(".jar"))
                .mkString(",")),
            "PYSPARK_PYTHON" -> "python3") ! s.log
        }
        
        //attach custom test task to default test tasks
        test in Test := {
          testPythonTask.value
          (test in Test).value
        }
        
        testOnly in Test := {
          testPythonTask.value
          (testOnly in Test).value
        }
        

        python 测试用例 (app_test.py):

        import random
        import unittest
        from itertools import chain
        
        from pyspark.streaming.kafka import KafkaUtils
        from pyspark.streaming.tests import PySparkStreamingTestCase
        
        class KafkaStreamTests(PySparkStreamingTestCase):
            timeout = 20  # seconds
            duration = 1
        
            def setUp(self):
                super(KafkaStreamTests, self).setUp()
        
                kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                    .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
                self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
                self._kafkaTestUtils.setup()
        
            def tearDown(self):
                if self._kafkaTestUtils is not None:
                    self._kafkaTestUtils.teardown()
                    self._kafkaTestUtils = None
        
                super(KafkaStreamTests, self).tearDown()
        
            def _randomTopic(self):
                return "topic-%d" % random.randint(0, 10000)
        
            def _validateStreamResult(self, sendData, stream):
                result = {}
                for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                           sum(sendData.values()))):
                    result[i] = result.get(i, 0) + 1
        
                self.assertEqual(sendData, result)
        
            def test_kafka_stream(self):
                """Test the Python Kafka stream API."""
                topic = self._randomTopic()
                sendData = {"a": 3, "b": 5, "c": 10}
        
                self._kafkaTestUtils.createTopic(topic)
                self._kafkaTestUtils.sendMessages(topic, sendData)
        
                stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                                 "test-streaming-consumer", {topic: 1},
                                                 {"auto.offset.reset": "smallest"})
                self._validateStreamResult(sendData, stream)
        

        pyspark.streaming.tests 模块中的 Flume、Kinesis 和其他更多示例。

        【讨论】:

          猜你喜欢
          • 2020-04-06
          • 1970-01-01
          • 1970-01-01
          • 2013-11-18
          • 2015-07-01
          • 2018-04-04
          • 2021-09-27
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多