media.py 6.9 KB


  1. __filename__ = "media.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.1.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. from blurhash import blurhash_encode as blurencode
  9. from PIL import Image
  10. import numpy
  11. import os
  12. import datetime
  13. from hashlib import sha1
  14. from auth import createPassword
  15. from shutil import copyfile
  16. from shutil import rmtree
  17. from shutil import move
  18. def replaceYouTube(postJsonObject: {}, replacementDomain: str) -> None:
  19. """Replace YouTube with a replacement domain
  20. This denies Google some, but not all, tracking data
  21. """
  22. if not replacementDomain:
  23. return
  24. if not isinstance(postJsonObject['object'], dict):
  25. return
  26. if not postJsonObject['object'].get('content'):
  27. return
  28. if 'www.youtube.com' not in postJsonObject['object']['content']:
  29. return
  30. postJsonObject['object']['content'] = \
  31. postJsonObject['object']['content'].replace('www.youtube.com',
  32. replacementDomain)
  33. def removeMetaData(imageFilename: str, outputFilename: str) -> None:
  34. """Attempts to do this with pure python didn't work well,
  35. so better to use a dedicated tool if one is installed
  36. """
  37. copyfile(imageFilename, outputFilename)
  38. if not os.path.isfile(outputFilename):
  39. print('ERROR: unable to remove metadata from ' + imageFilename)
  40. return
  41. if os.path.isfile('/usr/bin/exiftool'):
  42. print('Removing metadata from ' + outputFilename + ' using exiftool')
  43. os.system('exiftool -all= ' + outputFilename) # nosec
  44. elif os.path.isfile('/usr/bin/mogrify'):
  45. print('Removing metadata from ' + outputFilename + ' using mogrify')
  46. os.system('/usr/bin/mogrify -strip ' + outputFilename) # nosec
  47. def getImageHash(imageFilename: str) -> str:
  48. return blurencode(numpy.array(Image.open(imageFilename).convert("RGB")))
  49. def isMedia(imageFilename: str) -> bool:
  50. permittedMedia = ('png', 'jpg', 'gif', 'webp',
  51. 'mp4', 'ogv', 'mp3', 'ogg')
  52. for m in permittedMedia:
  53. if imageFilename.endswith('.' + m):
  54. return True
  55. print('WARN: ' + imageFilename + ' is not a permitted media type')
  56. return False
  57. def createMediaDirs(baseDir: str, mediaPath: str) -> None:
  58. if not os.path.isdir(baseDir + '/media'):
  59. os.mkdir(baseDir + '/media')
  60. if not os.path.isdir(baseDir + '/' + mediaPath):
  61. os.mkdir(baseDir + '/' + mediaPath)
  62. def getMediaPath() -> str:
  63. currTime = datetime.datetime.utcnow()
  64. weeksSinceEpoch = int((currTime - datetime.datetime(1970, 1, 1)).days / 7)
  65. return 'media/' + str(weeksSinceEpoch)
  66. def getAttachmentMediaType(filename: str) -> str:
  67. """Returns the type of media for the given file
  68. image, video or audio
  69. """
  70. mediaType = None
  71. imageTypes = ('png', 'jpg', 'jpeg',
  72. 'gif', 'webp')
  73. for mType in imageTypes:
  74. if filename.endswith('.' + mType):
  75. return 'image'
  76. videoTypes = ('mp4', 'webm', 'ogv')
  77. for mType in videoTypes:
  78. if filename.endswith('.' + mType):
  79. return 'video'
  80. audioTypes = ('mp3', 'ogg')
  81. for mType in audioTypes:
  82. if filename.endswith('.' + mType):
  83. return 'audio'
  84. return mediaType
  85. def updateEtag(mediaFilename: str) -> None:
  86. """ calculate the etag, which is a sha1 of the data
  87. """
  88. # only create etags for media
  89. if '/media/' not in mediaFilename:
  90. return
  91. # check that the media exists
  92. if not os.path.isfile(mediaFilename):
  93. return
  94. # read the binary data
  95. data = None
  96. try:
  97. with open(mediaFilename, 'rb') as mediaFile:
  98. data = mediaFile.read()
  99. except BaseException:
  100. pass
  101. if not data:
  102. return
  103. # calculate hash
  104. etag = sha1(data).hexdigest() # nosec
  105. # save the hash
  106. try:
  107. with open(mediaFilename + '.etag', 'w+') as etagFile:
  108. etagFile.write(etag)
  109. except BaseException:
  110. pass
  111. def attachMedia(baseDir: str, httpPrefix: str, domain: str, port: int,
  112. postJson: {}, imageFilename: str,
  113. mediaType: str, description: str,
  114. useBlurhash: bool) -> {}:
  115. """Attaches media to a json object post
  116. The description can be None
  117. Blurhash is optional, since low power systems may take a long
  118. time to calculate it
  119. """
  120. if not isMedia(imageFilename):
  121. return postJson
  122. fileExtension = None
  123. acceptedTypes = ('png', 'jpg', 'gif', 'webp',
  124. 'mp4', 'webm', 'ogv', 'mp3', 'ogg')
  125. for mType in acceptedTypes:
  126. if imageFilename.endswith('.' + mType):
  127. if mType == 'jpg':
  128. mType = 'jpeg'
  129. if mType == 'mp3':
  130. mType = 'mpeg'
  131. fileExtension = mType
  132. if not fileExtension:
  133. return postJson
  134. mediaType = mediaType + '/' + fileExtension
  135. print('Attached media type: ' + mediaType)
  136. if fileExtension == 'jpeg':
  137. fileExtension = 'jpg'
  138. if mediaType == 'audio/mpeg':
  139. fileExtension = 'mp3'
  140. if port:
  141. if port != 80 and port != 443:
  142. if ':' not in domain:
  143. domain = domain + ':' + str(port)
  144. mPath = getMediaPath()
  145. mediaPath = mPath + '/' + createPassword(32) + '.' + fileExtension
  146. if baseDir:
  147. createMediaDirs(baseDir, mPath)
  148. mediaFilename = baseDir + '/' + mediaPath
  149. attachmentJson = {
  150. 'mediaType': mediaType,
  151. 'name': description,
  152. 'type': 'Document',
  153. 'url': httpPrefix + '://' + domain + '/' + mediaPath
  154. }
  155. if mediaType.startswith('image/'):
  156. attachmentJson['focialPoint'] = [0.0, 0.0]
  157. if useBlurhash:
  158. attachmentJson['blurhash'] = getImageHash(imageFilename)
  159. postJson['attachment'] = [attachmentJson]
  160. if baseDir:
  161. if mediaType.startswith('image/'):
  162. removeMetaData(imageFilename, mediaFilename)
  163. else:
  164. copyfile(imageFilename, mediaFilename)
  165. updateEtag(mediaFilename)
  166. return postJson
  167. def archiveMedia(baseDir: str, archiveDirectory: str, maxWeeks=4) -> None:
  168. """Any media older than the given number of weeks gets archived
  169. """
  170. currTime = datetime.datetime.utcnow()
  171. weeksSinceEpoch = int((currTime - datetime.datetime(1970, 1, 1)).days/7)
  172. minWeek = weeksSinceEpoch-maxWeeks
  173. if archiveDirectory:
  174. if not os.path.isdir(archiveDirectory):
  175. os.mkdir(archiveDirectory)
  176. if not os.path.isdir(archiveDirectory + '/media'):
  177. os.mkdir(archiveDirectory + '/media')
  178. for subdir, dirs, files in os.walk(baseDir + '/media'):
  179. for weekDir in dirs:
  180. if int(weekDir) < minWeek:
  181. if archiveDirectory:
  182. move(os.path.join(baseDir + '/media', weekDir),
  183. archiveDirectory + '/media')
  184. else:
  185. # archive to /dev/null
  186. rmtree(os.path.join(baseDir + '/media', weekDir))