Index: ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h =================================================================== RCS file: /home/src/ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h,v retrieving revision 1.7 retrieving revision 1.7.54.1 diff -u -r1.7 -r1.7.54.1 --- ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h 5 Aug 2002 12:34:11 -0000 1.7 +++ ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.h 7 Jun 2004 19:36:40 -0000 1.7.54.1 @@ -2,7 +2,7 @@ /** * @file ECG_CDR_Message_Receiver.h * - * $Id: ECG_CDR_Message_Receiver.h,v 1.7 2002/08/05 12:34:11 marina Exp $ + * $Id: ECG_CDR_Message_Receiver.h,v 1.7.54.1 2004/06/07 19:36:40 coryan Exp $ * * @author Carlos O'Ryan (coryan@cs.wustl.edu) * @author Marina Spivak (marina@atdesk.com) @@ -22,6 +22,7 @@ #include "tao/Environment.h" #include "ace/Hash_Map_Manager.h" #include "ace/INET_Addr.h" +#include "ace/Copy_Disabled.h" /** * @class TAO_ECG_CDR_Processor @@ -49,6 +50,7 @@ * keep track of the incoming data. */ class TAO_ECG_UDP_Request_Entry + : private ACE_Copy_Disabled { public: enum { @@ -84,11 +86,6 @@ char* fragment_buffer (CORBA::ULong fragment_offset); private: - - TAO_ECG_UDP_Request_Entry (const TAO_ECG_UDP_Request_Entry & rhs); - TAO_ECG_UDP_Request_Entry& operator= (const TAO_ECG_UDP_Request_Entry & rhs); - -private: /// This attributes should remain constant in all the fragments, used /// for validation.... CORBA::Boolean byte_order_; @@ -169,6 +166,8 @@ int handle_input (ACE_SOCK_Dgram& dgram, TAO_ECG_CDR_Processor *cdr_processor); + class Requests; + private: enum { @@ -177,7 +176,6 @@ }; struct Mcast_Header; - class Requests; typedef ACE_Hash_Map_Manager, purging incomplete requests as needed. */ TAO_ECG_UDP_Request_Entry ** get_request (CORBA::ULong request_id); + + //@{ + /** + * @name Accessor for the expected range of request ids. + */ + CORBA::ULong id_range_low() const; + CORBA::ULong id_range_high() const; + //@} private: + /** + * @brief Determine the "position" of a request ids with respect + * to the current range of expected request ids. + */ + void classify_request_id( + CORBA::ULong request_id, + bool & in_expected_range, + bool & higher_than_expected_range, + bool & causes_range_reset) const; + + /** + * @brief Reinitialize the range of expected request ids to start at + * request_id, return the corresponding entry. + */ + TAO_ECG_UDP_Request_Entry ** reset_range(CORBA::ULong request_id); + + /** + * @brief Move the range of expected request ids to include + * request_id, return the corresponding entry. + */ + TAO_ECG_UDP_Request_Entry ** move_range(CORBA::ULong request_id); + + /** + * @brief Purge all the pending requests. + */ + void purge_all(); + + /** + * @brief Purge the request at the position idx. + */ + void purge_one(size_t idx); + + /** + * @brief Compute the array index for request_id + */ + size_t compute_index(CORBA::ULong request_id) const; - /// Delete any outstanding requests with ids in the range - /// [, ] from and - /// and reset their slots. - void purge_requests (CORBA::ULong purge_first, - CORBA::ULong purge_last); - - Requests & operator= (const Requests &rhs); - Requests (const Requests &rhs); - private: /// Array, used in a circular fashion, that stores partially received /// requests (and info on which requests have been fully received @@ -292,6 +326,9 @@ CORBA::ULong id_range_low_; CORBA::ULong id_range_high_; //@} + + /// The offset in the fragmented_requests_ circular buffer + size_t offset_; /// Minimum range shifting amount. size_t min_purge_count_; Index: ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.i =================================================================== RCS file: /home/src/ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.i,v retrieving revision 1.5 retrieving revision 1.5.54.1 diff -u -r1.5 -r1.5.54.1 --- ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.i 5 Aug 2002 12:34:11 -0000 1.5 +++ ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.i 7 Jun 2004 19:36:41 -0000 1.5.54.1 @@ -1,4 +1,4 @@ -// $Id: ECG_CDR_Message_Receiver.i,v 1.5 2002/08/05 12:34:11 marina Exp $ +// $Id: ECG_CDR_Message_Receiver.i,v 1.5.54.1 2004/06/07 19:36:41 coryan Exp $ ACE_INLINE TAO_ECG_CDR_Message_Receiver::Requests::Requests (void) @@ -6,10 +6,25 @@ , size_ (0) , id_range_low_ (0) , id_range_high_ (0) + , offset_ (0) , min_purge_count_ (0) { } +ACE_INLINE CORBA::ULong +TAO_ECG_CDR_Message_Receiver::Requests:: +id_range_low() const +{ + return id_range_low_; +} + +ACE_INLINE CORBA::ULong +TAO_ECG_CDR_Message_Receiver::Requests:: +id_range_high() const +{ + return id_range_high_; +} + // **************************************************************** ACE_INLINE @@ -60,8 +75,3 @@ TAO_ECG_Refcounted_Endpoint empty_endpoint; this->ignore_from_ = empty_endpoint; } - - - - - Index: ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp =================================================================== RCS file: /home/src/ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp,v retrieving revision 1.9 retrieving revision 1.9.2.1 diff -u -r1.9 -r1.9.2.1 --- ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp 18 Mar 2004 01:58:45 -0000 1.9 +++ ACE_wrappers/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp 7 Jun 2004 19:36:40 -0000 1.9.2.1 @@ -1,4 +1,4 @@ -// $Id: ECG_CDR_Message_Receiver.cpp,v 1.9 2004/03/18 01:58:45 coryan Exp $ +// $Id: ECG_CDR_Message_Receiver.cpp,v 1.9.2.1 2004/06/07 19:36:40 coryan Exp $ #include "ECG_CDR_Message_Receiver.h" #include "ECG_CDR_Message_Sender.h" @@ -8,12 +8,13 @@ #include "tao/Exception.h" #include "ace/SOCK_Dgram.h" +#include + #if !defined(__ACE_INLINE__) #include "ECG_CDR_Message_Receiver.i" #endif /* __ACE_INLINE__ */ -ACE_RCSID(Event, ECG_CDR_Message_Receiver, "$Id: ECG_CDR_Message_Receiver.cpp,v 1.9 2004/03/18 01:58:45 coryan Exp $") - +ACE_RCSID(Event, ECG_CDR_Message_Receiver, "$Id: ECG_CDR_Message_Receiver.cpp,v 1.9.2.1 2004/06/07 19:36:40 coryan Exp $") TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void) { @@ -125,8 +126,85 @@ { return this->payload_.rd_ptr () + fragment_offset; } + // **************************************************************** +inline int request_id_position_normalized( + CORBA::ULong range_end, CORBA::ULong x) +{ + if (x < range_end) + { + return 0; + } + + CORBA::ULong const half_max = + std::numeric_limits::max() / 2; + + if (x < half_max + range_end / 2) { + return 1; + } + + return -1; +} + +/** + * @brief Determine the position of a request ID with respect to a + * range. + * + * @return < 0 if x is before the [range_begin,range_end) range + * == 0 if x is in the range + * > 0 if x is after the range + */ +inline int request_id_position( + CORBA::ULong range_begin, CORBA::ULong range_end, + CORBA::ULong x) +{ + return request_id_position_normalized( + range_end - range_begin, x - range_begin); +} + +void TAO_ECG_CDR_Message_Receiver::Requests:: +classify_request_id( + CORBA::ULong request_id, + bool & in_expected_range, + bool & higher_than_expected_range, + bool & causes_range_reset) const +{ + int r = request_id_position( + id_range_low_, id_range_high_, request_id); + + if (r > 0) + { + in_expected_range = false; + higher_than_expected_range = true; + causes_range_reset = false; + return; + } + + if (r == 0) + { + in_expected_range = true; + higher_than_expected_range = false; + causes_range_reset = false; + return; + } + + in_expected_range = false; + higher_than_expected_range = false; + + r = request_id_position( + id_range_low_, id_range_high_, request_id + 2 * size_); + + if (r < 0) + { + causes_range_reset = true; + } + else + { + causes_range_reset = false; + } +} + int TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size, size_t min_purge_count) @@ -141,7 +219,8 @@ this->size_ = size; this->id_range_low_ = 0; - this->id_range_high_ = size - 1; + this->id_range_high_ = size; + this->offset_ = 0; this->min_purge_count_ = min_purge_count; for (size_t i = 0; i < size; ++i) @@ -154,88 +233,122 @@ TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void) { - for (size_t i = 0; i < this->size_; ++i) - { - TAO_ECG_UDP_Request_Entry* request = - this->fragmented_requests_[i]; + purge_all(); + delete [] this->fragmented_requests_; +} - if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_) - delete request; - } +TAO_ECG_UDP_Request_Entry ** +TAO_ECG_CDR_Message_Receiver::Requests:: +get_request (CORBA::ULong request_id) +{ + bool in_expected_range, higher_than_expected_range, causes_range_reset; - delete [] this->fragmented_requests_; + classify_request_id( + request_id, + in_expected_range, higher_than_expected_range, causes_range_reset); + + if (causes_range_reset) + { + // This message is so old that we assume the sequence numbers have + // been reset by the sender, possibly because the sender has + // restarted: + return reset_range(request_id); + } + + if (in_expected_range) + { + // Regular case, just return whatever fragment we need to: + return this->fragmented_requests_ + compute_index(request_id); + } + + if (!higher_than_expected_range) + { + // An old message but not old enough to restart the sequence + // numbers. + return 0; + } - this->fragmented_requests_ = 0; - this->size_ = 0; - this->id_range_low_ = 0; - this->id_range_high_ = 0; + // The message is not in the expected range and in fact is higher + // than the expected range. We need to move the range. + return move_range(request_id); + + return this->fragmented_requests_ + compute_index(request_id); +} + +TAO_ECG_UDP_Request_Entry ** +TAO_ECG_CDR_Message_Receiver::Requests:: +reset_range(CORBA::ULong request_id) +{ + purge_all(); + id_range_low_ = request_id; + id_range_high_ = request_id + size_; + offset_ = 0; + + // ACE_ASSERT(compute_index(request_id) == 0); + return fragmented_requests_; } TAO_ECG_UDP_Request_Entry ** -TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id) +TAO_ECG_CDR_Message_Receiver::Requests:: +move_range(CORBA::ULong request_id) { - if (request_id < this->id_range_low_) - // is below the current range. + CORBA::ULong new_slots_needed = request_id - this->id_range_high_; + + if (new_slots_needed < this->min_purge_count_) + new_slots_needed = this->min_purge_count_; + + if (new_slots_needed > this->size_) + { + return reset_range(request_id); + } + + while (new_slots_needed > 0) + { + purge_one(compute_index(id_range_low_)); + offset_++; + if (offset_ == size_) { - if (request_id + 2 * this->size_ < id_range_low_) - { - // The request is *WAY* out of range, most likely this is - // caused by a restarted sender (which was unlucky enough to - // reuse the same socket.) Treat this as a special case: - purge_requests (id_range_low_, id_range_high_); - id_range_low_ = request_id; - id_range_high_ = request_id + this->size_; - return this->fragmented_requests_; - } - return 0; + offset_ = 0; } + id_range_low_++; + id_range_high_++; + new_slots_needed--; + } - if (request_id > this->id_range_high_) - // is above the current range - need to shift the range - // to include it. - { - CORBA::ULong new_slots_needed = request_id - this->id_range_high_; - - if (new_slots_needed < this->min_purge_count_) - new_slots_needed = this->min_purge_count_; - - if (new_slots_needed > this->size_) - // Shifting the range by more than the size of array. - { - this->purge_requests (this->id_range_low_, this->id_range_high_); - this->id_range_high_ = request_id; - this->id_range_low_ = request_id - this->size_ + 1; - } - else - { - this->purge_requests (this->id_range_low_, - this->id_range_low_ + new_slots_needed - 1); - this->id_range_high_ += new_slots_needed; - this->id_range_low_ += new_slots_needed; - } - } + return fragmented_requests_ + compute_index(request_id); +} - // Return array location for . - int index = request_id % this->size_; - return this->fragmented_requests_ + index; +void TAO_ECG_CDR_Message_Receiver::Requests:: +purge_all() +{ + for (size_t i = 0; i != this->size_; ++i) + { + purge_one(i); + } } +void TAO_ECG_CDR_Message_Receiver::Requests:: +purge_one(size_t idx) +{ + // Notice this is a reference + TAO_ECG_UDP_Request_Entry * & request = + this->fragmented_requests_[idx]; + + if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_) + delete request; -void -TAO_ECG_CDR_Message_Receiver::Requests::purge_requests ( - CORBA::ULong purge_first, - CORBA::ULong purge_last) -{ - for (CORBA::ULong i = purge_first; i <= purge_last; ++i) - { - size_t index = i % this->size_; - if (this->fragmented_requests_[index] - != &TAO_ECG_CDR_Message_Receiver::Request_Completed_) - { - delete this->fragmented_requests_[index]; - } - this->fragmented_requests_[index] = 0; - } + request = 0; +} + +size_t TAO_ECG_CDR_Message_Receiver::Requests:: +compute_index(CORBA::ULong request_id) const +{ + size_t idx = (request_id - id_range_low_) + offset_; + if (idx >= size_) + { + idx -= size_; + } + return idx; } // **************************************************************** @@ -351,9 +464,9 @@ } if (*request == &Request_Completed_) { - ACE_DEBUG ((EC_FORMAT (INFO, - "Received duplicate mcast fragment. " - "(Request already complete)."))); + // ACE_DEBUG ((EC_FORMAT (INFO, + // "Received duplicate mcast fragment. " + // "(Request already complete)."))); return 0; } if (*request != 0)