Sophie

Sophie

distrib > Mandriva > 10.0 > i586 > media > contrib > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 752

libace5-doc-5.4-2mdk.i586.rpm

// Peer_Router.cpp,v 4.32 2000/10/23 13:39:18 schmidt Exp

#if !defined (_PEER_ROUTER_C)
#define _PEER_ROUTER_C

#include "ace/Service_Config.h"
#include "ace/Get_Opt.h"
#include "Options.h"
#include "Peer_Router.h"

ACE_RCSID(Event_Server, Peer_Router, "Peer_Router.cpp,v 4.32 2000/10/23 13:39:18 schmidt Exp")

// Send the <ACE_Message_Block> to all the peers.  Note that in a
// "real" application this logic would most likely be more selective,
// i.e., it would actually do "routing" based on addressing
// information passed in the <ACE_Message_Block>.

// Parameters:
//   mb - header block whose continuation (<mb->cont>) carries the
//        payload.  This method always releases <mb>.
// Returns 0 when the summed <Peer_Handler::put> results are 0 (this
// includes the "no peers" and "no payload" cases); otherwise the sum
// divided by the number of peers visited.  Note that a <put> result
// of -1 is added into the sum as-is.

int
Peer_Router_Context::send_peers (ACE_Message_Block *mb)
{
  // Skip past the header and get the message to send.
  ACE_Message_Block *data_block = mb->cont ();

  // A header without a continuation carries nothing to forward.
  // Bail out here rather than dereferencing a null pointer inside
  // the loop below.
  if (data_block == 0)
    {
      mb->release ();
      return 0;
    }

  PEER_ITERATOR map_iter = this->peer_map_;
  int bytes = 0;
  int iterations = 0;

  // Use an iterator to "multicast" the data to *all* the registered
  // peers.  Note that this doesn't really multicast, it just makes a
  // "logical" copy of the <ACE_Message_Block> and hands it to the
  // <Peer_Handler> corresponding to each peer.  A "real" application
  // would probably "route" the data to a subset of connected peers
  // here, rather than send it to all the peers.

  for (PEER_ENTRY *ss = 0;
       map_iter.next (ss) != 0;
       map_iter.advance ())
    {
      if (Options::instance ()->debug ())
        ACE_DEBUG ((LM_DEBUG,
                    "(%t) sending to peer via handle %d\n",
                    ss->ext_id_));

      ++iterations;

      // Each peer gets its own duplicate because <Peer_Handler::put>
      // releases whatever block it is handed.
      bytes += ss->int_id_->put (data_block->duplicate ());
    }

  // Drop our reference to the original header/payload chain.
  mb->release ();

  // <iterations> can only be 0 when <bytes> is also 0, so the
  // division below is never by zero.
  return bytes == 0 ? 0 : bytes / iterations;
}

// Remove the <Peer_Handler> from the peer connection map.

// Drops the entry stored under <key> from the peer map and hands the
// map's own status code straight back to the caller.
int
Peer_Router_Context::unbind_peer (ROUTING_KEY key)
{
  const int status = this->peer_map_.unbind (key);
  return status;
}

// Add the <Peer_Handler> to the peer connection map.

// Records <peer_handler> in the peer map under <key> and hands the
// map's own status code straight back to the caller.
int
Peer_Router_Context::bind_peer (ROUTING_KEY key,
                                Peer_Handler *peer_handler)
{
  const int status = this->peer_map_.bind (key, peer_handler);
  return status;
}

// Takes one more reference on this context; pair every call with a
// later <release>.
void
Peer_Router_Context::duplicate (void)
{
  ++this->reference_count_;
}

// Gives back one reference.  The context deletes itself when the
// count reaches zero, so callers must not touch it after their final
// <release>.
void
Peer_Router_Context::release (void)
{
  ACE_ASSERT (this->reference_count_ > 0);

  // Whoever drops the last reference destroys the object.
  if (--this->reference_count_ == 0)
    delete this;
}

