httpsig.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. __filename__ = "posts.py"
  2. __author__ = "Bob Mottram"
  3. __credits__ = ['lamia']
  4. __license__ = "AGPL3+"
  5. __version__ = "1.0.0"
  6. __maintainer__ = "Bob Mottram"
  7. __email__ = "bob@freedombone.net"
  8. __status__ = "Production"
  9. # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
  10. from Crypto.PublicKey import RSA
  11. from Crypto.Hash import SHA256
  12. #from Crypto.Signature import PKCS1_v1_5
  13. from Crypto.Signature import pkcs1_15
  14. from requests.auth import AuthBase
  15. import base64
  16. import json
  17. from time import gmtime, strftime
  18. import datetime
  19. from pprint import pprint
  20. def messageContentDigest(messageBodyJsonStr: str) -> str:
  21. return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8')
  22. def signPostHeaders(dateStr: str,privateKeyPem: str, \
  23. nickname: str, \
  24. domain: str,port: int, \
  25. toDomain: str,toPort: int, \
  26. path: str, \
  27. httpPrefix: str, \
  28. messageBodyJsonStr: str) -> str:
  29. """Returns a raw signature string that can be plugged into a header and
  30. used to verify the authenticity of an HTTP transmission.
  31. """
  32. if port:
  33. if port!=80 and port!=443:
  34. if ':' not in domain:
  35. domain=domain+':'+str(port)
  36. if toPort:
  37. if toPort!=80 and toPort!=443:
  38. if ':' not in toDomain:
  39. toDomain=toDomain+':'+str(port)
  40. if not dateStr:
  41. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  42. keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key'
  43. if not messageBodyJsonStr:
  44. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'}
  45. else:
  46. bodyDigest=messageContentDigest(messageBodyJsonStr)
  47. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'}
  48. privateKeyPem=RSA.import_key(privateKeyPem)
  49. #headers.update({
  50. # '(request-target)': f'post {path}',
  51. #})
  52. # build a digest for signing
  53. signedHeaderKeys = headers.keys()
  54. signedHeaderText = ''
  55. for headerKey in signedHeaderKeys:
  56. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  57. #print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
  58. signedHeaderText = signedHeaderText.strip()
  59. #print('******************************Send: signedHeaderText: '+signedHeaderText)
  60. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  61. # Sign the digest
  62. rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
  63. signature = base64.b64encode(rawSignature).decode('ascii')
  64. # Put it into a valid HTTP signature format
  65. signatureDict = {
  66. 'keyId': keyID,
  67. 'algorithm': 'rsa-sha256',
  68. 'headers': ' '.join(signedHeaderKeys),
  69. 'signature': signature
  70. }
  71. signatureHeader = ','.join(
  72. [f'{k}="{v}"' for k, v in signatureDict.items()])
  73. return signatureHeader
  74. def createSignedHeader(privateKeyPem: str,nickname: str, \
  75. domain: str,port: int, \
  76. toDomain: str,toPort: int, \
  77. path: str,httpPrefix: str,withDigest: bool, \
  78. messageBodyJsonStr: str) -> {}:
  79. """Note that the domain is the destination, not the sender
  80. """
  81. contentType='application/activity+json'
  82. headerDomain=toDomain
  83. if toPort:
  84. if toPort!=80 and toPort!=443:
  85. if ':' not in headerDomain:
  86. headerDomain=headerDomain+':'+str(toPort)
  87. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  88. if not withDigest:
  89. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr}
  90. signatureHeader = \
  91. signPostHeaders(dateStr,privateKeyPem,nickname, \
  92. domain,port,toDomain,toPort, \
  93. path,httpPrefix,None)
  94. else:
  95. bodyDigest=messageContentDigest(messageBodyJsonStr)
  96. #print('***************************Send (request-target): post '+path)
  97. #print('***************************Send host: '+headerDomain)
  98. #print('***************************Send date: '+dateStr)
  99. #print('***************************Send digest: '+bodyDigest)
  100. #print('***************************Send Content-type: '+contentType)
  101. #print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
  102. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
  103. signatureHeader = \
  104. signPostHeaders(dateStr,privateKeyPem,nickname, \
  105. domain,port, \
  106. toDomain,toPort, \
  107. path,httpPrefix,messageBodyJsonStr)
  108. headers['signature'] = signatureHeader
  109. return headers
  110. def verifyRecentSignature(signedDateStr: str) -> bool:
  111. """Checks whether the given time taken from the header is within
  112. 12 hours of the current time
  113. """
  114. currDate=datetime.datetime.utcnow()
  115. signedDate=datetime.datetime.strptime(signedDateStr,"%a, %d %b %Y %H:%M:%S %Z")
  116. timeDiffSec=(currDate-signedDate).seconds
  117. # 12 hours tollerance
  118. if timeDiffSec > 43200:
  119. print('WARN: Header signed too long ago: '+signedDateStr)
  120. print(str(timeDiffSec/(60*60))+' hours')
  121. return False
  122. if timeDiffSec < 0:
  123. print('WARN: Header signed in the future! '+signedDateStr)
  124. print(str(timeDiffSec/(60*60))+' hours')
  125. return False
  126. return True
  127. def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
  128. path: str,GETmethod: bool, \
  129. messageBodyDigest: str, \
  130. messageBodyJsonStr: str) -> bool:
  131. """Returns true or false depending on if the key that we plugged in here
  132. validates against the headers, method, and path.
  133. publicKeyPem - the public key from an rsa key pair
  134. headers - should be a dictionary of request headers
  135. path - the relative url that was requested from this site
  136. GETmethod - GET or POST
  137. messageBodyJsonStr - the received request body (used for digest)
  138. """
  139. if GETmethod:
  140. method='GET'
  141. else:
  142. method='POST'
  143. publicKeyPem = RSA.import_key(publicKeyPem)
  144. # Build a dictionary of the signature values
  145. signatureHeader = headers['signature']
  146. signatureDict = {
  147. k: v[1:-1]
  148. for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
  149. }
  150. #print('********************signatureHeader: '+str(signatureHeader))
  151. #print('********************signatureDict: '+str(signatureDict))
  152. # Unpack the signed headers and set values based on current headers and
  153. # body (if a digest was included)
  154. signedHeaderList = []
  155. for signedHeader in signatureDict['headers'].split(' '):
  156. if signedHeader == '(request-target)':
  157. signedHeaderList.append(
  158. f'(request-target): {method.lower()} {path}')
  159. #print('***************************Verify (request-target): '+method.lower()+' '+path)
  160. elif signedHeader == 'digest':
  161. if messageBodyDigest:
  162. bodyDigest=messageBodyDigest
  163. else:
  164. bodyDigest = messageContentDigest(messageBodyJsonStr)
  165. signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
  166. #print('***************************Verify digest: SHA-256='+bodyDigest)
  167. #print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
  168. else:
  169. if headers.get(signedHeader):
  170. if signedHeader=='date':
  171. if not verifyRecentSignature(headers[signedHeader]):
  172. return False
  173. #print('***************************Verify '+signedHeader+': '+headers[signedHeader])
  174. signedHeaderList.append(
  175. f'{signedHeader}: {headers[signedHeader]}')
  176. else:
  177. signedHeaderCap=signedHeader.capitalize()
  178. if signedHeaderCap=='Date':
  179. if not verifyRecentSignature(headers[signedHeaderCap]):
  180. return False
  181. #print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
  182. if headers.get(signedHeaderCap):
  183. signedHeaderList.append(
  184. f'{signedHeader}: {headers[signedHeaderCap]}')
  185. #print('***********************signedHeaderList: ')
  186. #pprint(signedHeaderList)
  187. # Now we have our header data digest
  188. signedHeaderText = '\n'.join(signedHeaderList)
  189. #print('***********************Verify: signedHeaderText: '+signedHeaderText)
  190. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  191. # Get the signature, verify with public key, return result
  192. signature = base64.b64decode(signatureDict['signature'])
  193. try:
  194. pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
  195. return True
  196. except (ValueError, TypeError):
  197. return False