123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- __filename__ = "media.py"
- __author__ = "Bob Mottram"
- __license__ = "AGPL3+"
- __version__ = "1.1.0"
- __maintainer__ = "Bob Mottram"
- __email__ = "bob@freedombone.net"
- __status__ = "Production"
- from blurhash import blurhash_encode as blurencode
- from PIL import Image
- import numpy
- import os
- import sys
- import json
- import datetime
- from hashlib import sha1
- from auth import createPassword
- from shutil import copyfile
- from shutil import rmtree
- from shutil import move
- def removeMetaData(imageFilename: str,outputFilename: str) -> None:
- imageFile = open(imageFilename)
- image = Image.open(imageFilename)
- if not image:
- return
- data = list(image.getdata())
- if not data:
- return
- imageWithoutExif = Image.new(image.mode, image.size)
- imageWithoutExif.putdata(data)
- imageWithoutExif.save(outputFilename)
- def getImageHash(imageFilename: str) -> str:
- return blurencode(numpy.array(Image.open(imageFilename).convert("RGB")))
- def isMedia(imageFilename: str) -> bool:
- permittedMedia=['png','jpg','gif','webp','mp4','ogv','mp3','ogg']
- for m in permittedMedia:
- if imageFilename.endswith('.'+m):
- return True
- print('WARN: '+imageFilename+' is not a permitted media type')
- return False
- def createMediaDirs(baseDir: str,mediaPath: str) -> None:
- if not os.path.isdir(baseDir+'/media'):
- os.mkdir(baseDir+'/media')
- if not os.path.isdir(baseDir+'/'+mediaPath):
- os.mkdir(baseDir+'/'+mediaPath)
- def getMediaPath() -> str:
- currTime=datetime.datetime.utcnow()
- weeksSinceEpoch=int((currTime - datetime.datetime(1970,1,1)).days/7)
- return 'media/'+str(weeksSinceEpoch)
- def getAttachmentMediaType(filename: str) -> str:
- """Returns the type of media for the given file
- image, video or audio
- """
- mediaType=None
- imageTypes=['png','jpg','jpeg','gif','webp']
- for mType in imageTypes:
- if filename.endswith('.'+mType):
- return 'image'
- videoTypes=['mp4','webm','ogv']
- for mType in videoTypes:
- if filename.endswith('.'+mType):
- return 'video'
- audioTypes=['mp3','ogg']
- for mType in audioTypes:
- if filename.endswith('.'+mType):
- return 'audio'
- return mediaType
- def updateEtag(mediaFilename: str) -> None:
- """ calculate the etag, which is a sha1 of the data
- """
- # only create etags for media
- if '/media/' not in mediaFilename:
- return
- # check that the media exists
- if not os.path.isfile(mediaFilename):
- return
- # read the binary data
- data=None
- try:
- with open(mediaFilename, 'rb') as mediaFile:
- data=mediaFile.read()
- except:
- pass
- if not data:
- return
- # calculate hash
- etag=sha1(data).hexdigest()
- # save the hash
- try:
- with open(mediaFilename+'.etag', 'w') as etagFile:
- etagFile.write(etag)
- except:
- pass
- def attachMedia(baseDir: str,httpPrefix: str,domain: str,port: int, \
- postJson: {},imageFilename: str, \
- mediaType: str,description: str, \
- useBlurhash: bool) -> {}:
- """Attaches media to a json object post
- The description can be None
- Blurhash is optional, since low power systems may take a long time to calculate it
- """
- if not isMedia(imageFilename):
- return postJson
-
- fileExtension=None
- acceptedTypes=['png','jpg','gif','webp','mp4','webm','ogv','mp3','ogg']
- for mType in acceptedTypes:
- if imageFilename.endswith('.'+mType):
- if mType=='jpg':
- mType='jpeg'
- if mType=='mp3':
- mType='mpeg'
- fileExtension=mType
- if not fileExtension:
- return postJson
- mediaType=mediaType+'/'+fileExtension
- print('Attached media type: '+mediaType)
- if fileExtension=='jpeg':
- fileExtension='jpg'
- if mediaType=='audio/mpeg':
- fileExtension='mp3'
- if port:
- if port!=80 and port!=443:
- if ':' not in domain:
- domain=domain+':'+str(port)
- mPath=getMediaPath()
- mediaPath=mPath+'/'+createPassword(32)+'.'+fileExtension
- if baseDir:
- createMediaDirs(baseDir,mPath)
- mediaFilename=baseDir+'/'+mediaPath
- attachmentJson={
- 'mediaType': mediaType,
- 'name': description,
- 'type': 'Document',
- 'url': httpPrefix+'://'+domain+'/'+mediaPath
- }
- if useBlurhash and mediaType.startswith('image/'):
- attachmentJson['blurhash']=getImageHash(imageFilename)
- postJson['attachment']=[attachmentJson]
- if baseDir:
- if mediaType=='image':
- removeMetaData(imageFilename,mediaFilename)
- else:
- copyfile(imageFilename,mediaFilename)
- updateEtag(mediaFilename)
- return postJson
- def archiveMedia(baseDir: str,archiveDirectory: str,maxWeeks=4) -> None:
- """Any media older than the given number of weeks gets archived
- """
- currTime=datetime.datetime.utcnow()
- weeksSinceEpoch=int((currTime - datetime.datetime(1970,1,1)).days/7)
- minWeek=weeksSinceEpoch-maxWeeks
- if archiveDirectory:
- if not os.path.isdir(archiveDirectory):
- os.mkdir(archiveDirectory)
- if not os.path.isdir(archiveDirectory+'/media'):
- os.mkdir(archiveDirectory+'/media')
-
- for subdir, dirs, files in os.walk(baseDir+'/media'):
- for weekDir in dirs:
- if int(weekDir)<minWeek:
- if archiveDirectory:
- move(os.path.join(baseDir+'/media', weekDir),archiveDirectory+'/media')
- else:
- # archive to /dev/null
- rmtree(os.path.join(baseDir+'/media', weekDir))
|