? getpmbchanges ? pmb.patch ? TAO_IDL/libTAO_IDL_BE.so.5.3.5 ? TAO_IDL/libTAO_IDL_FE.so.5.3.5 ? TAO_IDL/tao_idl ? orbsvcs/FTRT_Event_Service/Gateway_Service/ftrtec_gateway_service ? orbsvcs/IFR_Service/IFR_Service ? orbsvcs/IFR_Service/libTAO_IFR_BE.so.1.3.5 ? orbsvcs/IFR_Service/tao_ifr ? orbsvcs/ImplRepo_Service/ImR_Activator ? orbsvcs/ImplRepo_Service/ImplRepo_Service ? orbsvcs/ImplRepo_Service/tao_imr ? orbsvcs/LifeCycle_Service/LifeCycle_Service ? orbsvcs/LoadBalancer/LoadManager ? orbsvcs/LoadBalancer/LoadMonitor ? orbsvcs/Naming_Service/NT_Naming_Service ? orbsvcs/Naming_Service/Naming_Service ? orbsvcs/Notify_Service/NT_Notify_Service ? orbsvcs/Notify_Service/Notify_Service ? orbsvcs/PSS/libTAO_PSDL.so.1.3.5 ? orbsvcs/PSS/libTAO_PSDL_Datastore.so.1.3.5 ? orbsvcs/PSS/psdl_tao ? orbsvcs/Scheduling_Service/Scheduling_Service ? orbsvcs/TAO_Service/TAO_Service ? orbsvcs/Time_Service/Time_Service_Clerk ? orbsvcs/Time_Service/Time_Service_Server ? orbsvcs/Trading_Service/Trading_Service ? orbsvcs/orbsvcs/ORBSVCS_COMPONENTS.list ? orbsvcs/orbsvcs/libTAO_AV.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosConcurrency.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosLifeCycle.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosLoadBalancing.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosNaming.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosNotification.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosProperty.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosTime.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_CosTrading.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_DsEventLogAdmin.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_DsLogAdmin.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_DsNotifyLogAdmin.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_ETCL.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FTORB_Utils.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FTRT_ClientORB.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FTRT_EventChannel.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FT_ClientORB.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FT_ServerORB.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_FtRtEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_IFRService.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_PortableGroup.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTCORBAEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTCosScheduling.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTEventLogAdmin.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTKokyuEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTOLDEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTSched.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RTSchedEvent.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_RT_Notification.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_SSLIOP.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_Security.so.1.3.5 ? orbsvcs/orbsvcs/libTAO_Svc_Utils.so.1.3.5 ? orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp.bala ? performance-tests/Latency/Thread_Pool/client ? performance-tests/Latency/Thread_Pool/results ? performance-tests/Latency/Thread_Pool/server ? tao/GIOP_Message_Base.cpp.bala ? tao/GIOP_Message_Base.h.bala ? tao/GIOP_Message_Base.i.bala ? tao/GIOP_Message_Generator_Parser_Impl.inl.bala ? tao/GIOP_Message_Lite.cpp.bala ? tao/GIOP_Message_Lite.h.bala ? tao/GIOP_Message_State.cpp.bala ? tao/GIOP_Message_State.h.bala ? tao/GIOP_Message_State.inl.bala ? tao/IIOP_Transport.cpp.bala ? tao/Incoming_Message_Queue.cpp.bala ? tao/Incoming_Message_Queue.h.bala ? tao/Incoming_Message_Queue.inl.bala ? tao/Pluggable_Messaging.h.bala ? tao/TAO_COMPONENTS.list ? tao/Transport.cpp.bala ? tao/Transport.h.bala ? tao/libTAO.so.1.3.5 ? tao/BiDir_GIOP/libTAO_BiDirGIOP.so.1.3.5 ? tao/Domain/libTAO_Domain.so.1.3.5 ? tao/DynamicAny/libTAO_DynamicAny.so.1.3.5 ? tao/DynamicInterface/libTAO_DynamicInterface.so.1.3.5 ? tao/IFR_Client/libTAO_IFR_Client.so.1.3.5 ? tao/IORInterceptor/libTAO_IORInterceptor.so.1.3.5 ? tao/IORManipulation/libTAO_IORManip.so.1.3.5 ? tao/IORTable/libTAO_IORTable.so.1.3.5 ? tao/Messaging/libTAO_Messaging.so.1.3.5 ? tao/ObjRefTemplate/libTAO_ObjRefTemplate.so.1.3.5 ? tao/PortableServer/libTAO_PortableServer.so.1.3.5 ? tao/RTCORBA/libTAO_RTCORBA.so.1.3.5 ? tao/RTPortableServer/libTAO_RTPortableServer.so.1.3.5 ? tao/RTScheduling/libTAO_RTScheduler.so.1.3.5 ? tao/SmartProxies/libTAO_SmartProxies.so.1.3.5 ? tao/Strategies/DIOP_Transport.cpp.bala ? tao/Strategies/SHMIOP_Transport.cpp.bala ? tao/Strategies/SHMIOP_Transport.h.bala ? tao/Strategies/libTAO_Strategies.so.1.3.5 ? tao/TypeCodeFactory/libTAO_TypeCodeFactory.so.1.3.5 ? tao/Utils/libTAO_Utils.so.1.3.5 ? tao/Valuetype/libTAO_Valuetype.so.1.3.5 ? utils/IOR-parser/ior-parser ? utils/catior/catior ? utils/nslist/nsadd ? utils/nslist/nsdel ? utils/nslist/nslist Index: orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp,v retrieving revision 1.19 retrieving revision 1.19.2.1 diff -u -r1.19 -r1.19.2.1 --- orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp 14 Dec 2003 16:09:39 -0000 1.19 +++ orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp 15 Dec 2003 22:31:47 -0000 1.19.2.1 @@ -550,8 +550,8 @@ int TAO_UIPMC_Transport::handle_input (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time, - int /*block*/) + ACE_Time_Value *max_wait_time, + int /*block*/) { // If there are no messages then we can go ahead to read from the // handle for further reading.. @@ -587,58 +587,68 @@ // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), message_block.space (), max_wait_time); // If there is an error return to the reactor.. if (n <= 0) { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) recv returned error on transport %d after fault %p\n"), - this->id (), - ACE_TEXT ("handle_input_i ()\n"))); - } - if (n == -1) + // @@ Why not send_connection_closed_notifications() ? this->tms_->connection_closed (); return n; } - // Set the write pointer in the stack buffer. + // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the incoming message for validity. The check needs to be + // Check the incoming message for validity. The check needs to be // performed by the messaging objects. - if (this->parse_incoming_messages (message_block) == -1) + // + // NOTE! I don't completely understand how this transport is supposed + // to work. I don't know if it's possible to get a partial header, + // in which case check_for_valid_header would return -1 because it didn't + // have enough information to decide valid or invalid. For now I'll assume + // that we either get a complete, valid header or a complete INVALID header. + if (this->messaging_object ()->check_for_valid_header (message_block) != 1) { if (TAO_debug_level) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) parse_incoming_messages failed on transport %d after fault %p\n"), + ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"), this->id (), - ACE_TEXT ("handle_input_i ()\n"))); + ACE_TEXT ("handle_input ()\n"))); } return -1; } // NOTE: We are not performing any queueing nor any checking for - // missing data. We are assuming that ALL the data would be got in a + // missing data. We are assuming that ALL the data arrives in a // single read. // Make a node of the message block.. - TAO_Queued_Data qd (&message_block); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); - - // Process the message - return this->process_parsed_messages (&qd, rh); + // + // We could make this more efficient by having a fixed Queued Data + // allocator, i.e., it always gave back the same thing. Actually, + // we *could* create an allocator that took a stack-allocated object + // as an argument and returned that when asked an allocation is + // done. Something to contemplate... + TAO_Queued_Data* qd = + TAO_Queued_Data::make_completed_message (message_block, + *this->messaging_object ()); + int retval = -1; + if (qd) + { + // Process the message + retval = this->process_parsed_messages (qd, rh); + TAO_Queued_Data::release (qd); + } + return retval; } + int TAO_UIPMC_Transport::register_handler (void) Index: tao/GIOP_Message_Base.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Base.cpp,v retrieving revision 1.97 retrieving revision 1.97.2.1 diff -u -r1.97 -r1.97.2.1 --- tao/GIOP_Message_Base.cpp 14 Dec 2003 16:03:48 -0000 1.97 +++ tao/GIOP_Message_Base.cpp 15 Dec 2003 22:31:47 -0000 1.97.2.1 @@ -24,7 +24,7 @@ size_t /*input_cdr_size*/) : orb_core_ (orb_core) , message_state_ (orb_core, - this) + this) , out_stream_ (this->buffer_, sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */ TAO_ENCAP_BYTE_ORDER, @@ -295,6 +295,7 @@ TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type ( TAO_GIOP_Message_State &msg_state) + const { // Convert to the right type of Pluggable Messaging message type. @@ -329,264 +330,6 @@ } int -TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) -{ - - if (this->message_state_.parse_message_header (incoming) == -1) - { - return -1; - } - - return 0; -} - -ssize_t -TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) -{ - // Actual message size including the header.. - CORBA::ULong msg_size = - this->message_state_.message_size (); - - size_t len = incoming.length (); - - // If we have too many messages or if we have less than even a size - // of the GIOP header then .. - if (len > msg_size || - len < TAO_GIOP_MESSAGE_HEADER_LEN) - { - return -1; - } - else if (len == msg_size) - return 0; - - return msg_size - len; -} - - -int -TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) -{ - TAO_GIOP_Message_State state (this->orb_core_, - this); - - if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) - { - if (incoming.length () > 0) - { - // Make a node which has a message block of the size of - // MESSAGE_HEADER_LEN. - qd = - this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); - - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - qd->missing_data_ = -1; - } - return 0; - } - - if (state.parse_message_header (incoming) == -1) - { - return -1; - } - - size_t copying_len = state.message_size (); - - qd = this->make_queued_data (copying_len); - - if (copying_len > incoming.length ()) - { - qd->missing_data_ = - copying_len - incoming.length (); - - copying_len = incoming.length (); - } - - qd->msg_block_->copy (incoming.rd_ptr (), - copying_len); - - incoming.rd_ptr (copying_len); - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - return 1; -} - -int -TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) -{ - // Look to see whether we had atleast parsed the GIOP header ... - if (qd->missing_data_ == -1) - { - // The data length that has been stuck in there during the last - // read .... - size_t len = - qd->msg_block_->length (); - - // We know that we would have space for - // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data - // from the into the message block in - qd->msg_block_->copy (incoming.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN - len); - - // Move the rd_ptr () in the incoming message block.. - incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); - - TAO_GIOP_Message_State state (this->orb_core_, - this); - - // Parse the message header now... - if (state.parse_message_header (*qd->msg_block_) == -1) - return -1; - - // Now grow the message block so that we can copy the rest of - // the data... - if (qd->msg_block_->space () < state.message_size ()) - { - ACE_CDR::grow (qd->msg_block_, - state.message_size ()); - } - - // Copy the pay load.. - // Calculate the bytes that needs to be copied in the queue... - size_t copy_len = - state.payload_size (); - - // If the data that needs to be copied is more than that is - // available to us .. - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - else - { - qd->missing_data_ = 0; - } - - // ..now we are set to copy the right amount of data to the - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the of the .. - incoming.rd_ptr (copy_len); - - // Get the other details... - qd->byte_order_ = state.byte_order_; - qd->major_version_ = state.giop_version_.major; - qd->minor_version_ = state.giop_version_.minor; - qd->msg_type_ = this->message_type (state); - } - else - { - // @@todo: Need to abstract this out to a seperate method... - size_t copy_len = qd->missing_data_; - - if (copy_len > incoming.length ()) - { - // Calculate the missing data.. - qd->missing_data_ = - copy_len - incoming.length (); - - // Set the actual possible copy_len that is available... - copy_len = incoming.length (); - } - - // Copy the right amount of data in to the node... - // node.. - qd->msg_block_->copy (incoming.rd_ptr (), - copy_len); - - // Set the of the .. - qd->msg_block_->rd_ptr (copy_len); - - } - - - return 0; -} - - -int -TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) -{ - if (dqd->byte_order_ != sqd->byte_order_ - || dqd->major_version_ != sqd->major_version_ - || dqd->minor_version_ != sqd->minor_version_) - { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:") - ACE_TEXT ("different GIOP versions or byte order\n"))); - return -1; - } - - // Skip the header in the incoming message - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // If we have a fragment header skip the header length too.. - if (sqd->minor_version_ == 2 && - sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER); - - // Get the length of the incoming message block.. - size_t incoming_length = - sqd->msg_block_->length (); - - // Increase the size of the destination message block if we need - // to. - ACE_Message_Block *mb = - dqd->msg_block_; - - // Check space before growing. - if (mb->space () < incoming_length) - { - ACE_CDR::grow (mb, - mb->length () + incoming_length); - } - - // Copy the data - dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (), - incoming_length); - return 0; -} - -void -TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) -{ - // Get the message information - qd->byte_order_ = - this->message_state_.byte_order_; - qd->major_version_ = - this->message_state_.giop_version_.major; - qd->minor_version_ = - this->message_state_.giop_version_.minor; - - //qd->more_fragments_ = this->message_state_.more_fragments_; - - if (this->message_state_.more_fragments_) - qd->more_fragments_ = 1; - else - qd->more_fragments_ = 0; - - qd->msg_type_= - this->message_type (this->message_state_); - - // Reset the message_state - this->message_state_.reset (); -} - -int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) @@ -764,13 +507,13 @@ // Should be taken care by the state specific parsing retval = generator_parser->parse_reply (input_cdr, - params); + params); break; case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY: retval = generator_parser->parse_locate_reply (input_cdr, - params); + params); break; default: retval = -1; @@ -898,7 +641,9 @@ parse_error = parser->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1098,9 +843,9 @@ } TAO::ObjectKey tmp_key (locate_request.object_key ().length (), - locate_request.object_key ().length (), - locate_request.object_key ().get_buffer (), - 0); + locate_request.object_key ().length (), + locate_request.object_key ().get_buffer (), + 0); // Set it to an error state parse_error = 1; @@ -1324,6 +1069,7 @@ } +#if 0 // Server sends an "I'm shutting down now, any requests you've sent me // can be retried" message to the server. The message is prefab, for // simplicity. @@ -1419,7 +1165,7 @@ transport-> id ())); } - +#endif int TAO_GIOP_Message_Base::send_reply_exception ( @@ -1576,44 +1322,49 @@ } -TAO_Queued_Data * -TAO_GIOP_Message_Base::make_queued_data (size_t sz) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // @@todo: We have a similar method in Transport.cpp. Need to see how - // we can factor them out.. - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - +void +TAO_GIOP_Message_Base::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const +{ + // @@CJC: Try leaving out the declaration for this->message_state_ + // and see what pukes. I don't think we need it any more. + TAO_GIOP_Message_State state; + if (state.take_values_from_message_block (mb) == -1) + { + // what the heck do we do here?! + qd->current_state_ = TAO_Queued_Data::INVALID; + return; + } - return qd; + // It'd be nice to have an abstract base for GIOP_Message_State + // so that there could just be a line like: + // qd->take_values_from (state); + // Get the message information + qd->byte_order_ = state.byte_order (); + qd->major_version_ = state.giop_version ().major; + qd->minor_version_ = state.giop_version ().minor; + qd->more_fragments_ = state.more_fragments () ? 1 : 0; + qd->request_id_ = state.request_id_; + qd->msg_type_= message_type (state); + qd->missing_data_bytes_ = state.payload_size (); } -size_t -TAO_GIOP_Message_Base::header_length (void) const -{ - return TAO_GIOP_MESSAGE_HEADER_LEN; +int +TAO_GIOP_Message_Base::check_for_valid_header ( + const ACE_Message_Block &mb + ) const +{ + // NOTE! We don't hardcode the length of the header b/c header_length should + // be eligible for inlining by pretty much any compiler, and it should return + // a constant. The rest of this method is hard-coded and hand-optimized because + // this method gets called A LOT. + if (mb.length () < this->header_length ()) + return -1; + + // Is finding that it's the right length and the magic bytes present + // enough to declare it a valid header? I think so... + register const char* h = mb.rd_ptr (); + return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0; } Index: tao/GIOP_Message_Base.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Base.h,v retrieving revision 1.27 retrieving revision 1.27.2.1 diff -u -r1.27 -r1.27.2.1 --- tao/GIOP_Message_Base.h 14 Dec 2003 16:03:48 -0000 1.27 +++ tao/GIOP_Message_Base.h 15 Dec 2003 22:31:47 -0000 1.27.2.1 @@ -94,36 +94,38 @@ /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Calculate the amount of data that is missing in the - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); - - /* Extract the details of the next message from the - * through . Returns 1 if there are more messages and returns a - * 0 if there are no more messages in . - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); - - /// Check whether the node needs consolidation from - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); - - /// Get the details of the message parsed through the . - virtual void get_message_data (TAO_Queued_Data *qd); - - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); - /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd); + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. If \code mb does not + contain less than \code header_length() bytes, this method cannot make a + complete evaluation, and returns a commensurate value. + + \return 1 \code header_length() bytes found, and constitute a valid header + \return 0 \code header_length() bytes found, and do not constitute a valid header + \return -1 not enough bytes available to make a determination of header validity + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; + + /// Parse the reply message that we received and return the reply /// information though @@ -174,7 +176,7 @@ /// TAO_PLUGGABLE_MESSAGE_REPLY, /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); + TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state) const; private: @@ -197,10 +199,12 @@ /// Send error messages int send_error (TAO_Transport *transport); +#if 0 /// Close a connection, first sending GIOP::CloseConnection. void send_close_connection (const TAO_GIOP_Message_Version &version, TAO_Transport *transport, void *ctx); +#endif /// We must send a LocateReply through , this request /// resulted in some kind of exception. Index: tao/GIOP_Message_Base.i =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Base.i,v retrieving revision 1.8 retrieving revision 1.8.2.1 diff -u -r1.8 -r1.8.2.1 --- tao/GIOP_Message_Base.i 14 Dec 2003 16:03:48 -0000 1.8 +++ tao/GIOP_Message_Base.i 15 Dec 2003 22:31:47 -0000 1.8.2.1 @@ -4,3 +4,11 @@ // // GIOP_Message_Base // + + +ACE_INLINE size_t +TAO_GIOP_Message_Base::header_length (void) const +{ + return TAO_GIOP_MESSAGE_HEADER_LEN; +} + Index: tao/GIOP_Message_Generator_Parser_Impl.inl =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl,v retrieving revision 1.4 retrieving revision 1.4.2.1 diff -u -r1.4 -r1.4.2.1 --- tao/GIOP_Message_Generator_Parser_Impl.inl 14 Dec 2003 16:03:48 -0000 1.4 +++ tao/GIOP_Message_Generator_Parser_Impl.inl 15 Dec 2003 22:31:47 -0000 1.4.2.1 @@ -5,9 +5,24 @@ check_revision (CORBA::Octet incoming_major, CORBA::Octet incoming_minor) { - if (incoming_major > TAO_DEF_GIOP_MAJOR || - incoming_minor > TAO_DEF_GIOP_MINOR) + CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor; + CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR; + + CORBA::Boolean ret = 0; + + // If it's greater than the max, we know it's not allowed. + if (version_as_whole_num > max_allowable_version) return 0; - - return 1; + + // If it's less than the max, though, we still have to check for + // each explicit version and only allow the ones we know work. + switch (version_as_whole_num) + { + case 0x0100: + case 0x0101: + case 0x0102: + ret = 1; + } + + return ret; } Index: tao/GIOP_Message_Lite.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Lite.cpp,v retrieving revision 1.76 retrieving revision 1.76.2.1 diff -u -r1.76 -r1.76.2.1 --- tao/GIOP_Message_Lite.cpp 14 Dec 2003 16:03:48 -0000 1.76 +++ tao/GIOP_Message_Lite.cpp 15 Dec 2003 22:31:47 -0000 1.76.2.1 @@ -243,6 +243,7 @@ } +#if 0 int TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block) { @@ -501,6 +502,7 @@ // We dont know what fragments are??? return -1; } +#endif int TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport, @@ -728,7 +730,9 @@ parse_error = this->parse_request_header (request); - request.orb_core()->codeset_manager()->process_service_context(request); + TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager(); + if (csm) + csm->process_service_context(request); transport->assign_translators(&cdr,&output); // Throw an exception if the @@ -1626,38 +1630,6 @@ } } -TAO_Queued_Data * -TAO_GIOP_Message_Lite::make_queued_data (size_t sz) -{ - // Get a node for the queue.. - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data (); - - // Make a datablock for the size requested + something. The - // "something" is required because we are going to align the data - // block in the message block. During alignment we could loose some - // bytes. As we may not know how many bytes will be lost, we will - // allocate ACE_CDR::MAX_ALIGNMENT extra. - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (sz + - ACE_CDR::MAX_ALIGNMENT); - - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - ACE_Message_Block mb (db, - 0, - alloc); - - ACE_Message_Block *new_mb = mb.duplicate (); - - ACE_CDR::mb_align (new_mb); - - qd->msg_block_ = new_mb; - - return qd; -} - int TAO_GIOP_Message_Lite::generate_locate_reply_header ( TAO_OutputCDR & /*cdr*/, @@ -1678,4 +1650,23 @@ TAO_GIOP_Message_Lite::header_length (void) const { return TAO_GIOP_LITE_HEADER_LEN; +} + +void +TAO_GIOP_Message_Lite::set_queued_data_from_message_header ( + TAO_Queued_Data *qd, + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (qd); + ACE_UNUSED_ARG (mb); +} + +int +TAO_GIOP_Message_Lite::check_for_valid_header ( + const ACE_Message_Block &mb + ) const +{ + ACE_UNUSED_ARG (mb); + return 0; } Index: tao/GIOP_Message_Lite.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_Lite.h,v retrieving revision 1.35 retrieving revision 1.35.2.1 diff -u -r1.35 -r1.35.2.1 --- tao/GIOP_Message_Lite.h 14 Dec 2003 16:03:48 -0000 1.35 +++ tao/GIOP_Message_Lite.h 15 Dec 2003 22:31:47 -0000 1.35.2.1 @@ -88,8 +88,11 @@ /// the message. virtual int format_message (TAO_OutputCDR &cdr); - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block); + /// Process the request message that we have received on the + /// connection + virtual int process_request_message (TAO_Transport *transport, + TAO_Queued_Data *qd); + /// Get the message type. The return value would be one of the /// following: @@ -99,33 +102,25 @@ /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. TAO_Pluggable_Message_Type message_type (void); + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. - /// Calculate the amount of data that is missing in the - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &message_block); - - /* Extract the details of the next message from the - * through . Returns 1 if there are more messages and returns a - * 0 if there are no more messages in . + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd); + virtual int check_for_valid_header (const ACE_Message_Block &mb) const; - /// Check whether the node needs consolidation from - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming); - - /// Get the details of the message parsed through the . - virtual void get_message_data (TAO_Queued_Data *qd); - - /// @@Bala: Docu??? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd); + /*! + \brief Set fields in \param qd based on values derived from \param mb. - /// Process the request message that we have received on the - /// connection - virtual int process_request_message (TAO_Transport *transport, - TAO_Queued_Data *qd); + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const; /// Parse the reply message that we received and return the reply /// information though Index: tao/GIOP_Message_State.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_State.cpp,v retrieving revision 1.21 retrieving revision 1.21.2.1 diff -u -r1.21 -r1.21.2.1 --- tao/GIOP_Message_State.cpp 14 Dec 2003 16:03:48 -0000 1.21 +++ tao/GIOP_Message_State.cpp 15 Dec 2003 22:31:47 -0000 1.21.2.1 @@ -5,20 +5,65 @@ #include "tao/GIOP_Message_Base.h" #include "ace/Log_Msg.h" +#include "ace/OS_NS_string.h" #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID (tao, - GIOP_Message_State, - "$Id$") + +class TAO_Debug_Msg_Emitter_Guard +{ +public: + TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg) + : which_level_(debug_level) + { + if (TAO_debug_level < this->which_level_) + { + msg_ = 0; + return; + } + + this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ]; + ACE_OS::strcpy (this->msg_, msg); + ACE_OS::strcat (this->msg_, " begin\n"); + + if (TAO_debug_level >= this->which_level_) + { + ACE_DEBUG ((LM_DEBUG, this->msg_ )); + } + } + + ~TAO_Debug_Msg_Emitter_Guard () + { + if (this->msg_) + { + if (TAO_debug_level >= this->which_level_) + { + char* begin_start = + this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1; + ACE_OS::strcpy (begin_start, " end\n"); + ACE_DEBUG ((LM_DEBUG, this->msg_)); + } + + delete[] this->msg_; + } + } + +private: + static const int MAGIC_LENGTH; + unsigned int which_level_; + char* msg_; +}; + +const int TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH = 8; // " begin\n" + \000 + +ACE_RCSID(tao, GIOP_Message_State, "$Id$") TAO_GIOP_Message_State::TAO_GIOP_Message_State ( TAO_ORB_Core * /*orb_core*/, - TAO_GIOP_Message_Base *base) - : base_ (base), - giop_version_ (TAO_DEF_GIOP_MAJOR, + TAO_GIOP_Message_Base * /*base*/) + : giop_version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), byte_order_ (0), message_type_ (0), @@ -29,125 +74,79 @@ { } - +// This doesn't check the message block's length, so that means that +// the *caller* needs to do that first. int -TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) +TAO_GIOP_Message_State::take_values_from_message_block ( + const ACE_Message_Block& mb + ) { - if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN) - { - // Parse the GIOP header - if (this->parse_message_header_i (incoming) == -1) - return -1; - } - - return 0; -} + const char* buf = mb.rd_ptr (); -int -TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) -{ - if (TAO_debug_level > 8) + // Get the version information + if (this->set_version_info_from_buffer (buf) == -1) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n" - )); + return -1; } - // Grab the rd_ptr_ from the message block.. - char *buf = incoming.rd_ptr (); - - // Parse the magic bytes first - if (this->parse_magic_bytes (buf) == -1) + // Get the byte order information... + if (this->set_byte_order_info_from_buffer (buf) == -1) { return -1; } - // Get the version information - if (this->get_version_info (buf) == -1) - return -1; - - // Get the byte order information... - if (this->get_byte_order_info (buf) == -1) - return -1; - // Get the message type this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - // Get the size of the message.. - this->get_payload_size (buf); + this->set_payload_size_from_buffer (buf); + + // Get the request id + this->parse_fragment_header (buf, mb.length ()); if (this->message_size_ == 0) { - if (this->message_type_ == TAO_GIOP_MESSAGERROR) + const char* msgname = 0; + + switch (this->message_type_) + { + case TAO_GIOP_MESSAGERROR: + msgname = "GIOP_MESSAGE_ERROR"; break; + case TAO_GIOP_CLOSECONNECTION: + msgname = "GIOP_CLOSE_CONNECTION"; break; + } + if (msgname != 0) { if (TAO_debug_level > 0) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) -" - "GIOP_MESSAGE_ERROR received \n")); + ACE_DEBUG (( + LM_DEBUG, + "(%P|%t) GIOP_Message_State::take_values: %s rcv'd.\n", + msgname + )); } - return 0; } else { if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - " - "Message of size zero recd. \n")); + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) GIOP_Message_State::take_values: " + "Message of size zero rcv'd.\n")); + } + return -1; } } - - if (this->more_fragments_) - { - (void) this->parse_fragment_header (buf, - incoming.length ()); - } - return 0; } - - - -int -TAO_GIOP_Message_State::parse_magic_bytes (char *buf) -{ - // The values are hard-coded to support non-ASCII platforms. - if (!(buf [0] == 0x47 // 'G' - && buf [1] == 0x49 // 'I' - && buf [2] == 0x4f // 'O' - && buf [3] == 0x50)) // 'P' - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ") - ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), - buf[0], - buf[1], - buf[2], - buf[3])); - return -1; - } - - return 0; -} - int -TAO_GIOP_Message_State::get_version_info (char *buf) +TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_version_info\n")); - } - // We have a GIOP message on hand. Get its revision numbers - CORBA::Octet incoming_major = - buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; - CORBA::Octet incoming_minor = - buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( @@ -157,7 +156,9 @@ if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"), + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:") + ACE_TEXT ("bad version <%d.%d>\n"), incoming_major, incoming_minor)); } @@ -172,15 +173,9 @@ } int -TAO_GIOP_Message_State::get_byte_order_info (char *buf) +TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf) { - if (TAO_debug_level > 8) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n")); - } - - // Let us be specific that this is for 1.0 + // Let us be specific that this is for 1.0 if (this->giop_version_.minor == 0 && this->giop_version_.major == 1) { @@ -191,10 +186,14 @@ this->byte_order_ != 1) { if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info, " - "invalid byte order <%d> for version <1.0>\n", - this->byte_order_)); + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_State::" + "get_byte_order_info, " + "invalid byte order <%d> for version <1.0>\n", + this->byte_order_)); + } + return -1; } } @@ -211,12 +210,15 @@ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) { if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>") - ACE_TEXT (" for version <%d %d> \n"), - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], - this->giop_version_.major, - this->giop_version_.minor)); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->giop_version_.major, + this->giop_version_.minor)); + } + return -1; } } @@ -224,19 +226,10 @@ return 0; } -void -TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) -{ - // Move the read pointer - rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - - this->message_size_ = this->read_ulong (rd_ptr); -} - int -TAO_GIOP_Message_State::parse_fragment_header (char *buf, +TAO_GIOP_Message_State::parse_fragment_header (const char *buf, size_t length) { size_t len = @@ -246,9 +239,7 @@ // By this point we are doubly sure that we have a more or less // valid GIOP message with a valid major revision number. - if (this->giop_version_.minor == 2 && - this->message_type_ == TAO_GIOP_FRAGMENT && - length > len) + if (this->giop_version_.minor >= 2 && length > len) { // Fragmented message in GIOP 1.2 should have a fragment header // following the GIOP header. Grab the rd_ptr to get that @@ -263,7 +254,7 @@ } CORBA::ULong -TAO_GIOP_Message_State::read_ulong (char *rd_ptr) +TAO_GIOP_Message_State::read_ulong (const char *rd_ptr) { CORBA::ULong x = 0; Index: tao/GIOP_Message_State.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_State.h,v retrieving revision 1.14 retrieving revision 1.14.2.1 diff -u -r1.14 -r1.14.2.1 --- tao/GIOP_Message_State.h 14 Dec 2003 16:03:48 -0000 1.14 +++ tao/GIOP_Message_State.h 15 Dec 2003 22:31:47 -0000 1.14.2.1 @@ -41,11 +41,10 @@ public: /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, - TAO_GIOP_Message_Base *base); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0, + TAO_GIOP_Message_Base *base = 0); - /// Parse the message header. - int parse_message_header (ACE_Message_Block &incoming); + int take_values_from_message_block (const ACE_Message_Block& mb); /// Return the message size CORBA::ULong message_size (void) const; @@ -53,9 +52,24 @@ /// Return the message size CORBA::ULong payload_size (void) const; - /// Return the byte order information + /*! + \brief Return the byte order information. + \return 0 big-endian + \return 1 little-endian + */ CORBA::Octet byte_order (void) const; + /*! + \brief Return GIOP version information. + */ + const TAO_GIOP_Message_Version &giop_version () const; + + /// (Requests and Replys) + CORBA::Octet more_fragments () const; + + /// MsgType above + CORBA::Octet message_type () const; + /// Reset the state.. void reset (void); @@ -63,39 +77,29 @@ friend class TAO_GIOP_Message_Base; - /// Parse the message header. - int parse_message_header_i (ACE_Message_Block &incoming); - - /// Checks for the magic word 'GIOP' in the start of the incoing - /// stream - int parse_magic_bytes (char *buf); - /// Extracts the version information from the incoming /// stream. Performs a check for whether the version information is /// right and sets the information in the - int get_version_info (char *buf); + int set_version_info_from_buffer (const char *buf); /// Extracts the byte order information from the incoming /// stream. Performs a check for whether the byte order information /// right and sets the information in the - int get_byte_order_info (char *buf); + int set_byte_order_info_from_buffer (const char *buf); /// Gets the size of the payload and set the size in the - void get_payload_size (char *buf); + void set_payload_size_from_buffer (const char *buf); /// Parses the GIOP FRAGMENT_HEADER information from the incoming /// stream. - int parse_fragment_header (char *buf, + int parse_fragment_header (const char *buf, size_t length); /// Read the unsigned long from the buffer. The should just /// point to the next 4 bytes data that represent the ULong - CORBA::ULong read_ulong (char *buf); + CORBA::ULong read_ulong (const char *buf); private: - - /// The GIOP base class.. - TAO_GIOP_Message_Base *base_; // GIOP version information.. TAO_GIOP_Message_Version giop_version_; Index: tao/GIOP_Message_State.inl =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/GIOP_Message_State.inl,v retrieving revision 1.7 retrieving revision 1.7.2.1 diff -u -r1.7 -r1.7.2.1 --- tao/GIOP_Message_State.inl 14 Dec 2003 16:03:48 -0000 1.7 +++ tao/GIOP_Message_State.inl 15 Dec 2003 22:31:47 -0000 1.7.2.1 @@ -33,22 +33,29 @@ this->missing_data_ = 0; } -#if 0 -ACE_INLINE int -TAO_GIOP_Message_State::message_fragmented (void) +ACE_INLINE const TAO_GIOP_Message_Version & +TAO_GIOP_Message_State::giop_version () const { - if (this->more_fragments) - return 1; - - return 0; + return this->giop_version_; } +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::more_fragments () const +{ + return this->more_fragments_; +} - -ACE_INLINE CORBA::Boolean -TAO_GIOP_Message_State::header_received (void) const +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::message_type () const { - return this->message_size != 0; + return this->message_type_; } -#endif +ACE_INLINE void +TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; + + this->message_size_ = this->read_ulong (rd_ptr); +} Index: tao/IIOP_Transport.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/IIOP_Transport.cpp,v retrieving revision 1.120 retrieving revision 1.120.2.1 diff -u -r1.120 -r1.120.2.1 --- tao/IIOP_Transport.cpp 14 Dec 2003 16:03:48 -0000 1.120 +++ tao/IIOP_Transport.cpp 15 Dec 2003 22:31:47 -0000 1.120.2.1 @@ -95,7 +95,7 @@ { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv_i, ") + ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ") ACE_TEXT ("read failure - %m\n"), this->id ())); } Index: tao/Incoming_Message_Queue.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Incoming_Message_Queue.cpp,v retrieving revision 1.13 retrieving revision 1.13.2.1 diff -u -r1.13 -r1.13.2.1 --- tao/Incoming_Message_Queue.cpp 14 Dec 2003 16:03:49 -0000 1.13 +++ tao/Incoming_Message_Queue.cpp 15 Dec 2003 22:31:47 -0000 1.13.2.1 @@ -1,7 +1,8 @@ #include "Incoming_Message_Queue.h" +#include "Pluggable_Messaging.h" #include "debug.h" - -#include "ace/Log_Msg.h" +#include "ace/Malloc_T.h" +#include "ace/Message_Block.h" #if !defined (__ACE_INLINE__) # include "Incoming_Message_Queue.inl" @@ -12,7 +13,7 @@ "$Id$") TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) - : queued_data_ (0), + : last_added_ (0), size_ (0), orb_core_ (orb_core) { @@ -42,21 +43,21 @@ { // Check to see if the length of the incoming block is less than // that of the of the tail. - if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) + if (block.length () <= this->last_added_->missing_data_bytes_) { n = block.length (); } else { - n = this->queued_data_->missing_data_; + n = this->last_added_->missing_data_bytes_; } // Do the copy - this->queued_data_->msg_block_->copy (block.rd_ptr (), + this->last_added_->msg_block_->copy (block.rd_ptr (), n); // Decerement the missing data - this->queued_data_->missing_data_ -= n; + this->last_added_->missing_data_bytes_ -= n; } return n; @@ -65,17 +66,20 @@ TAO_Queued_Data * TAO_Incoming_Message_Queue::dequeue_head (void) { + if (this->size_ == 0) + return 0; + // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = this->last_added_->next_; // Reset the head node.. - this->queued_data_->next_ = tmp->next_; - - // Decrease the size - --this->size_; + this->last_added_->next_ = head->next_; + + // Decrease the size and reset last_added_ if empty + if (--this->size_ == 0) + this->last_added_ = 0; - return tmp; + return head; } TAO_Queued_Data * @@ -86,95 +90,412 @@ return 0; // Get the node on the head of the queue... - TAO_Queued_Data *tmp = - this->queued_data_->next_; + TAO_Queued_Data *head = + this->last_added_->next_; - while (tmp->next_ != this->queued_data_) + while (head->next_ != this->last_added_) { - tmp = tmp->next_; + head = head->next_; } // Put the head in tmp. - tmp->next_ = this->queued_data_->next_; + head->next_ = this->last_added_->next_; - TAO_Queued_Data *ret_qd = this->queued_data_; + TAO_Queued_Data *ret_qd = this->last_added_; - this->queued_data_ = tmp; + this->last_added_ = head; // Decrease the size - --this->size_; + if (--this->size_ == 0) + this->last_added_ = 0; return ret_qd; } - int TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) { if (this->size_ == 0) { - this->queued_data_ = nd; - this->queued_data_->next_ = this->queued_data_; + this->last_added_ = nd; + this->last_added_->next_ = this->last_added_; } else { - nd->next_ = this->queued_data_->next_; - this->queued_data_->next_ = nd; - this->queued_data_ = nd; + nd->next_ = this->last_added_->next_; + this->last_added_->next_ = nd; + this->last_added_ = nd; } ++ this->size_; return 0; } +TAO_Queued_Data * +TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major, + CORBA::Octet minor) const +{ + TAO_Queued_Data *found = 0; + if (this->last_added_ != 0) + { + TAO_Queued_Data *qd = this->last_added_->next_; + + do { + if (qd->more_fragments_ && + qd->major_version_ == major && qd->minor_version_ == minor) + { + found = qd; + } + else + { + qd = qd->next_; + } + } while (found == 0 && qd != this->last_added_->next_); + } + + return found; +} + +TAO_Queued_Data * +TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const +{ + TAO_Queued_Data *found = 0; + if (this->last_added_ != 0) + { + TAO_Queued_Data *qd = this->last_added_->next_; + + do { + if (qd->more_fragments_ && qd->request_id_ == request_id) + { + found = qd; + } + else + { + qd = qd->next_; + } + } while (found == 0 && qd != this->last_added_->next_); + } + + return found; +} + /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc) - : msg_block_ (0), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (0) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , request_id_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc) - : msg_block_ (mb), - missing_data_ (0), - byte_order_ (0), - major_version_ (0), - minor_version_ (0), - more_fragments_ (0), - msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), - next_ (0), - allocator_ (alloc) + : msg_block_ (mb) + , current_state_ (INVALID) + , missing_data_bytes_ (0) + , byte_order_ (0) + , major_version_ (0) + , minor_version_ (0) + , more_fragments_ (0) + , request_id_ (0) + , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + , next_ (0) + , allocator_ (alloc) { } TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd) - : msg_block_ (qd.msg_block_->duplicate ()), - missing_data_ (qd.missing_data_), - byte_order_ (qd.byte_order_), - major_version_ (qd.major_version_), - minor_version_ (qd.minor_version_), - more_fragments_ (qd.more_fragments_), - msg_type_ (qd.msg_type_), - next_ (0), - allocator_ (qd.allocator_) + : msg_block_ (qd.msg_block_->duplicate ()) + , current_state_ (qd.current_state_) + , missing_data_bytes_ (qd.missing_data_bytes_) + , byte_order_ (qd.byte_order_) + , major_version_ (qd.major_version_) + , minor_version_ (qd.minor_version_) + , more_fragments_ (qd.more_fragments_) + , request_id_ (qd.request_id_) + , msg_type_ (qd.msg_type_) + , next_ (0) + , allocator_ (qd.allocator_) +{ +} + + +/*! + \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb. + + This function allocates a new aligned message block using the same + allocators and flags as found in \a mb. The size of the new message + block is at least \a new_size; the size may be adjusted up in order + to accomodate alignment requirements and still fit \a new_size bytes + into the aligned buffer. + + \param mb message block whose parameters should be mimicked + \param new_size size of the new message block (will be adjusted for proper alignment) + \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure + + \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block! + */ +static ACE_Message_Block* +clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size) +{ + // Calculate the required size of the cloned block with alignment + size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT); + + // Get the allocators + ACE_Allocator *data_allocator; + ACE_Allocator *data_block_allocator; + ACE_Allocator *message_block_allocator; + mb->access_allocators (data_allocator, + data_block_allocator, + message_block_allocator); + + // Create a new Message Block + ACE_Message_Block *nb; + ACE_NEW_MALLOC_RETURN (nb, + ACE_static_cast(ACE_Message_Block*, + message_block_allocator->malloc ( + sizeof (ACE_Message_Block))), + ACE_Message_Block(aligned_size, + mb->msg_type(), + mb->cont(), + 0, //we want the data block created + data_allocator, + mb->locking_strategy(), + mb->msg_priority(), + mb->msg_execution_time (), + mb->msg_deadline_time (), + data_block_allocator, + message_block_allocator), + 0); + + ACE_CDR::mb_align (nb); + + // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since + // we just dynamically allocated the two things. + nb->set_flags (mb->flags()); + nb->clr_flags (ACE_Message_Block::DONT_DELETE); + + return nb; +} + +/*! + \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes. + + (This is similar to memcpy, although with message blocks we can be a + little smarter.) This function assumes that \a dst has enough space + for \a span_size bytes, and that \a src has at least \a span_size + bytes available to copy. When everything is copied \a dst->wr_ptr + gets updated accordingly, but \a src->rd_ptr is left to the caller + to update. + + \param dst the destination message block + \param src the source message block + \param span_size size of the maximum span of bytes to be copied + \return 0 on failure, otherwise \a dst + */ +static ACE_Message_Block* +copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size) { + // @todo check for enough space in dst, and src contains at least span_size + + if (src == 0 || dst == 0) + return 0; + + if (span_size == 0) + return dst; + + dst->copy (src->rd_ptr (), span_size); + return dst; } /*static*/ TAO_Queued_Data * -TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc) +TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + register TAO_Queued_Data *new_qd = 0; + register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */ + register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */ + + // Validate arguments. + if (mb == 0) + goto failure; + + new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // do we have enough bytes to make a complete header? + if (MB_LEN >= HDR_LEN) + { + // Since we have enough bytes to make a complete header, + // the header needs to be valid. Check that now, and punt + // if it's not valid. + if (! msging_obj.check_for_valid_header (*mb)) + { + goto failure; + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + msging_obj.set_queued_data_from_message_header (new_qd, *mb); + if (new_qd->current_state_ == INVALID) + goto failure; + + // missing_data_bytes_ now has the full GIOP message size, so we allocate + // a new message block of that size, plus the header. + new_qd->msg_block_ = clone_mb_nocopy_size (mb, + new_qd->missing_data_bytes_ + + HDR_LEN); + // Of course, we don't have the whole message (if we did, we + // wouldn't be here!), so we copy only what we've got, i.e., whatever's + // in the message block. + if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) + goto failure; + + // missing_data_bytes_ now has the full GIOP message size, but + // there might still be stuff in mb. Therefore, we have to adjust + // missing_data_bytes_, i.e., decrease it by the number of "actual + // payload bytes" in mb. + // + // "actual payload bytes" :== length of mb (which included the header) - header length + new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN); + mb->rd_ptr (MB_LEN); + } + } + else + { + new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER; + new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN); + if (new_qd->msg_block_ == 0 || + copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0) + goto failure; + new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN; + mb->rd_ptr (MB_LEN); + } + + ACE_ASSERT (new_qd->current_state_ != INVALID); + if (TAO_debug_level > 7) + { + const char* s = "?unk?"; + switch (new_qd->current_state_) + { + case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break; + case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break; + case INVALID: s = "INVALID"; break; + case COMPLETED: s = "COMPLETED"; break; + } + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:") + ACE_TEXT ("state=%s,missing_data_bytes=%u\n"), + new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_)); + } + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ") + ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"), + mb, new_qd)); + } + TAO_Queued_Data::release (new_qd); + return 0; +} + + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc) +{ + register const size_t HDR_LEN = msging_obj.header_length (); + register const size_t MB_LEN = mb.length (); + + // Validate arguments. + if (MB_LEN < HDR_LEN) + return 0; + + size_t total_msg_len = 0; + register TAO_Queued_Data *new_qd = make_queued_data (alloc); + if (new_qd == 0) + goto failure; + + // We can assume that there are enough bytes for a header, so + // extract the header data. Don't assume that there's enough for + // the payload just yet. + new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD; + msging_obj.set_queued_data_from_message_header (new_qd, mb); + if (new_qd->current_state_ == INVALID) + goto failure; + + // new_qd_->missing_data_bytes_ + protocol header length should be + // *at least* the length of the message. Verify that we have that + // many bytes in the message block and, if we don't, release the new + // qd and fail. + total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN; + if (total_msg_len > MB_LEN) + goto failure; + + // Make a copy of the relevant portion of mb and hang on to it + if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0) + goto failure; + + if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0) + goto failure; + + // Update missing data and the current state + new_qd->missing_data_bytes_ = 0; + new_qd->current_state_ = COMPLETED; + + // Advance the rd_ptr on the message block + mb.rd_ptr (total_msg_len); + + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"), + total_msg_len, &mb, new_qd)); + } + + return new_qd; + +failure: + if (TAO_debug_level > 7) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ") + ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"), + &mb, MB_LEN)); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + mb.rd_ptr (), MB_LEN, + ACE_TEXT (" residual bytes in buffer"))); + + } + TAO_Queued_Data::release (new_qd); + return 0; +} + +/*static*/ +TAO_Queued_Data * +TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc) { TAO_Queued_Data *qd = 0; @@ -281,3 +602,29 @@ return qd; } + +void +TAO_Queued_Data::consolidate (void) +{ + // Is this a chain of fragments? + if (this->more_fragments_ && this->msg_block_->cont () != 0) + { + // Create a message block big enough to hold the entire chain + ACE_Message_Block *dest = clone_mb_nocopy_size ( + this->msg_block_, + this->msg_block_->total_length ()); + // Reset the cont() parameter + dest->cont (0); + + // Use ACE_CDR to consolidate the chain for us + ACE_CDR::consolidate (dest, this->msg_block_); + + // free the original message block chain + this->msg_block_->release (); + + // Set the message block to the new consolidated message block + this->msg_block_ = dest; + this->more_fragments_ = 0; + } +} + Index: tao/Incoming_Message_Queue.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Incoming_Message_Queue.h,v retrieving revision 1.14 retrieving revision 1.14.2.1 diff -u -r1.14 -r1.14.2.1 --- tao/Incoming_Message_Queue.h 14 Dec 2003 16:03:49 -0000 1.14 +++ tao/Incoming_Message_Queue.h 15 Dec 2003 22:31:47 -0000 1.14.2.1 @@ -27,6 +27,7 @@ class TAO_ORB_Core; class TAO_Queued_Data; class TAO_Transport; +class TAO_Pluggable_Messaging; /** * @class TAO_Incoming_Message_Queue @@ -75,31 +76,68 @@ /// Return the length of the queue.. CORBA::ULong queue_length (void); - /// Methods for sanity check. Checks to see whether the node on the - /// head or tail is complete or not and ready for further - /// processing. + /*! + @name Node Inspection Predicates + + \brief These methods allow inspection of head and tail nodes for "completeness". + + These methods check to see whether the node on the head or tail is + "complete" and ready for further processing. See each method's + documentation for its definition of "complete". + */ + //@{ + /*! + "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though) + + \return -1 queue is empty + \return 0 tail is not "complete" + \return 1 tail is "complete" + */ int is_tail_complete (void); + + /*! + + "complete" == the GIOP message at the head is not missing any data + AND, if it's the first message in a series of GIOP fragments, all + the fragments have been received, parsed, and placed into the + queue + + \return -1 if queue is empty + \return 0 if head is not "complete" + \return 1 if head is "complete" + */ int is_head_complete (void); + //@} - /// This method checks whether the last message that was queued up - /// was fragmented... + /*! + \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment. + */ int is_tail_fragmented (void); /// Return the size of data that is missing in tail of the queue. size_t missing_data_tail (void) const; /// void missing_data (size_t data); + /// Find the first fragment that matches the GIOP version + TAO_Queued_Data *find_fragment (CORBA::Octet major, + CORBA::Octet minor) const; + + /// Find the first fragment that matches the request id + TAO_Queued_Data *find_fragment (CORBA::ULong request_id) const; + private: friend class TAO_Transport; - /// Make a node for the queue. - TAO_Queued_Data *get_node (void); - private: + /*! + \brief A circular linked list of messages awaiting processing. - /// A linked listof messages that await processing - TAO_Queued_Data *queued_data_; + \a last_message_added_ points to the most recent message added to + the list. The earliest addition can be easily accessed via + \a last_message_added_->next_. + */ + TAO_Queued_Data *last_added_; /// The size of the queue CORBA::ULong size_; @@ -123,20 +161,73 @@ class TAO_Export TAO_Queued_Data { -public: +protected: /// Default Constructor TAO_Queued_Data (ACE_Allocator *alloc = 0); /// Constructor. TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); +public: /// Copy constructor. TAO_Queued_Data (const TAO_Queued_Data &qd); - /// Creation and deletion of a node in the queue. - static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0); + /*! + \name Factory Methods + + These methods manufacture instances of TAO_Queued_Data and return + them. These instances should be removed via TAO_Queued_Data::release. + + Instances are initialized from data in the ACE_Message_Block, + interpreted according to rules defined in the + TAO_Pluggable_Messaging object. + + The manufactured instance adopts the message block \em without + duplicating it; therefore, the caller must duplicate or orphan the + message block. The caller also must insure that the message block + can be released via ACE_Message_Block::release, and that its life + endures beyond the calling scope. + + For the purposes of TAO_Queued_Data, a completed message is a + completely received message as defined by the messaging protocol + object. For GIOP, that means that the number of bytes specified + in the general GIOP header have been completely received. It + specifically DOES NOT mean that all \em fragments have been + received. Fragment reassembly is another matter altogether. + */ + //@{ + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message. + */ + static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + /*! + \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message. + */ + // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE! + // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN + // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS + // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT + // BEHAVES DIFFERENTLY FROM make_uncompleted_message. + static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb, + TAO_Pluggable_Messaging &msging_obj, + ACE_Allocator *alloc = 0); + + /// Consolidate this fragments chained message blocks into one. + void consolidate (void); + + /*! + \brief Creation and deletion of a node in the queue. + \todo Maybe this should be private? + */ +private: + static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0); +public: + //@} static void release (TAO_Queued_Data *qd); + void release (void); /// Duplicate ourselves. This creates a copy of ourselves on the /// heap and returns a pointer to the duplicated node. @@ -146,11 +237,43 @@ /// The message block that contains the message. ACE_Message_Block *msg_block_; - /// Data missing in the above message that hasn't been read or - /// processed yet. - CORBA::Long missing_data_; + /*! + @name Missing Data details + + The \a missing_data_bytes_ member contains the number of bytes of + data missing from \a msg_block_. However, there can be two places + where data is missing: header and payload. We cannot know how + much data is missing from the payload until we have a complete + header. Fortunately, headers are a fixed length, so we can know + how much we're missing from the header. + + We use \param current_state_ to indicate which portion of the message + \param missing_data_bytes_ refers to, as well as the general state of + the message. + */ + //@{ + /*! + Describes the meaning given to the number stored in \a missing_data_bytes_. + */ + enum Queued_Data_State + { + INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted. + COMPLETED = 0, //!< Message is complete; \a missing_data_bytes_ should be zero. + WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_bytes_ indicates part of header is missing. + WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_bytes_ indicates part of payload is missing. + }; + + /*! + Indicates the current state of the message, including hints at + how to interpret the value stored in \a missing_data_bytes_. + */ + Queued_Data_State current_state_; + + /*! Data missing in the above message that hasn't been read or processed yet. */ + size_t missing_data_bytes_; + //@} - /// The byte order of the message that is stored in the node.. + /*! The byte order of the message that is stored in the node. */ CORBA::Octet byte_order_; /// Many protocols like GIOP have a major and minor version @@ -164,6 +287,9 @@ /// member indicates whether the message that we have recd. and /// queue already has more fragments that is missing.. CORBA::Octet more_fragments_; + + /// The fragment request id + CORBA::ULong request_id_; /// The message type of the message TAO_Pluggable_Message_Type msg_type_; Index: tao/Incoming_Message_Queue.inl =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Incoming_Message_Queue.inl,v retrieving revision 1.11 retrieving revision 1.11.2.1 diff -u -r1.11 -r1.11.2.1 --- tao/Incoming_Message_Queue.inl 14 Dec 2003 16:03:49 -0000 1.11 +++ tao/Incoming_Message_Queue.inl 15 Dec 2003 22:31:47 -0000 1.11.2.1 @@ -18,7 +18,7 @@ return -1; if (this->size_ && - this->queued_data_->missing_data_ == 0) + this->last_added_->missing_data_bytes_ == 0) return 1; return 0; @@ -31,8 +31,8 @@ return -1; if (this->size_ && - this->queued_data_->next_->missing_data_ == 0 && - this->queued_data_->next_->more_fragments_ == 0) + this->last_added_->next_->missing_data_bytes_ == 0 && + this->last_added_->next_->more_fragments_ == 0) return 1; return 0; @@ -45,7 +45,7 @@ return 0; if (this->size_ && - this->queued_data_->more_fragments_ == 1) + this->last_added_->more_fragments_ == 1) return 1; return 0; @@ -55,22 +55,21 @@ TAO_Incoming_Message_Queue::missing_data_tail (void) const { if (this->size_ != 0) - return this->queued_data_->missing_data_; + return this->last_added_->missing_data_bytes_; return 0; } - -ACE_INLINE TAO_Queued_Data * -TAO_Incoming_Message_Queue::get_node (void) -{ - return TAO_Queued_Data::get_queued_data (); -} - /************************************************************************/ // Methods for TAO_Queued_Data /************************************************************************/ + +ACE_INLINE void +TAO_Queued_Data::release (void) +{ + TAO_Queued_Data::release (this); +} /*static*/ ACE_INLINE void Index: tao/Pluggable_Messaging.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Pluggable_Messaging.h,v retrieving revision 1.24 retrieving revision 1.24.2.1 diff -u -r1.24 -r1.24.2.1 --- tao/Pluggable_Messaging.h 14 Dec 2003 16:03:49 -0000 1.24 +++ tao/Pluggable_Messaging.h 15 Dec 2003 22:31:47 -0000 1.24.2.1 @@ -121,36 +121,30 @@ virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; - /// Parse the incoming messages.. - virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; - - /// Calculate the amount of data that is missing in the - /// message block. - virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; - - /// Get the details of the message parsed through the . - virtual void get_message_data (TAO_Queued_Data *qd) = 0; - - /* Extract the details of the next message from the - * through . Returns 1 if there are more messages and returns a - * 0 if there are no more messages in . - */ - virtual int extract_next_message (ACE_Message_Block &incoming, - TAO_Queued_Data *&qd) = 0; - - /// Check whether the node needs consolidation from - virtual int consolidate_node (TAO_Queued_Data *qd, - ACE_Message_Block &incoming) = 0; - - /// @@Bala:Docu?? - virtual int consolidate_fragments (TAO_Queued_Data *dqd, - const TAO_Queued_Data *sqd) = 0; - /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, TAO_Queued_Data *qd) = 0; + /*! + \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message. + + Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to + see if they look like the beginning of a message. Does + */ + virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0; + + /*! + \brief Set fields in \param qd based on values derived from \param mb. + + This function sets fields in \param qd based on values derived + from \param mb. It assumes that if the length of \param mb is + enough to hold a header, then the data in there can be trusted to + make sense. + */ + virtual void set_queued_data_from_message_header ( + TAO_Queued_Data *, + const ACE_Message_Block &mb) const = 0; /// Parse the reply message that we received and return the reply /// information though Index: tao/Transport.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Transport.cpp,v retrieving revision 1.94 retrieving revision 1.94.2.1 diff -u -r1.94 -r1.94.2.1 --- tao/Transport.cpp 14 Dec 2003 16:03:49 -0000 1.94 +++ tao/Transport.cpp 15 Dec 2003 22:31:47 -0000 1.94.2.1 @@ -19,7 +19,9 @@ #include "Resume_Handle.h" #include "Codeset_Manager.h" #include "Codeset_Translator_Factory.h" +#include "GIOP_Message_State.h" #include "ace/OS_NS_sys_time.h" +#include "ace/Message_Block.h" #include "ace/Reactor.h" @@ -110,12 +112,15 @@ , head_ (0) , tail_ (0) , incoming_message_queue_ (orb_core) + , uncompleted_message_ (0) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ()) , id_ ((size_t) this) , purging_order_ (0) + , recv_buffer_size_ (0) + , sent_byte_count_ (0) , char_translator_ (0) , wchar_translator_ (0) , tcs_set_ (0) @@ -236,13 +241,9 @@ { // codeset service context is only supposed to be sent in the first request // on a particular connection. - if (this->first_request_) - { - this->orb_core ()->codeset_manager ()->generate_service_context ( - opdetails, - *this - ); - } + TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager(); + if (csm && this->first_request_) + csm->generate_service_context( opdetails, *this ); if (this->messaging_object ()->generate_request_header (opdetails, spec, @@ -604,7 +605,12 @@ TAO_Transport::schedule_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -620,7 +626,12 @@ TAO_Transport::cancel_output_i (void) { ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; if (TAO_debug_level > 3) { @@ -739,6 +750,9 @@ // no bytes are sent send() can only return 0 or -1 ACE_ASSERT (byte_count != 0); + // Total no. of bytes sent for a send call + this->sent_byte_count_ += byte_count; + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -762,6 +776,10 @@ // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; + // reset the value so that the counting is done for each new send + // call. + this->sent_byte_count_ = 0; + while (i != 0) { // ... each element fills the iovector ... @@ -820,8 +838,14 @@ if (this->flush_timer_pending ()) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - reactor->cancel_timer (this->flush_timer_id_); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } this->reset_flush_timer (); } @@ -898,20 +922,25 @@ if (set_timer) { ACE_Event_Handler *eh = this->event_handler_i (); - ACE_Reactor *reactor = eh->reactor (); - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); - - if (this->flush_timer_pending ()) + if (eh != 0) { - reactor->cancel_timer (this->flush_timer_id_); - } + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } } return constraints_reached; @@ -1118,6 +1147,18 @@ return 0; } + +class CTHack +{ +public: + CTHack() { enter(); } + ~CTHack() { leave(); } +private: + void enter() { x = 1; } + void leave() { x = 0; } + int x; +}; + /* * * All the methods relevant to the incoming data path of the ORB are @@ -1129,6 +1170,8 @@ ACE_Time_Value * max_wait_time, int /*block*/) { + CTHack cthack; + if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, @@ -1136,9 +1179,8 @@ this->id ())); } - // First try to process messages of the head of the incoming queue. + // First try to process messages off the head of the incoming queue. int retval = this->process_queue_head (rh); - if (retval <= 0) { if (retval == -1) @@ -1149,7 +1191,6 @@ "error while parsing the head of the queue\n", this->id())); } - return retval; } @@ -1158,7 +1199,7 @@ // The buffer on the stack which will be used to hold the input // messages - char buf [TAO_MAXBUFSIZE]; + char buf[TAO_MAXBUFSIZE]; #if defined (ACE_HAS_PURIFY) (void) ACE_OS::memset (buf, @@ -1180,26 +1221,35 @@ ACE_Message_Block::DONT_DELETE, this->orb_core_->input_cdr_msgblock_allocator ()); + // We'll loop trying to complete the message this number of times, + // and that's it. + unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS; + + unsigned int did_queue_message = 0; // Align the message block ACE_CDR::mb_align (&message_block); size_t recv_size = 0; - if (this->orb_core_->orb_params ()->single_read_optimization ()) { - recv_size = - message_block.space (); + recv_size = message_block.space (); } else { - recv_size = - this->messaging_object ()->header_length (); + recv_size = this->messaging_object ()->header_length (); } + // Saving the size of the received buffer in case any one needs to + // get the size of the message thats received in the + // context. Obviously the value will be changed for each recv call + // and the user is supposed to invoke the accessor only in the + // invocation context to get meaningful information. + this->recv_buffer_size_ = recv_size; + // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), recv_size, max_wait_time); @@ -1212,7 +1262,7 @@ if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " + "TAO (%P|%t) - Transport[%d]::handle_input_i: " "read %d bytes\n", this->id (), n)); } @@ -1220,172 +1270,372 @@ // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the message and try consolidating the message if - // needed. - retval = this->parse_consolidate_messages (message_block, - rh, - max_wait_time); + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) message_block.rd_ptr (), + message_block.length (), + ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket"))); + + +complete_message_and_possibly_enqueue: + // Check to see if we're still working to complete a message + if (this->uncompleted_message_) + { + // try to complete it + + // on exit from this frame we have one of the following states: + // + // (a) an uncompleted message still in uncompleted_message_ + // AND message_block is empty + // + // (b) uncompleted_message_ zero, the completed message at the + // tail of the incoming queue; message_block could be empty + // or still contain bytes + + // ==> repeat + do + { + /* + * Append the "right number of bytes" to uncompleted_message_ + */ + // ==> right_number_of_bytes = MIN(bytes missing from + // uncompleted_message_, length of message_block); + size_t right_number_of_bytes = + ACE_MIN (this->uncompleted_message_->missing_data_bytes_, + message_block.length () ); - if (retval <= 0) - { - if (retval == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_input_i, " - "error while parsing and consolidating\n", - this->id ())); - } - return retval; - } - - // Make a node of the message block.. - TAO_Queued_Data qd (&message_block, - this->orb_core_->transport_message_buffer_allocator ()); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "trying to use %u (of %u) " + "bytes to complete message missing %u bytes\n", + this->id (), + right_number_of_bytes, + message_block.length (), + this->uncompleted_message_->missing_data_bytes_)); + } - // Check whether the message was fragmented.. - if (qd.more_fragments_ || - (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the node that we have as the node is on stack.. - TAO_Queued_Data *nqd = - TAO_Queued_Data::duplicate (qd); + // ==> append right_number_of_bytes from message_block + // to uncomplete_message_ & update read pointer of + // message_block; + + // 1. we assume that uncompleted_message_.msg_block_'s + // wr_ptr is properly maintained + // 2. we presume that uncompleted_message_.msg_block was + // allocated with enough space to contain the *entire* + // expected GIOP message, so this copy shouldn't involve an + // additional allocation + this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (), + right_number_of_bytes); + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + message_block.rd_ptr (right_number_of_bytes); - return this->consolidate_fragments (nqd, rh); - } + switch (this->uncompleted_message_->current_state_) + { + case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER: + { + int hdrvalidity = this->messaging_object()->check_for_valid_header ( + *this->uncompleted_message_->msg_block_); + if (hdrvalidity == 0) + { + // According to the spec, Section 15.4.8, we should send + // the MessageError GIOP message on receipt of "any message...whose + // header is not properly formed (e.g., has the wrong magic value)". + // + // So, rather than returning -1, what we REALLY need to do is + // send a MessageError in reply. + // + // I'm not sure what the best way to trigger that is...probably to + // queue up a special internal-only COMPLETED message that, when + // processed, sends the MessageError as part of its processing. + return -1; + } + else if (hdrvalidity == 1) + { + // ==> update bytes missing from uncompleted_message_ + // with size of message from valid header; + this->messaging_object()->set_queued_data_from_message_header ( + this->uncompleted_message_, + *this->uncompleted_message_->msg_block_); + // ==> change state of uncompleted_event_ to + // WAITING_TO_COMPLETE_PAYLOAD; + this->uncompleted_message_->current_state_ = + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD; + + // ==> Resize the message block to have capacity for + // the rest of the incoming message + ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_; + ACE_CDR::grow (&mb, + mb.size () + + this->uncompleted_message_->missing_data_bytes_); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "found a valid header in the message; " + "waiting for %u bytes to complete payload\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } - // Process the message - return this->process_parsed_messages (&qd, - rh); -} + // Continue the loop... + continue; + } + // In the case where we don't have enough information (hdrvalidity == -1), + // we just have to fall through and collect more. +#if 0 + else + { + // What the heck will we do with a bad header? Just + // better to close the connection and let things + // re-train from there. + if (this->uncompleted_message_->msg_block_->length () == + this->messaging_object()->header_length()) + return -1; + +#if 0 // I don't think I need this clause, but I'm leaving it just in case. + // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes; + this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes; + ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0); +#endif + } +#endif + } + break; + + case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD: + // Here we have an opportunity to try to finish reading the + // uncompleted message. This is a Good Idea(TM) because there are + // good odds that either more data came available since the last + // time we read, or that we simply didn't read the whole message on + // the first read. So, we try to read again. + // + // NOTE! this changes this->uncompleted_message_! + this->try_to_complete (max_wait_time); -int -TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) -{ - // Parse the incoming message for validity. The check needs to be - // performed by the messaging objects. - if (this->parse_incoming_messages (block) == -1) - { - return -1; - } + // ==> if (bytes missing from uncompleted_message_ == 0) + if (this->uncompleted_message_->missing_data_bytes_ == 0) + { + /* + * We completed the message! Hooray! + */ + // ==> place uncompleted_message_ (which is now + // complete!) at the tail of the incoming message + // queue; + + // ---> NOTE: whoever pulls this off the queue must delete it! + this->uncompleted_message_->current_state_ + = TAO_Queued_Data::COMPLETED; + + // @@CJC NEED TO CHECK RETURN VALUE HERE! + this->enqueue_incoming_message (this->uncompleted_message_); + did_queue_message = 1; + // zero out uncompleted_message_; + this->uncompleted_message_ = 0; + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "completed and queued message for processing!\n", + this->id ())); + } - // Check whether we have a complete message for processing - ssize_t missing_data = this->missing_data (block); + } + else + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still need %u bytes to complete uncompleted message.\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + } + break; - if (missing_data < 0) - { - // If we have more than one message - return this->consolidate_extra_messages (block, - rh); - } - else if (missing_data > 0) + default: + // @@CJC What do we do here?! + ACE_ASSERT (! "Transport::handle_input_i: unexpected state" + "in uncompleted_message_"); + } + } + // Does the order of the checks matter? In both (a) and (b), + // message_block is empty, but only in (b) is there no + // uncompleted_message_. + // ==> until (message_block is empty || there is no uncompleted_message_); + // or, rewritten in C++ looping constructs + // ==> while ( ! message_block is empty && there is an uncompleted_message_ ); + while (message_block.length() != 0 && this->uncompleted_message_); + } + + // ***************************** + // @@ CJC + // + // Once upon a time we tried to complete reading the uncompleted + // message here, but testing found that completing later worked + // better. + // ***************************** + + + // At this point, there should be nothing in uncompleted_message_. + // We now need to chop up the bytes in message_block and store any + // complete messages in the incoming message queue. + // + // ==> if (message_block still has data) + if (message_block.length () != 0) { - // If we have missing data then try doing a read or try queueing - // them. - return this->consolidate_message (block, - missing_data, - rh, - max_wait_time); - } + TAO_Queued_Data *complete_message = 0; + do + { + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ") + ACE_TEXT("extracting complete messages\n"))); + ACE_HEX_DUMP ((LM_DEBUG, + message_block.rd_ptr (), + message_block.length (), + ACE_TEXT (" from this message buffer"))); + } - return 1; -} + complete_message = + TAO_Queued_Data::make_completed_message ( + message_block, *this->messaging_object ()); + if (complete_message) + { + this->enqueue_incoming_message (complete_message); + did_queue_message = 1; + } + } + while (complete_message != 0); + // On exit from this frame we have one of the following states: + // (a) message_block is empty + // (b) message_block contains bytes from a partial message + } + + // If, at this point, there's still data in message_block, it's + // an incomplete message. Therefore, we stuff it into the + // uncompleted_message_ and clear out message_block. + // ==> if (message_block still has data) + if (message_block.length () != 0) + { + // duplicate message_block remainder into this->uncompleted_message_ + ACE_ASSERT (this->uncompleted_message_ == 0); + this->uncompleted_message_ = + TAO_Queued_Data::make_uncompleted_message (&message_block, + *this->messaging_object ()); + ACE_ASSERT (this->uncompleted_message_ != 0); + + // In a debug build, we won't reach this point if we couldn't + // create an uncompleted message because the above ASSERT will + // trip. However, in an optimized build, the ASSERT isn't + // there, so we'll go past here. + // + // We could put a check in here similar to the ASSERT condition, + // but doing that would terminate this loop early and result in + // our never processing any completed messages that were received + // in this trip to handle_input_i. + // + // Maybe we could instead queue up a special completed message that, + // when processed, causes the connection to get closed in a non-graceful + // termination scenario. + } + + // We should have consumed ALL the bytes by now. + ACE_ASSERT (message_block.length () == 0); + + // + // We don't want to try to re-read earlier because we may not have + // an uncompleted message until we get to this point. So, if we did + // it earlier, we could have missed the opportunity to complete it + // and dispatch. + // + // Thanks to Bala for the idea to read again + // to increase throughput! + + if (this->uncompleted_message_) + { + if (number_of_read_attempts--) + { + // We try to read again just in case more data arrived while + // we were doing the stuff above. This way, we can increase + // throughput without much of a penalty. -int -TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) -{ - // If we have a queue and if the last message is not complete a - // complete one, then this read will get us the remaining data. So - // do not try to parse the header if we have an incomplete message - // in the queue. - if (this->incoming_message_queue_.is_tail_complete () != 0) - { - // As it looks like a new message has been read, process the - // message. Call the messaging object to do the parsing.. - int retval = - this->messaging_object ()->parse_incoming_messages (block); + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "still have an uncompleted message; " + "will try %d more times before letting " + "somebody else have a chance.\n", + this->id (), + number_of_read_attempts)); + } - if (retval == -1) + // We only bother trying to complete payload, not header, because the + // retry only happens in the complete-the-payload clause above. + if (this->uncompleted_message_->current_state_ == + TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD) + goto complete_message_and_possibly_enqueue; + } + else { + // The queue should be empty because it should have been processed + // above. But I wonder if I should put a check in here anyway. if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, " - "error in incoming message\n", - this->id ())); - - return -1; + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "giving up reading for now and returning " + "with incoming queue length = %d\n", + this->id (), + this->incoming_message_queue_.queue_length ())); + if (this->uncompleted_message_) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Transport[%d]::handle_input_i: " + "missing bytes from uncompleted message = %u\n", + this->id (), + this->uncompleted_message_->missing_data_bytes_)); + } + return 1; } } - return 0; -} - - -size_t -TAO_Transport::missing_data (ACE_Message_Block &incoming) -{ - // If we have a incomplete message in the queue then find out how - // much of data is required to get a complete message. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->incoming_message_queue_.missing_data_tail (); - } + // **** END CJC PMG CHANGES **** - return this->messaging_object ()->missing_data (incoming); + return did_queue_message ? this->process_queue_head (rh) : 1; } -int -TAO_Transport::consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +void +TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time) { - // Check whether the last message in the queue is complete.. - if (this->incoming_message_queue_.is_tail_complete () == 0) - { - return this->consolidate_message_queue (incoming, - missing_data, - rh, - max_wait_time); - } - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message\n", - this->id ())); - } - - // Calculate the actual length of the load that we are supposed to - // read which is equal to the + length of the buffer - // that we have.. - size_t payload = missing_data + incoming.size (); - - // Grow the buffer to the size of the message - ACE_CDR::grow (&incoming, - payload); + if (this->uncompleted_message_ == 0) + return; ssize_t n = 0; + size_t &missing_data = this->uncompleted_message_->missing_data_bytes_; + ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_; - // As this used for transports where things are available in one - // shot this looping should not create any problems. - for (ssize_t bytes = missing_data; bytes != 0; bytes -= n) + // Try to complete this until we error or block right here... + for (ssize_t bytes = missing_data; + bytes != 0; + bytes -= n) { // .. do a read on the socket again. - n = this->recv (incoming.wr_ptr (), + n = this->recv (mb.wr_ptr (), bytes, max_wait_time); if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " + "TAO (%P|%t) - Transport[%d]::handle_input_i, " "read %d bytes on attempt\n", this->id(), n)); } @@ -1395,375 +1645,168 @@ break; } - incoming.wr_ptr (n); + mb.wr_ptr (n); missing_data -= n; } - - // If we got an error.. - if (n == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Trasport[%d]::consolidate_message, " - "error while trying to consolidate\n", - this->id ())); - } - - return -1; - } - - // If we had gotten a EWOULDBLOCK n would be equal to zero. But we - // have to put the message in the queue anyway. So let us proceed - // to do that and return... - - // Check to see if we have messages in queue or if we have missing - // data . AT this point we cannot have have semi-complete messages - // in the queue as they would have been taken care before. Put - // ourselves in the queue and then try processing one of the - // messages.. - if ((missing_data > 0 - ||this->incoming_message_queue_.queue_length ()) - && this->incoming_message_queue_.is_tail_fragmented () == 0) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message, " - "queueing up the message\n", - this->id ())); - } - - // Get a queued data - TAO_Queued_Data *qd = - this->make_queued_data (incoming); - - // Add the missing data to the queue - qd->missing_data_ = missing_data; - - // Get the rest of the messaging data - this->messaging_object ()->get_message_data (qd); - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - if (this->incoming_message_queue_.is_head_complete ()) - { - return this->process_queue_head (rh); - } - - return 0; - } - - // We dont have any missing data. Just make a queued_data node with - // the existing message block and send it to the higher layers of - // the ORB. - TAO_Queued_Data pqd (&incoming, - this->orb_core_->transport_message_buffer_allocator ()); - pqd.missing_data_ = missing_data; - this->messaging_object ()->get_message_data (&pqd); - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (pqd.more_fragments_ || - (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - // Duplicate the queued data as it is on stack.. - TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd); - - return this->consolidate_fragments (nqd, rh); - } - - // Now we have a full message in our buffer. Just go ahead and - // process that - return this->process_parsed_messages (&pqd, - rh); } -int -TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh) -{ - // If we have received a fragment message then we have to - // consolidate with the last message in queue - // @@todo: this piece of logic follows GIOP a bit... Need to revisit - // if we have protocols other than GIOP - - // @@todo: Fragments now have too much copying overhead. Need to get - // them out if we want to have some reasonable performance metrics - // in future.. Post 1.2 seems a nice time.. - if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - TAO_Queued_Data *tqd = - this->incoming_message_queue_.dequeue_tail (); - - tqd->more_fragments_ = qd->more_fragments_; - tqd->missing_data_ = qd->missing_data_; - - if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1) - { - return -1; - } - - TAO_Queued_Data::release (qd); - this->incoming_message_queue_.enqueue_tail (tqd); - this->process_queue_head (rh); - } - else - { - // if we dont have a fragment already in the queue just add it in - // the queue - this->incoming_message_queue_.enqueue_tail (qd); - } - - return 0; -} int -TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time) +TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n", - this->id ())); - } - - // If the queue did not have a complete message put this piece of - // message in the queue. We know it did not have a complete - // message. That is why we are here. - size_t n = - this->incoming_message_queue_.copy_tail (incoming); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "copied [%d] bytes to the tail\n", - this->id (), - n)); - } - - // Update the missing data... - missing_data = - this->incoming_message_queue_.missing_data_tail (); - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "missing [%d] bytes in the tail message\n", - this->id (), - missing_data)); - } - - // Move the read pointer of the message block to the end - // of the copied message and process the remaining portion... - incoming.rd_ptr (n); - - // If we have some more information left in the message block.. - if (incoming.length ()) - { - // We may have to parse & consolidate. This part of the message - // doesn't seem to be part of the last message in the queue (as - // the copy () hasn't taken away this message). - int retval = this->parse_consolidate_messages (incoming, - rh, - max_wait_time); - - // If there is an error return - if (retval == -1) - { - if (TAO_debug_level) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "error while consolidating, part of the read message\n", - this->id ())); - } - return retval; - } - else if (retval == 1) - { - // If the message in the message block has only - // one message left we need to process that seperately. - - // Get a queued data - TAO_Queued_Data *qd = this->make_queued_data (incoming); - - // Get the rest of the message data - this->messaging_object ()->get_message_data (qd); - - // Add the missing data to the queue - qd->missing_data_ = 0; - - // Check whether the message was fragmented and try to consolidate - // the fragments.. - if (qd->more_fragments_ || - (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)) - { - return this->consolidate_fragments (qd, rh); - } - - // Add it to the tail of the queue.. - this->incoming_message_queue_.enqueue_tail (qd); - - // We should surely have a message in queue now. So just - // process that. - return this->process_queue_head (rh); - } - - // parse_consolidate_messages () would have processed one of the - // messages, so we better return as we dont want to starve other - // threads. - return 0; - } - - // If we still have some missing data.. - if (missing_data > 0) - { - // Get the last message from the Queue - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_tail (); - - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "trying recv, again\n", - this->id ())); + // Get the GIOP version + CORBA::Octet major = queueable_message->major_version_; + CORBA::Octet minor = queueable_message->minor_version_; + CORBA::UShort whole = major << 8 | minor; + + // Set up a couple of pointers that are shared by the code + // for the different GIOP versions. + ACE_Message_Block *mb = 0; + TAO_Queued_Data *fragment_message = 0; + + switch(whole) + { + case 0x0100: // GIOP 1.0 + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + // Fragments aren't supported in 1.0. This is an error and + // we should reject it somehow. What do we do here? Do we throw + // an exception to the receiving side? Do we throw an exception + // to the sending side? + // + // At the very least, we need to log the fact that we received + // nonsense. + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "detected a fragmented GIOP 1.0 message\n"), + -1); + break; + case 0x0101: // GIOP 1.1 + // In 1.1, fragments kinda suck because they don't have they're + // own message-specific header. Therefore, we have to do the + // following: + fragment_message = + this->incoming_message_queue_.find_fragment (major, minor); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (queueable_message->more_fragments_) + { + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN); + mb->cont (queueable_message->msg_block_); + + // Get rid of the queuable message but save the message block + queueable_message->msg_block_ = 0; + queueable_message->release (); + + // One note is that TAO_Queued_Data contains version numbers, + // but doesn't indicate the actual protocol to which those + // version numbers refer. That's not a problem, though, because + // instances of TAO_Queued_Data live in a queue, and that queue + // lives in a particular instance of a Transport, and the + // transport instance has an association with a particular + // messaging_object. The concrete messaging object embodies a + // messaging protocol, and must cover all versions of that + // protocol. Therefore, we just need to cover the bases of all + // versions of that one protocol. } - - // Try to do a read again. If we have some luck it would be - // great.. - ssize_t n = this->recv (qd->msg_block_->wr_ptr (), - missing_data, - max_wait_time); - - if (TAO_debug_level > 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, " - "recv retval [%d]\n", - this->id (), - n)); - } - - // Error... - if (n < 0) - { - return n; - } - - // If we get a EWOULDBLOCK ie. n==0, we should anyway put the - // message in queue before returning.. - // Move the write pointer - qd->msg_block_->wr_ptr (n); - - // Decrement the missing data - qd->missing_data_ -= n; - - // Now put the TAO_Queued_Data back in the queue - this->incoming_message_queue_.enqueue_tail (qd); - - // Any way as we have come this far and are about to return, - // just try to process a message if it is there in the queue. - if (this->incoming_message_queue_.is_head_complete ()) + else { - return this->process_queue_head (rh); - } + // There is a complete chain of fragments + fragment_message->consolidate (); - return 0; + // Go ahead and enqueue this message + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + } + break; + case 0x0102: // GIOP 1.2 + // In 1.2, we get a little more context. There's a + // FRAGMENT message-specific header, and inside that is the + // request id with which the fragment is associated. + fragment_message = + this->incoming_message_queue_.find_fragment ( + queueable_message->request_id_); + + // No fragment was found + if (fragment_message == 0) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + + if (fragment_message->major_version_ != major || + fragment_message->minor_version_ != minor) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "GIOP versions do not match " + "(%d.%d != %d.%d\n", + fragment_message->major_version_, + fragment_message->minor_version_, + major, minor), + -1); + + // Find the last message block in the continuation + mb = fragment_message->msg_block_; + while (mb->cont () != 0) + mb = mb->cont (); + + // Add the current message block to the end of the chain + // after adjusting the read pointer to skip the GIOP header + queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN + + TAO_GIOP_MESSAGE_FRAGMENT_HEADER); + mb->cont (queueable_message->msg_block_); + + // Remove our reference to the message block. At this point + // the message block of the fragment head owns it as part of a + // chain + queueable_message->msg_block_ = 0; + + if (!queueable_message->more_fragments_) + { + // This is the end of the fragments for this request + fragment_message->consolidate (); + } + + // Get rid of the queuable message + queueable_message->release (); + break; + default: + if (!queueable_message->more_fragments_) + return this->incoming_message_queue_.enqueue_tail ( + queueable_message); + // This is an unknown GIOP version + ACE_ERROR_RETURN ((LM_ERROR, + "TAO (%P|%t) - " + "TAO_Transport::enqueue_incoming_message " + "can not handle a fragmented GIOP %d.%d " + "message\n", major, minor), + -1); } - // Process a message in the head of the queue if we have one.. - return this->process_queue_head (rh); + return 0; } int -TAO_Transport::consolidate_extra_messages (ACE_Message_Block - &incoming, - TAO_Resume_Handle &rh) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n", - this->id ())); - } - - // Pick the tail of the queue - TAO_Queued_Data *tail = - this->incoming_message_queue_.dequeue_tail (); - - if (tail) - { - // If we have a node in the tail, checek to see whether it needs - // consolidation. If so, just consolidate it. - if (this->messaging_object ()->consolidate_node (tail, incoming) == -1) - { - return -1; - } - - // .. put the tail back in queue.. - this->incoming_message_queue_.enqueue_tail (tail); - } - - int retval = 1; - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, " - "extracting extra messages\n", - this->id ())); - } - - // Extract messages.. - while (retval == 1) - { - TAO_Queued_Data *q_data = 0; - - retval = - this->messaging_object ()->extract_next_message (incoming, - q_data); - if (q_data) - { - // If we have read a framented message then... - if (q_data->more_fragments_ || - q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT) - { - this->consolidate_fragments (q_data, rh); - } - else - { - this->incoming_message_queue_.enqueue_tail (q_data); - } - } - } - - // In case of error return.. - if (retval == -1) - { - return retval; - } - - return this->process_queue_head (rh); -} - -int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) { // Get the that we have received TAO_Pluggable_Message_Type t = qd->msg_type_; - // int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -1828,57 +1871,6 @@ return 0; } -TAO_Queued_Data * -TAO_Transport::make_queued_data (ACE_Message_Block &incoming) -{ - // Get an instance of TAO_Queued_Data - TAO_Queued_Data *qd = - TAO_Queued_Data::get_queued_data ( - this->orb_core_->transport_message_buffer_allocator ()); - - // Get the flag for the details of the data block... - ACE_Message_Block::Message_Flags flg = - incoming.self_flags (); - - if (ACE_BIT_DISABLED (flg, - ACE_Message_Block::DONT_DELETE)) - { - // Duplicate the data block before putting it in the queue. - qd->msg_block_ = ACE_Message_Block::duplicate (&incoming); - } - else - { - // As we are in CORBA mode, all the data blocks would be aligned - // on an 8 byte boundary. Hence create a data block for more - // than the actual length - ACE_Data_Block *db = - this->orb_core_->create_input_cdr_data_block (incoming.length ()+ - ACE_CDR::MAX_ALIGNMENT); - - // Get the allocator.. - ACE_Allocator *alloc = - this->orb_core_->input_cdr_msgblock_allocator (); - - // Make message block.. - ACE_Message_Block mb (db, - 0, - alloc); - - // Duplicate the block.. - qd->msg_block_ = mb.duplicate (); - - // Align the message block - ACE_CDR::mb_align (qd->msg_block_); - - // Copy the data.. - qd->msg_block_->copy (incoming.rd_ptr (), - incoming.length ()); - } - - - return qd; -} - int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) { @@ -1889,65 +1881,59 @@ this->id ())); } - // See if the message in the head of the queue is complete... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - // Get the message on the head of the queue.. - TAO_Queued_Data *qd = - this->incoming_message_queue_.dequeue_head (); + if (this->incoming_message_queue_.is_head_complete () != 1) + return 1; + + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); - if (TAO_debug_level > 3) + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head, " + "the size of the queue is [%d]\n", + this->id (), + this->incoming_message_queue_.queue_length())); + } + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.queue_length () > 0) + { + if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "the size of the queue is [%d]\n", - this->id (), - this->incoming_message_queue_.queue_length())); - } - // Now that we have pulled out out one message out of the queue, - // check whether we have one more message in the queue... - if (this->incoming_message_queue_.is_head_complete () > 0) - { - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::process_queue_head, " - "notify reactor\n", - this->id ())); - - } - int retval = - this->notify_reactor (); + "notify reactor\n", + this->id ())); - if (retval == 1) - { - // Let the class know that it doesn't need to resume the - // handle.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); - } - else if (retval < 0) - return -1; - } - else - { - // As we are ready to process the last message just resume - // the handle. Set the flag incase someone had reset the flag.. - rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); } + int retval = + this->notify_reactor (); - // Process the message... - if (this->process_parsed_messages (qd, rh) == -1) + if (retval == 1) { - return -1; + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } + else if (retval < 0) + return -1; + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + } - // Delete the Queued_Data.. - TAO_Queued_Data::release (qd); + // Process the message... + int retval = this->process_parsed_messages (qd, rh); - return 0; - } + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); - return 1; + return (retval == -1) ? -1 : 0; } int @@ -1959,6 +1945,8 @@ } ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; // Get the reactor associated with the event handler ACE_Reactor *reactor = this->orb_core ()->reactor (); @@ -1993,6 +1981,18 @@ TAO_Transport::transport_cache_manager (void) { return this->orb_core_->lane_resources ().transport_cache (); +} + +size_t +TAO_Transport::recv_buffer_size (void) +{ + return this->recv_buffer_size_; +} + +size_t +TAO_Transport::sent_byte_count (void) +{ + return this->sent_byte_count_; } void Index: tao/Transport.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Transport.h,v retrieving revision 1.55 retrieving revision 1.55.2.1 diff -u -r1.55 -r1.55.2.1 --- tao/Transport.h 14 Dec 2003 16:03:49 -0000 1.55 +++ tao/Transport.h 15 Dec 2003 22:31:47 -0000 1.55.2.1 @@ -447,7 +447,6 @@ virtual ACE_Event_Handler * event_handler_i (void) = 0; protected: - virtual TAO_Connection_Handler * connection_handler_i (void) = 0; public: @@ -489,6 +488,7 @@ virtual int handle_input (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time = 0, int block = 0); + void try_to_complete (ACE_Time_Value *max_wait_time); enum { @@ -568,60 +568,11 @@ protected: - /// Called by the handle_input_i (). This method is used to parse - /// message read by the handle_input_i () call. It also decides - /// whether the message needs consolidation before processing. - int parse_consolidate_messages (ACE_Message_Block &bl, - TAO_Resume_Handle &rh, - ACE_Time_Value *time = 0); - - - /// Method does parsing of the message if we have a fresh message in - /// the or just returns if we have read part of the - /// previously stored message. - int parse_incoming_messages (ACE_Message_Block &message_block); - - /// Return if we have any missing data in the queue of messages - /// or determine if we have more information left out in the - /// presently read message to make it complete. - size_t missing_data (ACE_Message_Block &message_block); - - /// Consolidate the currently read message or consolidate the last - /// message in the queue. The consolidation of the last message in - /// the queue is done by calling consolidate_message_queue (). - virtual int consolidate_message (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// @@Bala: Docu??? - int consolidate_fragments (TAO_Queued_Data *qd, - TAO_Resume_Handle &rh); - - /// First consolidate the message queue. If the message is still not - /// complete, try to read from the handle again to make it - /// complete. If these dont help put the message back in the queue - /// and try to check the queue if we have message to process. (the - /// thread needs to do some work anyway :-)) - int consolidate_message_queue (ACE_Message_Block &incoming, - ssize_t missing_data, - TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// Called by parse_consolidate_message () if we have more messages - /// in one read. Queue up the messages and try to process one of - /// them, atleast at the head of them. - int consolidate_extra_messages (ACE_Message_Block &incoming, - TAO_Resume_Handle &rh); - /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); - /// Make a queued data from the message block - TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); - /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, @@ -669,6 +620,40 @@ int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); + /// Accessor to recv_buffer_size_ + size_t recv_buffer_size (void); + + /// Accessor to sent_byte_count_ + size_t sent_byte_count (void); + + + /*! + \name Incoming Queue Methods + */ + //@{ + /*! + \brief Queue up \a queueable_message as a completely-received incoming message. + + This method queues up a completely-received queueable GIOP message + (i.e., it must be dynamically-allocated). It does not assemble a + complete GIOP message; that should be done prior to calling this + message, and is currently done in handle_input_i. + + This does, however, assure that a completely-received GIOP + FRAGMENT gets associated with any previously-received related + fragments. It does this through collaboration with the messaging + object (since fragment reassembly is protocol specific). + + \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received + + \return 0 successfully enqueued \a queueable_message + + \return -1 failed to enqueue \a queueable_message + \todo How do we indicate \em what may have failed? + */ + int enqueue_incoming_message (TAO_Queued_Data *queueable_message); + //@} + /// CodeSet Negotiation - Get the char codeset translator factory /// TAO_Codeset_Translator_Factory *char_translator (void) const; @@ -792,10 +777,14 @@ /// Print out error messages if the event handler is not valid void report_invalid_event_handler (const char *caller); - /* + /** * Process the message that is in the head of the incoming queue. * If there are more messages in the queue, this method calls - * this->notify_reactor () to wake up a thread + * this->notify_reactor () to wake up a thread. + * + * \return -1 An error occurred; occurs independent presence of messages in the queue. + * \return 1 No messages in the queue to process; nothing processed. + * \return 0 Messages were in the queue to process and one got processed. */ int process_queue_head (TAO_Resume_Handle &rh); @@ -856,9 +845,12 @@ TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// Queue of the incoming messages.. + /// Queue of the completely-received incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; + /// Place to hold a partially-received (waiting-to-be-completed) message + TAO_Queued_Data * uncompleted_message_; + /// The queue will start draining no later than /// *if* the deadline is ACE_Time_Value current_deadline_; @@ -893,8 +885,11 @@ /// Used by the LRU, LFU and FIFO Connection Purging Strategies. unsigned long purging_order_; -private: + /// Size of the buffer received. + size_t recv_buffer_size_; + /// Number of bytes sent. + size_t sent_byte_count_; /// @@Phil, I think it would be nice if we could think of a way to /// do the following. /// We have been trying to use the transport for marking about @@ -907,6 +902,7 @@ /// we can move this to the connection_handler and it may more sense /// with the DSCP stuff around there. Do you agree? +private: /// Additional member values required to support codeset translation TAO_Codeset_Translator_Factory *char_translator_; TAO_Codeset_Translator_Factory *wchar_translator_; Index: tao/Strategies/DIOP_Transport.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Strategies/DIOP_Transport.cpp,v retrieving revision 1.18 retrieving revision 1.18.2.1 diff -u -r1.18 -r1.18.2.1 --- tao/Strategies/DIOP_Transport.cpp 14 Dec 2003 16:03:49 -0000 1.18 +++ tao/Strategies/DIOP_Transport.cpp 15 Dec 2003 22:31:47 -0000 1.18.2.1 @@ -88,12 +88,19 @@ for (int i = 0; i < iovcnt; i++) bytes_to_send += iov[i].iov_len; - this->connection_handler_->dgram ().send (iov, - iovcnt, - addr); + ssize_t n = this->connection_handler_->dgram ().send (iov, + iovcnt, + addr); // @@ Michael: // Always return a positive number of bytes sent, as we do // not handle sending errors in DIOP. + if (n == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t|%N|%l) Send of %d bytes failed %p\n"), + bytes_to_send, + ACE_TEXT ("send_i ()\n"))); + } bytes_transferred = bytes_to_send; @@ -191,7 +198,7 @@ // Read the message into the message block that we have created on // the stack. - ssize_t n = this->recv (message_block.rd_ptr (), + ssize_t n = this->recv (message_block.wr_ptr (), message_block.space (), max_wait_time); @@ -199,6 +206,7 @@ if (n <= 0) { if (n == -1) + // @@ Why not send_connection_closed_notifications() ? this->tms_->connection_closed (); return n; @@ -207,23 +215,43 @@ // Set the write pointer in the stack buffer message_block.wr_ptr (n); - // Parse the incoming message for validity. The check needs to be + // Check the incoming message for validity. The check needs to be // performed by the messaging objects. - if (this->parse_incoming_messages (message_block) == -1) - return -1; + if (this->messaging_object ()->check_for_valid_header (message_block) == 0) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"), + this->id (), + ACE_TEXT ("handle_input_i ()\n"))); + } + + return -1; + } // NOTE: We are not performing any queueing nor any checking for - // missing data. We are assuming that ALL the data would be got in a + // missing data. We are assuming that ALL the data arrives in a // single read. // Make a node of the message block.. - TAO_Queued_Data qd (&message_block); - - // Extract the data for the node.. - this->messaging_object ()->get_message_data (&qd); - - // Process the message - return this->process_parsed_messages (&qd, rh); + // + // We could make this more efficient by having a fixed Queued Data + // allocator, i.e., it always gave back the same thing. Actually, + // we *could* create an allocator that took a stack-allocated object + // as an argument and returned that when asked an allocation is + // done. Something to contemplate... + TAO_Queued_Data* qd = + TAO_Queued_Data::make_completed_message (message_block, + *this->messaging_object ()); + int retval = -1; + if (qd) + { + // Process the message + retval = this->process_parsed_messages (qd, rh); + TAO_Queued_Data::release (qd); + } + return retval; } @@ -309,5 +337,176 @@ minor); return 1; } + +// @@ Frank: Hopefully DIOP doesn't need this +/* +int +TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) +{ + CORBA::Boolean byte_order; + if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) + return -1; + + cdr.reset_byte_order (ACE_static_cast(int,byte_order)); + + DIOP::ListenPointList listen_list; + if ((cdr >> listen_list) == 0) + return -1; + + // As we have received a bidirectional information, set the flag to + // 1 + this->bidirectional_flag (1); + return this->connection_handler_->process_listen_point_list (listen_list); +} +*/ + + + +// @@ Frank: Hopefully DIOP doesn't need this +/* +void +TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) +{ + + // Get a handle on to the acceptor registry + TAO_Acceptor_Registry * ar = + this->orb_core ()->acceptor_registry (); + + + // Get the first acceptor in the registry + TAO_AcceptorSetIterator acceptor = ar->begin (); + + DIOP::ListenPointList listen_point_list; + + for (; + acceptor != ar->end (); + acceptor++) + { + // Check whether it is a DIOP acceptor + if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE) + { + this->get_listen_point (listen_point_list, + *acceptor); + } + } + + // We have the ListenPointList at this point. Create a output CDR + // stream at this point + TAO_OutputCDR cdr; + + // Marshall the information into the stream + if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0) + || (cdr << listen_point_list) == 0) + return; + + // Add this info in to the svc_list + opdetails.service_context ().set_context (IOP::BI_DIR_DIOP, + cdr); + + return; +} + + +int +TAO_DIOP_Transport::get_listen_point ( + DIOP::ListenPointList &listen_point_list, + TAO_Acceptor *acceptor) +{ + TAO_DIOP_Acceptor *iiop_acceptor = + ACE_dynamic_cast (TAO_DIOP_Acceptor *, + acceptor ); + + // Get the array of endpoints serviced by + const ACE_INET_Addr *endpoint_addr = + iiop_acceptor->endpoints (); + + // Get the count + size_t count = + iiop_acceptor->endpoint_count (); + + // Get the local address of the connection + ACE_INET_Addr local_addr; + + if (this->connection_handler_->peer ().get_local_addr (local_addr) + == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" address in set_bidir_context_info () \n")), + -1); + } + + + // Note: Looks like there is no point in sending the list of + // endpoints on interfaces on which this connection has not + // been established. If this is wrong, please correct me. + char *local_interface = 0; + + // Get the hostname for the local address + if (iiop_acceptor->hostname (this->orb_core_, + local_addr, + local_interface) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" name \n")), + -1); + } + + ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *, + endpoint_addr); + + for (size_t index = 0; + index <= count; + index++) + { + // Get the listen point on that acceptor if it has the same + // interface on which this connection is established + char *acceptor_interface = 0; + + if (iiop_acceptor->hostname (this->orb_core_, + tmp_addr[index], + acceptor_interface) == -1) + continue; + + // @@ This is very bad for performance, but it is a one time + // affair + if (ACE_OS::strcmp (local_interface, + acceptor_interface) == 0) + { + // We have the connection and the acceptor endpoint on the + // same interface + DIOP::ListenPoint point; + point.host = CORBA::string_dup (local_interface); + point.port = endpoint_addr[index].get_port_number (); + + // Get the count of the number of elements + CORBA::ULong len = listen_point_list.length (); + + // Increase the length by 1 + listen_point_list.length (len + 1); + + // Add the new length to the list + listen_point_list[len] = point; + } + + // @@ This is bad.... + CORBA::string_free (acceptor_interface); + } + + CORBA::string_free (local_interface); + return 1; +} +*/ + +#if 0 +TAO_Connection_Handler * +TAO_DIOP_Transport::invalidate_event_handler_i (void) +{ + TAO_Connection_Handler * eh = this->connection_handler_; + this->connection_handler_ = 0; + return eh; +} +#endif #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */ Index: tao/Strategies/SHMIOP_Transport.cpp =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Strategies/SHMIOP_Transport.cpp,v retrieving revision 1.35 retrieving revision 1.35.2.1 diff -u -r1.35 -r1.35.2.1 --- tao/Strategies/SHMIOP_Transport.cpp 14 Dec 2003 16:03:49 -0000 1.35 +++ tao/Strategies/SHMIOP_Transport.cpp 15 Dec 2003 22:31:47 -0000 1.35.2.1 @@ -136,6 +136,7 @@ } +#if 0 int TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, @@ -191,6 +192,7 @@ // process that return this->process_parsed_messages (&pqd, rh); } +#endif int TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, Index: tao/Strategies/SHMIOP_Transport.h =================================================================== RCS file: /project/cvs-repository/ACE_wrappers-repository/TAO/tao/Strategies/SHMIOP_Transport.h,v retrieving revision 1.24 retrieving revision 1.24.2.1 diff -u -r1.24 -r1.24.2.1 --- tao/Strategies/SHMIOP_Transport.h 14 Dec 2003 16:03:49 -0000 1.24 +++ tao/Strategies/SHMIOP_Transport.h 15 Dec 2003 22:31:47 -0000 1.24.2.1 @@ -82,10 +82,14 @@ size_t len, const ACE_Time_Value *s = 0); +#if 0 + // This no longer exists with the PMB-change flow. Not sure how to deal with that, + // so for now we ditch the method and see if things work. virtual int consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); +#endif //@}