Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 1169

libace5-doc-5.4-2mdk.i586.rpm

// ex2.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp

// ============================================================================
//
// = LIBRARY
//   examples
//
// = FILENAME
//   ex2.cpp
//
// = DESCRIPTION
//    Example for using <ACE_UPIPE_SAP> and <ACE_Thread> for
//    intra-process communication.
//
// = AUTHOR
//    Gerhard Lenzer and Douglas C. Schmidt
//
// ============================================================================

#include "ace/OS_main.h"
#include "ace/UPIPE_Connector.h"
#include "ace/UPIPE_Acceptor.h"
#include "ace/Auto_Ptr.h"
#include "ace/OS_NS_time.h"

ACE_RCSID(UPIPE_SAP, ex2, "ex2.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp")

#if defined (ACE_HAS_THREADS)

// Data for testsuite.
static int size = 0;
static int iterations = 0;

static void *
supplier (void *)
{
  ACE_UPIPE_Stream s_stream;

  ACE_UPIPE_Addr c_addr (ACE_TEXT("pattern"));

  ACE_Auto_Basic_Array_Ptr<char> mybuf (new char[size]);

  for (int i = 0; i < size; i++)
    mybuf[i] = 'a';

  ACE_DEBUG ((LM_DEBUG,
              "(%t) supplier starting connect thread\n"));

  ACE_UPIPE_Connector con;

  if (con.connect (s_stream, c_addr) == -1)
    ACE_ERROR ((LM_ERROR,
                "(%t) %p\n",
                "ACE_UPIPE_Acceptor.connect failed"));

  // Test asynchronicity (the "acausal principle" ;-)).
  s_stream.enable (ACE_SIGIO);

  ACE_Message_Block *mb_p;

  for (int j = 0; j < iterations; j++)
    {
      ACE_NEW_RETURN (mb_p,
                      ACE_Message_Block (size,
                                         ACE_Message_Block::MB_DATA,
                                         (ACE_Message_Block *) 0,
                                         mybuf.get ()),
                      0);
      if (s_stream.send (mb_p) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) %p\n",
                           "send failed"),
                          0);
    }

  ACE_NEW_RETURN (mb_p,
                  ACE_Message_Block ((size_t) 0),
                  0);

  // Insert a 0-sized message block to signal the other side to shut
  // down.
  if (s_stream.send (mb_p) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) %p\n",
                       "send failed"),
                          0);
  s_stream.close ();
  return 0;
}

static void *
consumer (void *)
{
  ACE_UPIPE_Stream c_stream;

  // Set the high water mark to size to achieve optimum performance.

  int wm = size * iterations;

  if (c_stream.control (ACE_IO_Cntl_Msg::SET_HWM,
                        &wm) == -1)
    ACE_DEBUG ((LM_DEBUG,
                "set HWM failed\n"));

  ACE_UPIPE_Addr serv_addr (ACE_TEXT("pattern"));

  // accept will wait up to 4 seconds
  ACE_UPIPE_Acceptor acc (serv_addr);

  ACE_DEBUG ((LM_DEBUG,
              "(%t) consumer spawning the supplier thread\n"));

  // Spawn the supplier thread.
  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier),
                                              (void *) 0,
                                              THR_NEW_LWP | THR_DETACHED) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "spawn"),
                      0);

  ACE_DEBUG ((LM_DEBUG,
              "(%t) consumer starting accept\n"));

  if (acc.accept (c_stream) == -1)
    ACE_ERROR ((LM_ERROR,
                "(%t) %p\n",
                "ACE_UPIPE_Acceptor.accept failed"));

  // Time measurement.
  time_t currsec;
  ACE_OS::time (&currsec);
  time_t start = (time_t) currsec;

  int received_messages = 0;

  for (ACE_Message_Block *mb = 0;
       c_stream.recv (mb) != -1 && mb->size () != 0;
       mb->release ())
    received_messages++;

  ACE_OS::time (&currsec);
  time_t secs = (time_t) currsec - start;

  ACE_DEBUG ((LM_DEBUG,
              "(%t) Transferred %d blocks of size %d\n"
              "The program ran %d seconds\n",
              received_messages, size, secs));
  c_stream.close ();
  return 0;
}

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  size = argc > 1 ? ACE_OS::atoi (argv[1]) : 32;
  iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 16;

  // Spawn the two threads.
  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer),
                                              (void *) 0,
                                              THR_NEW_LWP | THR_DETACHED) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "spawn"),
                      1);
  // Wait for producer and consumer threads to exit.
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
#else
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_ERROR_RETURN ((LM_ERROR,
                     "threads not supported on this platform\n"),
                     0);
}
#endif /* ACE_HAS_THREADS */