forked from larrabee/s3sync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.go
98 lines (89 loc) · 2 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"path/filepath"
"sync"
"sync/atomic"
"time"
)
// Counter gathers object tallies and the start time of a sync run.
//
// The uint64 tallies are declared first so they stay 64-bit aligned, which
// the sync/atomic functions require on 32-bit platforms.
type Counter struct {
	// Per-object outcomes: synced successfully, failed, and skipped by the filters.
	sucObjCnt, failObjCnt, skipObjCnt uint64
	// Overall object count; it is not written in this file — presumably
	// maintained by the code that lists objects (verify against caller).
	totalObjCnt uint64
	// Not written in this file — presumably the moment the sync started.
	startTime time.Time
}
// failedObjAction records one failed object and then reacts according to the
// configured cli.OnFail mode.
//
// The failure tally is bumped atomically first. With onFailLog an error line
// is logged; with onFailFatal log.Fatalf is called (presumably terminating
// the process — confirm against the logger in use). Any other mode only
// counts the failure.
func failedObjAction(obj Object) {
	atomic.AddUint64(&counter.failObjCnt, 1)

	mode := cli.OnFail
	if mode == onFailLog {
		log.Errorf("Failed to sync object: %s, skipping it\n", obj.Key)
	} else if mode == onFailFatal {
		log.Fatalf("Failed to sync object: %s, exiting\n", obj.Key)
	}
}
// filterObject reports whether obj must be skipped (true) or synced (false).
//
// Two optional filters are applied in order:
//   - extension: when cli.FilterExtension is non-empty, the extension of
//     obj.Key has to equal one of its entries, otherwise the object is skipped;
//   - modify time: when cli.FilterTimestamp is positive, objects whose Mtime
//     is older than that Unix timestamp are skipped.
func filterObject(obj *Object) bool {
	if len(cli.FilterExtension) > 0 {
		keyExt := filepath.Ext(obj.Key)
		matched := false
		for _, allowed := range cli.FilterExtension {
			if allowed == keyExt {
				matched = true
				break
			}
		}
		if !matched {
			return true
		}
	}

	// The timestamp filter is disabled unless a positive value is configured;
	// obj.Mtime is only consulted when the filter is active.
	if cli.FilterTimestamp <= 0 {
		return false
	}
	return obj.Mtime.Unix() < cli.FilterTimestamp
}
// processObj drains ch, copying every received object from syncGr.Source to
// syncGr.Target, and calls wg.Done once the channel has been closed and
// emptied.
//
// For each object it:
//  1. fetches metadata, unless the source storage type is s3Conn or s3StConn;
//  2. skips the object (counting it) when filterObject rejects it;
//  3. downloads the content, then uploads it, each with up to cli.Retry
//     retries separated by cli.RetryInterval;
//  4. counts a success, or hands the object to failedObjAction when the last
//     attempt of either step fails.
func processObj(ch <-chan Object, wg *sync.WaitGroup) {
Objects:
	for obj := range ch {
		// Metadata is requested explicitly only for non-S3 source types.
		fromS3 := syncGr.Source.GetStorageType() == s3Conn || syncGr.Source.GetStorageType() == s3StConn
		if !fromS3 {
			syncGr.Source.GetObjectMeta(&obj)
		}

		if filterObject(&obj) {
			atomic.AddUint64(&counter.skipObjCnt, 1)
			continue
		}

		// Download: attempt 0 is the initial try, attempts 1..cli.Retry are retries.
		for attempt := uint(0); attempt <= cli.Retry; attempt++ {
			err := syncGr.Source.GetObjectContent(&obj)
			if err == nil {
				break
			}
			log.Debugf("Getting obj %s failed with err: %s", obj.Key, err)
			if attempt == cli.Retry {
				// Out of attempts: record the failure and move on to the next object.
				failedObjAction(obj)
				continue Objects
			}
			time.Sleep(cli.RetryInterval)
		}

		// Upload: same retry policy as the download step.
		for attempt := uint(0); attempt <= cli.Retry; attempt++ {
			err := syncGr.Target.PutObject(&obj)
			if err == nil {
				break
			}
			log.Debugf("Putting obj %s failed with err: %s", obj.Key, err)
			if attempt == cli.Retry {
				failedObjAction(obj)
				continue Objects
			}
			time.Sleep(cli.RetryInterval)
		}

		atomic.AddUint64(&counter.sucObjCnt, 1)
	}
	wg.Done()
}