s3.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package storage
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "time"
  9. "github.com/aws/aws-sdk-go-v2/aws"
  10. "github.com/aws/aws-sdk-go-v2/config"
  11. "github.com/aws/aws-sdk-go-v2/credentials"
  12. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  13. "github.com/aws/aws-sdk-go-v2/service/s3"
  14. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  15. )
  16. // S3Storage is a storage backed by AWS S3
  17. type S3Storage struct {
  18. Storage
  19. bucket string
  20. s3 *s3.Client
  21. logger *log.Logger
  22. purgeDays time.Duration
  23. noMultipart bool
  24. }
  25. // NewS3Storage is the factory for S3Storage
  26. func NewS3Storage(ctx context.Context, accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
  27. cfg, err := getAwsConfig(ctx, accessKey, secretKey)
  28. if err != nil {
  29. return nil, err
  30. }
  31. client := s3.NewFromConfig(cfg, func(o *s3.Options) {
  32. o.Region = region
  33. o.UsePathStyle = forcePathStyle
  34. if len(endpoint) > 0 {
  35. o.EndpointResolver = s3.EndpointResolverFromURL(endpoint)
  36. }
  37. })
  38. return &S3Storage{
  39. bucket: bucketName,
  40. s3: client,
  41. logger: logger,
  42. noMultipart: disableMultipart,
  43. purgeDays: time.Duration(purgeDays*24) * time.Hour,
  44. }, nil
  45. }
  46. // Type returns the storage type
  47. func (s *S3Storage) Type() string {
  48. return "s3"
  49. }
  50. // Head retrieves content length of a file from storage
  51. func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
  52. key := fmt.Sprintf("%s/%s", token, filename)
  53. headRequest := &s3.HeadObjectInput{
  54. Bucket: aws.String(s.bucket),
  55. Key: aws.String(key),
  56. }
  57. // content type , content length
  58. response, err := s.s3.HeadObject(ctx, headRequest)
  59. if err != nil {
  60. return
  61. }
  62. contentLength = uint64(response.ContentLength)
  63. return
  64. }
  65. // Purge cleans up the storage
  66. func (s *S3Storage) Purge(context.Context, time.Duration) (err error) {
  67. // NOOP expiration is set at upload time
  68. return nil
  69. }
  70. // IsNotExist indicates if a file doesn't exist on storage
  71. func (s *S3Storage) IsNotExist(err error) bool {
  72. if err == nil {
  73. return false
  74. }
  75. var nkerr *types.NoSuchKey
  76. return errors.As(err, &nkerr)
  77. }
  78. // Get retrieves a file from storage
  79. func (s *S3Storage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
  80. key := fmt.Sprintf("%s/%s", token, filename)
  81. getRequest := &s3.GetObjectInput{
  82. Bucket: aws.String(s.bucket),
  83. Key: aws.String(key),
  84. }
  85. if rng != nil {
  86. getRequest.Range = aws.String(rng.Range())
  87. }
  88. response, err := s.s3.GetObject(ctx, getRequest)
  89. if err != nil {
  90. return
  91. }
  92. contentLength = uint64(response.ContentLength)
  93. if rng != nil && response.ContentRange != nil {
  94. rng.SetContentRange(*response.ContentRange)
  95. }
  96. reader = response.Body
  97. return
  98. }
  99. // Delete removes a file from storage
  100. func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
  101. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  102. deleteRequest := &s3.DeleteObjectInput{
  103. Bucket: aws.String(s.bucket),
  104. Key: aws.String(metadata),
  105. }
  106. _, err = s.s3.DeleteObject(ctx, deleteRequest)
  107. if err != nil {
  108. return
  109. }
  110. key := fmt.Sprintf("%s/%s", token, filename)
  111. deleteRequest = &s3.DeleteObjectInput{
  112. Bucket: aws.String(s.bucket),
  113. Key: aws.String(key),
  114. }
  115. _, err = s.s3.DeleteObject(ctx, deleteRequest)
  116. return
  117. }
  118. // Put saves a file on storage
  119. func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, _ uint64) (err error) {
  120. key := fmt.Sprintf("%s/%s", token, filename)
  121. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  122. var concurrency int
  123. if !s.noMultipart {
  124. concurrency = 20
  125. } else {
  126. concurrency = 1
  127. }
  128. // Create an uploader with the session and custom options
  129. uploader := manager.NewUploader(s.s3, func(u *manager.Uploader) {
  130. u.Concurrency = concurrency // default is 5
  131. u.LeavePartsOnError = false
  132. })
  133. var expire *time.Time
  134. if s.purgeDays.Hours() > 0 {
  135. expire = aws.Time(time.Now().Add(s.purgeDays))
  136. }
  137. _, err = uploader.Upload(ctx, &s3.PutObjectInput{
  138. Bucket: aws.String(s.bucket),
  139. Key: aws.String(key),
  140. Body: reader,
  141. Expires: expire,
  142. ContentType: aws.String(contentType),
  143. })
  144. return
  145. }
  146. func (s *S3Storage) IsRangeSupported() bool { return true }
  147. func getAwsConfig(ctx context.Context, accessKey, secretKey string) (aws.Config, error) {
  148. return config.LoadDefaultConfig(ctx,
  149. config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
  150. Value: aws.Credentials{
  151. AccessKeyID: accessKey,
  152. SecretAccessKey: secretKey,
  153. SessionToken: "",
  154. },
  155. }),
  156. )
  157. }