< Kommunikation Teil 3 Kollektive Kommunikation MPI::Bcast MPI::Scatter MPI::Gather MPI::AllGather MPI::Alltoall MPI::Scatterv MPI::Gatherv MPI::AllGatherv MPI::Alltoallv MPI::Alltoallw MPI::Reduce MPI::Allreduce MPI::Reduce_scatter MPI::Scan MPI::Exscan MPI::User_function MPI::MAXLOC Virtuelle Topologie >

Kollektive Kommunikation

Die Kollektive Kommunikation ermöglicht das gleichzeitige Senden beziehungsweise Empfangen zwischen Prozessen. Soll zum Beispiel eine Variable an alle Prozesse "verschickt" werden, kann dies mit Hilfe einer for-Schleife und MPI::Send() oder aber mittels der kollektiven Funktion MPI::Bcast() geschehen. MPI unterscheidet drei Arten der kollektiven Kommunikation auf die ich in den folgenden Sektionen eingehen werde:

  • Ein Prozess sendet an alle Prozesse (auch an sich selbst)
  • Ein Prozess empfängt von allen Prozessen (auch von sich selbst)
  • Jeder Prozess kommuniziert mit allen anderen Prozessen (auch mit sich selbst)

Die einfachste kollektive Kommunkationsfunktion ist der Broadcast. Bei dieser wird, wie im Einführungsbeispiel erwähnt, ein Datum (oder ein Array) von einem Master-Prozess (root) an alle anderen Prozesse eines Kommunikators gesendet. Im Prinzip findet eine Synchronisation der entsprechenden Variablen statt. Die Bcast()-Methode hat folgenden Aufbau:

	void MPI::Comm::Bcast(void* buffer, int count, const MPI::Datatype& datatype, int root)
	

Im folgenden Beispiel sendet ein root-Prozess ein Integer an alle anderen Prozesse:

	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 
	 4 using std::cout;
	 5 using std::endl;
	 6 using namespace MF;
	 7 
	 8 int main(int argc, char* argv[])
	 9 {
	10   MyMPI const* mpi = MyMPI::instance();
	11   int const master = 0;
	12   int data = 0;
	13 
	14   cout << "vor Bcast(): " << *mpi->instance() << " data: " << data << endl;
	15 
	16   if (mpi->rank() == master) data = 13;
	17   mpi->world().Bcast(&data, 1, MPI::INT, master);
	18 
	19   cout << "nach Bcast(): " << *mpi->instance() << " data: " << data << endl;
	20 
	21   return 0;
	22 }
	

Und zwar wird die Zahl 13 auf alle anderen Prozesse übertragen und ist dann über die Variable data verfügbar. Diese Variable dient somit als Sendepuffer (für den root-Prozess) als auch als Empfangspuffer. Die Vermutung liegt nah, das im Hintergrund auf dem root-Prozess ein MPI::Send() und auf den anderen Prozessen ein MPI::Recv() gestartet wird. Da unter anderem kein explizites tag angegeben wird, sollten alle beteiligten Prozesse den gleichen Broadcast ausführen. Das heißt, nicht durch Verzweigungen im Programm-Code eine anderen Kommunikationsmethode aufrufen. Es kann sonst zu Deadlocks und Programmabstürzen kommen.

Die nächsten beiden MPI-Methoden der kollektiven Kommunikation, die ich vorstellen möchte, sind MPI::Scatter() und MPI::Gather(). Die erste Methode verteilt und die zweite Methode sammelt Daten ein. Im Gegensatz zum einfachen Broadcast bekommt allerdings jetzt jeder Prozess vom root-Prozess einen bestimmten Datenteil gesendet. Beim "Einsammeln" schickt jeder Prozess Daten zum root-Prozess der diese empfängt und nach Prozessrang sortiert. Die Besonderheit dabei ist, dass der root-Prozess sich selbst etwas schickt. Bevor wir zu einem Beispiel kommen erst einmal beide Methoden im Überblick:

	void MPI::Comm::Scatter(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                                      void* recvbuf, int recvcount, const MPI::Datatype& recvtype, int root)

	void MPI::Comm::Gather(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                                     void* recvbuf, int recvcount, const MPI::Datatype& recvtype, int root)
	

Beide Methoden haben den gleichen Aufbau. Es gibt einen Sende- und Empfangspuffer zu denen jeweils die Anzahl und der Datentyp angegeben werden muss. Zu guter letzt steht der Rang des root-Prozesses. Was genau passiert, lässt sich am besten an den versprochenen Beispiel erläutern.

	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13   int const root = 0;
	14 
	15   typedef std::vector<int> VectorOfInts;
	16 
	17   VectorOfInts recvBuf(mpi->size(),-1);
	18   VectorOfInts* sendBuf = NULL;
	19   int const* pSendBuf   = NULL;
	20 
	21   if(mpi->rank() == root)
	22   {
	23     sendBuf = new VectorOfInts(mpi->size()*mpi->size(),0);
	24     for(unsigned int i=0; i<sendBuf->size(); ++i)
	25       (*sendBuf)[i] = i;
	26     pSendBuf = &(*sendBuf)[0];
	27   }
	28 
	29   cout << *mpi->instance() << " ";
	30   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	31   cout << endl;
	32 
	33   mpi->world().Scatter( pSendBuf,   recvBuf.size(), MPI::INT,
	34                        &recvBuf[0], recvBuf.size(), MPI::INT, root);
	35 
	36   cout << *mpi->instance() << " ";
	37   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	38   cout << endl;
	39 
	40   if(mpi->rank() == root) delete sendBuf;
	41   return 0;
	42 }
	

Jeder Prozess legt einen Empfangspuffer an. Da der root-Prozess nur einen Sendepuffer benötigt, wird dieser als Zeiger auf einen Vektor angelegt, der erst einmal auf NULL zeigt. Nur der root-Prozess initialisiert diesen Vektor und füllt ihn mit "Werten". Nach der Ausgabe wird die Scatter()-Methode gestartet. Allerdings kann jetzt nicht einfach der Sendepuffer dieser Methode übergeben werden, da außer auf dem root-Prozess der Sendepuffer auf NULL zeigt. Dieser Zeiger wird ausgewertet und es erfolgt ein Programmabsturz. Das liegt daran, dass wir den std::vector benutzt haben. Das Problem lässt sich lösen, indem eine zusätzliche Variable (Zeile 19) eingeführt wird. Diese zeigt auf einen konstanten Integerwert und wird vorerst auch mit NULL initialisiert. Der root-Prozess lässt diese Variable (Zeile 26) dann auf den Beginn der Daten des Vektors zeigen. Jetzt kann die Scatter-Methode mit dieser Variablen als Alias für den Sendepuffer problemlos benutzt werden, da vor dem Aufrufen pSendBuf nicht ausgewertet wird. In der Methode wird diese nur vom root-Prozess benutzt und da haben wir für eine Initialisierung gesorgt. Die Ausgabe ergibt für drei Prozesse folgendes:

	process 0 of 3 running on m13f-mobile5 -1 -1 -1
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 -1 -1 -1
	process 1 of 3 running on m13f-mobile5 3 4 5
	process 2 of 3 running on m13f-mobile5 -1 -1 -1
	process 2 of 3 running on m13f-mobile5 6 7 8
	

