In Java, you can use `HBaseTestingUtility` like this:
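```java
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;

// One in-process HBase mini cluster shared by all tests in the class.
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
    // Configuration changes must be made before the cluster starts.
    TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    // Restore the default clock in case a test injected a custom EnvironmentEdge.
    EnvironmentEdgeManager.reset();
}
```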
Some client libraries talk to HBase through Thrift, so you may also need to start a Thrift server next to the mini cluster:
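```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.thrift.ThriftServer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Static, because they are assigned from the static @BeforeClass method.
private static ThriftServer thriftServer;
private static Thread thriftServerThread;
// Port that Thrift clients should connect to.
private static int port;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    TEST_UTIL.startMiniCluster();

    // Thrift server command line: -port <port> -infoport <infoPort> start
    final List<String> args = new ArrayList<>();
    port = HBaseTestingUtility.randomFreePort();
    args.add("-" + ThriftServer.PORT_OPTION);
    args.add(String.valueOf(port));
    args.add("-infoport");
    int infoPort = HBaseTestingUtility.randomFreePort();
    args.add(String.valueOf(infoPort));
    args.add("start");

    thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
    // doMain blocks while the server runs, so run it on a background thread.
    thriftServerThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                thriftServer.doMain(args.toArray(new String[args.size()]));
            } catch (Exception e) {
                // Runnable.run() cannot throw checked exceptions, so wrap them.
                throw new RuntimeException(e);
            }
        }
    });
    // Daemon thread: it does not keep the JVM alive after the tests finish.
    thriftServerThread.setDaemon(true);
    thriftServerThread.start();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    EnvironmentEdgeManager.reset();
}
```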
From pyspark, the same can be done through py4j:
```python
def setUp(self):
    super(StreamingTest, self).setUp()
    # --- hbase configuration ---
    # Load the class through the JVM thread's context class loader.
    hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.startMiniCluster()

def tearDown(self):
    if self._hbaseTestingUtility is not None:
        self._hbaseTestingUtility.shutdownMiniCluster()
    # Call the base class last, after HBase has been shut down through py4j.
    super(StreamingTest, self).tearDown()
```
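The setUp/tearDown pair above has to call `shutdownMiniCluster()` once `startMiniCluster()` has succeeded. As a sketch that is not part of the original answer, the same pairing can be written as a Python context manager. `mini_cluster` is a hypothetical helper name; it only assumes an object exposing the `startMiniCluster()` and `shutdownMiniCluster()` methods used above, such as the py4j proxy for `HBaseTestingUtility`.

```python
from contextlib import contextmanager


@contextmanager
def mini_cluster(testing_utility):
    """Hypothetical helper: start an HBase mini cluster and always shut it down.

    `testing_utility` is assumed to expose startMiniCluster() and
    shutdownMiniCluster(), e.g. the py4j proxy of HBaseTestingUtility.
    """
    # If starting fails, there is nothing to shut down, so this stays outside try.
    testing_utility.startMiniCluster()
    try:
        yield testing_utility
    finally:
        # Runs even when the body raises, so the cluster is not leaked.
        testing_utility.shutdownMiniCluster()
```

For example, `with mini_cluster(hbase_testing_utility_clz.newInstance()) as util:` would run one block against a live cluster. Inside a `unittest` class the explicit setUp/tearDown form is usually more convenient, because the cluster has to outlive a single block.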
If you need a Thrift server (for example, to use the happybase client library):
```python
import threading


def setUp(self):
    super(StreamingTest, self).setUp()
    gateway = self.sparkStreamingContext.sparkContext._gateway
    jvm = self.sparkStreamingContext._jvm
    class_loader = jvm.java.lang.Thread.currentThread().getContextClassLoader()

    # --- hbase configuration ---
    hbase_testing_utility_clz = class_loader.loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False)  # for thrift
    self._hbaseTestingUtility.startMiniCluster()

    # --- thrift server configuration ---
    thrift_server_clz = class_loader.loadClass('org.apache.hadoop.hbase.thrift.ThriftServer')
    # Make a ThriftServer instance by reflection: new ThriftServer(conf)
    cArgs = gateway.new_array(jvm.java.lang.Class, 1)
    cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass()
    iArgs = gateway.new_array(jvm.java.lang.Object, 1)
    iArgs[0] = self._hbaseTestingUtility.getConfiguration()
    self._thriftServer = thrift_server_clz.getDeclaredConstructor(cArgs).newInstance(iArgs)

    # Server command line: -port <port> -infoport <info_port> start
    tArgs = gateway.new_array(jvm.java.lang.String, 5)
    port = self._hbaseTestingUtility.randomFreePort()
    self.thrift_port = port
    tArgs[0] = "-port"
    tArgs[1] = str(port)
    tArgs[2] = "-infoport"
    info_port = self._hbaseTestingUtility.randomFreePort()
    tArgs[3] = str(info_port)
    tArgs[4] = "start"

    # Look up doMain(String[]) and make it callable even though it is not public.
    mArgs = gateway.new_array(jvm.java.lang.Class, 1)
    mArgs[0] = tArgs.getClass()
    method = thrift_server_clz.getDeclaredMethod('doMain', mArgs)
    method.setAccessible(True)
    args = gateway.new_array(jvm.java.lang.Object, 1)
    args[0] = tArgs

    # doMain blocks while the server runs, so start it in a separate daemon thread.
    self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args])
    self.thrift_server_thread.daemon = True
    self.thrift_server_thread.start()
```
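Because `doMain` runs in a daemon thread, `setUp` can return before the Thrift server is actually listening, so the first happybase connection may race with server startup. A minimal stdlib-only sketch of a readiness check, assuming it is enough to poll the TCP port until it accepts connections. `wait_for_port` and `find_free_port` are hypothetical helper names, not HBase APIs; `find_free_port` is only a rough analogue of `randomFreePort()`, since the OS may give the port to another process once the probing socket is closed.

```python
import socket
import time


def find_free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0 (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]


def wait_for_port(port, host="127.0.0.1", timeout=60.0, interval=0.5):
    """Poll until something accepts TCP connections on host:port.

    Returns True as soon as a connection succeeds, or False once
    `timeout` seconds have passed without one.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Connect and immediately close; we only care that the port is open.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

At the end of `setUp`, something like `assert wait_for_port(self.thrift_port)` would make the tests wait until a `happybase.Connection` to that port can be opened.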
The HBase and Thrift jars must, of course, be passed to spark-submit:
```shell
--jars path/to/jar1.jar,path/to/jar2.jar --conf spark.driver.userClassPathFirst=true
```
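When the test job is launched from a script, building the `--jars` value programmatically avoids stray separators such as a trailing comma. A sketch, assuming the command is assembled as an argument list for `subprocess`; `spark_submit_args` is a hypothetical helper, and the jar paths and application file are placeholders.

```python
def spark_submit_args(jars, app, extra_conf=None):
    """Build a spark-submit argument list (hypothetical helper).

    `jars` are the HBase/Thrift jar paths; they are joined with commas
    and no trailing separator, as --jars expects.
    """
    if not jars:
        raise ValueError("at least one jar is required")
    conf = {"spark.driver.userClassPathFirst": "true"}
    conf.update(extra_conf or {})
    args = ["spark-submit", "--jars", ",".join(jars)]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    # The application file must come after all spark-submit options.
    args.append(app)
    return args
```

The result can be handed directly to `subprocess.run(..., check=True)`; using a list instead of a shell string also sidesteps quoting problems in jar paths.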