CORBA的Event Service提供了一种分发事件通知的方式。
Event Service的Event Channel从一个Supplier那里得到事件,发送到一个或多个Consumer那里。
事件可以用push的方式发送,也可以用pull的方式发送,也可以混合push和pull。
Event Service的接口定义可以看JacORB安装目录下的\idl\omg目录中的IDL文件:
CosEventComm.idl
CosEventChannelAdmin.idl
用类图来表示:

通过这个类图,可以很清晰地得到使用Event Service的步骤。以push mode为例,
Server侧(Supplier)
1)创建一个类,继承自PushSupplierPOA,这个类就是Server侧的Supplier的实现类。
2)创建一个org.jacorb.events.EventChannelImpl的对象,它实现了EventChannel的接口。
然后向NameService注册这个Channel,注册时提供一个Channel名。
3)调用EventChannelImpl对象的for_suppliers()方法,得到一个SupplierAdmin对象。
4)调用SupplierAdmin对象的obtain_push_consumer()方法,得到一个ProxyPushConsumer对象。
5)创建1)中的Supplier的实现类的一个实例。
在上面4)得到的ProxyPushConsumer对象上调用connect_push_supplier()方法,
把Supplier的实现类的实例传给此方法。
6)现在Server侧已经连接到Event Channel,可以调用ProxyPushConsumer对象的push()方法来发送事件通知给Channel了。
Client侧(Consumer)
1)创建一个类,继承自PushConsumerPOA,这个类就是Client侧的Consumer的实现类。
这个类的push()方法,在Event Channel发送事件通知过来的时候会被调用。
在这里添加对事件通知的处理代码。
2)向NameService查询并获得Event Channel的引用(EventChannel对象),查询时需要提供Server侧注册时使用的Channel名。
3)调用EventChannel对象的for_consumers()方法,得到一个ConsumerAdmin对象。
4)调用ConsumerAdmin对象的obtain_push_supplier()方法,得到一个ProxyPushSupplier对象。
5)创建1)中的Consumer的实现类的一个实例。
在上面4)得到的ProxyPushSupplier对象上调用connect_push_consumer()方法,
把Consumer的实现类的实例传给此方法。
6)现在Client侧也已经连接到Event Channel。
这时候如果Server侧(Supplier)发送事件通知,Consumer的实现类的push()方法就会被调用了。
下面提供一个例程,运行此例子需要启动NameService,
启动方法和例程的启动方法见http://blog.csdn.net/fw0124/article/details/7188844
Server代码
1)PushSupplierImpl
package learning.corba.event.newsbroadcast.server;
import org.omg.CosEventComm.PushSupplierPOA;
/**
 * Push supplier servant built on {@code PushSupplierPOA}.
 *
 * <p>The only operation it implements is the disconnect callback, which
 * prints a fixed marker line to standard output and has no other effect.
 */
public class PushSupplierImpl extends PushSupplierPOA {

    /** Line printed whenever the disconnect callback is invoked. */
    private static final String DISCONNECT_MESSAGE = "disconnect_push_supplier";

    /**
     * Disconnect callback of the push supplier; it only reports that it was
     * called and releases nothing.
     */
    public void disconnect_push_supplier() {
        System.out.println(DISCONNECT_MESSAGE);
    }
}
2)NewsBCServer
package learning.corba.event.newsbroadcast.server;
import java.util.Properties;
import org.jacorb.events.EventChannelImpl;
import org.omg.CORBA.Any;
import org.omg.CORBA.ORB;
import org.omg.CosEventChannelAdmin.ProxyPushConsumer;
import org.omg.CosEventChannelAdmin.SupplierAdmin;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosEventComm.PushSupplierHelper;
import org.omg.CosNaming.NamingContextExt;
import org.omg.CosNaming.NamingContextExtHelper;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;
/**
 * Push-model event supplier for the event channel bound as
 * {@code "NewsBC.EventChannel"} in the naming service.
 *
 * <p>{@link #main(String[])} creates the channel, binds it in the naming
 * service, connects a {@code PushSupplierImpl} to the channel's push consumer
 * proxy and starts a publisher thread. The publisher (this class as a
 * {@link Runnable}) pushes an increasing counter value every three seconds.
 */
public class NewsBCServer implements Runnable {

    /** Pause between two pushed events, in milliseconds. */
    private static final long PUSH_INTERVAL_MILLIS = 3000L;