Der Empfangspuffer ist bei allen drei Prozessen ordungsgemäß mit -1 initialisiert worden. Was aber ist genau in der Scatter-Methode abgelaufen? Der root-Prozess hat vorher den Sendepuffer mit den Zahlen 0 bis 8 gefüllt. Dann hat er die ersten drei Zahlen aus dem Sendepuffer sich selbst, die nächsten drei Zahlen an Prozess 1 und die wiederum die nächsten drei Zahlen an Prozess 2 verschickt. Jeder Prozess hat also jetzt in der gleichen Empfangspuffervariablen unterschiedliche Werte stehen, mit denen er jetzt rechnen kann. Der Sendepuffer muss also mindestens genauso groß sein, wie die Anzahl der beteiligten Prozesse mal die Anzahl der jeweils zu empfangenen Daten (Empfangspuffer).

Kommen wir nun zur Gather()-Methode, welche die Umkehrung der Scatter-Methode ist. Das heißt, dass Daten von allen Prozessen zum root-Prozess geschickt werden, der root-Prozess quasi die Daten sammelt. Das folgende Beispiel verdeutlicht die Funktionsweise:

	17   VectorOfInts sendBuf(mpi->size(),mpi->rank());
	18   VectorOfInts* recvBuf = NULL;
	19   int* pRecvBuf = NULL;
	20 
	21   if(mpi->rank() == root)
	22   {
	23     recvBuf = new VectorOfInts(mpi->size()*mpi->size(),-1);
	24     for(unsigned int i=0; i<recvBuf->size(); ++i)
	25       (*recvBuf)[i] = i;
	26     pRecvBuf = &(*recvBuf)[0];
	27   }
	28 
	29   cout << *mpi->instance() << " ";
	30   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	31   cout << endl;
	32 
	33   mpi->world().Gather(&sendBuf[0], sendBuf.size(), MPI::INT,
	34                        pRecvBuf,   sendBuf.size(), MPI::INT, root);
	35 
	36   if(mpi->rank() == root)
	37   {
	38     cout << *mpi->instance() << " ";
	39     std::copy(recvBuf->begin(), recvBuf->end(), std::ostream_iterator<int>(std::cout," "));
	40     cout << endl;
	41     delete recvBuf;
	42   }
	43   return 0;
	44 }
	

Die Ausgabe ergibt folgendes:

	process 0 of 3 running on m13f-mobile5 0 0 0
	process 1 of 3 running on m13f-mobile5 1 1 1
	process 2 of 3 running on m13f-mobile5 2 2 2
	process 0 of 3 running on m13f-mobile5 0 0 0 1 1 1 2 2 2
	

Alle Prozesse senden ihre Daten dem root-Prozess, der diese empfängt und der Rangfolge der Prozesse gemäß sortiert. Dabei wird ähnlich wie im vorherigen Beispiel nur beim root-Prozess ein Empfangspuffer aufgebaut.

Jetzt fehlen uns nur noch die kollektiven Kommunikationsmethoden, wo jeder Prozess mit jeden Prozess kommuniziert. Dafür hat die MPI-Bibliothek zwei Methoden vorgesehen: MPI::AllGather() und MPI::Alltoall(). Dabei verhält sich MPI::AllGather() wie MPI::Gather() nur das nach Aufruf der Methode alle Prozesse im Empfangspuffer die gesendeten Daten aller Prozesse haben. MPI::Alltoall() geht noch einen Schritt weiter. Hier bekommt jeder Prozess von den jeweils anderen individuell Daten zugeschickt. Bevor wir zum nächsten Beispiel kommen, erst einmal ein Überblick über die beiden Methoden:

	void MPI::Comm::Allgather(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                                        void* recvbuf, int recvcount, const MPI::Datatype& recvtype)

	void MPI::Comm::Alltoall(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                                       void* recvbuf, int recvcount, const MPI::Datatype& recvtype)
	

	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13 
	14   std::vector<int> sendBuf(mpi->size(),mpi->rank());
	15   std::vector<int> recvBuf(mpi->size()*mpi->size(),-1);
	16 
	17   cout << *mpi->instance() << " ";
	18   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	19   cout << endl;
	20 
	21   mpi->world().Allgather(&sendBuf[0], sendBuf.size(), MPI::INT,
	22                          &recvBuf[0], sendBuf.size(), MPI::INT);
	23 
	24   cout << *mpi->instance() << " ";
	25   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	26   cout << endl;
	27 
	28   return 0;
	29 }
	
	process 0 of 3 running on m13f-mobile5 0 0 0
	process 1 of 3 running on m13f-mobile5 1 1 1
	process 2 of 3 running on m13f-mobile5 2 2 2
	process 0 of 3 running on m13f-mobile5 0 0 0 1 1 1 2 2 2
	process 1 of 3 running on m13f-mobile5 0 0 0 1 1 1 2 2 2
	process 2 of 3 running on m13f-mobile5 0 0 0 1 1 1 2 2 2
	

Im Gegensatz zu den vorherigen Beispielen wird auf jeden Prozess ein Empfangspuffer mit der entsprechenden Größe angelegt. Im Sendepuffer befindet sich der jeweilige Rang des Prozesses, abhängig von der Anzahl der beteiligten Prozesse. Es erfolgt der Aufruf von MPI::Gatherall(). Hier werden nun drei Elemente des Prozesses 0 an alle Prozesse geschickt. Diese empfangen die drei Elemente und speichern diese an den ersten drei Stellen im Empfangspuffer ab. An diese schließen sich nahtlos die Elemente an, die von Prozess 1 gesendet werden. Danach folgen die Elemente von Prozess 2.


	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13 
	14   std::vector<int> sendBuf(mpi->size()*mpi->size(),mpi->rank());
	15   std::vector<int> recvBuf(mpi->size()*mpi->size(),0);
	16 
	17   for(unsigned int i=0; i<sendBuf.size(); ++i)
	18     sendBuf[i] =  sendBuf[i] * 100 + i;
	19 
	20   cout << *mpi->instance() << " ";
	21   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	22   cout << endl;
	23 
	24   mpi->world().Alltoall( &sendBuf[0], mpi->size(), MPI::INT,
	25                          &recvBuf[0], mpi->size(), MPI::INT);
	26 
	27   cout << *mpi->instance() << " ";
	28   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	29   cout << endl;
	30 
	31   return 0;
	32 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2 3 4 5 6 7 8
	process 1 of 3 running on m13f-mobile5 100 101 102 103 104 105 106 107 108
	process 2 of 3 running on m13f-mobile5 200 201 202 203 204 205 206 207 208
	process 0 of 3 running on m13f-mobile5 0 1 2 100 101 102 200 201 202
	process 1 of 3 running on m13f-mobile5 3 4 5 103 104 105 203 204 205
	process 2 of 3 running on m13f-mobile5 6 7 8 106 107 108 206 207 208
	

