
December 21, 2006

Fast Sorting with Regular Sampling

Parallel Sorting by Regular Sampling (PSRS) is a parallel sorting algorithm in which each process draws evenly spaced samples from its locally sorted data, and those samples are used to choose the pivots around which all values are partitioned and sorted. Its advantages include good load balance and the fact that the number of processes does not have to be a power of two.
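
To make the sampling concrete (my own example, using the index scheme from the code below): with p = 3 processes and, say, 9 elements per process, each process takes the elements at positions 0, 3, and 6 of its locally sorted list as its samples. Process 0 gathers the resulting nine samples, sorts them, and keeps the ones at indices 3 and 6 (the 4th and 7th smallest) as the p - 1 = 2 pivots, which every process then uses to split its local list into 3 partitions.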

While learning MPI, I tried implementing it in C with MPI. Each of the p nodes generates n/p random floats between 0 and 1, and the lists are then sorted in parallel. It is not particularly well-written code, but I am posting it here in case it is useful as a reference.
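
For reference, on a typical MPI installation the program can be built and run along these lines (the source file name psrs.c and the process count are my own placeholders; PRINT_MSG and OUTPUT_FILE are the optional compile flags described in the header comment):

mpicc -O2 -DPRINT_MSG -o psrs psrs.c
mpiexec -n 4 ./psrs 1000000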

/**
 * PSRS implementation using MPI.
 *
 * Date: 5th of December, 2006
 * Author: Jay
 *
 * Compile options:
 *     PRINT_MSG: print some messages to stdout.
 *     OUTPUT_FILE: write 'glistNN.txt' and 'slistNN.txt', where NN is
 *                  the two-digit process rank. 'glistNN' contains the
 *                  sorted numbers generated by each node; 'slistNN' is
 *                  the final result held by each node.
 */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <limits.h>
#include <time.h>
#include <stddef.h>
#include <string.h>

/* Upper bound of generated floating point */
#define UPPER_BOUND 1.0

/* float comparison function */
int float_comp(const void* aa, const void* bb)
{
    const float* a = aa;
    const float* b = bb;
    if (*a == *b) return 0;
    if (*a < *b) return -1;
    return 1;
}

/* Generates a random sequence of length size into array A */
void generate_random_sequence(float* A, size_t size)
{
    size_t i;
    for (i=0; i<size; i++)
        A[i] = (float)rand()/RAND_MAX;
}

/* Returns a pointer to the lower bound, i.e. the first
 * position at which val could be inserted while keeping
 * the array sorted. */
float* lower_bound(float* first, float* last, float val)
{
    ptrdiff_t len = last - first;
    while (len > 0)
    {
        ptrdiff_t half = len / 2;
        float* middle = first + half;
        if (*middle < val)
        {
            first = middle + 1;
            len = len - half - 1;
        }
        else
            len = half;
    }
    return first;
}

/* Writes each element of the float array to the file */
void output(const char* filename, float* A, size_t size)
{
    FILE* f = fopen(filename, "w");
    int i;
    for (i=0; i<size; i++)
    {
        fprintf(f, "%1.5f\n", A[i]);
    }
    fclose(f);
}

/*
 * First command-line argument: n
 */
int main(int argc, char* argv[])
{
    int n_number = 1000000;
    if (argc > 1) n_number = atoi(argv[1]);
    int id, p;

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_rank(MPI_COMM_WORLD, &id);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    /* Some nodes should control 1 more
     * element if n % p != 0. */
    float A[(n_number+(p-1))/p];
    float* single_list = A;
    int n_control = n_number/p;
    int n_sorted = n_control;
    if (n_number%p > id) n_control++;
    int i;

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d generates %d of %d elements.\n",
            id, n_control, n_number);
    fflush(stdout);
#endif

    /* For unique random numbers per process, add (id*1000) to the seed */
    srand( time(NULL) + id * 1000);
    generate_random_sequence(A, n_control);

    /* Phase 1 */
    /* Each process quicksorts its own list and each one picks samples */
    qsort(A, n_control, sizeof(float), float_comp);

    if (p > 1)
    {

        float samples[p];
        for (i=0; i<p; i++)
            samples[i] = A[i*n_control/p];

        /* Phase 2 */
        /* Node 0 gathers samples, sorts them, and picks pivots. */
        float all_samples[p*p];
        MPI_Gather(samples, p, MPI_FLOAT, all_samples, p,
                MPI_FLOAT, 0, MPI_COMM_WORLD);
        float pivots[p-1];
        if (!id)
        {
            qsort(all_samples, p*p, sizeof(float), float_comp);

            for (i=0; i<p-1; i++)
                pivots[i] = all_samples[(i+1)*p+p/2-1];
        }
        /* Node 0 broadcasts pivots and each process
         * partitions its own list. */
        MPI_Bcast(pivots, p-1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        int send_cnts[p], send_disp[p];
        send_disp[0] = 0;
        for (i=1; i<p; i++)
        {
            send_disp[i] =
                lower_bound(A, A+n_control, pivots[i-1]) - A;
            send_cnts[i-1] = send_disp[i] - send_disp[i-1];
        }
        send_cnts[p-1] = n_control - send_disp[p-1];

        /* Phase 3 */
        /* First, exchange the counts of elements that
         * each process is going to send. */
        int recv_cnts[p], recv_disp[p+1];
        MPI_Alltoall(send_cnts, 1, MPI_INT, recv_cnts, 1,
                MPI_INT, MPI_COMM_WORLD);
        recv_disp[0] = 0;
        for (i=1; i<p; i++)
            recv_disp[i] = recv_disp[i-1] + recv_cnts[i-1];
        recv_disp[p] = recv_disp[p-1]+recv_cnts[p-1];
        float partitions[recv_disp[p]];
        /* Exchanges elements to appropriate nodes. */
        MPI_Alltoallv(A, send_cnts, send_disp, MPI_FLOAT, partitions,
                recv_cnts, recv_disp, MPI_FLOAT, MPI_COMM_WORLD);

        /* Phase 4 */
        /* Each node merges its own partitions into a single list. */
        int j;
        int merge_disp[p];
        n_sorted = recv_disp[p];
        single_list = malloc(n_sorted*sizeof(float));
        memcpy(merge_disp, recv_disp, p*sizeof(int));
        for (i=0; i<n_sorted; i++)
        {
            float min = UPPER_BOUND;
            int min_pos = 0;
            for (j=0; j<p; j++)
                if (merge_disp[j] < recv_disp[j+1]
                        && min > partitions[merge_disp[j]])
                {
                    min = partitions[merge_disp[j]];
                    min_pos = j;
                }
            single_list[i] = min;
            merge_disp[min_pos]++;
        }

    }
    /* Synchronizes for checking maximum elapsed time among nodes. */
    MPI_Barrier(MPI_COMM_WORLD);
    elapsed_time += MPI_Wtime();

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d now has sorted the list that contains \
%d of %d elements.\n", id, n_sorted, n_number);
    fflush(stdout);
#endif

    if (!id)
        printf("Elapsed Time with %d processes: %10.6f\n",
                p, elapsed_time);

    /* Output (elapsed_time doesn't count for file output!) */
#ifdef OUTPUT_FILE
    char filename[100];
    strcpy(filename, "glistxx.txt");
    filename[5] = '0' + id / 10;
    filename[6] = '0' + id % 10;
    output(filename, A, n_control);
    filename[0] = 's';
    output(filename, single_list, n_sorted);
#endif
    if (single_list != A) free(single_list);
    MPI_Finalize();

    return 0;
}