【问题标题】:HBase application: Unit testing by mocking the HBaseHBase 应用程序:通过模拟 HBase 进行单元测试
【发布时间】:2014-07-22 20:41:12
【问题描述】:

我的应用程序中有一个从 HBase 访问数据的方法。它使用scan方法查询hbase。我想编写单元测试用例来测试这个功能。所以我想模拟 hbase 调用。怎么做?我正在使用 Mockit 进行模拟。

【问题讨论】:

  • 您是在尝试测试实际查询,还是在尝试模拟查询以便测试使用查询数据的应用程序部分?
  • 我想测试对从HBase返回的数据进行操作的服务方法。

标签: unit-testing junit hbase mockito powermock


【解决方案1】:

如果您使用的是 Mockito,则可以存根您的类以使它们返回您想要的内容。

假设您有一个名为HBaseHelper 的类和一个名为getData() 的方法,该方法使用扫描器从hbase 中检索数据。现在假设您在另一个类中有另一个名为 useData() 的方法:

public String useData() {
  String data = hbaseHelper.getData();

  // ... Do things with data
  return data;
}

如果您使用的是 Mockito,您可以在测试中有效地执行类似的操作以返回虚拟“数据”并测试使用此数据的方法:

import org.mockito.Mock;
import org.mockito.Mockito.when;

@Mock
HBaseHelper hbaseHelper;

@Test
public void testFoo() {
  when(hbaseHelper.getData()).thenReturn("hello world");

  assertThat(useData()).equals("hello world");
}

【讨论】:

  • 谢谢Xinzz。我正在尝试做类似的事情。但是我的 HBaseHelper 有静态方法和静态块。所以它试图在其静态块中连接到实际的 HBase。那么如何模拟静态方法调用。
  • 如果您必须模拟静态方法,这通常是糟糕的设计。但是,如果您真的需要,请查看 PowerMockito[code.google.com/p/powermock/wiki/MockitoUsage],特别是 PowerMockito.mockStatic()
  • 我用 power mock 做到了。但是在 Util 类中有静态方法是不是很糟糕,我在其中获取 hbase 表并检索数据并返回。
【解决方案2】:

在 Java 中,您可以像这样使用HBaseTestingUtility

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
EnvironmentEdgeManager.reset();
}

此外,您可能需要 Thrift 服务器 来使用一些客户端库:

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();

ThriftServer thriftServer;
Thread thriftServerThread;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
TEST_UTIL.startMiniCluster();

List<String> args = new ArrayList<>();
port = HBaseTestingUtility.randomFreePort();
args.add("-" + ThriftServer.PORT_OPTION);
args.add(String.valueOf(port));
args.add("-infoport");
int infoPort = HBaseTestingUtility.randomFreePort();
args.add(String.valueOf(infoPort));
args.add("start");

thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());

thriftServerThread = new Thread(new Runnable() {
  @Override
  public void run() {
    thriftServer.doMain(args.toArray(new String[args.size()]));
  }
});
thriftServerThread.setDaemon(true)
thriftServerThread.start();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
EnvironmentEdgeManager.reset();
}

pyspark 通过 py4j 这种方式:

def setUp(self):
    super(StreamingTest, self).setUp()

    # --- hbase configuration ---
    hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()

    self._hbaseTestingUtility.startMiniCluster()


def tearDown(self):
    if self._hbaseTestingUtility is not None:
        self._hbaseTestingUtility.shutdownMiniCluster()

如果需要 Thrift 服务器(例如使用happybase 客户端库):

def setUp(self):
    super(StreamingTest, self).setUp()

    # --- hbase configuration ---
    hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False)  # for thrift
    self._hbaseTestingUtility.startMiniCluster()

    # --- thrift server configuration ---
    thrift_server_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.thrift.ThriftServer')

    # make thrift server instance
    cArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1)
    cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass()
    iArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1)
    iArgs[0] = self._hbaseTestingUtility.getConfiguration()

    self._thriftServer = thrift_server_clz\
        .getDeclaredConstructor(cArgs)\
        .newInstance(iArgs)

    # prepare server start arguments
    tArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.String, 5)
    port = self._hbaseTestingUtility.randomFreePort()
    self.thrift_port = port
    tArgs[0] = "-port"
    tArgs[1] = str(port)
    tArgs[2] = "-infoport"
    info_port = self._hbaseTestingUtility.randomFreePort()
    tArgs[3] = str(info_port)
    tArgs[4] = "start"

    mArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1)
    mArgs[0] = tArgs.getClass()
    method = thrift_server_clz.getDeclaredMethod('doMain', mArgs)
    method.setAccessible(True)

    args = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1)

    # start server in separate thread
    args[0] = tArgs
    self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args])
    self.thrift_server_thread.setDaemon(True)
    self.thrift_server_thread.start()

当然,hbase 和 thrift jar 应该通过 spark submit 传递:

--jars path/to/jar1.jar,path/to/jar2.jar, --conf spark.driver.userClassPathFirst=true

【讨论】:

  • 谢谢!,顺便说一句,在 thrift server java mock 中,我认为 cmdLineThread.start() 应该是 thriftServerThread.start() 你也必须使用反射才能这样做
  • BiS,谢谢您的评论。固定的。 Spark 开发人员使用了反射,我也这样做了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-09-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-04
  • 2013-11-12
  • 2018-11-21
相关资源
最近更新 更多