Bei MPI::Alltoall() muss auch die Größe des Sendepuffers angepasst werden, da jetzt jeder Prozess an den jeweils anderen individuell Daten verschickt. Um das im Beispiel zu verdeutlichen füllt jeder Prozess seinen Sendepuffer mit "speziellen" Daten. So werden auf Prozess 0 die Zahlen von 0 bis 8 erzeugt. Davon schickt sich Prozess 0 die ersten drei selbst, dann die nächsten drei an Prozess 1 und die letzten drei an Prozess 2. Prozess 1 und zwei platzieren die empfangenen Daten an erster Stelle im Empfangspuffer. Äquivalent geschieht dies mit Prozess 1 beziehungsweise Prozess 2.

Mit den bisher kennen gelernten kollektiven Kommunikationsmethoden kann schon vieles abgedeckt werden. Aber an einer Stelle ist man noch etwas unflexibel. Und zwar bei der Anzahl der zu sendenden beziehungsweise zu empfangenden Daten. Um dies zu erreichen müsste man in der Lage sein, jeden Prozess mitzuteilen wie viele Daten er zu senden und/oder zu empfangen hat. Aber auch daran wurde gedacht und zur Unterscheidung den letzten vier Methoden ein "v" im Namen hinzugefügt. Dieses dient als Hinweis für die Vektorvariante:

void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
                               void* recvbuf, int recvcount,  const MPI::Datatype& recvtype, int root)

void MPI::Comm::Gatherv(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                              void* recvbuf, const int recvcounts[], const int displs[], const MPI::Datatype& recvtype, int root)

void MPI::Comm::Allgatherv(const void* sendbuf, int sendcount, const MPI::Datatype& sendtype,
                                 void* recvbuf, const int recvcounts[], const int displs[], const MPI::Datatype& recvtype)

void MPI::Comm::Alltoallv(const void* sendbuf, const int sendcounts[], const int sdispls[], const MPI::Datatype& sendtype,
                                void* recvbuf, const int recvcounts[], const int rdispls[], const MPI::Datatype& recvtype)
	

Zusätzlich hat sich noch ein weiterer Parameter "eingeschlichen", der das sogenannte Displacement angibt. Damit ist hier ein "Abstand" gemeint. Wenn zum Beispiel bei MPI::Gatherv() der root-Prozess Daten empfängt und das Displacement für alle Prozesse 2 beträgt, so lässt der root-Prozess beim Einsortieren im Empfangspuffer zwei Datentypen zwischen jeden "Datenpaket" frei. Die Größe des Empfangspuffers muss das natürlich berücksichtigen. Genauer werde ich auf das Displacement im nächsten Kapitel eingehen.
Die nächsten vier Beispiele zeigen die Funktionsweise:


	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13   int const root = 0;
	14 
	15   typedef std::vector<int> VectorOfInts;
	16 
	17   VectorOfInts recvBuf(mpi->size() - mpi->rank(),-1);
	18   VectorOfInts* sendBuf   = NULL;
	19   VectorOfInts* sendCount = NULL;
	20   VectorOfInts* sendDispl = NULL;
	21   int const* pSendBuf     = NULL;
	22   int const* pSendCount   = NULL;
	23   int const* pSendDispl   = NULL;
	24 
	25   if(mpi->rank() == root)
	26   {
	27     int const stride = 3;
	28     sendBuf = new VectorOfInts(mpi->size()*mpi->size() + (mpi->size()-1)*stride,-1);
	29 
	30     for (unsigned int i=0; i<sendBuf->size(); ++i)
	31       (*sendBuf)[i] = i;
	32 
	33     sendCount = new VectorOfInts(mpi->size(), recvBuf.size());
	34     sendDispl = new VectorOfInts(mpi->size(), -1);
	35 
	36     for(unsigned int i=0; i<recvBuf.size(); ++i)
	37     {
	38       (*sendCount)[i] -= i;
	39       (*sendDispl)[i]  = i * (stride + mpi->size());
	40     }
	41 
	42     pSendBuf   = &(*sendBuf)[0];
	43     pSendCount = &(*sendCount)[0];
	44     pSendDispl = &(*sendDispl)[0];
	45   }
	46 
	47   cout << *mpi->instance() << " ";
	48   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	49   cout << endl;
	50 
	51   mpi->world().Scatterv(pSendBuf, pSendCount, pSendDispl, MPI::INT,
	52                         &recvBuf[0], recvBuf.size(), MPI::INT, root);
	53 
	54   cout << *mpi->instance() << " ";
	55   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	56   cout << endl;
	57 
	58   if(mpi->rank() == root)
	59   {
	60     delete sendBuf;
	61     delete sendCount;
	62     delete sendDispl;
	63   }
	64   return 0;
	65 }
	
	process 0 of 4 running on m13f-mobile5 -1 -1 -1 -1
	process 1 of 4 running on m13f-mobile5 -1 -1 -1
	process 2 of 4 running on m13f-mobile5 -1 -1
	process 3 of 4 running on m13f-mobile5 -1
	process 0 of 4 running on m13f-mobile5 0 1 2 3
	process 1 of 4 running on m13f-mobile5 7 8 9
	process 2 of 4 running on m13f-mobile5 14 15
	process 3 of 4 running on m13f-mobile5 21
	

Nun stellt sich die Frage, welche Zahl als nächstes in der Zahlenfolge kommt. ;-) Da wir hier nicht bei einem IQ-Test sind, erläutere ich mal, was das Programm machen soll: Jeder Prozess legt einen Empfangspuffer allerdings mit unterschiedlicher Größe an (Zeile 17). Der root-Prozess sendet dann jedem Prozess, entsprechend dessen Größe, "Daten". Zusätzlich wird ein Displacement eingehalten.
Die Größe wird in Zeile 38 bestimmt. Der Vektor sendCount enthält für dieses Beispiel die Werte 4, 3, 2 und 1. Das Displacement wird in der darauffolgendenden Zeile berechnet. sendDispl enthält danach die Werte 0, 7, 14 und 21. Diese geben den Abstand im Sendepuffer an, von welchen jedem Prozess die entsprechende Anzahl gesendet wird. Das heißt, Prozess 0 beginnt an Postion 0 im Sendepuffer und sendet sich selbst die vier Zahlen 0, 1, 2 und 3. Danach wird Prozess 1 bedient. Dieser bekommt Zahlen ab Position 7 zugeschickt und zwar genau drei Stück. Danach ist Prozess 2 ab Postion 14 mit zwei Zahlen dran und Prozess 3 bekommt nur eine Zahl ab Position 21 zugeschickt.


	17   VectorOfInts sendBuf(mpi->rank() + 1,-1);
	18   VectorOfInts* recvBuf   = NULL;
	19   VectorOfInts* recvCount = NULL;
	20   VectorOfInts* recvDispl = NULL;
	21   int* pRecvBuf    = NULL;
	22   int* pRecvCount  = NULL;
	23   int* pRecvDispl  = NULL;
	24 
	25   for (unsigned int i=0; i<sendBuf.size(); ++i)
	26     sendBuf[i] = mpi->rank()*10 + i;
	27 
	28   if(mpi->rank() == root)
	29   {
	30     int const stride = 3;
	31 
	32     recvCount = new VectorOfInts(mpi->size(), 1);
	33     recvDispl = new VectorOfInts(mpi->size(), 0);
	34 
	35     for(int i=1; i<mpi->size(); ++i)
	36     {
	37       (*recvCount)[i] = i + 1;
	38       (*recvDispl)[i] = (*recvDispl)[i-1] + i + stride;
	39     }
	40 
	41     recvBuf = new VectorOfInts(recvCount->back() + recvDispl->back(),-1);
	42     pRecvBuf   = &(*recvBuf)[0];
	43     pRecvCount = &(*recvCount)[0];
	44     pRecvDispl = &(*recvDispl)[0];
	45   }
	46 
	47   cout << *mpi->instance() << " ";
	48   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	49   cout << endl;
	50 
	51   mpi->world().Gatherv(&sendBuf[0], sendBuf.size(), MPI::INT,
	52                         pRecvBuf, pRecvCount, pRecvDispl, MPI::INT, root);
	53 
	54   if(mpi->rank() == root)
	55   {
	56     cout << *mpi->instance() << " ";
	57     std::copy(recvBuf->begin(), recvBuf->end(), std::ostream_iterator<int>(std::cout," "));
	58     cout << endl;
	59 
	60     delete recvBuf;
	61     delete recvCount;
	62     delete recvDispl;
	63   }
	64   return 0;
	65 }
	
	process 0 of 4 running on m13f-mobile5 0
	process 1 of 4 running on m13f-mobile5 10 11
	process 2 of 4 running on m13f-mobile5 20 21 22
	process 3 of 4 running on m13f-mobile5 30 31 32 33
	process 0 of 4 running on m13f-mobile5 0 -1 -1 -1 10 11 -1 -1 -1 20 21 22 -1 -1 -1 30 31 32 33
	

Der root-Prozess empfängt in diesem Beispiel Daten von allen Prozessen. Die Anzahl der Daten wird dabei in Abhängigkeit des Ranges bestimmt (Zeile 37). Zwischen den empfangenen "Datenpaketen" wird ein Abstand (Displacement) von drei Integern eingehalten. Diese Abstände werden in Zeile 38 berechnet. Der Vektor recvDispl enthält hier die Werte 0, 4, 9 und 15.


	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13 
	14   std::vector<int> sendBuf(mpi->size(),mpi->rank());
	15   std::vector<int> recvBuf(mpi->size()*mpi->size(),-1);
	16 
	17   int const stride = 3;
	18   recvBuf.resize(recvBuf.size() + (mpi->size()-1)*stride, -1);
	19 
	20   std::vector<int> recvCount(mpi->size(), sendBuf.size());
	21   std::vector<int> recvDispl(mpi->size(), -1);
	22 
	23   for(unsigned int i=0; i<sendBuf.size(); ++i)
	24     recvDispl[i] = i * (stride + mpi->size());
	25 
	26   cout << *mpi->instance() << " ";
	27   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	28   cout << endl;
	29 
	30   mpi->world().Allgatherv(&sendBuf[0], sendBuf.size(), MPI::INT,
	31                           &recvBuf[0], &recvCount[0], &recvDispl[0], MPI::INT);
	32 
	33   cout << *mpi->instance() << " ";
	34   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	35   cout << endl;
	36 
	37   return 0;
	38 }
	39 
	
	process 0 of 4 running on m13f-mobile5 0 0 0 0
	process 1 of 4 running on m13f-mobile5 1 1 1 1
	process 2 of 4 running on m13f-mobile5 2 2 2 2
	process 3 of 4 running on m13f-mobile5 3 3 3 3
	process 0 of 4 running on m13f-mobile5 0 0 0 0 -1 -1 -1 1 1 1 1 -1 -1 -1 2 2 2 2 -1 -1 -1 3 3 3 3
	process 1 of 4 running on m13f-mobile5 0 0 0 0 -1 -1 -1 1 1 1 1 -1 -1 -1 2 2 2 2 -1 -1 -1 3 3 3 3
	process 2 of 4 running on m13f-mobile5 0 0 0 0 -1 -1 -1 1 1 1 1 -1 -1 -1 2 2 2 2 -1 -1 -1 3 3 3 3
	process 3 of 4 running on m13f-mobile5 0 0 0 0 -1 -1 -1 1 1 1 1 -1 -1 -1 2 2 2 2 -1 -1 -1 3 3 3 3
	

