Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 774

libace5-doc-5.4-2mdk.i586.rpm

// buffer_stream.cpp,v 4.16 2003/12/30 23:18:58 shuston Exp

// This short program copies stdin to stdout via the use of an ASX
// Stream.  It illustrates an implementation of the classic "bounded
// buffer" program using an ASX Stream containing two Modules.  Each
// ACE_Module contains two Tasks.  Each ACE_Task contains a
// ACE_Message_Queue and a pointer to a ACE_Thread_Manager.  Note how
// the use of these reusable components reduces the reliance on global
// variables, as compared with the bounded_buffer.C example.

#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Service_Config.h"
#include "ace/Stream.h"
#include "ace/Module.h"
#include "ace/Task.h"

ACE_RCSID(Message_Queue, buffer_stream, "buffer_stream.cpp,v 4.16 2003/12/30 23:18:58 shuston Exp")

#if defined (ACE_HAS_THREADS)

typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
typedef ACE_Task<ACE_MT_SYNCH> MT_Task;

class Common_Task : public MT_Task
  // = TITLE
  //   Methods that are common to the producer and consumer.
{
public:
  Common_Task (void) {}
  // ACE_Task hooks
  virtual int open (void * = 0);
  virtual int close (u_long = 0);
};

// Define the Producer interface. 

class Producer : public Common_Task
{
public:
  Producer (void) {}

  // Read data from stdin and pass to consumer.
  virtual int svc (void);
};

class Consumer : public Common_Task
  // = TITLE
  //    Define the Consumer interface. 
{
public:
  Consumer (void) {}

  virtual int put (ACE_Message_Block *mb,
                   ACE_Time_Value *tv = 0);  
  // Enqueue the message on the ACE_Message_Queue for subsequent
  // handling in the svc() method.

  virtual int svc (void);
  // Receive message from producer and print to stdout.

private:

  ACE_Time_Value timeout_;
};

class Filter : public MT_Task
  // = TITLE
  //    Defines a Filter that prepends a line number in front of each
  //    line.  
{
public:
  Filter (void): count_ (1) {}

  virtual int put (ACE_Message_Block *mb,
                   ACE_Time_Value *tv = 0);
  // Change the size of the message before passing it downstream.

private:
  size_t count_;
  // Count the number of lines passing through the filter.
};

// Spawn off a new thread.

int 
Common_Task::open (void *)
{
  if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "spawn"),
                      -1);
  return 0;
}

int 
Common_Task::close (u_long exit_status)
{
  ACE_DEBUG ((LM_DEBUG,
              "(%t) thread is exiting with status %d in module %s\n",
              exit_status,
              this->name ()));

  // Can do anything here that is required when a thread exits, e.g.,
  // storing thread-specific information in some other storage
  // location, etc.
  return 0;
}

// The Consumer reads data from the stdin stream, creates a message,
// and then queues the message in the message list, where it is
// removed by the consumer thread.  A 0-sized message is enqueued when
// there is no more data to read.  The consumer uses this as a flag to
// know when to exit.

int
Producer::svc (void)
{
  // Keep reading stdin, until we reach EOF. 

  for (int n; ; )
    {
      // Allocate a new message (add one to avoid nasty boundary
      // conditions).
      
      ACE_Message_Block *mb;

      ACE_NEW_RETURN (mb,
                      ACE_Message_Block (BUFSIZ + 1),
                      -1);

      n = ACE_OS::read (ACE_STDIN, mb->wr_ptr (), BUFSIZ);

      if (n <= 0)
        {
          // Send a shutdown message to the other thread and exit.
          mb->length (0);

          if (this->put_next (mb) == -1)
            ACE_ERROR ((LM_ERROR,
                        "(%t) %p\n",
                        "put_next"));
	  break;
        }

      // Send the message to the other thread.
      else
	{
	  mb->wr_ptr (n);
	  // NUL-terminate the string (since we use strlen() on it
	  // later).
          mb->rd_ptr ()[n] = '\0';

	  if (this->put_next (mb) == -1)
	    ACE_ERROR ((LM_ERROR,
                        "(%t) %p\n",
                        "put_next"));
	}
    }

  return 0; 
}

// Simply enqueue the Message_Block into the end of the queue.

int
Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
{ 
  return this->putq (mb, tv); 
}

// The consumer dequeues a message from the ACE_Message_Queue, writes
// the message to the stderr stream, and deletes the message.  The
// Consumer sends a 0-sized message to inform the consumer to stop
// reading and exit.

int
Consumer::svc (void)
{
  int result = 0;

  // Keep looping, reading a message out of the queue, until we
  // timeout or get a message with a length == 0, which signals us to
  // quit.

  for (;;)
    {
      ACE_Message_Block *mb;

      // Wait for upto 4 seconds.
      this->timeout_.sec (ACE_OS::time (0) + 4); 

      result = this->getq (mb, &this->timeout_);

      if (result == -1)
	break;

      int length = mb->length ();

      if (length > 0)
	ACE_OS::write (ACE_STDOUT,
		       mb->rd_ptr (),
		       ACE_OS::strlen (mb->rd_ptr ()));

      mb->release ();

      if (length == 0)
	break;
    }

  if (result == -1 && errno == EWOULDBLOCK)
    ACE_ERROR ((LM_ERROR,
		"(%t) %p\n%a",
		"timed out waiting for message",
		1));
  return 0;
}

// The filter prepends a line number in front of each line.

int
Filter::put (ACE_Message_Block *mb,
	     ACE_Time_Value *tv)
{ 
  if (mb->length () == 0)
    return this->put_next (mb, tv);
  else
    {
      char buf[BUFSIZ];

      // Stash a copy of the buffer away.
      ACE_OS::strncpy (buf, mb->rd_ptr (), sizeof buf);

      // Increase the size of the buffer large enough that it will be
      // reallocated (in order to test the reallocation mechanisms).

      mb->size (mb->length () + BUFSIZ);
      mb->length (mb->size ());

      // Prepend the line count in front of the buffer.
      ACE_OS::sprintf (mb->rd_ptr (),
                       ACE_SIZE_T_FORMAT_SPECIFIER": %s", 
                       this->count_++,
                       buf);
      return this->put_next (mb, tv);
    }
}

// Main driver function.

int 
main (int, char *argv[])
{
  ACE_Service_Config daemon (argv[0]);

  // This Stream controls hierachically-related active objects.
  MT_Stream stream;

  MT_Module *pm;
  MT_Module *fm; 
  MT_Module *cm; 

  ACE_NEW_RETURN (cm,
                  MT_Module ("Consumer",
                             new Consumer),
                  -1);
  ACE_NEW_RETURN (fm,
                  MT_Module ("Filter",
                             new Filter),
                  -1);
  ACE_NEW_RETURN (pm,
                  MT_Module ("Producer",
                             new Producer),
                  -1);

  // Create Consumer, Filter, and Producer Modules and push them onto
  // the Stream.  All processing is performed in the Stream.

  if (stream.push (cm) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "push"),
                      1);
  else if (stream.push (fm) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "push"),
                      1);
  else if (stream.push (pm) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "push"),
                      1);
  // Barrier synchronization: wait for the threads to exit, then exit
  // ourselves.
  ACE_Thread_Manager::instance ()->wait ();
  return 0;
}
#else
int 
main (int, char *[])
{
  ACE_ERROR ((LM_ERROR,
              "threads not supported on this platform\n"));
  return 0;
}
#endif /* ACE_HAS_THREADS */