// Opens the listening endpoint on <port> and then the peer map.
// Failures are only logged; nothing is reported back to the caller.
// The reference count starts at 0.
Peer_Router_Context::Peer_Router_Context (u_short port)
  : reference_count_ (0)
{
  // No router is attached until <peer_router (Peer_Router *)> is
  // called, so make the accessor return a well-defined null pointer
  // until then instead of an indeterminate value.
  this->peer_router_ = 0;

  // Initialize the Acceptor's "listen-mode" socket.
  ACE_INET_Addr endpoint (port);
  if (this->open (endpoint) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n",
                "Acceptor::open"));

  // Initialize the connection map.
  else if (this->peer_map_.open () == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n",
                "Map_Manager::open"));
  else
    {
      ACE_INET_Addr addr;

      // Report which role this context plays, decided by comparing
      // the bound port against the configured supplier port.  <this>
      // is a pointer, so it is logged with "%@" rather than "%u".
      if (this->acceptor ().get_local_addr (addr) != -1)
        ACE_DEBUG ((LM_DEBUG,
                    "(%t) initializing %s on port = %d, handle = %d, this = %@\n",
                    addr.get_port_number () == Options::instance ()->supplier_port ()
                    ? "Supplier_Handler" : "Consumer_Handler",
                    addr.get_port_number (),
                    this->acceptor ().get_handle (),
                    this));
      else
        ACE_ERROR ((LM_ERROR,
                    "%p\n",
                    "get_local_addr"));
    }
}

// Shuts the context down: closes the acceptor, asks the singleton
// Reactor to stop watching every handle still present in the peer
// map, and finally closes the map itself.
Peer_Router_Context::~Peer_Router_Context (void)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%t) closing down Peer_Router_Context\n"));

  // Close down the Acceptor and take ourselves out of the Reactor.
  this->handle_close ();

  PEER_ITERATOR map_iter = this->peer_map_;

  // Make sure to take all the handles out of the Reactor to avoid
  // "resource leaks."

  for (PEER_ENTRY *ss = 0;
       map_iter.next (ss) != 0;
       map_iter.advance ())
    {
      if (Options::instance ()->debug ())
        ACE_DEBUG ((LM_DEBUG,
                    "(%t) closing down peer on handle %d\n",
                    ss->ext_id_));

      // A failed removal is logged and the loop carries on with the
      // remaining peers.  The format needs "%p" (not a bare "p") so
      // that the supplied label and the errno text are printed.
      if (ACE_Reactor::instance ()->remove_handler
          (ss->ext_id_,
           ACE_Event_Handler::READ_MASK) == -1)
        ACE_ERROR ((LM_ERROR,
                    "(%t) %p\n",
                    "remove_handle"));
    }

  // Close down the map.
  this->peer_map_.close ();
}

// Accessor: yields the <Peer_Router> pointer most recently stored
// through the one-argument overload.
Peer_Router *
Peer_Router_Context::peer_router (void)
{
  Peer_Router *const attached = this->peer_router_;
  return attached;
}

// Mutator: remembers <pr> as the router associated with this
// context.  Only the pointer is stored.
void
Peer_Router_Context::peer_router (Peer_Router *pr)
{
  peer_router_ = pr;
}

// Factory Method that creates a new <Peer_Handler> for each
// connection.

// Allocates a <Peer_Handler> constructed with this context and
// stores it in the out-parameter <sh>.  Returns 0 once the
// allocation macro has completed.
// NOTE(review): -1 is the value passed to <ACE_NEW_RETURN> as its
// failure return; the macro is defined outside this file, so confirm
// its exact behavior (presumably: return -1 if allocation fails).
int
Peer_Router_Context::make_svc_handler (Peer_Handler *&sh)
{
  ACE_NEW_RETURN (sh,
                  Peer_Handler (this),
                  -1);
  return 0;
}

// Stores the <Peer_Router_Context> pointer <prc>; the other methods
// of this class reach the peer map and the router through it.  No
// other work is done here.
Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
  : peer_router_context_ (prc)
{
}

// Send output to a peer.  Note that this implementation "blocks" if
// flow control occurs.  This is undesirable for "real" applications.
// The best way around this is to make the <Peer_Handler> an Active
// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway
// application.

// Writes the readable portion of <mb> to the peer with <send_n>,
// releases <mb> regardless of the outcome, and returns whatever
// <send_n> reported.  <tv> is ignored in the reactive build.
int
Peer_Handler::put (ACE_Message_Block *mb,
                   ACE_Time_Value *tv)
{
#if 0
  // Active Object variant: queue the block for a service thread.
  return this->putq (mb, tv);
#else
  ACE_UNUSED_ARG (tv);

  // Push out everything between the read and write pointers.
  const int sent = this->peer ().send_n (mb->rd_ptr (), mb->length ());

  // The block is ours to dispose of, whether or not the send worked.
  mb->release ();
  return sent;
#endif /* 0 */
}

// Initialize a newly connected handler.

