Viewing file: common.py (16.91 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
#
#  SelfTest/Cipher/common.py: Common code for Crypto.SelfTest.Cipher
#
# Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net>
#
# ===================================================================
# The contents of this file are dedicated to the public domain.  To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""Self-testing for PyCrypto cipher modules"""
import unittest from binascii import a2b_hex, b2a_hex, hexlify
from Crypto.Util.py3compat import b from Crypto.Util.strxor import strxor_c
class _NoDefault: pass # sentinel object def _extract(d, k, default=_NoDefault): """Get an item from a dictionary, and remove it from the dictionary.""" try: retval = d[k] except KeyError: if default is _NoDefault: raise return default del d[k] return retval
# Generic cipher test case class CipherSelfTest(unittest.TestCase):
def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module
# Extract the parameters params = params.copy() self.description = _extract(params, 'description') self.key = b(_extract(params, 'key')) self.plaintext = b(_extract(params, 'plaintext')) self.ciphertext = b(_extract(params, 'ciphertext')) self.module_name = _extract(params, 'module_name', None) self.assoc_data = _extract(params, 'assoc_data', None) self.mac = _extract(params, 'mac', None) if self.assoc_data: self.mac = b(self.mac)
mode = _extract(params, 'mode', None) self.mode_name = str(mode)
if mode is not None: # Block cipher self.mode = getattr(self.module, "MODE_" + mode)
self.iv = _extract(params, 'iv', None) if self.iv is None: self.iv = _extract(params, 'nonce', None) if self.iv is not None: self.iv = b(self.iv)
else: # Stream cipher self.mode = None self.iv = _extract(params, 'iv', None) if self.iv is not None: self.iv = b(self.iv)
self.extra_params = params
def shortDescription(self): return self.description
def _new(self): params = self.extra_params.copy() key = a2b_hex(self.key)
old_style = [] if self.mode is not None: old_style = [ self.mode ] if self.iv is not None: old_style += [ a2b_hex(self.iv) ]
return self.module.new(key, *old_style, **params)
def isMode(self, name): if not hasattr(self.module, "MODE_"+name): return False return self.mode == getattr(self.module, "MODE_"+name)
def runTest(self): plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) assoc_data = [] if self.assoc_data: assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data]
ct = None pt = None
# # Repeat the same encryption or decryption twice and verify # that the result is always the same # for i in range(2): cipher = self._new() decipher = self._new()
# Only AEAD modes for comp in assoc_data: cipher.update(comp) decipher.update(comp)
ctX = b2a_hex(cipher.encrypt(plaintext)) ptX = b2a_hex(decipher.decrypt(ciphertext))
if ct: self.assertEqual(ct, ctX) self.assertEqual(pt, ptX) ct, pt = ctX, ptX
self.assertEqual(self.ciphertext, ct) # encrypt self.assertEqual(self.plaintext, pt) # decrypt
if self.mac: mac = b2a_hex(cipher.digest()) self.assertEqual(self.mac, mac) decipher.verify(a2b_hex(self.mac))
class CipherStreamingSelfTest(CipherSelfTest):
    """Check that a cipher gives the same result when fed piecewise.

    The test vector is pushed through the cipher three bytes at a time, and
    the concatenated output must match the expected one-shot result, both
    for encryption and for decryption.
    """

    def shortDescription(self):
        """Return a description built from the module name and the mode."""
        desc = self.module_name
        if self.mode is not None:
            desc += " in %s mode" % (self.mode_name,)
        return "%s should behave like a stream cipher" % (desc,)

    def runTest(self):
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        # The cipher should work like a stream cipher

        # Test counter mode encryption, 3 bytes at a time
        cipher = self._new()
        ct3 = [cipher.encrypt(plaintext[i:i+3])
               for i in range(0, len(plaintext), 3)]
        ct3 = b2a_hex(b("").join(ct3))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time.
        # This pass must call decrypt(): using encrypt() here only works
        # when both operations are the same keystream XOR, and it leaves
        # chunked decrypt() entirely unexercised.
        cipher = self._new()
        pt3 = [cipher.decrypt(ciphertext[i:i+3])
               for i in range(0, len(ciphertext), 3)]
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt3))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
class RoundtripTest(unittest.TestCase): def __init__(self, module, params): from Crypto import Random unittest.TestCase.__init__(self) self.module = module self.iv = Random.get_random_bytes(module.block_size) self.key = b(params['key']) self.plaintext = 100 * b(params['plaintext']) self.module_name = params.get('module_name', None)
def shortDescription(self): return """%s .decrypt() output of .encrypt() should not be garbled""" % (self.module_name,)
def runTest(self):
## ECB mode mode = self.module.MODE_ECB encryption_cipher = self.module.new(a2b_hex(self.key), mode) ciphertext = encryption_cipher.encrypt(self.plaintext) decryption_cipher = self.module.new(a2b_hex(self.key), mode) decrypted_plaintext = decryption_cipher.decrypt(ciphertext) self.assertEqual(self.plaintext, decrypted_plaintext)
class IVLengthTest(unittest.TestCase): def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module self.key = b(params['key'])
def shortDescription(self): return "Check that all modes except MODE_ECB and MODE_CTR require an IV of the proper length"
def runTest(self): self.assertRaises(TypeError, self.module.new, a2b_hex(self.key), self.module.MODE_ECB, b(""))
def _dummy_counter(self): return "\0" * self.module.block_size
class NoDefaultECBTest(unittest.TestCase): def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module self.key = b(params['key'])
def runTest(self): self.assertRaises(TypeError, self.module.new, a2b_hex(self.key))
class BlockSizeTest(unittest.TestCase): def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module self.key = a2b_hex(b(params['key']))
def runTest(self): cipher = self.module.new(self.key, self.module.MODE_ECB) self.assertEqual(cipher.block_size, self.module.block_size)
class ByteArrayTest(unittest.TestCase): """Verify we can use bytearray's for encrypting and decrypting"""
def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module
# Extract the parameters params = params.copy() self.description = _extract(params, 'description') self.key = b(_extract(params, 'key')) self.plaintext = b(_extract(params, 'plaintext')) self.ciphertext = b(_extract(params, 'ciphertext')) self.module_name = _extract(params, 'module_name', None) self.assoc_data = _extract(params, 'assoc_data', None) self.mac = _extract(params, 'mac', None) if self.assoc_data: self.mac = b(self.mac)
mode = _extract(params, 'mode', None) self.mode_name = str(mode)
if mode is not None: # Block cipher self.mode = getattr(self.module, "MODE_" + mode)
self.iv = _extract(params, 'iv', None) if self.iv is None: self.iv = _extract(params, 'nonce', None) if self.iv is not None: self.iv = b(self.iv) else: # Stream cipher self.mode = None self.iv = _extract(params, 'iv', None) if self.iv is not None: self.iv = b(self.iv)
self.extra_params = params
def _new(self): params = self.extra_params.copy() key = a2b_hex(self.key)
old_style = [] if self.mode is not None: old_style = [ self.mode ] if self.iv is not None: old_style += [ a2b_hex(self.iv) ]
return self.module.new(key, *old_style, **params)
def runTest(self):
plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) assoc_data = [] if self.assoc_data: assoc_data = [ bytearray(a2b_hex(b(x))) for x in self.assoc_data]
cipher = self._new() decipher = self._new()
# Only AEAD modes for comp in assoc_data: cipher.update(comp) decipher.update(comp)
ct = b2a_hex(cipher.encrypt(bytearray(plaintext))) pt = b2a_hex(decipher.decrypt(bytearray(ciphertext)))
self.assertEqual(self.ciphertext, ct) # encrypt self.assertEqual(self.plaintext, pt) # decrypt
if self.mac: mac = b2a_hex(cipher.digest()) self.assertEqual(self.mac, mac) decipher.verify(bytearray(a2b_hex(self.mac)))
class MemoryviewTest(unittest.TestCase): """Verify we can use memoryviews for encrypting and decrypting"""
def __init__(self, module, params): unittest.TestCase.__init__(self) self.module = module
# Extract the parameters params = params.copy() self.description = _extract(params, 'description') self.key = b(_extract(params, 'key')) self.plaintext = b(_extract(params, 'plaintext')) self.ciphertext = b(_extract(params, 'ciphertext')) self.module_name = _extract(params, 'module_name', None) self.assoc_data = _extract(params, 'assoc_data', None) self.mac = _extract(params, 'mac', None) if self.assoc_data: self.mac = b(self.mac)
mode = _extract(params, 'mode', None) self.mode_name = str(mode)
if mode is not None: # Block cipher self.mode = getattr(self.module, "MODE_" + mode)
self.iv = _extract(params, 'iv', None) if self.iv is None: self.iv = _extract(params, 'nonce', None) if self.iv is not None: self.iv = b(self.iv) else: # Stream cipher self.mode = None self.iv = _extract(params, 'iv', None) if self.iv is not None: self.iv = b(self.iv)
self.extra_params = params
def _new(self): params = self.extra_params.copy() key = a2b_hex(self.key)
old_style = [] if self.mode is not None: old_style = [ self.mode ] if self.iv is not None: old_style += [ a2b_hex(self.iv) ]
return self.module.new(key, *old_style, **params)
def runTest(self):
plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) assoc_data = [] if self.assoc_data: assoc_data = [ memoryview(a2b_hex(b(x))) for x in self.assoc_data]
cipher = self._new() decipher = self._new()
# Only AEAD modes for comp in assoc_data: cipher.update(comp) decipher.update(comp)
ct = b2a_hex(cipher.encrypt(memoryview(plaintext))) pt = b2a_hex(decipher.decrypt(memoryview(ciphertext)))
self.assertEqual(self.ciphertext, ct) # encrypt self.assertEqual(self.plaintext, pt) # decrypt
if self.mac: mac = b2a_hex(cipher.digest()) self.assertEqual(self.mac, mac) decipher.verify(memoryview(a2b_hex(self.mac)))
def make_block_tests(module, module_name, test_data, additional_params=None):
    """Build the list of self-test cases for a block cipher module.

    Each entry of `test_data` is a tuple of 3, 4 or 5 items:
    (plaintext, ciphertext, key[, description[, extra_params]]), where
    `extra_params` is a dictionary merged into the test parameters.  The
    mode defaults to "ECB" when an entry does not specify one.

    One CipherSelfTest is created per entry.  Ahead of the first one, a
    RoundtripTest, IVLengthTest, NoDefaultECBTest, ByteArrayTest and
    BlockSizeTest are added once, built from the first entry's parameters.

    `additional_params`, if given, is a dictionary merged into the
    parameters of every entry; its values win over the entry's own.
    `module_name` is used for the test names and stored in the parameters.

    Returns the list of test case objects.  Raises AssertionError if an
    entry has an unsupported number of items.
    """
    tests = []
    extra_tests_added = False
    # enumerate() accepts any iterable, not only indexable sequences
    for number, row in enumerate(test_data, 1):

        # Build the "params" dictionary with
        # - plaintext
        # - ciphertext
        # - key
        # - mode (default is ECB)
        # - (optionally) description
        # - (optionally) any other parameter that this cipher mode requires
        params = {}
        if len(row) == 3:
            (params['plaintext'], params['ciphertext'], params['key']) = row
        elif len(row) == 4:
            (params['plaintext'], params['ciphertext'], params['key'],
             params['description']) = row
        elif len(row) == 5:
            (params['plaintext'], params['ciphertext'], params['key'],
             params['description'], extra_params) = row
            params.update(extra_params)
        else:
            raise AssertionError("Unsupported tuple size %d" % (len(row),))

        if "mode" not in params:
            params["mode"] = "ECB"

        # Build the display-name for the test
        p2 = params.copy()
        p_key = _extract(p2, 'key')
        p_plaintext = _extract(p2, 'plaintext')
        # The ciphertext is not displayed; it is only removed so that it
        # does not show up in the leftover parameters printed below.
        _extract(p2, 'ciphertext')
        p_mode = _extract(p2, 'mode')
        p_description = _extract(p2, 'description', None)

        if p_description is not None:
            description = p_description
        elif p_mode == 'ECB' and not p2:
            description = "p=%s, k=%s" % (p_plaintext, p_key)
        else:
            description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
        name = "%s #%d: %s" % (module_name, number, description)
        params['description'] = name
        params['module_name'] = module_name
        # None (the default) means "nothing to add"; a shared mutable
        # default object is deliberately avoided.
        if additional_params:
            params.update(additional_params)

        # Add extra test(s) to the test suite before the current test
        if not extra_tests_added:
            tests += [
                RoundtripTest(module, params),
                IVLengthTest(module, params),
                NoDefaultECBTest(module, params),
                ByteArrayTest(module, params),
                BlockSizeTest(module, params),
            ]
            extra_tests_added = True

        # Add the current test to the test suite
        tests.append(CipherSelfTest(module, params))

    return tests
def make_stream_tests(module, module_name, test_data):
    """Build the list of self-test cases for a stream cipher module.

    Each entry of `test_data` is a tuple of 3, 4 or 5 items:
    (plaintext, ciphertext, key[, description[, extra_params]]), where
    `extra_params` is a dictionary merged into the test parameters.

    A CipherSelfTest and a CipherStreamingSelfTest are created per entry.
    Ahead of the first pair, a ByteArrayTest and a MemoryviewTest are added
    once, built from the first entry's parameters.  `module_name` is used
    for the test names and stored in the parameters.

    Returns the list of test case objects.  Raises AssertionError if an
    entry has an unsupported number of items.
    """
    tests = []
    extra_tests_added = False
    # enumerate() accepts any iterable, not only indexable sequences
    for number, row in enumerate(test_data, 1):

        # Build the "params" dictionary
        params = {}
        if len(row) == 3:
            (params['plaintext'], params['ciphertext'], params['key']) = row
        elif len(row) == 4:
            (params['plaintext'], params['ciphertext'], params['key'],
             params['description']) = row
        elif len(row) == 5:
            (params['plaintext'], params['ciphertext'], params['key'],
             params['description'], extra_params) = row
            params.update(extra_params)
        else:
            raise AssertionError("Unsupported tuple size %d" % (len(row),))

        # Build the display-name for the test
        p2 = params.copy()
        p_key = _extract(p2, 'key')
        p_plaintext = _extract(p2, 'plaintext')
        # The ciphertext is not displayed; it is only removed so that it
        # does not show up in the leftover parameters printed below.
        _extract(p2, 'ciphertext')
        p_description = _extract(p2, 'description', None)

        if p_description is not None:
            description = p_description
        elif not p2:
            description = "p=%s, k=%s" % (p_plaintext, p_key)
        else:
            description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
        name = "%s #%d: %s" % (module_name, number, description)
        params['description'] = name
        params['module_name'] = module_name

        # Add extra test(s) to the test suite before the current test
        if not extra_tests_added:
            tests += [
                ByteArrayTest(module, params),
            ]

            tests.append(MemoryviewTest(module, params))
            extra_tests_added = True

        # Add the test to the test suite
        tests.append(CipherSelfTest(module, params))
        tests.append(CipherStreamingSelfTest(module, params))
    return tests
# vim:set ts=4 sw=4 sts=4 expandtab:
|