feature (s3): multithread on s3 plugin

This commit is contained in:
MickaelK 2023-11-27 00:25:28 +11:00
parent aed5a61f22
commit def45d5ad5
2 changed files with 191 additions and 207 deletions

View file

@ -1,26 +0,0 @@
package plg_backend_s3
import (
. "github.com/mickael-kerjean/filestash/server/common"
"time"
)
var ls_timeout func() time.Duration
func init() {
ls_timeout = func() time.Duration {
return time.Duration(Config.Get("features.protection.ls_timeout").Schema(func(f *FormElement) *FormElement {
if f == nil {
f = &FormElement{}
}
f.Default = 2
f.Name = "ls_timeout"
f.Type = "number"
f.Target = []string{}
f.Description = "failsafe timeout for listing files under a folder"
f.Placeholder = "Default: 2"
return f
}).Int()) * time.Second
}
ls_timeout()
}

View file

@ -3,6 +3,8 @@ package plg_backend_s3
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"sync"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
@ -17,20 +19,19 @@ import (
. "github.com/mickael-kerjean/filestash/server/common" . "github.com/mickael-kerjean/filestash/server/common"
"io" "io"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
) )
var S3Cache AppCache var S3Cache AppCache
type S3Backend struct { type S3Backend struct {
client *s3.S3 client *s3.S3
config *aws.Config config *aws.Config
params map[string]string params map[string]string
context context.Context context context.Context
threadSize int
} }
func init() { func init() {
@ -38,11 +39,10 @@ func init() {
S3Cache = NewAppCache(2, 1) S3Cache = NewAppCache(2, 1)
} }
func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) { func (this S3Backend) Init(params map[string]string, app *App) (IBackend, error) {
if params["encryption_key"] != "" && len(params["encryption_key"]) != 32 { if params["encryption_key"] != "" && len(params["encryption_key"]) != 32 {
return nil, NewError(fmt.Sprintf("Encryption key needs to be 32 characters (current: %d)", len(params["encryption_key"])), 400) return nil, NewError(fmt.Sprintf("Encryption key needs to be 32 characters (current: %d)", len(params["encryption_key"])), 400)
} }
if params["region"] == "" { if params["region"] == "" {
params["region"] = "us-east-2" params["region"] = "us-east-2"
if strings.HasSuffix(params["endpoint"], ".cloudflarestorage.com") { if strings.HasSuffix(params["endpoint"], ".cloudflarestorage.com") {
@ -57,7 +57,6 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) {
SessionToken: params["session_token"], SessionToken: params["session_token"],
}}) }})
} }
if params["role_arn"] != "" { if params["role_arn"] != "" {
sessOptions := session.Options{Config: aws.Config{Region: aws.String(params["region"])}} sessOptions := session.Options{Config: aws.Config{Region: aws.String(params["region"])}}
creds = append(creds, &stscreds.AssumeRoleProvider{ creds = append(creds, &stscreds.AssumeRoleProvider{
@ -66,7 +65,6 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) {
Duration: stscreds.DefaultDuration, Duration: stscreds.DefaultDuration,
}) })
} }
creds = append( creds = append(
creds, creds,
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.Must(session.NewSession()))}, &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.Must(session.NewSession()))},
@ -81,17 +79,23 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) {
if params["endpoint"] != "" { if params["endpoint"] != "" {
config.Endpoint = aws.String(params["endpoint"]) config.Endpoint = aws.String(params["endpoint"])
} }
threadSize, err := strconv.Atoi(params["number_thread"])
if err != nil {
threadSize = 50
} else if threadSize > 5000 || threadSize < 1 {
threadSize = 2
}
backend := &S3Backend{ backend := &S3Backend{
config: config, config: config,
params: params, params: params,
client: s3.New(session.New(config)), client: s3.New(session.New(config)),
context: app.Context, context: app.Context,
threadSize: threadSize,
} }
return backend, nil return backend, nil
} }
func (s S3Backend) LoginForm() Form { func (this S3Backend) LoginForm() Form {
return Form{ return Form{
Elmnts: []FormElement{ Elmnts: []FormElement{
FormElement{ FormElement{
@ -113,7 +117,22 @@ func (s S3Backend) LoginForm() Form {
Name: "advanced", Name: "advanced",
Type: "enable", Type: "enable",
Placeholder: "Advanced", Placeholder: "Advanced",
Target: []string{"s3_role_arn", "s3_path", "s3_session_token", "s3_encryption_key", "s3_region", "s3_endpoint"}, Target: []string{
"s3_region", "s3_endpoint", "s3_role_arn", "s3_session_token",
"s3_path", "s3_encryption_key", "s3_number_thread",
},
},
FormElement{
Id: "s3_region",
Name: "region",
Type: "text",
Placeholder: "Region",
},
FormElement{
Id: "s3_endpoint",
Name: "endpoint",
Type: "text",
Placeholder: "Endpoint",
}, },
FormElement{ FormElement{
Id: "s3_role_arn", Id: "s3_role_arn",
@ -140,22 +159,16 @@ func (s S3Backend) LoginForm() Form {
Placeholder: "Encryption Key", Placeholder: "Encryption Key",
}, },
FormElement{ FormElement{
Id: "s3_region", Id: "s3_number_thread",
Name: "region", Name: "number_thread",
Type: "text", Type: "text",
Placeholder: "Region", Placeholder: "Num. Thread",
},
FormElement{
Id: "s3_endpoint",
Name: "endpoint",
Type: "text",
Placeholder: "Endpoint",
}, },
}, },
} }
} }
func (s S3Backend) Meta(path string) Metadata { func (this S3Backend) Meta(path string) Metadata {
if path == "/" { if path == "/" {
return Metadata{ return Metadata{
CanCreateFile: NewBool(false), CanCreateFile: NewBool(false),
@ -167,12 +180,11 @@ func (s S3Backend) Meta(path string) Metadata {
return Metadata{} return Metadata{}
} }
func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) { func (this S3Backend) Ls(path string) (files []os.FileInfo, err error) {
files = make([]os.FileInfo, 0) files = make([]os.FileInfo, 0)
p := s.path(path) p := this.path(path)
if p.bucket == "" { if p.bucket == "" {
b, err := s.client.ListBuckets(&s3.ListBucketsInput{}) b, err := this.client.ListBuckets(&s3.ListBucketsInput{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -186,11 +198,9 @@ func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) {
} }
return files, nil return files, nil
} }
client := s3.New(s.createSession(p.bucket)) client := s3.New(this.createSession(p.bucket))
startTime := time.Now()
err = client.ListObjectsV2PagesWithContext( err = client.ListObjectsV2PagesWithContext(
s.context, this.context,
&s3.ListObjectsV2Input{ &s3.ListObjectsV2Input{
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
Prefix: aws.String(p.path), Prefix: aws.String(p.path),
@ -214,27 +224,22 @@ func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) {
FType: "directory", FType: "directory",
}) })
} }
if time.Since(startTime) > ls_timeout() {
Log.Debug("plg_backend_s3::ls timeout triggered after getting %d files", len(files))
return false
}
return aws.BoolValue(objs.IsTruncated) return aws.BoolValue(objs.IsTruncated)
}) },
)
return files, err return files, err
} }
func (s S3Backend) Cat(path string) (io.ReadCloser, error) { func (this S3Backend) Cat(path string) (io.ReadCloser, error) {
p := s.path(path) p := this.path(path)
client := s3.New(s.createSession(p.bucket)) client := s3.New(this.createSession(p.bucket))
input := &s3.GetObjectInput{ input := &s3.GetObjectInput{
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
Key: aws.String(p.path), Key: aws.String(p.path),
} }
if s.params["encryption_key"] != "" { if this.params["encryption_key"] != "" {
input.SSECustomerAlgorithm = aws.String("AES256") input.SSECustomerAlgorithm = aws.String("AES256")
input.SSECustomerKey = aws.String(s.params["encryption_key"]) input.SSECustomerKey = aws.String(this.params["encryption_key"])
} }
obj, err := client.GetObject(input) obj, err := client.GetObject(input)
if err != nil { if err != nil {
@ -254,14 +259,12 @@ func (s S3Backend) Cat(path string) (io.ReadCloser, error) {
} }
return nil, err return nil, err
} }
return obj.Body, nil return obj.Body, nil
} }
func (s S3Backend) Mkdir(path string) error { func (this S3Backend) Mkdir(path string) error {
p := s.path(path) p := this.path(path)
client := s3.New(s.createSession(p.bucket)) client := s3.New(this.createSession(p.bucket))
if p.path == "" { if p.path == "" {
_, err := client.CreateBucket(&s3.CreateBucketInput{ _, err := client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(path), Bucket: aws.String(path),
@ -275,87 +278,102 @@ func (s S3Backend) Mkdir(path string) error {
return err return err
} }
func (s S3Backend) Rm(path string) error { func (this S3Backend) Rm(path string) error {
p := s.path(path) p := this.path(path)
client := s3.New(s.createSession(p.bucket)) client := s3.New(this.createSession(p.bucket))
if p.bucket == "" { if p.bucket == "" {
return ErrNotFound return ErrNotFound
} else if strings.HasSuffix(path, "/") == false { }
// CASE 1: remove a file
if strings.HasSuffix(path, "/") == false {
_, err := client.DeleteObject(&s3.DeleteObjectInput{ _, err := client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
Key: aws.String(p.path), Key: aws.String(p.path),
}) })
return err return err
} }
for { // CASE 2: remove a folder
objs, err := client.ListObjects(&s3.ListObjectsInput{ jobChan := make(chan S3Path, this.threadSize)
Bucket: aws.String(p.bucket), errChan := make(chan error, this.threadSize)
Prefix: aws.String(p.path), ctx, cancel := context.WithCancel(this.context)
Delimiter: aws.String("/"), var wg sync.WaitGroup
}) for i := 1; i <= this.threadSize; i++ {
if err != nil { wg.Add(1)
return err go func() {
} for spath := range jobChan {
for _, obj := range objs.Contents { if ctx.Err() != nil {
_, err := client.DeleteObject(&s3.DeleteObjectInput{ continue
Bucket: aws.String(p.bucket), }
Key: obj.Key, if _, err := client.DeleteObject(&s3.DeleteObjectInput{
}) Bucket: aws.String(spath.bucket),
if err != nil { Key: aws.String(spath.path),
return err }); err != nil {
cancel()
errChan <- err
}
} }
} wg.Done()
for _, pref := range objs.CommonPrefixes { }()
s.Rm("/" + p.bucket + "/" + *pref.Prefix) }
_, err := client.DeleteObject(&s3.DeleteObjectInput{ err := client.ListObjectsV2PagesWithContext(
Bucket: aws.String(p.bucket), this.context,
Key: pref.Prefix, &s3.ListObjectsV2Input{
}) Bucket: aws.String(p.bucket),
if err != nil { Prefix: aws.String(p.path),
return err },
} func(objs *s3.ListObjectsV2Output, lastPage bool) bool {
} if ctx.Err() != nil {
if !aws.BoolValue(objs.IsTruncated) { return false
break }
} for _, object := range objs.Contents {
jobChan <- S3Path{p.bucket, *object.Key}
}
return aws.BoolValue(objs.IsTruncated)
},
)
close(jobChan)
wg.Wait()
close(errChan)
if err != nil {
return err
}
for err := range errChan {
return err
} }
if p.path == "" { if p.path == "" {
_, err := client.DeleteBucket(&s3.DeleteBucketInput{ _, err := client.DeleteBucket(&s3.DeleteBucketInput{
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
}) })
return err return err
} }
_, err := client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(p.bucket),
Key: aws.String(p.path),
})
return err return err
} }
func (s S3Backend) Mv(from string, to string) error { func (this S3Backend) Mv(from string, to string) error {
f := s.path(from)
t := s.path(to)
if from == to { if from == to {
return nil return nil
} }
client := s3.New(s.createSession(f.bucket)) f := this.path(from)
t := this.path(to)
client := s3.New(this.createSession(f.bucket))
if f.path == "" { // Rename bucket // CASE 1: Rename a bucket
if f.path == "" {
return ErrNotImplemented return ErrNotImplemented
} else if strings.HasSuffix(from, "/") == false { // Move Single file }
// CASE 2: Rename/Move a file
if strings.HasSuffix(from, "/") == false {
input := &s3.CopyObjectInput{ input := &s3.CopyObjectInput{
CopySource: aws.String(fmt.Sprintf("%s/%s", f.bucket, f.path)),
Bucket: aws.String(t.bucket), Bucket: aws.String(t.bucket),
CopySource: aws.String(fmt.Sprintf("%s/%s", f.bucket, s.urlEncodedPath(f.path))),
Key: aws.String(t.path), Key: aws.String(t.path),
} }
if s.params["encryption_key"] != "" { if this.params["encryption_key"] != "" {
input.CopySourceSSECustomerAlgorithm = aws.String("AES256") input.CopySourceSSECustomerAlgorithm = aws.String("AES256")
input.CopySourceSSECustomerKey = aws.String(s.params["encryption_key"]) input.CopySourceSSECustomerKey = aws.String(this.params["encryption_key"])
input.SSECustomerAlgorithm = aws.String("AES256") input.SSECustomerAlgorithm = aws.String("AES256")
input.SSECustomerKey = aws.String(s.params["encryption_key"]) input.SSECustomerKey = aws.String(this.params["encryption_key"])
} }
_, err := client.CopyObject(input) _, err := client.CopyObject(input)
if err != nil { if err != nil {
return err return err
@ -366,135 +384,142 @@ func (s S3Backend) Mv(from string, to string) error {
}) })
return err return err
} }
// Move recursively files and subfolders // CASE 3: Rename/Move a folder
err := client.ListObjectsV2Pages( jobChan := make(chan []S3Path, this.threadSize)
&s3.ListObjectsV2Input{ errChan := make(chan error, this.threadSize)
Bucket: aws.String(f.bucket), ctx, cancel := context.WithCancel(this.context)
Prefix: aws.String(f.path), var wg sync.WaitGroup
Delimiter: aws.String("/"), for i := 1; i <= this.threadSize; i++ {
}, wg.Add(1)
func(objs *s3.ListObjectsV2Output, lastPage bool) bool { go func() {
for _, obj := range objs.Contents { for spath := range jobChan {
from := fmt.Sprintf("%s/%s", f.bucket, s.urlEncodedPath(*obj.Key)) if ctx.Err() != nil {
toKey := t.path + strings.TrimPrefix(*obj.Key, f.path) continue
}
input := &s3.CopyObjectInput{ input := &s3.CopyObjectInput{
CopySource: aws.String(from), CopySource: aws.String(fmt.Sprintf("%s/%s", spath[0].bucket, spath[0].path)),
Bucket: aws.String(t.bucket), Bucket: aws.String(spath[1].bucket),
Key: aws.String(toKey), Key: aws.String(spath[1].path),
} }
if s.params["encryption_key"] != "" { if this.params["encryption_key"] != "" {
input.CopySourceSSECustomerAlgorithm = aws.String("AES256") input.CopySourceSSECustomerAlgorithm = aws.String("AES256")
input.CopySourceSSECustomerKey = aws.String(s.params["encryption_key"]) input.CopySourceSSECustomerKey = aws.String(this.params["encryption_key"])
input.SSECustomerAlgorithm = aws.String("AES256") input.SSECustomerAlgorithm = aws.String("AES256")
input.SSECustomerKey = aws.String(s.params["encryption_key"]) input.SSECustomerKey = aws.String(this.params["encryption_key"])
} }
Log.Debug("CopyObject(%s, %s):", from, f.bucket+"/"+toKey)
_, err := client.CopyObject(input) _, err := client.CopyObject(input)
if err != nil { if err != nil {
Log.Error("CopyObject from: %s to: %s", cancel()
f.bucket+"/"+*obj.Key, errChan <- err
t.bucket+"/"+t.path+*obj.Key, continue
err)
return false
} }
Log.Debug("DeleteObject(%s):", f.bucket+"/"+*obj.Key)
_, err = client.DeleteObject(&s3.DeleteObjectInput{ _, err = client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(f.bucket), Bucket: aws.String(spath[0].bucket),
Key: obj.Key, Key: aws.String(spath[0].path),
}) })
if err != nil { if err != nil {
Log.Error("DeleteObject failed: %s", *obj.Key, err) cancel()
return false errChan <- err
continue
} }
} }
for _, pref := range objs.CommonPrefixes { wg.Done()
from := fmt.Sprintf("/%s/%s", f.bucket, *pref.Prefix) }()
to := fmt.Sprintf("/%s/%s/%s", t.bucket, t.path, strings.TrimPrefix(*pref.Prefix, f.path)) }
Log.Debug("Mv(%s, %s):", from, to) err := client.ListObjectsV2PagesWithContext(
err := s.Mv(from, to) this.context,
if err != nil { &s3.ListObjectsV2Input{
Log.Error("Mv(%s, %s) failed:", from, to, err) Bucket: aws.String(f.bucket),
return false Prefix: aws.String(f.path),
},
func(objs *s3.ListObjectsV2Output, lastPage bool) bool {
if ctx.Err() != nil {
return false
}
for _, object := range objs.Contents {
jobChan <- []S3Path{
{f.bucket, *object.Key},
{t.bucket, t.path + strings.TrimPrefix(*object.Key, f.path)},
} }
} }
return true return aws.BoolValue(objs.IsTruncated)
}, },
) )
close(jobChan)
wg.Wait()
close(errChan)
if err != nil { if err != nil {
Log.Error("ListObjectsV2Pages failed:", err) return err
} }
return err for err := range errChan {
return err
}
return nil
} }
func (s S3Backend) Touch(path string) error { func (this S3Backend) Touch(path string) error {
p := s.path(path) p := this.path(path)
client := s3.New(s.createSession(p.bucket)) client := s3.New(this.createSession(p.bucket))
if p.bucket == "" { if p.bucket == "" {
return ErrNotValid return ErrNotValid
} }
input := &s3.PutObjectInput{ input := &s3.PutObjectInput{
Body: strings.NewReader(""), Body: strings.NewReader(""),
ContentLength: aws.Int64(0), ContentLength: aws.Int64(0),
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
Key: aws.String(p.path), Key: aws.String(p.path),
} }
if s.params["encryption_key"] != "" { if this.params["encryption_key"] != "" {
input.SSECustomerAlgorithm = aws.String("AES256") input.SSECustomerAlgorithm = aws.String("AES256")
input.SSECustomerKey = aws.String(s.params["encryption_key"]) input.SSECustomerKey = aws.String(this.params["encryption_key"])
} }
_, err := client.PutObject(input) _, err := client.PutObject(input)
return err return err
} }
func (s S3Backend) Save(path string, file io.Reader) error { func (this S3Backend) Save(path string, file io.Reader) error {
p := s.path(path) p := this.path(path)
if p.bucket == "" { if p.bucket == "" {
return ErrNotValid return ErrNotValid
} }
uploader := s3manager.NewUploader(s.createSession(p.bucket)) uploader := s3manager.NewUploader(this.createSession(p.bucket))
input := s3manager.UploadInput{ input := s3manager.UploadInput{
Body: file, Body: file,
Bucket: aws.String(p.bucket), Bucket: aws.String(p.bucket),
Key: aws.String(p.path), Key: aws.String(p.path),
} }
if s.params["encryption_key"] != "" { if this.params["encryption_key"] != "" {
input.SSECustomerAlgorithm = aws.String("AES256") input.SSECustomerAlgorithm = aws.String("AES256")
input.SSECustomerKey = aws.String(s.params["encryption_key"]) input.SSECustomerKey = aws.String(this.params["encryption_key"])
} }
_, err := uploader.Upload(&input) _, err := uploader.Upload(&input)
return err return err
} }
func (s S3Backend) createSession(bucket string) *session.Session { func (this S3Backend) createSession(bucket string) *session.Session {
newParams := map[string]string{"bucket": bucket} newParams := map[string]string{"bucket": bucket}
for k, v := range s.params { for k, v := range this.params {
newParams[k] = v newParams[k] = v
} }
c := S3Cache.Get(newParams) c := S3Cache.Get(newParams)
if c == nil { if c == nil {
res, err := s.client.GetBucketLocation(&s3.GetBucketLocationInput{ res, err := this.client.GetBucketLocation(&s3.GetBucketLocationInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
}) })
if err != nil { if err != nil {
s.config.Region = aws.String("us-east-1") this.config.Region = aws.String("us-east-1")
} else { } else {
if res.LocationConstraint == nil { if res.LocationConstraint == nil {
s.config.Region = aws.String("us-east-1") this.config.Region = aws.String("us-east-1")
} else { } else {
s.config.Region = res.LocationConstraint this.config.Region = res.LocationConstraint
} }
} }
S3Cache.Set(newParams, s.config.Region) S3Cache.Set(newParams, this.config.Region)
} else { } else {
s.config.Region = c.(*string) this.config.Region = c.(*string)
} }
sess := session.New(this.config)
sess := session.New(s.config)
return sess return sess
} }
@ -513,23 +538,8 @@ func (s S3Backend) path(p string) S3Path {
if len(sp) > 2 { if len(sp) > 2 {
path = strings.Join(sp[2:], "/") path = strings.Join(sp[2:], "/")
} }
return S3Path{ return S3Path{
bucket, bucket,
path, path,
} }
} }
// urlEncodedPath percent-encodes each "/"-separated segment of path and puts
// the segments back together, so the separators themselves stay untouched.
func (s S3Backend) urlEncodedPath(path string) string {
	segments := strings.Split(path, "/")
	for i, segment := range segments {
		// QueryEscape renders a space as "+"; RFC 3986 wants "%20" instead.
		segments[i] = strings.ReplaceAll(url.QueryEscape(segment), "+", "%20")
	}
	return strings.Join(segments, "/")
}