跳轉到內容

訊息傳遞介面

25% developed
來自華夏公益教科書

本指南假設您之前瞭解 C 程式設計,並通過幾個示例為您介紹訊息傳遞介面 (MPI)。

什麼是 MPI ?

[編輯 | 編輯原始碼]

MPI 是一個標準化且可移植的訊息傳遞系統。訊息傳遞系統主要用於具有獨立記憶體的分散式機器上以執行並行應用程式。使用此係統,每個正在執行的程序將透過傳送和接收訊息來與其他程序通訊並共享其資料。MPI 是 MPI 論壇的結果規範,該論壇涉及多個組織設計一個可移植系統(允許程式在異構網路上工作)。

由於資料只能透過交換訊息來共享,因此該標準不適用於共享記憶體系統,例如多處理器計算機(儘管它可以工作,但這不是主要目標,並且存在更強大的替代方案,例如OpenMP)。基本上,MPI 包括點對點通訊、集體通訊(跨程序網路)、程序組、Fortran 和 C 繫結以及其他高階功能。另一方面,該標準沒有指定顯式共享記憶體操作、除錯工具、對執行緒的顯式支援、I/O 函式。

當前版本為 2.0,儘管 1.1 仍在使用。

建議使用威廉·格羅普、尤因·拉斯克和安東尼·斯傑勒姆合著的 Using MPI: Portable Parallel Programming with the Message-Passing Interface 作為 MPI 入門。有關更完整的資訊,請閱讀MPI: The Complete Reference,作者為 Snir、Otto、Huss-Lederman、Walker 和 Dongarra。此外,標準本身可以在[1][2]找到。有關其他網際網路參考,例如教程,請參閱開放目錄專案或位於阿貢國家實驗室的教程列表

目前,有兩個主要實現:MPICH 和 LAM。後者的官方網站 ([3]) 包含有關 LAM 和 MPI 的許多資訊。

補充庫

[編輯 | 編輯原始碼]

此外,還存在用於解決數值問題或在分散式機器上使用 I/O 函式的庫。我們可以提到:ScaLAPACK 庫、FFTW(一個可移植的 C FFT 庫)、PETSc 科學計算庫。可以在網站[4]上找到更完整的列表。

開發工具

[編輯 | 編輯原始碼]

編譯器

[編輯 | 編輯原始碼]

每個實現都提供一個編譯器,或者至少提供一個現有編譯器的前端。

由於 MPI 沒有指定除錯工具,因此可以使用一些程式來實現此目的:XMPI 是 LAM/MPI 的執行/除錯 GUI;MPI-CHECK 是 Fortran 的偵錯程式;Etnus, Inc. 提供了 Totalview 偵錯程式的版本,該版本支援 MPICH(以及其他一些 MPI 實現);Streamline Computing 的 Distributed Debugging Tool 與多個 MPI 實現一起使用,包括 MPICH 和 LAM/MPI。

基準測試

[編輯 | 編輯原始碼]

Mpptest、SKaMPI 和 MPBench 測量 MPI 實現的效能。它們可用於確定使用哪個實現、如何實現可移植且高效的 MPI 程式,以及預測 MPI 程式的效能。

著名的流行 MPI 基準測試包括 NASA 並行基準測試、SpecMPI 2007 和 HPCC 基準測試套件。

它可以在大型異構 parc 上使用。

陣列求和

[編輯 | 編輯原始碼]

第一個簡單示例的目的是計算陣列中所有儲存值的總和。在 C 程式設計中,順序版本將如下所示

 
 /* Sum of an array - Sequential version */
 #include <stdio.h>
 #define N 100000
 
 int main (void) {
   float array[N];
   double mysum = 0;
   unsigned long long i;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //Calculation of the sum
   for (i = 0; i < N; i++)
     mysum += array[i];
   printf ("%lf\n", mysum);
   return 0;
 }

第一個版本 - 基本點對點通訊

[編輯 | 編輯原始碼]

MPI 實現允許使用者啟動多個程序,每個程序都有一個定義的數字識別符號,即其“秩”。按照慣例,我們將認為主程序是秩為“0”的程序。所有其他程序都是從程序。在這個程式中,主程序將陣列劃分為子陣列並將它們傳送給每個從程序。然後,每個從程序將計算其子陣列的總和。如果陣列的大小不是從程序數量的偶數倍,那麼主程序將透過計算陣列最後值的總和來完成剩餘的工作(見下圖)。

從陣列到程序的資料分配

對於每個程序,程式的原始碼是相同的,因此可執行程式碼也是相同的。因此,程式碼必須包含從程序和主程序部分。將執行的部分取決於程序的秩。

在這個第一個版本中,工作被分成兩個部分

 /* Sum of an array - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   //Initialization of MPI
   MPI_Init (&argc, &argv);
   //myrank will contain the rank of the process
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   //The part of the program which will be executed is decided
   if (myrank == 0)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum = 0, tmpsum;
   unsigned long long i;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   //The array is divided by two and each part is sent to the slaves
   MPI_Send (array, N / 2, MPI_FLOAT, 1, MSG_DATA, MPI_COMM_WORLD);
   MPI_Send (array + N / 2, N / 2, MPI_FLOAT, 2, MSG_DATA, MPI_COMM_WORLD);
   //The master receive the result from the slaves
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 1, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   MPI_Recv (&tmpsum, 1, MPI_DOUBLE, 2, MSG_RESULT, MPI_COMM_WORLD, &status);
   mysum += tmpsum;
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   MPI_Status status;
   //The slave receives the array from the master
   MPI_Recv (array, N / 2, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < N / 2; i++)
     sum += array[i];
   //The result is send to the master
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

這個程式將逐步解釋。

首先,為了使用 MPI 函式,我們必須包含檔案mpi.h,其中包含所需函式的原型。陣列的大小由N定義。MSG_DATAMSG_RESULT將由MPI_SendMPI_RECV函式使用。聲明瞭兩個函式的原型。這些函式分別包含主程序和從程序的程式碼。

主函式選擇它應該執行主部分(如果程序秩為 0)還是從部分(對於任何其他程序秩)。該函式以MPI_INIT例程開始,該例程必須在任何其他 MPI 函式呼叫之前呼叫。此例程執行各種初始化函式。接下來,透過呼叫MPI_COMM_RANK函式獲取程序的秩,並根據結果執行主部分或從部分。在計算結束時,將呼叫最終例程MPI_FINALIZE。它執行與結束 MPI 使用相關的各種管理任務。

主程序中包含了在程式的順序版本中未找到的兩個宣告:tmpsum變數,它將儲存兩個從程序中的每個結果,以及status變數(其型別由 MPI 定義),它是MPI_RECV函式所需的。

現在,計算部分被對MPI_SENDMPI_RECV的兩次呼叫取代。MPI_SEND函式的引數是傳送的第一個元素的地址、傳送的元素數量、它們的型別(最常見的 MPI 型別是MPI_INTMPI_CHARMPI_FLOATMPI_DOUBLE等)、接收者的秩、訊息的標籤以及通訊器。在最簡單的用法中,通訊器將是MPI_COMM_WORLD,其中包括參與程式執行的所有程序。MPI_RECV函式的引數是接收緩衝區的地址、接收的最大元素數量、它們的型別、傳送者的秩、訊息的標籤、通訊器以及狀態。對於一條常見的訊息,型別、標籤和通訊器必須相同(見下圖)。

MPI_SendMPI_RECV函式針對同一訊息的引數

陣列的前半部分被髮送到第一個從程序,陣列的後半部分被髮送到第二個從程序。在每種情況下,子陣列的大小為N/2。接下來,從從程序接收結果,儲存並新增到最終總和中。

然而,在它能夠接收從從程序計算出的總和之前,每個從程序必須首先對它從主程序接收的子陣列的元素求和。然後,從程序將結果傳送回主程序。

當多條訊息以任意順序傳送到同一個程序時,標籤引數允許該程序區分這些訊息並接收它們。

要使用 lam 實現執行此程式,必須啟動 lam 守護程序

$ lamboot

接下來,使用 mpicc 和 mpirun 進行編譯和執行

$ mpicc ./source.c
$ mpirun -np 3 ./a.out

選項 -np 指定要啟動的程序數量。在這個例子中,它必須是三個(一個主程序,兩個從程序)。程式mpirun建立一個 MPI 環境,將a.out複製到所有三個處理器,並使用適當的處理器編號執行每個處理器。

第二個版本 - 對程序數量的自適應性

[編輯 | 編輯原始碼]

這個基本程式有兩個問題

  • 該程式需要兩個從程序才能正確執行,並且無法自適應於可用程序的數量
  • 主程序以預定義的順序(先第一個從程序,然後是第二個從程序)接收資料。然而,第二個從程序可能先於第一個從程序完成。

這導致了第二個版本

 /* Sum of an array - Second parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 
 #define N 100000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float array[N];
   double mysum, tmpsum;
   unsigned long long step, i;
   int size;
   MPI_Status status;
   //Initialization of the array
   for (i = 0; i < N; i++)
     array[i] = i + 1;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   if (size != 1)
     step = N / (size - 1);
   //The array is divided by the number of slaves
   for (i = 0; i < size - 1; i++)
     MPI_Send (array + i * step, step, MPI_FLOAT, i + 1, MSG_DATA, MPI_COMM_WORLD);
   //The master completes the work if necessary
   for (i = (size - 1) * step, mysum = 0; i < N; i++)
     mysum += array[i];
   //The master receives the results in any order
   for (i = 1; i < size; mysum += tmpsum, i++)
     MPI_Recv (&tmpsum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
   printf ("%lf\n", mysum);
 }
 
 void slave (void) {
   float array[N];
   double sum;
   unsigned long long i;
   int count;
   MPI_Status status;
   MPI_Recv (array, N, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   //The slave finds the size of the array
   MPI_Get_count (&status, MPI_FLOAT, &count);
   for (i = 0, sum = 0; i < count; i++)
     sum += array[i];
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

對於第一個問題,主程序需要程序數量來確定子陣列的大小(它將儲存在 step 變數中)。MPI_COMM_SIZE函式在 size 變數中給出此指示,因此N除以從程序的數量,即size−1。接下來,陣列被髮送到從程序,而在從程序進行計算時,主程序將在必要時完成工作。然後,它將接收結果並將它們新增到最終總和中。為了解決第二個問題,主程序將從任何來源接收結果。這導致在MPI_Recv函式中使用 MPI_ANY_SOURCE 來代替程序的秩。

從程序不需要知道程序的數量,但它需要知道它接收到的子陣列的大小。為此,我們將使用 status 引數。實際上,status 是一個結構,其中包含三個資料:訊息的來源(如果使用MPI_ANY_SOURCE,則有用)、訊息的標籤(如果使用MPI_ANY_TAG,則有用)以及錯誤程式碼。此外,我們可以使用MPI_Get_count函式和 status 引數訪問接收到的資料的實際長度。count 變數是子陣列的長度。

此版本的程式可以使用任何數量的程序,甚至可以使用 1 個程序。在這種特殊情況下,主程序將完成所有計算。

不幸的是,這個程式不適合 MPI,因為,存在大量資料傳輸和少量計算。雖然 MPI 專為非共享記憶體操作而設計,但與資料傳輸相比,計算量必須更大,以便在訊息中損失最少的時間。我們可以透過只發送第一個數字和最後一個數字來求和(並新增這兩個數字之間的所有整數)來改進這個程式,但在這種情況下,只能計算幾何和。

透過辛普森法進行積分

[編輯 | 編輯原始碼]

這個程式的目的是使用辛普森法計算給定函式的積分。這個例子更適合 MPI,因為它需要的計算量更多,資料傳輸量更少,比以前。數學方程是

在 C 程式設計中,順序版本是

 /* Integration - Simpson method - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define n 10000 //Must be even
 
 double f (double x);
 
 int main (void) {
   double integration,step;
   int i;
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   step = (b - a) / n;
   for (i = 1, integration = 0; i < n / 2; i++)
     integration += 2*f(a+(2*i-1)*step) + f(a+2*i*step);
   integration *= 2;
   integration += f(a) + f(b) + 4*f(b - step);
   integration *= step / 3;
   printf ("%.15lf\n", integration);
   return 0;
 }
 
 double f (double x) {
   return exp(-1*pow(x,2));
 }

第一個版本 - 使用者定義的型別

[編輯 | 編輯原始碼]

要完成的工作是與前一個示例中的總和類似(一個數組,每個框的係數不同)。工作將按以下圖所示的方式劃分。

將總和資料分配到各個程序。

總和被分成多個部分,由從節點計算,主節點計算其他值並在必要時完成之前的工作。

總和的域將被髮送到每個從節點,它們將評估函式的多個返回值並將其求和。將傳送的資訊包括總和的起始位置(begin)、每個值之間的差值(step)以及必須評估函式的值的數量(兩個雙精度數和一個整數)。然而,MPI_Send例程傳送的資料必須具有相同的型別。因此,為了最小化傳送的訊息數量並最佳化程式,將建立一個並使用MPI型別來發送這些資料。

通常,要傳送的資料需要分組並最小化,以減少在通訊中浪費的時間(傳送最少的訊息,並帶有最少的資料)。因此,必須將兩個整數儲存在一個數組中並以單個訊息傳送,而不是以兩個不同的訊息傳送。

由於工作分配的速度比工作本身更快,當使用很少的程序時,主節點將等待很長時間才能獲得從節點的結果。在這個程式中,當程序數量少於定義的限制(LIMIT_PROC)時,主節點將參與計算(即,工作將被程序數量而不是從節點數量來劃分)。

 /* Integration - Simpson method - First parallel version */
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000 //Must be even
 #define LIMIT_PROC 4 //Number of process for which the master stop to work and only divide the work
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 struct domain_of_sum {
   double begin;
   double step;
   int number; //Number of iteration
 };
 
 double f (double x);
 MPI_Datatype Init_Type_Domain (void); //Declaration of a MPI type which will contain the domain
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[3] = { MPI_DOUBLE, MPI_DOUBLE, MPI_INT };
   int blocklen[3] = { 1, 1, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[3] = { 0, ext, 2 * ext };
   MPI_Type_struct (3, blocklen, disp, type, &Domain);
   return Domain;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size;
   struct domain_of_sum mydomain; //The C structure which will contain the domain
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type
   MPI_Type_commit (&Domain);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain.step = (b - a) / n; //step of the integration
   //The work is divided
   if (size > 1) {
     if (size > LIMIT_PROC)
       mydomain.number = (n / 2 - 1) / (size - 1);
     else
       mydomain.number = (n / 2 - 1) / size;
   }
   mydomain.begin = a + mydomain.step;
   //Each domain are sent
   for (i = 0; i < size - 1; i++) {
     mydomain.begin = a + (2 * i * mydomain.number + 1) * mydomain.step;
     MPI_Send (&mydomain, 1, Domain, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * mydomain.number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain.step) + f (a + 2 * i * mydomain.step);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain.step);
   integration *= mydomain.step / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i;
   struct domain_of_sum mydomain;
   MPI_Status status;
   MPI_Datatype Domain = Init_Type_Domain (); //Creation of the MPI type 
   MPI_Type_commit (&Domain);
   MPI_Recv (&mydomain, 1, Domain, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < mydomain.number; i++)
     sum += 2*f(mydomain.begin+(2*i)*mydomain.step) + f(mydomain.begin+(2*i+1)*mydomain.step);
   MPI_Send (&sum, 1, MPI_LONG_LONG_INT, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

用於建立MPI型別的所有例程都被重新分組到函式Init_Type_Domain中,以簡化程式碼。MPI_TYPE_STRUCT函式用於建立新型別並將其儲存在Domain中。此函式需要5個引數:新型別將包含的塊數量(在本例中是兩個雙精度數和一個整數);這些塊在陣列中的每個元素的數量(一個雙精度數、另一個雙精度數和一個整數);每個塊在訊息中的偏移量的陣列;這些型別的陣列;以及包含結構的變數的地址。每個塊的偏移量是透過使用MPI_TYPE_EXTENT函式獲得的,該函式給出MPI_DOUBLE的長度。它等同於C語言中的sizeof()函式。MPI型別的長度儲存在一個MPI_Aint變數中。

另一種可能性是宣告一個包含兩個雙精度數和一個整數的陣列的結構。

 struct domain_of_sum {
   double array[2]; //Contains begin and step
   int number;
 };
 
 //The new MPI type
 MPI_Datatype Init_Type_Domain (void) {
   MPI_Datatype Domain;
   MPI_Datatype type[2] = { MPI_DOUBLE, MPI_INT };
   int blocklen[2] = { 2, 1 };
   MPI_Aint ext;
   MPI_Type_extent (MPI_DOUBLE, &ext);
   MPI_Aint disp[2] = { 0, 2 * ext };
   MPI_Type_struct (2, blocklen, disp, type, &Domain);
   return Domain;
 }

第二版

[edit | edit source]

兩種不同的方法用於確定從節點必須實現的運算元量。在第一個例子中,從節點使用MPI_Recv函式的狀態引數。在第二個例子中,這個數量直接與資料一起傳送。另一種方法是透過與主節點固定該資料相同的方式來獲取它,即使用程序的數量。為了最小化資料傳輸量,我們將使用最後一種方法(此版本只是一個最佳化)。

 /* Integration - Simpson method - Second parallel version*/
 
 #include <stdio.h>
 #include <math.h>
 #include <mpi.h>
 #define n 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 int Get_the_number (int size);  // Function which gives the number of iterations
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = (n / 2 - 1) / (size - 1);
     else
       number = (n / 2 - 1) / size;
   }
   return number;
 }
 
 void master (void) {
   double integration, a, b, sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   printf ("Enter the domain of integration: ");
   scanf ("%lf %lf", &a, &b);
   mydomain[1] = (b - a) / n;
   //The work is divided
   mydomain[0] = a + mydomain[1];
   for (i = 0; i < size - 1; i++) {
     mydomain[0] = a + (2 * i * number + 1) * mydomain[1];
     MPI_Send (mydomain, 2, MPI_DOUBLE, i + 1, MSG_DATA, MPI_COMM_WORLD);
   }
   //The master complete the work
   for (i = (size - 1) * number + 1, integration = 0; i < n / 2; i++)
     integration += 2 * f (a + (2 * i - 1) * mydomain[1]) + f (a + 2 * i * mydomain[1]);
   //The master receive the result
   for (i = 1; i < size; i++) {
     MPI_Recv (&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MSG_RESULT, MPI_COMM_WORLD, &status);
     integration += sum;
   }
   integration *= 2;
   integration += f (a) + f (b) + 4 * f (b - mydomain[1]);
   integration *= mydomain[1] / 3;
   printf ("%.15lf\n", integration);
 }
 
 void slave (void) {
   double sum;
   int i, size, number;
   double mydomain[2];
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   number = Get_the_number (size);
   MPI_Recv (mydomain, 2, MPI_DOUBLE, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0, sum = 0; i < number; i++)
     sum+=2*f(mydomain[0]+(2*i)*mydomain[1])+f(mydomain[0]+(2*i+1)*mydomain[1]);
   MPI_Send (&sum, 1, MPI_DOUBLE, 0, MSG_RESULT, MPI_COMM_WORLD);
 }

