Bug 1552

Summary: IIOP Gateway not flexible enough
Product: TAO
Reporter: Johnny Willemsen <jwillemsen>
Component: Real-Time Event Service
Assignee: DOC Center Support List (internal) <tao-support>
Status: NEW ---    
Severity: enhancement    
Priority: P5    
Version: 1.3.3   
Hardware: x86   
OS: Windows 2000   
Attachments: implementation, definition

Description Johnny Willemsen 2003-07-15 09:09:38 CDT
This is a placeholder for work we are currently doing, and for discussion 
and feedback on it. 

When using the EC_Gateway_IIOP to federate two event channels we found some 
places for enhancement. The gateway tries to send each event to the event 
channel that is the consumer of the gateway. When that EC is down, the gateway 
tries to reach it again for every event, resulting in high overhead. Another 
issue we need to investigate is the behaviour of the code in the EC when it 
calls the observer to update its QoS but that observer is unreachable.

We propose to add some service configurator options to the TAO_EC_Gateway_IIOP, 
just like the existing consumer and supplier control options. The options are 
ECGIIOPConsumerECControl, ECGIIOPConsumerECControlPeriod and 
ECGIIOPConsumerECControlTimeout. We call it ConsumerEC because we are 
currently interested in improving the behaviour for the event channel that is 
the consumer, but later we may also want to add controls for the EC that is 
the supplier for this gateway. 
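
As a rough illustration of how the three proposed options might be read, here is a minimal sketch in plain C++. The struct GatewayControlOptions, the function parse_gateway_options, the leading "-" on the option names, the default values and the microsecond unit are all assumptions made for this example; it is not the TAO parsing code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <vector>

// Hypothetical holder for the three proposed options.
struct GatewayControlOptions
{
  std::string control = "basic"; // assumed default
  long period_usec = 5000000;    // assumed default and unit
  long timeout_usec = 10000;     // assumed default and unit
  bool valid = true;             // false on a bad or missing value
};

// Parse a flat argument list such as
//   -ECGIIOPConsumerECControl reconnect -ECGIIOPConsumerECControlPeriod 1000000
// Options with other names are skipped.
GatewayControlOptions
parse_gateway_options (const std::vector<std::string> &args)
{
  const std::string control_opt = "-ECGIIOPConsumerECControl";
  const std::string period_opt = "-ECGIIOPConsumerECControlPeriod";
  const std::string timeout_opt = "-ECGIIOPConsumerECControlTimeout";

  GatewayControlOptions opts;
  for (std::size_t i = 0; i < args.size (); ++i)
    {
      const std::string &name = args[i];
      if (name != control_opt && name != period_opt && name != timeout_opt)
        continue;

      if (i + 1 >= args.size ())
        {
          opts.valid = false; // option given without a value
          break;
        }

      const std::string &value = args[++i];
      if (name == control_opt)
        {
          if (value == "basic" || value == "reactive" || value == "reconnect")
            opts.control = value;
          else
            opts.valid = false; // unsupported control type
        }
      else if (name == period_opt)
        opts.period_usec = std::strtol (value.c_str (), nullptr, 10);
      else
        opts.timeout_usec = std::strtol (value.c_str (), nullptr, 10);
    }
  return opts;
}
```

An unsupported control type only marks the result as invalid here; real code would more likely report the problem the way the existing -ECObserver parsing does.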

In our setup the gateway is placed on the host where the supplier EC is 
located. This differs from the normal documentation, where the gateway is 
located on the consumer host. We will also rename the terms local and remote 
to consumer_ec and supplier_ec in the IIOP gateway, because what matters is 
not which EC is local or remote but which is the consumer and which is the 
supplier.

For the types of ConsumerECControl we propose to have:
- basic (just try each time)
- reactive (on a failure, just stop sending events; we could then also 
unsubscribe from the supplier EC)
- reconnect (try to reconnect at a certain interval; when the EC is 
reachable again, query for QoS again and then resume sending events)
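
The difference between the three types can be seen in a small simulation. This is only a sketch under stated assumptions: ControlKind, ConsumerLink and GatewayControl are hypothetical names, and the remote consumer EC is replaced by a boolean flag and a call counter rather than a CORBA object.

```cpp
#include <cassert>

enum class ControlKind { Basic, Reactive, Reconnect };

// Stand-in for the remote consumer EC (assumption: a flag and a counter).
struct ConsumerLink
{
  bool reachable = true;
  int attempts = 0; // number of remote calls attempted

  // Simulates one remote call; returns false when the EC is down.
  bool call ()
  {
    ++attempts;
    return reachable;
  }
};

class GatewayControl
{
public:
  GatewayControl (ControlKind kind, ConsumerLink &link)
    : kind_ (kind), link_ (link)
  {
  }

  // Called for every event coming from the supplier EC.
  // Returns true when the event was delivered.
  bool forward_event ()
  {
    if (!connected_)
      return false;       // reactive/reconnect: no remote call at all
    if (link_.call ())
      return true;
    if (kind_ != ControlKind::Basic)
      connected_ = false; // stop sending after the first failure
    return false;
  }

  // Called at the configured interval; only "reconnect" acts on it.
  void handle_timeout ()
  {
    if (kind_ == ControlKind::Reconnect && !connected_ && link_.call ())
      connected_ = true;  // real code would query for QoS again here
  }

  bool connected () const { return connected_; }

private:
  ControlKind kind_;
  ConsumerLink &link_;
  bool connected_ = true;
};
```

With the consumer EC down, the basic kind performs one remote call per event, while the other two perform a single failing call and then stay quiet. Only the reconnect kind recovers once handle_timeout finds the EC reachable again.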

We want to add a base TAO_ECG_ConsumerEventChannelControl and derived classes 
with the special controls (e.g. TAO_ECG_Reactive_ConsumerEventChannelControl).

We have looked at reusing code from the consumer/supplier control but this is 
hard because those controls have an Event_Channel_Base pointer and the gateway 
works without using the event channel itself.
Comment 1 Johnny Willemsen 2003-07-16 02:51:31 CDT
We are going the following way:
* TAO_EC_Gateway_IIOP is derived from ACE_Service_Object to make it 
configurable using the service configurator framework
* methods in TAO_EC_Gateway_IIOP are split so that the new controls can 
work better and don't have to implement much themselves
* TAO_ECG_ConsumerEventChannelControl is added as the default control; it just 
keeps trying to publish each event, even after we have detected a connection 
loss.
* TAO_ECG_Reactive_ConsumerEventChannelControl is derived from 
TAO_ECG_ConsumerEventChannelControl and implements a reactive control. This 
closes all connections to the consumer EC in case of a connection loss.
* TAO_ECG_Reconnect_ConsumerEventChannelControl is derived from 
TAO_ECG_ConsumerEventChannelControl and implements a reconnect control. This 
will try to reconnect to the consumer EC. Once connected again, it will 
subscribe itself again.

Comment 2 Johnny Willemsen 2003-07-18 09:42:34 CDT
Diff for proposed reactive observer strategy. The observer is removed from the 
observer list when it is unreachable. Derived from basic observer strategy.

Index: EC_Default_Factory.cpp
===================================================================
RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp,v
retrieving revision 1.42
diff -u -r1.42 EC_Default_Factory.cpp
--- EC_Default_Factory.cpp	20 Jun 2003 19:58:29 -0000	1.42
+++ EC_Default_Factory.cpp	18 Jul 2003 14:33:34 -0000
@@ -169,6 +169,8 @@
                 this->observer_ = 0;
               else if (ACE_OS::strcasecmp (opt, ACE_LIB_TEXT("basic")) == 0)
                 this->observer_ = 1;
+              else if (ACE_OS::strcasecmp (opt, ACE_LIB_TEXT("reactive")) == 0)
+                this->observer_ = 2;
               else
                   this->unsupported_option_value ("-ECObserver", opt);
               arg_shifter.consume_arg ();
@@ -619,6 +621,13 @@
       ACE_Lock* lock = 0;
       ACE_NEW_RETURN (lock, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>, 0);
       return new TAO_EC_Basic_ObserverStrategy (ec, lock);
+    }
+  else if (this->observer_ == 2)
+    {
+      // @@ The lock should also be under control of the user...
+      ACE_Lock* lock = 0;
+      ACE_NEW_RETURN (lock, ACE_Lock_Adapter<TAO_SYNCH_MUTEX>, 0);
+      return new TAO_EC_Reactive_ObserverStrategy (ec, lock);
     }
   return 0;
 }
Index: EC_ObserverStrategy.cpp
===================================================================
RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp,v
retrieving revision 1.26
diff -u -r1.26 EC_ObserverStrategy.cpp
--- EC_ObserverStrategy.cpp	16 Jul 2003 13:05:18 -0000	1.26
+++ EC_ObserverStrategy.cpp	18 Jul 2003 14:33:34 -0000
@@ -323,6 +323,141 @@
     }
 }
 
+
+// ****************************************************************
+
+TAO_EC_Reactive_ObserverStrategy::~TAO_EC_Reactive_ObserverStrategy (void)
+{
+}
+
+void
+TAO_EC_Reactive_ObserverStrategy::supplier_qos_update (
+                                        TAO_EC_ProxyPushConsumer *consumer
+                                        ACE_ENV_ARG_DECL)
+{
+  if (consumer->publications ().is_gateway)
+    return;
+
+  RtecEventChannelAdmin::SupplierQOS s_qos;
+  this->fill_qos (s_qos ACE_ENV_ARG_PARAMETER);
+  ACE_CHECK;
+
+  Observer_Map copy;
+  int size = this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
+  ACE_CHECK;
+
+  Observer_Map_Iterator end = copy.end ();
+  for (Observer_Map_Iterator i  = copy.begin ();
+       i != end;
+       ++i)
+    {
+      Observer_Entry& entry = (*i).int_id_;
+      int ok = 1;
+      ACE_TRY_EX(update)
+        {
+          entry.observer->update_supplier (s_qos ACE_ENV_ARG_PARAMETER);
+          ACE_TRY_CHECK_EX(update);
+        }
+      ACE_CATCHANY
+        {
+          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+                               "TAO_EC_Reactive_ObserverStrategy::supplier_qos_update");
+          ok = 0;
+        }
+      ACE_ENDTRY;
+
+      if ( !ok )
+        {
+          ACE_TRY_EX(remove)
+            {
+              this->remove_observer(entry.handle ACE_ENV_ARG_PARAMETER);
+              ACE_TRY_CHECK_EX(remove);
+            }
+          ACE_CATCHANY
+            {
+            }
+          ACE_ENDTRY;
+        }
+    }
+}
+
+void
+TAO_EC_Reactive_ObserverStrategy::consumer_qos_update (
+                                        TAO_EC_ProxyPushSupplier *supplier
+                                        ACE_ENV_ARG_DECL)
+{
+  if (supplier->subscriptions ().is_gateway)
+    return;
+
+  RtecEventChannelAdmin::ConsumerQOS c_qos;
+  this->fill_qos (c_qos ACE_ENV_ARG_PARAMETER);
+  ACE_CHECK;
+
+  Observer_Map copy;
+  int size = this->create_observer_map (copy ACE_ENV_ARG_PARAMETER);
+  ACE_CHECK;
+
+  Observer_Map_Iterator end = copy.end ();
+  for (Observer_Map_Iterator i  = copy.begin ();
+       i != end;
+       ++i)
+    {
+      Observer_Entry& entry = (*i).int_id_;
+      int ok = 1;
+      ACE_TRY_EX(update)
+        {
+          entry.observer->update_consumer (c_qos ACE_ENV_ARG_PARAMETER);
+          ACE_TRY_CHECK_EX(update);
+        }
+      ACE_CATCHANY
+        {
+          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+                               "TAO_EC_Reactive_ObserverStrategy::consumer_qos_update");
+          ok = 0;
+        }
+      ACE_ENDTRY;
+
+      if ( !ok )
+      {
+        ACE_TRY_EX(remove)
+          {
+            this->remove_observer(entry.handle ACE_ENV_ARG_PARAMETER);
+            ACE_TRY_CHECK_EX(remove);
+          }
+        ACE_CATCHANY
+          {
+          }
+        ACE_ENDTRY;
+      }
+    }
+}
+
+int
+TAO_EC_Reactive_ObserverStrategy::create_observer_map (Observer_Map &map
+                                                       ACE_ENV_ARG_DECL)
+{
+  ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
+                 RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+  ACE_CHECK_RETURN (0);
+
+  Observer_Map_Iterator end = this->observers_.end ();
+  for (Observer_Map_Iterator i  = this->observers_.begin ();
+       i != end;
+       ++i)
+    {
+      Observer_Entry& entry = (*i).int_id_;
+      Observer_Entry copy (entry.handle,
+                            RtecEventChannelAdmin::Observer::_duplicate (entry.observer.in ()));
+      if (map.bind (copy.handle, copy) == -1)
+      {
+        map.unbind_all();
+        return 0;
+      }
+    }
+
+  return map.current_size();
+}
+
 // ****************************************************************
 
 void
Index: EC_ObserverStrategy.h
===================================================================
RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h,v
retrieving revision 1.23
diff -u -r1.23 EC_ObserverStrategy.h
--- EC_ObserverStrategy.h	16 Jul 2003 13:05:18 -0000	1.23
+++ EC_ObserverStrategy.h	18 Jul 2003 14:33:34 -0000
@@ -5,6 +5,8 @@
  *  $Id: EC_ObserverStrategy.h,v 1.23 2003/07/16 13:05:18 jwillemsen Exp $
  *
  *  @author Carlos O'Ryan (coryan@cs.wustl.edu)
+ *  @author Johnny Willemsen  (jwillemsen@remedy.nl)
+ *  @author Kees van Marle  (kvmarle@remedy.nl)
  *
  * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and
  * other members of the DOC group. More details can be found in:
@@ -190,13 +192,9 @@
    */
   struct Observer_Entry
   {
-    // The ACE_INLINE macros here are to keep g++ 2.7.X happy,
-    // otherwise it thinks they are used as inline functions before
-    // being used as such.... Apparently in the template code for the
-    // Hash_Map_Manager.
-    ACE_INLINE Observer_Entry (void);
-    ACE_INLINE Observer_Entry (RtecEventChannelAdmin::Observer_Handle h,
-                               RtecEventChannelAdmin::Observer_ptr o);
+    Observer_Entry (void);
+    Observer_Entry (RtecEventChannelAdmin::Observer_Handle h,
+                    RtecEventChannelAdmin::Observer_ptr o);
 
     /// The handle
     RtecEventChannelAdmin::Observer_Handle handle;
@@ -222,12 +220,12 @@
   /// Helpers.
   //@{
   /// Recompute EC consumer subscriptions and send them out to all observers.
-  void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier
-                            ACE_ENV_ARG_DECL);
+  virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier
+                                    ACE_ENV_ARG_DECL);
 
   /// Recompute EC supplier publications and send them out to all observers.
-  void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer
-                            ACE_ENV_ARG_DECL);
+  virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer
+                                    ACE_ENV_ARG_DECL);
 
   /// Compute consumer QOS.
   void fill_qos (RtecEventChannelAdmin::ConsumerQOS &qos
@@ -255,6 +253,56 @@
 
   /// Keep the set of Observers
   Observer_Map observers_;
+};
+
+// ****************************************************************
+
+/**
+ * @class TAO_EC_Reactive_ObserverStrategy
+ *
+ * @brief A reactive observer strategy.
+ *
+ * This class simply keeps the information about the current list
+ * of observers, whenever the list of consumers and/or suppliers
+ * changes it queries the EC, computes the global subscription
+ * and/or publication list and sends the update message to all the
+ * observers. When an observer isn't reachable it is removed from
+ * the observer list.
+ *
+ * <H2>Memory Management</H2>
+ * It assumes ownership of the <lock>, but not of the
+ * Event_Channel.
+ */
+class TAO_RTEvent_Export TAO_EC_Reactive_ObserverStrategy :
+  public TAO_EC_Basic_ObserverStrategy
+{
+public:
+  /// Constructor
+  TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base* ec,
+                                    ACE_Lock* lock);
+
+  /// Destructor
+  virtual ~TAO_EC_Reactive_ObserverStrategy (void);
+
+protected:
+  /// Helpers.
+  //@{
+  /// Recompute EC consumer subscriptions and send them out to all observers.
+  virtual void consumer_qos_update (TAO_EC_ProxyPushSupplier *supplier
+                                    ACE_ENV_ARG_DECL);
+
+  /// Recompute EC supplier publications and send them out to all observers.
+  virtual void supplier_qos_update (TAO_EC_ProxyPushConsumer *consumer
+                                    ACE_ENV_ARG_DECL);
+
+  /**
+   * Copies all current observers into a map and passes it
+   * back to the caller through @a map.
+   * @return Returns the size of the map.
+   */
+  int create_observer_map (Observer_Map &map
+                           ACE_ENV_ARG_DECL);
+  //@}
 };
 
 // ****************************************************************
Index: EC_ObserverStrategy.i
===================================================================
RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i,v
retrieving revision 1.7
diff -u -r1.7 EC_ObserverStrategy.i
--- EC_ObserverStrategy.i	7 Jun 2003 08:52:35 -0000	1.7
+++ EC_ObserverStrategy.i	18 Jul 2003 14:33:34 -0000
@@ -49,6 +49,16 @@
 // ****************************************************************
 
 ACE_INLINE
+TAO_EC_Reactive_ObserverStrategy::
+      TAO_EC_Reactive_ObserverStrategy (TAO_EC_Event_Channel_Base* ec,
+                                     ACE_Lock* lock)
+  :  TAO_EC_Basic_ObserverStrategy (ec, lock)
+{
+}
+
+// ****************************************************************
+
+ACE_INLINE
 TAO_EC_Accumulate_Supplier_Headers::
     TAO_EC_Accumulate_Supplier_Headers (TAO_EC_Basic_ObserverStrategy::Headers &h)
   : headers_ (h)
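
The core idea of the diff above (copy the observer list while holding the lock, notify each observer outside the lock, and drop any observer whose update fails) can be sketched without ACE or CORBA. Everything in this sketch is an assumption made for illustration: the class name ReactiveObservers, the callback type Update, the plain int standing in for the QoS structure, and std::exception standing in for a CORBA exception.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <map>
#include <mutex>
#include <stdexcept>
#include <utility>

class ReactiveObservers
{
public:
  using Handle = unsigned long;
  // An observer callback; it throws when the observer is unreachable.
  using Update = std::function<void (int qos)>;

  Handle append (Update u)
  {
    std::lock_guard<std::mutex> guard (lock_);
    observers_[++last_handle_] = std::move (u);
    return last_handle_;
  }

  void remove (Handle h)
  {
    std::lock_guard<std::mutex> guard (lock_);
    observers_.erase (h);
  }

  std::size_t size () const
  {
    std::lock_guard<std::mutex> guard (lock_);
    return observers_.size ();
  }

  // Send the update to every observer and remove the ones that fail.
  void qos_update (int qos)
  {
    std::map<Handle, Update> copy;
    {
      // Hold the lock only while copying, as create_observer_map does.
      std::lock_guard<std::mutex> guard (lock_);
      copy = observers_;
    }

    for (const auto &entry : copy)
      {
        try
          {
            entry.second (qos);
          }
        catch (const std::exception &)
          {
            // Unreachable observer: take it off the list.
            this->remove (entry.first);
          }
      }
  }

private:
  mutable std::mutex lock_;
  std::map<Handle, Update> observers_;
  Handle last_handle_ = 0;
};
```

Iterating over a copy means remove can take the lock again without deadlocking, and a slow or dead observer does not block other threads from registering observers.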

Comment 3 Johnny Willemsen 2003-07-20 07:27:13 CDT
Created attachment 221 [details]
implementation
Comment 4 Johnny Willemsen 2003-07-20 07:27:53 CDT
Created attachment 222 [details]
definition
Comment 5 Johnny Willemsen 2003-07-20 07:29:15 CDT
Instead of deriving the IIOP gateway from ACE_Service_Object, I have created 
an IIOP_Gateway_Factory. This parses the service config options and is then 
used by the IIOP Gateway. This keeps the IIOP Gateway much cleaner and 
simpler. Attachments 221 & 222 are a proposed gateway implementation.
Comment 6 Johnny Willemsen 2003-07-29 12:25:19 CDT
accept
Comment 7 Johnny Willemsen 2003-08-04 14:49:49 CDT
Another thing to do is splitting the observer interface into a consumer and a 
supplier part. The current observer subscribes to both consumer and supplier 
QoS but only uses the consumer side. Splitting this would give better 
throughput when only the consumer part is needed; and when the supplier part 
is also needed, it could come from a different EC than the consumer EC (as is 
normally the case). Then we add an option to the IIOP gateway so that it uses 
the supplier QoS to filter the consumer QoS list before it is passed to the 
supplier EC.
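
The filtering step could look roughly like the sketch below. It assumes that a subscription and a publication can each be reduced to a (source, type) pair and that wildcards can be ignored; EventHeader and filter_subscriptions are hypothetical names, not part of the RtecEventChannelAdmin interfaces.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <utility>
#include <vector>

// Assumption: an event is identified by a (source, type) pair.
using EventHeader = std::pair<long, long>;

// Keep only those consumer subscriptions that some supplier actually
// publishes, so the supplier EC is not asked for events it never sends.
std::vector<EventHeader>
filter_subscriptions (const std::vector<EventHeader> &consumer_subscriptions,
                      const std::vector<EventHeader> &supplier_publications)
{
  const std::set<EventHeader> published (supplier_publications.begin (),
                                         supplier_publications.end ());

  std::vector<EventHeader> result;
  std::copy_if (consumer_subscriptions.begin (),
                consumer_subscriptions.end (),
                std::back_inserter (result),
                [&published] (const EventHeader &h)
                {
                  return published.count (h) != 0;
                });
  return result;
}
```

The order of the consumer subscriptions is preserved; only entries without a matching publication are dropped.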
Comment 8 Johnny Willemsen 2006-08-16 09:07:00 CDT
to pool