Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 1476

libace5-doc-5.4-2mdk.i586.rpm

// test_proactor.cpp,v 1.37 2003/11/10 01:48:03 dhinton Exp

// ============================================================================
//
// = LIBRARY
//    examples
//
// = FILENAME
//    test_proactor.cpp
//
// = DESCRIPTION
//    This program illustrates how the <ACE_Proactor> can be used to
//    implement an application that does various asynchronous
//    operations.
//
// = AUTHOR
//    Irfan Pyarali <irfan@cs.wustl.edu>
//
// ============================================================================

#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
#include "ace/OS_NS_sys_stat.h"

ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.37 2003/11/10 01:48:03 dhinton Exp")

#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
  // This only works on Win32 platforms and on Unix platforms supporting
  // POSIX aio calls.

// Host that we're connecting to.  Set by the -h option; while it is
// still 0, <ACE_TMAIN> opens the acceptor (passive side) instead of
// the <Sender> (active side).
static ACE_TCHAR *host = 0;

// Port that we're receiving connections on (passive side) or
// connecting to (active side).  Set by the -p option.
static u_short port = ACE_DEFAULT_SERVER_PORT;

// File that we're sending.  Set by the -f option.
static const ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp");

// Name of the output file that <Receiver> dumps the received data
// into.  Set by the -d option.
static const ACE_TCHAR *dump_file = ACE_TEXT("output");

// Keep track of when we're done.  The completion handlers set this to
// 1 and the event loop in <ACE_TMAIN> stops once it is non-zero.
static int done = 0;

// Size of each initial asynchronous <read> operation.  Handed to the
// acceptor's <open> and used as the <bytes_to_read> of the faked read
// result in <Receiver::open>.
static int initial_read_size = BUFSIZ;

class Receiver : public ACE_Service_Handler
{
  // = TITLE
  //     The class will be created by <ACE_Asynch_Acceptor> when new
  //     connections arrive.  This class will then receive data from
  //     the network connection and dump it to a file.
  //
  // = DESCRIPTION
  //     Every block read from the socket is written asynchronously to
  //     the dump file.  When a read fails or transfers zero bytes the
  //     object sets the global <done> flag and deletes itself.
public:
  // = Initialization and termination.
  Receiver (void);
  // Marks both handles as invalid; the real setup happens in <open>.

  ~Receiver (void);
  // Closes the dump file and the socket.

  virtual void open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block);
  // This is called after the new connection has been accepted.
  // <handle> is the new connection; <message_block> holds any data
  // that arrived together with the accept.

protected:
  // These methods are called by the framework

  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
  // This is called when an asynchronous <read> operation from the
  // socket completes.

  virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
  // This is called when an asynchronous <write> to the file
  // completes.

private:
  int initiate_read_stream (void);
  // Initiate an asynchronous <read> operation on the socket.
  // Returns 0 on success and -1 on failure.

  ACE_Asynch_Read_Stream rs_;
  // rs (read stream): for reading from a socket.

  ACE_HANDLE dump_file_;
  // File for dumping data.

  ACE_Asynch_Write_File wf_;
  // wf (write file): for writing to a file.

  u_long file_offset_;
  // Offset in the dump file at which the next <write> is issued.

  ACE_HANDLE handle_;
  // Handle for IO to remote peer.
};

// Give every data member a defined value.  The handles start out
// invalid and the file offset at zero, so the object is in a known
// state even if <open> is never called.
Receiver::Receiver (void)
  : dump_file_ (ACE_INVALID_HANDLE),
    file_offset_ (0),
    handle_ (ACE_INVALID_HANDLE)
{
}

// Release the dump file and the socket.  Each handle is closed only
// if it was actually opened, since <open> can bail out before either
// of them has been set.
Receiver::~Receiver (void)
{
  if (this->dump_file_ != ACE_INVALID_HANDLE)
    ACE_OS::close (this->dump_file_);

  if (this->handle_ != ACE_INVALID_HANDLE)
    ACE_OS::closesocket (this->handle_);
}

