session.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. __filename__ = "session.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.0.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import os
  9. import sys
  10. import requests
  11. from utils import urlPermitted
  12. import json
  13. baseDirectory=None
  14. def createSession(onionRoute: bool):
  15. session = requests.session()
  16. if onionRoute:
  17. session.proxies = {}
  18. session.proxies['http'] = 'socks5h://localhost:9050'
  19. session.proxies['https'] = 'socks5h://localhost:9050'
  20. return session
  21. def getJson(session,url: str,headers: {},params: {}, \
  22. version='1.0.0',httpPrefix='https',domain='testdomain') -> {}:
  23. if not isinstance(url, str):
  24. print('url: '+str(url))
  25. print('ERROR: getJson url should be a string')
  26. return None
  27. sessionParams={}
  28. sessionHeaders={}
  29. if headers:
  30. sessionHeaders=headers
  31. if params:
  32. sessionParams=params
  33. sessionHeaders['User-Agent']='Epicyon/'+version
  34. if domain:
  35. sessionHeaders['User-Agent']+='; +'+httpPrefix+'://'+domain+'/'
  36. #"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)"
  37. if not session:
  38. print('WARN: no session specified for getJson')
  39. #session.cookies.clear()
  40. try:
  41. result=session.get(url, headers=sessionHeaders, params=sessionParams)
  42. return result.json()
  43. except Exception as e:
  44. print('ERROR: getJson failed')
  45. print('url: '+str(url))
  46. print('headers: '+str(sessionHeaders))
  47. print('params: '+str(sessionParams))
  48. print(e)
  49. return None
  50. def postJson(session,postJsonObject: {},federationList: [], \
  51. inboxUrl: str,headers: {},capability: str) -> str:
  52. """Post a json message to the inbox of another person
  53. Supplying a capability, such as "inbox:write"
  54. """
  55. # always allow capability requests
  56. if not capability.startswith('cap'):
  57. # check that we are posting to a permitted domain
  58. if not urlPermitted(inboxUrl,federationList,capability):
  59. print('postJson: '+inboxUrl+' not permitted')
  60. return None
  61. postResult= \
  62. session.post(url=inboxUrl, \
  63. data=json.dumps(postJsonObject), \
  64. headers=headers)
  65. if postResult:
  66. return postResult.text
  67. return None
  68. def postJsonString(session,postJsonStr: str, \
  69. federationList: [], \
  70. inboxUrl: str, \
  71. headers: {}, \
  72. capability: str, \
  73. debug: bool) -> (bool,bool):
  74. """Post a json message string to the inbox of another person
  75. Supplying a capability, such as "inbox:write"
  76. The second boolean returned is true if the send is unauthorized
  77. NOTE: Here we post a string rather than the original json so that
  78. conversions between string and json format don't invalidate
  79. the message body digest of http signatures
  80. """
  81. # always allow capability requests
  82. if not capability.startswith('cap'):
  83. # check that we are posting to a permitted domain
  84. if not urlPermitted(inboxUrl,federationList,capability):
  85. print('postJson: '+inboxUrl+' not permitted by capabilities')
  86. return None,None
  87. postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
  88. if postResult.status_code<200 or postResult.status_code>202:
  89. #if postResult.status_code==400:
  90. # headers['content-type']='application/ld+json'
  91. # postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
  92. # if not (postResult.status_code<200 or postResult.status_code>202):
  93. # return True
  94. if postResult.status_code>=400 and postResult.status_code<=405 and \
  95. postResult.status_code!=404:
  96. print('WARN: >>> Post to '+inboxUrl+' is unauthorized <<<')
  97. return False,True
  98. else:
  99. print('WARN: Failed to post to '+inboxUrl+' with headers '+str(headers))
  100. print('status code '+str(postResult.status_code))
  101. return False,False
  102. return True,False
  103. def postImage(session,attachImageFilename: str,federationList: [], \
  104. inboxUrl: str,headers: {},capability: str) -> str:
  105. """Post an image to the inbox of another person or outbox via c2s
  106. Supplying a capability, such as "inbox:write"
  107. """
  108. # always allow capability requests
  109. if not capability.startswith('cap'):
  110. # check that we are posting to a permitted domain
  111. if not urlPermitted(inboxUrl,federationList,capability):
  112. print('postJson: '+inboxUrl+' not permitted')
  113. return None
  114. if not (attachImageFilename.endswith('.jpg') or \
  115. attachImageFilename.endswith('.jpeg') or \
  116. attachImageFilename.endswith('.png') or \
  117. attachImageFilename.endswith('.gif')):
  118. print('Image must be png, jpg, or gif')
  119. return None
  120. if not os.path.isfile(attachImageFilename):
  121. print('Image not found: '+attachImageFilename)
  122. return None
  123. contentType='image/jpeg'
  124. if attachImageFilename.endswith('.png'):
  125. contentType='image/png'
  126. if attachImageFilename.endswith('.gif'):
  127. contentType='image/gif'
  128. headers['Content-type']=contentType
  129. with open(attachImageFilename, 'rb') as avFile:
  130. mediaBinary = avFile.read()
  131. postResult = session.post(url=inboxUrl, data=mediaBinary, headers=headers)
  132. if postResult:
  133. return postResult.text
  134. return None