Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 1447

libace5-doc-5.4-2mdk.i586.rpm

// server.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp

// server.cpp (written by Tim Harrison)

// Listens to multicast address for client log messages.  Prints
// Mbits/sec received from client.

#include "ace/OS_main.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/Reactor.h"
#include "ace/Log_Msg.h"
#include "Log_Wrapper.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/os_include/os_netdb.h"

ACE_RCSID(Multicast, server, "server.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp")

#if defined (ACE_HAS_IP_MULTICAST)
class Server_Events : public ACE_Event_Handler
{
public:
  Server_Events (u_short port,
                 const char *mcast_addr,
                 long time_interval = 0);
  ~Server_Events (void);

  virtual int handle_input (ACE_HANDLE fd);
  virtual int handle_timeout (const ACE_Time_Value &tv,
                              const void *arg);

  virtual ACE_HANDLE get_handle (void) const;

  ACE_Time_Value *wait_time (void);

private:
  char *message_;
  Log_Wrapper::Log_Record *log_record_;
  char buf_[4 * BUFSIZ];
  char hostname_[MAXHOSTNAMELEN];

  int initialized_;
  int count_;
  int interval_;
  // time interval to log messages

  ACE_Time_Value *how_long_;
  ACE_Reactor *reactor_;
  ACE_SOCK_Dgram_Mcast mcast_dgram_;
  ACE_INET_Addr remote_addr_;
  ACE_INET_Addr mcast_addr_;

  // = statistics on messages received
  double total_bytes_received_;
  int total_messages_received_;
  int last_sequence_number_;
};

static const char MCAST_ADDR[] = ACE_DEFAULT_MULTICAST_ADDR;
static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
static const int DURATION = 5;

ACE_HANDLE
Server_Events::get_handle (void) const
{
  return this->mcast_dgram_.get_handle ();
}

ACE_Time_Value *
Server_Events::wait_time (void)
{
  return this->how_long_;
}

Server_Events::Server_Events (u_short port,
                              const char *mcast_addr,
                              long time_interval)
  : initialized_ (0),
    count_ (1),
    interval_ (time_interval),
    mcast_addr_ (port, mcast_addr),
    total_bytes_received_ (0)
{
  // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.

  if (ACE_OS::hostname (this->hostname_,
                        MAXHOSTNAMELEN) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n",
                "hostname"));

  else if (this->mcast_dgram_.subscribe (this->mcast_addr_) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n",
                "subscribe"));
  else
    {
      // Point to NULL so that we block in the beginning.
      this->how_long_ = 0;

      this->log_record_ = (Log_Wrapper::Log_Record *) &buf_;
      this->message_ = &buf_[sizeof (Log_Wrapper::Log_Record)];
    }
}

// A destructor that emacs refuses to color blue ;-)

Server_Events::~Server_Events (void)
{
  this->mcast_dgram_.unsubscribe ();

  ACE_DEBUG ((LM_DEBUG,
              "total bytes received = %d after %d second\n",
              this->total_bytes_received_,
              this->interval_));

  ACE_DEBUG ((LM_DEBUG,
              "Mbits/sec = %.2f\n",
              (float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_))));

  ACE_DEBUG ((LM_DEBUG,
              "last sequence number = %d\ntotal messages received = %d\ndiff = %d\n",
              this->last_sequence_number_,
              this->total_messages_received_,
              this->last_sequence_number_ - total_messages_received_));
}

int
Server_Events::handle_timeout (const ACE_Time_Value &,
                               const void *arg)
{
  ACE_DEBUG ((LM_DEBUG, "\t%d timeout%s occurred for %s.\n",
              this->count_,
              this->count_ == 1 ? "" : "s",
              (char *) arg));

  // Don't let the timeouts continue if there's no activity since
  // otherwise we use up a lot of CPU time unnecessarily.
  if (this->count_ == 5)
    {
      reactor ()->cancel_timer (this);
      this->initialized_ = 0;

      ACE_DEBUG ((LM_DEBUG,
                  "\tcancelled timeout for %s to avoid busy waiting.\n",
                  (char *) arg));
    }

  this->count_++;
  return 0;
}

int
Server_Events::handle_input (ACE_HANDLE)
{
  // Receive message from multicast group.
  iovec iovp[2];
  iovp[0].iov_base = buf_;
  iovp[0].iov_len  = sizeof (log_record_);
  iovp[1].iov_base = &buf_[sizeof (log_record_)];
  iovp[1].iov_len  = 4 * BUFSIZ - sizeof (log_record_);

  ssize_t retcode =
    this->mcast_dgram_.recv (iovp,
                             2,
                             this->remote_addr_);
  if (retcode != -1)
    {
      total_messages_received_++;
      total_bytes_received_ += retcode;
      last_sequence_number_ =
        ntohl (log_record_->sequence_number);

      for (char *message_end = this->message_ + ACE_OS::strlen (this->message_) - 1;
           ACE_OS::strchr ("\r\n \t", *message_end) != 0;
           )
        {
          *message_end-- = '\0';
          if (message_end == this->message_)
            break;
        }

      ACE_DEBUG ((LM_DEBUG,
                  "sequence number = %d\n",
                  last_sequence_number_));
      ACE_DEBUG ((LM_DEBUG,
                  "message = '%s'\n",
                  this->message_));

      if (this->initialized_ == 0)
        {
          // Restart the timer since we've received events again.
          if (reactor()->schedule_timer (this,
                                         (void *) this->hostname_,
                                         ACE_Time_Value::zero,
                                         ACE_Time_Value (DURATION)) == -1)
            ACE_ERROR_RETURN ((LM_ERROR,
                               "%p\n",
                               "schedule_timer"),
                              -1);
          this->initialized_ = 1;
        }

      this->count_ = 1;
      return 0;
    }
  else
    return -1;
}

int
ACE_TMAIN (int, ACE_TCHAR *[])
{
  // Instantiate a server which will receive messages for DURATION
  // seconds.
  Server_Events server_events (UDP_PORT,
                               MCAST_ADDR,
                               DURATION);
  // Instance of the ACE_Reactor.
  ACE_Reactor reactor;

  if (reactor.register_handler (&server_events,
                                ACE_Event_Handler::READ_MASK) == -1)
    ACE_ERROR ((LM_ERROR,
                "%p\n%a",
                "register_handler",
                1));

  ACE_DEBUG ((LM_DEBUG,
              "starting up server\n"));

  for (;;)
    reactor.handle_events (server_events.wait_time ());

  ACE_NOTREACHED (return 0;)
}
#else
int
main (int, char *argv[])
{
  ACE_ERROR_RETURN ((LM_ERROR,
                     "error: %s must be run on a platform that support IP multicast\n",
                     argv[0]), -1);
}
#endif /* ACE_HAS_IP_MULTICAST */