storage.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. package server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/aws/aws-sdk-go/aws"
  7. "github.com/aws/aws-sdk-go/aws/awserr"
  8. "github.com/aws/aws-sdk-go/aws/session"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  11. "golang.org/x/net/context"
  12. "golang.org/x/oauth2"
  13. "golang.org/x/oauth2/google"
  14. "google.golang.org/api/drive/v3"
  15. "google.golang.org/api/googleapi"
  16. "io"
  17. "io/ioutil"
  18. "log"
  19. "net/http"
  20. "os"
  21. "path/filepath"
  22. "strings"
  23. "time"
  24. "storj.io/common/storj"
  25. "storj.io/uplink"
  26. )
  27. // Storage is the interface for storage operation
  28. type Storage interface {
  29. // Get retrieves a file from storage
  30. Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
  31. // Head retrieves content length of a file from storage
  32. Head(token string, filename string) (contentLength uint64, err error)
  33. // Put saves a file on storage
  34. Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
  35. // Delete removes a file from storage
  36. Delete(token string, filename string) error
  37. // IsNotExist indicates if a file doesn't exist on storage
  38. IsNotExist(err error) bool
  39. // Purge cleans up the storage
  40. Purge(days time.Duration) error
  41. // Type returns the storage type
  42. Type() string
  43. }
  44. // LocalStorage is a local storage
  45. type LocalStorage struct {
  46. Storage
  47. basedir string
  48. logger *log.Logger
  49. }
  50. // NewLocalStorage is the factory for LocalStorage
  51. func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
  52. return &LocalStorage{basedir: basedir, logger: logger}, nil
  53. }
  54. // Type returns the storage type
  55. func (s *LocalStorage) Type() string {
  56. return "local"
  57. }
  58. // Head retrieves content length of a file from storage
  59. func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
  60. path := filepath.Join(s.basedir, token, filename)
  61. var fi os.FileInfo
  62. if fi, err = os.Lstat(path); err != nil {
  63. return
  64. }
  65. contentLength = uint64(fi.Size())
  66. return
  67. }
  68. // Get retrieves a file from storage
  69. func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  70. path := filepath.Join(s.basedir, token, filename)
  71. // content type , content length
  72. if reader, err = os.Open(path); err != nil {
  73. return
  74. }
  75. var fi os.FileInfo
  76. if fi, err = os.Lstat(path); err != nil {
  77. return
  78. }
  79. contentLength = uint64(fi.Size())
  80. return
  81. }
  82. // Delete removes a file from storage
  83. func (s *LocalStorage) Delete(token string, filename string) (err error) {
  84. metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
  85. _ = os.Remove(metadata)
  86. path := filepath.Join(s.basedir, token, filename)
  87. err = os.Remove(path)
  88. return
  89. }
  90. // Purge cleans up the storage
  91. func (s *LocalStorage) Purge(days time.Duration) (err error) {
  92. err = filepath.Walk(s.basedir,
  93. func(path string, info os.FileInfo, err error) error {
  94. if err != nil {
  95. return err
  96. }
  97. if info.IsDir() {
  98. return nil
  99. }
  100. if info.ModTime().Before(time.Now().Add(-1 * days)) {
  101. err = os.Remove(path)
  102. return err
  103. }
  104. return nil
  105. })
  106. return
  107. }
  108. // IsNotExist indicates if a file doesn't exist on storage
  109. func (s *LocalStorage) IsNotExist(err error) bool {
  110. if err == nil {
  111. return false
  112. }
  113. return os.IsNotExist(err)
  114. }
  115. // Put saves a file on storage
  116. func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  117. var f io.WriteCloser
  118. var err error
  119. path := filepath.Join(s.basedir, token)
  120. if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
  121. return err
  122. }
  123. if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
  124. return err
  125. }
  126. defer CloseCheck(f.Close)
  127. if _, err = io.Copy(f, reader); err != nil {
  128. return err
  129. }
  130. return nil
  131. }
  132. // S3Storage is a storage backed by AWS S3
  133. type S3Storage struct {
  134. Storage
  135. bucket string
  136. session *session.Session
  137. s3 *s3.S3
  138. logger *log.Logger
  139. purgeDays time.Duration
  140. noMultipart bool
  141. }
  142. // NewS3Storage is the factory for S3Storage
  143. func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
  144. sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
  145. return &S3Storage{
  146. bucket: bucketName,
  147. s3: s3.New(sess),
  148. session: sess,
  149. logger: logger,
  150. noMultipart: disableMultipart,
  151. purgeDays: time.Duration(purgeDays*24) * time.Hour,
  152. }, nil
  153. }
  154. // Type returns the storage type
  155. func (s *S3Storage) Type() string {
  156. return "s3"
  157. }
  158. // Head retrieves content length of a file from storage
  159. func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
  160. key := fmt.Sprintf("%s/%s", token, filename)
  161. headRequest := &s3.HeadObjectInput{
  162. Bucket: aws.String(s.bucket),
  163. Key: aws.String(key),
  164. }
  165. // content type , content length
  166. response, err := s.s3.HeadObject(headRequest)
  167. if err != nil {
  168. return
  169. }
  170. if response.ContentLength != nil {
  171. contentLength = uint64(*response.ContentLength)
  172. }
  173. return
  174. }
  175. // Purge cleans up the storage
  176. func (s *S3Storage) Purge(days time.Duration) (err error) {
  177. // NOOP expiration is set at upload time
  178. return nil
  179. }
  180. // IsNotExist indicates if a file doesn't exist on storage
  181. func (s *S3Storage) IsNotExist(err error) bool {
  182. if err == nil {
  183. return false
  184. }
  185. if aerr, ok := err.(awserr.Error); ok {
  186. switch aerr.Code() {
  187. case s3.ErrCodeNoSuchKey:
  188. return true
  189. }
  190. }
  191. return false
  192. }
  193. // Get retrieves a file from storage
  194. func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  195. key := fmt.Sprintf("%s/%s", token, filename)
  196. getRequest := &s3.GetObjectInput{
  197. Bucket: aws.String(s.bucket),
  198. Key: aws.String(key),
  199. }
  200. response, err := s.s3.GetObject(getRequest)
  201. if err != nil {
  202. return
  203. }
  204. if response.ContentLength != nil {
  205. contentLength = uint64(*response.ContentLength)
  206. }
  207. reader = response.Body
  208. return
  209. }
  210. // Delete removes a file from storage
  211. func (s *S3Storage) Delete(token string, filename string) (err error) {
  212. metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
  213. deleteRequest := &s3.DeleteObjectInput{
  214. Bucket: aws.String(s.bucket),
  215. Key: aws.String(metadata),
  216. }
  217. _, err = s.s3.DeleteObject(deleteRequest)
  218. if err != nil {
  219. return
  220. }
  221. key := fmt.Sprintf("%s/%s", token, filename)
  222. deleteRequest = &s3.DeleteObjectInput{
  223. Bucket: aws.String(s.bucket),
  224. Key: aws.String(key),
  225. }
  226. _, err = s.s3.DeleteObject(deleteRequest)
  227. return
  228. }
  229. // Put saves a file on storage
  230. func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  231. key := fmt.Sprintf("%s/%s", token, filename)
  232. s.logger.Printf("Uploading file %s to S3 Bucket", filename)
  233. var concurrency int
  234. if !s.noMultipart {
  235. concurrency = 20
  236. } else {
  237. concurrency = 1
  238. }
  239. // Create an uploader with the session and custom options
  240. uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
  241. u.Concurrency = concurrency // default is 5
  242. u.LeavePartsOnError = false
  243. })
  244. var expire *time.Time
  245. if s.purgeDays.Hours() > 0 {
  246. expire = aws.Time(time.Now().Add(s.purgeDays))
  247. }
  248. _, err = uploader.Upload(&s3manager.UploadInput{
  249. Bucket: aws.String(s.bucket),
  250. Key: aws.String(key),
  251. Body: reader,
  252. Expires: expire,
  253. })
  254. return
  255. }
  256. // GDrive is a storage backed by GDrive
  257. type GDrive struct {
  258. service *drive.Service
  259. rootID string
  260. basedir string
  261. localConfigPath string
  262. chunkSize int
  263. logger *log.Logger
  264. }
  265. // NewGDriveStorage is the factory for GDrive
  266. func NewGDriveStorage(clientJSONFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
  267. b, err := ioutil.ReadFile(clientJSONFilepath)
  268. if err != nil {
  269. return nil, err
  270. }
  271. // If modifying these scopes, delete your previously saved client_secret.json.
  272. config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
  273. if err != nil {
  274. return nil, err
  275. }
  276. // ToDo: Upgrade deprecated version
  277. srv, err := drive.New(getGDriveClient(config, localConfigPath, logger)) // nolint: staticcheck
  278. if err != nil {
  279. return nil, err
  280. }
  281. chunkSize = chunkSize * 1024 * 1024
  282. storage := &GDrive{service: srv, basedir: basedir, rootID: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
  283. err = storage.setupRoot()
  284. if err != nil {
  285. return nil, err
  286. }
  287. return storage, nil
  288. }
  289. const gdriveRootConfigFile = "root_id.conf"
  290. const gdriveTokenJSONFile = "token.json"
  291. const gdriveDirectoryMimeType = "application/vnd.google-apps.folder"
  292. func (s *GDrive) setupRoot() error {
  293. rootFileConfig := filepath.Join(s.localConfigPath, gdriveRootConfigFile)
  294. rootID, err := ioutil.ReadFile(rootFileConfig)
  295. if err != nil && !os.IsNotExist(err) {
  296. return err
  297. }
  298. if string(rootID) != "" {
  299. s.rootID = string(rootID)
  300. return nil
  301. }
  302. dir := &drive.File{
  303. Name: s.basedir,
  304. MimeType: gdriveDirectoryMimeType,
  305. }
  306. di, err := s.service.Files.Create(dir).Fields("id").Do()
  307. if err != nil {
  308. return err
  309. }
  310. s.rootID = di.Id
  311. err = ioutil.WriteFile(rootFileConfig, []byte(s.rootID), os.FileMode(0600))
  312. if err != nil {
  313. return err
  314. }
  315. return nil
  316. }
  317. func (s *GDrive) hasChecksum(f *drive.File) bool {
  318. return f.Md5Checksum != ""
  319. }
  320. func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
  321. return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
  322. }
  323. func (s *GDrive) findID(filename string, token string) (string, error) {
  324. filename = strings.Replace(filename, `'`, `\'`, -1)
  325. filename = strings.Replace(filename, `"`, `\"`, -1)
  326. fileID, tokenID, nextPageToken := "", "", ""
  327. q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootID, token, gdriveDirectoryMimeType)
  328. l, err := s.list(nextPageToken, q)
  329. if err != nil {
  330. return "", err
  331. }
  332. for 0 < len(l.Files) {
  333. for _, fi := range l.Files {
  334. tokenID = fi.Id
  335. break
  336. }
  337. if l.NextPageToken == "" {
  338. break
  339. }
  340. l, err = s.list(l.NextPageToken, q)
  341. if err != nil {
  342. return "", err
  343. }
  344. }
  345. if filename == "" {
  346. return tokenID, nil
  347. } else if tokenID == "" {
  348. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  349. }
  350. q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenID, filename, gdriveDirectoryMimeType)
  351. l, err = s.list(nextPageToken, q)
  352. if err != nil {
  353. return "", err
  354. }
  355. for 0 < len(l.Files) {
  356. for _, fi := range l.Files {
  357. fileID = fi.Id
  358. break
  359. }
  360. if l.NextPageToken == "" {
  361. break
  362. }
  363. l, err = s.list(l.NextPageToken, q)
  364. if err != nil {
  365. return "", err
  366. }
  367. }
  368. if fileID == "" {
  369. return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
  370. }
  371. return fileID, nil
  372. }
  373. // Type returns the storage type
  374. func (s *GDrive) Type() string {
  375. return "gdrive"
  376. }
  377. // Head retrieves content length of a file from storage
  378. func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
  379. var fileID string
  380. fileID, err = s.findID(filename, token)
  381. if err != nil {
  382. return
  383. }
  384. var fi *drive.File
  385. if fi, err = s.service.Files.Get(fileID).Fields("size").Do(); err != nil {
  386. return
  387. }
  388. contentLength = uint64(fi.Size)
  389. return
  390. }
  391. // Get retrieves a file from storage
  392. func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  393. var fileID string
  394. fileID, err = s.findID(filename, token)
  395. if err != nil {
  396. return
  397. }
  398. var fi *drive.File
  399. fi, err = s.service.Files.Get(fileID).Fields("size", "md5Checksum").Do()
  400. if err != nil {
  401. return
  402. }
  403. if !s.hasChecksum(fi) {
  404. err = fmt.Errorf("Cannot find file %s/%s", token, filename)
  405. return
  406. }
  407. contentLength = uint64(fi.Size)
  408. ctx := context.Background()
  409. var res *http.Response
  410. res, err = s.service.Files.Get(fileID).Context(ctx).Download()
  411. if err != nil {
  412. return
  413. }
  414. reader = res.Body
  415. return
  416. }
  417. // Delete removes a file from storage
  418. func (s *GDrive) Delete(token string, filename string) (err error) {
  419. metadata, _ := s.findID(fmt.Sprintf("%s.metadata", filename), token)
  420. _ = s.service.Files.Delete(metadata).Do()
  421. var fileID string
  422. fileID, err = s.findID(filename, token)
  423. if err != nil {
  424. return
  425. }
  426. err = s.service.Files.Delete(fileID).Do()
  427. return
  428. }
  429. // Purge cleans up the storage
  430. func (s *GDrive) Purge(days time.Duration) (err error) {
  431. nextPageToken := ""
  432. expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
  433. q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootID, expirationDate, gdriveDirectoryMimeType)
  434. l, err := s.list(nextPageToken, q)
  435. if err != nil {
  436. return err
  437. }
  438. for 0 < len(l.Files) {
  439. for _, fi := range l.Files {
  440. err = s.service.Files.Delete(fi.Id).Do()
  441. if err != nil {
  442. return
  443. }
  444. }
  445. if l.NextPageToken == "" {
  446. break
  447. }
  448. l, err = s.list(l.NextPageToken, q)
  449. if err != nil {
  450. return
  451. }
  452. }
  453. return
  454. }
  455. // IsNotExist indicates if a file doesn't exist on storage
  456. func (s *GDrive) IsNotExist(err error) bool {
  457. if err == nil {
  458. return false
  459. }
  460. if e, ok := err.(*googleapi.Error); ok {
  461. return e.Code == http.StatusNotFound
  462. }
  463. return false
  464. }
  465. // Put saves a file on storage
  466. func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
  467. dirID, err := s.findID("", token)
  468. if err != nil {
  469. return err
  470. }
  471. if dirID == "" {
  472. dir := &drive.File{
  473. Name: token,
  474. Parents: []string{s.rootID},
  475. MimeType: gdriveDirectoryMimeType,
  476. }
  477. di, err := s.service.Files.Create(dir).Fields("id").Do()
  478. if err != nil {
  479. return err
  480. }
  481. dirID = di.Id
  482. }
  483. // Instantiate empty drive file
  484. dst := &drive.File{
  485. Name: filename,
  486. Parents: []string{dirID},
  487. MimeType: contentType,
  488. }
  489. ctx := context.Background()
  490. _, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
  491. if err != nil {
  492. return err
  493. }
  494. return nil
  495. }
  496. // Retrieve a token, saves the token, then returns the generated client.
  497. func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
  498. tokenFile := filepath.Join(localConfigPath, gdriveTokenJSONFile)
  499. tok, err := gDriveTokenFromFile(tokenFile)
  500. if err != nil {
  501. tok = getGDriveTokenFromWeb(config, logger)
  502. saveGDriveToken(tokenFile, tok, logger)
  503. }
  504. return config.Client(context.Background(), tok)
  505. }
  506. // Request a token from the web, then returns the retrieved token.
  507. func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
  508. authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
  509. fmt.Printf("Go to the following link in your browser then type the "+
  510. "authorization code: \n%v\n", authURL)
  511. var authCode string
  512. if _, err := fmt.Scan(&authCode); err != nil {
  513. logger.Fatalf("Unable to read authorization code %v", err)
  514. }
  515. tok, err := config.Exchange(context.TODO(), authCode)
  516. if err != nil {
  517. logger.Fatalf("Unable to retrieve token from web %v", err)
  518. }
  519. return tok
  520. }
  521. // Retrieves a token from a local file.
  522. func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
  523. f, err := os.Open(file)
  524. defer CloseCheck(f.Close)
  525. if err != nil {
  526. return nil, err
  527. }
  528. tok := &oauth2.Token{}
  529. err = json.NewDecoder(f).Decode(tok)
  530. return tok, err
  531. }
  532. // Saves a token to a file path.
  533. func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
  534. logger.Printf("Saving credential file to: %s\n", path)
  535. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
  536. defer CloseCheck(f.Close)
  537. if err != nil {
  538. logger.Fatalf("Unable to cache oauth token: %v", err)
  539. }
  540. err = json.NewEncoder(f).Encode(token)
  541. if err != nil {
  542. logger.Fatalf("Unable to encode oauth token: %v", err)
  543. }
  544. }
  545. // StorjStorage is a storage backed by Storj
  546. type StorjStorage struct {
  547. Storage
  548. project *uplink.Project
  549. bucket *uplink.Bucket
  550. purgeDays time.Duration
  551. logger *log.Logger
  552. }
  553. // NewStorjStorage is the factory for StorjStorage
  554. func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
  555. var instance StorjStorage
  556. var err error
  557. ctx := context.TODO()
  558. parsedAccess, err := uplink.ParseAccess(access)
  559. if err != nil {
  560. return nil, err
  561. }
  562. instance.project, err = uplink.OpenProject(ctx, parsedAccess)
  563. if err != nil {
  564. return nil, err
  565. }
  566. instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
  567. if err != nil {
  568. //Ignoring the error to return the one that occurred first, but try to clean up.
  569. _ = instance.project.Close()
  570. return nil, err
  571. }
  572. instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
  573. instance.logger = logger
  574. return &instance, nil
  575. }
  576. // Type returns the storage type
  577. func (s *StorjStorage) Type() string {
  578. return "storj"
  579. }
  580. // Head retrieves content length of a file from storage
  581. func (s *StorjStorage) Head(token string, filename string) (contentLength uint64, err error) {
  582. key := storj.JoinPaths(token, filename)
  583. ctx := context.TODO()
  584. obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
  585. if err != nil {
  586. return 0, err
  587. }
  588. contentLength = uint64(obj.System.ContentLength)
  589. return
  590. }
  591. // Get retrieves a file from storage
  592. func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
  593. key := storj.JoinPaths(token, filename)
  594. s.logger.Printf("Getting file %s from Storj Bucket", filename)
  595. ctx := context.TODO()
  596. download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
  597. if err != nil {
  598. return nil, 0, err
  599. }
  600. contentLength = uint64(download.Info().System.ContentLength)
  601. reader = download
  602. return
  603. }
  604. // Delete removes a file from storage
  605. func (s *StorjStorage) Delete(token string, filename string) (err error) {
  606. key := storj.JoinPaths(token, filename)
  607. s.logger.Printf("Deleting file %s from Storj Bucket", filename)
  608. ctx := context.TODO()
  609. _, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
  610. return
  611. }
  612. // Purge cleans up the storage
  613. func (s *StorjStorage) Purge(days time.Duration) (err error) {
  614. // NOOP expiration is set at upload time
  615. return nil
  616. }
  617. // Put saves a file on storage
  618. func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
  619. key := storj.JoinPaths(token, filename)
  620. s.logger.Printf("Uploading file %s to Storj Bucket", filename)
  621. ctx := context.TODO()
  622. var uploadOptions *uplink.UploadOptions
  623. if s.purgeDays.Hours() > 0 {
  624. uploadOptions = &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}
  625. }
  626. writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, uploadOptions)
  627. if err != nil {
  628. return err
  629. }
  630. n, err := io.Copy(writer, reader)
  631. if err != nil || uint64(n) != contentLength {
  632. //Ignoring the error to return the one that occurred first, but try to clean up.
  633. _ = writer.Abort()
  634. return err
  635. }
  636. err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
  637. if err != nil {
  638. //Ignoring the error to return the one that occurred first, but try to clean up.
  639. _ = writer.Abort()
  640. return err
  641. }
  642. err = writer.Commit()
  643. return err
  644. }
  645. // IsNotExist indicates if a file doesn't exist on storage
  646. func (s *StorjStorage) IsNotExist(err error) bool {
  647. return errors.Is(err, uplink.ErrObjectNotFound)
  648. }