Summary: | IIOP Gateway not flexible enough | ||
---|---|---|---|
Product: | TAO | Reporter: | Johnny Willemsen <jwillemsen> |
Component: | Real-Time Event Service | Assignee: | DOC Center Support List (internal) <tao-support> |
Status: | NEW --- | ||
Severity: | enhancement | ||
Priority: | P5 | ||
Version: | 1.3.3 | ||
Hardware: | x86 | ||
OS: | Windows 2000 | ||
Attachments: |
implementation
definition |
Description
Johnny Willemsen
2003-07-15 09:09:38 CDT
We are taking the following approach: * TAO_EC_Gateway_IIOP is derived from ACE_Service_Object to make it configurable using the service configurator framework. * The methods in TAO_EC_Gateway_IIOP are split up so that the new controls can work better and don't have to implement much themselves. * TAO_ECG_ConsumerEventChannelControl is added as the default control; it just keeps publishing each event, even when we have detected a connection loss. * TAO_ECG_Reactive_ConsumerEventChannelControl is derived from TAO_ECG_ConsumerEventChannelControl and implements a reactive control. This closes all connections to the consumer EC in case of a connection loss. * TAO_ECG_Reconnect_ConsumerEventChannelControl is derived from TAO_ECG_ConsumerEventChannelControl and implements a reconnect control. This will try to reconnect to the consumer EC. Once connected again, it will re-subscribe itself. Diff for the proposed reactive observer strategy. It is derived from the basic observer strategy; an observer is removed from the observer list when it is unreachable. Index: EC_Default_Factory.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers- repository/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp,v retrieving revision 1.42 diff -u -r1.42 EC_Default_Factory.cpp --- EC_Default_Factory.cpp 20 Jun 2003 19:58:29 -0000 1.42 +++ EC_Default_Factory.cpp 18 Jul 2003 14:33:34 -0000 @@ -169,6 +169,8 @@ this->observer_ = 0; else if (ACE_OS::strcasecmp (opt, ACE_LIB_TEXT("basic")) == 0) this->observer_ = 1; + else if (ACE_OS::strcasecmp (opt, ACE_LIB_TEXT("reactive")) == 0) + this->observer_ = 2; else this->unsupported_option_value ("-ECObserver", opt); arg_shifter.consume_arg (); @@ -619,6 +621,13 @@ ACE_Lock* lock = 0; ACE_NEW_RETURN (lock, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>, 0); return new TAO_EC_Basic_ObserverStrategy (ec, lock); + } + else if (this->observer_ == 2) + { + // @@ The lock should also be under control of the user... 
+ ACE_Lock* lock = 0; + ACE_NEW_RETURN (lock, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>, 0); + return new TAO_EC_Reactive_ObserverStrategy (ec, lock); } return 0; } Index: EC_ObserverStrategy.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers- repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp,v retrieving revision 1.26 diff -u -r1.26 EC_ObserverStrategy.cpp --- EC_ObserverStrategy.cpp 16 Jul 2003 13:05:18 -0000 1.26 +++ EC_ObserverStrategy.cpp 18 Jul 2003 14:33:34 -0000 @@ -323,6 +323,141 @@ } } + +// **************************************************************** + +TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void) +{ +} + +void +TAO_EC_Reactive_ObserverStrategy::supplier_qos_update ( + TAO_EC_ProxyPushConsumer *consumer + ACE_ENV_ARG_DECL) +{ + if (consumer->publications ().is_gateway) + return; + + RtecEventChannelAdmin::SupplierQOS s_qos; + this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + Observer_Map copy; + int size = this->create_observer_map (copy ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + Observer_Map_Iterator end = copy.end (); + for (Observer_Map_Iterator i = copy.begin (); + i != end; + ++i) + { + Observer_Entry& entry = (*i).int_id_; + int ok = 1; + ACE_TRY_EX(update) + { + entry.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(update); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "TAO_EC_Reactive_ObserverStrategy::supplier_qos _update"); + ok = 0; + } + ACE_ENDTRY; + + if ( !ok ) + { + ACE_TRY_EX(remove) + { + this->remove_observer(entry.handle ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(remove); + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + } + } +} + +void +TAO_EC_Reactive_ObserverStrategy::consumer_qos_update ( + TAO_EC_ProxyPushSupplier *supplier + ACE_ENV_ARG_DECL) +{ + if (supplier->subscriptions ().is_gateway) + return; + + RtecEventChannelAdmin::ConsumerQOS c_qos; + this->fill_qos (c_qos 
ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + Observer_Map copy; + int size = this->create_observer_map (copy ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + Observer_Map_Iterator end = copy.end (); + for (Observer_Map_Iterator i = copy.begin (); + i != end; + ++i) + { + Observer_Entry& entry = (*i).int_id_; + int ok = 1; + ACE_TRY_EX(update) + { + entry.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(update); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "TAO_EC_Reactive_ObserverStrategy::consumer_qos _update"); + ok = 0; + } + ACE_ENDTRY; + + if ( !ok ) + { + ACE_TRY_EX(remove) + { + this->remove_observer(entry.handle ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(remove); + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + } + } +} + +int +TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map + ACE_ENV_ARG_DECL) +{ + ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK_RETURN (0); + + Observer_Map_Iterator end = this->observers_.end (); + for (Observer_Map_Iterator i = this->observers_.begin (); + i != end; + ++i) + { + Observer_Entry& entry = (*i).int_id_; + Observer_Entry copy (entry.handle, + RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ())); + if (map.bind (copy.handle, copy) == -1) + { + map.unbind_all(); + return 0; + } + } + + return map.current_size(); +} + // **************************************************************** void Index: EC_ObserverStrategy.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers- repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h,v retrieving revision 1.23 diff -u -r1.23 EC_ObserverStrategy.h --- EC_ObserverStrategy.h 16 Jul 2003 13:05:18 -0000 1.23 +++ EC_ObserverStrategy.h 18 Jul 2003 14:33:34 -0000 @@ -5,6 +5,8 @@ * $Id: EC_ObserverStrategy.h,v 1.23 2003/07/16 13:05:18 jwillemsen Exp $ * * @author Carlos O'Ryan 
(coryan@cs.wustl.edu) + * @author Johnny Willemsen (jwillemsen@remedy.nl) + * @author Kees van Marle (kvmarle@remedy.nl) * * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and * other members of the DOC group. More details can be found in: @@ -190,13 +192,9 @@ */ struct Observer_Entry { - // The ACE_INLINE macros here are to keep g++ 2.7.X happy, - // otherwise it thinks they are used as inline functions before - // being used as such.... Apparently in the template code for the - // Hash_Map_Manager. - ACE_INLINE Observer_Entry (void); - ACE_INLINE Observer_Entry (RtecEventChannelAdmin::Observer_Handle h, - RtecEventChannelAdmin::Observer_ptr o); + Observer_Entry (void); + Observer_Entry (RtecEventChannelAdmin::Observer_Handle h, + RtecEventChannelAdmin::Observer_ptr o); /// The handle RtecEventChannelAdmin::Observer_Handle handle; @@ -222,12 +220,12 @@ /// Helpers. //@{ /// Recompute EC consumer subscriptions and send them out to all observers. - void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier - ACE_ENV_ARG_DECL); + virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier + ACE_ENV_ARG_DECL); /// Recompute EC supplier publications and send them out to all observers. - void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer - ACE_ENV_ARG_DECL); + virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer + ACE_ENV_ARG_DECL); /// Compute consumer QOS. void fill_qos (RtecEventChannelAdmin::ConsumerQOS &qos @@ -255,6 +253,56 @@ /// Keep the set of Observers Observer_Map observers_; +}; + +// **************************************************************** + +/** + * @class TAO_EC_Reactive_ObserverStrategy + * + * @brief A reactive observer strategy. 
+ * + * This class simply keeps the information about the current list + * of observers, whenever the list of consumers and/or suppliers + * changes in queries the EC, computes the global subscription + * and/or publication list and sends the update message to all the + * observers. When an observer isn't reachable it is removed from + * the observer list. + * + * <H2>Memory Management</H2> + * It assumes ownership of the <lock>, but not of the + * Event_Channel. + */ +class TAO_RTEvent_Export TAO_EC_Reactive_ObserverStrategy : + public TAO_EC_Basic_ObserverStrategy +{ +public: + /// Constructor + TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base* ec, + ACE_Lock* lock); + + /// Destructor + virtual ~TAO_EC_Reactive_ObserverStrategy (void); + +protected: + /// Helpers. + //@{ + /// Recompute EC consumer subscriptions and send them out to all observers. + virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier + ACE_ENV_ARG_DECL); + + /// Recompute EC supplier publications and send them out to all observers. + virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer + ACE_ENV_ARG_DECL); + + /** + * Copies all current observers into a map and passes it + * back to the caller through @a map. + * @return Returns the size of the map. 
+ */ + int create_observer_map (Observer_Map &map + ACE_ENV_ARG_DECL); + //@} }; // **************************************************************** Index: EC_ObserverStrategy.i =================================================================== RCS file: /project/cvs-repository/ACE_wrappers- repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i,v retrieving revision 1.7 diff -u -r1.7 EC_ObserverStrategy.i --- EC_ObserverStrategy.i 7 Jun 2003 08:52:35 -0000 1.7 +++ EC_ObserverStrategy.i 18 Jul 2003 14:33:34 -0000 @@ -49,6 +49,16 @@ // **************************************************************** ACE_INLINE +TAO_EC_Reactive_ObserverStrategy:: + TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base* ec, + ACE_Lock* lock) + : TAO_EC_Basic_ObserverStrategy (ec, lock) +{ +} + +// **************************************************************** + +ACE_INLINE TAO_EC_Accumulate_Supplier_Headers:: TAO_EC_Accumulate_Supplier_Headers (TAO_EC_Basic_ObserverStrategy::Headers &h) : headers_ (h) Created attachment 221 [details]
implementation
Created attachment 222 [details]
definition
Instead of deriving the IIOP Gateway from ACE_Service_Object, I have created an IIOP_Gateway_Factory. This parses the service config options and is then used by the IIOP Gateway. This keeps the IIOP Gateway much cleaner and simpler. Attachments 221 & 222 are a proposed gateway implementation. accept. Another thing to do is splitting the observer interface into a consumer part and a supplier part. The current observer subscribes to both the consumer and the supplier QoS but only uses the consumer side. Splitting this would make it possible to get better throughput when only the consumer part is needed; when the supplier part is also needed, it could come from a different EC than the consumer EC (as is normally the case). Then we add an option to the IIOP Gateway so that it uses the supplier QoS to pre-filter the consumer QoS list before it is passed to the supplier EC. to pool |