# # # patch "dws/dws.py" # from [42db972f333461d07a15be7090a34f9314eb295b] # to [c4f712a38239b57a0079c5c12caaf4c3900f07eb] # # patch "dws/dws_test.py" # from [951e59bb43babff749da77b3fd6a88ab03327e7a] # to [008fb3f4cc233de176539c6de51a45cce3d3def4] # # patch "fs_dws.py" # from [ade7ad807f6d378f732a0313d502d3966062694d] # to [fa096d848bc2bae5274217e26fd78dec2ecf3113] # ============================================================ --- dws/dws.py 42db972f333461d07a15be7090a34f9314eb295b +++ dws/dws.py c4f712a38239b57a0079c5c12caaf4c3900f07eb @@ -14,68 +14,89 @@ import re import sys import re +class Urllib2Opener: + def __init__(self, baseurl): + (scheme, host, path, query, frag) = urlparse.urlsplit(baseurl, "http") + self.baseUrl = scheme + "://" + host + + def request(self, uri, headers = None, postData = None): + theUrl = self.baseUrl + uri + if postData: + req = urllib2.Request( url=theUrl, data = postData ) + else: + req = urllib2.Request( url=theUrl ) + + f = urllib2.urlopen(req) + return (f.code, f.msg, f.headers.dict, f.read()) + + +class HTTPLibOpener: + def __init__(self, baseurl): + (scheme, host, path, query, frag) = urlparse.urlsplit(baseurl, "http") + if scheme == "http": + self.conn = httplib.HTTPConnection(host) + else: + self.verbose(1,"https connection to %s" % host) + self.baseUrl = path + + def request(self, uri, headers = None, postData = None): + h = (headers or {}).copy() + h['Connection'] = 'keep-alive' + if postData: + self.conn.request("POST", uri, body = postData, headers = h) + else: + self.conn.request("GET", uri, headers = h) + req = self.conn.getresponse() + return (req.status, req.reason, dict(req.getheaders()), req.read()) + class Connection: def __init__(self, base, path="", verbosity=0): + """ + base - address of DWS server (example: http://yourhost.org/dws.php" + name - base path in DWS namespace + """ self.base = base self.path = path + if len(self.path) > 0 and self.path[-1] != '/': + self.path += '/' self.verbosity = verbosity (scheme, host, path, query, frag) = urlparse.urlsplit(self.base, "http") if scheme not in ("http","https"): raise Exception("bad scheme: http,https are supported") - self.host = host - if scheme == "http": - self.conn = httplib.HTTPConnection(host) - self.verbose(1,"http connection to %s" % host) - else: - self.conn = httplib.HTTPSConnection(host) - self.verbose(1,"https connection to %s" % host) - self.baseUrl = "/" + path + self.opener = HTTPLibOpener(self.base) + self.baseUri = path - def request(self, what, method="GET", postData = "", **kwargs): - theUrl = self.baseUrl + def request(self, what, method="GET", postData = "", requestHeaders = None, responseHeaders = None, **kwargs): + theUri = self.baseUri args = {} args.update(kwargs) args['r'] = str(what) - if theUrl[-1].find('?') != -1: + if theUri[-1].find('?') != -1: d = "&" else: d = "?" for k,v in args.iteritems(): - theUrl += d + k + "=" + str(v) + theUri += d + k + "=" + str(v) d = "&" if postData: - self.verbose( 1, "requesting %s (POST %i bytes)" % (theUrl, len(postData)) ) + self.verbose( 1, "requesting %s (POST %i bytes)" % (theUri, len(postData)) ) else: - self.verbose( 1, "requesting %s" % theUrl ) - r = None - if self.conn: - headers = { - 'Connection': 'keep-alive' - } - if postData: - self.conn.request("POST", theUrl, body = postData, headers = headers) - else: - self.conn.request("GET", theUrl, headers = headers) - req = self.conn.getresponse() - try: - r = str(req.read()) - except Exception,e: - self.error( "request failed (%s): %s" %(theUrl, str(e)) ) - raise - else: - if postData: - req = urllib2.Request( url=theUrl, data = postData ) - else: - req = urllib2.Request( url=theUrl ) + self.verbose( 1, "requesting %s" % theUri ) + respStatus, respReason, respHeaders, respContent = self.opener.request(theUri, requestHeaders, postData) + if respStatus != httplib.OK: + raise IOError("DWS not responding, HTTP response %i %s (%s)" % (respStatus,respReason,theUri)) + dwsStatus = respHeaders.get("x-dws-status","BAD") + if dwsStatus != "OK": + self.warning("DWS bad response\ndws server: " + "\ndws server: ".join( respContent.split("\n") ) ) + dwsMessage = respHeaders.get("x-dws-message","unknown error") + raise IOError("DWS server: %s: %s (%s)" % (dwsStatus,dwsMessage,theUri)) + if responseHeaders is not None: + responseHeaders.update(respHeaders) + + self.verbose(3,"'%s': headers: '%s'" % (theUri, repr(respHeaders))) + self.verbose(2,"'%s': readed %i bytes" % (theUri, len(respContent))) + self.verbose(3,"'%s': content: '%s'" % (theUri, repr(str(respContent)))) + return respContent - try: - f = urllib2.urlopen(req) - r = str(f.read()) - except Exception,e: - self.error( "request failed (%s): %s" %(theUrl, str(e)) ) - raise - self.verbose(2,"'%s': readed %i bytes" % (theUrl, len(r))) - return r - def clear(self): self.request("clear") @@ -119,6 +140,55 @@ class Connection: realName = self.path + name return self.request("put", n = realName, postData=content) + # params: + # files = [ (name, content), ... ] + # returns: + # dws.Response + def putMany(self, files): + agrContent = "" + names = [] + sizes = [] + for name, content in files: + agrContent += content + names.append(self.path + name) + sizes.append(str(len(content))) + + return self.request("put_many", n=",".join(names), s=",".join(sizes), postData = agrContent) + + # + # params: + # names = [ filename, ... ] + # returns: + # false -> error + # [ content, ... ] -> success + # content[i] == None -> error occured during rading i-th file + # + def getMany(self, names): + responseHeaders = {} + agrContent = self.request("get_many", n=",".join(self.path+name for name in names), responseHeaders = responseHeaders) + if not responseHeaders.has_key('x-dws-sizes'): + raise Exception("DWS: getMany response invalid") + sizesStr = responseHeaders['x-dws-sizes'] + try: + sizes = map(int,sizesStr.split(",")) + except ValueError: + raise Exception("DWS: getMany response invalid: invalid sizes descriptor") + if len(sizes) != len(names): + raise Exception("DWS: getMany response invalid: mismatched files count") + expectedSize = [s for s in sizes if s != -1] + if sum(expectedSize) != len(agrContent): + raise Exception("DWS: getMany response invalid: mismatched response size (exp=%i, act=%i)" % (expectedSize, len(agrContent))) + result = [] + offset = 0 + for size in sizes: + if size != -1: + content = agrContent[offset:offset+size] + offset += size + result.append(content) + else: + result.append(None) + return result + def append(self, name, content): realName = self.path + name return self.request("append", n = realName, postData=content) ============================================================ --- dws/dws_test.py 951e59bb43babff749da77b3fd6a88ab03327e7a +++ dws/dws_test.py 008fb3f4cc233de176539c6de51a45cce3d3def4 @@ -12,12 +12,15 @@ class TestSequenceFunctions(unittest.Tes class TestSequenceFunctions(unittest.TestCase): def setUp(self): - self.c = Connection("http://localhost/mtdumb/dws/impl/php/mtdumb.php","path/") - self.c.clear() + self.c = Connection("http://zbigg.internet.v.pl/zbigg-dump/","dws_python_test/") def testList(self): self.c.list() + def assertEquals(self, actual, expected, name = ""): + if actual != expected: + raise AssertionError("%sactual(%s) differs from expected(%s)" % (name,repr(actual), repr(expected))) + def __doTestContent(self,name,content): self.c.put(name, content) self.assert_(int(self.c.stat(name)['size']) == len(content)) @@ -49,6 +52,17 @@ class TestSequenceFunctions(unittest.Tes ly = lx/2 self.assert_(self.c.getParts(name, [(0,ly), (ly, lx), (ly+lx,ly)] ) == content2 ) self.assert_(self.c.getParts(name, [(0,ly), (ly+lx,ly)] ) == content ) + def __testMany(self, files): + from itertools import izip,count + names = [ name for name, content in files] + contents = [content for name, content in files ] + self.c.putMany(files) + c1 = self.c.getMany(names) + for i,act,exp in izip(count(),c1,contents): + self.assertEquals(act, exp, name="C%i: " %i) + xact = self.c.get(names[i]) + self.assertEquals(xact, exp, name="C(single)%i" % i ) + def testPutEmpty(self): self.__doTestContent("f1", "") def testPutZero(self): @@ -63,7 +77,18 @@ class TestSequenceFunctions(unittest.Tes z += " " self.__doTestContent("f5", z) - + def testMany_zeros(self): + files = [ ("m1%i" % i, "" ) for i in range(10) ] + self.__testMany(files) + + def testMany_onebytes(self): + files = [ ("m2%i" % i, ("%c" % i)*1 ) for i in range(10) ] + self.__testMany(files) + + def testMany_onearith(self): + files = [ ("m3%i" % i, ("%c" % i)*i ) for i in range(100) ] + self.__testMany(files) + def testGet(self): pass ============================================================ --- fs_dws.py ade7ad807f6d378f732a0313d502d3966062694d +++ fs_dws.py fa096d848bc2bae5274217e26fd78dec2ecf3113 @@ -15,6 +15,7 @@ import StringIO import dws.dws import sys import StringIO +from itertools import izip class DWSReadableFS(fs.ReadableFS): def __init__(self, hostspec): @@ -25,7 +26,7 @@ class DWSReadableFS(fs.ReadableFS): else: self.path = hostspec[pathIndex+1:] self.hostspec = hostspec[:pathIndex] - self.conn = dws.Connection(self.hostspec,self.path) + self.conn = dws.Connection(self.hostspec,self.path,verbosity=2) self.version = 0 def list(self): @@ -44,15 +45,9 @@ class DWSReadableFS(fs.ReadableFS): return StringIO.StringIO(content) def fetch(self, filenames): - files = {} - list = self.list() - for fn in filenames: - try: - if fn not in list: raise IOError("not found: %s" % fn) - files[fn] = self.conn.get(fn) - except IOError,e: - files[fn] = None - return files + contents = self.conn.getMany(filenames) + assert len(contents) == len(filenames) + return dict( izip( filenames, contents) ) def _real_fetch_bytes(self, filename, bytes): fc = self.conn.getParts(filename, bytes) @@ -94,18 +89,10 @@ class DWSWriteableFS(DWSReadableFS, fs.W try: return int(self.conn.stat(filename)['size']) except Exception,e: - return 0 + return 0 - def _exists(self, full_fn): - try: - return full_fn in self.list() - except IOError: - return False - return True - def put(self, filenames): - for fn, data in filenames.iteritems(): - self.conn.put(fn, data) + self.conn.putMany(filenames.iteritems()) def rollback_interrupted_puts(self, filenames): pass