December 21, 2006

Fast Sorting with Regular Sampling

Parallel Sorting by Regular Sampling (PSRS) is a parallel sorting algorithm in which each process draws evenly spaced samples from its locally sorted data, and pivots chosen from those samples are used to partition and sort the values. Its advantages include good load balance (with regular pivots, no process ends up with much more than roughly 2n/p of the n elements) and the fact that the number of processes does not have to be a power of two.

While learning MPI, I gave implementing it in C+MPI a try. Each of the p nodes generates n/p random floats in the range [0..1) and then the whole collection is sorted in parallel. It is not particularly well-written code, but I am posting it here in case it is useful as a reference.
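Something like the following should build and run it, assuming the source is saved as psrs.c (PRINT_MSG and OUTPUT_FILE are the optional compile-time defines described in the header comment below; adjust -np and n as needed):

    mpicc -O2 -DPRINT_MSG -DOUTPUT_FILE -o psrs psrs.c
    mpirun -np 4 ./psrs 1000000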

/**
 * PSRS implementation using MPI.
 *
 * Date: 5th of December, 2006
 * Author: Jay
 *
 * Compile options:
 *     PRINT_MSG: print some messages to stdout.
 *     OUTPUT_FILE: write 'glistNN.txt' and 'slistNN.txt',
 *                  where NN is the two-digit process rank.
 *                  'glistNN' contains the sorted numbers generated
 *                  by each node; 'slistNN' is the final result
 *                  held by each node.
 */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <limits.h>
#include <time.h>
#include <stddef.h>
#include <string.h>

/* Upper bound of the generated floating point values */
#define UPPER_BOUND 1.0

/* float comparison function for qsort */
int float_comp(const void* aa, const void* bb)
{
    const float* a = aa;
    const float* b = bb;
    if (*a == *b) return 0;
    if (*a < *b) return -1;
    return 1;
}

/* Generates a random sequence into array A of the given size */
void generate_random_sequence(float* A, size_t size)
{
    size_t i;
    for (i=0; i<size; i++)
        A[i] = (float)rand()/RAND_MAX;
}

/* Returns a pointer to the lower bound:
 * the first position at which val can be
 * inserted while keeping the array sorted. */
float* lower_bound(float* first, float* last, float val)
{
    ptrdiff_t len = last - first;
    while (len > 0)
    {
        ptrdiff_t half = len / 2;
        float* middle = first + half;
        if (*middle < val)
        {
            first = middle + 1;
            len = len - half - 1;
        }
        else
            len = half;
    }
    return first;
}

/* Writes each element of the float array to the file */
void output(const char* filename, float* A, size_t size)
{
    FILE* f = fopen(filename, "w");
    size_t i;
    for (i=0; i<size; i++)
    {
        fprintf(f, "%1.5f\n", A[i]);
    }
    fclose(f);
}

/*
 * First command-line argument: n
 */
int main(int argc, char* argv[])
{
    int n_number = 1000000;
    if (argc > 1) n_number = atoi(argv[1]);
    int id, p;

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_rank(MPI_COMM_WORLD, &id);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    /* Some nodes handle one more
     * element if n % p != 0. */
    float A[(n_number+(p-1))/p];
    float* single_list = A;
    int n_control = n_number/p;
    int n_sorted = n_control;
    if (n_number%p > id) n_control++;
    int i;

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d generates %d of %d elements.\n",
            id, n_control, n_number);
    fflush(stdout);
#endif

    /* Seed differently on each node: add (id*1000) for distinct random numbers */
    srand( time(NULL) + id * 1000);
    generate_random_sequence(A, n_control);

    /* Phase 1 */
    /* Each process quicksorts its own list and picks regular samples */
    qsort(A, n_control, sizeof(float), float_comp);

    if (p > 1)
    {

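        /* Regular sampling: each process takes p samples at evenly
         * spaced positions 0, n_control/p, 2*n_control/p, ... of its
         * locally sorted list; the p*p gathered samples summarize the
         * global distribution so that good pivots can be chosen. */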
        float samples[p];
        for (i=0; i<p; i++)
            samples[i] = A[i*n_control/p];

        /* Phase 2 */
        /* Node 0 gathers samples, sorts them, and picks pivots. */
        float all_samples[p*p];
        MPI_Gather(samples, p, MPI_FLOAT, all_samples, p,
                MPI_FLOAT, 0, MPI_COMM_WORLD);
        float pivots[p-1];
        if (!id)
        {
            qsort(all_samples, p*p, sizeof(float), float_comp);

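            /* Choose p-1 pivots at regular intervals from the p*p
             * sorted samples; the offset p/2 - 1 picks an element
             * near the middle of each block of p samples. */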
            for (i=0; i<p-1; i++)
                pivots[i] = all_samples[(i+1)*p+p/2-1];
        }
        /* Node 0 broadcasts pivots and each process
         * partitions its own list. */
        MPI_Bcast(pivots, p-1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        int send_cnts[p], send_disp[p];
        send_disp[0] = 0;
        for (i=1; i<p; i++)
        {
            send_disp[i] =
                lower_bound(A, A+n_control, pivots[i-1]) - A;
            send_cnts[i-1] = send_disp[i] - send_disp[i-1];
        }
        send_cnts[p-1] = n_control - send_disp[p-1];

        /* Phase 3 */
        /* First, exchange the counts of elements that each
         * node is going to send to every other node. */
        int recv_cnts[p], recv_disp[p+1];
        MPI_Alltoall(send_cnts, 1, MPI_INT, recv_cnts, 1,
                MPI_INT, MPI_COMM_WORLD);
        recv_disp[0] = 0;
        for (i=1; i<p; i++)
            recv_disp[i] = recv_disp[i-1] + recv_cnts[i-1];
        recv_disp[p] = recv_disp[p-1]+recv_cnts[p-1];
        float partitions[recv_disp[p]];
        /* Exchange the elements themselves to the appropriate nodes. */
        MPI_Alltoallv(A, send_cnts, send_disp, MPI_FLOAT, partitions,
                recv_cnts, recv_disp, MPI_FLOAT, MPI_COMM_WORLD);

        /* Phase 4 */
        /* Each node merges its own partitions into a single list. */
        int j;
        int merge_disp[p];
        n_sorted = recv_disp[p];
        single_list = malloc(n_sorted*sizeof(float));
        memcpy(merge_disp, recv_disp, p*sizeof(int));
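        /* Simple p-way merge: merge_disp[j] points at the next
         * unconsumed element of partition j; every iteration scans
         * the p partition heads and takes the smallest one
         * (O(n*p), but easy to follow). */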
        for (i=0; i<n_sorted; i++)
        {
            float min = UPPER_BOUND;
            int min_pos = 0;
            for (j=0; j<p; j++)
                if (merge_disp[j] < recv_disp[j+1]
                        && min > partitions[merge_disp[j]])
                {
                    min = partitions[merge_disp[j]];
                    min_pos = j;
                }
            single_list[i] = min;
            merge_disp[min_pos]++;
        }

    }
    /* Synchronize so the maximum elapsed time among nodes is measured. */
    MPI_Barrier(MPI_COMM_WORLD);
    elapsed_time += MPI_Wtime();

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d now has sorted the list that contains \
%d of %d elements.\n", id, n_sorted, n_number);
    fflush(stdout);
#endif

    if (!id)
        printf("Elapsed Time with %d processes: %10.6f\n",
                p, elapsed_time);

    /* Output (file output is not counted in elapsed_time!) */
#ifdef OUTPUT_FILE
    char filename[100];
    strcpy(filename, "glistxx.txt");
    filename[5] = '0' + id / 10;
    filename[6] = '0' + id % 10;
    output(filename, A, n_control);
    filename[0] = 's';
    output(filename, single_list, n_sorted);
#endif
    if (single_list != A) free(single_list);
    MPI_Finalize();

    return 0;
}
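To sanity-check the output, a helper along these lines could be appended (a minimal sketch, not part of the listing above; is_globally_sorted is a name made up for illustration, and it assumes every rank ends up with at least one element). It could be called as is_globally_sorted(single_list, n_sorted, MPI_COMM_WORLD) just before MPI_Finalize():

#include <float.h>
#include <mpi.h>

/* Returns 1 on every rank if the distributed list is globally sorted. */
int is_globally_sorted(const float* list, int n, MPI_Comm comm)
{
    int id, p, i, ok = 1;
    MPI_Comm_rank(comm, &id);
    MPI_Comm_size(comm, &p);

    /* Local order check. */
    for (i = 1; i < n; i++)
        if (list[i-1] > list[i]) ok = 0;

    /* Pass the last element to the next rank in a ring and compare it
     * with the first element there (assumes n > 0 on every rank). */
    float my_last = list[n-1];
    float prev_last = -FLT_MAX;
    MPI_Sendrecv(&my_last, 1, MPI_FLOAT, (id + 1) % p, 0,
                 &prev_last, 1, MPI_FLOAT, (id + p - 1) % p, 0,
                 comm, MPI_STATUS_IGNORE);
    if (id > 0 && prev_last > list[0]) ok = 0;

    /* Combine the per-rank results so every rank gets the answer. */
    int all_ok = 0;
    MPI_Allreduce(&ok, &all_ok, 1, MPI_INT, MPI_LAND, comm);
    return all_ok;
}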
