// Futures.cpp,v 1.7 2004/01/12 23:15:25 shuston Exp #include "ace/OS_NS_string.h" #include "ace/OS_NS_time.h" #include "ace/Task.h" #include "ace/Unbounded_Queue.h" #include "ace/Synch.h" #include "ace/SString.h" #include "ace/Method_Request.h" #include "ace/Future.h" #include "ace/Activation_Queue.h" #define OUTSTANDING_REQUESTS 20 // Listing 2 code/ch16 class CompletionCallBack: public ACE_Future_Observer<ACE_CString*> { public: virtual void update (const ACE_Future<ACE_CString*> & future) { ACE_CString *result; // Block for the result. ((ACE_Future<ACE_CString*>)future).get (result); ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ())); delete result; } }; // Listing 2 // Listing 1 code/ch16 class LongWork : public ACE_Method_Request { public: virtual int call (void) { ACE_TRACE (ACE_TEXT ("LongWork::call")); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n"))); ACE_OS::sleep (1); char buf[1024]; ACE_OS::strcpy (buf, ACE_TEXT ("Completed assigned task\n")); ACE_CString *msg; ACE_NEW_RETURN (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1); result_.set (msg); return 0; } ACE_Future<ACE_CString*> &future (void) { ACE_TRACE (ACE_TEXT ("LongWork::future")); return result_; } void attach (CompletionCallBack *cb) { result_.attach (cb); } private: ACE_Future<ACE_CString*> result_; }; // Listing 1 class Exit : public ACE_Method_Request { public: virtual int call (void) { ACE_TRACE (ACE_TEXT ("Exit::call")); return -1; } }; class Worker; class IManager { public: virtual int return_to_work (Worker *worker) = 0; }; // Listing 3 code/ch16 class Worker: public ACE_Task<ACE_MT_SYNCH> { public: Worker (IManager *manager) : manager_(manager), queue_ (msg_queue ()) { } int perform (ACE_Method_Request *req) { ACE_TRACE (ACE_TEXT ("Worker::perform")); return this->queue_.enqueue (req); } virtual int svc (void) { thread_id_ = ACE_Thread::self (); while (1) { ACE_Method_Request *request = this->queue_.dequeue(); if (request == 0) return -1; // Invoke the request int result = request->call (); if (result == -1) break; // Return to work. this->manager_->return_to_work (this); } return 0; } ACE_thread_t thread_id (void); private: IManager *manager_; ACE_thread_t thread_id_; ACE_Activation_Queue queue_; }; // Listing 3 ACE_thread_t Worker::thread_id (void) { return thread_id_; } // Listing 4 code/ch16 class Manager : public ACE_Task_Base, private IManager { public: enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; Manager () : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) { ACE_TRACE (ACE_TEXT ("Manager::TP")); } int perform (ACE_Method_Request *req) { ACE_TRACE (ACE_TEXT ("Manager::perform")); return this->queue_.enqueue (req); } int svc (void) { ACE_TRACE (ACE_TEXT ("Manager::svc")); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); // Create pool when you get in the first time. create_worker_pool (); while (!done ()) { ACE_Time_Value tv ((long)MAX_TIMEOUT); tv += ACE_OS::time (0); // Get the next message ACE_Method_Request *request = this->queue_.dequeue (&tv); if (request == 0) { shut_down (); break; } // Choose a worker. Worker *worker = choose_worker (); // Ask the worker to do the job. worker->perform (request); } return 0; } int shut_down (void); virtual int return_to_work (Worker *worker) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n"))); this->workers_.enqueue_tail (worker); this->workers_cond_.signal (); return 0; } private: Worker *choose_worker (void) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0) while (this->workers_.is_empty ()) workers_cond_.wait (); Worker *worker; this->workers_.dequeue_head (worker); return worker; } int create_worker_pool (void) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); for (int i = 0; i < POOL_SIZE; i++) { Worker *worker; ACE_NEW_RETURN (worker, Worker (this), -1); this->workers_.enqueue_tail (worker); worker->activate (); } return 0; } int done (void) { return (shutdown_ == 1); } ACE_thread_t thread_id (Worker *worker) { return worker->thread_id (); } private: int shutdown_; ACE_Thread_Mutex workers_lock_; ACE_Condition<ACE_Thread_Mutex> workers_cond_; ACE_Unbounded_Queue<Worker* > workers_; ACE_Activation_Queue queue_; }; // Listing 4 int Manager::shut_down (void) { ACE_TRACE (ACE_TEXT ("Manager::shut_down")); ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin (); Worker **worker_ptr = NULL; do { iter.next (worker_ptr); Worker *worker = (*worker_ptr); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"), thread_id (worker))); Exit *req; ACE_NEW_RETURN (req, Exit(), -1); // Send the hangup message worker->perform (req); // Wait for the exit. worker->wait (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker %d shut down.\n"), thread_id (worker))); delete req; delete worker; } while (iter.advance ()); shutdown_ = 1; return 0; } // Listing 5 code/ch16 int ACE_TMAIN (int, ACE_TCHAR *[]) { Manager tp; tp.activate (); ACE_Time_Value tv; tv.msec (100); // Wait for a few seconds every time you send a message. CompletionCallBack cb; LongWork workArray[OUTSTANDING_REQUESTS]; for (int i = 0; i < OUTSTANDING_REQUESTS; i++) { workArray[i].attach (&cb); ACE_OS::sleep (tv); tp.perform (&workArray[i]); } ACE_Thread_Manager::instance ()->wait (); return 0; } // Listing 5 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Condition<ACE_Thread_Mutex>; template class ACE_Future<ACE_String_Base<char>*>; template class ACE_Future_Observer<ACE_String_Base<char>*>; template class ACE_Future_Rep<ACE_String_Base<char>*>; template class ACE_Node<Worker*>; template class ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*>; template class ACE_Unbounded_Queue<Worker*>; template class ACE_Unbounded_Queue_Iterator<Worker*>; template class ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*>; template class ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Condition<ACE_Thread_Mutex> #pragma instantiate ACE_Future<ACE_String_Base<char>*> #pragma instantiate ACE_Future_Observer<ACE_String_Base<char>*> #pragma instantiate ACE_Future_Rep<ACE_String_Base<char>*> #pragma instantiate ACE_Node<Worker*> #pragma instantiate ACE_Node<ACE_Future_Observer<ACE_String_Base<char>*>*> #pragma instantiate ACE_Unbounded_Queue<Worker*> #pragma instantiate ACE_Unbounded_Queue_Iterator<Worker*> #pragma instantiate ACE_Unbounded_Set<ACE_Future_Observer<ACE_String_Base<char>*>*> #pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Future_Observer<ACE_String_Base<char>*>*> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */