# attachment.py (4.4 KB)
  1. from email.message import EmailMessage
  2. from email.mime.base import MIMEBase
  3. from email.mime.text import MIMEText
  4. from email.mime.multipart import MIMEMultipart
  5. from email.mime.application import MIMEApplication
  6. import io
  7. from pathlib import Path, PurePath
  8. from typing import Union
  9. from .utils import PIL, plt, pd
  10. class Attachments:
  11. def __init__(self, attachments:Union[list, dict], encoding='UTF-8'):
  12. self.attachments = attachments
  13. self.encoding = encoding
  14. def attach(self, msg:EmailMessage):
  15. if msg.get_content_type() == "text/plain":
  16. # We need to change the content type
  17. # This occurs if no body is defined or only text is defined
  18. # The content type is therefore changed to multipart/mixed
  19. # See issue #23
  20. msg.make_mixed()
  21. for part in self._get_parts():
  22. msg.attach(part)
  23. def _get_parts(self):
  24. if isinstance(self.attachments, dict):
  25. for name, cont in self.attachments.items():
  26. yield self._get_part_named(cont, name=name)
  27. elif isinstance(self.attachments, (list, set, tuple)):
  28. for cont in self.attachments:
  29. yield self._get_part(cont)
  30. else:
  31. # A single attachment
  32. yield self._get_part(self.attachments)
  33. def _get_part(self, item) -> MIMEBase:
  34. cont = self._get_bytes(item)
  35. filename = self._get_filename(item)
  36. part = MIMEApplication(cont)
  37. part.add_header(
  38. "Content-Disposition",
  39. "attachment", filename=filename
  40. )
  41. part.add_header('Content-Transfer-Encoding', 'base64')
  42. return part
  43. def _get_part_named(self, item, name) -> MIMEBase:
  44. cont = self._get_bytes_named(item, name)
  45. part = MIMEApplication(cont)
  46. part.add_header(
  47. "Content-Disposition",
  48. "attachment", filename=name
  49. )
  50. return part
  51. def _get_bytes(self, item) -> bytes:
  52. if isinstance(item, str):
  53. # Considered as path
  54. if Path(item).is_file():
  55. return Path(item).read_bytes()
  56. else:
  57. raise ValueError(f"Unknown attachment '{item}'. Perhaps a mistyped path?")
  58. elif isinstance(item, PurePath):
  59. return item.read_bytes()
  60. else:
  61. raise TypeError(f"Unknown attachment {type(item)}")
  62. def _get_bytes_named(self, item, name:str) -> bytes:
  63. has_pandas = pd is not None
  64. has_pillow = PIL is not None
  65. has_matplotlib = plt is not None
  66. if isinstance(item, str):
  67. # Considered as raw document
  68. return item
  69. elif isinstance(item, PurePath):
  70. return item.read_bytes()
  71. elif has_pandas and isinstance(item, (pd.DataFrame, pd.Series)):
  72. buff = io.BytesIO()
  73. if name.endswith(".xlsx"):
  74. item.to_excel(buff)
  75. return buff.getvalue()
  76. elif name.endswith(".csv"):
  77. return item.to_csv().encode(self.encoding)
  78. elif name.endswith(".html"):
  79. return item.to_html().encode(self.encoding)
  80. elif name.endswith('.txt'):
  81. return str(item)
  82. else:
  83. raise ValueError(f"Unknown dataframe conversion for '{name}'")
  84. elif isinstance(item, (bytes, bytearray)):
  85. return item
  86. elif has_pillow and isinstance(item, PIL.Image.Image):
  87. buf = io.BytesIO()
  88. item.save(buf, format='PNG')
  89. buf.seek(0)
  90. return buf.read()
  91. elif has_matplotlib and isinstance(item, plt.Figure):
  92. buf = io.BytesIO()
  93. item.savefig(buf, format=Path(name).suffix[1:])
  94. buf.seek(0)
  95. return buf.read()
  96. else:
  97. raise TypeError(f"Unknown attachment {type(item)} ({name})")
  98. def _get_filename(self, item):
  99. if isinstance(item, str):
  100. # Considered as path
  101. # NOTE: the validation it is a file should already be
  102. # done
  103. return Path(item).name
  104. elif isinstance(item, PurePath):
  105. return item.name
  106. else: # pragma: no cover
  107. # NOTE: this piece of code should not be run as the type check is
  108. # done before. If this is run, then there is an unfinished feature.
  109. raise NotImplementedError(f"Cannot figure out filename for {item}")