123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- __filename__ = "capabilities.py"
- __author__ = "Bob Mottram"
- __license__ = "AGPL3+"
- __version__ = "1.0.0"
- __maintainer__ = "Bob Mottram"
- __email__ = "bob@freedombone.net"
- __status__ = "Production"
- import os
- import datetime
- import time
- import json
- from auth import createPassword
- from utils import getNicknameFromActor
- from utils import getDomainFromActor
- from utils import loadJson
- from utils import saveJson
def getOcapFilename(baseDir: str, nickname: str, domain: str,
                    actor: str, subdir: str) -> str:
    """Returns the filename for a particular capability accepted or granted
    Also creates directories as needed

    baseDir: directory which contains (or will contain) 'accounts'
    nickname, domain: the local account. Any ':port' suffix on the
        domain is dropped before it is used in the path
    actor: the remote actor the capability refers to. Every '/' within
        it is replaced by '#' to form the file name
    subdir: directory beneath 'ocap'. The callers in this file use
        'accept' and 'granted'

    Returns None if no actor is given, otherwise
    baseDir/accounts/nickname@domain/ocap/subdir/<actor>.json
    """
    if not actor:
        return None

    # account directories are named without the port number
    if ':' in domain:
        domain = domain.split(':')[0]

    ocDir = baseDir + '/accounts/' + nickname + '@' + domain + \
        '/ocap/' + subdir
    # A single makedirs call creates every missing level and, unlike an
    # isdir() check followed by mkdir(), doesn't fail if something else
    # creates the directory between the check and the creation
    os.makedirs(ocDir, exist_ok=True)

    return ocDir + '/' + actor.replace('/', '#') + '.json'
def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
    """Determines whether a post arriving in the inbox
    should be accepted according to the list of capabilities

    postJson: the received activity
    capabilityList: capability strings, eg. ["inbox:write","inbox:cw"]
    debug: if True then the reason for a rejection is printed

    Returns False if one of the restrictions below rejects the post
    and True otherwise:
      inbox:noannounce  no Announce activities
      inbox:nolike      no Like activities
      inbox:noreply     no Create activities whose object has inReplyTo
      inbox:cw          a Create object which has a 'sensitive' field
                        must have it set, and one which has a 'summary'
                        field must have a summary of at least two
                        characters
    """
    postType = postJson.get('type')

    # No announces/repeats
    if postType == 'Announce' and 'inbox:noannounce' in capabilityList:
        if debug:
            print('DEBUG: inbox post rejected because inbox:noannounce')
        return False

    # No likes
    if postType == 'Like' and 'inbox:nolike' in capabilityList:
        if debug:
            print('DEBUG: inbox post rejected because inbox:nolike')
        return False

    if postType == 'Create':
        postObject = postJson.get('object')
        # the object can also be a plain string (a url), which has no
        # fields to check
        if isinstance(postObject, dict):
            # Does this have a reply?
            if postObject.get('inReplyTo'):
                if 'inbox:noreply' in capabilityList:
                    if debug:
                        print('DEBUG: inbox post rejected ' +
                              'because inbox:noreply')
                    return False

            if 'inbox:cw' in capabilityList:
                # are content warnings enforced?
                # This tests for the presence of the field rather than
                # using .get(), because 'sensitive': False is exactly
                # the value which needs to be rejected
                if 'sensitive' in postObject:
                    if not postObject['sensitive']:
                        if debug:
                            print('DEBUG: inbox post rejected ' +
                                  'because inbox:cw')
                        return False

                # content warning must have non-zero summary
                # An empty or null summary counts as missing
                if 'summary' in postObject:
                    summary = postObject['summary']
                    if not summary or len(summary) < 2:
                        if debug:
                            print('DEBUG: inbox post rejected ' +
                                  'because inbox:cw, summary missing')
                        return False

    # NOTE(review): the original returned True here whether or not
    # 'inbox:write' was in capabilityList. That is kept, but confirm
    # whether posts without inbox:write were meant to be refused
    return True
