httpsig.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. __filename__ = "posts.py"
  2. __author__ = "Bob Mottram"
  3. __credits__ = ['lamia']
  4. __license__ = "AGPL3+"
  5. __version__ = "1.0.0"
  6. __maintainer__ = "Bob Mottram"
  7. __email__ = "bob@freedombone.net"
  8. __status__ = "Production"
  9. # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
  10. from Crypto.PublicKey import RSA
  11. from Crypto.Hash import SHA256
  12. #from Crypto.Signature import PKCS1_v1_5
  13. from Crypto.Signature import pkcs1_15
  14. from requests.auth import AuthBase
  15. import base64
  16. import json
  17. from time import gmtime, strftime
  18. import datetime
  19. from pprint import pprint
  20. def messageContentDigest(messageBodyJsonStr: str) -> str:
  21. return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8')
  22. def signPostHeaders(dateStr: str,privateKeyPem: str, \
  23. nickname: str, \
  24. domain: str,port: int, \
  25. toDomain: str,toPort: int, \
  26. path: str, \
  27. httpPrefix: str, \
  28. messageBodyJsonStr: str) -> str:
  29. """Returns a raw signature string that can be plugged into a header and
  30. used to verify the authenticity of an HTTP transmission.
  31. """
  32. if port:
  33. if port!=80 and port!=443:
  34. if ':' not in domain:
  35. domain=domain+':'+str(port)
  36. if toPort:
  37. if toPort!=80 and toPort!=443:
  38. if ':' not in toDomain:
  39. toDomain=toDomain+':'+str(port)
  40. if not dateStr:
  41. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  42. keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key'
  43. if not messageBodyJsonStr:
  44. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'}
  45. else:
  46. bodyDigest=messageContentDigest(messageBodyJsonStr)
  47. contentLength=len(messageBodyJsonStr)
  48. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json','content-length': str(contentLength)}
  49. privateKeyPem=RSA.import_key(privateKeyPem)
  50. #headers.update({
  51. # '(request-target)': f'post {path}',
  52. #})
  53. # build a digest for signing
  54. signedHeaderKeys = headers.keys()
  55. signedHeaderText = ''
  56. for headerKey in signedHeaderKeys:
  57. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  58. #print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
  59. signedHeaderText = signedHeaderText.strip()
  60. #print('******************************Send: signedHeaderText: '+signedHeaderText)
  61. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  62. # Sign the digest
  63. rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
  64. signature = base64.b64encode(rawSignature).decode('ascii')
  65. # Put it into a valid HTTP signature format
  66. signatureDict = {
  67. 'keyId': keyID,
  68. 'algorithm': 'rsa-sha256',
  69. 'headers': ' '.join(signedHeaderKeys),
  70. 'signature': signature
  71. }
  72. signatureHeader = ','.join(
  73. [f'{k}="{v}"' for k, v in signatureDict.items()])
  74. return signatureHeader
  75. def createSignedHeader(privateKeyPem: str,nickname: str, \
  76. domain: str,port: int, \
  77. toDomain: str,toPort: int, \
  78. path: str,httpPrefix: str,withDigest: bool, \
  79. messageBodyJsonStr: str) -> {}:
  80. """Note that the domain is the destination, not the sender
  81. """
  82. contentType='application/activity+json'
  83. headerDomain=toDomain
  84. if toPort:
  85. if toPort!=80 and toPort!=443:
  86. if ':' not in headerDomain:
  87. headerDomain=headerDomain+':'+str(toPort)
  88. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  89. if not withDigest:
  90. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr}
  91. signatureHeader = \
  92. signPostHeaders(dateStr,privateKeyPem,nickname, \
  93. domain,port,toDomain,toPort, \
  94. path,httpPrefix,None)
  95. else:
  96. bodyDigest=messageContentDigest(messageBodyJsonStr)
  97. contentLength=len(messageBodyJsonStr)
  98. #print('***************************Send (request-target): post '+path)
  99. #print('***************************Send host: '+headerDomain)
  100. #print('***************************Send date: '+dateStr)
  101. #print('***************************Send digest: '+bodyDigest)
  102. #print('***************************Send Content-type: '+contentType)
  103. #print('***************************Send Content-Length: '+str(len(messageBodyJsonStr)))
  104. #print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
  105. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-length': str(contentLength),'content-type': contentType}
  106. signatureHeader = \
  107. signPostHeaders(dateStr,privateKeyPem,nickname, \
  108. domain,port, \
  109. toDomain,toPort, \
  110. path,httpPrefix,messageBodyJsonStr)
  111. headers['signature'] = signatureHeader
  112. return headers
  113. def verifyRecentSignature(signedDateStr: str) -> bool:
  114. """Checks whether the given time taken from the header is within
  115. 12 hours of the current time
  116. """
  117. currDate=datetime.datetime.utcnow()
  118. signedDate=datetime.datetime.strptime(signedDateStr,"%a, %d %b %Y %H:%M:%S %Z")
  119. timeDiffSec=(currDate-signedDate).seconds
  120. # 12 hours tollerance
  121. if timeDiffSec > 43200:
  122. print('WARN: Header signed too long ago: '+signedDateStr)
  123. print(str(timeDiffSec/(60*60))+' hours')
  124. return False
  125. if timeDiffSec < 0:
  126. print('WARN: Header signed in the future! '+signedDateStr)
  127. print(str(timeDiffSec/(60*60))+' hours')
  128. return False
  129. return True
  130. def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
  131. path: str,GETmethod: bool, \
  132. messageBodyDigest: str, \
  133. messageBodyJsonStr: str,debug: bool) -> bool:
  134. """Returns true or false depending on if the key that we plugged in here
  135. validates against the headers, method, and path.
  136. publicKeyPem - the public key from an rsa key pair
  137. headers - should be a dictionary of request headers
  138. path - the relative url that was requested from this site
  139. GETmethod - GET or POST
  140. messageBodyJsonStr - the received request body (used for digest)
  141. """
  142. if GETmethod:
  143. method='GET'
  144. else:
  145. method='POST'
  146. if debug:
  147. print('DEBUG: verifyPostHeaders '+method)
  148. publicKeyPem = RSA.import_key(publicKeyPem)
  149. # Build a dictionary of the signature values
  150. signatureHeader = headers['signature']
  151. signatureDict = {
  152. k: v[1:-1]
  153. for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
  154. }
  155. #print('********************signatureHeader: '+str(signatureHeader))
  156. #print('********************signatureDict: '+str(signatureDict))
  157. # Unpack the signed headers and set values based on current headers and
  158. # body (if a digest was included)
  159. signedHeaderList = []
  160. for signedHeader in signatureDict['headers'].split(' '):
  161. if debug:
  162. print('DEBUG: verifyPostHeaders signedHeader='+signedHeader)
  163. if signedHeader == '(request-target)':
  164. signedHeaderList.append(
  165. f'(request-target): {method.lower()} {path}')
  166. #print('***************************Verify (request-target): '+method.lower()+' '+path)
  167. elif signedHeader == 'digest':
  168. if messageBodyDigest:
  169. bodyDigest=messageBodyDigest
  170. else:
  171. bodyDigest=messageContentDigest(messageBodyJsonStr)
  172. signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
  173. #print('***************************Verify digest: SHA-256='+bodyDigest)
  174. #print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
  175. elif signedHeader == 'content-length':
  176. if headers.get(signedHeader):
  177. signedHeaderList.append(f'content-length: {headers[signedHeader]}')
  178. else:
  179. if headers.get('Content-Length'):
  180. contentLength=headers['Content-Length']
  181. signedHeaderList.append(f'content-length: {contentLength}')
  182. else:
  183. if headers.get('Content-length'):
  184. contentLength=headers['Content-length']
  185. signedHeaderList.append(f'content-length: {contentLength}')
  186. else:
  187. if debug:
  188. print('DEBUG: verifyPostHeaders '+signedHeader+' not found in '+str(headers))
  189. else:
  190. if headers.get(signedHeader):
  191. if signedHeader=='date':
  192. if not verifyRecentSignature(headers[signedHeader]):
  193. if debug:
  194. print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
  195. return False
  196. #print('***************************Verify '+signedHeader+': '+headers[signedHeader])
  197. signedHeaderList.append(
  198. f'{signedHeader}: {headers[signedHeader]}')
  199. else:
  200. signedHeaderCap=signedHeader.capitalize()
  201. if signedHeaderCap=='Date':
  202. if not verifyRecentSignature(headers[signedHeaderCap]):
  203. if debug:
  204. print('DEBUG: verifyPostHeaders date is not recent '+headers[signedHeader])
  205. return False
  206. #print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
  207. if headers.get(signedHeaderCap):
  208. signedHeaderList.append(
  209. f'{signedHeader}: {headers[signedHeaderCap]}')
  210. #print('***********************signedHeaderList: ')
  211. #pprint(signedHeaderList)
  212. if debug:
  213. print('DEBUG: signedHeaderList: '+str(signedHeaderList))
  214. # Now we have our header data digest
  215. signedHeaderText = '\n'.join(signedHeaderList)
  216. #print('***********************Verify: signedHeaderText: '+signedHeaderText)
  217. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  218. # Get the signature, verify with public key, return result
  219. signature = base64.b64decode(signatureDict['signature'])
  220. try:
  221. pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
  222. return True
  223. except (ValueError, TypeError):
  224. if debug:
  225. print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
  226. return False