// Called by the acceptor once a connection has been accepted.
//
// <handle> is the newly accepted socket and <message_block> holds
// whatever data arrived together with the accept.  The method opens
// the dump file, attaches the asynchronous file writer and socket
// reader to this handler and then either replays the already
// received data through <handle_read_stream> or starts the first
// asynchronous read.  Errors are logged and the method returns
// early; nothing is reported to the caller.
void
Receiver::open (ACE_HANDLE handle,
                ACE_Message_Block &message_block)
{
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:Receiver::open called\n"));

  // New connection, so initiate stuff.

  // Cache the new connection
  this->handle_ = handle;

  // File offset starts at zero
  this->file_offset_ = 0;

  // Open dump file (in OVERLAPPED mode)
  this->dump_file_ = ACE_OS::open (dump_file,
                                   O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED,
                                   0644);
  if (this->dump_file_ == ACE_INVALID_HANDLE)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_OS::open"));
      return;
    }

  // Initiate <ACE_Asynch_Write_File>.
  if (this->wf_.open (*this,
                      this->dump_file_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Write_File::open"));
      return;
    }

  // Initiate <ACE_Asynch_Read_Stream>.
  if (this->rs_.open (*this, this->handle_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Read_Stream::open"));
      return;
    }

  // Fake the result and make the <handle_read_stream> get
  // called.  But not if '0' bytes were transferred.
  if (message_block.length () != 0)
    {
      // Duplicate the message block so that we can keep it around.
      // <duplicate> may fail, so check before dereferencing.
      ACE_Message_Block *duplicate = message_block.duplicate ();
      if (duplicate == 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "%p\n",
                      "ACE_Message_Block::duplicate"));
          return;
        }

      // Fake the result so that we will get called back.
      ACE_Asynch_Read_Stream_Result_Impl *fake_result =
        ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,
                                                                     this->handle_,
                                                                     *duplicate,
                                                                     initial_read_size,
                                                                     0,
                                                                     ACE_INVALID_HANDLE,
                                                                     0,
                                                                     0);
      if (fake_result == 0)
        {
          ACE_ERROR ((LM_ERROR,
                      "%p\n",
                      "ACE_Proactor::create_asynch_read_stream_result"));
          // Nobody else knows about the duplicate, so free it here.
          duplicate->release ();
          return;
        }

      size_t bytes_transferred = message_block.length ();

      // <complete> for Accept would have already moved the <wr_ptr>
      // forward. Update it to the beginning position.
      duplicate->wr_ptr (duplicate->wr_ptr () - bytes_transferred);

      // This will call the callback.  Note that the callback may
      // delete this object, so no member is touched afterwards.
      fake_result->complete (bytes_transferred,
                             1,
                             0);

      // Zap the fake result.
      delete fake_result;
    }
  else
    // Otherwise, make sure we proceed. Initiate reading the socket
    // stream.
    if (this->initiate_read_stream () == -1)
      return;
}

int
Receiver::initiate_read_stream (void)
{
  // Create a new <Message_Block>.  Note that this message block will
  // be used both to <read> data asynchronously from the socket and to
  // <write> data asynchronously to the file.
  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (BUFSIZ + 1),
                  -1);

  // Inititiate read
  if (this->rs_.read (*mb,
                      mb->size () - 1) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Read_Stream::read"),
                      -1);
  return 0;
}

// Completion callback for an asynchronous read from the socket.
//
// On a successful, non-empty read the received block is handed to the
// asynchronous file writer and a new socket read is started.  On a
// failed or empty read the block is released, the global <done> flag
// is set and this object deletes itself.
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_stream called\n"));

  // Terminate the received data so that it can be printed as a string.
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
#if 0
  // This can overrun the ACE_Log_Msg buffer and do bad things.
  // Re-enable it at your risk.
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
#endif /* 0 */

  if (result.success () && result.bytes_transferred () != 0)
    {
      // Successful read: write the data to the file asynchronously.
      // Note how we reuse the <ACE_Message_Block> for the writing.
      // Therefore, we do not delete this buffer because it is handled
      // in <handle_write_file>.
      if (this->wf_.write (result.message_block (),
                           result.bytes_transferred (),
                           this->file_offset_) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "%p\n",
                      "ACE_Asynch_Write_File::write"));

          // The write was never queued, so <handle_write_file> will
          // not run for this block: release it here.
          result.message_block ().release ();
          return;
        }

      // Initiate new read from the stream.
      if (this->initiate_read_stream () == -1)
        return;
    }
  else
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Receiver completed\n"));

      // No need for this message block anymore.
      result.message_block ().release ();

      // Note that we are done with the test.
      done = 1;

      // We are done: commit suicide.
      delete this;
    }
}

