blocking.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. __filename__ = "blocking.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.1.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import os
  9. from utils import isEvil
  10. from utils import locatePost
  11. from utils import evilIncarnate
  12. from utils import getDomainFromActor
  13. from utils import getNicknameFromActor
  14. def addGlobalBlock(baseDir: str,
  15. blockNickname: str, blockDomain: str) -> bool:
  16. """Global block which applies to all accounts
  17. """
  18. blockingFilename = baseDir + '/accounts/blocking.txt'
  19. if not blockNickname.startswith('#'):
  20. blockHandle = blockNickname + '@' + blockDomain
  21. if os.path.isfile(blockingFilename):
  22. if blockHandle in open(blockingFilename).read():
  23. return False
  24. blockFile = open(blockingFilename, "a+")
  25. blockFile.write(blockHandle + '\n')
  26. blockFile.close()
  27. else:
  28. blockHashtag = blockNickname
  29. if os.path.isfile(blockingFilename):
  30. if blockHashtag + '\n' in open(blockingFilename).read():
  31. return False
  32. blockFile = open(blockingFilename, "a+")
  33. blockFile.write(blockHashtag + '\n')
  34. blockFile.close()
  35. return True
  36. def addBlock(baseDir: str, nickname: str, domain: str,
  37. blockNickname: str, blockDomain: str) -> bool:
  38. """Block the given account
  39. """
  40. if ':' in domain:
  41. domain = domain.split(':')[0]
  42. blockingFilename = baseDir + '/accounts/' + \
  43. nickname + '@' + domain + '/blocking.txt'
  44. blockHandle = blockNickname + '@' + blockDomain
  45. if os.path.isfile(blockingFilename):
  46. if blockHandle in open(blockingFilename).read():
  47. return False
  48. blockFile = open(blockingFilename, "a+")
  49. blockFile.write(blockHandle + '\n')
  50. blockFile.close()
  51. return True
  52. def removeGlobalBlock(baseDir: str,
  53. unblockNickname: str,
  54. unblockDomain: str) -> bool:
  55. """Unblock the given global block
  56. """
  57. unblockingFilename = baseDir + '/accounts/blocking.txt'
  58. if not unblockNickname.startswith('#'):
  59. unblockHandle = unblockNickname + '@' + unblockDomain
  60. if os.path.isfile(unblockingFilename):
  61. if unblockHandle in open(unblockingFilename).read():
  62. with open(unblockingFilename, 'r') as fp:
  63. with open(unblockingFilename + '.new', 'w+') as fpnew:
  64. for line in fp:
  65. handle = line.replace('\n', '').replace('\r', '')
  66. if unblockHandle not in line:
  67. fpnew.write(handle + '\n')
  68. if os.path.isfile(unblockingFilename + '.new'):
  69. os.rename(unblockingFilename + '.new', unblockingFilename)
  70. return True
  71. else:
  72. unblockHashtag = unblockNickname
  73. if os.path.isfile(unblockingFilename):
  74. if unblockHashtag + '\n' in open(unblockingFilename).read():
  75. with open(unblockingFilename, 'r') as fp:
  76. with open(unblockingFilename + '.new', 'w+') as fpnew:
  77. for line in fp:
  78. blockLine = \
  79. line.replace('\n', '').replace('\r', '')
  80. if unblockHashtag not in line:
  81. fpnew.write(blockLine + '\n')
  82. if os.path.isfile(unblockingFilename + '.new'):
  83. os.rename(unblockingFilename + '.new', unblockingFilename)
  84. return True
  85. return False
  86. def removeBlock(baseDir: str, nickname: str, domain: str,
  87. unblockNickname: str, unblockDomain: str) -> bool:
  88. """Unblock the given account
  89. """
  90. if ':' in domain:
  91. domain = domain.split(':')[0]
  92. unblockingFilename = baseDir + '/accounts/' + \
  93. nickname + '@' + domain + '/blocking.txt'
  94. unblockHandle = unblockNickname + '@' + unblockDomain
  95. if os.path.isfile(unblockingFilename):
  96. if unblockHandle in open(unblockingFilename).read():
  97. with open(unblockingFilename, 'r') as fp:
  98. with open(unblockingFilename + '.new', 'w+') as fpnew:
  99. for line in fp:
  100. handle = line.replace('\n', '').replace('\r', '')
  101. if unblockHandle not in line:
  102. fpnew.write(handle + '\n')
  103. if os.path.isfile(unblockingFilename + '.new'):
  104. os.rename(unblockingFilename + '.new', unblockingFilename)
  105. return True
  106. return False
  107. def isBlockedHashtag(baseDir: str, hashtag: str) -> bool:
  108. """Is the given hashtag blocked?
  109. """
  110. # avoid very long hashtags
  111. if len(hashtag) > 32:
  112. return True
  113. globalBlockingFilename = baseDir + '/accounts/blocking.txt'
  114. if os.path.isfile(globalBlockingFilename):
  115. hashtag = hashtag.strip('\n').strip('\r')
  116. if hashtag + '\n' in open(globalBlockingFilename).read():
  117. return True
  118. return False
  119. def getDomainBlocklist(baseDir: str) -> str:
  120. """Returns all globally blocked domains as a string
  121. This can be used for fast matching to mitigate flooding
  122. """
  123. blockedStr = ''
  124. evilDomains = evilIncarnate()
  125. for evil in evilDomains:
  126. blockedStr += evil + '\n'
  127. globalBlockingFilename = baseDir + '/accounts/blocking.txt'
  128. if not os.path.isfile(globalBlockingFilename):
  129. return blockedStr
  130. with open(globalBlockingFilename, 'r') as file:
  131. blockedStr += file.read()
  132. return blockedStr
  133. def isBlockedDomain(baseDir: str, domain: str) -> bool:
  134. """Is the given domain blocked?
  135. """
  136. if isEvil(domain):
  137. return True
  138. globalBlockingFilename = baseDir + '/accounts/blocking.txt'
  139. if os.path.isfile(globalBlockingFilename):
  140. if '*@' + domain in open(globalBlockingFilename).read():
  141. return True
  142. return False
  143. def isBlocked(baseDir: str, nickname: str, domain: str,
  144. blockNickname: str, blockDomain: str) -> bool:
  145. """Is the given nickname blocked?
  146. """
  147. if isEvil(blockDomain):
  148. return True
  149. globalBlockingFilename = baseDir + '/accounts/blocking.txt'
  150. if os.path.isfile(globalBlockingFilename):
  151. if '*@' + blockDomain in open(globalBlockingFilename).read():
  152. return True
  153. if blockNickname:
  154. blockHandle = blockNickname + '@' + blockDomain
  155. if blockHandle in open(globalBlockingFilename).read():
  156. return True
  157. allowFilename = baseDir + '/accounts/' + \
  158. nickname + '@' + domain + '/allowedinstances.txt'
  159. if os.path.isfile(allowFilename):
  160. if blockDomain not in open(allowFilename).read():
  161. return True
  162. blockingFilename = baseDir + '/accounts/' + \
  163. nickname + '@' + domain + '/blocking.txt'
  164. if os.path.isfile(blockingFilename):
  165. if '*@' + blockDomain in open(blockingFilename).read():
  166. return True
  167. if blockNickname:
  168. blockHandle = blockNickname + '@' + blockDomain
  169. if blockHandle in open(blockingFilename).read():
  170. return True
  171. return False
  172. def outboxBlock(baseDir: str, httpPrefix: str,
  173. nickname: str, domain: str, port: int,
  174. messageJson: {}, debug: bool) -> None:
  175. """ When a block request is received by the outbox from c2s
  176. """
  177. if not messageJson.get('type'):
  178. if debug:
  179. print('DEBUG: block - no type')
  180. return
  181. if not messageJson['type'] == 'Block':
  182. if debug:
  183. print('DEBUG: not a block')
  184. return
  185. if not messageJson.get('object'):
  186. if debug:
  187. print('DEBUG: no object in block')
  188. return
  189. if not isinstance(messageJson['object'], str):
  190. if debug:
  191. print('DEBUG: block object is not string')
  192. return
  193. if debug:
  194. print('DEBUG: c2s block request arrived in outbox')
  195. messageId = messageJson['object'].replace('/activity', '')
  196. if '/statuses/' not in messageId:
  197. if debug:
  198. print('DEBUG: c2s block object is not a status')
  199. return
  200. if '/users/' not in messageId and \
  201. '/accounts/' not in messageId and \
  202. '/channel/' not in messageId and \
  203. '/profile/' not in messageId:
  204. if debug:
  205. print('DEBUG: c2s block object has no nickname')
  206. return
  207. if ':' in domain:
  208. domain = domain.split(':')[0]
  209. postFilename = locatePost(baseDir, nickname, domain, messageId)
  210. if not postFilename:
  211. if debug:
  212. print('DEBUG: c2s block post not found in inbox or outbox')
  213. print(messageId)
  214. return
  215. nicknameBlocked = getNicknameFromActor(messageJson['object'])
  216. if not nicknameBlocked:
  217. print('WARN: unable to find nickname in ' + messageJson['object'])
  218. return
  219. domainBlocked, portBlocked = getDomainFromActor(messageJson['object'])
  220. domainBlockedFull = domainBlocked
  221. if portBlocked:
  222. if portBlocked != 80 and portBlocked != 443:
  223. if ':' not in domainBlocked:
  224. domainBlockedFull = domainBlocked + ':' + str(portBlocked)
  225. addBlock(baseDir, nickname, domain,
  226. nicknameBlocked, domainBlockedFull)
  227. if debug:
  228. print('DEBUG: post blocked via c2s - ' + postFilename)
  229. def outboxUndoBlock(baseDir: str, httpPrefix: str,
  230. nickname: str, domain: str, port: int,
  231. messageJson: {}, debug: bool) -> None:
  232. """ When an undo block request is received by the outbox from c2s
  233. """
  234. if not messageJson.get('type'):
  235. if debug:
  236. print('DEBUG: undo block - no type')
  237. return
  238. if not messageJson['type'] == 'Undo':
  239. if debug:
  240. print('DEBUG: not an undo block')
  241. return
  242. if not messageJson.get('object'):
  243. if debug:
  244. print('DEBUG: no object in undo block')
  245. return
  246. if not isinstance(messageJson['object'], dict):
  247. if debug:
  248. print('DEBUG: undo block object is not string')
  249. return
  250. if not messageJson['object'].get('type'):
  251. if debug:
  252. print('DEBUG: undo block - no type')
  253. return
  254. if not messageJson['object']['type'] == 'Block':
  255. if debug:
  256. print('DEBUG: not an undo block')
  257. return
  258. if not messageJson['object'].get('object'):
  259. if debug:
  260. print('DEBUG: no object in undo block')
  261. return
  262. if not isinstance(messageJson['object']['object'], str):
  263. if debug:
  264. print('DEBUG: undo block object is not string')
  265. return
  266. if debug:
  267. print('DEBUG: c2s undo block request arrived in outbox')
  268. messageId = messageJson['object']['object'].replace('/activity', '')
  269. if '/statuses/' not in messageId:
  270. if debug:
  271. print('DEBUG: c2s undo block object is not a status')
  272. return
  273. if '/users/' not in messageId and \
  274. '/accounts/' not in messageId and \
  275. '/channel/' not in messageId and \
  276. '/profile/' not in messageId:
  277. if debug:
  278. print('DEBUG: c2s undo block object has no nickname')
  279. return
  280. if ':' in domain:
  281. domain = domain.split(':')[0]
  282. postFilename = locatePost(baseDir, nickname, domain, messageId)
  283. if not postFilename:
  284. if debug:
  285. print('DEBUG: c2s undo block post not found in inbox or outbox')
  286. print(messageId)
  287. return
  288. nicknameBlocked = getNicknameFromActor(messageJson['object']['object'])
  289. if not nicknameBlocked:
  290. print('WARN: unable to find nickname in ' +
  291. messageJson['object']['object'])
  292. return
  293. domainObject = messageJson['object']['object']
  294. domainBlocked, portBlocked = getDomainFromActor(domainObject)
  295. domainBlockedFull = domainBlocked
  296. if portBlocked:
  297. if portBlocked != 80 and portBlocked != 443:
  298. if ':' not in domainBlocked:
  299. domainBlockedFull = domainBlocked + ':' + str(portBlocked)
  300. removeBlock(baseDir, nickname, domain,
  301. nicknameBlocked, domainBlockedFull)
  302. if debug:
  303. print('DEBUG: post undo blocked via c2s - ' + postFilename)