def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
                        requestedActor: str,
                        requestedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns a request for capabilities
    This is sent to the capabilities endpoint /caps/new
    which could be instance wide or for a particular person
    This could also be added to a follow activity

    baseDir: not used by this function
    httpPrefix: eg. 'https'
    domain: domain which appears within the id of the request
    requestedActor: placed within the 'actor' field of the request
    requestedCaps: list of the capabilities being asked for
    """
    ocapId = createPassword(32)

    # take a copy so that later changes to the returned request can't
    # alter the default list, which is shared between calls
    if isinstance(requestedCaps, list):
        requestedCaps = list(requestedCaps)

    # NOTE(review): this previously used an undefined 'requestedDomain',
    # raising NameError on every call. 'domain' is the only domain which
    # is in scope - confirm that it is the intended one
    ocapRequest = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": httpPrefix + "://" + domain + "/caps/request/" + ocapId,
        "type": "Request",
        "capability": requestedCaps,
        "actor": requestedActor
    }
    return ocapRequest
-
def capabilitiesAccept(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       acceptedActor: str, saveToFile: bool,
                       acceptedCaps=["inbox:write", "objects:read"]) -> {}:
    """Returns the capability which nickname@domain grants to acceptedActor
    This gets returned to capabilities requester
    This could also be added to a follow Accept activity

    If a capability for acceptedActor has already been stored then it
    is loaded and returned unaltered. Otherwise a new one is created
    with a random token in its id and, if saveToFile is set, stored.

    Returns None if the actor is longer than 256 characters, if no
    filename can be obtained for it, or if no nickname can be found
    within it.
    """
    # reject excessively long actors
    if len(acceptedActor) > 256:
        return None

    # only non-standard ports appear within the domain
    fullDomain = domain
    if port:
        if port != 80 and port != 443:
            if ':' not in domain:
                fullDomain = domain + ':' + str(port)

    # make directories to store capabilities
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')
    if not ocapFilename:
        return None

    # if the capability already exists then load it from file
    ocapAccept = None
    if os.path.isfile(ocapFilename):
        ocapAccept = loadJson(ocapFilename)
    if ocapAccept:
        return ocapAccept

    # otherwise create a new capability
    acceptedActorNickname = getNicknameFromActor(acceptedActor)
    if not acceptedActorNickname:
        print('WARN: unable to find nickname in ' + acceptedActor)
        return None
    acceptedActorDomain, acceptedActorPort = getDomainFromActor(acceptedActor)
    ocapId = acceptedActorNickname + '@' + acceptedActorDomain
    if acceptedActorPort:
        ocapId += ':' + str(acceptedActorPort)
    # the random token makes the capability id unguessable
    ocapId += '#' + createPassword(32)

    # take a copy so that later changes to the returned capability can't
    # alter the default list, which is shared between calls
    if isinstance(acceptedCaps, list):
        acceptedCaps = list(acceptedCaps)

    ocapAccept = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
        "type": "Capability",
        "capability": acceptedCaps,
        "scope": acceptedActor,
        "actor": httpPrefix + "://" + fullDomain
    }
    # without a nickname the actor is the instance itself
    if nickname:
        ocapAccept['actor'] = \
            httpPrefix + "://" + fullDomain + '/users/' + nickname

    if saveToFile:
        saveJson(ocapAccept, ocapFilename)
    return ocapAccept
def capabilitiesGrantedSave(baseDir: str, nickname: str, domain: str,
                            ocap: {}) -> bool:
    """A capabilities accept is received, so store it for
    reference when sending to the actor

    The capability is saved within the 'granted' directory of
    nickname@domain, in a file named after the actor of the capability.
    Returns False if the capability has no actor or if no filename
    could be obtained for it, otherwise True.
    """
    grantingActor = ocap.get('actor')
    if not grantingActor:
        return False

    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
    if grantedFilename:
        saveJson(ocap, grantedFilename)
        return True
    return False
def capabilitiesUpdate(baseDir: str, httpPrefix: str,
                       nickname: str, domain: str, port: int,
                       updateActor: str,
                       updateCaps: []) -> {}:
    """Used to send an update for a change of object capabilities
    Note that the capability id gets changed with a new random token
    so that the old capabilities can't continue to be used

    updateActor: the actor whose capabilities are being changed
    updateCaps: the new capabilities list. eg. ["inbox:write","objects:read"]

    The stored capability for updateActor is altered, saved again and
    returned as the object of an Update activity addressed to updateActor.

    Returns None if the actor is longer than 256 characters, if there
    is no stored capability for it, if the stored capability can't be
    loaded, or if no nickname can be found within the actor.
    """
    # reject excessively long actors
    if len(updateActor) > 256:
        return None

    # only non-standard ports appear within the domain
    fullDomain = domain
    if port:
        if port != 80 and port != 443:
            if ':' not in domain:
                fullDomain = domain + ':' + str(port)

    # Get the filename of the capability
    ocapFilename = \
        getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
    if not ocapFilename:
        return None

    # The capability should already exist for it to be updated
    if not os.path.isfile(ocapFilename):
        return None

    # read the existing capability
    ocapJson = loadJson(ocapFilename)
    # The result of the load is tested before use, as is done within
    # capabilitiesReceiveUpdate, rather than failing with a TypeError
    # when the capability gets altered below
    if not ocapJson:
        return None

    updateActorNickname = getNicknameFromActor(updateActor)
    if not updateActorNickname:
        print('WARN: unable to find nickname in ' + updateActor)
        return None
    updateActorDomain, updateActorPort = getDomainFromActor(updateActor)

    # set the new capabilities list. eg. ["inbox:write","objects:read"]
    ocapJson['capability'] = updateCaps

    # change the id, so that the old capabilities can't continue to be used
    ocapId = updateActorNickname + '@' + updateActorDomain
    if updateActorPort:
        ocapId += ':' + str(updateActorPort)
    ocapId += '#' + createPassword(32)
    ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId

    # create an update activity
    ocapUpdate = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Update',
        'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
        'to': [updateActor],
        'cc': [],
        'object': ocapJson
    }

    # save it again
    saveJson(ocapJson, ocapFilename)
    return ocapUpdate
def capabilitiesReceiveUpdate(baseDir: str,
                              nickname: str, domain: str, port: int,
                              actor: str,
                              newCapabilitiesId: str,
                              capabilityList: [], debug: bool) -> bool:
    """An update for a capability for the given actor has arrived

    The capability previously granted by the actor is loaded, given the
    new id and capabilities list, then saved again. Returns False if
    there is no such stored capability or if it can't be loaded,
    otherwise the result of saving it.
    """
    grantedFilename = \
        getOcapFilename(baseDir, nickname, domain, actor, 'granted')
    if not grantedFilename:
        return False

    # only a capability which was previously granted can be updated
    if not os.path.isfile(grantedFilename):
        if debug:
            print('DEBUG: capabilities file not found during update')
            print(grantedFilename)
        return False

    grantedCapability = loadJson(grantedFilename)
    if not grantedCapability:
        return False

    grantedCapability['id'] = newCapabilitiesId
    grantedCapability['capability'] = capabilityList
    return saveJson(grantedCapability, grantedFilename)
|