// Completion callback for an asynchronous write to the dump file.
// Logs the result, advances the file offset when the write succeeded
// and gives back the message block that carried the data.
void
Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result)
{
  ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n"));

  // Dump everything the framework tells us about this completion.
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));

  // A successful write moves the position for the next write forward.
  if (result.success ())
    {
      this->file_offset_ += result.bytes_transferred ();
    }

  // The block was allocated for the socket read and reused for this
  // write; this is the end of its life.
  result.message_block ().release ();

  // Short file writes are not handled by this example; they are
  // merely detected.
  ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
}

class Sender : public ACE_Handler
{
  // = TITLE
  //     The class will be created by <main>.  After connecting to the
  //     host, this class will then read data from a file and send it
  //     to the network connection.
  //
  // = DESCRIPTION
  //     The file is sent twice: once in a single <transmit_file>
  //     operation and once block by block through asynchronous file
  //     reads followed by asynchronous socket writes.
public:
  Sender (void);
  // Sets up the welcome message; no I/O is started here.

  ~Sender (void);
  // Closes the network stream.

  int open (const ACE_TCHAR *host,
            u_short port);
  // Opens the input file, connects to <host>:<port> and starts the
  // asynchronous operations.  Returns 0 on success, -1 on failure.

  ACE_HANDLE handle (void) const;
  // Returns the handle of the network stream.

  void handle (ACE_HANDLE);
  // Sets the handle of the network stream.

protected:
  // These methods are called by the framework

  virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
  // This is called when asynchronous transmit files complete
  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
  // This is called when asynchronous writes to the socket complete
  virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
  // This is called when asynchronous reads from the file complete

private:
  int transmit_file (void);
  // Transmit the entire file in one fell swoop.

  int initiate_read_file (void);
  // Initiate an asynchronous file read.

  ACE_SOCK_Stream stream_;
  // Network I/O handle

  ACE_Asynch_Write_Stream ws_;
  // ws (write stream): for writing to the socket

  ACE_Asynch_Read_File rf_;
  // rf (read file): for reading from the file

  ACE_Asynch_Transmit_File tf_;
  // Transmit file.

  ACE_HANDLE input_file_;
  // File to read from

  u_long file_offset_;
  // Current file offset

  u_long file_size_;
  // File size

  ACE_Message_Block welcome_message_;
  // Welcome message

  ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_;
  // Header and trailer which goes with transmit_file

  int stream_write_done_;
  int transmit_file_done_;
  // These flags help to determine when to close down the event loop:
  // the global <done> flag is set once both of them are set.
};

// Start with no input file, a zero offset/size and both completion
// flags cleared, then point the welcome message at a static greeting.
Sender::Sender (void)
  : input_file_ (ACE_INVALID_HANDLE),
    file_offset_ (0),
    file_size_ (0),
    stream_write_done_ (0),
    transmit_file_done_ (0)
{
  // Moment of inspiration... :-)
  static const char *greeting = "Welcome to Irfan World! Irfan RULES here !!\n";
  const size_t greeting_length = ACE_OS::strlen (greeting);

  // The block refers to the static text; advancing the write pointer
  // marks the whole greeting as valid data.
  this->welcome_message_.init (greeting, greeting_length);
  this->welcome_message_.wr_ptr (greeting_length);
}

// Release everything <open> acquired: the network stream and, if it
// was opened, the input file used for the asynchronous reads.
Sender::~Sender (void)
{
  this->stream_.close ();

  // <open> stores the file handle in <input_file_> but nothing else
  // ever closes it.
  if (this->input_file_ != ACE_INVALID_HANDLE)
    ACE_OS::close (this->input_file_);
}

// Return the socket handle currently held by <stream_>.
// NOTE(review): <open> calls <ws_.open (*this)> without passing a
// handle, so the framework presumably obtains it through this
// accessor -- confirm against the ACE_Handler documentation.
ACE_HANDLE
Sender::handle (void) const
{
  return this->stream_.get_handle ();
}

// Store <handle> as the socket handle of <stream_>.  Nothing in this
// file calls the setter directly.
void
Sender::handle (ACE_HANDLE handle)
{
  this->stream_.set_handle (handle);
}