Dieses Beispiel für MPI::Allgatherv() ist MPI::Allgather nachempfunden. Der einzige Unterschied besteht darin, dass zwischen den einzelnen "Datenpaketen" ein Displacement von drei Integerwerten gelassen wird. Die Anzahl der zu empfangenen Daten vom jeweiligen Prozess ist immer gleich der Anzahl der beteiligten Prozesse.


	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 
	 6 using std::cout;
	 7 using std::endl;
	 8 using namespace MF;
	 9 
	10 int main(int argc, char* argv[])
	11 {
	12   MyMPI const* mpi = MyMPI::instance();
	13   typedef std::vector<int> VectorOfInts;
	14 
	15   int const sendStride = 1;
	16   VectorOfInts sendBuf(mpi->size()*mpi->size(), mpi->rank());
	17   sendBuf.resize(sendBuf.size() + (mpi->size()-1)*sendStride, mpi->rank());
	18 
	19   for (unsigned int i=0; i<sendBuf.size(); ++i)
	20   {
	21     sendBuf[i] = sendBuf[i] * 100 + i;
	22   }
	23 
	24   VectorOfInts sendCount(mpi->size(), mpi->size());
	25   VectorOfInts sendDispl(mpi->size(), -1);
	26 
	27   for(unsigned int i=0; i<mpi->size(); ++i)
	28     sendDispl[i] = i * (sendStride + mpi->size());
	29 
	30   int const recvStride = 3;
	31   VectorOfInts recvBuf(mpi->size()*mpi->size(),-1);
	32   recvBuf.resize(recvBuf.size() + (mpi->size()-1)*recvStride, -1);
	33 
	34   VectorOfInts recvCount(mpi->size(), mpi->size());
	35   VectorOfInts recvDispl(mpi->size(), -1);
	36 
	37   for(unsigned int i=0; i<mpi->size(); ++i)
	38     recvDispl[i] = i * (recvStride + mpi->size());
	39 
	40   cout << *mpi->instance() << " ";
	41   std::copy(sendBuf.begin(), sendBuf.end(), std::ostream_iterator<int>(std::cout," "));
	42   cout << endl;
	43 
	44   mpi->world().Alltoallv(&sendBuf[0], &sendCount[0], &sendDispl[0], MPI::INT,
	45                          &recvBuf[0], &recvCount[0], &recvDispl[0], MPI::INT);
	46 
	47   cout << *mpi->instance() << " ";
	48   std::copy(recvBuf.begin(), recvBuf.end(), std::ostream_iterator<int>(std::cout," "));
	49   cout << endl;
	50 
	51   return 0;
	52 }
	53 
	
	process 0 of 4 running on m13f-mobile5 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
	process 1 of 4 running on m13f-mobile5 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
	process 2 of 4 running on m13f-mobile5 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
	process 3 of 4 running on m13f-mobile5 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
	process 0 of 4 running on m13f-mobile5 0 1 2 3 -1 -1 -1 100 101 102 103 -1 -1 -1 200 201 202 203 -1 -1 -1 300 301 302 303
	process 1 of 4 running on m13f-mobile5 5 6 7 8 -1 -1 -1 105 106 107 108 -1 -1 -1 205 206 207 208 -1 -1 -1 305 306 307 308
	process 2 of 4 running on m13f-mobile5 10 11 12 13 -1 -1 -1 110 111 112 113 -1 -1 -1 210 211 212 213 -1 -1 -1 310 311 312 313
	process 3 of 4 running on m13f-mobile5 15 16 17 18 -1 -1 -1 115 116 117 118 -1 -1 -1 215 216 217 218 -1 -1 -1 315 316 317 318
	

Wie auch bei MPI::Alltoall() sendet hier jeder Prozess den jeweils anderen individuell Daten. Hier wird allerdings beim Senden und Empfangen ein Displacement eingehalten. Auf Prozess 0 zum Beispiel werden die Zahlen von 0 bis 18 erzeugt. Davon sendet sich Prozess 0 die ersten vier Zahlen 0, 1, 2, 3 selbst. Die Zahl 4 wird infolge des Sende-Displacements ignoriert und Prozess 1 bekommt die Zahlen 5, 6, 7 und 8 zugeschickt. Die Zahl 9 wird wieder ausgelassen und so weiter. Beim Empfangen wiederum wird zwischen den Datenpaketen ein Displacement von drei eingehalten. Analog verhält es sich bei den anderen Prozessen.

Wer nun gedacht hat, dass das alles war, den muss ich leider enttäuschen. Denn in den letzten Beispielen war der Datentyp immer gleich. Um diesen dynamisch zu haben, muss ebenfalls ein Vektor, diesmal von Datentypen, verwendet werden. Daraus entsteht die allgemeinste kollektive Funktion MPI::Alltoallw() mit folgenden Parametern:

void MPI::Comm::Alltoallw(const void* sendbuf, const int sendcounts[], const int sdispls[], const MPI::Datatype sendtypes[],
                                void* recvbuf, const int recvcounts[], const int rdispls[], const MPI::Datatype recvtypes[])
	




Globale Reduktionsoperationen

Im letzten Abschnitt haben wir MPI-Funktionen kennen gelernt, die es uns ermöglichen Daten in einer Gruppe von Prozessen zu verteilen. Nach diesem Verteilen wird mit den Daten häufig irgend etwas berechnet. Dafür können auch die folgenden MPI-Funktionen benutzt, die on-the-fly, also während der Kommunikation, Berechnungen durchführen. MPI hat viele Operationen bereits vordefiniert, aber es können auch eigene Operationen definiert werden. Bevor wir zu Beispielen kommen, erst einmal wie üblich die Übersicht über die MPI-Funktionen:

void MPI::Comm::Reduce        (const void* sendbuf, void* recvbuf, int count, const MPI::Datatype& datatype, const MPI::Op& op, int root)
void MPI::Comm::Allreduce     (const void* sendbuf, void* recvbuf, int count, const MPI::Datatype& datatype, const MPI::Op& op)
void MPI::Comm::Reduce_scatter(const void* sendbuf, void* recvbuf, int recvcounts[], const MPI::Datatype& datatype, const MPI::Op& op)
void MPI::Intracomm::Scan     (const void* sendbuf, void* recvbuf, int count, const MPI::Datatype& datatype, const MPI::Op& op)
void MPI::Intracomm::Exscan   (const void* sendbuf, void* recvbuf, int count, const MPI::Datatype& datatype, const MPI::Op& op)
	

Der hier neu hinzu kommende Datentyp ist MPI::Op. Dahinter verbirgt sich die Operation, die auf die Daten auszuführen ist. Allerdings sind nur bestimmte Operationen für bestimmte Datentypen erlaubt. Die folgende Tabelle gibt eine Übersicht darüber:

