Fast Sorting with Regular Sampling
Parallel Sorting by Regular Sampling (PSRS) is a parallel sorting algorithm that draws evenly spaced samples from each of the participating processes and uses them as the pivots around which the values are sorted. Among its advantages, the load is well balanced across processes and the number of processes does not have to be a power of two.
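To make the pivot selection concrete, here is a minimal sequential sketch (plain C, no MPI) of the two index computations that regular sampling boils down to. The helper names pick_local_samples and pick_pivots are made up for illustration; the index formulas are the same ones used in the full program below.

/* Each of the p processes contributes p regularly spaced samples
 * from its locally sorted block of n_local elements. */
void pick_local_samples(const float* local_sorted, int n_local, int p,
                        float* samples /* length p */)
{
    int i;
    for (i = 0; i < p; i++)
        samples[i] = local_sorted[i * n_local / p];
}

/* Once the p*p gathered samples have been sorted, p-1 of them become
 * the global pivots: one taken from near the middle of every block of
 * p samples except the first block. */
void pick_pivots(const float* sorted_samples, int p,
                 float* pivots /* length p-1 */)
{
    int i;
    for (i = 0; i < p - 1; i++)
        pivots[i] = sorted_samples[(i + 1) * p + p / 2 - 1];
}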
While learning MPI, I tried implementing this in C + MPI. Each of the p nodes generates n/p random floats in the range [0..1) and the whole set is then sorted. It is not particularly well-written code, but it might still be useful as a reference, so I am posting it here. Notes on building and running it follow the listing.
/**
 * PSRS implementation using MPI.
 *
 * Date: 5th of December, 2006
 * Author: Jay
 *
 * Compile options:
 *   PRINT_MSG: print some messages to stdout.
 *   OUTPUT_FILE: write 'glistNN.txt' and 'slistNN.txt', where NN is
 *     the rank of the node. 'glistNN.txt' contains the sorted numbers
 *     generated by that node; 'slistNN.txt' is that node's part of
 *     the final result.
 */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <limits.h>
#include <time.h>
#include <stddef.h>
#include <string.h>

/* Upper bound of the generated floating point values */
#define UPPER_BOUND 1.0

/* float comparison function for qsort */
int float_comp(const void* aa, const void* bb)
{
    const float* a = aa;
    const float* b = bb;
    if (*a == *b) return 0;
    if (*a < *b) return -1;
    return 1;
}

/* Generates a random sequence of 'size' elements into array A */
void generate_random_sequence(float* A, size_t size)
{
    size_t i;
    for (i = 0; i < size; i++)
        A[i] = (float)rand() / RAND_MAX;
}

/* Returns a pointer to the lower bound: the lowest position
 * at which val could be inserted while keeping the range
 * [first, last) sorted. */
float* lower_bound(float* first, float* last, float val)
{
    ptrdiff_t len = last - first;
    while (len > 0)
    {
        ptrdiff_t half = len / 2;
        float* middle = first + half;
        if (*middle < val)
        {
            first = middle + 1;
            len = len - half - 1;
        }
        else
            len = half;
    }
    return first;
}

/* Writes each element of the float array to a file */
void output(const char* filename, float* A, size_t size)
{
    FILE* f = fopen(filename, "w");
    size_t i;
    for (i = 0; i < size; i++)
    {
        fprintf(f, "%1.5f\n", A[i]);
    }
    fclose(f);
}

/*
 * First command-line argument: n (total number of elements)
 */
int main(int argc, char* argv[])
{
    int n_number = 1000000;
    if (argc > 1) n_number = atoi(argv[1]);
    int id, p;

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_rank(MPI_COMM_WORLD, &id);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    /* Some nodes have to hold one more
     * element if n % p != 0. */
    float A[(n_number + (p - 1)) / p];
    float* single_list = A;
    int n_control = n_number / p;
    int n_sorted = n_control;
    if (n_number % p > id) n_control++;
    int i;

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d generates %d of %d elements.\n",
            id, n_control, n_number);
    fflush(stdout);
#endif

    /* Add (id * 1000) so that each node seeds a different random sequence */
    srand(time(NULL) + id * 1000);
    generate_random_sequence(A, n_control);

    /* Phase 1 */
    /* Each process quicksorts its own list and picks its samples. */
    qsort(A, n_control, sizeof(float), float_comp);

    if (p > 1)
    {
        float samples[p];
        for (i = 0; i < p; i++)
            samples[i] = A[i * n_control / p];

        /* Phase 2 */
        /* Node 0 gathers the samples, sorts them, and picks the pivots. */
        float all_samples[p * p];
        MPI_Gather(samples, p, MPI_FLOAT, all_samples, p,
                   MPI_FLOAT, 0, MPI_COMM_WORLD);
        float pivots[p - 1];
        if (!id)
        {
            qsort(all_samples, p * p, sizeof(float), float_comp);

            for (i = 0; i < p - 1; i++)
                pivots[i] = all_samples[(i + 1) * p + p / 2 - 1];
        }
        /* Node 0 broadcasts the pivots and each process
         * partitions its own list. */
        MPI_Bcast(pivots, p - 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        int send_cnts[p], send_disp[p];
        send_disp[0] = 0;
        for (i = 1; i < p; i++)
        {
            send_disp[i] =
                lower_bound(A, A + n_control, pivots[i - 1]) - A;
            send_cnts[i - 1] = send_disp[i] - send_disp[i - 1];
        }
        send_cnts[p - 1] = n_control - send_disp[p - 1];

        /* Phase 3 */
        /* First, exchange the number of elements that
         * each node is going to send to every other node. */
        int recv_cnts[p], recv_disp[p + 1];
        MPI_Alltoall(send_cnts, 1, MPI_INT, recv_cnts, 1,
                     MPI_INT, MPI_COMM_WORLD);
        recv_disp[0] = 0;
        for (i = 1; i < p; i++)
            recv_disp[i] = recv_disp[i - 1] + recv_cnts[i - 1];
        recv_disp[p] = recv_disp[p - 1] + recv_cnts[p - 1];
        float partitions[recv_disp[p]];
        /* Exchange the elements with the appropriate nodes. */
        MPI_Alltoallv(A, send_cnts, send_disp, MPI_FLOAT, partitions,
                      recv_cnts, recv_disp, MPI_FLOAT, MPI_COMM_WORLD);

        /* Phase 4 */
        /* Each node merges the partitions it received into a single list. */
        int j;
        int merge_disp[p];
        n_sorted = recv_disp[p];
        single_list = malloc(n_sorted * sizeof(float));
        memcpy(merge_disp, recv_disp, p * sizeof(int));
        for (i = 0; i < n_sorted; i++)
        {
            float min = UPPER_BOUND;
            int min_pos = 0;
            for (j = 0; j < p; j++)
                if (merge_disp[j] < recv_disp[j + 1]
                    && min > partitions[merge_disp[j]])
                {
                    min = partitions[merge_disp[j]];
                    min_pos = j;
                }
            single_list[i] = min;
            merge_disp[min_pos]++;
        }

    }
    /* Synchronize so that the maximum elapsed time among the nodes is measured. */
    MPI_Barrier(MPI_COMM_WORLD);
    elapsed_time += MPI_Wtime();

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d now has a sorted list containing "
            "%d of %d elements.\n", id, n_sorted, n_number);
    fflush(stdout);
#endif

    if (!id)
        printf("Elapsed Time with %d processes: %10.6f\n",
               p, elapsed_time);

    /* Output (elapsed_time does not include the file output!) */
#ifdef OUTPUT_FILE
    char filename[100];
    strcpy(filename, "glistxx.txt");
    filename[5] = '0' + id / 10;
    filename[6] = '0' + id % 10;
    output(filename, A, n_control);
    filename[0] = 's';
    output(filename, single_list, n_sorted);
#endif
    if (single_list != A) free(single_list);
    MPI_Finalize();

    return 0;
}
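To try the program, compile it with the MPI wrapper compiler and start it through the MPI launcher, roughly as follows (this assumes a typical MPICH or Open MPI installation; the launcher may be called mpiexec instead of mpirun, and psrs.c is just a placeholder file name):

mpicc -O2 -DPRINT_MSG -o psrs psrs.c
mpirun -np 4 ./psrs 1000000

The single optional command-line argument is the total number of elements n; it defaults to 1,000,000 as in the listing.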