// Initializes a newly connected handler: logs the owning router's
// info string, registers this handler with the singleton Reactor for
// READ events, and binds it into the context's peer map under its
// own handle.  Returns 0 on success, or -1 (after logging) as soon
// as any step fails.
//
// The steps are written as independent guard clauses so that either
// side of the "#if 0" below leaves the function well-formed.
int
Peer_Handler::open (void *)
{
  char buf[BUFSIZ], *p = buf;

  if (this->peer_router_context_->peer_router ()->info (&p,
                                                        sizeof buf) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "info"),
                      -1);

  ACE_DEBUG ((LM_DEBUG,
              "(%t) creating handler for %s, handle = %d\n",
              buf,
              this->get_handle ()));

#if 0
  // If we're running as an Active Object activate the Peer_Handler
  // here.
  if (this->activate (Options::instance ()->t_flags ()) == -1)
     ACE_ERROR_RETURN ((LM_ERROR,
                        "%p\n",
                        "activation of thread failed"),
                       -1);
  ACE_DEBUG ((LM_DEBUG,
              "(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
#else

  // Register with the Reactor to receive messages from our Peer.
  if (ACE_Reactor::instance ()->register_handler
      (this, ACE_Event_Handler::READ_MASK) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "register_handler"),
                      -1);
#endif /* 0 */

  // Insert ourselves into the routing map.
  if (this->peer_router_context_->bind_peer (this->get_handle (),
                                             this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "bind_peer"),
                      -1);

  return 0;
}

// Receive a message from a Peer.

// Reads one chunk of input from the peer, wraps it in a header block
// (carrying this handler's handle as the "address") chained to a data
// block, and passes the pair to the owning router's <put>.
//
// Returns 0 when the message was handed on successfully and -1
// otherwise: allocation failure, <recv> error, peer shutdown (after
// unbinding from the peer map), or <put> failure.
//
// On every path that returns before the blocks are handed to <put>,
// the header block is released here; releasing the header also
// releases the data block chained to it.
int
Peer_Handler::handle_input (ACE_HANDLE h)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%t) input arrived on handle %d\n",
              h));

  ACE_Message_Block *db;

  ACE_NEW_RETURN (db, ACE_Message_Block (BUFSIZ), -1);

  ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
                                                 ACE_Message_Block::MB_PROTO, db);
  // Check for memory failures.
  if (hb == 0)
    {
      db->release ();
      errno = ENOMEM;
      return -1;
    }

  ssize_t n = this->peer ().recv (db->rd_ptr (),
                                  db->size ());

  if (n == -1)
    {
      // Log first so that "%p" still reports <recv>'s errno, then
      // free the message chain we allocated above.
      ACE_ERROR ((LM_ERROR,
                  "%p",
                  "recv failed"));
      hb->release ();
      return -1;
    }
  else if (n == 0) // Client has closed down the connection.
    {
      if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "%p",
                      "unbind failed"));
          hb->release ();
          return -1;
        }

      ACE_DEBUG ((LM_DEBUG,
                  "(%t) shutting down handle %d\n", h));

      // Nothing was received, so the blocks are of no further use.
      hb->release ();

      // Instruct the <ACE_Reactor> to deregister us by returning -1.
      return -1;
    }
  else
    {
      // Transform incoming buffer into an <ACE_Message_Block>.

      // First, increment the write pointer to the end of the newly
      // read data block.
      db->wr_ptr (n);

      // Second, copy the "address" into the header block.  Note that
      // for this implementation the HANDLE we receive the message on
      // is considered the "address."  A "real" application would want
      // to do something more sophisticated.
      *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();

      // Third, update the write pointer in the header block.
      hb->wr_ptr (sizeof (ACE_HANDLE));

      // Finally, pass the message through the stream.  Note that we
      // use <Task::put> here because this gives the method at *our*
      // level in the stream a chance to do something with the message
      // before it is sent up the other side.  For instance, if we
      // receive messages in the <Supplier_Router>, it will just call
      // <put_next> and send them up the stream to the
      // <Consumer_Router> (which broadcasts them to consumers).
      // However, if we receive messages in the <Consumer_Router>, it
      // could reply to the Consumer with an error since it's not
      // correct for Consumers to send messages (we don't do this in
      // the current implementation, but it could be done in a "real"
      // application).

      if (this->peer_router_context_->peer_router ()->put (hb) == -1)
        return -1;
      else
        return 0;
    }
}