MPI::Op MPI::MAX, MPI::MIN MPI::SUM, MPI::PROD MPI::LAND, MPI::LOR, MPI::LXOR MPI::BAND, MPI::BOR, MPI::BXOR MPI::MAXLOC, MPI::MINLOC
MPI::Datatype MPI::SIGNED_CHAR, MPI::UNSIGNED_CHAR, MPI::SHORT, MPI::INT, MPI::UNSIGNED, MPI::UNSIGNED_SHORT, MPI::LONG, MPI::UNSIGNED_LONG, MPI::FLOAT, MPI::DOUBLE, MPI::LONG_DOUBLE MPI::SIGNED_CHAR, MPI::UNSIGNED_CHAR, MPI::SHORT, MPI::INT, MPI::UNSIGNED, MPI::UNSIGNED_SHORT, MPI::LONG, MPI::UNSIGNED_LONG, MPI::FLOAT, MPI::DOUBLE, MPI::LONG_DOUBLE, MPI::COMPLEX, MPI::DOUBLE_COMPLEX, MPI::LONG_DOUBLE_COMPLEX MPI::BOOL MPI::SIGNED_CHAR, MPI::UNSIGNED_CHAR, MPI::SHORT, MPI::INT, MPI::UNSIGNED, MPI::UNSIGNED_SHORT, MPI::LONG, MPI::UNSIGNED_LONG, MPI::FLOAT, MPI::DOUBLE, MPI::LONG_DOUBLE MPI::FLOAT_INT, MPI::DOUBLE_INT, MPI::LONG_INT, MPI::TWOINT, MPI::SHORT_INT, MPI::LONG_DOUBLE_INT, MPI::TWOREAL, MPI::TWODOUBLE_PRECISION, MPI::TWOINTEGER



	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 #include "Cont2Cout.hpp"
	 6 
	 7 using std::cout;
	 8 using std::endl;
	 9 using namespace MF;
	10 using namespace Common;
	11 
	12 int main(int argc, char* argv[])
	13 {
	14   MyMPI const* mpi = MyMPI::instance();
	15   typedef std::vector<int> VectorOfInts;
	16 
	17   int const root = 0;
	18 
	19   VectorOfInts sendBuf(mpi->size(),-1);
	20   for(unsigned int i=0; i<sendBuf.size(); ++i)
	21     sendBuf[i] = i + mpi->rank();
	22 
	23   VectorOfInts* recvBuf = NULL;
	24   int* pRecvBuf   = NULL;
	25 
	26   if (mpi->rank() == root)
	27   {
	28     recvBuf = new VectorOfInts(mpi->size(),-1);
	29     pRecvBuf = &(*recvBuf)[0];
	30   }
	31 
	32   cout << *mpi->instance() << " ";
	33   Cont2Cout<VectorOfInts>()(sendBuf," ");
	34   cout << endl;
	35 
	36   mpi->world().Reduce(&sendBuf[0], pRecvBuf, sendBuf.size(), MPI::INT, MPI::SUM, root);
	37 
	38   if (mpi->rank() == root)
	39   {
	40     cout << *mpi->instance() << " ";
	41     Cont2Cout<VectorOfInts>()(recvBuf," ");
	42     cout << endl;
	43     delete recvBuf;
	44   }
	45 
	46   return 0;
	47 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 0 of 3 running on m13f-mobile5 3 6 9
	

Die MPI-Funktion MPI::Reduce ähnelt sehr der Funktion MPI::Gather. Nur wird jetzt eine Operation auf die Daten ausgeführt, bevor sie im Empfangspuffer des roots landen. In diesem Beispiel findet eine Addition statt. Das erste Datum jedes Sendepuffers wird summiert also 0 + 1 + 2 = 3 und im Empfangspuffer des root-Prozess' gespeichert. Analog geschieht dies mit dem zweiten und dritten Datum.
Neu in diesen Beispiel ist die Benutzung der Klasse Cont2Cout die in der Header-Datei Cont2Cout.hpp gespeichert ist. Diese dient der Ausgabe des Inhalts eines einfachen Containers.


	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 #include "Cont2Cout.hpp"
	 6 
	 7 using std::cout;
	 8 using std::endl;
	 9 using namespace MF;
	10 using namespace Common;
	11 
	12 int main(int argc, char* argv[])
	13 {
	14   MyMPI const* mpi = MyMPI::instance();
	15   typedef std::vector<int> VectorOfInts;
	16 
	17   VectorOfInts sendBuf(mpi->size(),-1);
	18   for(unsigned int i=0; i<sendBuf.size(); ++i)
	19     sendBuf[i] = i + mpi->rank();
	20 
	21   VectorOfInts recvBuf(sendBuf.size(),-1);
	22 
	23   cout << *mpi->instance() << " ";
	24   Cont2Cout<VectorOfInts>()(sendBuf," ");
	25   cout << endl;
	26 
	27   mpi->world().Allreduce(&sendBuf[0], &recvBuf[0], sendBuf.size(), MPI::INT, MPI::SUM);
	28 
	29   cout << *mpi->instance() << " ";
	30   Cont2Cout<VectorOfInts>()(recvBuf," ");
	31   cout << endl;
	32 
	33   return 0;
	34 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 0 of 3 running on m13f-mobile5 3 6 9
	process 1 of 3 running on m13f-mobile5 3 6 9
	process 2 of 3 running on m13f-mobile5 3 6 9
	

Dieses Beispiel ist analog zu MPI::Reduce. Der einzige Unterschied besteht darin, dass alle Prozesse das Ergebnis erhalten.


	17   VectorOfInts sendBuf(mpi->size(),-1);
	18   for(unsigned int i=0; i<sendBuf.size(); ++i)
	19     sendBuf[i] = i + mpi->rank();
	20 
	21   int count =  sendBuf.size() / mpi->size();
	22   VectorOfInts recvCount(mpi->size(),count);
	23 
	24   VectorOfInts recvBuf(count, -1);
	25 
	26   cout << *mpi->instance() << " ";
	27   Cont2Cout<VectorOfInts>()(sendBuf," ");
	28   cout << endl;
	29 
	30   mpi->world().Reduce_scatter(&sendBuf[0], &recvBuf[0], &recvCount[0], MPI::INT, MPI::SUM);
	31 
	32   cout << *mpi->instance() << " ";
	33   Cont2Cout<VectorOfInts>()(recvBuf," ");
	34   cout << endl;
	35 
	36   return 0;
	37 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 0 of 3 running on m13f-mobile5 3
	process 1 of 3 running on m13f-mobile5 6
	process 2 of 3 running on m13f-mobile5 9
	

Diese MPI-Funktion führt zunächst eine Berechnung aus und veteilt dann das Ergebnis an die beteiligten Prozesse. Dabei ist der Vektor recvCount entscheidend. In diesem steht, welcher Prozess wie viele "Anteile" an dem Ergebnis gesendet bekommt. In diesem Beispiel bekommt jeder Prozess genau eine Zahl des Ergebnisses zugeschickt. In Zeile 21 wird genau dieser "Anteil" berechnet.


	17   VectorOfInts sendBuf(mpi->size(),-1);
	18   for(unsigned int i=0; i<sendBuf.size(); ++i)
	19     sendBuf[i] = i + mpi->rank();
	20 
	21   VectorOfInts recvBuf(sendBuf.size(),-1);
	22 
	23   cout << *mpi->instance() << " ";
	24   Cont2Cout<VectorOfInts>()(sendBuf," ");
	25   cout << endl;
	26 
	27   mpi->world().Scan(&sendBuf[0], &recvBuf[0], sendBuf.size(), MPI::INT, MPI::SUM);
	28 
	29   cout << *mpi->instance() << " ";
	30   Cont2Cout<VectorOfInts>()(recvBuf," ");
	31   cout << endl;
	32 
	33   return 0;
	34 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 1 of 3 running on m13f-mobile5 1 3 5
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 2 of 3 running on m13f-mobile5 3 6 9
	

MPI::Scan ist eine Spezialisierung der MPI::Reduce-Operation. Bei jener wird über alle Daten eine Operation ausgeführt. Im Gegensatz dazu verwendet MPI::Scan nur die Daten der Vorgänger-Prozesse und die eigenen. Am besten lässt dies sich an dem Beispiel erklären: Prozess 0 hat keine Vorgänger und seine Daten bleiben gleich. Prozess 1 hat Prozess 0 als Vorgänger. Daher wird die Summe aus 0+1, 1+2 und 2+3 berechnet und im Empfangspuffer von Prozess 1 gespeichert. Prozess 2 hat als Vorgänger Prozess 0 und 1. Das ergibt 0+1+2=3, 1+2+3=6 und 2+3+4=9 im Empfangspuffer von Prozess 2.


	17   VectorOfInts sendBuf(mpi->size(),-1);
	18   for(unsigned int i=0; i<sendBuf.size(); ++i)
	19     sendBuf[i] = i + mpi->rank();
	20 
	21   VectorOfInts recvBuf(sendBuf.size(),-1);
	22 
	23   cout << *mpi->instance() << " ";
	24   Cont2Cout<VectorOfInts>()(sendBuf," ");
	25   cout << endl;
	26 
	27   mpi->world().Exscan(&sendBuf[0], &recvBuf[0], sendBuf.size(), MPI::INT, MPI::SUM);
	28 
	29   cout << *mpi->instance() << " ";
	30   Cont2Cout<VectorOfInts>()(recvBuf," ");
	31   cout << endl;
	32 
	33   return 0;
	34 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 0 of 3 running on m13f-mobile5 -1 -1 -1
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 1 of 3 running on m13f-mobile5 0 1 2
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 2 of 3 running on m13f-mobile5 1 3 5
	

MPI::Exscan ist die Verallgemeinerung von MPI::Scan. Bei dieser Funktion werden nur die Daten der Vorgängerprozesse für die Operation beachtet. Die eigenen Daten werden nicht verwendet. Dadurch sind die Daten des root-Prozesses undefiniert. Die MPI::Scan-Funktion kann emuliert werden, wenn nach der MPI::Exscan-Funktion die eigenen Daten durch die Operation mit den Daten aus dem Empfangspuffer verknüpft werden. In diesem Beispiel verschieben sich in Bezug auf das vorhergehende Beispiel die Ergebnisse um einen Prozess nach hinten. Die Ausgabe auf Prozess 0 ist undefiniert.



Im nächsten Beispiel möchte ich eine benutzerdefinierte Funktion vorstellen, die MPI::SUM implementiert. Die Erklärung folgt unter dem Beispiel.

	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 #include <numeric>
	 6 #include "Cont2Cout.hpp"
	 7 
	 8 using std::cout;
	 9 using std::endl;
	10 using namespace MF;
	11 using namespace Common;
	12 
	13 template<typename T>
	14 void sum(T const* in, T* inout, int len, const MPI::Datatype& datatype)
	15 {
	16   std::transform(in, in+len, inout, inout, std::plus<T>() );
	17 }
	18 
	19 int main(int argc, char* argv[])
	20 {
	21   MyMPI const* mpi = MyMPI::instance();
	22   typedef std::vector<int> VectorOfInts;
	23 
	24   VectorOfInts sendBuf(mpi->size(),-1);
	25   for(unsigned int i=0; i<sendBuf.size(); ++i)
	26     sendBuf[i] = i + mpi->rank();
	27 
	28   VectorOfInts recvBuf(sendBuf.size(),-1);
	29 
	30   cout << *mpi->instance() << " ";
	31   Cont2Cout<VectorOfInts>()(sendBuf," ");
	32   cout << endl;
	33 
	34   void (*pt2function)(int const* in, int* inout, int len, const MPI::Datatype& datatype) = &sum<int>;
	35   MPI::Op mySumOp;
	36   mySumOp.Init((MPI::User_function*) pt2function, true);
	37 
	38   mpi->world().Allreduce(&sendBuf[0], &recvBuf[0], sendBuf.size(), MPI::INT, mySumOp);
	39 
	40   mySumOp.Free();
	41 
	42   cout << *mpi->instance() << " ";
	43   Cont2Cout<VectorOfInts>()(recvBuf," ");
	44   cout << endl;
	45 
	46   return 0;
	47 }
	
	process 0 of 3 running on m13f-mobile5 0 1 2
	process 1 of 3 running on m13f-mobile5 1 2 3
	process 2 of 3 running on m13f-mobile5 2 3 4
	process 0 of 3 running on m13f-mobile5 3 6 9
	process 1 of 3 running on m13f-mobile5 3 6 9
	process 2 of 3 running on m13f-mobile5 3 6 9
	

Die Ausgabe ist wie erwartet die gleiche wie im Beispiel zu MPI::Allreduce. Doch was muss im einzelnen programmiert und beachtet werden um eine eigene Operation zu definieren. Dazu beginnen wir zunächst in Zeile 35 in der ein MPI::Op-Objekt angelegt wird. Dieses Objekt muss mit einem Funktionspointer und einer boolschen Variablen initialisiert werden (Zeile 36). Dabei gibt die boolsche Variable an, ob die Operation kommutativ ist. Es wird weiterhin angenommen, dass eine benutzerdefinierte Funktion assoziativ ist. Ist die boolsche Variable false so wird beginnend bei Prozess 0 in ansteigender Reihenfolge gerechnet.
Der übergebene Funktionspointer muss auf eine Funktion zeigen, die folgenden Prototypen entspricht:

typedef void MPI_User_function(const void *invec, void* inoutvec, int len, const Datatype& datatype);
	

Die Funktion benötigt also zwei Zeiger auf jeweils ein Array, die Länge des Arrays und den MPI-Datentyp der Elemente des Arrays. Diese Funktion wird der Initialisierungsfunktion als Funktionspointer übergeben, womit wir bei dem eigentlichen "Problem" wären. Denn neben Funktionspointer gibt es in C++ Methodenpointer. Diese zeigen auf nicht-statische Methoden einer Klasse (impliziter this-pointer). Daher könne diese für benutzerdefinierte MPI-Operationen nicht verwendet werden. Die erste Möglichkeit wäre das Definieren einer einfachen Funktion, die zweite das Erstellen einer statischen Funktion in einer Klasse. Die dritte Möglichkeit über statische Wrapper-Funktionen auf nicht-statische Methoden zuzugreifen funktioniert nicht, da der Prototyp keine Übergabe von Objekten erlaubt. Ich habe mich deshalb für Möglichkeit eins entschieden.
Die Funktion sum ab Zeile 13 ist als template-Funktion angelegt, welche es erlaubt für andere Datentypen die Summenfunktion zu nutzen. Anstatt einer expliziten for-Schleife habe ich die Funktion std::transform der stl benutzt. Diese Funktion ruft intern wiederum die std::plus-Funktion auf, die nichts weiter macht als zwei Variablen zu addieren und das Ergebnis zurück zu liefern. Ab gcc 4.3 wird eine parallele Version der Algorithmen wie std::transform angeboten, was dann eine zweite Parallelisierung mit OpenMP darstellen würde ;-)
Aber zurück zur eigentlichen Aufgabe: In Zeile 38 wird der eigene Operator der Funktion MPI::Reduceall übergeben. Der nächste Befehl "räumt" das MPI::Op-Objekt auf, es folgt die Ausgabe und das Programm wird beendet.
Ein Hinweis noch zu Zeile 34. Hier wird explizit ein Funktionspointer angelegt, der auf die sum-Funktion zeigt. Der Funktionspointer wird dann als Parameter bei der Initialisierung in Zeile 36 benutzt. Bei meiner gcc-Version (4.2.1) konnte ich nicht gleich &sum<int> verwenden, da dann der Fehler error: address of overloaded function with no contextual type information link auftritt. Das ist ein Bug im gcc, der auch mit diesem Test-Beispiel geprüft werden kann.



	 1 #include "MyMPI.hpp"
	 2 #include <iostream>
	 3 #include <vector>
	 4 #include <iterator>
	 5 #include "Cont2Cout.hpp"
	 6 
	 7 using std::cout;
	 8 using std::endl;
	 9 using namespace MF;
	10 using namespace Common;
	11 
	12 int main(int argc, char* argv[])
	13 {
	14   MyMPI const* mpi = MyMPI::instance();
	15   typedef std::pair<double, int> DoubleIntPair;
	16   typedef std::vector<DoubleIntPair> VectorOfDoubleIntPairs;
	17 
	18   VectorOfDoubleIntPairs sendBuf(mpi->size());
	19   for(int i=0; i<mpi->size(); ++i)
	20     sendBuf[i] = DoubleIntPair(0.5 + mpi->rank() + i, mpi->rank());
	21 
	22   VectorOfDoubleIntPairs recvBuf(sendBuf.size());
	23 
	24   cout << *mpi->instance() << " ";
	25   Pair2Cout<VectorOfDoubleIntPairs>()(sendBuf,"(", ",", ");  ");
	26   cout << endl;
	27 
	28   mpi->world().Allreduce(&sendBuf[0], &recvBuf[0], sendBuf.size(), MPI::DOUBLE_INT, MPI::MAXLOC);
	29 
	30   cout << *mpi->instance() << " ";
	31   Pair2Cout<VectorOfDoubleIntPairs>()(recvBuf,"(", ",", ");  ");
	32   cout << endl;
	33 
	34   return 0;
	35 }
	
	process 0 of 3 running on m13f-mobile5 (0.5,0);  (1.5,0);  (2.5,0);
	process 1 of 3 running on m13f-mobile5 (1.5,1);  (2.5,1);  (3.5,1);
	process 2 of 3 running on m13f-mobile5 (2.5,2);  (3.5,2);  (4.5,2);
	process 0 of 3 running on m13f-mobile5 (2.5,2);  (3.5,2);  (4.5,2);
	process 1 of 3 running on m13f-mobile5 (2.5,2);  (3.5,2);  (4.5,2);
	process 2 of 3 running on m13f-mobile5 (2.5,2);  (3.5,2);  (4.5,2);
	

Die Operation MPI::MAXLOC berechnet das globale Maximum und liefert gleichzeitig einen dazu gehörigen Index. Analog berechnet MPI::MINLOC das globale Minimum. Der verwendete Datentyp ist ein Paar bestehend aus einem Ganzzahl- beziehungsweise Gleitkommadatentyp in Verbindung mit einem Integer (siehe auch obige Tabelle). Der erste Teil ist der "Wert", der zweite Teil der "Index", also genau anders herum, wenn man den Vergleich zu std::mmap nimmt.
In diesem Beispiel wird ein double-int-Paar verwendet, welches in Zeile 15 als Typ definiert wird. Dabei verwende ich statt eines selbst definierten structs gleich std::pair. Dieser Typ wird wiederum als Typ für einen Vektor benutzt. Der Sendepuffer wird in Zeile 20 mit jeweils einem Wert und dem Rang des Prozesses initialisiert. Nach der Ausführung von MPI::Allreduce stehen in den Empfangspuffern aller Prozesse die Maxima mit den dazugehörigen Indizes der jeweiligen Spalte über alle Prozessen, wie in der Ausgabe zu erkennen.
Die Ausgabe des Vektors bestehend aus std::pair-Objekten ist in der Header-Datei Cont2Cout.hpp defniert.



Diskussion

In diesem Kapitel haben wir die kollektiven Kommunikationsfunktionen von MPI kennen gelernt. Angefangen vom einfachen Broadcast, über die allgemeinste Funktion MPI::Alltoallw bis hin zu den MPI::Operationen.
Bei der Benutzung von diesen Funktionen sollte unbedingt auf allen Prozessen gewährleistet sein, dass sie in gleicher Reihenfolge aufgerufen und nicht durch andere "Kommunikationsaktivitäten" gestört werden. Zum Beispiel sollte MPI::Bcast auf allen beteiligten Prozessen in gleicher Reihenfolge abgearbeiter werden, da sonst bei Verwendung verschiedener root-Parameter ein Deadlock auftreten kann. Des Weiteren kann ein MPI::Recv blockieren, das auf einem Prozess vor dem Broadcast-Befehl ausgeführt wird und das dazugehörige MPI::Send auf einem anderen Prozess aber erst nach dem Broadcast ausgführt wird. Auch sollten Abhängigkeiten zwischen Prozessen vermieden werden, die durch unterschiedlich Kommunikator-Zugehörigkeiten eines Prozesses entstehen können.




Version 1.0