session.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. __filename__ = "session.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "0.0.1"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import os
  9. import sys
  10. import requests
  11. from utils import urlPermitted
  12. import json
  13. baseDirectory=None
  14. def createSession(domain: str, port: int, onionRoute: bool):
  15. session = requests.session()
  16. #if domain.startswith('127.') or domain.startswith('192.') or domain.startswith('10.'):
  17. # session.mount('http://', SourceAddressAdapter(domain))
  18. #session.mount('http://', SourceAddressAdapter((domain, port)))
  19. if onionRoute:
  20. session.proxies = {}
  21. session.proxies['http'] = 'socks5h://localhost:9050'
  22. session.proxies['https'] = 'socks5h://localhost:9050'
  23. return session
  24. def getJson(session,url: str,headers: {},params: {}, \
  25. version='0.0.1',httpPrefix='https',domain='testdomain') -> {}:
  26. sessionParams={}
  27. sessionHeaders={}
  28. if headers:
  29. sessionHeaders=headers
  30. if params:
  31. sessionParams=params
  32. pythonVersion=str(sys.version_info[0])+'.'+str(sys.version_info[1])+'.'+str(sys.version_info[2])
  33. sessionHeaders['User-Agent']='http.py/'+pythonVersion+' (Epicyon/'+version+'; +'+httpPrefix+'://'+domain+'/)'
  34. #"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)"
  35. if not session:
  36. print('WARN: no session specified for getJson')
  37. #session.cookies.clear()
  38. try:
  39. result=session.get(url, headers=sessionHeaders, params=sessionParams)
  40. return result.json()
  41. except Exception as e:
  42. print('ERROR: getJson failed')
  43. print('url: '+url)
  44. print('headers: '+str(sessionHeaders))
  45. print('params: '+str(sessionParams))
  46. print(e)
  47. return None
  48. def postJson(session,postJsonObject: {},federationList: [],inboxUrl: str,headers: {},capability: str) -> str:
  49. """Post a json message to the inbox of another person
  50. Supplying a capability, such as "inbox:write"
  51. """
  52. # always allow capability requests
  53. if not capability.startswith('cap'):
  54. # check that we are posting to a permitted domain
  55. if not urlPermitted(inboxUrl,federationList,capability):
  56. print('postJson: '+inboxUrl+' not permitted')
  57. return None
  58. postResult = session.post(url = inboxUrl, data = json.dumps(postJsonObject), headers=headers)
  59. return postResult.text
  60. def postJsonString(session,postJsonStr: str, \
  61. federationList: [], \
  62. inboxUrl: str, \
  63. headers: {}, \
  64. capability: str, \
  65. debug: bool) -> str:
  66. """Post a json message string to the inbox of another person
  67. Supplying a capability, such as "inbox:write"
  68. NOTE: Here we post a string rather than the original json so that
  69. conversions between string and json format don't invalidate
  70. the message body digest of http signatures
  71. """
  72. # always allow capability requests
  73. if not capability.startswith('cap'):
  74. # check that we are posting to a permitted domain
  75. if not urlPermitted(inboxUrl,federationList,capability):
  76. print('postJson: '+inboxUrl+' not permitted by capabilities')
  77. return None
  78. postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
  79. return postResult.text
  80. def postImage(session,attachImageFilename: str,federationList: [],inboxUrl: str,headers: {},capability: str) -> str:
  81. """Post an image to the inbox of another person or outbox via c2s
  82. Supplying a capability, such as "inbox:write"
  83. """
  84. # always allow capability requests
  85. if not capability.startswith('cap'):
  86. # check that we are posting to a permitted domain
  87. if not urlPermitted(inboxUrl,federationList,capability):
  88. print('postJson: '+inboxUrl+' not permitted')
  89. return None
  90. if not (attachImageFilename.endswith('.jpg') or \
  91. attachImageFilename.endswith('.jpeg') or \
  92. attachImageFilename.endswith('.png') or \
  93. attachImageFilename.endswith('.gif')):
  94. print('Image must be png, jpg, or gif')
  95. return None
  96. if not os.path.isfile(attachImageFilename):
  97. print('Image not found: '+attachImageFilename)
  98. return None
  99. contentType='image/jpeg'
  100. if attachImageFilename.endswith('.png'):
  101. contentType='image/png'
  102. if attachImageFilename.endswith('.gif'):
  103. contentType='image/gif'
  104. headers['Content-type']=contentType
  105. with open(attachImageFilename, 'rb') as avFile:
  106. mediaBinary = avFile.read()
  107. postResult = session.post(url=inboxUrl, data=mediaBinary, headers=headers)
  108. return postResult.text
  109. return None