int
Sender::open (const ACE_TCHAR *host,
               u_short port)
{
  // Initialize stuff

  // Open input file (in OVERLAPPED mode)
  this->input_file_ =
    ACE_OS::open (file, GENERIC_READ | FILE_FLAG_OVERLAPPED);
  if (this->input_file_ == ACE_INVALID_HANDLE)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_OS::open"), -1);

  // Find file size
  this->file_size_ =
    ACE_OS::filesize (this->input_file_);

  // Connect to remote host
  ACE_INET_Addr address (port, host);
  ACE_SOCK_Connector connector;
  if (connector.connect (this->stream_,
                         address) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_SOCK_Connector::connect"),
                      -1);

  // Open ACE_Asynch_Write_Stream
  if (this->ws_.open (*this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Write_Stream::open"),
                      -1);

  // Open ACE_Asynch_Read_File
  if (this->rf_.open (*this, this->input_file_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Read_File::open"),
                      -1);

  // Start an asynchronous transmit file
  if (this->transmit_file () == -1)
    return -1;

  // Start an asynchronous read file
  if (this->initiate_read_file () == -1)
    return -1;

  return 0;
}

int
Sender::transmit_file (void)
{
  // Open file (in SEQUENTIAL_SCAN mode)
  ACE_HANDLE file_handle =
    ACE_OS::open (file, GENERIC_READ | FILE_FLAG_SEQUENTIAL_SCAN);
  if (file_handle == ACE_INVALID_HANDLE)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_OS::open"),
                      -1);

  // Open ACE_Asynch_Transmit_File
  if (this->tf_.open (*this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Transmit_File::open"),
                      -1);

  // Header and trailer data for the file.
  // @@ What happens if header and trailer are the same?
  this->header_and_trailer_.header_and_trailer (&this->welcome_message_,
                                                this->welcome_message_.length (),
                                                &this->welcome_message_,
                                                this->welcome_message_.length ());

  // Send the entire file in one fell swoop!
  if (this->tf_.transmit_file (file_handle,
                               &this->header_and_trailer_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Transmit_File::transmit_file"),
                      -1);

  return 0;
}

// Completion callback for the transmit-file operation.  Logs the
// result, closes the file that was transmitted and signals the end of
// the test if the block-wise stream writes have finished as well.
void
Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_transmit_file called\n"));

  // Dump everything the framework tells us about this completion.
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "socket", result.socket ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "file", result.file ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_per_send", result.bytes_per_send ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));

  // The handle opened in <transmit_file> is no longer needed.
  ACE_OS::close (result.file ());

  // Record our half of the work; the test is over once the stream
  // writes have reported completion too.
  this->transmit_file_done_ = 1;
  if (this->stream_write_done_ != 0)
    {
      done = 1;
    }
}

int
Sender::initiate_read_file (void)
{
  // Create a new <Message_Block>.  Note that this message block will
  // be used both to <read> data asynchronously from the file and to
  // <write> data asynchronously to the socket.
  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (BUFSIZ + 1),
                  -1);

  // Inititiate an asynchronous read from the file
  if (this->rf_.read (*mb,
                      mb->size () - 1,
                      this->file_offset_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Read_File::read"),
                      -1);
  return 0;
}

// Completion callback for an asynchronous read from the input file.
//
// On success the file offset is advanced, the block just read is
// handed to the asynchronous socket writer and, while the file has
// not been read completely, the next file read is started.  Whenever
// the block cannot be passed on (failed read or failed write request)
// it is released here so that it does not leak.
void
Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_file called\n"));

  // Terminate the data so that it can be printed as a string.
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  //ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));

  if (!result.success ())
    {
      // Failed read: the block goes nowhere else, so free it here.
      result.message_block ().release ();
      return;
    }

  // Read successful: increment offset and write data to network.
  // Note how we reuse the <ACE_Message_Block> for the writing.
  // Therefore, we do not delete this buffer because it is handled
  // in <handle_write_stream>.
  this->file_offset_ += result.bytes_transferred ();
  if (this->ws_.write (result.message_block (),
                       result.bytes_transferred ()) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Write_Stream::write"));

      // The write was never queued, so <handle_write_stream> will
      // not run for this block: release it here.
      result.message_block ().release ();
      return;
    }

  if (this->file_size_ > this->file_offset_)
    {
      // Start an asynchronous read file.
      if (initiate_read_file () == -1)
        return;
    }
}

// Completion callback for an asynchronous write to the socket.
//
// If only part of the block was sent, the remainder is retried
// through a duplicate of the block.  If the block was sent completely
// and the whole file has been read, the stream side of the test is
// marked as finished.  The block belonging to this completion is
// always released before returning.
void
Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_write_stream called\n"));

  // Reset pointers: move <rd_ptr> back to where it was before the
  // write so that the whole block is visible again.
  result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
#if 0
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
#endif

  if (result.success ())
    {
      // Partial write to socket.  Kept as an unsigned size so that the
      // difference is not narrowed to <int>.
      size_t unsent_data = result.bytes_to_write () - result.bytes_transferred ();
      if (unsent_data != 0)
        {
          // Skip over the part that has already been sent.
          result.message_block ().rd_ptr (result.bytes_transferred ());

          // Duplicate the message block and retry remaining data.
          // <duplicate> may fail, so check before dereferencing.
          ACE_Message_Block *remainder = result.message_block ().duplicate ();
          if (remainder == 0)
            {
              ACE_ERROR ((LM_ERROR,
                          "%p\n",
                          "ACE_Message_Block::duplicate"));
            }
          else if (this->ws_.write (*remainder,
                                    unsent_data) == -1)
            {
              ACE_ERROR ((LM_ERROR,
                          "%p\n",
                          "ACE_Asynch_Write_Stream::write"));

              // The retry was never queued, so no completion will
              // release the duplicate for us.
              remainder->release ();
            }
        }
      else if (!(this->file_size_ > this->file_offset_))
        {
          this->stream_write_done_ = 1;
          if (this->transmit_file_done_)
            done = 1;
        }
    }

  // Release message block.  This is done on every path, including a
  // failed retry, so that the block of this completion never leaks.
  result.message_block ().release ();
}

// Parse the command line into the file-scope configuration variables.
//
//   -h <host>       host to connect to (selects the active side)
//   -p <port>       port to listen on / connect to
//   -f <file>       file to send
//   -d <dump file>  file the receiver writes the incoming data to
//
// Returns 0 on success and -1 after printing the usage text when an
// unknown option is found.
static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("h:p:f:d:"));
  int c;

  while ((c = get_opt ()) != EOF)
    switch (c)
      {
      case 'h':
        host = get_opt.opt_arg ();
        break;
      case 'p':
        port = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      case 'f':
        file = get_opt.opt_arg ();
        break;
      case 'd':
        dump_file = get_opt.opt_arg ();
        break;
      default:
        // The usage text is the format string itself: it contains no
        // conversion specifiers, and unlike <%p> it does not append an
        // unrelated system error message.
        ACE_ERROR ((LM_ERROR,
                    "usage :\n"
                    "-h <host>\n"
                    "-p <port>\n"
                    "-f <file>\n"
                    "-d <dump file>\n"));
        return -1;
      }

  return 0;
}

// Program entry point.  Without a -h option the process acts as the
// passive side and accepts connections; with one it acts as the
// active side and sends the file.  Either way the proactor event loop
// then runs until a handler sets <done> or event handling fails.
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  if (parse_args (argc, argv) == -1)
    return -1;

  // Both roles are constructed up front; only one of them is opened.
  Sender sender;

  // Note: acceptor parameterized by the Receiver.
  ACE_Asynch_Acceptor<Receiver> acceptor;

  if (host != 0)
    {
      // Active side: connect and start sending.
      if (sender.open (host, port) == -1)
        return -1;
    }
  else if (acceptor.open (ACE_INET_Addr (port),
                          initial_read_size,
                          1) == -1)
    // Passive side: could not start listening.
    return -1;

  // Dispatch events via Proactor singleton for as long as dispatching
  // succeeds and no handler has declared the test finished.
  for (int status = 1; status > 0 && !done; )
    status = ACE_Proactor::instance ()->handle_events ();

  return 0;
}

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Asynch_Acceptor<Receiver>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Asynch_Acceptor<Receiver>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/

// Fallback entry point, compiled when the platform check at the top
// of this file fails (neither non-CE Win32 nor POSIX aio calls).  It
// only reports that the example is unavailable and returns 1.
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG ((LM_DEBUG,
              "This example does not work on this platform.\n"));
  return 1;
}

#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/