Sophie

Sophie

distrib > Fedora > 14 > i386 > by-pkgid > 073847b84f6b724479e93ed498a92be6 > files > 6

UpTools-8.6.1-1.fc14.i686.rpm

/* UpTools v8.6
 *
 * Copyright (c) 2005-2011 Fundacion Universidad de Palermo (Argentina).
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific prior written permission.
 *
 * 4. Redistributions of any form whatsoever must retain the following
 *    acknowledgment: 'This product includes software developed by the
 *    "Universidad de Palermo, Argentina" (http://www.palermo.edu/).'
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

(MULTITHREADING INTRODUCTION, SPANISH VERSION, UTF8)

Multithreading con UpTools: introducción y ejemplos básicos
-----------------------------------------------------------

Las UpTools son una colección de clases y templates en C++ para el desarrollo de aplicaciones multithreading y aplicaciones avanzadas en general, facilitando la administración de memoria, el paralelismo, el networking, el acceso a recursos compartidos, el manejo de tiempos y eventos, etc. Fueron desarrolladas en la Facultad de Ingeniería de la Universidad de Palermo principalemente a partir de sus proyectos de investigación y desarrollo del Departamento de Comunicaciones.

El presente documento abarca solo algunas de sus capacidades referentes al paralelismo. Para recorrer más extensamente sus capacidades en este tema y en otros, por favor referirse a los ejemplos y otros documentos que puedan haber en la distribución.

Se recorren más abajo algunos ejemplos.

El primero de ellos tiene que ver con la aceleración de algoritmos cuyo espacio de trabajo es subdivisible y pasible de ser procesado en paralelo. Típicamente se utilizaría para trabajos que no se bloquean en su procesamiento, salvo quizás en el acceso a un recurso compartido por un semáforo o mutex.

El segundo ejemplo tiene que ver con el aumento de throughput del procesamiento de una colección de objetos, o con el procesamiento de eventos, en donde a veces alguno de ellos puede llegar a bloquearse a la espera de algún dato (por ejemplo la respuesta de una base de datos). Este es el caso más típico de un servidor transaccional (por ejemplo en servidores de señalización de comunicaciones, o de servicios en internet), mientras que el primer ejemplo es más típico de los problemas de HPC (high performance computing)

El tercer ejemplo tiene que ver con el manejo de timers, típicamente a utilizar en máquinas de estado para protocolos de comunicaciones.

En los tres ejemplos el programador define el procesamiento a realizar, derivando una clase cuyos objetos serán administrados por un manager.


------------------------------------------------------------------------------

Ejemplo 1:

Este ejemplo demuestra las posibilidades para la subdivisión y paralelización de un trabajo. El problema en sí es muy simple: la suma de los números enteros en un determinado rango, y podría calcularse fácilmente sin necesidad de realizar la suma usando una sencilla fórmula, pero sirve para mostrar el uso de las clases involucradas.

El manager de ejecución de trabajos en paralelo se llama UpWorkThreadDistributor. El manager procesa los trabajos subdividiéndolos en muchas partes que ejecutarán en paralelo, y si hace falta, consolidará el resultado del trabajo de cada una de las partes. El trabajo debe implementarse en una clase que derive de la clase base UpWorkSplittable. Veamos entonces el ejemplo:

Primero se define una clase que deriva de UpWorkSplittable:

class SplittableSum : public UpWorkSplittable {
	private:
		// cada instancia de la clase debe ejecutar un determinado procesamiento
		// sobre su espacio de trabajo. En este caso el espacio de trabajo es el
		// intervalo de números entre begin y end: [begin,end)
		unsigned long long begin, end;
		unsigned long long *saveResult;
		unsigned long long result;
		unsigned long long size;
		UpTimeVal* uptv;

	public:
		SplittableSum(unsigned long long _begin, unsigned long long _end,
					  unsigned long long* _saveResult=0, UpTimeVal* _uptv=0 )
					 :
						begin(_begin), end(_end), saveResult(_saveResult),
						size(end-begin), uptv(_uptv)
						{}
		~SplittableSum();
		void show(ostream& o) const;
		// los siguientes métodos existen en la clase base pero deben
		// programarse para adecuarse al caso particular.
		void action();
		SplittableSum* split();
		void join( UpWorkSplittable* s );
		char joinType();
		bool exceedHwThreads();
};


El método siguiente debe definir el procesamiento de cada porción del espacio a procesar.

void SplittableSum::action() {
	register unsigned long long res( 0 );
	for( register unsigned long long x( begin ) ; x<end ; ++x )
		res += x;
	result = res;
}

El método siguiente debe definir cómo particionar el espacio de trabajo, y crear una nueva instancia de la clase con su porción del espacio asignada, a la vez que descargar de la instancia actual la proción entregada a la nueva:
		
SplittableSum* SplittableSum::split() {
	if( size<MINSIZE_SPACE ) return 0;
	size >>=1; // divide por 2 el tamaño
	unsigned long long middle( begin+size );
	SplittableSum* ret( new SplittableSum( middle, end, 0, 0 ) );
	end = middle;
	return ret;
}

UpWorkThreadDistributor, no asume nada respecto del tipo, las dimensiones, o la forma en la que el espacio será dividido en cada etapa, pero llamará recursivamente a split() en cada instancia hasta que le devuelva un puntero nulo.

El programador debe derivar el siguiente método para indicar si hará falta un proceso de consolidación al final del procesamiento de cada instancia.

char SplittableSum::joinType() { return 's'; }

El joinType puede ser serial('s'), en árbol ('t'), o innecesario ('n').

El proceso de consolidación debe ser definido por el programador, si es que hace falta (joinType()!='n'). El manager le asignará al conjunto de las intancias obtenidas de la primera, un mutex para proteger los join(), por lo que no es necesario que el programador los proteja ,sin embargo el programador debe determinar si necesita definir mecanismos de proteccion dentro del método action().

void SplittableSum::join( UpWorkSplittable* s ) {
	result += ((SplittableSum*)s)->result;
}


El programador indica al manager si es o no conveniente subdividir el trabajo en una cantidad de partes mayor a la cantidad de threads de hardware. Por ejmplo si el trabjo tuviera secciones que se bloquean en I/O o sobre otro recurso, sería conveniente tener más subdivisiones que threads físicos. En cambio, en caso de ser no-bloqueantes no sería conveniente (a menos que se compita con tasks de otros procesos en la misma computadora).

bool SplittableSum::exceedHwThreads() {
	return false;
}




Para ejecutar entonces el procesamiento, debe crearse el manager. Aqui se le dice que cree 16 threads en total, un número que asumimos en este caso mayor a la cantidad de threads de hardware (estos últimos los averigua en forma autónoma). Un trabajo podrá dividirse en una cantidad de partes que estará limitada por: la cantidad de hardware threads o la cantidad de threads instanciados en el constructor (en función del valor de retorno de exceedHwThreads()), y por la definición del método split() que arbitrariamente podrá decidir no subdividirse en partes menores a un tamaño especificado.

UpThreadWorkdistributor ld(16);


Hay dos opciones para ejecutar el procesamiento:

SplittableSum* sp = new  new SplittableSum( 1, dataSize+1, &result, &duration );

ld.processWork( sp, true );
o
ld.processWork( sp );


En el primer caso, el metodo processWork(...) no devuelve el control hasta haber terminado de procesar sp, y además el thread llamador procesará también una de las partes del trabajo.

En el segundo caso, el metodo processWork(...) devuelve el control inmediatamente liberando al thread llamador para que siga haciendo otras cosas.

En ambos casos el manager borrará sp al terminar su ejecución, lo que hace necesario que el destructor de sp se encargue de salvar el conjunto de datos del resultado.

Un ejmplo de destructor para el caso que processWork(...) no retorne hasta terminar sería:

SplittableSum::~SplittableSum() {
	// estas condiciones hacen que solo se salve el resultado desde la instancia
	// original y no en las subdivisiones de esta
	if( saveResult ) *saveResult=result;

	// duration es un protected de la clase base y contiene la duración
	// (seteada automáticamente solo en la instancia original)
	if( uptv!=0 ) *uptv = duration;
}

Un ejemplo de destructor para el caso que processWork(...) retorne el control inmediatamente sería:

SplittableSum::~SplittableSum() {
	if( uptv!=0 ) *uptv = duration;
	if( saveResult ) {
		*saveResult=result;
		// aca se desbloquea otro thread que estaba esperando el resultado
		// en un semaforo (que hizo por ejemplo sem.wait() en el semáforo
		// global sem.
		sem.post();
	}
}


El UpThreadWorkDistributor puede seguir aceptando nuevos trabajos derivados de UpWorkSplittable desde este u otro thread aún mientras está procesando sp, y serán procesados o encolados en función de la disponibilidad de threads de hardware o lógicos según corresponda.



------------------------------------------------------------------------------

Ejmplo 2:

Los servidores transaccionales deben muchas veces procesar gran cantidad de trabajos en paralelo. Estos trabajos no son subdivisibles y por lo tanto la mejora de la performance no se obtiene de la subdivisión del cada trabajo en varios threads, sino en el procesamiento paralelo de muchos trabajos.

El UpThreadWorkDistributor puede también aceptar trabajos derivados de la clase UpWork, más sencilla que la clase UpWorkSplittable. A diferencia de esta, UpWork no es subdivisible. En que casos suele utilizarse? Muchas veces el thread que genera un trabajo (el thread que llama al metodo processWork(...) debe liberarse lo más rapido posible porque está a cargo de la administración de un determinado recurso (por ejemplo uno o más stream sockets), mientras que el procesamiento de trabajo puede demandar mucho tiempo, ya sea por mucho procesamiento o porque puede bloquearse a la espera de I/O (de bases de datos por ejemplo).

Cuando un UpWork está siendo procesado por un thread (de software) y se bloquea a la espera de I/O, el sistema operativo libera el core (o thread de hardware - hyperthreading) que lo estaba procesando y comienza a procesar otro thread (de software) que no esté actualmente bloqueado. Por esto conviene en estos casos inicializar el UpThreadWorkDistributor con muchos threads (de software).


El programador deriva una clase de la clase UpWork, debiendo definir el método action() con el procesamiento requerido, y opcionalmente datos privados asociados al trabajo:


class WorkTest : public UpWork {
	private:
		int misDatosPrivados;
	public:
		WorkTest(int _misDatosPrivados) : misDatosPrivados(_misDatosPrivados) {}
		void action() {
			... el procesamiento deseado va aqui ...
		}
};


Para procesar una instancia del WorkTest entonces debe hacers:


WorkTest* w=new WorkTest(t);
// este es el el mismo ld definido para procesar los UpWorkSplittable's.
ld.processWork(w);	// este el el mismo ld definido para procesar

En este caso processWork(...) siempre devuelve en seguida el control liberando al thread que llama, y se encarga de borrar w, por lo que el programador debe encargarse de salvar los resultados que hagan falta si los hubiere.

Si hay threads disponibles (en este caso threads lógicos, no necesariamente de hardware), w será procesado en forma inmediata. Si no hay threads disponibles w será encolado y procesado apenas se libere un thread.

UpThreadWorkdistributor lleva internamente el cálculo del tiempo promedio de procesamiento de los trabajos que recibe, y el intervalo promedio entre arribos de nuevos trabajos. Por lo tanto solo acepta nuevos trabajos si el tiempo que deberán esperar para ser procesados es menor a un límite configurable:
	
ld.setMaxQueueWait(0.5); // límite de espera de 0.5 segundos.


El programador de la aplicación puede en cualquier momento solicitar al UpThreadWorkdistributor los valores del  tiempo promedio de procesamiento de los trabajos y del intervalo promedio entre arribos de nuevos trabajos:

float ld.getWorkArrivalRateUpdated(); // en segundos
float ld.getWorkExecutionAvgTime();   // en segundos

Los tiempos e intervalos promedio son calculados con un filtro que también puede ser configurado por el programador.


------------------------------------------------------------------------------

Ejemplo 3:

Los servidores transaccionales y los que implementan protocolos de comunicaciones como ser SIP, MGCP, etc, necesitan manejar eventos temporales que afectan muchas máquinas de estado. Estos deben ejecutarse en paralelo a los trheads principales de la aplicación. La UpTools ayuda a implemtarlos de manera muy sencilla utilizando el UpTimerManager como se vé en el siguiente ejemplo.

En este ejemplo es necesario manejar timers que se ejecuten luego de esperas específicas o con determinada frecuencia. Al igual que en los ejemplos anteriores el programador debe definir un método action(...) en una clase derivada de una clase base que es administrada por un manager:


class MyTimer : public UpTimer {
	private:
		string msg; // datos privados del timer
	public:
		MyTimer(const UpTimeVal& _when, const UpTimeVal& _interval,
				  unsigned int _repeatCount, const string& s);
		~MyTimer();
		bool action(const UpTimeVal& _now); // a definir por el programador
};


El programador define cuando debe ejecutarse el timer por primera vez, y además cuantas veces y con qué intervalo debe repetirse. Para ello lo puede definir en el constructor:

MyTimer::MyTimer(const UpTimeVal& _when, const UpTimeVal& _interval,
				  unsigned int _repeatCount, const string& s)
:
  UpTimer(_when,_interval,_repeatCount),
  msg(s)
{ }

También podría modificarlo usando el método repeat(...) desde adentro del método action(...) si hiciera falta o desde afuera si es antes de insertar el timer en el manager.

Ahora debe definir la acción. Si devuelve false la accion no se reagendará a pesar que la cuenta de repeticiones no haya llegado a cero todavía.

bool MyTimer::action(const UpTimeVal& _now) {
	std::string aux;
	_now.charTime(aux);
	aux.append(msg);
	write(1,aux.data(),aux.size());
	return true;
}

El UpTimerManager despertará a intervalos configurables, y ejecutará todos aquellos timers que hayan vencido hasta el momento. Una vez ejecutado un timer, si no debe reagendarse será borrado por el mismo manager.

UpTimerManager tm;
tm.setTickInterval(0.1);
tm.startThread();

Para agendar el timer debe usarse el método add del timer manager:

UpTimeVal t;
t.now();
t += 11.5;

MyTimer* ta=new MyTimer( t, 1, 3,"timer a");

tm.add(ta);


El UpTimerManager tiene un thread específico para la ejecución de todos los timers. Si hiciera falta se podrían instanciar varios UpTimerManagers para que haya más de un thread para hacer el procesamiento de los timers.

------------------------------------------------------------------------------




El primer ejemplo expuesto más arriba puede resultar familiar a aquellos que programan en Intel TBB, no así los ejemplos dos y tres que apuntan más a servidores transaccionales. La misma librería puede utilizarse para aplicaciones muy diversas. Incluyo aquí una lista (sin descripción, solo enumerativa) de algunas de las clases y templates de la librería UpTools. Varias de estas se aplican al desarrollo multithreading, otras abarcan aspectos de network programming, administración de memoria, seguridad, manipulación de texto, manejo de múltiples bases de datos, entrada / salida, seguridad, etc.

 class UpAddress
 class UpSockAddr
 class UpHost
 class UpSocket
 class UpNet
 class UpRoute
 class UpTimeVal
 class UpLine
 class UpRegex
 class UpSubstitute
 class UpConf
 class DnsQueryType
 class UpTimerManager
 class UpTimer
 class UpThread
 class UpMutex
 class UpRWLock
 class UpScopedLock
 class UpSemaphore
 class UpWork
 class UpWorkSplittable
 class UpThreadWorkDistributor
 class UpResolver
 class UpDnsQuery
 class UpNaptrQuery
 class UpResolverSimple
 class UpDnsQuerySimple
 class UpResourceRecord
 class UpSqlQuery
 class UpSqlConn
 class UpSqlConnPool
 class UpSqlConnMysql
 class UpSqlConnPgsql
 class UpCryptoSslLibMultithreading
 class UpCryptoSslLib
 class UpSslContext
 class UpSsl
 class UpRsa
 class UpX509
 class UpLinkable
 template UpSmartLinkPtr
 template UpCondition
 template UpThreadSpecificPtr
 template UpThreadSpecificSmartPtr
 template UpBuf
 template UpSelect






















































  class    UpAddress
  class    UpSockAddr
  class    UpNet
  class    UpHost
  class    UpSocket
  class    UpTimeVal
  class    UpLine
  class    UpRegex
  class    UpSubstitute
  class    UpConf
  class    DnsQueryType
  class    UpTimer
  class    UpTimerManager
  class    UpThread
  class    UpWork
  class    UpWorkSplittable
  class    UpThreadWorkDistributor
  class    UpResolver
  class    UpDnsQuery
  class    UpNaptrQuery
  class    UpResourceRecord
  class    UpSqlQuery
  class    UpSqlConn
  class    UpSqlConnPool
  class    UpSqlConnMysql
  class    UpSqlConnPgsql
  class    UpCryptoSslLibMultithreading
  class    UpCryptoSslLib
  class    UpSslContext
  class    UpSsl
  class    UpRsa
  class    UpX509
  class    UpMutex
  class    UpRWLock
  class    UpScopedLock
  class    UpSemaphore
  template UpCondition
  template UpThreadSpecificPtr
  template UpThreadSpecificSmartPtr
  template UpSelect
  template UpSmartLinkPtr
  class    UpLinkable