capabilities.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. __filename__ = "capabilities.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.1.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import os
  9. from auth import createPassword
  10. from utils import getNicknameFromActor
  11. from utils import getDomainFromActor
  12. from utils import loadJson
  13. from utils import saveJson
  14. def getOcapFilename(baseDir: str,
  15. nickname: str, domain: str,
  16. actor: str, subdir: str) -> str:
  17. """Returns the filename for a particular capability accepted or granted
  18. Also creates directories as needed
  19. """
  20. if not actor:
  21. return None
  22. if ':' in domain:
  23. domain = domain.split(':')[0]
  24. if not os.path.isdir(baseDir + '/accounts'):
  25. os.mkdir(baseDir + '/accounts')
  26. ocDir = baseDir + '/accounts/' + nickname + '@' + domain
  27. if not os.path.isdir(ocDir):
  28. os.mkdir(ocDir)
  29. ocDir = baseDir + '/accounts/' + nickname + '@' + domain + '/ocap'
  30. if not os.path.isdir(ocDir):
  31. os.mkdir(ocDir)
  32. ocDir = baseDir + '/accounts/' + \
  33. nickname + '@' + domain + '/ocap/' + subdir
  34. if not os.path.isdir(ocDir):
  35. os.mkdir(ocDir)
  36. return baseDir + '/accounts/' + \
  37. nickname + '@' + domain + '/ocap/' + \
  38. subdir + '/' + actor.replace('/', '#') + '.json'
  39. def CapablePost(postJson: {}, capabilityList: [], debug: bool) -> bool:
  40. """Determines whether a post arriving in the inbox
  41. should be accepted accoring to the list of capabilities
  42. """
  43. if postJson.get('type'):
  44. # No announces/repeats
  45. if postJson['type'] == 'Announce':
  46. if 'inbox:noannounce' in capabilityList:
  47. if debug:
  48. print('DEBUG: ' +
  49. 'inbox post rejected because inbox:noannounce')
  50. return False
  51. # No likes
  52. if postJson['type'] == 'Like':
  53. if 'inbox:nolike' in capabilityList:
  54. if debug:
  55. print('DEBUG: ' +
  56. 'inbox post rejected because inbox:nolike')
  57. return False
  58. if postJson['type'] == 'Create':
  59. if postJson.get('object'):
  60. # Does this have a reply?
  61. if postJson['object'].get('inReplyTo'):
  62. if postJson['object']['inReplyTo']:
  63. if 'inbox:noreply' in capabilityList:
  64. if debug:
  65. print('DEBUG: ' +
  66. 'inbox post rejected because ' +
  67. 'inbox:noreply')
  68. return False
  69. # are content warnings enforced?
  70. if postJson['object'].get('sensitive'):
  71. if not postJson['object']['sensitive']:
  72. if 'inbox:cw' in capabilityList:
  73. if debug:
  74. print('DEBUG: ' +
  75. 'inbox post rejected because inbox:cw')
  76. return False
  77. # content warning must have non-zero summary
  78. if postJson['object'].get('summary'):
  79. if len(postJson['object']['summary']) < 2:
  80. if 'inbox:cw' in capabilityList:
  81. if debug:
  82. print('DEBUG: ' +
  83. 'inbox post rejected because ' +
  84. 'inbox:cw, summary missing')
  85. return False
  86. if 'inbox:write' in capabilityList:
  87. return True
  88. return True
  89. def capabilitiesRequest(baseDir: str, httpPrefix: str, domain: str,
  90. requestedActor: str, requestedDomain: str,
  91. requestedCaps=["inbox:write", "objects:read"]) -> {}:
  92. # This is sent to the capabilities endpoint /caps/new
  93. # which could be instance wide or for a particular person
  94. # This could also be added to a follow activity
  95. ocapId = createPassword(32)
  96. ocapRequest = {
  97. "@context": "https://www.w3.org/ns/activitystreams",
  98. "id": httpPrefix + "://" + requestedDomain + "/caps/request/" + ocapId,
  99. "type": "Request",
  100. "capability": requestedCaps,
  101. "actor": requestedActor
  102. }
  103. return ocapRequest
  104. def capabilitiesAccept(baseDir: str, httpPrefix: str,
  105. nickname: str, domain: str, port: int,
  106. acceptedActor: str, saveToFile: bool,
  107. acceptedCaps=["inbox:write", "objects:read"]) -> {}:
  108. # This gets returned to capabilities requester
  109. # This could also be added to a follow Accept activity
  110. # reject excessively long actors
  111. if len(acceptedActor) > 256:
  112. return None
  113. fullDomain = domain
  114. if port:
  115. if port != 80 and port != 443:
  116. if ':' not in domain:
  117. fullDomain = domain + ':' + str(port)
  118. # make directories to store capabilities
  119. ocapFilename = \
  120. getOcapFilename(baseDir, nickname, fullDomain, acceptedActor, 'accept')
  121. if not ocapFilename:
  122. return None
  123. ocapAccept = None
  124. # if the capability already exists then load it from file
  125. if os.path.isfile(ocapFilename):
  126. ocapAccept = loadJson(ocapFilename)
  127. # otherwise create a new capability
  128. if not ocapAccept:
  129. acceptedActorNickname = getNicknameFromActor(acceptedActor)
  130. if not acceptedActorNickname:
  131. print('WARN: unable to find nickname in ' + acceptedActor)
  132. return None
  133. acceptedActorDomain, acceptedActorPort = \
  134. getDomainFromActor(acceptedActor)
  135. if acceptedActorPort:
  136. ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \
  137. ':' + str(acceptedActorPort) + '#'+createPassword(32)
  138. else:
  139. ocapId = acceptedActorNickname + '@' + acceptedActorDomain + \
  140. '#' + createPassword(32)
  141. ocapAccept = {
  142. "@context": "https://www.w3.org/ns/activitystreams",
  143. "id": httpPrefix + "://" + fullDomain + "/caps/" + ocapId,
  144. "type": "Capability",
  145. "capability": acceptedCaps,
  146. "scope": acceptedActor,
  147. "actor": httpPrefix + "://" + fullDomain
  148. }
  149. if nickname:
  150. ocapAccept['actor'] = \
  151. httpPrefix + "://" + fullDomain + '/users/' + nickname
  152. if saveToFile:
  153. saveJson(ocapAccept, ocapFilename)
  154. return ocapAccept
  155. def capabilitiesGrantedSave(baseDir: str,
  156. nickname: str, domain: str, ocap: {}) -> bool:
  157. """A capabilities accept is received, so stor it for
  158. reference when sending to the actor
  159. """
  160. if not ocap.get('actor'):
  161. return False
  162. ocapFilename = \
  163. getOcapFilename(baseDir, nickname, domain, ocap['actor'], 'granted')
  164. if not ocapFilename:
  165. return False
  166. saveJson(ocap, ocapFilename)
  167. return True
  168. def capabilitiesUpdate(baseDir: str, httpPrefix: str,
  169. nickname: str, domain: str, port: int,
  170. updateActor: str,
  171. updateCaps: []) -> {}:
  172. """Used to sends an update for a change of object capabilities
  173. Note that the capability id gets changed with a new random token
  174. so that the old capabilities can't continue to be used
  175. """
  176. # reject excessively long actors
  177. if len(updateActor) > 256:
  178. return None
  179. fullDomain = domain
  180. if port:
  181. if port != 80 and port != 443:
  182. if ':' not in domain:
  183. fullDomain = domain + ':' + str(port)
  184. # Get the filename of the capability
  185. ocapFilename = \
  186. getOcapFilename(baseDir, nickname, fullDomain, updateActor, 'accept')
  187. if not ocapFilename:
  188. return None
  189. # The capability should already exist for it to be updated
  190. if not os.path.isfile(ocapFilename):
  191. return None
  192. # create an update activity
  193. ocapUpdate = {
  194. "@context": "https://www.w3.org/ns/activitystreams",
  195. 'type': 'Update',
  196. 'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
  197. 'to': [updateActor],
  198. 'cc': [],
  199. 'object': {}
  200. }
  201. # read the existing capability
  202. ocapJson = loadJson(ocapFilename)
  203. # set the new capabilities list. eg. ["inbox:write","objects:read"]
  204. ocapJson['capability'] = updateCaps
  205. # change the id, so that the old capabilities can't continue to be used
  206. updateActorNickname = getNicknameFromActor(updateActor)
  207. if not updateActorNickname:
  208. print('WARN: unable to find nickname in ' + updateActor)
  209. return None
  210. updateActorDomain, updateActorPort = getDomainFromActor(updateActor)
  211. if updateActorPort:
  212. ocapId = updateActorNickname + '@' + updateActorDomain + \
  213. ':' + str(updateActorPort) + '#' + createPassword(32)
  214. else:
  215. ocapId = updateActorNickname + '@' + updateActorDomain + \
  216. '#' + createPassword(32)
  217. ocapJson['id'] = httpPrefix + "://" + fullDomain + "/caps/" + ocapId
  218. ocapUpdate['object'] = ocapJson
  219. # save it again
  220. saveJson(ocapJson, ocapFilename)
  221. return ocapUpdate
  222. def capabilitiesReceiveUpdate(baseDir: str,
  223. nickname: str, domain: str, port: int,
  224. actor: str,
  225. newCapabilitiesId: str,
  226. capabilityList: [], debug: bool) -> bool:
  227. """An update for a capability or the given actor has arrived
  228. """
  229. ocapFilename = \
  230. getOcapFilename(baseDir, nickname, domain, actor, 'granted')
  231. if not ocapFilename:
  232. return False
  233. if not os.path.isfile(ocapFilename):
  234. if debug:
  235. print('DEBUG: capabilities file not found during update')
  236. print(ocapFilename)
  237. return False
  238. ocapJson = loadJson(ocapFilename)
  239. if ocapJson:
  240. ocapJson['id'] = newCapabilitiesId
  241. ocapJson['capability'] = capabilityList
  242. return saveJson(ocapJson, ocapFilename)
  243. return False