| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package storage
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "log"
- "time"
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/credentials"
- "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/types"
- )
- // S3Storage is a storage backed by AWS S3
- type S3Storage struct {
- Storage
- bucket string
- s3 *s3.Client
- logger *log.Logger
- purgeDays time.Duration
- noMultipart bool
- }
- // NewS3Storage is the factory for S3Storage
- func NewS3Storage(ctx context.Context, accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
- cfg, err := getAwsConfig(ctx, accessKey, secretKey)
- if err != nil {
- return nil, err
- }
- client := s3.NewFromConfig(cfg, func(o *s3.Options) {
- o.Region = region
- o.UsePathStyle = forcePathStyle
- if len(endpoint) > 0 {
- o.EndpointResolver = s3.EndpointResolverFromURL(endpoint)
- }
- })
- return &S3Storage{
- bucket: bucketName,
- s3: client,
- logger: logger,
- noMultipart: disableMultipart,
- purgeDays: time.Duration(purgeDays*24) * time.Hour,
- }, nil
- }
- // Type returns the storage type
- func (s *S3Storage) Type() string {
- return "s3"
- }
- // Head retrieves content length of a file from storage
- func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
- key := fmt.Sprintf("%s/%s", token, filename)
- headRequest := &s3.HeadObjectInput{
- Bucket: aws.String(s.bucket),
- Key: aws.String(key),
- }
- // content type , content length
- response, err := s.s3.HeadObject(ctx, headRequest)
- if err != nil {
- return
- }
- contentLength = uint64(response.ContentLength)
- return
- }
- // Purge cleans up the storage
- func (s *S3Storage) Purge(context.Context, time.Duration) (err error) {
- // NOOP expiration is set at upload time
- return nil
- }
- // IsNotExist indicates if a file doesn't exist on storage
- func (s *S3Storage) IsNotExist(err error) bool {
- if err == nil {
- return false
- }
- var nkerr *types.NoSuchKey
- return errors.As(err, &nkerr)
- }
- // Get retrieves a file from storage
- func (s *S3Storage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
- key := fmt.Sprintf("%s/%s", token, filename)
- getRequest := &s3.GetObjectInput{
- Bucket: aws.String(s.bucket),
- Key: aws.String(key),
- }
- if rng != nil {
- getRequest.Range = aws.String(rng.Range())
- }
- response, err := s.s3.GetObject(ctx, getRequest)
- if err != nil {
- return
- }
- contentLength = uint64(response.ContentLength)
- if rng != nil && response.ContentRange != nil {
- rng.SetContentRange(*response.ContentRange)
- }
- reader = response.Body
- return
- }
- // Delete removes a file from storage
- func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
- metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
- deleteRequest := &s3.DeleteObjectInput{
- Bucket: aws.String(s.bucket),
- Key: aws.String(metadata),
- }
- _, err = s.s3.DeleteObject(ctx, deleteRequest)
- if err != nil {
- return
- }
- key := fmt.Sprintf("%s/%s", token, filename)
- deleteRequest = &s3.DeleteObjectInput{
- Bucket: aws.String(s.bucket),
- Key: aws.String(key),
- }
- _, err = s.s3.DeleteObject(ctx, deleteRequest)
- return
- }
- // Put saves a file on storage
- func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, _ uint64) (err error) {
- key := fmt.Sprintf("%s/%s", token, filename)
- s.logger.Printf("Uploading file %s to S3 Bucket", filename)
- var concurrency int
- if !s.noMultipart {
- concurrency = 20
- } else {
- concurrency = 1
- }
- // Create an uploader with the session and custom options
- uploader := manager.NewUploader(s.s3, func(u *manager.Uploader) {
- u.Concurrency = concurrency // default is 5
- u.LeavePartsOnError = false
- })
- var expire *time.Time
- if s.purgeDays.Hours() > 0 {
- expire = aws.Time(time.Now().Add(s.purgeDays))
- }
- _, err = uploader.Upload(ctx, &s3.PutObjectInput{
- Bucket: aws.String(s.bucket),
- Key: aws.String(key),
- Body: reader,
- Expires: expire,
- ContentType: aws.String(contentType),
- })
- return
- }
- func (s *S3Storage) IsRangeSupported() bool { return true }
- func getAwsConfig(ctx context.Context, accessKey, secretKey string) (aws.Config, error) {
- return config.LoadDefaultConfig(ctx,
- config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
- Value: aws.Credentials{
- AccessKeyID: accessKey,
- SecretAccessKey: secretKey,
- SessionToken: "",
- },
- }),
- )
- }
|