向量和矩陣的乘積

[edit | edit source]

這個程式的目的是計算向量和矩陣的乘積。在C語言程式設計中,順序版本為

 /* Product of a vector by a matrix - Sequential version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 
 void writecolumn (float *A);
 
 int main (int argc, char **argv) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Product of the matrix and the vector
   for (i = 0; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   writecolumn(C);
   free (A);
   return 0;
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

第一版 - 廣播

[edit | edit source]

為了並行化這個程式,矩陣將被分成行組,每個部分將被髮送到不同的從節點,而向量將被髮送到每個從節點(參見下圖)。

將矩陣資料分配到各個程序

將相同的資料傳送到每個從節點可以透過使用MPI_Bcast函式廣播這些資料來完成,該函式必須由所有程序呼叫。

 /* Product of a vector by a matrix - First parallel version */
 
 #include <stdio.h>
 #include <mpi.h>
 #define N 10000
 #define LIMIT_PROC 4
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 int Get_the_number (int size);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 int Get_the_number (int size) {
   int number;
   if (size > 1) {
     if (size > LIMIT_PROC)
       number = N / (size - 1);
     else
       number = N / size;
   }
   return number;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N], *buff;
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   //Broadcast of the vector
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   for (i = 1; i < size; i++)
     MPI_Send (A + (i - 1) * step * N, N * step, MPI_FLOAT, i, MSG_DATA, MPI_COMM_WORLD);
   //Finishing the work
   for (i = (size - 1) * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   buff = (float *) malloc (step * sizeof (float));
   //Receive and reorder
   for (i = 1; i < size; i++) {
     MPI_Recv (buff, step, MPI_FLOAT, MPI_ANY_SOURCE, MSG_RESULT,
     MPI_COMM_WORLD, &status);
     for (j = 0; j < step; j++)
       C[j + (status.MPI_SOURCE - 1) * step] = buff[j];
   }
   writecolumn(C);
   free (A);
   free (buff);
 }
 
 void slave (void) {
   float Vect[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = Get_the_number (size);
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Recv (partmat, N * step, MPI_FLOAT, 0, MSG_DATA, MPI_COMM_WORLD, &status);
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   MPI_Send (result, step, MPI_FLOAT, 0, MSG_RESULT, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }

MPI_Bcast函式需要的引數比MPI_SendMPI_Recv函式少。MPI_Bcast函式需要包含或將包含廣播資料的緩衝區的地址,緩衝區中的條目數量,這些條目型別,廣播根節點的等級(傳送資料的程序)和通訊器。沒有標籤或狀態引數,因為它們用於區分訊息並提供有關接收資料的的資訊,在本例中,所有程序都參與廣播訊息(因此,不需要區分訊息),並且儲存在狀態中的資訊是已知的(源是廣播根節點,長度是條目數量,沒有標籤)。

當主節點收到資料時,它必須重新排序資料,因為接收順序是未指定的(MPI_ANY_SOURCE)。為此,狀態結構包含訊息的源,該源用於重新排序和組成向量。

在這個例子中,矩陣儲存在一個一維陣列中,而不是傳統的二維陣列。可以使用MPI傳送多維陣列,但必須小心分配。即,陣列必須連續儲存,因為傳送函式的第一個引數是起始緩衝區的地址,第二個引數指定將傳送的後續值的數量。然後,靜態和動態分配為

 /* Static allocation */
 int array[rows][columns];

 /* Dynamic allocation */
 int **array;
 array = (int**) malloc (rows * sizeof(int*));
 if (array == NULL) exit (-1);
 
 array[0] = (int*) malloc (rows * columns * sizeof(int));
 if (array[0] == NULL) exit (-1);
 
 for (i=1; i<rows; i++)
   array[i] = array[0] + i * columns;
 
 /* Send either kind of array */
 MPI_Send (array[0],rows*columns,MPI_INT,0,0,MPI_COMM_WORLD);

第二版 - 分散和收集

[edit | edit source]

MPI規範提供了集體通訊函式,MPI_Bcast函式是其中之一。這個程式的下一個版本將使用“分散”和“收集”函式,它們分別將不同的資料傳送給每個人,並從每個人接收不同的資料(參見下圖)。

分散和收集函式的原理

因此,工作將被略微不同地劃分,即,主節點也將參與計算。

 /* Product of a vector by a matrix - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 5000
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void writecolumn (float *A);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void master (void) {
   float *A = (float *) malloc (N * N * sizeof (float)), B[N], C[N];
   unsigned int i, j, step;
   int size;
   MPI_Status status;
   //Initialization
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       *(A + i * N + j) = i + j;
     B[i] = 1;
   }
   //Division of the work
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Sending the data
   MPI_Bcast (B, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Each processes will receive a different part of the matrix
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //The master make its part (the same code is used for the slaves)
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += *(partmat + i * N + j) * B[j];
   //It finish the work
   for (i = size * step; i < N; ++i)
     for (j = 0, C[i] = 0; j < N; j++)
       C[i] += *(A + i * N + j) * B[j];
   //Receiving the data in the good order
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   writecolumn(C);
   free (partmat);
   free (result);
   free (A);
 }
 
 void slave (void) {
   float Vect[N], *A, *C; //C and A are not used by slaves but must be declared
   unsigned int i, j, step;
   int size;
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   step = N / size;
   float *result = (float *) malloc (step * sizeof (float));
   float *partmat = (float *) malloc (step * N * sizeof (float));
   //Receiving the data
   MPI_Bcast (Vect, N, MPI_FLOAT, 0, MPI_COMM_WORLD);
   MPI_Scatter (A, N * step, MPI_FLOAT, partmat, N * step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   //Computation
   for (i = 0; i < step; ++i)
     for (j = 0, result[i] = 0; j < N; j++)
       result[i] += Vect[j] ** (partmat + i * N + j);
   //Sending the result
   MPI_Gather (result, step, MPI_FLOAT, C, step, MPI_FLOAT, 0, MPI_COMM_WORLD);
   free (partmat);
   free (result);
 }
 
 void writecolumn (float *A) {
   int i;
   for (i = 0; i < N; i++)
   printf ("%f\n", *(A + i));
 }

對函式MPI_SCATTER和MPI_GATHER的呼叫對於主節點和從節點來說是相同的(強調的引數被從節點忽略)。引數為

MPI_Scatter MPI_Gather
傳送緩衝區的地址 傳送緩衝區的地址
每個程序傳送的元素數量 傳送緩衝區中的元素數量
傳送緩衝區元素的資料型別 傳送緩衝區元素的資料型別
接收緩衝區的地址 接收緩衝區的地址
接收緩衝區中傳送的元素數量 任何單個接收的元素數量
接收緩衝區元素的資料型別 接收緩衝區元素的資料型別
傳送程序的等級 接收程序的等級
通訊器 通訊器

注意,在第一種情況下,主節點將資料傳送給自己,而在第二種情況下,它從自己接收資料。在這個例子中,矩陣被程序數量劃分,主節點將第一部分發送給自己,第二部分發送到第一個從節點,等等。之後,主節點從自己接收結果向量的第一部分,從第一個從節點接收第二部分,等等。

元胞自動機

[edit | edit source]

在這個例子中,我們將編寫一個元胞自動機。元胞自動機是矩陣的演化,我們對它執行定義的函式,直到收斂或出現迴圈現象。該函式根據其四個鄰居的值,為矩陣的每個點返回一個值。矩陣的邊界將具有與其相反側邊界相同的值。結果矩陣將是一個環面。在C語言程式設計中,順序版本為

 /* Cellular automata - Sequential version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #define N 62
 #define MAX_ITERATION N //Number of iteration
 
 void init_mat (int *A, int length);
 void writemat (int *A);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc(N*N*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int i,j,k=0;
   init_mat (A,N);
   while(k++<MAX_ITERATION) {
   //Calculation of the new matrix in a temporary matrix
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     //Copy of the new matrix
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     //Update of the border
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
   }
   writemat(A);
   return 0;
 }
 
 void init_mat (int *A, int length) {
   int i,j;
   for (i=0;i<length;i++)
     for (j=0;j<length;j++)
       *(A+i*length+j)=i+j>length/2&&i+j<length*3/2;
   //Each border of the matrix will have the same value than the border of his opposite side
   for (i=1;i<length-1;i++) {
     *(A+i)=*(A+(length-2)*length+i);
     *(A+(length-1)*length+i)=*(A+length+i);
     *(A+i*length)=*(A+i*length+length-2);
     *(A+i*length+length-1)=*(A+i*length+1);
   }
 }
 
 void writemat (int *A) {
   int i,j;
   for (i = 0; i < N; i++) {
     for (j = 0; j < N; j++)
       printf ("%d ", *(A + i*N +j));
     printf("\n");
   }
 }
 
 int f (int x1,int x2,int x3,int x4) {
   return x1+x2+x3+x4>2;
 }

第一版 - 死鎖

[edit | edit source]

矩陣將被分成行組,就像在圖[fig:vect]中一樣。為了計算矩陣的新部分,應用的函式需要屬於矩陣其他部分的點的值,然後每個程序將計算新矩陣並將第一行和最後一行與其兩個鄰居交換(參見下圖)。

矩陣的劃分和程序之間的通訊

當程序共享它們的資料時,可能會發生死鎖。死鎖是程式的阻塞狀態,原因是某個程序正在等待另一個程序執行特定操作,但後者程序正在等待前者程序執行特定操作。這種現象導致永久等待狀態(參見下圖)。

死鎖示例

在這個程式碼中可能會發生死鎖,因為MPI_SEND和MPI_RECV函式會阻塞程式,直到它們完成,即直到接收者收到傳送的訊息或傳送者傳送要接收的訊息。如果所有程序執行相同的程式碼(主節點除外),則程序一將資料傳送到程序二,程序二將資料傳送到程序三,等等。程序一將等待程序二接收它的資料,但這永遠不會發生,因為程序二也正在等待程序三接收它的資料。由於通訊是迴圈的,因此程式被阻塞。作為第一個解決方案,我們可以更改每個偶數程序上傳送和接收的順序。

 /* Cellular automata - First parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes (which must be > 1)
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   //Initialization by the master
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The next and the previous processes are determined
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The order of exchanging the data is decided
     if (myrank%2) {
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
     } else {
       MPI_Recv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD);
       MPI_Recv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
       MPI_Send (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     }
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //The master recompose the matrix and print it
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

在本例中,主節點和從節點的程式碼幾乎完全相同,因此無需將程式碼分成兩個函式(主節點特有的程式碼只需在等級條件之前)。

當主節點等待程序 1 的資料時,程序 1 將資料傳送到主節點。然後它等待來自主節點的資料,同時主節點將資料傳送給它。程式碼對所有程序的行為將相同,因此是安全的。

第二版 - 非阻塞傳送和接收

[編輯 | 編輯原始碼]

第二個解決方案是使用非阻塞傳送和接收:這些函式被呼叫並立即將控制權交給程序。然後,程序可以進行一些計算,當需要訪問訊息資料時,它可以透過呼叫一些適當的函式來完成它。使用非阻塞通訊函式的程式可以更快。在這個第二個版本中,所有函式將在所有傳送和所有接收開始後立即完成。

在前面的例子中,矩陣的維度應該是一個程序數的倍數,因為剩餘的行沒有傳送到任何程序。在這個版本中,主節點將這些行傳送到最後一個程序,並分別接收結果行。

 /* Cellular automata - Second parallel version */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62 //Must be multiple of number of processes
 #define MAX_ITERATION N
 #define MSG_DATA 100
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* result=(int*)malloc(N*N*sizeof(int));
   int* A=(int*)malloc(N*N*sizeof(int));
   int* tmp=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int step;
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Request request[4];
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   step = (N-2)/size;
   if (!myrank) init_mat(result,N);
   MPI_Scatter (result+N,step*N,MPI_INT,A+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   //Code allowing any dimension of the matrix
   if (myrank==0&&(N-2)%size) MPI_Send (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_DATA,MPI_COMM_WORLD);
   else if (myrank==size-1&&(N-2)%size) {
     MPI_Recv (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_DATA,MPI_COMM_WORLD,&status);
     step+=N-2-step*size;
   }
   left=(myrank-1+size)%size; right=(myrank+1)%size;
   while(k++<MAX_ITERATION) {
     //The data begin to be shared
     MPI_Isend (A+N,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request);
     MPI_Irecv (A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+1);
     MPI_Isend (A+(step)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,request+2);
     MPI_Irecv (A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,request+3);
     //The transfer of data are completed
     for (i=0;i<4;i++)
       MPI_Wait(request+i,&status);
     for (i=1;i<step-1;i++) {
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     for (i=0;i<step;i++)
       for (j=0;j<N-2;j++)
         *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<step+1;i++)
       for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
   }
   if (myrank==size-1&&(N-2)%size) {
     step=(N-2)/size;
     MPI_Send (A+N*(step+1),(N-2-step*size)*N,MPI_INT,0,MSG_RESULT,MPI_COMM_WORLD);
   } else if (myrank==0&&(N-2)%size)
     MPI_Recv (result+N*(step*size+1),(N-2-step*size)*N,MPI_INT,size-1,MSG_RESULT,MPI_COMM_WORLD,&status);
   MPI_Gather (A+N,step*N,MPI_INT,result+N,step*N,MPI_INT,0,MPI_COMM_WORLD);
   if (!myrank) {
     for (i=1;i<N-1;i++) {
       *(result+i)=*(result+(N-2)*N+i);
       *(result+(N-1)*N+i)=*(result+N+i);
       *(result+i*N)=*(result+i*N+N-2);
       *(result+i*N+N-1)=*(result+i*N+1);
     }
     writemat(result,N);
   }
   MPI_Finalize ();
   return 0;
 }

MPI_Isend 和 MPI_Irecv(I 代表立即)函式需要一個額外的引數:一個 MPI_Request 變數的地址,該變數將標識稍後完成的請求。狀態引數將傳遞給完成函式。MPI_WAIT 用於完成對先前函式的呼叫(由第一個引數標識),並將等待直到函式有效完成。

第三版 - Sendrecv

[編輯 | 編輯原始碼]

下一個版本使用傳送-接收函式:它等效於並行呼叫傳送和接收函式。只有 while 迴圈不同。

 while(k++<MAX_ITERATION) {
   //The transfers are simultaneous
   MPI_Sendrecv(A+N,N,MPI_INT,left,MSG_DATA,A+(step+1)*N,N,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD,&status);
   MPI_Sendrecv(A+(step)*N,N,MPI_INT,right,MSG_DATA,A,N,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   for (i=1;i<step-1;i++) {
     *(A+i*N)=*(A+i*N+N-2);
     *(A+i*N+N-1)=*(A+i*N+1);
   }
   for (i=0;i<step;i++)
     for (j=0;j<N-2;j++)
       *(tmp+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
   for (i=1;i<step+1;i++)
     for (j=1;j<N-1;j++)
       *(A+i*N+j)=*(tmp+(i-1)*N+j-1);
 }

MPI_SENDRECV 函式需要引數,這些引數本質上與 MPI_SEND 和 MPI_RECV 函式需要的引數相同。

第四版 - 環形拓撲

[編輯 | 編輯原始碼]

最後一個版本從根本上不同,其中使用的拓撲結構是環形拓撲結構。在程序 0 上計算一個新的矩陣,然後將其傳送到程序 1。經過一次迭代,該程序將其傳送到程序 2,程序 2 計算新的矩陣並將其再次傳送到下一個程序。最後一個程序將矩陣返回給主節點,迴圈將繼續,直到迭代次數達到定義的常數(見下圖)。

環形拓撲結構中的互動

使用這種拓撲結構,可以同時計算多個不同的資料。雖然在本例中對資料的操作相同,但程序可以執行不同型別的操作。只有一個矩陣將在環形拓撲結構中傳送。然而,可以(這也是這種結構的優勢)在之後立即傳送第二個矩陣。

 /* Cellular automata - Parallel version using a ring topology */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <mpi.h>
 #define N 62
 #define MAX_ITERATION N
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 void init_mat (int *A, int length);
 void writemat (int *A, int length);
 int f (int x1,int x2,int x3,int x4);
 
 int main (int argc, char **argv) {
   int* A=(int*)malloc((N*N+1)*sizeof(int));
   int* B=(int*)malloc((N-2)*(N-2)*sizeof(int));
   int myrank,size,left,right;
   MPI_Status status;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   MPI_Comm_size (MPI_COMM_WORLD, &size);
   int i,j,k=0;
   right=(myrank+1)%size; left=(myrank-1+size)%size;
   if (!myrank) init_mat(A,N);
   else MPI_Recv (A,N*N+1,MPI_INT,left,MSG_DATA,MPI_COMM_WORLD,&status);
   while (*(A+N*N)<MAX_ITERATION) {
     for (i=0;i<N-2;i++)
       for (j=0;j<N-2;j++)
         *(B+i*N+j)=f(*(A+(i+1)*N+j),*(A+(i+1)*N+j+2),*(A+i*N+j+1),*(A+(i+2)*N+j+1));
     for (i=1;i<N-1;i++)
       for (j=1;j<N-1;j++)
         *(A+i*N+j)=*(B+(i-1)*N+j-1);
     for (i=1;i<N-1;i++) {
       *(A+i)=*(A+(N-2)*N+i);
       *(A+(N-1)*N+i)=*(A+N+i);
       *(A+i*N)=*(A+i*N+N-2);
       *(A+i*N+N-1)=*(A+i*N+1);
     }
     //The number of iteration is incremented
     (*(A+N*N))++;
     //The matrix is sent and another is receive
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
   }
   //When the number of iteration reach the maximum, the program stop
   if ((*(A+N*N))++==MAX_ITERATION) {
     MPI_Sendrecv_replace(A,N*N+1,MPI_INT,right,MSG_DATA,left,MSG_DATA,MPI_COMM_WORLD,&status);
     writemat(A,N);
   } else MPI_Send (A,N*N+1,MPI_INT,right,MSG_DATA,MPI_COMM_WORLD);
     MPI_Finalize ();
   return 0;
 }

迭代次數與矩陣一起儲存,儲存在一個額外的元素中。它將用於確定何時停止迴圈。當一個程序達到最大迭代次數時,矩陣將被傳遞到所有其他程序。這實際上是必要的,以便正確地停止它們,因為否則,它們將繼續迴圈並且永遠不會停止。

MPI_SENDRECV_REPLACE 函式類似於 MPI_SENDRECV 函式,不過它使用相同的緩衝區來發送和接收資料。

該程式將使用有限元方法計算函式在多個點處的導數的數值。方程為:

但是,從數值上來說,除以 0 很難,並且 h 不能隨意選擇(見下圖)。

有限元方法的精度隨 h 的變化而變化的示意圖

h 的最佳值是未知的,因此,演算法將減少 h 的起始值,直到精度開始下降(用不同的 h 計算的兩個導數之間的差異必須下降)。在 C 程式設計中,順序版本是

 /* Derivate - Sequential version */
 
 #include <stdio.h>
 #include <math.h>
 #define H 1 //first h
 #define DEC 1.4 //decrease
 #define N 500
 
 double f (double x);
 
 int main (int argc, char **argv) {
   double x,deriv,diffnew,diffold,h;
   double result;
   int i;
   for (i=0,x=i;i<N;x=++i) {
     h=H;
     deriv=(f(x+h)-f(x-h))/(2*h);
     diffnew=100;
     diffold=2*diffnew;
     while (fabs(diffnew)<fabs(diffold)) {
       h/=DEC;
       result=deriv;
       diffold=diffnew;
       deriv=(f(x+h)-f(x-h))/(2*h);
       diffnew=deriv-result;
     }
   printf ("%lf: %.15lf\n",x,result);
   }
   return 0;
 }
 
 double f (double x) {
   return x*x*x+x*x;
 }

第一版 - 農場拓撲

[編輯 | 編輯原始碼]

由於迴圈次數是可變的,因此無法預測執行時間。該程式將一個導數傳送到每個程序進行計算,當一個程序完成時(不一定是最先完成的),它會立即將資料重新發送到該程序。這些資料和結果被收集到兩個陣列中(deriv[] 和 result[])。

 /* Derivate - Parallel version using farming topology */
 
 #include <stdio.h>
 #include <mpi.h>
 #include <math.h>
 
 #define H 1 //first h
 #define DEC 1.4 //decrease (1.4)
 #define N 500
 #define MSG_DATA 100
 #define MSG_RESULT 101
 
 double f (double x);
 void init_deriv(double *deriv, double x);
 void master (void);
 void slave (void);
 
 int main (int argc, char **argv) {
   int myrank;
   MPI_Init (&argc, &argv);
   MPI_Comm_rank (MPI_COMM_WORLD, &myrank);
   if (!myrank)
     master ();
   else
     slave ();
   MPI_Finalize ();
   return 0;
 }
 
 void init_deriv(double *deriv, double x) {
   deriv[0]=x;
   deriv[4]=H;
   deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
   deriv[2]=100;
   deriv[3]=2*deriv[2];
 }
 
 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //Sending data to all processes
   for (i=1;i<size;i++) {
     init_deriv(deriv,1);
     MPI_Send(deriv,5,MPI_DOUBLE,i,MSG_DATA,MPI_COMM_WORLD);
   }
   //When a result is received, another task is sent
   for (;i<N;++i) {
     init_deriv(deriv,1);
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   deriv[4]=10*H;
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Send(deriv,5,MPI_DOUBLE,status.MPI_SOURCE,MSG_DATA,MPI_COMM_WORLD);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }
 
 void slave(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   MPI_Status status;
   MPI_Recv(deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   while (deriv[4]<5*H) {
     while (fabs(deriv[2])<fabs(deriv[3])) {
       deriv[4]/=DEC;
       result[1]=deriv[1];
       deriv[3]=deriv[2];
       deriv[1]=(f(deriv[0]+deriv[4])-f(deriv[0]-deriv[4]))/(2*deriv[4]);
       deriv[2]=deriv[1]-result[1];
     }
   result[0]=deriv[0];
   MPI_Send (result,2,MPI_DOUBLE,0,MSG_RESULT,MPI_COMM_WORLD);
   MPI_Recv (deriv,5,MPI_DOUBLE,0,MSG_DATA,MPI_COMM_WORLD,&status);
   }
 }

第二版 - 傳送和接收完成

[編輯 | 編輯原始碼]

第二個拓撲結構類似於環形拓撲結構,並引入了一個新的例程:MPI_REQUEST_FREE。主節點將始終將資料傳送到程序 1,並將結果從任何其他程序接收(見下圖)。

所用拓撲結構的示意圖

由於從節點的程式碼與元胞自動機(使用環形拓撲結構的版本)的程式碼類似,因此只描述主節點函式。為了避免死鎖,只使用非阻塞傳送(這裡使用非阻塞接收是為了提高效能)。

 void master(void) {
   double deriv[5],result[2]; //x,deriv,diffnew,diffold,h
   int i;
   int size;
   MPI_Status status;
   MPI_Request request[N],req;
   MPI_Comm_size(MPI_COMM_WORLD,&size);
   //The master fill the ring by sending enough data
   for (i=1;i<size;i++) {
     init_deriv(deriv,i);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
   }
   //When a result is received, another data is sent in the ring
   for (;i<N;++i) {
     MPI_Irecv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&req);
     init_deriv(deriv,i);
     MPI_Wait (&req,&status);
     MPI_Request_free (request+(int)result[0]);
     MPI_Isend(deriv,5,MPI_DOUBLE,1,MSG_DATA,MPI_COMM_WORLD,request+i);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
   //The current task are completed
   for (i=1;i<size;i++) {
     MPI_Recv(result,2,MPI_DOUBLE,MPI_ANY_SOURCE,MSG_RESULT,MPI_COMM_WORLD,&status);
     MPI_Request_free (request+(int)result[0]);
     printf ("%lf: %.15lf\n",result[0],result[1]);
   }
 }

當主節點完成傳送足夠的資料(對於所有程序)時,它執行一個新的迴圈。它接收一個結果並同時初始化一個新的資料,接下來,它完成傳送導致它剛剛接收到的結果的資料,並在環形拓撲結構中重新發送一個新的資料。實際上,如果主節點接收了一個數據的結果,這意味著該資料的傳送已完成,無需再完成傳送。MPI_REQUEST_FREE 函式允許在不檢查請求何時邏輯完成的情況下釋放緩衝的請求。

通訊器和組

[編輯 | 編輯原始碼]

集體通訊(如 MPI_BCAST、MPI_SCATTER 和 MPI_GATHER)將資料傳送到屬於同一組的程序。MPI 提供用於管理組和通訊器的函式。這允許在指定的程序上使用集體通訊,從而將網路分成可以執行不同任務的組。

例如,可以在矩陣上進行不同的操作(跡、轉置、求逆、對角化)。對於每個操作,將定義一個組,每個程序將使用集體通訊與其組中的其他程序通訊,而不會干擾其他組。

通訊器是一個物件,它指定了一個通訊域。它與一個組相關聯。組是從現有組建立的,初始組是包含所有程序的通訊器 MPI_COMM_WORLD 的組。

第一版 - 從組建立通訊器

[編輯 | 編輯原始碼]

建立新通訊器的第一種方法是從第一個組建立組,然後建立這些組的通訊器。

 /* Example of use of group */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   int rank[50],size,i,myrank;
   char data[10]="";
   MPI_Group group,newgroup;
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   //group of MPI_COMM_WORLD which contains all the processes
   MPI_Comm_group (MPI_COMM_WORLD,&group);
   MPI_Group_size (group,&size);
   //Creation of the communicator containing even-ranked processes
   for (i=0;i<size/2;i++)
     rank[i]=2*i; if (size%2) rank[size/2]=size-1;
   MPI_Group_incl (group,(size+1)/2,rank,&newgroup);
   MPI_Comm_create (MPI_COMM_WORLD,newgroup,&newcomm);
   //Only the processes belonging to the group of newcomm will participate to the broadcast on this communicator
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Group_free (&group);
   MPI_Finalize ();
   return 0;
 }

在這個程式中,資料只在偶數等級的程序上廣播。這是透過建立一個包含所有偶數等級程序的通訊器來實現的。首先,使用 `MPI_COMM_GROUP` 函式獲取 `MPI_COMM_WORLD` 的初始組。新建立的組包含了初始組中秩為偶數的 個程序。`MPI_GROUP_INCL` 函式需要一個數組,包含第一個組中將屬於新組的程序的秩。然後,`MPI_COMM_CREATE` 函式建立新組的通訊器。如果程序的秩為奇數,它將參與廣播。最後,使用 `MPI_GROUP_FREE` 函式釋放該組。

`MPI_GROUP_SIZE` 和 `MPI_COMM_RANK` 函式類似於 `MPI_COMM_SIZE` 和 `MPI_GROUP_RANK` 函式。後兩者只是簡寫。

`MPI_Comm_size (comm, &size);` `MPI_Comm_rank (comm, &rank);`
`MPI_Comm_group(comm, &group);` `MPI_Comm_group(comm, &group);`
`MPI_Group_size(group, &size);` `MPI_Group_rank(group, &rank);`
`MPI_Group_free(&group);` `MPI_Group_free(&group);`

然而,如果呼叫 `MPI_GROUP_RANK` 的程序不屬於函式引數中指定的組,則 `rank` 返回的值將為 `MPI_UNDEFINED`。而呼叫 `MPI_COMM_RANK` 將會導致程式停止。

第二版 - 從通訊器建立通訊器

[edit | edit source]

可以不使用組,直接透過分割現有的通訊器來建立新的通訊器。

 int main (int argc, char *argv[]) {
   int color,i,myrank;
   char data[10]="";
   MPI_Comm newcomm;
   MPI_Init (&argc,&argv);
   MPI_Comm_rank (MPI_COMM_WORLD,&myrank);
   //Creation of the communicator containing even-ranked processes
   if (!(myrank%2)) color=0;
   else color=1;
   MPI_Comm_split (MPI_COMM_WORLD,color,1,&newcomm);
   //Only the processes having even rank will participate to the broadcast on this communicator
   if (!(myrank%2)) MPI_Bcast (data,10,MPI_INT,0,newcomm);
   MPI_Finalize ();
   return 0;
 }

所有程序將根據其顏色被分類到新的通訊器中。例如,如果兩個程序的顏色為 1,一個程序的顏色為 2,那麼將建立兩個通訊器,每個組一個。`MPI_COMM_SPLIT` 函式的第三個引數用於在新通訊器中排序程序(這將改變新程序的秩)。在這裡,所有程序都使用相同的排序值。

其他

[edit | edit source]

環境管理

[edit | edit source]

最後,環境管理函式可以用來獲取程序執行的節點名稱或測量程式中某個部分所花費的時間等。

 /* Example of use of environmental function */
 
 #include <stdio.h>
 #include <mpi.h>
 
 int main (int argc, char *argv[]) {
   double starttime,endtime;
   int namelen;
   char processor_name[MPI_MAX_PROCESSOR_NAME];
 
   MPI_Init(&argc,&argv);
   MPI_Get_processor_name(processor_name,&namelen);
   printf ("Process %s\n", processor_name);
 
   starttime = MPI_Wtime();
   //Here some computation or communication
   endtime = MPI_Wtime();
   printf ("Time: %lf\n",endtime-starttime);
 
   MPI_Finalize ();
   return 0;
 }

其他可能性

[edit | edit source]

還存在許多未提及的函式:有不同型別的點對點通訊函式;MPI 允許使用者使用多種函式定義資料型別;還存在其他集體通訊和歸約函式;以及更多用於使用通訊器和組的函式。

MPI 定義了許多有用的常量,包括 `MPI_ANY_SOURCE`、`MPI_INT`、`MPI_UNDEFINED` 等。

MPI 還可以以更高階的方式用於管理程序拓撲。具體而言,MPI 實現可以利用物理網路的特性(對於異構網路),例如透過優先使用最快的連線來傳輸訊息。

基本函式的原型

[edit | edit source]

有關這些 MPI 函式的更多資訊,請查閱手冊頁。

int MPI_Init(int *argc, char ***argv);
int MPI_Finalize(void);
int MPI_Comm_rank(MPI_Comm comm, int *rank);
int MPI_Comm_size(MPI_Comm comm, int *size);
int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);
int MPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent);
int MPI_Type_struct(int count, int *array_of_blocklengths, MPI_Aint *array_of_displacements, MPI_Datatype *array_of_types, MPI_Datatype *newtype);
int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvcount, int root, MPI_Comm comm);
int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm);
int MPI_ISend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request);
int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype datatype, int dest, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dest, int sendtag, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Request_free(MPI_Request *request);
int MPI_Group_rank(MPI_Group group, int *rank);
int MPI_Group_size(MPI_Group group, int *size);
int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
int MPI_Group_free(MPI_Group *group);
int MPI_Group_incl(MPI_Group *group, int n, int *ranks, MPI_Group *newgroup);
int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newgroup);
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm);
int MPI_Wtime(void);
int MPI_Get_processor_name(char *name, int *resultlen);

與 C 語言中的一般用法相同,"陣列的地址"實際上是指陣列第一個元素的地址。

先決條件

[edit | edit source]

進一步閱讀

[edit | edit source]
華夏公益教科書