session.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
# Module metadata: file name, author, license, version, maintainer,
# contact email and release status for this module.
__filename__ = "session.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.0.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
  8. import os
  9. import sys
  10. import requests
  11. from utils import urlPermitted
  12. import json
# Module-level base directory, unset (None) at import time.
# NOTE(review): none of the functions visible in this file read or assign
# it - presumably it is set by other code; confirm before relying on it.
baseDirectory=None
  14. def createSession(domain: str, port: int, onionRoute: bool):
  15. session = requests.session()
  16. #if domain.startswith('127.') or domain.startswith('192.') or domain.startswith('10.'):
  17. # session.mount('http://', SourceAddressAdapter(domain))
  18. #session.mount('http://', SourceAddressAdapter((domain, port)))
  19. if onionRoute:
  20. session.proxies = {}
  21. session.proxies['http'] = 'socks5h://localhost:9050'
  22. session.proxies['https'] = 'socks5h://localhost:9050'
  23. return session
  24. def getJson(session,url: str,headers: {},params: {}, \
  25. version='1.0.0',httpPrefix='https',domain='testdomain') -> {}:
  26. if not isinstance(url, str):
  27. print('url: '+str(url))
  28. print('ERROR: getJson url should be a string')
  29. return None
  30. sessionParams={}
  31. sessionHeaders={}
  32. if headers:
  33. sessionHeaders=headers
  34. if params:
  35. sessionParams=params
  36. pythonVersion=str(sys.version_info[0])+'.'+str(sys.version_info[1])+'.'+str(sys.version_info[2])
  37. sessionHeaders['User-Agent']='Epicyon/'+version
  38. if domain:
  39. sessionHeaders['User-Agent']+='; +'+httpPrefix+'://'+domain+'/'
  40. #"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)"
  41. if not session:
  42. print('WARN: no session specified for getJson')
  43. #session.cookies.clear()
  44. try:
  45. result=session.get(url, headers=sessionHeaders, params=sessionParams)
  46. return result.json()
  47. except Exception as e:
  48. print('ERROR: getJson failed')
  49. print('url: '+str(url))
  50. print('headers: '+str(sessionHeaders))
  51. print('params: '+str(sessionParams))
  52. print(e)
  53. return None
  54. def postJson(session,postJsonObject: {},federationList: [],inboxUrl: str,headers: {},capability: str) -> str:
  55. """Post a json message to the inbox of another person
  56. Supplying a capability, such as "inbox:write"
  57. """
  58. # always allow capability requests
  59. if not capability.startswith('cap'):
  60. # check that we are posting to a permitted domain
  61. if not urlPermitted(inboxUrl,federationList,capability):
  62. print('postJson: '+inboxUrl+' not permitted')
  63. return None
  64. postResult = session.post(url = inboxUrl, data = json.dumps(postJsonObject), headers=headers)
  65. return postResult.text
  66. def postJsonString(session,postJsonStr: str, \
  67. federationList: [], \
  68. inboxUrl: str, \
  69. headers: {}, \
  70. capability: str, \
  71. debug: bool) -> bool:
  72. """Post a json message string to the inbox of another person
  73. Supplying a capability, such as "inbox:write"
  74. NOTE: Here we post a string rather than the original json so that
  75. conversions between string and json format don't invalidate
  76. the message body digest of http signatures
  77. """
  78. # always allow capability requests
  79. if not capability.startswith('cap'):
  80. # check that we are posting to a permitted domain
  81. if not urlPermitted(inboxUrl,federationList,capability):
  82. print('postJson: '+inboxUrl+' not permitted by capabilities')
  83. return None
  84. postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
  85. if postResult.status_code<200 or postResult.status_code>202:
  86. if postResult.status_code==401:
  87. print('WARN: >>> Post to '+inboxUrl+' is unauthorized <<<')
  88. else:
  89. print('WARN: Failed to post to '+inboxUrl)
  90. print('status code '+str(postResult.status_code))
  91. return False
  92. return True
  93. def postImage(session,attachImageFilename: str,federationList: [],inboxUrl: str,headers: {},capability: str) -> str:
  94. """Post an image to the inbox of another person or outbox via c2s
  95. Supplying a capability, such as "inbox:write"
  96. """
  97. # always allow capability requests
  98. if not capability.startswith('cap'):
  99. # check that we are posting to a permitted domain
  100. if not urlPermitted(inboxUrl,federationList,capability):
  101. print('postJson: '+inboxUrl+' not permitted')
  102. return None
  103. if not (attachImageFilename.endswith('.jpg') or \
  104. attachImageFilename.endswith('.jpeg') or \
  105. attachImageFilename.endswith('.png') or \
  106. attachImageFilename.endswith('.gif')):
  107. print('Image must be png, jpg, or gif')
  108. return None
  109. if not os.path.isfile(attachImageFilename):
  110. print('Image not found: '+attachImageFilename)
  111. return None
  112. contentType='image/jpeg'
  113. if attachImageFilename.endswith('.png'):
  114. contentType='image/png'
  115. if attachImageFilename.endswith('.gif'):
  116. contentType='image/gif'
  117. headers['Content-type']=contentType
  118. with open(attachImageFilename, 'rb') as avFile:
  119. mediaBinary = avFile.read()
  120. postResult = session.post(url=inboxUrl, data=mediaBinary, headers=headers)
  121. return postResult.text
  122. return None