Commit a243827

Implemented stream tests and found a bug along the way; a test framework for streams is slowly starting to take shape, but it's not there yet
1 parent 6fbb693 commit a243827

5 files changed (+490 -251 lines)


lib/git/odb/fun.py

Lines changed: 12 additions & 5 deletions
@@ -83,26 +83,33 @@ def write_object(type, size, read, write, chunk_size=chunk_size):
     :param size: amount of bytes to write from source_stream
     :param read: read method of a stream providing the content data
     :param write: write method of the output stream
-    :param close_target_stream: if True, the target stream will be closed when
+    :param close_target_stream: if True, the target stream will be closed when
         the routine exits, even if an error is thrown
     :return: The actual amount of bytes written to stream, which includes the header and a trailing newline"""
     tbw = 0                # total num bytes written
-    dbw = 0                # num data bytes written
 
     # WRITE HEADER: type SP size NULL
     tbw += write("%s %i\0" % (type, size))
+    tbw += stream_copy(read, write, size, chunk_size)
+
+    return tbw
 
+def stream_copy(read, write, size, chunk_size):
+    """Copy a stream up to size bytes using the provided read and write methods,
+    in chunks of chunk_size
+    :note: its much like stream_copy utility, but operates just using methods"""
+    dbw = 0                # num data bytes written
+
     # WRITE ALL DATA UP TO SIZE
     while True:
         cs = min(chunk_size, size-dbw)
         data_len = write(read(cs))
         dbw += data_len
         if data_len < cs or dbw == size:
-            tbw += dbw
             break
         # END check for stream end
     # END duplicate data
-    return tbw
-
+    return dbw
+
 
 #} END routines
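The net effect is that write_object now delegates its copy loop to the new stream_copy helper, which the stream tests can exercise on its own. A minimal usage sketch, assuming the package imports as git.odb.fun; the CountingWriter wrapper is hypothetical, added only because stream_copy expects write() to return the number of bytes written (a plain file object's write returns None in Python 2):

from cStringIO import StringIO
from git.odb.fun import stream_copy

class CountingWriter(object):
    """Hypothetical helper: wraps a stream so write() reports bytes written,
    which the chunked copy loop relies on to detect progress"""
    def __init__(self, stream):
        self._stream = stream
    def write(self, data):
        self._stream.write(data)
        return len(data)

src = StringIO("x" * 1000)
dst = CountingWriter(StringIO())

# copy exactly 1000 bytes in 256-byte chunks, using bound methods only
copied = stream_copy(src.read, dst.write, 1000, 256)
assert copied == 1000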

lib/git/odb/stream.py

Lines changed: 40 additions & 26 deletions
@@ -75,7 +75,7 @@ def is_compressed(self):
         """:return: True if reads of this stream yield zlib compressed data. Default False
         :note: this does not imply anything about the actual internal storage.
             Hence the data could be uncompressed, but read compressed, or vice versa"""
-        raise False
+        return False
 
     #} END interface
 
@@ -105,10 +105,12 @@ def __init__(self, type, size, stream, sha=None, compressed=None):
 
     #{ Interface
 
+    @property
     def hexsha(self):
         """:return: our sha, hex encoded, 40 bytes"""
         return to_hex_sha(self[0])
-
+
+    @property
     def binsha(self):
         """:return: our sha as binary, 20 bytes"""
         return to_bin_sha(self[0])
@@ -229,10 +231,11 @@ class DecompressMemMapReader(object):
     and decompress it into chunks, thats all ... """
     __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
 
-    max_read_size = 512*1024
+    max_read_size = 512*1024        # currently unused
 
     def __init__(self, m, close_on_deletion, size):
-        """Initialize with mmap for stream reading"""
+        """Initialize with mmap for stream reading
+        :param m: must be content data - use new if you have object data and no size"""
         self._m = m
         self._zip = zlib.decompressobj()
         self._buf = None        # buffer of decompressed bytes
@@ -248,32 +251,38 @@ def __del__(self):
             self._m.close()
         # END handle resource freeing
 
-    @classmethod
-    def new(self, m, close_on_deletion=False):
-        """Create a new DecompressMemMapReader instance for acting as a read-only stream
-        This method parses the object header from m and returns the parsed
-        type and size, as well as the created stream instance.
-        :param m: memory map on which to oparate
-        :param close_on_deletion: if True, the memory map will be closed once we are
-            being deleted"""
-        inst = DecompressMemMapReader(m, close_on_deletion, 0)
-
+    def _parse_header_info(self):
+        """If this stream contains object data, parse the header info and skip the
+        stream to a point where each read will yield object content
+        :return: parsed type_string, size"""
         # read header
         maxb = 512        # should really be enough, cgit uses 8192 I believe
-        inst._s = maxb
-        hdr = inst.read(maxb)
+        self._s = maxb
+        hdr = self.read(maxb)
         hdrend = hdr.find("\0")
         type, size = hdr[:hdrend].split(" ")
         size = int(size)
-        inst._s = size
+        self._s = size
 
         # adjust internal state to match actual header length that we ignore
         # The buffer will be depleted first on future reads
-        inst._br = 0
+        self._br = 0
         hdrend += 1        # count terminating \0
-        inst._buf = StringIO(hdr[hdrend:])
-        inst._buflen = len(hdr) - hdrend
+        self._buf = StringIO(hdr[hdrend:])
+        self._buflen = len(hdr) - hdrend
+
+        return type, size
 
+    @classmethod
+    def new(self, m, close_on_deletion=False):
+        """Create a new DecompressMemMapReader instance for acting as a read-only stream
+        This method parses the object header from m and returns the parsed
+        type and size, as well as the created stream instance.
+        :param m: memory map on which to oparate. It must be object data ( header + contents )
+        :param close_on_deletion: if True, the memory map will be closed once we are
+            being deleted"""
+        inst = DecompressMemMapReader(m, close_on_deletion, 0)
+        type, size = inst._parse_header_info()
         return type, size, inst
 
     def read(self, size=-1):
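For context on what _parse_header_info pulls apart: a loose git object is the ASCII header "type SP size NUL" followed by the raw content, and the whole thing is zlib-deflated. A standalone illustration of that layout (Python 2 strings, matching the codebase; not using the class itself):

import zlib

content = "hello world"
# object data = header + content, as new() expects to receive it
obj_data = zlib.compress("blob %i\0%s" % (len(content), content))

hdr = zlib.decompress(obj_data)
hdrend = hdr.find("\0")
type_name, size = hdr[:hdrend].split(" ")
assert (type_name, int(size)) == ("blob", 11)
assert hdr[hdrend + 1:] == content    # the remainder is buffered as stream content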
@@ -355,17 +364,22 @@ def read(self, size=-1):
             # needs to be as large as the uncompressed bytes we want to read.
             self._cws = self._cwe - len(tail)
             self._cwe = self._cws + size
-
-
-            indata = self._m[self._cws:self._cwe]    # another copy ... :(
-            # get the actual window end to be sure we don't use it for computations
-            self._cwe = self._cws + len(indata)
         else:
             cws = self._cws
             self._cws = self._cwe
             self._cwe = cws + size
-            indata = self._m[self._cws:self._cwe]    # ... copy it again :(
         # END handle tail
+
+
+        # if window is too small, make it larger so zip can decompress something
+        win_size = self._cwe - self._cws
+        if win_size < 8:
+            self._cwe = self._cws + 8
+        # END adjust winsize
+        indata = self._m[self._cws:self._cwe]    # another copy ... :(
+
+        # get the actual window end to be sure we don't use it for computations
+        self._cwe = self._cws + len(indata)
 
         dcompdat = self._zip.decompress(indata, size)
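This last hunk is the bug fix the commit message mentions: with a read window only a few bytes wide, zlib may consume the input without emitting any output, so a fixed-size window could stall forever; widening it to at least 8 bytes guarantees forward progress. The underlying behaviour can be seen directly (exact thresholds vary with the data):

import zlib

compressed = zlib.compress("some example payload for a stream test")
dco = zlib.decompressobj()

# two bytes barely cover the zlib stream header - the input is consumed,
# but no decompressed output is produced yet
assert dco.decompress(compressed[:2]) == ""

# a wider window finally lets the decompressor emit data
out = dco.decompress(compressed[2:])
assert out.startswith("some example")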
test/git/performance/test_streams.py

Lines changed: 2 additions & 11 deletions
@@ -3,13 +3,11 @@
 from test.testlib import *
 from git.odb import *
 
-from array import array
 from cStringIO import StringIO
 from time import time
 import os
 import sys
 import stat
-import random
 import subprocess
 
 
@@ -18,18 +16,11 @@
     )
 
 
-
 def make_memory_file(size_in_bytes, randomize=False):
     """:return: tuple(size_of_stream, stream)
     :param randomize: try to produce a very random stream"""
-    actual_size = size_in_bytes / 4
-    producer = xrange(actual_size)
-    if randomize:
-        producer = list(producer)
-        random.shuffle(producer)
-    # END randomize
-    a = array('i', producer)
-    return actual_size*4, StringIO(a.tostring())
+    d = make_bytes(size_in_bytes, randomize)
+    return len(d), StringIO(d)
 
 
 class TestObjDBPerformance(TestBigRepoR):
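make_memory_file now delegates to a shared make_bytes helper in the test library, so the same data generator can serve both stream and memory-file tests. A plausible equivalent of that helper, reconstructed from the deleted lines purely for illustration (the real testlib version may differ):

import random
from array import array

def make_bytes(size_in_bytes, randomize=False):
    """:return: byte string of approximately the given size, optionally shuffled"""
    actual_size = size_in_bytes / 4    # py2 floor division; 4 bytes per 'i' item
    producer = xrange(actual_size)
    if randomize:
        producer = list(producer)
        random.shuffle(producer)
    # END randomize
    return array('i', producer).tostring()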
