From def45d5ad56b4eb4d8db9538dcebfafd5fb8cee1 Mon Sep 17 00:00:00 2001 From: MickaelK Date: Mon, 27 Nov 2023 00:25:28 +1100 Subject: [PATCH] feature (s3): multithread on s3 plugin --- server/plugin/plg_backend_s3/config.go | 26 -- server/plugin/plg_backend_s3/index.go | 372 +++++++++++++------------ 2 files changed, 191 insertions(+), 207 deletions(-) delete mode 100644 server/plugin/plg_backend_s3/config.go diff --git a/server/plugin/plg_backend_s3/config.go b/server/plugin/plg_backend_s3/config.go deleted file mode 100644 index b7791c16..00000000 --- a/server/plugin/plg_backend_s3/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package plg_backend_s3 - -import ( - . "github.com/mickael-kerjean/filestash/server/common" - "time" -) - -var ls_timeout func() time.Duration - -func init() { - ls_timeout = func() time.Duration { - return time.Duration(Config.Get("features.protection.ls_timeout").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Default = 2 - f.Name = "ls_timeout" - f.Type = "number" - f.Target = []string{} - f.Description = "failsafe timeout for listing files under a folder" - f.Placeholder = "Default: 2" - return f - }).Int()) * time.Second - } - ls_timeout() -} diff --git a/server/plugin/plg_backend_s3/index.go b/server/plugin/plg_backend_s3/index.go index cce0b0fd..13a955ff 100644 --- a/server/plugin/plg_backend_s3/index.go +++ b/server/plugin/plg_backend_s3/index.go @@ -3,6 +3,8 @@ package plg_backend_s3 import ( "context" "fmt" + "strconv" + "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -17,20 +19,19 @@ import ( . "github.com/mickael-kerjean/filestash/server/common" "io" - "net/url" "os" "path/filepath" "strings" - "time" ) var S3Cache AppCache type S3Backend struct { - client *s3.S3 - config *aws.Config - params map[string]string - context context.Context + client *s3.S3 + config *aws.Config + params map[string]string + context context.Context + threadSize int } func init() { @@ -38,11 +39,10 @@ func init() { S3Cache = NewAppCache(2, 1) } -func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) { +func (this S3Backend) Init(params map[string]string, app *App) (IBackend, error) { if params["encryption_key"] != "" && len(params["encryption_key"]) != 32 { return nil, NewError(fmt.Sprintf("Encryption key needs to be 32 characters (current: %d)", len(params["encryption_key"])), 400) } - if params["region"] == "" { params["region"] = "us-east-2" if strings.HasSuffix(params["endpoint"], ".cloudflarestorage.com") { @@ -57,7 +57,6 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) { SessionToken: params["session_token"], }}) } - if params["role_arn"] != "" { sessOptions := session.Options{Config: aws.Config{Region: aws.String(params["region"])}} creds = append(creds, &stscreds.AssumeRoleProvider{ @@ -66,7 +65,6 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) { Duration: stscreds.DefaultDuration, }) } - creds = append( creds, &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.Must(session.NewSession()))}, @@ -81,17 +79,23 @@ func (s S3Backend) Init(params map[string]string, app *App) (IBackend, error) { if params["endpoint"] != "" { config.Endpoint = aws.String(params["endpoint"]) } - + threadSize, err := strconv.Atoi(params["number_thread"]) + if err != nil { + threadSize = 50 + } else if threadSize > 5000 || threadSize < 1 { + threadSize = 2 + } backend := &S3Backend{ - config: config, - params: params, - client: s3.New(session.New(config)), - context: app.Context, + config: config, + params: params, + client: s3.New(session.New(config)), + context: app.Context, + threadSize: threadSize, } return backend, nil } -func (s S3Backend) LoginForm() Form { +func (this S3Backend) LoginForm() Form { return Form{ Elmnts: []FormElement{ FormElement{ @@ -113,7 +117,22 @@ func (s S3Backend) LoginForm() Form { Name: "advanced", Type: "enable", Placeholder: "Advanced", - Target: []string{"s3_role_arn", "s3_path", "s3_session_token", "s3_encryption_key", "s3_region", "s3_endpoint"}, + Target: []string{ + "s3_region", "s3_endpoint", "s3_role_arn", "s3_session_token", + "s3_path", "s3_encryption_key", "s3_number_thread", + }, + }, + FormElement{ + Id: "s3_region", + Name: "region", + Type: "text", + Placeholder: "Region", + }, + FormElement{ + Id: "s3_endpoint", + Name: "endpoint", + Type: "text", + Placeholder: "Endpoint", }, FormElement{ Id: "s3_role_arn", @@ -140,22 +159,16 @@ func (s S3Backend) LoginForm() Form { Placeholder: "Encryption Key", }, FormElement{ - Id: "s3_region", - Name: "region", + Id: "s3_number_thread", + Name: "number_thread", Type: "text", - Placeholder: "Region", - }, - FormElement{ - Id: "s3_endpoint", - Name: "endpoint", - Type: "text", - Placeholder: "Endpoint", + Placeholder: "Num. Thread", }, }, } } -func (s S3Backend) Meta(path string) Metadata { +func (this S3Backend) Meta(path string) Metadata { if path == "/" { return Metadata{ CanCreateFile: NewBool(false), @@ -167,12 +180,11 @@ func (s S3Backend) Meta(path string) Metadata { return Metadata{} } -func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) { +func (this S3Backend) Ls(path string) (files []os.FileInfo, err error) { files = make([]os.FileInfo, 0) - p := s.path(path) - + p := this.path(path) if p.bucket == "" { - b, err := s.client.ListBuckets(&s3.ListBucketsInput{}) + b, err := this.client.ListBuckets(&s3.ListBucketsInput{}) if err != nil { return nil, err } @@ -186,11 +198,9 @@ func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) { } return files, nil } - client := s3.New(s.createSession(p.bucket)) - - startTime := time.Now() + client := s3.New(this.createSession(p.bucket)) err = client.ListObjectsV2PagesWithContext( - s.context, + this.context, &s3.ListObjectsV2Input{ Bucket: aws.String(p.bucket), Prefix: aws.String(p.path), @@ -214,27 +224,22 @@ func (s S3Backend) Ls(path string) (files []os.FileInfo, err error) { FType: "directory", }) } - - if time.Since(startTime) > ls_timeout() { - Log.Debug("plg_backend_s3::ls timeout triggered after getting %d files", len(files)) - return false - } return aws.BoolValue(objs.IsTruncated) - }) + }, + ) return files, err } -func (s S3Backend) Cat(path string) (io.ReadCloser, error) { - p := s.path(path) - client := s3.New(s.createSession(p.bucket)) - +func (this S3Backend) Cat(path string) (io.ReadCloser, error) { + p := this.path(path) + client := s3.New(this.createSession(p.bucket)) input := &s3.GetObjectInput{ Bucket: aws.String(p.bucket), Key: aws.String(p.path), } - if s.params["encryption_key"] != "" { + if this.params["encryption_key"] != "" { input.SSECustomerAlgorithm = aws.String("AES256") - input.SSECustomerKey = aws.String(s.params["encryption_key"]) + input.SSECustomerKey = aws.String(this.params["encryption_key"]) } obj, err := client.GetObject(input) if err != nil { @@ -254,14 +259,12 @@ func (s S3Backend) Cat(path string) (io.ReadCloser, error) { } return nil, err } - return obj.Body, nil } -func (s S3Backend) Mkdir(path string) error { - p := s.path(path) - client := s3.New(s.createSession(p.bucket)) - +func (this S3Backend) Mkdir(path string) error { + p := this.path(path) + client := s3.New(this.createSession(p.bucket)) if p.path == "" { _, err := client.CreateBucket(&s3.CreateBucketInput{ Bucket: aws.String(path), @@ -275,87 +278,102 @@ func (s S3Backend) Mkdir(path string) error { return err } -func (s S3Backend) Rm(path string) error { - p := s.path(path) - client := s3.New(s.createSession(p.bucket)) +func (this S3Backend) Rm(path string) error { + p := this.path(path) + client := s3.New(this.createSession(p.bucket)) if p.bucket == "" { return ErrNotFound - } else if strings.HasSuffix(path, "/") == false { + } + // CASE 1: remove a file + if strings.HasSuffix(path, "/") == false { _, err := client.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(p.bucket), Key: aws.String(p.path), }) return err } - for { - objs, err := client.ListObjects(&s3.ListObjectsInput{ - Bucket: aws.String(p.bucket), - Prefix: aws.String(p.path), - Delimiter: aws.String("/"), - }) - if err != nil { - return err - } - for _, obj := range objs.Contents { - _, err := client.DeleteObject(&s3.DeleteObjectInput{ - Bucket: aws.String(p.bucket), - Key: obj.Key, - }) - if err != nil { - return err + // CASE 2: remove a folder + jobChan := make(chan S3Path, this.threadSize) + errChan := make(chan error, this.threadSize) + ctx, cancel := context.WithCancel(this.context) + var wg sync.WaitGroup + for i := 1; i <= this.threadSize; i++ { + wg.Add(1) + go func() { + for spath := range jobChan { + if ctx.Err() != nil { + continue + } + if _, err := client.DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(spath.bucket), + Key: aws.String(spath.path), + }); err != nil { + cancel() + errChan <- err + } } - } - for _, pref := range objs.CommonPrefixes { - s.Rm("/" + p.bucket + "/" + *pref.Prefix) - _, err := client.DeleteObject(&s3.DeleteObjectInput{ - Bucket: aws.String(p.bucket), - Key: pref.Prefix, - }) - if err != nil { - return err - } - } - if !aws.BoolValue(objs.IsTruncated) { - break - } + wg.Done() + }() + } + err := client.ListObjectsV2PagesWithContext( + this.context, + &s3.ListObjectsV2Input{ + Bucket: aws.String(p.bucket), + Prefix: aws.String(p.path), + }, + func(objs *s3.ListObjectsV2Output, lastPage bool) bool { + if ctx.Err() != nil { + return false + } + for _, object := range objs.Contents { + jobChan <- S3Path{p.bucket, *object.Key} + } + return aws.BoolValue(objs.IsTruncated) + }, + ) + close(jobChan) + wg.Wait() + close(errChan) + if err != nil { + return err + } + for err := range errChan { + return err } - if p.path == "" { _, err := client.DeleteBucket(&s3.DeleteBucketInput{ Bucket: aws.String(p.bucket), }) return err } - _, err := client.DeleteObject(&s3.DeleteObjectInput{ - Bucket: aws.String(p.bucket), - Key: aws.String(p.path), - }) return err } -func (s S3Backend) Mv(from string, to string) error { - f := s.path(from) - t := s.path(to) +func (this S3Backend) Mv(from string, to string) error { if from == to { return nil } - client := s3.New(s.createSession(f.bucket)) + f := this.path(from) + t := this.path(to) + client := s3.New(this.createSession(f.bucket)) - if f.path == "" { // Rename bucket + // CASE 1: Rename a bucket + if f.path == "" { return ErrNotImplemented - } else if strings.HasSuffix(from, "/") == false { // Move Single file + } + // CASE 2: Rename/Move a file + if strings.HasSuffix(from, "/") == false { input := &s3.CopyObjectInput{ + CopySource: aws.String(fmt.Sprintf("%s/%s", f.bucket, f.path)), Bucket: aws.String(t.bucket), - CopySource: aws.String(fmt.Sprintf("%s/%s", f.bucket, s.urlEncodedPath(f.path))), Key: aws.String(t.path), } - if s.params["encryption_key"] != "" { + if this.params["encryption_key"] != "" { input.CopySourceSSECustomerAlgorithm = aws.String("AES256") - input.CopySourceSSECustomerKey = aws.String(s.params["encryption_key"]) + input.CopySourceSSECustomerKey = aws.String(this.params["encryption_key"]) input.SSECustomerAlgorithm = aws.String("AES256") - input.SSECustomerKey = aws.String(s.params["encryption_key"]) + input.SSECustomerKey = aws.String(this.params["encryption_key"]) } - _, err := client.CopyObject(input) if err != nil { return err @@ -366,135 +384,142 @@ func (s S3Backend) Mv(from string, to string) error { }) return err } - // Move recursively files and subfolders - err := client.ListObjectsV2Pages( - &s3.ListObjectsV2Input{ - Bucket: aws.String(f.bucket), - Prefix: aws.String(f.path), - Delimiter: aws.String("/"), - }, - func(objs *s3.ListObjectsV2Output, lastPage bool) bool { - for _, obj := range objs.Contents { - from := fmt.Sprintf("%s/%s", f.bucket, s.urlEncodedPath(*obj.Key)) - toKey := t.path + strings.TrimPrefix(*obj.Key, f.path) + // CASE 3: Rename/Move a folder + jobChan := make(chan []S3Path, this.threadSize) + errChan := make(chan error, this.threadSize) + ctx, cancel := context.WithCancel(this.context) + var wg sync.WaitGroup + for i := 1; i <= this.threadSize; i++ { + wg.Add(1) + go func() { + for spath := range jobChan { + if ctx.Err() != nil { + continue + } input := &s3.CopyObjectInput{ - CopySource: aws.String(from), - Bucket: aws.String(t.bucket), - Key: aws.String(toKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", spath[0].bucket, spath[0].path)), + Bucket: aws.String(spath[1].bucket), + Key: aws.String(spath[1].path), } - if s.params["encryption_key"] != "" { + if this.params["encryption_key"] != "" { input.CopySourceSSECustomerAlgorithm = aws.String("AES256") - input.CopySourceSSECustomerKey = aws.String(s.params["encryption_key"]) + input.CopySourceSSECustomerKey = aws.String(this.params["encryption_key"]) input.SSECustomerAlgorithm = aws.String("AES256") - input.SSECustomerKey = aws.String(s.params["encryption_key"]) + input.SSECustomerKey = aws.String(this.params["encryption_key"]) } - - Log.Debug("CopyObject(%s, %s):", from, f.bucket+"/"+toKey) _, err := client.CopyObject(input) if err != nil { - Log.Error("CopyObject from: %s to: %s", - f.bucket+"/"+*obj.Key, - t.bucket+"/"+t.path+*obj.Key, - err) - return false + cancel() + errChan <- err + continue } - - Log.Debug("DeleteObject(%s):", f.bucket+"/"+*obj.Key) _, err = client.DeleteObject(&s3.DeleteObjectInput{ - Bucket: aws.String(f.bucket), - Key: obj.Key, + Bucket: aws.String(spath[0].bucket), + Key: aws.String(spath[0].path), }) if err != nil { - Log.Error("DeleteObject failed: %s", *obj.Key, err) - return false + cancel() + errChan <- err + continue } } - for _, pref := range objs.CommonPrefixes { - from := fmt.Sprintf("/%s/%s", f.bucket, *pref.Prefix) - to := fmt.Sprintf("/%s/%s/%s", t.bucket, t.path, strings.TrimPrefix(*pref.Prefix, f.path)) - Log.Debug("Mv(%s, %s):", from, to) - err := s.Mv(from, to) - if err != nil { - Log.Error("Mv(%s, %s) failed:", from, to, err) - return false + wg.Done() + }() + } + err := client.ListObjectsV2PagesWithContext( + this.context, + &s3.ListObjectsV2Input{ + Bucket: aws.String(f.bucket), + Prefix: aws.String(f.path), + }, + func(objs *s3.ListObjectsV2Output, lastPage bool) bool { + if ctx.Err() != nil { + return false + } + for _, object := range objs.Contents { + jobChan <- []S3Path{ + {f.bucket, *object.Key}, + {t.bucket, t.path + strings.TrimPrefix(*object.Key, f.path)}, } } - return true + return aws.BoolValue(objs.IsTruncated) }, ) + close(jobChan) + wg.Wait() + close(errChan) if err != nil { - Log.Error("ListObjectsV2Pages failed:", err) + return err } - return err + for err := range errChan { + return err + } + return nil } -func (s S3Backend) Touch(path string) error { - p := s.path(path) - client := s3.New(s.createSession(p.bucket)) - +func (this S3Backend) Touch(path string) error { + p := this.path(path) + client := s3.New(this.createSession(p.bucket)) if p.bucket == "" { return ErrNotValid } - input := &s3.PutObjectInput{ Body: strings.NewReader(""), ContentLength: aws.Int64(0), Bucket: aws.String(p.bucket), Key: aws.String(p.path), } - if s.params["encryption_key"] != "" { + if this.params["encryption_key"] != "" { input.SSECustomerAlgorithm = aws.String("AES256") - input.SSECustomerKey = aws.String(s.params["encryption_key"]) + input.SSECustomerKey = aws.String(this.params["encryption_key"]) } _, err := client.PutObject(input) return err } -func (s S3Backend) Save(path string, file io.Reader) error { - p := s.path(path) - +func (this S3Backend) Save(path string, file io.Reader) error { + p := this.path(path) if p.bucket == "" { return ErrNotValid } - uploader := s3manager.NewUploader(s.createSession(p.bucket)) + uploader := s3manager.NewUploader(this.createSession(p.bucket)) input := s3manager.UploadInput{ Body: file, Bucket: aws.String(p.bucket), Key: aws.String(p.path), } - if s.params["encryption_key"] != "" { + if this.params["encryption_key"] != "" { input.SSECustomerAlgorithm = aws.String("AES256") - input.SSECustomerKey = aws.String(s.params["encryption_key"]) + input.SSECustomerKey = aws.String(this.params["encryption_key"]) } _, err := uploader.Upload(&input) return err } -func (s S3Backend) createSession(bucket string) *session.Session { +func (this S3Backend) createSession(bucket string) *session.Session { newParams := map[string]string{"bucket": bucket} - for k, v := range s.params { + for k, v := range this.params { newParams[k] = v } c := S3Cache.Get(newParams) if c == nil { - res, err := s.client.GetBucketLocation(&s3.GetBucketLocationInput{ + res, err := this.client.GetBucketLocation(&s3.GetBucketLocationInput{ Bucket: aws.String(bucket), }) if err != nil { - s.config.Region = aws.String("us-east-1") + this.config.Region = aws.String("us-east-1") } else { if res.LocationConstraint == nil { - s.config.Region = aws.String("us-east-1") + this.config.Region = aws.String("us-east-1") } else { - s.config.Region = res.LocationConstraint + this.config.Region = res.LocationConstraint } } - S3Cache.Set(newParams, s.config.Region) + S3Cache.Set(newParams, this.config.Region) } else { - s.config.Region = c.(*string) + this.config.Region = c.(*string) } - - sess := session.New(s.config) + sess := session.New(this.config) return sess } @@ -513,23 +538,8 @@ func (s S3Backend) path(p string) S3Path { if len(sp) > 2 { path = strings.Join(sp[2:], "/") } - return S3Path{ bucket, path, } } - -func (s S3Backend) urlEncodedPath(path string) string { - sp := strings.Split(path, "/") - - var pathElements []string - for _, x := range sp { - // Compatible with RFC 3986. - endoded := strings.Replace(url.QueryEscape(x), "+", "%20", -1) - pathElements = append(pathElements, endoded) - } - - encodedPath := strings.Join(pathElements, "/") - return encodedPath -}