Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 676

libace5-doc-5.4-2mdk.i586.rpm

// == == == == == == == == == == == == == == == == == == == == == == ==
// TP_Reactor.cpp,v 1.2 2004/01/05 22:57:06 shuston Exp
// Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp
// Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp
// = AUTHOR
//      Irfan Pyarali <irfan@cs.wustl.edu> and
//      Nanbor Wang <nanbor@cs.wustl.edu>
// == == == == == == == == == == == == == == == == == == == == == == ==
#include "ace/OS_NS_string.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Acceptor.h"
#include "ace/Thread_Manager.h"
#include "ace/TP_Reactor.h"

#include "Request_Handler.h"

// Accepting end point.  This is actually "localhost:10010", but some
// platform couldn't resolve the name so we use the IP address
// directly here.
static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");

// Total number of server threads.
static size_t svr_thrno = 5;

// Total number of client threads.
static size_t cli_runs = 2;

// Total connection attemps of a client thread.
static size_t cli_conn_no = 2;

// Total requests a client thread sends.
static size_t cli_req_no = 5;

// Delay before a thread sending the next request (in msec.)
static int req_delay = 50;


typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;


Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
    : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
    nr_msgs_rcvd_(0)
{
  this->reactor (ACE_Reactor::instance ());
}

int
Request_Handler::handle_input (ACE_HANDLE fd)
{
  ACE_TCHAR buffer[BUFSIZ];
  ACE_TCHAR len = 0;
  ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));

  if (result > 0
      && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
      == ACE_static_cast (ssize_t, len * sizeof (ACE_TCHAR)))
    {
      ++this->nr_msgs_rcvd_;

      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"),
                  fd,
                  buffer));
      if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
        ACE_Reactor::end_event_loop ();
      return 0;
    }
  else
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"),
                this, fd));
  return -1;
}

int
Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"),
              fd,
              this->nr_msgs_rcvd_));
  
  if (this->nr_msgs_rcvd_ != cli_req_no)
    ACE_ERROR((LM_ERROR,
               ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
               this,
               cli_req_no,
               this->nr_msgs_rcvd_));
  
  this->destroy ();
  return 0;
}

// Listing 2 code/ch16
static int
reactor_event_hook (ACE_Reactor *)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) handling events ....\n")));

  return 0;
}

class ServerTP : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Running the event loop\n")));

    int result =
      ACE_Reactor::instance ()->run_reactor_event_loop
        (&reactor_event_hook);

    if (result == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("(%t) %p\n"),
                         ACE_TEXT ("Error handling events")),
                        0);

    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("(%t) Done handling events.\n")));

    return 0;
  }
};
// Listing 2

class Client: public ACE_Task_Base
  {
  public:
    Client()
        :addr_(rendezvous)
    {}

    virtual int svc()
    {
      ACE_OS::sleep (3);
      const ACE_TCHAR *msg =
        ACE_TEXT ("Message from Connection worker");
      
      ACE_TCHAR buf [BUFSIZ];
      buf[0] = ACE_OS::strlen (msg) + 1;
      ACE_OS::strcpy (&buf[1], msg);

      for (size_t i = 0; i < cli_runs; i++)
        send_work_to_server(buf);

      shut_down();

      return 0;
    }

  private:
    void send_work_to_server(ACE_TCHAR* arg)
    {
      ACE_SOCK_Stream stream;
      ACE_SOCK_Connector connect;
      ACE_Time_Value delay (0, req_delay);
      size_t len = * ACE_reinterpret_cast (ACE_TCHAR *, arg);

      for (size_t i = 0 ; i < cli_conn_no; i++)
        {
          if (connect.connect (stream, addr_) < 0)
            {
              ACE_ERROR ((LM_ERROR,
                          ACE_TEXT ("(%t) %p\n"),
                          ACE_TEXT ("connect")));
              continue;
            }

          for (size_t j = 0; j < cli_req_no; j++)
            {
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"),
                          stream.get_handle (),
                          j+1));
              if (stream.send_n (arg,
                                 (len + 1) * sizeof (ACE_TCHAR)) == -1)
                {
                  ACE_ERROR ((LM_ERROR,
                              ACE_TEXT ("(%t) %p\n"),
                              ACE_TEXT ("send_n")));
                  continue;
                }
              ACE_OS::sleep (delay);
            }

          stream.close ();
        }

    }

    void shut_down()
    {
      ACE_SOCK_Stream stream;
      ACE_SOCK_Connector connect;

      if (connect.connect (stream, addr_) == -1)
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) %p Error while connecting\n"),
                    ACE_TEXT ("connect")));

      const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");

      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("shutdown stream handle = %x\n"),
                  stream.get_handle ()));

      if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
        ACE_ERROR ((LM_ERROR,
                    ACE_TEXT ("(%t) %p\n"),
                    ACE_TEXT ("send_n")));

      stream.close ();
    }
  private:
    ACE_INET_Addr addr_;
  };
// Listing 1 code/ch16
int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_TP_Reactor sr;
  ACE_Reactor new_reactor (&sr);
  ACE_Reactor::instance (&new_reactor);

  ACCEPTOR acceptor;
  ACE_INET_Addr accept_addr (rendezvous);

  if (acceptor.open (accept_addr) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("open")),
                      1);

  ACE_DEBUG ((LM_DEBUG,
             ACE_TEXT ("(%t) Spawning %d server threads...\n"),
             svr_thrno));

  ServerTP serverTP;
  serverTP.activate (THR_NEW_LWP | THR_JOINABLE, svr_thrno);

  Client client;
  client.activate ();

  ACE_Thread_Manager::instance ()->wait ();

  return 0;
}
// Listing 1
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Concurrency_Strategy<Request_Handler>;
template class ACE_Creation_Strategy<Request_Handler>;
template class ACE_Scheduling_Strategy<Request_Handler>;
template class ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate  ACE_Accept_Strategy<Request_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate  ACE_Concurrency_Strategy<Request_Handler>
#pragma instantiate  ACE_Creation_Strategy<Request_Handler>
#pragma instantiate  ACE_Scheduling_Strategy<Request_Handler>
#pragma instantiate  ACE_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate  ACE_Strategy_Acceptor<Request_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate  ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */