httpsig.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. __filename__ = "posts.py"
  2. __author__ = "Bob Mottram"
  3. __credits__ = ['lamia']
  4. __license__ = "AGPL3+"
  5. __version__ = "0.0.1"
  6. __maintainer__ = "Bob Mottram"
  7. __email__ = "bob@freedombone.net"
  8. __status__ = "Production"
  9. # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
  10. from Crypto.PublicKey import RSA
  11. from Crypto.Hash import SHA256
  12. #from Crypto.Signature import PKCS1_v1_5
  13. from Crypto.Signature import pkcs1_15
  14. from requests.auth import AuthBase
  15. import base64
  16. import json
  17. from time import gmtime, strftime
  18. from pprint import pprint
  19. def messageContentDigest(messageBodyJsonStr: str) -> str:
  20. return base64.b64encode(SHA256.new(messageBodyJsonStr.encode('utf-8')).digest()).decode('utf-8')
  21. def signPostHeaders(dateStr: str,privateKeyPem: str, \
  22. nickname: str, \
  23. domain: str,port: int, \
  24. toDomain: str,toPort: int, \
  25. path: str, \
  26. httpPrefix: str, \
  27. messageBodyJsonStr: str) -> str:
  28. """Returns a raw signature string that can be plugged into a header and
  29. used to verify the authenticity of an HTTP transmission.
  30. """
  31. if port:
  32. if port!=80 and port!=443:
  33. if ':' not in domain:
  34. domain=domain+':'+str(port)
  35. if toPort:
  36. if toPort!=80 and toPort!=443:
  37. if ':' not in toDomain:
  38. toDomain=toDomain+':'+str(port)
  39. if not dateStr:
  40. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  41. keyID=httpPrefix+'://'+domain+'/users/'+nickname+'#main-key'
  42. if not messageBodyJsonStr:
  43. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'content-type': 'application/json'}
  44. else:
  45. bodyDigest=messageContentDigest(messageBodyJsonStr)
  46. headers={'(request-target)': f'post {path}','host': toDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': 'application/activity+json'}
  47. privateKeyPem=RSA.import_key(privateKeyPem)
  48. #headers.update({
  49. # '(request-target)': f'post {path}',
  50. #})
  51. # build a digest for signing
  52. signedHeaderKeys = headers.keys()
  53. signedHeaderText = ''
  54. for headerKey in signedHeaderKeys:
  55. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  56. print(f'*********************signing: headerKey: {headerKey}: {headers[headerKey]}')
  57. signedHeaderText = signedHeaderText.strip()
  58. print('******************************Send: signedHeaderText: '+signedHeaderText)
  59. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  60. # Sign the digest
  61. rawSignature = pkcs1_15.new(privateKeyPem).sign(headerDigest)
  62. signature = base64.b64encode(rawSignature).decode('ascii')
  63. # Put it into a valid HTTP signature format
  64. signatureDict = {
  65. 'keyId': keyID,
  66. 'algorithm': 'rsa-sha256',
  67. 'headers': ' '.join(signedHeaderKeys),
  68. 'signature': signature
  69. }
  70. signatureHeader = ','.join(
  71. [f'{k}="{v}"' for k, v in signatureDict.items()])
  72. return signatureHeader
  73. def createSignedHeader(privateKeyPem: str,nickname: str, \
  74. domain: str,port: int, \
  75. toDomain: str,toPort: int, \
  76. path: str,httpPrefix: str,withDigest: bool, \
  77. messageBodyJsonStr: str) -> {}:
  78. """Note that the domain is the destination, not the sender
  79. """
  80. contentType='application/activity+json'
  81. headerDomain=toDomain
  82. if toPort:
  83. if toPort!=80 and toPort!=443:
  84. if ':' not in headerDomain:
  85. headerDomain=headerDomain+':'+str(toPort)
  86. dateStr=strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  87. if not withDigest:
  88. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr}
  89. signatureHeader = \
  90. signPostHeaders(dateStr,privateKeyPem,nickname, \
  91. domain,port,toDomain,toPort, \
  92. path,httpPrefix,None)
  93. else:
  94. bodyDigest=messageContentDigest(messageBodyJsonStr)
  95. print('***************************Send (request-target): post '+path)
  96. print('***************************Send host: '+headerDomain)
  97. print('***************************Send date: '+dateStr)
  98. print('***************************Send digest: '+bodyDigest)
  99. print('***************************Send Content-type: '+contentType)
  100. print('***************************Send messageBodyJsonStr: '+messageBodyJsonStr)
  101. headers = {'(request-target)': f'post {path}','host': headerDomain,'date': dateStr,'digest': f'SHA-256={bodyDigest}','content-type': contentType}
  102. signatureHeader = \
  103. signPostHeaders(dateStr,privateKeyPem,nickname, \
  104. domain,port, \
  105. toDomain,toPort, \
  106. path,httpPrefix,messageBodyJsonStr)
  107. headers['signature'] = signatureHeader
  108. return headers
  109. def verifyPostHeaders(httpPrefix: str,publicKeyPem: str,headers: dict, \
  110. path: str,GETmethod: bool, \
  111. messageBodyDigest: str, \
  112. messageBodyJsonStr: str) -> bool:
  113. """Returns true or false depending on if the key that we plugged in here
  114. validates against the headers, method, and path.
  115. publicKeyPem - the public key from an rsa key pair
  116. headers - should be a dictionary of request headers
  117. path - the relative url that was requested from this site
  118. GETmethod - GET or POST
  119. messageBodyJsonStr - the received request body (used for digest)
  120. """
  121. if GETmethod:
  122. method='GET'
  123. else:
  124. method='POST'
  125. publicKeyPem = RSA.import_key(publicKeyPem)
  126. # Build a dictionary of the signature values
  127. signatureHeader = headers['signature']
  128. signatureDict = {
  129. k: v[1:-1]
  130. for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
  131. }
  132. print('********************signatureHeader: '+str(signatureHeader))
  133. print('********************signatureDict: '+str(signatureDict))
  134. # Unpack the signed headers and set values based on current headers and
  135. # body (if a digest was included)
  136. signedHeaderList = []
  137. for signedHeader in signatureDict['headers'].split(' '):
  138. if signedHeader == '(request-target)':
  139. signedHeaderList.append(
  140. f'(request-target): {method.lower()} {path}')
  141. print('***************************Verify (request-target): '+method.lower()+' '+path)
  142. elif signedHeader == 'digest':
  143. if messageBodyDigest:
  144. bodyDigest=messageBodyDigest
  145. else:
  146. bodyDigest = messageContentDigest(messageBodyJsonStr)
  147. signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
  148. print('***************************Verify digest: SHA-256='+bodyDigest)
  149. print('***************************Verify messageBodyJsonStr: '+messageBodyJsonStr)
  150. else:
  151. if headers.get(signedHeader):
  152. print('***************************Verify '+signedHeader+': '+headers[signedHeader])
  153. signedHeaderList.append(
  154. f'{signedHeader}: {headers[signedHeader]}')
  155. else:
  156. signedHeaderCap=signedHeader.capitalize()
  157. print('***************************Verify '+signedHeaderCap+': '+headers[signedHeaderCap])
  158. if headers.get(signedHeaderCap):
  159. signedHeaderList.append(
  160. f'{signedHeader}: {headers[signedHeaderCap]}')
  161. print('***********************signedHeaderList: ')
  162. pprint(signedHeaderList)
  163. # Now we have our header data digest
  164. signedHeaderText = '\n'.join(signedHeaderList)
  165. print('***********************Verify: signedHeaderText: '+signedHeaderText)
  166. headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
  167. # Get the signature, verify with public key, return result
  168. signature = base64.b64decode(signatureDict['signature'])
  169. try:
  170. pkcs1_15.new(publicKeyPem).verify(headerDigest, signature)
  171. return True
  172. except (ValueError, TypeError):
  173. return False