Sophie

Sophie

distrib > Mandriva > 10.0-com > i586 > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 221

libace5-doc-5.4-2mdk.i586.rpm

// stream.cpp,v 1.5 2001/12/24 20:51:06 schmidt Exp

// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)

#include "Task.h"
#include "EndTask.h"
// This is our specialized ACE_Task.

#include <ace/Module.h>
#include <ace/Stream.h>
#include <ace/streams.h>
// These are the necessary ACE headers.

// Shorthand names for the ACE_Module and ACE_Stream templates
// instantiated with ACE_MT_SYNCH.  The typedefs save a lot of typing
// and keep the template argument spelled out in exactly one place;
// main () uses these short names throughout.
typedef ACE_Module<ACE_MT_SYNCH> Module;
typedef ACE_Stream<ACE_MT_SYNCH> Stream;

int main (int argc, char *argv[])
{
  int numberOfMessages = argc > 1 ? ACE_OS::atoi (argv[1]) : 3;
  // unless otherwise specified, just send three messages
  // down the stream.

  Stream theStream;
  // the ACE_Stream itself.

  // Now, we instantiate 4 different Tasks.  These do not
  // need to be all the same class, but they do need to
  // all derrive from the same flavor of ACE_Task.
  //
  // Also, we instantiate a fifth end-cap Task to clean
  // up Message_Blocks as they reach the end.

  Task *taskOne;
  Task *taskTwo;
  Task *taskThree;
  Task *taskFour;
  Task *taskEnd;

  // Out Task's take two arguments: a name, and the number
  // of threads to dedicate to the task.

  taskOne = new Task ("Task No. 1", 1);
  taskTwo = new Task ("Task No. 2", 3);
  taskThree = new Task ("Task No. 3", 7);
  taskFour = new Task ("Task No. 4", 1);

  // Our EndTask only takes 1 argument, as it actually
  // doesn't spawn any threads for processing.

  taskEnd = new EndTask ("End Task");

  Module *moduleOne;
  Module *moduleTwo;
  Module *moduleThree;
  Module *moduleFour;
  Module *moduleEnd;

  // ACE_Stream accepts ACE_Modules, which are simply a pair of
  // ACE_Tasks.  One is dedicated for writing, while the other
  // is dedicated to reading.  Think of the writing side as
  // downstream, and the reading side as upstream.
  //
  // We're only working with a unidirection Stream today,
  // so we'll only actually install a Task into the write
  // side of the module, effectively downstream.

  moduleOne = new Module ("Module No. 1", taskOne);
  moduleTwo = new Module ("Module No. 2", taskTwo);
  moduleThree = new Module ("Module No. 3", taskThree);
  moduleFour = new Module ("Module No. 4", taskFour);
  moduleEnd = new Module ("Module End", taskEnd);

  // Now we push the Modules onto the Stream.
  // Pushing adds the module to the head, or
  // otherwise prepends it to whatever modules
  // are already installed.

  // So, you need to push () the modules on -backwards-
  // from our viewpoint.

  if (theStream.push (moduleEnd) == -1) {
           ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleFour) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // As we push a Module onto the Stream, it gets opened.
  // When a Module open ()s, it opens the Tasks that it contains.
  //
  // Since we cannot provide an argument to this embedded
  // call to open (), we supplied specified the number of
  // threads in the constructor of our Tasks.

  if (theStream.push (moduleThree) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleTwo) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  if (theStream.push (moduleOne) == -1) {
        ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), -1);
  }

  // Now that the Modules are open, the Tasks threads should
  // be launching and entering their svc () loop, so we send
  // some messages down the Stream.

  int sent = 1;

  ACE_Message_Block *message;

  while (sent <= numberOfMessages) {

    // First, create ourselves a Message_Block.  See Tutorials 10-13
    // for more information about Message_Blocks and Message_Queues.
    // Note the use of the lock_adapter () to ensure proper
    // serialization.
    ACE_NEW_RETURN (message,
                    ACE_Message_Block (128,
                                       ACE_Message_Block::MB_DATA,
                                       0,
                                       0,
                                       0,
                                       Task::lock_adapter ()),
                    -1);

    // Now, we grab the write-pointer from the Block,
    // and sprintf () our text into it.

    ACE_OS::sprintf (message->wr_ptr (), "Message No. %d", sent);

    // All we have to do now is drop the Message_Block
    // into the Stream.

    // It is always a good idea to duplicate () a Message_Block
    // when you put it into any Message_Queue, as then
    // you can always be allowed to release () your copy
    // without worry.

    if (theStream.put (message->duplicate (), 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "put"), -1);
    }

    message->release ();
    ++sent;
  }

  // Now that we've sent our Message_Blocks, close down
  // the Stream.
  //
  // The Stream will automagically delete the Modules and
  // the contained Tasks.  We don't have to do that.
  //
  // This call will block (due to the way we've written our
  // Task class) until all Message_Blocks have cleared the
  // entire Stream, and all associated threads have exited.

  theStream.close ();

  return 0;
}