Sophie

Sophie

distrib > Mandriva > 10.0 > i586 > media > contrib > by-pkgid > 21280410b6ea906d791d7a12afae2579 > files > 213

libace5-doc-5.4-2mdk.i586.rpm

// Task.cpp,v 1.5 2001/12/24 20:51:02 schmidt Exp

// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)

#include <ace/Message_Block.h>

#include "Task.h"

Task::Task (const char * nameOfTask,
            int numberOfThreads)
  : d_numberOfThreads (numberOfThreads),
    d_barrier (numberOfThreads)
{
  // Just initialize our name, number of threads, and barrier.

  ACE_OS::strcpy (d_nameOfTask, nameOfTask);
}

Task::~Task (void)
{
  ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::~Task () -- once per Task\n", d_nameOfTask));
}

int Task::open (void *arg)
{
  ACE_UNUSED_ARG (arg);

  ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::open () -- once per Task\n", d_nameOfTask));

  // call ACE_Task::activate () to spawn the threads using
  // our Task::svc () as the function to be run.

  // No need to use THR_DETACHED here, we're going to wait ()
  // for the threads to exit later.  No leaks.

  return this->activate (THR_NEW_LWP, d_numberOfThreads);
}

int Task::put (ACE_Message_Block *message,
          ACE_Time_Value *timeout)
{
  // ACE_Stream uses the put () method of Tasks to send messages.
  // This defaultly does nothing.  Here we link our put () method
  // directly to our putq () method, so that Messages put () to us
  // will appear in the Message_Queue that is checked by the
  // service threads.

  return this->putq (message, timeout);
}

int Task::close (u_long flags)
{
  // When the Stream closes the Module, the Module then close ()'s the Task
  // and passing a value of (1) as the flag.

  // When a service thread exits, it calls close () with a value that is not
  // (1).

  // We use this fact to tell the difference between closing a service thread,
  // and closing the main Task itself.

  if (flags == 1) {
    // The Module has asked to close the main Task.

    ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::close () -- flags == 1 -- once per Task\n", d_nameOfTask));

    // We create a Message_Block of type MB_HANGUP.

    ACE_Message_Block *hangupBlock;

    // Note the use of the lock_adapter () to ensure proper serialization.
    ACE_NEW_RETURN (hangupBlock,
 		    ACE_Message_Block (0,
 			 	       ACE_Message_Block::MB_HANGUP,
 			 	       0,
 			 	       0,
 			  	       0,
 			 	       Task::lock_adapter ()),
                    -1);

    // We then send this Block into the Message_Queue to be seen by the
    // service threads.

    // Once again we duplicate () the Block as send it off...

    if (this->putq (hangupBlock->duplicate ()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close () putq"), -1);
    }

    // ..and we're free to release () our copy of it.

    hangupBlock->release ();

    // Now, all we have to do is wait () for the service threads to all
    // exit.  This is where using THR_DETACHED in the activate () method
    // will come back to haunt you.

    // The Stream waits until this returns before attempting to remove
    // the next Module/Task group in the Stream.  This allows for an
    // orderly shutting down of the Stream.

    return this->wait ();
  } else {
    ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::close () -- flags != 1 -- once per servicing thread\n", d_nameOfTask));

    // This is where we can clean up any mess left over by each service thread.
    // In this Task, there is nothing to do.
  }
  return 0;
}

int Task::svc (void)
{
  // This is the function that our service threads run once they are spawned.

  ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- once per servicing thread\n", d_nameOfTask));

  // First, we wait until all of our peer service threads have arrived
  // at this point also.

  d_barrier.wait ();

  ACE_Message_Block *messageBlock;

  while (1) {
    // And now we loop almost infinitely.

    // getq () will block until a Message_Block is available to be read,
    // or an error occurs.

    if ( this->getq (messageBlock, 0) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () getq"), -1);
    }

    if (messageBlock->msg_type () == ACE_Message_Block::MB_HANGUP) {

      // If the Message_Block is of type MB_HANGUP, then we're being asked
      // to shut down nicely.

      ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- HANGUP block received\n", d_nameOfTask));

      // So, we duplicate the Block, and put it back into the Message_Queue,
      // in case there are some more peer service threads still running.

      if (this->putq (messageBlock->duplicate ()) == -1) {
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () putq"), -1);
      }

      // We release our copy of the Block.
      messageBlock->release ();

      // And we break out of the nearly infinitely loop, and
      // head towards close () ourselves.
      break;
    }

    // If we're here, then we've received a Message_Block that was
    // not informing us to quit, so we're assuming it's a valid
    // meaningful Block.

    ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- Normal block received\n", d_nameOfTask));

    // We grab the read-pointer from the Block, and display it through a DEBUG statement.

    ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- %s\n", d_nameOfTask, messageBlock->rd_ptr () ));

    // We pretend that this takes to time to process the Block.
    // If you're on a fast machine, you might have to raise this
    // value to actually witness different threads handling
    // blocks for each Task.

    ACE_OS::sleep (ACE_Time_Value (0, 250));

    // Since we're part of a Stream, we duplicate the Block, and
    // send it on to the next Task.

    if (put_next (messageBlock->duplicate ()) == -1) {
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () put_next"), -1);
    }

    // And then we release our copy of it.

    messageBlock->release ();
  }
  return 0;
}

const char *Task::nameOfTask (void) const
{
  return d_nameOfTask;
}

ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *Task::lock_adapter (void) 
{
  return &lock_adapter_;
}

// Static definition.
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> Task::lock_adapter_;

#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Lock_Adapter <ACE_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Lock_Adapter <ACE_SYNCH_MUTEX>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */