httpsig.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. __filename__ = "posts.py"
  2. __author__ = "Bob Mottram"
  3. __credits__ = ['lamia']
  4. __license__ = "AGPL3+"
  5. __version__ = "1.1.0"
  6. __maintainer__ = "Bob Mottram"
  7. __email__ = "bob@freedombone.net"
  8. __status__ = "Production"
  9. # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
  10. try:
  11. from Cryptodome.PublicKey import RSA
  12. from Cryptodome.Hash import SHA256
  13. from Cryptodome.Signature import pkcs1_15
  14. except ImportError:
  15. from Crypto.PublicKey import RSA
  16. from Crypto.Hash import SHA256
  17. # from Crypto.Signature import PKCS1_v1_5
  18. from Crypto.Signature import pkcs1_15
  19. import base64
  20. from time import gmtime, strftime
  21. import datetime
  22. def messageContentDigest(messageBodyJsonStr: str) -> str:
  23. msg = messageBodyJsonStr.encode('utf-8')
  24. digestStr = SHA256.new(msg).digest()
  25. return base64.b64encode(digestStr).decode('utf-8')
  26. def signPostHeaders(dateStr: str, privateKeyPem: str,
  27. nickname: str,
  28. domain: str, port: int,
  29. toDomain: str, toPort: int,
  30. path: str,
  31. httpPrefix: str,
  32. messageBodyJsonStr: str) -> str:
  33. """Returns a raw signature string that can be plugged into a header and
  34. used to verify the authenticity of an HTTP transmission.
  35. """
  36. if port:
  37. if port != 80 and port != 443:
  38. if ':' not in domain:
  39. domain = domain + ':' + str(port)
  40. if toPort:
  41. if toPort != 80 and toPort != 443:
  42. if ':' not in toDomain:
  43. toDomain = toDomain + ':' + str(port)
  44. if not dateStr:
  45. dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  46. keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'
  47. if not messageBodyJsonStr:
  48. headers = {
  49. '(request-target)': f'post {path}',
  50. 'host': toDomain,
  51. 'date': dateStr,
  52. 'content-type': 'application/json'
  53. }
  54. else:
  55. bodyDigest = messageContentDigest(messageBodyJsonStr)
  56. contentLength = len(messageBodyJsonStr)
  57. headers = {
  58. '(request-target)': f'post {path}',
  59. 'host': toDomain,
  60. 'date': dateStr,
  61. 'digest': f'SHA-256={bodyDigest}',
  62. 'content-type': 'application/activity+json',
  63. 'content-length': str(contentLength)
  64. }
  65. privateKeyPem = RSA.import_key(privateKeyPem)
  66. # headers.update({
  67. # '(request-target)': f'post {path}',
  68. # })
  69. # build a digest for signing
  70. signedHeaderKeys = headers.keys()
  71. signedHeaderText = ''
  72. for headerKey in signedHeaderKeys:
  73. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  74. signedHeaderText = signedHeaderText.strip()
  75. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  76. # Sign the digest
  77. rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
  78. signature = base64.b64encode(rawSignature).decode('ascii')
  79. # Put it into a valid HTTP signature format
  80. signatureDict = {
  81. 'keyId': keyID,
  82. 'algorithm': 'rsa-sha256',
  83. 'headers': ' '.join(signedHeaderKeys),
  84. 'signature': signature
  85. }
  86. signatureHeader = ','.join(
  87. [f'{k}="{v}"' for k, v in signatureDict.items()])
  88. return signatureHeader
  89. def createSignedHeader(privateKeyPem: str, nickname: str,
  90. domain: str, port: int,
  91. toDomain: str, toPort: int,
  92. path: str, httpPrefix: str, withDigest: bool,
  93. messageBodyJsonStr: str) -> {}:
  94. """Note that the domain is the destination, not the sender
  95. """
  96. contentType = 'application/activity+json'
  97. headerDomain = toDomain
  98. if toPort:
  99. if toPort != 80 and toPort != 443:
  100. if ':' not in headerDomain:
  101. headerDomain = headerDomain + ':' + str(toPort)
  102. dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  103. if not withDigest:
  104. headers = {
  105. '(request-target)': f'post {path}',
  106. 'host': headerDomain,
  107. 'date': dateStr
  108. }
  109. signatureHeader = \
  110. signPostHeaders(dateStr, privateKeyPem, nickname,
  111. domain, port, toDomain, toPort,
  112. path, httpPrefix, None)
  113. else:
  114. bodyDigest = messageContentDigest(messageBodyJsonStr)
  115. contentLength = len(messageBodyJsonStr)
  116. headers = {
  117. '(request-target)': f'post {path}',
  118. 'host': headerDomain,
  119. 'date': dateStr,
  120. 'digest': f'SHA-256={bodyDigest}',
  121. 'content-length': str(contentLength),
  122. 'content-type': contentType
  123. }
  124. signatureHeader = \
  125. signPostHeaders(dateStr, privateKeyPem, nickname,
  126. domain, port,
  127. toDomain, toPort,
  128. path, httpPrefix, messageBodyJsonStr)
  129. headers['signature'] = signatureHeader
  130. return headers
  131. def verifyRecentSignature(signedDateStr: str) -> bool:
  132. """Checks whether the given time taken from the header is within
  133. 12 hours of the current time
  134. """
  135. currDate = datetime.datetime.utcnow()
  136. dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
  137. signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
  138. timeDiffSec = (currDate - signedDate).seconds
  139. # 12 hours tollerance
  140. if timeDiffSec > 43200:
  141. print('WARN: Header signed too long ago: ' + signedDateStr)
  142. print(str(timeDiffSec / (60 * 60)) + ' hours')
  143. return False
  144. if timeDiffSec < 0:
  145. print('WARN: Header signed in the future! ' + signedDateStr)
  146. print(str(timeDiffSec / (60 * 60)) + ' hours')
  147. return False
  148. return True
  149. def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
  150. path: str, GETmethod: bool,
  151. messageBodyDigest: str,
  152. messageBodyJsonStr: str, debug: bool) -> bool:
  153. """Returns true or false depending on if the key that we plugged in here
  154. validates against the headers, method, and path.
  155. publicKeyPem - the public key from an rsa key pair
  156. headers - should be a dictionary of request headers
  157. path - the relative url that was requested from this site
  158. GETmethod - GET or POST
  159. messageBodyJsonStr - the received request body (used for digest)
  160. """
  161. if GETmethod:
  162. method = 'GET'
  163. else:
  164. method = 'POST'
  165. if debug:
  166. print('DEBUG: verifyPostHeaders ' + method)
  167. publicKeyPem = RSA.import_key(publicKeyPem)
  168. # Build a dictionary of the signature values
  169. signatureHeader = headers['signature']
  170. signatureDict = {
  171. k: v[1:-1]
  172. for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
  173. }
  174. # Unpack the signed headers and set values based on current headers and
  175. # body (if a digest was included)
  176. signedHeaderList = []
  177. for signedHeader in signatureDict['headers'].split(' '):
  178. if debug:
  179. print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
  180. if signedHeader == '(request-target)':
  181. appendStr = f'(request-target): {method.lower()} {path}'
  182. signedHeaderList.append(appendStr)
  183. elif signedHeader == 'digest':
  184. if messageBodyDigest:
  185. bodyDigest = messageBodyDigest
  186. else:
  187. bodyDigest = messageContentDigest(messageBodyJsonStr)
  188. signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
  189. elif signedHeader == 'content-length':
  190. if headers.get(signedHeader):
  191. appendStr = f'content-length: {headers[signedHeader]}'
  192. signedHeaderList.append(appendStr)
  193. else:
  194. if headers.get('Content-Length'):
  195. contentLength = headers['Content-Length']
  196. signedHeaderList.append(f'content-length: {contentLength}')
  197. else:
  198. if headers.get('Content-length'):
  199. contentLength = headers['Content-length']
  200. appendStr = f'content-length: {contentLength}'
  201. signedHeaderList.append(appendStr)
  202. else:
  203. if debug:
  204. print('DEBUG: verifyPostHeaders ' + signedHeader +
  205. ' not found in ' + str(headers))
  206. else:
  207. if headers.get(signedHeader):
  208. if signedHeader == 'date':
  209. if not verifyRecentSignature(headers[signedHeader]):
  210. if debug:
  211. print('DEBUG: ' +
  212. 'verifyPostHeaders date is not recent ' +
  213. headers[signedHeader])
  214. return False
  215. signedHeaderList.append(
  216. f'{signedHeader}: {headers[signedHeader]}')
  217. else:
  218. signedHeaderCap = signedHeader.capitalize()
  219. if signedHeaderCap == 'Date':
  220. if not verifyRecentSignature(headers[signedHeaderCap]):
  221. if debug:
  222. print('DEBUG: ' +
  223. 'verifyPostHeaders date is not recent ' +
  224. headers[signedHeader])
  225. return False
  226. if headers.get(signedHeaderCap):
  227. signedHeaderList.append(
  228. f'{signedHeader}: {headers[signedHeaderCap]}')
  229. if debug:
  230. print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
  231. # Now we have our header data digest
  232. signedHeaderText = '\n'.join(signedHeaderList)
  233. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  234. # Get the signature, verify with public key, return result
  235. signature = base64.b64decode(signatureDict['signature'])
  236. try:
  237. pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
  238. return True
  239. except (ValueError, TypeError):
  240. if debug:
  241. print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
  242. return False