Fast Sorting by Regular Sampling
Parallel Sorting by Regular Sampling (PSRS) is a parallel sorting algorithm in which each of the p processes contributes evenly spaced samples from its locally sorted data, and those samples are used to choose the pivots that partition the values across processes. Its advantages include good load balance and the fact that the number of processes does not have to be a power of two.
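To make the pivot-selection step concrete, here is a minimal single-process sketch. It is only an illustration: the array local, its values, and p = 3 are assumptions made up for this example, not part of the program further below. Each "process" contributes p evenly spaced samples from its sorted local list; the p*p gathered samples are sorted; and p-1 pivots are taken at regular positions, exactly as the MPI program does in Phase 2.

/* Illustrative sketch of regular sampling and pivot selection (no MPI). */
#include <stdio.h>
#include <stdlib.h>

static int float_comp(const void* aa, const void* bb)
{
    const float* a = aa;
    const float* b = bb;
    return (*a > *b) - (*a < *b);
}

int main(void)
{
    enum { p = 3, n_local = 6 };
    /* Three sorted local lists, as they would look after Phase 1. */
    float local[p][n_local] = {
        { 0.02f, 0.11f, 0.25f, 0.40f, 0.63f, 0.91f },
        { 0.05f, 0.17f, 0.33f, 0.48f, 0.72f, 0.88f },
        { 0.09f, 0.21f, 0.37f, 0.55f, 0.69f, 0.95f },
    };

    /* Each "process" contributes p evenly spaced samples. */
    float all_samples[p * p];
    for (int id = 0; id < p; id++)
        for (int i = 0; i < p; i++)
            all_samples[id * p + i] = local[id][i * n_local / p];

    /* Sort the gathered samples and pick p-1 pivots at regular positions. */
    qsort(all_samples, p * p, sizeof(float), float_comp);
    for (int i = 0; i < p - 1; i++)
        printf("pivot %d: %1.2f\n", i, all_samples[(i + 1) * p + p / 2 - 1]);
    return 0;
}

With these numbers the pivots come out as 0.25 and 0.63, which split the combined data into three roughly equal parts, one per process.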
While learning MPI, I tried implementing this in C+MPI. Each of the p nodes generates n/p random floats in the range [0, 1), and then the whole set is sorted. It is not particularly well-written code, but it might still be useful as a reference, so I am posting it here.
/**
 * PSRS implementation using MPI.
 *
 * Date: 5th of December, 2006
 * Author: Jay
 *
 * Compile options:
 *   PRINT_MSG:   print some messages to stdout.
 *   OUTPUT_FILE: write 'glistNN.txt' and 'slistNN.txt', where NN is
 *                the node id. 'glistNN' contains the sorted generated
 *                numbers of each node; 'slistNN' is the final result
 *                of each node.
 */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <limits.h>
#include <time.h>
#include <stddef.h>
#include <string.h>

/* Upper bound of the generated floating point values */
#define UPPER_BOUND 1.0

/* float comparison function for qsort */
int float_comp(const void* aa, const void* bb)
{
    const float* a = aa;
    const float* b = bb;
    if (*a == *b) return 0;
    if (*a < *b) return -1;
    return 1;
}

/* Fills array A of the given size with random values in [0, 1) */
void generate_random_sequence(float* A, size_t size)
{
    size_t i;
    for (i = 0; i < size; i++)
        A[i] = (float)rand() / RAND_MAX;
}

/* Returns a pointer to the lower bound, i.e. the lowest position
 * at which val can be inserted while keeping the array sorted. */
float* lower_bound(float* first, float* last, float val)
{
    ptrdiff_t len = last - first;
    while (len > 0)
    {
        ptrdiff_t half = len / 2;
        float* middle = first + half;
        if (*middle < val)
        {
            first = middle + 1;
            len = len - half - 1;
        }
        else
            len = half;
    }
    return first;
}

/* Writes each element of a float array to a file, one per line */
void output(const char* filename, float* A, size_t size)
{
    FILE* f = fopen(filename, "w");
    size_t i;
    for (i = 0; i < size; i++)
    {
        fprintf(f, "%1.5f\n", A[i]);
    }
    fclose(f);
}

/*
 * First command-line argument: n (total number of elements)
 */
int main(int argc, char* argv[])
{
    int n_number = 1000000;
    if (argc > 1) n_number = atoi(argv[1]);
    int id, p;

    MPI_Init(&argc, &argv);

    MPI_Barrier(MPI_COMM_WORLD);
    double elapsed_time = -MPI_Wtime();

    MPI_Comm_rank(MPI_COMM_WORLD, &id);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    /* Some nodes hold one more element if n % p != 0. */
    float A[(n_number + (p - 1)) / p];
    float* single_list = A;
    int n_control = n_number / p;
    int n_sorted = n_control;
    if (n_number % p > id) n_control++;
    int i;

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d generates %d of %d elements.\n",
            id, n_control, n_number);
    fflush(stdout);
#endif

    /* Give each process a different seed. */
    srand(time(NULL) + id * 1000);
    generate_random_sequence(A, n_control);

    /* Phase 1 */
    /* Each process quicksorts its own list and picks p samples. */
    qsort(A, n_control, sizeof(float), float_comp);

    if (p > 1)
    {
        float samples[p];
        for (i = 0; i < p; i++)
            samples[i] = A[i * n_control / p];

        /* Phase 2 */
        /* Node 0 gathers the samples, sorts them, and picks pivots. */
        float all_samples[p * p];
        MPI_Gather(samples, p, MPI_FLOAT, all_samples, p,
                   MPI_FLOAT, 0, MPI_COMM_WORLD);
        float pivots[p - 1];
        if (!id)
        {
            qsort(all_samples, p * p, sizeof(float), float_comp);

            for (i = 0; i < p - 1; i++)
                pivots[i] = all_samples[(i + 1) * p + p / 2 - 1];
        }
        /* Node 0 broadcasts the pivots and each process
         * partitions its own list. */
        MPI_Bcast(pivots, p - 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        int send_cnts[p], send_disp[p];
        send_disp[0] = 0;
        for (i = 1; i < p; i++)
        {
            send_disp[i] =
                lower_bound(A, A + n_control, pivots[i - 1]) - A;
            send_cnts[i - 1] = send_disp[i] - send_disp[i - 1];
        }
        send_cnts[p - 1] = n_control - send_disp[p - 1];

        /* Phase 3 */
        /* First, exchange the number of elements each process
         * is going to send. */
        int recv_cnts[p], recv_disp[p + 1];
        MPI_Alltoall(send_cnts, 1, MPI_INT, recv_cnts, 1,
                     MPI_INT, MPI_COMM_WORLD);
        recv_disp[0] = 0;
        for (i = 1; i < p; i++)
            recv_disp[i] = recv_disp[i - 1] + recv_cnts[i - 1];
        recv_disp[p] = recv_disp[p - 1] + recv_cnts[p - 1];
        float partitions[recv_disp[p]];
        /* Exchange the elements themselves with the appropriate nodes. */
        MPI_Alltoallv(A, send_cnts, send_disp, MPI_FLOAT, partitions,
                      recv_cnts, recv_disp, MPI_FLOAT, MPI_COMM_WORLD);

        /* Phase 4 */
        /* Each node merges its received partitions into a single list. */
        int j;
        int merge_disp[p];
        n_sorted = recv_disp[p];
        single_list = malloc(n_sorted * sizeof(float));
        memcpy(merge_disp, recv_disp, p * sizeof(int));
        for (i = 0; i < n_sorted; i++)
        {
            float min = UPPER_BOUND;
            int min_pos = 0;
            for (j = 0; j < p; j++)
                if (merge_disp[j] < recv_disp[j + 1]
                    && min > partitions[merge_disp[j]])
                {
                    min = partitions[merge_disp[j]];
                    min_pos = j;
                }
            single_list[i] = min;
            merge_disp[min_pos]++;
        }
    }
    /* Synchronize to measure the maximum elapsed time among the nodes. */
    MPI_Barrier(MPI_COMM_WORLD);
    elapsed_time += MPI_Wtime();

#ifdef PRINT_MSG
    fprintf(stdout, "Process %d now has sorted the list that contains "
            "%d of %d elements.\n", id, n_sorted, n_number);
    fflush(stdout);
#endif

    if (!id)
        printf("Elapsed Time with %d processes: %10.6f\n",
               p, elapsed_time);

    /* Output (elapsed_time does not include the file output!) */
#ifdef OUTPUT_FILE
    char filename[100];
    strcpy(filename, "glistxx.txt");
    filename[5] = '0' + id / 10;
    filename[6] = '0' + id % 10;
    output(filename, A, n_control);
    filename[0] = 's';
    output(filename, single_list, n_sorted);
#endif
    if (single_list != A) free(single_list);
    MPI_Finalize();

    return 0;
}
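With a standard MPI installation (for example Open MPI or MPICH), it can be built and run roughly like this, assuming the source is saved as psrs.c (the file name is just an example): mpicc -DPRINT_MSG -o psrs psrs.c, then mpirun -np 4 ./psrs 1000000. The first argument is the total number of elements n, which defaults to 1,000,000 when omitted.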