forked from larrabee/s3sync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage-s3st.go
140 lines (124 loc) · 3.78 KB
/
storage-s3st.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"io/ioutil"
"path/filepath"
"sync/atomic"
"time"
)
// S3StStorage holds the AWS session, S3 client and settings used by the
// S3 storage methods defined in this file.
type S3StStorage struct {
// awsSvc is the S3 API client; NewS3StStorage creates it from awsSession.
awsSvc *s3.S3
// awsSession is the AWS session awsSvc was created from.
awsSession *session.Session
// awsBucket is the bucket name sent with every request.
awsBucket string
// prefix is the listing prefix used by List and is joined in front of the
// object key by PutObject.
prefix string
// acl is passed as the ACL of every PutObject request.
acl string
// keysPerReq is sent as MaxKeys on each list request.
keysPerReq int64
// workers, retry and retryInterval are stored by NewS3StStorage but are not
// read by any method visible in this file.
// NOTE(review): presumably consumed by code elsewhere — confirm.
workers uint
retry uint
retryInterval time.Duration
}
// NewS3StStorage returns an S3StStorage configured for the given bucket.
//
// Credentials: when both awsAccessKey and awsSecretKey are non-empty they are
// used as static credentials; otherwise a chain of the environment provider
// followed by the shared credentials file provider is used.
//
// The client always uses path-style addressing. awsRegion is always applied;
// endpoint overrides the default endpoint only when it is non-empty.
// The remaining arguments are copied verbatim into the returned value.
//
// The session is created with session.Must, so a failure to build the session
// panics instead of being returned.
func NewS3StStorage(awsAccessKey, awsSecretKey, awsRegion, endpoint, bucketName, prefix, acl string, keysPerReq int64, workers, retry uint, retryInterval time.Duration) (storage S3StStorage) {
	// Pick the credential source first so the config is assembled in one place.
	var creds *credentials.Credentials
	if awsAccessKey == "" || awsSecretKey == "" {
		creds = credentials.NewChainCredentials([]credentials.Provider{
			&credentials.EnvProvider{},
			&credentials.SharedCredentialsProvider{},
		})
	} else {
		creds = credentials.NewStaticCredentials(awsAccessKey, awsSecretKey, "")
	}

	cfg := aws.NewConfig().
		WithS3ForcePathStyle(true).
		WithCredentialsChainVerboseErrors(true).
		WithCredentials(creds).
		WithRegion(awsRegion)
	if endpoint != "" {
		cfg = cfg.WithEndpoint(endpoint)
	}

	sess := session.Must(session.NewSession(cfg))

	storage = S3StStorage{
		awsSvc:        s3.New(sess),
		awsSession:    sess,
		awsBucket:     bucketName,
		prefix:        prefix,
		acl:           acl,
		keysPerReq:    keysPerReq,
		workers:       workers,
		retry:         retry,
		retryInterval: retryInterval,
	}
	return storage
}
// List pages through the bucket objects whose keys start with storage.prefix
// and sends one Object (key, ETag, modification time) per listed entry to
// output, incrementing counter.totalObjCnt for each.
//
// output is closed after the entries of the last page have been sent. The
// close happens only inside the page callback: if ListObjectsPages returns
// before delivering a last page, output is left open by this method.
//
// The error from ListObjectsPages is returned unchanged.
func (storage S3StStorage) List(output chan<- Object) error {
	input := &s3.ListObjectsInput{
		Bucket:  aws.String(storage.awsBucket),
		Prefix:  aws.String(storage.prefix),
		MaxKeys: aws.Int64(storage.keysPerReq),
	}

	return storage.awsSvc.ListObjectsPages(input, func(page *s3.ListObjectsOutput, isLast bool) bool {
		for _, item := range page.Contents {
			atomic.AddUint64(&counter.totalObjCnt, 1)
			output <- Object{
				Key:   aws.StringValue(item.Key),
				ETag:  aws.StringValue(item.ETag),
				Mtime: aws.TimeValue(item.LastModified),
			}
		}
		if isLast {
			close(output)
			return false // nothing left to page through
		}
		return true // ask for the next page
	})
}
// PutObject uploads obj.Content to the bucket. The destination key is
// storage.prefix joined with obj.Key; the request carries obj.ContentType as
// the content type and storage.acl as the ACL.
//
// It returns the error from the S3 PutObject call, or nil on success.
func (storage S3StStorage) PutObject(obj *Object) error {
	// S3 keys are slash-separated regardless of the host OS. filepath.Join
	// uses the OS separator (a backslash on Windows), so normalise the result
	// back to "/" before sending it as the key.
	key := filepath.ToSlash(filepath.Join(storage.prefix, obj.Key))

	_, err := storage.awsSvc.PutObject(&s3.PutObjectInput{
		Bucket:      aws.String(storage.awsBucket),
		Key:         aws.String(key),
		Body:        bytes.NewReader(obj.Content),
		ContentType: aws.String(obj.ContentType),
		ACL:         aws.String(storage.acl),
	})
	return err
}
// GetObjectContent downloads the object stored under obj.Key and fills obj
// with its body (Content) and with the ContentType, ETag and Mtime reported
// by the response.
//
// It returns the error from the GetObject call or from reading the body;
// the metadata fields are only updated when both succeed.
func (storage S3StStorage) GetObjectContent(obj *Object) error {
	result, err := storage.awsSvc.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(storage.awsBucket),
		Key:    aws.String(obj.Key),
	})
	if err != nil {
		return err
	}
	// The body is backed by the HTTP response; it must be closed on every
	// path, otherwise each download leaks its underlying connection.
	defer result.Body.Close()

	obj.Content, err = ioutil.ReadAll(result.Body)
	if err != nil {
		return err
	}

	obj.ContentType = aws.StringValue(result.ContentType)
	obj.ETag = aws.StringValue(result.ETag)
	obj.Mtime = aws.TimeValue(result.LastModified)
	return nil
}
// GetObjectMeta issues a HeadObject request for obj.Key and copies the
// returned ETag, last-modified time and content type into obj. The object
// body is not fetched and obj.Content is left untouched.
//
// On a request error, obj is not modified and the error is returned as is.
func (storage S3StStorage) GetObjectMeta(obj *Object) error {
	input := &s3.HeadObjectInput{
		Bucket: aws.String(storage.awsBucket),
		Key:    aws.String(obj.Key),
	}

	head, err := storage.awsSvc.HeadObject(input)
	if err != nil {
		return err
	}

	obj.ETag = aws.StringValue(head.ETag)
	obj.Mtime = aws.TimeValue(head.LastModified)
	obj.ContentType = aws.StringValue(head.ContentType)
	return nil
}
// GetStorageType reports the connection type of this storage, which is
// always s3StConn.
func (S3StStorage) GetStorageType() ConnType { return s3StConn }