123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- __filename__ = "like.py"
- __author__ = "Bob Mottram"
- __license__ = "AGPL3+"
- __version__ = "1.0.0"
- __maintainer__ = "Bob Mottram"
- __email__ = "bob@freedombone.net"
- __status__ = "Production"
- import os
- import json
- import time
- from pprint import pprint
- from utils import removePostFromCache
- from utils import urlPermitted
- from utils import getNicknameFromActor
- from utils import getDomainFromActor
- from utils import locatePost
- from utils import getCachedPostFilename
- from utils import loadJson
- from utils import saveJson
- from posts import sendSignedJson
- from session import postJson
- from webfinger import webfingerHandle
- from auth import createBasicAuthHeader
- from posts import getPersonBox
def undoLikesCollectionEntry(recentPostsCache: {},
                             baseDir: str, postFilename: str, objectUrl: str,
                             actor: str, domain: str, debug: bool) -> None:
    """Undoes a like for a particular actor

    Loads the post stored in postFilename, discards any cached rendering of
    it, removes the first entry of the post's likes collection whose 'actor'
    equals the given actor, and saves the post back to postFilename.

    Nothing is saved when the post cannot be loaded, is not of type
    'Create', has no dict 'object' holding a non-empty dict 'likes'
    collection, or when the actor has no like recorded on it.

    recentPostsCache: passed to removePostFromCache together with the post
    baseDir, domain: passed to getCachedPostFilename to find the cached post
    postFilename: file from which the post is loaded and to which it is saved
    objectUrl: only used within debug output
    actor: actor url whose like is to be removed
    debug: print diagnostic messages when True
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return

    # remove any cached version of this post so that the like icon is changed
    nickname = getNicknameFromActor(actor)
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            os.remove(cachedPostFilename)
    removePostFromCache(postJsonObject, recentPostsCache)

    if not postJsonObject.get('type'):
        return
    if postJsonObject['type'] != 'Create':
        return
    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post '+objectUrl+' has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return
    likes = postObject.get('likes')
    if not likes or not isinstance(likes, dict):
        return
    likeItems = likes.get('items')
    if not likeItems:
        return

    # find the first like made by this actor
    foundItem = None
    for likeItem in likeItems:
        if not isinstance(likeItem, dict):
            # ignore malformed entries rather than failing on them
            continue
        if likeItem.get('actor') and likeItem['actor'] == actor:
            foundItem = likeItem
            break
    if foundItem is None:
        return

    if debug:
        print('DEBUG: like was removed for '+actor)
    likeItems.remove(foundItem)

    if not likeItems:
        # Decide on the items actually remaining rather than on the stored
        # totalItems value, so that a stale count can neither discard other
        # likes nor leave an empty collection behind
        if debug:
            print('DEBUG: likes was removed from post')
        del postObject['likes']
    else:
        # the collection lives under 'object', not at the top level
        likes['totalItems'] = len(likeItems)
    saveJson(postJsonObject, postFilename)
def likedByPerson(postJsonObject: {}, nickname: str, domain: str) -> bool:
    """Returns True if the given post is liked by the given person

    A like matches when its 'actor' value ends with
    domain + '/users/' + nickname. Entries of the likes collection which
    are not dicts, or which have no string 'actor', are skipped.
    """
    if noOfLikes(postJsonObject) == 0:
        return False
    # NOTE(review): this is a suffix match, so a domain which is itself the
    # tail of another domain name would also match - confirm whether the
    # comparison should be anchored on the '://' separator
    actorMatch = domain+'/users/'+nickname
    for item in postJsonObject['object']['likes']['items']:
        if not isinstance(item, dict):
            continue
        likeActor = item.get('actor')
        if isinstance(likeActor, str) and likeActor.endswith(actorMatch):
            return True
    return False
def noOfLikes(postJsonObject: {}) -> int:
    """Returns the number of likes on a given post

    Zero is returned when the post has no dict 'object' or when that object
    has no dict 'likes' collection. When the collection exists but has no
    items, its 'items' is set to an empty list and its 'totalItems' to zero
    within the given post before the count is returned.
    """
    postObject = postJsonObject.get('object')
    if not postObject or not isinstance(postObject, dict):
        return 0
    likesCollection = postObject.get('likes')
    if not likesCollection or not isinstance(likesCollection, dict):
        return 0
    if not likesCollection.get('items'):
        # normalise a collection which has missing or empty items
        likesCollection['items'] = []
        likesCollection['totalItems'] = 0
    return len(likesCollection['items'])
def updateLikesCollection(recentPostsCache: {},
                          baseDir: str, postFilename: str,
                          objectUrl: str,
                          actor: str, domain: str, debug: bool) -> None:
    """Updates the likes collection within a post

    Loads the post stored in postFilename, discards any cached rendering of
    it, records a like by the given actor within the post object's 'likes'
    collection (creating the collection if the post has none) and saves the
    post back to postFilename.

    Nothing is saved when the post cannot be loaded, has no dict 'object',
    has a 'likes' value which is not a dict, or when the actor already has
    a like within the collection.

    recentPostsCache: passed to removePostFromCache together with the post
    baseDir, domain: passed to getCachedPostFilename to find the cached post
    postFilename: file from which the post is loaded and to which it is saved
    objectUrl: url of the liked post; with '/likes' appended it becomes the
        id of a newly created collection
    actor: actor url of the person doing the liking
    debug: print diagnostic messages when True
    """
    postJsonObject = loadJson(postFilename)
    if not postJsonObject:
        return

    # remove any cached version of this post so that the like icon is changed
    nickname = getNicknameFromActor(actor)
    cachedPostFilename = \
        getCachedPostFilename(baseDir, nickname, domain, postJsonObject)
    if cachedPostFilename:
        if os.path.isfile(cachedPostFilename):
            os.remove(cachedPostFilename)
    removePostFromCache(postJsonObject, recentPostsCache)

    if not postJsonObject.get('object'):
        if debug:
            pprint(postJsonObject)
            print('DEBUG: post '+objectUrl+' has no object')
        return
    postObject = postJsonObject['object']
    if not isinstance(postObject, dict):
        return

    if not objectUrl.endswith('/likes'):
        objectUrl = objectUrl+'/likes'

    if not postObject.get('likes'):
        if debug:
            print('DEBUG: Adding initial likes to '+objectUrl)
        postObject['likes'] = {
            "@context": "https://www.w3.org/ns/activitystreams",
            'id': objectUrl,
            'type': 'Collection',
            "totalItems": 1,
            'items': [{
                'type': 'Like',
                'actor': actor
            }]
        }
    else:
        likes = postObject['likes']
        if not isinstance(likes, dict):
            # a likes value which is not a collection dict cannot be
            # appended to, so leave the post untouched
            if debug:
                print('DEBUG: likes is not a collection within '+objectUrl)
            return
        if not likes.get('items'):
            likes['items'] = []
        for likeItem in likes['items']:
            if not isinstance(likeItem, dict):
                continue
            if likeItem.get('actor') and likeItem['actor'] == actor:
                # this actor has already liked the post
                return
        likes['items'].append({
            'type': 'Like',
            'actor': actor
        })
        likes['totalItems'] = len(likes['items'])

    if debug:
        print('DEBUG: saving post with likes added')
        pprint(postJsonObject)
    saveJson(postJsonObject, postFilename)
def like(recentPostsCache: {},
         session, baseDir: str, federationList: [],
         nickname: str, domain: str, port: int,
         ccList: [], httpPrefix: str,
         objectUrl: str, actorLiked: str,
         clientToServer: bool,
         sendThreads: [], postLog: [],
         personCache: {}, cachedWebfingers: {},
         debug: bool, projectVersion: str) -> {}:
    """Creates a like
    actor is the person doing the liking
    object is typically the url of the message which was liked

    The owner of the liked post is taken from actorLiked if it is given,
    otherwise from objectUrl when that contains '/users/', '/channel/' or
    '/profile/'. When an owner is found the local copy of the post is
    located, its likes collection is updated and the Like activity is
    handed to sendSignedJson.

    Returns the Like activity, or None when objectUrl is not permitted by
    urlPermitted or when an owner was found but the post could not be
    located. When no owner can be determined the activity is returned
    without anything being updated or sent.
    """
    if not urlPermitted(objectUrl, federationList, "inbox:write"):
        return None

    # append the port to the domain unless it is a default web port
    fullDomain = domain
    if port and port != 80 and port != 443 and ':' not in domain:
        fullDomain = domain+':'+str(port)

    newLikeJson = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Like',
        'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname,
        'object': objectUrl
    }
    if ccList:
        newLikeJson['cc'] = ccList

    # Extract the domain and nickname from a statuses link
    likedPostNickname = None
    likedPostDomain = None
    likedPostPort = None
    if actorLiked:
        likedPostNickname = getNicknameFromActor(actorLiked)
        likedPostDomain, likedPostPort = getDomainFromActor(actorLiked)
    elif ('/users/' in objectUrl or
          '/channel/' in objectUrl or
          '/profile/' in objectUrl):
        likedPostNickname = getNicknameFromActor(objectUrl)
        likedPostDomain, likedPostPort = getDomainFromActor(objectUrl)

    if likedPostNickname:
        postFilename = locatePost(baseDir, nickname, domain, objectUrl)
        if not postFilename:
            print('DEBUG: like baseDir: '+baseDir)
            print('DEBUG: like nickname: '+nickname)
            print('DEBUG: like domain: '+domain)
            print('DEBUG: like objectUrl: '+objectUrl)
            return None

        updateLikesCollection(recentPostsCache,
                              baseDir, postFilename, objectUrl,
                              newLikeJson['actor'], domain, debug)

        sendSignedJson(newLikeJson, session, baseDir,
                       nickname, domain, port,
                       likedPostNickname, likedPostDomain, likedPostPort,
                       'https://www.w3.org/ns/activitystreams#Public',
                       httpPrefix, True, clientToServer, federationList,
                       sendThreads, postLog, cachedWebfingers, personCache,
                       debug, projectVersion)

    return newLikeJson
def likePost(recentPostsCache: {},
             session, baseDir: str, federationList: [],
             nickname: str, domain: str, port: int, httpPrefix: str,
             likeNickname: str, likeDomain: str, likePort: int,
             ccList: [],
             likeStatusNumber: int, clientToServer: bool,
             sendThreads: [], postLog: [],
             personCache: {}, cachedWebfingers: {},
             debug: bool, projectVersion: str) -> {}:
    """Likes a given status post. This is only used by unit tests

    Builds the actor url of the post owner from likeNickname, likeDomain
    and likePort, appends '/statuses/' and likeStatusNumber to obtain the
    url of the post, and passes both on to like(), whose result is returned.
    """
    # append the port to the domain unless it is a default web port
    if likePort and likePort != 80 and likePort != 443 and \
       ':' not in likeDomain:
        likeDomain = likeDomain+':'+str(likePort)

    actorLiked = httpPrefix+'://'+likeDomain+'/users/'+likeNickname
    objectUrl = actorLiked+'/statuses/'+str(likeStatusNumber)

    return like(recentPostsCache,
                session, baseDir, federationList, nickname, domain, port,
                ccList, httpPrefix, objectUrl, actorLiked, clientToServer,
                sendThreads, postLog, personCache, cachedWebfingers,
                debug, projectVersion)
def undolike(recentPostsCache: {},
             session, baseDir: str, federationList: [],
             nickname: str, domain: str, port: int,
             ccList: [], httpPrefix: str,
             objectUrl: str, actorLiked: str,
             clientToServer: bool,
             sendThreads: [], postLog: [],
             personCache: {}, cachedWebfingers: {},
             debug: bool, projectVersion: str) -> {}:
    """Removes a like
    actor is the person doing the liking
    object is typically the url of the message which was liked

    The owner of the liked post is taken from actorLiked if it is given,
    otherwise from objectUrl when that contains '/users/', '/channel/' or
    '/profile/'. The local copy of the post is located, the actor's entry
    is removed from its likes collection and the Undo activity is handed
    to sendSignedJson.

    Returns the Undo activity, or None when objectUrl is not permitted by
    urlPermitted, when no owner of the liked post can be determined, or
    when the post could not be located.
    """
    if not urlPermitted(objectUrl, federationList, "inbox:write"):
        return None

    # append the port to the domain unless it is a default web port
    fullDomain = domain
    if port and port != 80 and port != 443 and ':' not in domain:
        fullDomain = domain+':'+str(port)

    undoActor = httpPrefix+'://'+fullDomain+'/users/'+nickname
    newUndoLikeJson = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Undo',
        'actor': undoActor,
        'object': {
            'type': 'Like',
            'actor': undoActor,
            'object': objectUrl
        }
    }
    if ccList:
        newUndoLikeJson['cc'] = ccList
        newUndoLikeJson['object']['cc'] = ccList

    # Extract the domain and nickname from a statuses link
    likedPostNickname = None
    likedPostDomain = None
    likedPostPort = None
    if actorLiked:
        likedPostNickname = getNicknameFromActor(actorLiked)
        likedPostDomain, likedPostPort = getDomainFromActor(actorLiked)
    elif ('/users/' in objectUrl or
          '/channel/' in objectUrl or
          '/profile/' in objectUrl):
        likedPostNickname = getNicknameFromActor(objectUrl)
        likedPostDomain, likedPostPort = getDomainFromActor(objectUrl)

    if not likedPostNickname:
        return None

    postFilename = locatePost(baseDir, nickname, domain, objectUrl)
    if not postFilename:
        return None

    # the cache is the first argument and the actor is taken from the
    # Undo activity built above
    undoLikesCollectionEntry(recentPostsCache,
                             baseDir, postFilename, objectUrl,
                             newUndoLikeJson['actor'], domain, debug)

    sendSignedJson(newUndoLikeJson, session, baseDir,
                   nickname, domain, port,
                   likedPostNickname, likedPostDomain, likedPostPort,
                   'https://www.w3.org/ns/activitystreams#Public',
                   httpPrefix, True, clientToServer, federationList,
                   sendThreads, postLog, cachedWebfingers, personCache,
                   debug, projectVersion)

    return newUndoLikeJson
def undoLikePost(recentPostsCache: {},
                 session, baseDir: str, federationList: [],
                 nickname: str, domain: str, port: int, httpPrefix: str,
                 likeNickname: str, likeDomain: str, likePort: int,
                 ccList: [],
                 likeStatusNumber: int, clientToServer: bool,
                 sendThreads: [], postLog: [],
                 personCache: {}, cachedWebfingers: {},
                 debug: bool, projectVersion: str = None) -> {}:
    """Removes a liked post

    Builds the actor url of the post owner from likeNickname, likeDomain
    and likePort, appends '/statuses/' and likeStatusNumber to obtain the
    url of the post, and passes both on to undolike(), whose result is
    returned.

    projectVersion is optional so that existing callers keep working. When
    it is not supplied the __version__ of this module is passed on.
    """
    # append the port to the domain unless it is a default web port
    if likePort and likePort != 80 and likePort != 443 and \
       ':' not in likeDomain:
        likeDomain = likeDomain+':'+str(likePort)

    actorLiked = httpPrefix+'://'+likeDomain+'/users/'+likeNickname
    objectUrl = actorLiked+'/statuses/'+str(likeStatusNumber)

    if projectVersion is None:
        # NOTE(review): falls back to this module's version string -
        # confirm that it is the value sendSignedJson expects
        projectVersion = __version__

    return undolike(recentPostsCache,
                    session, baseDir, federationList, nickname, domain, port,
                    ccList, httpPrefix, objectUrl, actorLiked, clientToServer,
                    sendThreads, postLog, personCache, cachedWebfingers,
                    debug, projectVersion)
def sendLikeViaServer(baseDir: str, session,
                      fromNickname: str, password: str,
                      fromDomain: str, fromPort: int,
                      httpPrefix: str, likeUrl: str,
                      cachedWebfingers: {}, personCache: {},
                      debug: bool, projectVersion: str) -> {}:
    """Creates a like via c2s

    Builds a Like activity for likeUrl, looks up the outbox of the sending
    account via webfinger and getPersonBox, and POSTs the activity to it
    using basic auth made from fromNickname and password.

    Returns the Like activity, or an integer code on failure:
    6 - no session was given
    1 - the webfinger lookup failed
    3 - no outbox url was found
    4 - no actor id was found
    """
    if not session:
        print('WARN: No session for sendLikeViaServer')
        return 6

    # append the port to the domain unless it is a default web port
    fromDomainFull = fromDomain
    if fromPort and fromPort != 80 and fromPort != 443 and \
       ':' not in fromDomain:
        fromDomainFull = fromDomain+':'+str(fromPort)

    newLikeJson = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Like',
        'actor': httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname,
        'object': likeUrl
    }

    handle = httpPrefix+'://'+fromDomainFull+'/@'+fromNickname

    # lookup the sending account via webfinger
    wfRequest = webfingerHandle(session, handle, httpPrefix, cachedWebfingers,
                                fromDomain, projectVersion)
    if not wfRequest:
        if debug:
            print('DEBUG: like webfinger failed for '+handle)
        return 1

    postToBox = 'outbox'

    # get the outbox url of the sending account
    (inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox,
     capabilityAcquisition, avatarUrl, displayName) = \
        getPersonBox(baseDir, session, wfRequest, personCache,
                     projectVersion, httpPrefix, fromNickname,
                     fromDomain, postToBox)

    if not inboxUrl:
        if debug:
            print('DEBUG: No '+postToBox+' was found for '+handle)
        return 3
    if not fromPersonId:
        if debug:
            print('DEBUG: No actor was found for '+handle)
        return 4

    authHeader = createBasicAuthHeader(fromNickname, password)

    headers = {
        'host': fromDomain,
        'Content-type': 'application/json',
        'Authorization': authHeader
    }
    # NOTE: the result of the POST is not checked, so the message below
    # is printed whether or not the request succeeded
    postJson(session, newLikeJson, [], inboxUrl, headers, "inbox:write")

    if debug:
        print('DEBUG: c2s POST like success')

    return newLikeJson
def sendUndoLikeViaServer(baseDir: str, session,
                          fromNickname: str, password: str,
                          fromDomain: str, fromPort: int,
                          httpPrefix: str, likeUrl: str,
                          cachedWebfingers: {}, personCache: {},
                          debug: bool, projectVersion: str) -> {}:
    """Undo a like via c2s

    Builds an Undo activity wrapping a Like of likeUrl, looks up the outbox
    of the sending account via webfinger and getPersonBox, and POSTs the
    activity to it using basic auth made from fromNickname and password.

    Returns the Undo activity, or an integer code on failure:
    6 - no session was given
    1 - the webfinger lookup failed
    3 - no outbox url was found
    4 - no actor id was found
    """
    if not session:
        print('WARN: No session for sendUndoLikeViaServer')
        return 6

    # append the port to the domain unless it is a default web port
    fromDomainFull = fromDomain
    if fromPort and fromPort != 80 and fromPort != 443 and \
       ':' not in fromDomain:
        fromDomainFull = fromDomain+':'+str(fromPort)

    undoActor = httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname
    newUndoLikeJson = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Undo',
        'actor': undoActor,
        'object': {
            'type': 'Like',
            'actor': undoActor,
            'object': likeUrl
        }
    }

    handle = httpPrefix+'://'+fromDomainFull+'/@'+fromNickname

    # lookup the sending account via webfinger
    wfRequest = webfingerHandle(session, handle, httpPrefix, cachedWebfingers,
                                fromDomain, projectVersion)
    if not wfRequest:
        if debug:
            print('DEBUG: undo like webfinger failed for '+handle)
        return 1

    postToBox = 'outbox'

    # get the outbox url of the sending account
    (inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox,
     capabilityAcquisition, avatarUrl, displayName) = \
        getPersonBox(baseDir, session, wfRequest, personCache,
                     projectVersion, httpPrefix, fromNickname,
                     fromDomain, postToBox)

    if not inboxUrl:
        if debug:
            print('DEBUG: No '+postToBox+' was found for '+handle)
        return 3
    if not fromPersonId:
        if debug:
            print('DEBUG: No actor was found for '+handle)
        return 4

    authHeader = createBasicAuthHeader(fromNickname, password)

    headers = {
        'host': fromDomain,
        'Content-type': 'application/json',
        'Authorization': authHeader
    }
    # NOTE: the result of the POST is not checked, so the message below
    # is printed whether or not the request succeeded
    postJson(session, newUndoLikeJson, [], inboxUrl, headers, "inbox:write")

    if debug:
        print('DEBUG: c2s POST undo like success')

    return newUndoLikeJson
def outboxLike(recentPostsCache: {},
               baseDir: str, httpPrefix: str,
               nickname: str, domain: str, port: int,
               messageJson: {}, debug: bool) -> None:
    """ When a like request is received by the outbox from c2s

    The message is ignored unless it is of type 'Like', has a string
    'object' and has an 'actor'. Any '/activity' within the object url is
    removed, the post is located for the given nickname and domain (with
    any port stripped from the domain) and its likes collection is updated
    with the actor of the message.
    """
    if not messageJson.get('type'):
        if debug:
            print('DEBUG: like - no type')
        return
    if not messageJson['type'] == 'Like':
        if debug:
            print('DEBUG: not a like')
        return
    if not messageJson.get('object'):
        if debug:
            print('DEBUG: no object in like')
        return
    if not isinstance(messageJson['object'], str):
        if debug:
            print('DEBUG: like object is not string')
        return
    if not messageJson.get('actor'):
        # the message is supplied by a client, so the actor may be absent
        if debug:
            print('DEBUG: no actor in like')
        return
    if debug:
        print('DEBUG: c2s like request arrived in outbox')

    messageId = messageJson['object'].replace('/activity', '')
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        if debug:
            print('DEBUG: c2s like post not found in inbox or outbox')
            print(messageId)
        # NOTE(review): every other exit returns None, as annotated - True
        # is kept here in case a caller relies on it; confirm and align
        return True
    updateLikesCollection(recentPostsCache,
                          baseDir, postFilename, messageId,
                          messageJson['actor'], domain, debug)
    if debug:
        print('DEBUG: post liked via c2s - '+postFilename)
def outboxUndoLike(baseDir: str, httpPrefix: str,
                   nickname: str, domain: str, port: int,
                   messageJson: {}, debug: bool,
                   recentPostsCache: {} = None) -> None:
    """ When an undo like request is received by the outbox from c2s

    The message is ignored unless it is of type 'Undo', has an 'actor' and
    has a dict 'object' of type 'Like' whose own 'object' is a string. Any
    '/activity' within that url is removed, the post is located for the
    given nickname and domain (with any port stripped from the domain) and
    the like made by the actor of the message is removed from its likes
    collection.

    recentPostsCache is optional so that existing callers keep working.
    When it is not supplied an empty dict is passed on in its place.
    """
    if not messageJson.get('type'):
        return
    if not messageJson['type'] == 'Undo':
        return
    if not messageJson.get('object'):
        return
    if not isinstance(messageJson['object'], dict):
        if debug:
            print('DEBUG: undo like object is not dict')
        return
    if not messageJson['object'].get('type'):
        if debug:
            print('DEBUG: undo like - no type')
        return
    if not messageJson['object']['type'] == 'Like':
        if debug:
            print('DEBUG: not a undo like')
        return
    if not messageJson['object'].get('object'):
        if debug:
            print('DEBUG: no object in undo like')
        return
    if not isinstance(messageJson['object']['object'], str):
        if debug:
            print('DEBUG: undo like object is not string')
        return
    if not messageJson.get('actor'):
        # the message is supplied by a client, so the actor may be absent
        if debug:
            print('DEBUG: no actor in undo like')
        return
    if debug:
        print('DEBUG: c2s undo like request arrived in outbox')

    messageId = messageJson['object']['object'].replace('/activity', '')
    if ':' in domain:
        domain = domain.split(':')[0]
    postFilename = locatePost(baseDir, nickname, domain, messageId)
    if not postFilename:
        if debug:
            print('DEBUG: c2s undo like post not found in inbox or outbox')
            print(messageId)
        # NOTE(review): every other exit returns None, as annotated - True
        # is kept here in case a caller relies on it; confirm and align
        return True

    if recentPostsCache is None:
        # assumes removePostFromCache accepts an empty cache - TODO confirm
        recentPostsCache = {}
    undoLikesCollectionEntry(recentPostsCache,
                             baseDir, postFilename, messageId,
                             messageJson['actor'], domain, debug)
    if debug:
        print('DEBUG: post undo liked via c2s - '+postFilename)
|