Viewing file: test_stream.py (5.6 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Test for object db"""
from gitdb.test.lib import ( TestBase, DummyStream, make_bytes, make_object, fixture_path )
from gitdb import ( DecompressMemMapReader, FDCompressedSha1Writer, LooseObjectDB, Sha1Writer, MemoryDB, IStream, ) from gitdb.util import hex_to_bin
import zlib from gitdb.typ import ( str_blob_type )
import tempfile import os from io import BytesIO
class TestStream(TestBase):
"""Test stream classes"""
data_sizes = (15, 10000, 1000 * 1024 + 512)
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): """Make stream tests - the orig_stream is seekable, allowing it to be rewound and reused :param cdata: the data we expect to read from stream, the contents :param rewind_stream: function called to rewind the stream to make it ready for reuse""" ns = 10 assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
# read in small steps ss = len(cdata) // ns for i in range(ns): data = stream.read(ss) chunk = cdata[i * ss:(i + 1) * ss] assert data == chunk # END for each step rest = stream.read() if rest: assert rest == cdata[-len(rest):] # END handle rest
if isinstance(stream, DecompressMemMapReader): assert len(stream.data()) == stream.compressed_bytes_read() # END handle special type
rewind_stream(stream)
# read everything rdata = stream.read() assert rdata == cdata
if isinstance(stream, DecompressMemMapReader): assert len(stream.data()) == stream.compressed_bytes_read() # END handle special type
def test_decompress_reader(self): for close_on_deletion in range(2): for with_size in range(2): for ds in self.data_sizes: cdata = make_bytes(ds, randomize=False)
# zdata = zipped actual data # cdata = original content data
# create reader if with_size: # need object data zdata = zlib.compress(make_object(str_blob_type, cdata)) typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) assert size == len(cdata) assert typ == str_blob_type
# even if we don't set the size, it will be set automatically on first read test_reader = DecompressMemMapReader(zdata, close_on_deletion=False) assert test_reader._s == len(cdata) else: # here we need content data zdata = zlib.compress(cdata) reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) assert reader._s == len(cdata) # END get reader
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
# put in a dummy stream for closing dummy = DummyStream() reader._m = dummy
assert not dummy.closed del(reader) assert dummy.closed == close_on_deletion # END for each datasize # END whether size should be used # END whether stream should be closed when deleted
def test_sha_writer(self): writer = Sha1Writer() assert 2 == writer.write(b"hi") assert len(writer.sha(as_hex=1)) == 40 assert len(writer.sha(as_hex=0)) == 20
# make sure it does something ;) prev_sha = writer.sha() writer.write(b"hi again") assert writer.sha() != prev_sha
def test_compressed_writer(self): for ds in self.data_sizes: fd, path = tempfile.mkstemp() ostream = FDCompressedSha1Writer(fd) data = make_bytes(ds, randomize=False)
# for now, just a single write, code doesn't care about chunking assert len(data) == ostream.write(data) ostream.close()
# its closed already self.assertRaises(OSError, os.close, fd)
# read everything back, compare to data we zip fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0)) written_data = os.read(fd, os.path.getsize(path)) assert len(written_data) == os.path.getsize(path) os.close(fd) assert written_data == zlib.compress(data, 1) # best speed
os.remove(path) # END for each os
def test_decompress_reader_special_case(self): odb = LooseObjectDB(fixture_path('objects')) mdb = MemoryDB() for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911', b'7bb839852ed5e3a069966281bb08d50012fb309b',): ostream = odb.stream(hex_to_bin(sha))
# if there is a bug, we will be missing one byte exactly ! data = ostream.read() assert len(data) == ostream.size
# Putting it back in should yield nothing new - after all, we have dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data))) assert dump.hexsha == sha # end for each loose object sha to test
|