/* UpTools v8.6 * * Copyright (c) 2005-2011 Fundacion Universidad de Palermo (Argentina). * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. Neither the name of the copyright holder nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * 4. Redistributions of any form whatsoever must retain the following * acknowledgment: 'This product includes software developed by the * "Universidad de Palermo, Argentina" (http://www.palermo.edu/).' * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ (MULTITHREADING INTRODUCTION, SPANISH VERSION, UTF8) Multithreading con UpTools: introducción y ejemplos básicos ----------------------------------------------------------- Las UpTools son una colección de clases y templates en C++ para el desarrollo de aplicaciones multithreading y aplicaciones avanzadas en general, facilitando la administración de memoria, el paralelismo, el networking, el acceso a recursos compartidos, el manejo de tiempos y eventos, etc. Fueron desarrolladas en la Facultad de IngenierÃa de la Universidad de Palermo principalemente a partir de sus proyectos de investigación y desarrollo del Departamento de Comunicaciones. El presente documento abarca solo algunas de sus capacidades referentes al paralelismo. Para recorrer más extensamente sus capacidades en este tema y en otros, por favor referirse a los ejemplos y otros documentos que puedan haber en la distribución. Se recorren más abajo algunos ejemplos. El primero de ellos tiene que ver con la aceleración de algoritmos cuyo espacio de trabajo es subdivisible y pasible de ser procesado en paralelo. TÃpicamente se utilizarÃa para trabajos que no se bloquean en su procesamiento, salvo quizás en el acceso a un recurso compartido por un semáforo o mutex. El segundo ejemplo tiene que ver con el aumento de throughput del procesamiento de una colección de objetos, o con el procesamiento de eventos, en donde a veces alguno de ellos puede llegar a bloquearse a la espera de algún dato (por ejemplo la respuesta de una base de datos). Este es el caso más tÃpico de un servidor transaccional (por ejemplo en servidores de señalización de comunicaciones, o de servicios en internet), mientras que el primer ejemplo es más tÃpico de los problemas de HPC (high performance computing) El tercer ejemplo tiene que ver con el manejo de timers, tÃpicamente a utilizar en máquinas de estado para protocolos de comunicaciones. En los tres ejemplos el programador define el procesamiento a realizar, derivando una clase cuyos objetos serán administrados por un manager. ------------------------------------------------------------------------------ Ejemplo 1: Este ejemplo demuestra las posibilidades para la subdivisión y paralelización de un trabajo. El problema en sà es muy simple: la suma de los números enteros en un determinado rango, y podrÃa calcularse fácilmente sin necesidad de realizar la suma usando una sencilla fórmula, pero sirve para mostrar el uso de las clases involucradas. El manager de ejecución de trabajos en paralelo se llama UpWorkThreadDistributor. El manager procesa los trabajos subdividiéndolos en muchas partes que ejecutarán en paralelo, y si hace falta, consolidará el resultado del trabajo de cada una de las partes. El trabajo debe implementarse en una clase que derive de la clase base UpWorkSplittable. Veamos entonces el ejemplo: Primero se define una clase que deriva de UpWorkSplittable: class SplittableSum : public UpWorkSplittable { private: // cada instancia de la clase debe ejecutar un determinado procesamiento // sobre su espacio de trabajo. En este caso el espacio de trabajo es el // intervalo de números entre begin y end: [begin,end) unsigned long long begin, end; unsigned long long *saveResult; unsigned long long result; unsigned long long size; UpTimeVal* uptv; public: SplittableSum(unsigned long long _begin, unsigned long long _end, unsigned long long* _saveResult=0, UpTimeVal* _uptv=0 ) : begin(_begin), end(_end), saveResult(_saveResult), size(end-begin), uptv(_uptv) {} ~SplittableSum(); void show(ostream& o) const; // los siguientes métodos existen en la clase base pero deben // programarse para adecuarse al caso particular. void action(); SplittableSum* split(); void join( UpWorkSplittable* s ); char joinType(); bool exceedHwThreads(); }; El método siguiente debe definir el procesamiento de cada porción del espacio a procesar. void SplittableSum::action() { register unsigned long long res( 0 ); for( register unsigned long long x( begin ) ; x<end ; ++x ) res += x; result = res; } El método siguiente debe definir cómo particionar el espacio de trabajo, y crear una nueva instancia de la clase con su porción del espacio asignada, a la vez que descargar de la instancia actual la proción entregada a la nueva: SplittableSum* SplittableSum::split() { if( size<MINSIZE_SPACE ) return 0; size >>=1; // divide por 2 el tamaño unsigned long long middle( begin+size ); SplittableSum* ret( new SplittableSum( middle, end, 0, 0 ) ); end = middle; return ret; } UpWorkThreadDistributor, no asume nada respecto del tipo, las dimensiones, o la forma en la que el espacio será dividido en cada etapa, pero llamará recursivamente a split() en cada instancia hasta que le devuelva un puntero nulo. El programador debe derivar el siguiente método para indicar si hará falta un proceso de consolidación al final del procesamiento de cada instancia. char SplittableSum::joinType() { return 's'; } El joinType puede ser serial('s'), en árbol ('t'), o innecesario ('n'). El proceso de consolidación debe ser definido por el programador, si es que hace falta (joinType()!='n'). El manager le asignará al conjunto de las intancias obtenidas de la primera, un mutex para proteger los join(), por lo que no es necesario que el programador los proteja ,sin embargo el programador debe determinar si necesita definir mecanismos de proteccion dentro del método action(). void SplittableSum::join( UpWorkSplittable* s ) { result += ((SplittableSum*)s)->result; } El programador indica al manager si es o no conveniente subdividir el trabajo en una cantidad de partes mayor a la cantidad de threads de hardware. Por ejmplo si el trabjo tuviera secciones que se bloquean en I/O o sobre otro recurso, serÃa conveniente tener más subdivisiones que threads fÃsicos. En cambio, en caso de ser no-bloqueantes no serÃa conveniente (a menos que se compita con tasks de otros procesos en la misma computadora). bool SplittableSum::exceedHwThreads() { return false; } Para ejecutar entonces el procesamiento, debe crearse el manager. Aqui se le dice que cree 16 threads en total, un número que asumimos en este caso mayor a la cantidad de threads de hardware (estos últimos los averigua en forma autónoma). Un trabajo podrá dividirse en una cantidad de partes que estará limitada por: la cantidad de hardware threads o la cantidad de threads instanciados en el constructor (en función del valor de retorno de exceedHwThreads()), y por la definición del método split() que arbitrariamente podrá decidir no subdividirse en partes menores a un tamaño especificado. UpThreadWorkdistributor ld(16); Hay dos opciones para ejecutar el procesamiento: SplittableSum* sp = new new SplittableSum( 1, dataSize+1, &result, &duration ); ld.processWork( sp, true ); o ld.processWork( sp ); En el primer caso, el metodo processWork(...) no devuelve el control hasta haber terminado de procesar sp, y además el thread llamador procesará también una de las partes del trabajo. En el segundo caso, el metodo processWork(...) devuelve el control inmediatamente liberando al thread llamador para que siga haciendo otras cosas. En ambos casos el manager borrará sp al terminar su ejecución, lo que hace necesario que el destructor de sp se encargue de salvar el conjunto de datos del resultado. Un ejmplo de destructor para el caso que processWork(...) no retorne hasta terminar serÃa: SplittableSum::~SplittableSum() { // estas condiciones hacen que solo se salve el resultado desde la instancia // original y no en las subdivisiones de esta if( saveResult ) *saveResult=result; // duration es un protected de la clase base y contiene la duración // (seteada automáticamente solo en la instancia original) if( uptv!=0 ) *uptv = duration; } Un ejemplo de destructor para el caso que processWork(...) retorne el control inmediatamente serÃa: SplittableSum::~SplittableSum() { if( uptv!=0 ) *uptv = duration; if( saveResult ) { *saveResult=result; // aca se desbloquea otro thread que estaba esperando el resultado // en un semaforo (que hizo por ejemplo sem.wait() en el semáforo // global sem. sem.post(); } } El UpThreadWorkDistributor puede seguir aceptando nuevos trabajos derivados de UpWorkSplittable desde este u otro thread aún mientras está procesando sp, y serán procesados o encolados en función de la disponibilidad de threads de hardware o lógicos según corresponda. ------------------------------------------------------------------------------ Ejmplo 2: Los servidores transaccionales deben muchas veces procesar gran cantidad de trabajos en paralelo. Estos trabajos no son subdivisibles y por lo tanto la mejora de la performance no se obtiene de la subdivisión del cada trabajo en varios threads, sino en el procesamiento paralelo de muchos trabajos. El UpThreadWorkDistributor puede también aceptar trabajos derivados de la clase UpWork, más sencilla que la clase UpWorkSplittable. A diferencia de esta, UpWork no es subdivisible. En que casos suele utilizarse? Muchas veces el thread que genera un trabajo (el thread que llama al metodo processWork(...) debe liberarse lo más rapido posible porque está a cargo de la administración de un determinado recurso (por ejemplo uno o más stream sockets), mientras que el procesamiento de trabajo puede demandar mucho tiempo, ya sea por mucho procesamiento o porque puede bloquearse a la espera de I/O (de bases de datos por ejemplo). Cuando un UpWork está siendo procesado por un thread (de software) y se bloquea a la espera de I/O, el sistema operativo libera el core (o thread de hardware - hyperthreading) que lo estaba procesando y comienza a procesar otro thread (de software) que no esté actualmente bloqueado. Por esto conviene en estos casos inicializar el UpThreadWorkDistributor con muchos threads (de software). El programador deriva una clase de la clase UpWork, debiendo definir el método action() con el procesamiento requerido, y opcionalmente datos privados asociados al trabajo: class WorkTest : public UpWork { private: int misDatosPrivados; public: WorkTest(int _misDatosPrivados) : misDatosPrivados(_misDatosPrivados) {} void action() { ... el procesamiento deseado va aqui ... } }; Para procesar una instancia del WorkTest entonces debe hacers: WorkTest* w=new WorkTest(t); // este es el el mismo ld definido para procesar los UpWorkSplittable's. ld.processWork(w); // este el el mismo ld definido para procesar En este caso processWork(...) siempre devuelve en seguida el control liberando al thread que llama, y se encarga de borrar w, por lo que el programador debe encargarse de salvar los resultados que hagan falta si los hubiere. Si hay threads disponibles (en este caso threads lógicos, no necesariamente de hardware), w será procesado en forma inmediata. Si no hay threads disponibles w será encolado y procesado apenas se libere un thread. UpThreadWorkdistributor lleva internamente el cálculo del tiempo promedio de procesamiento de los trabajos que recibe, y el intervalo promedio entre arribos de nuevos trabajos. Por lo tanto solo acepta nuevos trabajos si el tiempo que deberán esperar para ser procesados es menor a un lÃmite configurable: ld.setMaxQueueWait(0.5); // lÃmite de espera de 0.5 segundos. El programador de la aplicación puede en cualquier momento solicitar al UpThreadWorkdistributor los valores del tiempo promedio de procesamiento de los trabajos y del intervalo promedio entre arribos de nuevos trabajos: float ld.getWorkArrivalRateUpdated(); // en segundos float ld.getWorkExecutionAvgTime(); // en segundos Los tiempos e intervalos promedio son calculados con un filtro que también puede ser configurado por el programador. ------------------------------------------------------------------------------ Ejemplo 3: Los servidores transaccionales y los que implementan protocolos de comunicaciones como ser SIP, MGCP, etc, necesitan manejar eventos temporales que afectan muchas máquinas de estado. Estos deben ejecutarse en paralelo a los trheads principales de la aplicación. La UpTools ayuda a implemtarlos de manera muy sencilla utilizando el UpTimerManager como se vé en el siguiente ejemplo. En este ejemplo es necesario manejar timers que se ejecuten luego de esperas especÃficas o con determinada frecuencia. Al igual que en los ejemplos anteriores el programador debe definir un método action(...) en una clase derivada de una clase base que es administrada por un manager: class MyTimer : public UpTimer { private: string msg; // datos privados del timer public: MyTimer(const UpTimeVal& _when, const UpTimeVal& _interval, unsigned int _repeatCount, const string& s); ~MyTimer(); bool action(const UpTimeVal& _now); // a definir por el programador }; El programador define cuando debe ejecutarse el timer por primera vez, y además cuantas veces y con qué intervalo debe repetirse. Para ello lo puede definir en el constructor: MyTimer::MyTimer(const UpTimeVal& _when, const UpTimeVal& _interval, unsigned int _repeatCount, const string& s) : UpTimer(_when,_interval,_repeatCount), msg(s) { } También podrÃa modificarlo usando el método repeat(...) desde adentro del método action(...) si hiciera falta o desde afuera si es antes de insertar el timer en el manager. Ahora debe definir la acción. Si devuelve false la accion no se reagendará a pesar que la cuenta de repeticiones no haya llegado a cero todavÃa. bool MyTimer::action(const UpTimeVal& _now) { std::string aux; _now.charTime(aux); aux.append(msg); write(1,aux.data(),aux.size()); return true; } El UpTimerManager despertará a intervalos configurables, y ejecutará todos aquellos timers que hayan vencido hasta el momento. Una vez ejecutado un timer, si no debe reagendarse será borrado por el mismo manager. UpTimerManager tm; tm.setTickInterval(0.1); tm.startThread(); Para agendar el timer debe usarse el método add del timer manager: UpTimeVal t; t.now(); t += 11.5; MyTimer* ta=new MyTimer( t, 1, 3,"timer a"); tm.add(ta); El UpTimerManager tiene un thread especÃfico para la ejecución de todos los timers. Si hiciera falta se podrÃan instanciar varios UpTimerManagers para que haya más de un thread para hacer el procesamiento de los timers. ------------------------------------------------------------------------------ El primer ejemplo expuesto más arriba puede resultar familiar a aquellos que programan en Intel TBB, no asà los ejemplos dos y tres que apuntan más a servidores transaccionales. La misma librerÃa puede utilizarse para aplicaciones muy diversas. Incluyo aquà una lista (sin descripción, solo enumerativa) de algunas de las clases y templates de la librerÃa UpTools. Varias de estas se aplican al desarrollo multithreading, otras abarcan aspectos de network programming, administración de memoria, seguridad, manipulación de texto, manejo de múltiples bases de datos, entrada / salida, seguridad, etc. class UpAddress class UpSockAddr class UpHost class UpSocket class UpNet class UpRoute class UpTimeVal class UpLine class UpRegex class UpSubstitute class UpConf class DnsQueryType class UpTimerManager class UpTimer class UpThread class UpMutex class UpRWLock class UpScopedLock class UpSemaphore class UpWork class UpWorkSplittable class UpThreadWorkDistributor class UpResolver class UpDnsQuery class UpNaptrQuery class UpResolverSimple class UpDnsQuerySimple class UpResourceRecord class UpSqlQuery class UpSqlConn class UpSqlConnPool class UpSqlConnMysql class UpSqlConnPgsql class UpCryptoSslLibMultithreading class UpCryptoSslLib class UpSslContext class UpSsl class UpRsa class UpX509 class UpLinkable template UpSmartLinkPtr template UpCondition template UpThreadSpecificPtr template UpThreadSpecificSmartPtr template UpBuf template UpSelect class UpAddress class UpSockAddr class UpNet class UpHost class UpSocket class UpTimeVal class UpLine class UpRegex class UpSubstitute class UpConf class DnsQueryType class UpTimer class UpTimerManager class UpThread class UpWork class UpWorkSplittable class UpThreadWorkDistributor class UpResolver class UpDnsQuery class UpNaptrQuery class UpResourceRecord class UpSqlQuery class UpSqlConn class UpSqlConnPool class UpSqlConnMysql class UpSqlConnPgsql class UpCryptoSslLibMultithreading class UpCryptoSslLib class UpSslContext class UpSsl class UpRsa class UpX509 class UpMutex class UpRWLock class UpScopedLock class UpSemaphore template UpCondition template UpThreadSpecificPtr template UpThreadSpecificSmartPtr template UpSelect template UpSmartLinkPtr class UpLinkable