forked from facebook/wdt
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDirectorySourceQueue.h
450 lines (377 loc) · 14.8 KB
/
DirectorySourceQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <dirent.h>
#include <glog/logging.h>
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <wdt/Protocol.h>
#include <wdt/SourceQueue.h>
#include <wdt/WdtTransferRequest.h>
#include <wdt/util/FileByteSource.h>
namespace facebook {
namespace wdt {
/**
* SourceQueue that returns all the regular files under a given directory
* (recursively) as individual FileByteSource objects, sorted by decreasing
* file size.
*
* TODO: The actual building of the queue is specific to this implementation
* which may or may not make it easy to plug a different implementation
* (as shown by the current implementation of Sender.cpp)
*/
class DirectorySourceQueue : public SourceQueue {
 public:
  /**
   * Create a DirectorySourceQueue.
   * Call buildQueueSynchronously() or buildQueueAsynchronously() separately
   * to actually recurse over the root directory and gather files and sizes.
   *
   * @param options       options to use
   * @param rootDir       root directory to recurse on
   * @param abortChecker  abort checker
   */
  DirectorySourceQueue(const WdtOptions &options, const std::string &rootDir,
                       IAbortChecker const *abortChecker);

  /**
   * Recurse over given root directory, gather data about regular files and
   * initialize internal data structures. getNextSource() will return sources
   * as this call discovers them.
   *
   * This should only be called once. Subsequent calls will do nothing and
   * return false. In case it is called from multiple threads, one of them
   * will do initialization while the other calls will fail.
   *
   * This is synchronous in the succeeding thread - it will block until
   * the directory is completely discovered. Use buildQueueAsynchronously()
   * for async fetch from parallel thread.
   *
   * @return          true iff initialization was successful and hasn't
   *                  been done before
   */
  bool buildQueueSynchronously();

  /**
   * Starts a new thread to build the queue @see buildQueueSynchronously()
   * @return the created thread (to be joined if needed)
   */
  std::thread buildQueueAsynchronously();

  /// @return true iff all regular files under root dir have been consumed
  bool finished() const override;

  /// @return true if all the files have been discovered, false otherwise
  bool fileDiscoveryFinished() const;

  /**
   * @param callerThreadCtx context of the calling thread
   * @param status          this variable is set to the status of the transfer
   *
   * @return next FileByteSource to consume or nullptr when finished
   */
  std::unique_ptr<ByteSource> getNextSource(ThreadCtx *callerThreadCtx,
                                            ErrorCode &status) override;

  /// @return total number of files processed/enqueued
  int64_t getCount() const override;

  /// @return total size of files processed/enqueued
  int64_t getTotalSize() const override;

  /// @return total number of blocks and status of the transfer
  std::pair<int64_t, ErrorCode> getNumBlocksAndStatus() const;

  /// @return perf report
  const PerfStatReport &getPerfReport() const;

  /**
   * Sets regex representing files to include for transfer
   *
   * @param includePattern          file inclusion regex
   */
  void setIncludePattern(const std::string &includePattern);

  /**
   * Sets regex representing files to exclude for transfer
   *
   * @param excludePattern          file exclusion regex
   */
  void setExcludePattern(const std::string &excludePattern);

  /**
   * Sets regex representing directories to exclude for transfer
   *
   * @param pruneDirPattern         directory exclusion regex
   */
  void setPruneDirPattern(const std::string &pruneDirPattern);

  /**
   * Sets the number of consumer threads for this queue. Used as threshold
   * between notify and notifyAll
   *
   * @param numClientThreads        number of consumer threads
   */
  void setNumClientThreads(int64_t numClientThreads) {
    numClientThreads_ = numClientThreads;
  }

  /**
   * Sets the count and trigger for files to open during discovery
   * (negative is keep opening until we run out of fd, positive is how
   * many files we can still open, 0 is stop opening files)
   *
   * @param openFilesDuringDiscovery  count/trigger value; stored at full
   *                                  64-bit width (no truncation)
   */
  void setOpenFilesDuringDiscovery(int64_t openFilesDuringDiscovery) {
    openFilesDuringDiscovery_ = openFilesDuringDiscovery;
  }

  /**
   * If setOpenFilesDuringDiscovery is not zero, open files using direct
   * mode.
   *
   * @param directReads             whether to use direct read mode
   */
  void setDirectReads(bool directReads) {
    directReads_ = directReads;
  }

  /// enable extra file deletion in the receiver side
  void enableFileDeletion() {
    deleteFiles_ = true;
  }

  /**
   * Stat the FileInfo input files (if their size aren't already specified) and
   * insert them in the queue
   *
   * @param fileInfo              files to be transferred
   */
  void setFileInfo(const std::vector<WdtFileInfo> &fileInfo);

  /**
   * Sets the generator function to invoke to get more files to send
   *
   * @param gen                   file info generator
   */
  void setFileInfoGenerator(WdtTransferRequest::FileInfoGenerator gen);

  /// @param blockSizeMbytes    block size in Mbytes
  void setBlockSizeMbytes(int64_t blockSizeMbytes);

  /// Get the file info in this directory queue
  std::vector<WdtFileInfo> getFileInfo() const;

  /**
   * Sets whether to follow symlink or not
   *
   * @param followSymlinks        whether to follow symlink or not
   */
  void setFollowSymlinks(bool followSymlinks);

  /**
   * sets chunks which were sent in some previous transfer
   *
   * @param previouslyTransferredChunks   previously sent chunk info
   */
  void setPreviouslyReceivedChunks(
      std::vector<FileChunksInfo> &previouslyTransferredChunks);

  /**
   * returns sources to the queue, checks for fail/retries, doesn't increment
   * numentries
   *
   * @param sources               sources to be returned to the queue
   */
  void returnToQueue(std::vector<std::unique_ptr<ByteSource>> &sources);

  /**
   * returns a source to the queue, checks for fail/retries, doesn't increment
   * numentries
   *
   * @param source                source to be returned to the queue
   */
  void returnToQueue(std::unique_ptr<ByteSource> &source);

  /**
   * Returns list of files which were not transferred. It empties the queue and
   * adds queue entries to the failed file list. This function should be called
   * after all the sending threads have finished execution
   *
   * @return                      stats for failed sources
   */
  std::vector<TransferStats> &getFailedSourceStats();

  /// @return   returns list of directories which could not be opened
  std::vector<std::string> &getFailedDirectories();

  /// @return   number of bytes previously sent
  int64_t getPreviouslySentBytes() const;

  ~DirectorySourceQueue() override;

  /// @return   discovered files metadata
  std::vector<SourceMetaData *> &getDiscoveredFilesMetaData();

  /// Returns the time it took to traverse the directory tree
  double getDirectoryTime() const {
    return directoryTime_;
  }

  /**
   * Allows to change the root directory, must not be empty, trailing
   * slash is automatically added if missing. Can be relative.
   * if follow symlink is set the directory will be resolved as absolute
   * path.
   * @return    true if successful, false on error (logged)
   */
  bool setRootDir(const std::string &newRootDir);

  /**
   * Allows the caller to block until all the previous transfers have
   * finished, before invoking fileInfoGenerator_ to get the next batch.
   * NOTE: This uses numActiveThreadsFn() to get the number of clients pulling
   * from the queue and the size of queue to determine if transfers have
   * finished.
   *
   * @param progressReportInterval    report progress every
   *                                  progressReportInterval milliseconds.
   * @param numActiveThreadsFn        Func to get number of active threads
   */
  void waitForPreviousTransfer(std::chrono::milliseconds progressReportInterval,
                               std::function<int64_t()> numActiveThreadsFn);

 private:
  /**
   * Resolves a symlink.
   *
   * @return                realpath or empty string on error (logged)
   */
  std::string resolvePath(const std::string &path);

  /**
   * Traverse rootDir_ to gather files and sizes to enqueue
   *
   * @return                true on success, false on error
   */
  bool explore();

  /**
   * Stat the input files and populate queue
   * @return                true on success, false on error
   */
  bool enqueueFiles(std::vector<WdtFileInfo>& fileInfo);

  /**
   * initial creation from either explore or enqueue files, uses
   * createIntoQueueInternal to create blocks
   *
   * @param fullPath        full path of the file to be added
   * @param fileInfo        Information about file
   */
  void createIntoQueue(const std::string &fullPath, WdtFileInfo &fileInfo);

  /**
   * initial creation from either explore or enqueue files - always increment
   * numentries. Lock must be held before calling this.
   *
   * @param metadata             file meta-data
   */
  void createIntoQueueInternal(SourceMetaData *metadata);

  /**
   * when adding multiple files, we have the option of using notify_one multiple
   * times or notify_all once. Depending on number of added sources, this
   * function uses either notify_one or notify_all
   *
   * @param addedSource     number of sources added
   */
  void smartNotify(int32_t addedSource);

  /// Removes all elements from the source queue
  void clearSourceQueue();

  /// if file deletion is enabled, extra files to be deleted are enqueued. This
  /// method should be called while holding the lock
  void enqueueFilesToBeDeleted();

  std::unique_ptr<ThreadCtx> threadCtx_{nullptr};

  /// root directory to recurse on if fileInfo_ is empty
  std::string rootDir_;

  /// regex representing directories to prune
  std::string pruneDirPattern_;

  /// regex representing files to include
  std::string includePattern_;

  /// regex representing files to exclude
  std::string excludePattern_;

  /// Block size in mb
  int64_t blockSizeMbytes_{0};

  /// List of files to enqueue instead of recursing over rootDir_.
  std::vector<WdtFileInfo> fileInfo_;

  /// A generator function to invoke to get more files to send
  WdtTransferRequest::FileInfoGenerator fileInfoGenerator_;

  /// protects
  /// initCalled_/initFinished_/sourceQueue_/failedSourceStats_/numWaiters_
  mutable std::mutex mutex_;

  /// condition variable indicating sourceQueue_ is not empty
  mutable std::condition_variable conditionNotEmpty_;

  /// condition variable indicating previous batch transfer has finished i.e.
  /// queue is empty and all client threads are waiting.
  mutable std::condition_variable conditionPrevTransfer_;

  /// Indicates whether init() has been called to prevent multiple calls
  bool initCalled_{false};

  /// Indicates whether call to init() has finished
  bool initFinished_{false};

  /**
   * Strict-weak-ordering used by sourceQueue_. Returns true when source1
   * should be dequeued AFTER source2 (std::priority_queue pops the element
   * that compares greatest).
   */
  struct SourceComparator {
    bool operator()(const std::unique_ptr<ByteSource> &source1,
                    const std::unique_ptr<ByteSource> &source2) const {
      const bool toBeDeleted1 =
          (source1->getMetaData().allocationStatus == TO_BE_DELETED);
      const bool toBeDeleted2 =
          (source2->getMetaData().allocationStatus == TO_BE_DELETED);
      if (toBeDeleted1 != toBeDeleted2) {
        // always send files to be deleted first
        return toBeDeleted2;
      }
      // fewer failed attempts => dequeued earlier
      const auto retryCount1 = source1->getTransferStats().getFailedAttempts();
      const auto retryCount2 = source2->getTransferStats().getFailedAttempts();
      if (retryCount1 != retryCount2) {
        return retryCount1 > retryCount2;
      }
      // larger size => dequeued earlier
      if (source1->getSize() != source2->getSize()) {
        return source1->getSize() < source2->getSize();
      }
      // smaller offset => dequeued earlier
      if (source1->getOffset() != source2->getOffset()) {
        return source1->getOffset() > source2->getOffset();
      }
      // final tie-break on identifier so that the ordering is total
      return source1->getIdentifier() > source2->getIdentifier();
    }
  };

  /**
   * priority queue of sources. Sources are first ordered by increasing
   * failedAttempts, then by decreasing size. If sizes are equal(always for
   * blocks), sources are ordered by offset. This way, we ensure that all the
   * threads in the receiver side are not writing to the same file at the same
   * time.
   */
  std::priority_queue<std::unique_ptr<ByteSource>,
                      std::vector<std::unique_ptr<ByteSource>>,
                      SourceComparator>
      sourceQueue_;

  /**
   * number of threads waiting on the queue
   */
  int64_t numWaiters_{0};

  /// Transfer stats for sources which are not transferred
  std::vector<TransferStats> failedSourceStats_;

  /// directories which could not be opened
  std::vector<std::string> failedDirectories_;

  /// Total number of files that have passed through the queue
  int64_t numEntries_{0};

  /// Seq-id of the next file to be inserted into the queue
  /// first valid seq is 1 so we can use 0 as uninitialized/invalid in
  /// protocol.h
  int64_t nextSeqId_{1};

  /// total number of blocks that have passed through the queue. Even when
  /// blocks are actually disabled, our code internally treats files like single
  /// blocks. So, numBlocks_ >= numEntries_.
  int64_t numBlocks_{0};

  /// Total size of entries/files that have passed through the queue
  int64_t totalFileSize_{0};

  /// Number of blocks dequeued
  int64_t numBlocksDequeued_{0};

  /// Whether to follow symlinks or not
  bool followSymlinks_{false};

  /// shared file data. These are used during transfer to add blocks
  /// contribution
  std::vector<SourceMetaData *> sharedFileData_;

  /// A map from relative file name to previously received chunks
  std::unordered_map<std::string, FileChunksInfo> previouslyTransferredChunks_;

  /// Stores the time difference between the start and the end of the
  /// traversal of directory
  double directoryTime_{0};

  /// Number of bytes previously sent
  int64_t previouslySentBytes_{0};

  /**
   * Count and trigger of files to open (negative is keep opening until we run
   * out of fd, positive is how many files we can still open, 0 is stop opening
   * files).
   * Sender only (Receiver download resumption directory discovery should not
   * open files).
   * Kept as int64_t to match setOpenFilesDuringDiscovery(); a narrower type
   * would silently truncate the value passed by the caller.
   */
  int64_t openFilesDuringDiscovery_{0};

  /// Should the WdtFileInfo created during discovery have direct read mode set
  bool directReads_{false};

  /// Number of files opened
  int64_t numFilesOpened_{0};

  /// Number of files opened with odirect
  int64_t numFilesOpenedWithDirect_{0};

  /// Number of consumer threads (to tell between notify/notifyall)
  int64_t numClientThreads_{1};

  /// Should we explore or use fileInfo
  bool exploreDirectory_{true};

  /// delete extra files in the receiver side
  bool deleteFiles_{false};
};
}
}