    private final ORB orb;
    private final ProxyPushConsumer proxyPushConsumer;

    /**
     * Sets up the ORB, the event channel and the supplier connection, starts
     * the publisher thread and then blocks in {@code orb.run()}.
     *
     * @param args command line arguments, handed to {@code ORB.init}
     */
    public static void main(String[] args) {
        try {
            // Registers JacORB's BiDirConnectionInitializer as an ORB initializer.
            // NOTE(review): presumably enables bidirectional GIOP - confirm against the JacORB docs.
            Properties props = new Properties();
            props.setProperty("org.omg.PortableInterceptor.ORBInitializerClass.bidir_init",
                    "org.jacorb.orb.giop.BiDirConnectionInitializer");
            ORB orb = ORB.init(args, props);

            POA rootPoa = POAHelper.narrow(
                    orb.resolve_initial_references("RootPOA"));
            rootPoa.the_POAManager().activate();

            NamingContextExt ncExt = NamingContextExtHelper.narrow(
                    orb.resolve_initial_references("NameService"));

            // Create the event channel and publish it under a well-known name.
            EventChannelImpl eventChannel = new EventChannelImpl(orb, rootPoa);
            ncExt.rebind(ncExt.to_name("NewsBC.EventChannel"),
                    rootPoa.servant_to_reference(eventChannel));

            // Obtain the push consumer proxy and connect our supplier servant to it.
            PushSupplierImpl pushSupplierImpl = new PushSupplierImpl();
            SupplierAdmin supplierAdmin = eventChannel.for_suppliers();
            ProxyPushConsumer consumerProxy = supplierAdmin.obtain_push_consumer();
            consumerProxy.connect_push_supplier(
                    PushSupplierHelper.narrow(rootPoa.servant_to_reference(pushSupplierImpl)));

            Thread t = new Thread(new NewsBCServer(orb, consumerProxy));
            t.start();
            orb.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a publisher.
     *
     * @param orb               ORB used to create the {@code Any} values and shut down at the end
     * @param proxyPushConsumer channel-side proxy that receives the pushed events
     */
    public NewsBCServer(ORB orb, ProxyPushConsumer proxyPushConsumer) {
        this.orb = orb;
        this.proxyPushConsumer = proxyPushConsumer;
    }

    /**
     * Pushes the values 0, 1, 2, ... to the channel, one every
     * {@link #PUSH_INTERVAL_MILLIS} milliseconds, until the thread is
     * interrupted or the channel reports {@code Disconnected}.
     *
     * <p>The cleanup runs in {@code finally} blocks so that the ORB is shut
     * down even when the loop or the disconnect call fails with an unchecked
     * exception; without it this thread would end without calling
     * {@code orb.shutdown}.
     */
    public void run() {
        int i = 0;
        boolean interrupted = false;
        try {
            while (true) {
                if (Thread.interrupted()) {
                    interrupted = true;
                    break;
                }
                Any event = this.orb.create_any();
                event.insert_long(i++);
                this.proxyPushConsumer.push(event);
                Thread.sleep(PUSH_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } catch (Disconnected e) {
            e.printStackTrace();
        } finally {
            try {
                this.proxyPushConsumer.disconnect_push_consumer();
            } finally {
                try {
                    this.orb.shutdown(true);
                } finally {
                    // Restore the interrupt status only after the blocking shutdown,
                    // so the shutdown itself is not disturbed by a pending interrupt.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
}
Client代码
1)PushConsumerImpl
package learning.corba.event.newsbroadcast.client;
import org.omg.CORBA.Any;
import org.omg.CORBA.ORB;
import org.omg.CosEventChannelAdmin.ProxyPushConsumer;
import org.omg.CosEventChannelAdmin.ProxyPushSupplier;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosEventComm.PushConsumerPOA;
/**
 * Push consumer servant that prints every received event and disconnects
 * itself from the channel once enough events have arrived.
 *
 * <p>NOTE(review): the counter and the stop flag are not synchronized; this
 * assumes {@code push} calls are not dispatched concurrently - confirm against
 * the POA threading configuration.
 */
public class PushConsumerImpl extends PushConsumerPOA {

    /**
     * The disconnect is triggered by the first event whose zero-based index is
     * greater than this value, i.e. by the 12th event received.
     */
    private static final int LAST_REGULAR_EVENT_INDEX = 10;

    private final ORB orb;
    private final ProxyPushSupplier proxyPushSupplier;
    private int eventCount = 0;
    /** Set once this consumer has disconnected itself and requested ORB shutdown. */
    private boolean stopped = false;

    /**
     * Creates a consumer servant.
     *
     * @param orb               ORB that is shut down when the consumer stops
     * @param proxyPushSupplier channel-side proxy this consumer disconnects from when it stops
     */
    public PushConsumerImpl(ORB orb, ProxyPushSupplier proxyPushSupplier) {
        this.orb = orb;
        this.proxyPushSupplier = proxyPushSupplier;
    }

    /** Disconnect callback of the push consumer; only reports that it was called. */
    public void disconnect_push_consumer() {
        System.out.println("disconnect_push_consumer");
    }

    /**
     * Receives one event, prints its {@code long} payload and, once the event
     * limit is exceeded, disconnects from the proxy supplier and shuts the ORB
     * down without waiting for completion.
     *
     * @param event the event; its value is read with {@code extract_long()}
     * @throws Disconnected if the event arrives after this consumer has already
     *                      disconnected itself, so that the disconnect and the
     *                      ORB shutdown are not repeated for late events
     */
    public void push(Any event) throws Disconnected {
        if (stopped) {
            throw new Disconnected();
        }
        System.out.println("event:" + event.extract_long());
        if (eventCount++ > LAST_REGULAR_EVENT_INDEX) {
            stopped = true;
            proxyPushSupplier.disconnect_push_supplier();
            // wait_for_completion is false because this runs inside a request.
            orb.shutdown(false);
        }
    }
}
2)NewsBCClient
package learning.corba.event.newsbroadcast.client;
import java.util.Properties;
import org.omg.CORBA.ORB;
import org.omg.CosEventChannelAdmin.ConsumerAdmin;
import org.omg.CosEventChannelAdmin.EventChannel;
import org.omg.CosEventChannelAdmin.EventChannelHelper;
import org.omg.CosEventChannelAdmin.ProxyPushSupplier;
import org.omg.CosEventComm.PushConsumer;
import org.omg.CosEventComm.PushConsumerHelper;
import org.omg.CosNaming.NamingContextExt;
import org.omg.CosNaming.NamingContextExtHelper;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;
/**
 * Client entry point: looks up the event channel in the naming service,
 * connects a {@code PushConsumerImpl} to it and then runs the ORB so that
 * pushed events can be delivered to the consumer.
 */
public class NewsBCClient {

    /** Name under which the event channel is looked up in the naming service. */
    private static final String CHANNEL_NAME = "NewsBC.EventChannel";

    /** ORB property key that registers the ORB initializer below. */
    private static final String BIDIR_INIT_KEY =
            "org.omg.PortableInterceptor.ORBInitializerClass.bidir_init";

    /** ORB initializer class registered under {@link #BIDIR_INIT_KEY}. */
    private static final String BIDIR_INIT_CLASS =
            "org.jacorb.orb.giop.BiDirConnectionInitializer";

    /**
     * Connects the consumer and blocks in {@code orb.run()}; any exception
     * raised on the way is printed with its stack trace.
     *
     * @param args command line arguments, handed to {@code ORB.init}
     */
    public static void main(String[] args) {
        try {
            final Properties orbProperties = new Properties();
            orbProperties.setProperty(BIDIR_INIT_KEY, BIDIR_INIT_CLASS);
            final ORB orb = ORB.init(args, orbProperties);

            // Locate the event channel through the naming service.
            final NamingContextExt naming = NamingContextExtHelper.narrow(
                    orb.resolve_initial_references("NameService"));
            final EventChannel channel = EventChannelHelper.narrow(
                    naming.resolve_str(CHANNEL_NAME));

            // Ask the channel for the proxy that will push events to us.
            final ProxyPushSupplier supplierProxy =
                    channel.for_consumers().obtain_push_supplier();

            // Activate the root POA so the consumer servant can receive calls.
            final POA poa = POAHelper.narrow(
                    orb.resolve_initial_references("RootPOA"));
            poa.the_POAManager().activate();

            final PushConsumerImpl servant = new PushConsumerImpl(orb, supplierProxy);
            final PushConsumer consumerRef = PushConsumerHelper.narrow(
                    poa.servant_to_reference(servant));
            supplierProxy.connect_push_consumer(consumerRef);

            orb.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}