ad_lustre_aggregate.c
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (C) 1997 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 * Copyright (C) 2007 Oak Ridge National Laboratory
 *
 * Copyright (C) 2008 Sun Microsystems, Lustre group
 */

#include "ad_lustre.h"
#include "adio_extern.h"

#undef AGG_DEBUG

void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd,
                                    ADIO_Offset **striping_info_ptr,
                                    int mode, ADIO_Offset min_offset,
                                    ADIO_Offset max_offset)
{
    ADIO_Offset *striping_info = NULL;
    /* get striping information:
     * striping_info[0]: stripe_size
     * striping_info[1]: stripe_count
     * striping_info[2]: avail_cb_nodes
     * striping_info[3]: min_aligned_offset
     * striping_info[4]: region_size - size of the aligned access region
     *                   (aar) handled by one group of aggregators
     * striping_info[5]: min_offset_rank
     * striping_info[6]: max_offset
     */
    ADIO_Offset aligned_aar_size, aar_stripes, min_stripe_offset;
    int stripe_size, stripe_count, CO = 1, num_groups;
    int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;

    /* Get hint values */
    /* stripe size */
    stripe_size = fd->hints->striping_unit;
    /* stripe count */
    /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
    stripe_count = fd->hints->striping_factor;

    /* Calculate the available number of I/O clients */
    if (!mode) {
        /* for collective read:
         * if "CO" clients access the same OST simultaneously, the OST disk
         * seek time can become significant, so it is better if each client
         * accesses only one OST. We therefore set CO = 1.
         */
        CO = 1;
        /* XXX: there may be better strategies for collective read */
    } else {
        /* CO has also been validated in ADIOI_LUSTRE_Open(), >0 */
        CO = fd->hints->fs_hints.lustre.co_ratio;
    }

    /* Calculate how many I/O clients we need */
    /* Algorithm courtesy Pascal Deveze ([email protected]) */
    /* To avoid extent lock conflicts, avail_cb_nodes should either
     * - be a multiple of stripe_count, or
     * - divide stripe_count exactly,
     * so that each OST is accessed by at most CO clients. */
    if (nprocs_for_coll >= stripe_count)
        /* avail_cb_nodes should be a multiple of stripe_count, and the
         * number of procs per OST should be limited to the minimum of
         * nprocs_for_coll/stripe_count and CO.
         *
         * e.g. if stripe_count=20, nprocs_for_coll=42 and CO=3 then
         * avail_cb_nodes should be equal to 40 */
        avail_cb_nodes =
            stripe_count * ADIOI_MIN(nprocs_for_coll / stripe_count, CO);
    else {
        /* nprocs_for_coll is less than stripe_count,
         * so avail_cb_nodes should divide stripe_count exactly.
         * e.g. if stripe_count=60 and nprocs_for_coll=8 then
         * avail_cb_nodes should be equal to 6.
         * This could be done with:
         *     while (stripe_count % avail_cb_nodes != 0) avail_cb_nodes--;
         * but the search below is faster for large values of
         * nprocs_for_coll and stripe_count. */
        divisor = 2;
        avail_cb_nodes = 1;
        /* try to divide */
        while (stripe_count >= divisor * divisor) {
            if ((stripe_count % divisor) == 0) {
                if (stripe_count / divisor <= nprocs_for_coll) {
                    /* The value is found! */
                    avail_cb_nodes = stripe_count / divisor;
                    break;
                }
                /* if divisor is less than nprocs_for_coll, divisor is a
                 * solution, but it may not be the best one */
                else if (divisor <= nprocs_for_coll)
                    avail_cb_nodes = divisor;
            }
            divisor++;
        }
    }
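    /* Worked trace of the divisor search above, with illustrative values
     * (not from the source): stripe_count = 60, nprocs_for_coll = 8.
     *   divisor = 2: 60/2 = 30 > 8, and 2 <= 8 -> avail_cb_nodes = 2
     *   divisor = 3: 60/3 = 20 > 8, and 3 <= 8 -> avail_cb_nodes = 3
     *   divisor = 4: 60/4 = 15 > 8, and 4 <= 8 -> avail_cb_nodes = 4
     *   divisor = 5: 60/5 = 12 > 8, and 5 <= 8 -> avail_cb_nodes = 5
     *   divisor = 6: 60/6 = 10 > 8, and 6 <= 8 -> avail_cb_nodes = 6
     *   divisor = 7: 60 % 7 != 0, skip
     *   divisor = 8: 8*8 = 64 > 60, loop ends -> avail_cb_nodes = 6,
     * which divides stripe_count as required.
     */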
    *striping_info_ptr = (ADIO_Offset *) ADIOI_Malloc(7 * sizeof(ADIO_Offset));
    striping_info = *striping_info_ptr;
    striping_info[0] = stripe_size;
    striping_info[1] = stripe_count;
    striping_info[2] = avail_cb_nodes;
    min_stripe_offset = min_offset / stripe_size;
    striping_info[3] = min_stripe_offset * stripe_size;
    if (avail_cb_nodes % stripe_count == 0)
        num_groups = avail_cb_nodes / stripe_count;
    else
        num_groups = avail_cb_nodes / stripe_count + 1;
    if (max_offset % stripe_size == 0)
        aligned_aar_size = max_offset;
    else
        aligned_aar_size = (max_offset / stripe_size + 1) * stripe_size;
    aligned_aar_size -= striping_info[3];
    aar_stripes = aligned_aar_size / stripe_size;
    if (aar_stripes % num_groups == 0)
        striping_info[4] = aar_stripes / num_groups;
    else
        striping_info[4] = aar_stripes / num_groups + 1;
    striping_info[4] *= stripe_size;
    striping_info[5] = min_stripe_offset % avail_cb_nodes;
    striping_info[6] = max_offset;
}
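
/* A worked example of the values computed above, under assumed (not
 * source-provided) parameters: stripe_size = 1 MiB, stripe_count = 4,
 * cb_nodes = 16, CO = 2, min_offset = 0, max_offset = 16 MiB, write mode:
 *   avail_cb_nodes   = 4 * ADIOI_MIN(16/4, 2) = 8
 *   striping_info[3] = 0 (min_offset is already stripe-aligned)
 *   num_groups       = 8 / 4 = 2
 *   aligned_aar_size = 16 MiB, so aar_stripes = 16
 *   striping_info[4] = (16/2) stripes * 1 MiB = 8 MiB per group
 *   striping_info[5] = 0, striping_info[6] = 16 MiB
 */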

int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
                                 ADIO_Offset *len, ADIO_Offset *striping_info)
{
    int rank_index;
    ADIO_Offset avail_bytes;
    int stripe_size = striping_info[0];
    int stripe_count = striping_info[1];
    int avail_cb_nodes = striping_info[2];
    ADIO_Offset min_aligned_offset = striping_info[3];
    ADIO_Offset region_size = striping_info[4];
    int min_offset_rank = striping_info[5];

    /* Produce the group-cyclic pattern for Lustre */
    ADIO_Offset rel_aligned_offset = off - min_aligned_offset;
    rank_index =
        (min_offset_rank
         + (rel_aligned_offset / region_size) * stripe_count
         + (rel_aligned_offset / stripe_size) % stripe_count)
        % avail_cb_nodes;

    /* we index into fd_end with rank_index, and fd_end was allocated to be no
     * bigger than fd->hints->cb_nodes. If we ever violate that, we're
     * overrunning arrays. Obviously, we should never ever hit this abort.
     */
    if (rank_index >= fd->hints->cb_nodes)
        MPI_Abort(MPI_COMM_WORLD, 1);

    avail_bytes = (off / (ADIO_Offset) stripe_size + 1) *
                  (ADIO_Offset) stripe_size - off;
    if (avail_bytes < *len) {
        /* this proc only has part of the requested contiguous region */
        *len = avail_bytes;
    }
    /* map our index to a rank */
    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
    return fd->hints->ranklist[rank_index];
}
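
/* Continuing the example above (stripe_size = 1 MiB, stripe_count = 4,
 * avail_cb_nodes = 8, region_size = 8 MiB, min_aligned_offset = 0,
 * min_offset_rank = 0), the group-cyclic formula yields:
 *   off =  0 MiB -> (0 + 0*4 + 0%4)  % 8 = 0
 *   off =  3 MiB -> (0 + 0*4 + 3%4)  % 8 = 3
 *   off =  4 MiB -> (0 + 0*4 + 4%4)  % 8 = 0
 *   off =  8 MiB -> (0 + 1*4 + 8%4)  % 8 = 4
 *   off = 11 MiB -> (0 + 1*4 + 11%4) % 8 = 7
 * i.e. aggregators 0..3 cycle over the first 8 MiB region and 4..7 over
 * the second, bounding how many clients drive each OST. The helper below
 * is an illustrative sketch (not part of the original driver) that dumps
 * this map for debugging.
 */
#ifdef AGG_DEBUG
static void ADIOI_LUSTRE_Dump_agg_map(ADIO_File fd, ADIO_Offset *striping_info)
{
    ADIO_Offset off, len;
    /* walk the aligned access region one stripe at a time */
    for (off = striping_info[3]; off < striping_info[6];
         off += striping_info[0]) {
        len = striping_info[0];     /* ask for exactly one stripe */
        FPRINTF(stdout, "stripe at %lld -> rank %d\n", (long long) off,
                ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info));
    }
}
#endif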

/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
 * of this process are located in the file domains of various processes
 * (including this one)
 */
void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
                              ADIO_Offset *len_list, int contig_access_count,
                              ADIO_Offset *striping_info, int nprocs,
                              int *count_my_req_procs_ptr,
                              int **count_my_req_per_proc_ptr,
                              ADIOI_Access **my_req_ptr,
                              int ***buf_idx_ptr)
{
    int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
    int i, l, proc;
    ADIO_Offset avail_len, rem_len, curr_idx, off;
    ADIOI_Access *my_req;

    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    count_my_req_per_proc = *count_my_req_per_proc_ptr;
    /* count_my_req_per_proc[i] gives the number of contiguous requests of
     * this process that fall in process i's file domain. calloc initializes
     * it to zero.
     */
    buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int *));

    /* one pass just to calculate how much space to allocate for my_req;
     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
     */
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write)
         */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        /* note: we set avail_len to be the total size of the access.
         * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
         * the amount that was available.
         */
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
        count_my_req_per_proc[proc]++;
        /* figure out how much data remains in the access;
         * we'll take care of this data (if there is any)
         * in the while loop below.
         */
        rem_len = len_list[i] - avail_len;
        while (rem_len != 0) {
            off += avail_len;       /* point to first remaining byte */
            avail_len = rem_len;    /* save remaining size, pass to calc */
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
            count_my_req_per_proc[proc]++;
            rem_len -= avail_len;   /* reduce remaining length by amount from fd */
        }
    }

    /* buf_idx is relevant only if buftype_is_contig.
     * buf_idx[i] gives the index into user_buf where data received
     * from proc 'i' should be placed. This allows receives to be done
     * without an extra buffer. This can't be done if buftype is not contig.
     */
    /* initialize buf_idx vectors */
    for (i = 0; i < nprocs; i++) {
        /* add one to count_my_req_per_proc[i] to avoid a zero-size malloc */
        buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
                                          * sizeof(int));
    }

    /* now allocate space for my_req, offset, and len */
    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    my_req = *my_req_ptr;

    count_my_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i]) {
            my_req[i].offsets = (ADIO_Offset *)
                ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
            my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
                                                  sizeof(int));
            count_my_req_procs++;
        }
        my_req[i].count = 0;    /* will be incremented where needed later */
    }

    /* now fill in my_req */
    curr_idx = 0;
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write) */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
        l = my_req[proc].count;
        ADIOI_Assert(curr_idx == (int) curr_idx);
        ADIOI_Assert(l < count_my_req_per_proc[proc]);
        buf_idx[proc][l] = (int) curr_idx;
        curr_idx += avail_len;
        rem_len = len_list[i] - avail_len;
        /* store the proc, offset, and len information in an array
         * of structures, my_req. Each structure contains the
         * offsets and lengths located in that process's FD,
         * and the associated count.
         */
        my_req[proc].offsets[l] = off;
        ADIOI_Assert(avail_len == (int) avail_len);
        my_req[proc].lens[l] = (int) avail_len;
        my_req[proc].count++;
        while (rem_len != 0) {
            off += avail_len;
            avail_len = rem_len;
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
            l = my_req[proc].count;
            ADIOI_Assert(curr_idx == (int) curr_idx);
            ADIOI_Assert(l < count_my_req_per_proc[proc]);
            buf_idx[proc][l] = (int) curr_idx;
            curr_idx += avail_len;
            rem_len -= avail_len;
            my_req[proc].offsets[l] = off;
            ADIOI_Assert(avail_len == (int) avail_len);
            my_req[proc].lens[l] = (int) avail_len;
            my_req[proc].count++;
        }
    }

#ifdef AGG_DEBUG
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i] > 0) {
            FPRINTF(stdout, "data needed from %d (count = %d):\n",
                    i, my_req[i].count);
            for (l = 0; l < my_req[i].count; l++) {
                FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
                        l, my_req[i].offsets[l], l, my_req[i].lens[l]);
            }
        }
    }
#endif

    *count_my_req_procs_ptr = count_my_req_procs;
    *buf_idx_ptr = buf_idx;
}
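
/* Example of the splitting loop above, with the same assumed parameters
 * (stripe_size = 1 MiB): a single request at off = 0.5 MiB with
 * len = 1 MiB crosses a stripe boundary. ADIOI_LUSTRE_Calc_aggregator()
 * trims the first piece to avail_len = 0.5 MiB and assigns it to the
 * stripe-0 aggregator; the while loop then hands the remaining 0.5 MiB
 * at off = 1 MiB to the stripe-1 aggregator. my_req records one
 * (offset, len) pair per piece, and buf_idx records where each piece
 * starts in the contiguous user buffer: 0 for the first piece and
 * 0.5 MiB (curr_idx after the first piece) for the second.
 */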

int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
                           ADIO_Offset *len_list, int nprocs)
{
    /* If the processes are non-interleaved, we will check the req_size:
     *   if (avg_req_size > big_req_size) {
     *       docollect = 0;
     *   }
     */
    int i, docollect = 1, big_req_size = 0;
    ADIO_Offset req_size = 0, total_req_size;
    int avg_req_size, total_access_count;

    /* calculate total_req_size and total_access_count */
    for (i = 0; i < contig_access_count; i++)
        req_size += len_list[i];
    MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
                  fd->comm);
    MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT,
                  MPI_SUM, fd->comm);
    /* estimate the average req_size */
    avg_req_size = (int) (total_req_size / total_access_count);
    /* get the hint value of big_req_size */
    big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
    /* Don't perform collective I/O if there are big requests */
    if ((big_req_size > 0) && (avg_req_size > big_req_size))
        docollect = 0;

    return docollect;
}
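
/* Worked example of the threshold test above, with assumed numbers: if
 * the processes together contribute total_req_size = 128 MiB across
 * total_access_count = 4 contiguous requests, then avg_req_size = 32 MiB.
 * With the Lustre coll_threshold hint set to 16 MiB, avg_req_size >
 * big_req_size, so the routine returns docollect = 0 and the caller
 * skips collective I/O for these (already large) requests.
 */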