// Stores the <Peer_Router_Context> pointer <prc>, which <context>
// later returns.  No other work is done here.
Peer_Router::Peer_Router (Peer_Router_Context *prc)
  : peer_router_context_ (prc)
{
}

// Accessor: yields the <Peer_Router_Context> pointer this router was
// constructed with.
Peer_Router_Context *
Peer_Router::context (void) const
{
  Peer_Router_Context *const ctx = this->peer_router_context_;
  return ctx;
}

// Handles an I/O control message.  <mb> is read as an
// <ACE_IO_Cntl_Msg>; for SET_LWM and SET_HWM the new water mark is
// read as a size_t from the start of the continuation block and
// passed to <water_marks>.
//
// Returns 0 when a water mark was applied, and -1 for any other
// command or when a water-mark command arrives without a
// continuation block to read the value from.
int
Peer_Router::control (ACE_Message_Block *mb)
{
  ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
  ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command = ioc->cmd ();

  switch (command)
    {
    case ACE_IO_Cntl_Msg::SET_LWM:
    case ACE_IO_Cntl_Msg::SET_HWM:
      {
        ACE_Message_Block *arg = mb->cont ();

        // Without an argument block there is no value to apply;
        // reject the request instead of dereferencing null.
        if (arg == 0)
          return -1;

        this->water_marks (command, *(size_t *) arg->rd_ptr ());
      }
      break;
    default:
      return -1;
    }
  return 0;
}

#if 0

// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
// a single thread of control.  It would be easy to make them Active
// Objects by (1) calling activate() in Peer_Handler::open(), (2)
// making Peer_Handler::put() enqueue each message on the message
// queue, and (3) then running the following svc() routine to route
// each message to its final destination within a separate thread.
// Note that we'd want to move the svc() call up to the
// Consumer_Router and Supplier_Router level in order to get the
// right level of control for input and output.

// Disabled (this whole definition sits inside "#if 0") sketch of a
// service loop: each pass allocates a data block and a header block,
// receives from the peer into the data block and, when data arrived,
// stamps the header with this handler's handle and passes it to the
// router's <reply>.
//
// NOTE(review): this looks stale and probably would not compile if
// enabled.  There is no return type, and it uses <Message_Block>,
// <LM_ERROR_RETURN>, <LOG_ERROR>, <LOG_DEBUG> and <LM_DEBUG> as a
// macro, whereas the live code in this file uses
// <ACE_Message_Block>, <ACE_ERROR_RETURN>, <LM_ERROR> and
// <ACE_DEBUG>.  Confirm before reviving.
Peer_Handler::svc (void)
{
  ACE_Message_Block *db, *hb;

  // Do an endless loop
  for (;;)
    {
      // A fresh data/header pair is allocated on every pass.
      // NOTE(review): the early returns below do not release <db> or
      // <hb> -- confirm who is meant to own them.
      db = new Message_Block (BUFSIZ);
      hb = new Message_Block (sizeof (ROUTING_KEY),
                              Message_Block::MB_PROTO,
                              db);

      ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ());

      if (n == -1)
	LM_ERROR_RETURN ((LOG_ERROR,
                          "%p",
                          "recv failed"),
                         -1);
      else if (n == 0) // Client has closed down the connection.
	{
	  // NOTE(review): <unbind_peer> is called on the router here,
	  // while the live <handle_input> calls it on
	  // <peer_router_context_> directly -- verify the intended target.
	  if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
	    LM_ERROR_RETURN ((LOG_ERROR,
                              "%p",
                              "unbind failed"),
                             -1);
	  LM_DEBUG ((LOG_DEBUG,
                     "(%t) shutting down \n"));

          // We do not need to be deregistered by reactor
	  // as we were not registered at all.
	  return -1; 
	}
      else
	{
          // Transform incoming buffer into a Message.
	  db->wr_ptr (n);
	  // The handle is written as a long here, whereas the live
	  // <handle_input> writes it as an ACE_HANDLE.
	  *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
	  hb->wr_ptr (sizeof (long));

          // Pass the message to the stream.
	  if (this->peer_router_context_->peer_router ()->reply (hb) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "(%t) %p\n",
                               "Peer_Handler.svc : peer_router->reply failed"),
                              -1);
	}
    }
  // Not reached by the loop above, which only exits via return.
  return 0;
}
#endif /* 0 */
#endif /* _PEER_ROUTER_C */

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>;
template class ACE_Map_Iterator_Base<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Reverse_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Manager<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>
#pragma instantiate ACE_Map_Iterator_Base<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Reverse_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Manager<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */