Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 1392

libace5-doc-5.4-2mdk.i586.rpm

// Sender.cpp,v 1.7 2003/11/01 11:15:23 dhinton Exp

#include "ace/OS_main.h"
#include "ace/RMCast/RMCast_UDP_Reliable_Sender.h"
#include "ace/INET_Addr.h"
#include "ace/FILE_IO.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"

ACE_RCSID(tests, RMCast_Examples_Sender, "Sender.cpp,v 1.7 2003/11/01 11:15:23 dhinton Exp")

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  if (argc != 3)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Usage: %s <filename> <mcastgroup:port>\n",
                         argv[0]),
                        1);
    }

  const ACE_TCHAR *filename = argv[1];
  if (ACE_OS::access (filename, R_OK) != 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot read file <%s>\n", filename),
                        1);
    }

  ACE_INET_Addr mcast_group;
  if (mcast_group.set (argv[2]) != 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot read file <%s>\n", filename),
                        1);
    }


  ACE_HANDLE handle = ACE_OS::open (filename, O_RDONLY|O_BINARY);
  if (handle == ACE_INVALID_HANDLE)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot open file <%s> %p\n", filename, ""),
                        1);
    }
  ACE_FILE_IO file_io;
  file_io.set_handle (handle);

  // We don't provide a module to receive the control messages, in
  // this example we simply ignore them.
  ACE_RMCast_UDP_Reliable_Sender sender (0);

  if (sender.init (mcast_group) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot init UDP I/O at <%s:%d> %p\n",
                         mcast_group.get_host_name (),
                         mcast_group.get_port_number (),
                         ""),
                        1);
    }

  // Use the Reactor to demultiplex all the messages
  ACE_Reactor *reactor = ACE_Reactor::instance ();

  sender.reactive_incoming_messages (reactor);
  {
    // Resend the messages every 20 milliseconds..
    ACE_Time_Value tv (2, 0);
    sender.reactive_resends (reactor, tv);
  }

  for (;;)
    {
      ACE_Message_Block payload (BUFSIZ + 1);

      ssize_t r = file_io.recv (payload.rd_ptr () + 1, BUFSIZ);
      if (r <= 0)
        break;

      payload.wr_ptr (r + 1);
      *(payload.rd_ptr ()) = 'N'; // Normal
      if (r < BUFSIZ)
        {
          *(payload.rd_ptr ()) = 'E'; // EOF
        }

      ACE_RMCast::Data data;
      data.payload = &payload;
      if (sender.data (data) != 0)
        break;

      if (r < BUFSIZ)
        {
          // Last buffer, terminate loop
          break;
        }

      // Handle incoming events, without blocking...
      ACE_Time_Value tv (4, 0);
      reactor->handle_events (&tv);
    }

  // Wait until all the messages are successfully delivered
  do
    {
      // Try for 50 milliseconds...
      ACE_Time_Value tv (5, 0);
      int r = reactor->handle_events (&tv);
      if (r == -1)
        break;
    }
  while (sender.has_data () || sender.has_members ());

  return 0;
}