gnumed-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Gnumed-devel] getting closer to a version that "makes sense"


From: Ian Haywood
Subject: Re: [Gnumed-devel] getting closer to a version that "makes sense"
Date: Thu, 07 Apr 2005 08:05:30 +1000
User-agent: Mozilla Thunderbird 0.8 (X11/20041012)

Tim Churches wrote:

Only those versions, or those versions or later. I have Python 2.4.1 on
all my systems. Will wxPython 2.5.4.1 work?
I find it does, although I am only loading a few modules.

Only 2.4 is available as a tarball or zip file from the pyPgSQL
SourceForge download page. Is that the one you mean? I ask because teh
version in CVS has serveral changes, but has not been packaged.

Has anyone tested pyPgSQL 2.4 with PostgreSQL 8.01? Or do I need
PostgreSQL 7.4? Our testing of pyPgSQL 2.4 with PostgreSQl 8.0 revealed
a few issues, and we'll be distributing a patch file for pyPgSQL 2.4 to
fix these with NetEpi, unless Gerhard Haering updates PyPgSQL and
releases a new tarball/zipfile for it - but he hasn't for over a year
and hasn't included a vital patch we submitted - so we have to
distribute our own patch file - and may have to distribute a forked
version if he doesn't include the fixes soon.
I've never liked pyPgSQL. It relies on the arcane ( and obsolete since python 
2.3) mxDateTime which has caused me no end of problems.

Ages ago I wrote a Postgres client library in Python (just for fun really) 
which I attach, it took me about as long to write
as getting mx+ pypgsql to compile on my system.

If we have to maintain pypgsql ourselves there seems little downside to using 
this, it cuts our dependencies from 4 to 2. (python, wxpython)
We get better error messages, stored queries, pipelining, etc. because we're 
using the new wire protocol.

Ian


import socket, sys, struct, exceptions, types, errno


class Error (exceptions.StandardError):
    pass

class DatabaseError (Error):
    pass

class InterfaceError (Error):
    pass

apilevel = "2.0"

threadsafety = 1
BINARY=1
STRING=0
NUMBER=2
DATETIME=3
ROWID=4
BOOL=5
ARRAY=6

OID_BOOL=16
OID_INT8=20
OID_INT4=23
OID_INT2=21
OID_OID=26
OID_FLOAT4=700
OID_FLOAT8=701
OID_DATE=1082
OID_TIMESTAMP=1114
OID_TIME=1083
OID_TIMESTAMPTZ=1184
OID_TIMETZ=1266

type_map = {OID_BOOL:BOOL, OID_INT8:NUMBER, OID_INT4:NUMBER, OID_INT2:NUMBER, 
OID_FLOAT4:NUMBER,
            OID_FLOAT8:NUMBER, OID_DATE:DATETIME, OID_TIMESTAMP:DATETIME, 
OID_TIME:DATETIME,
            OID_TIMESTAMPTZ:DATETIME, OID_TIMETZ:DATETIME}

def map_type (typ):
    if type_map.jas_key (typ):
        return type_map[typ]
    else:
        return STRING

def connect (database, user, password=None, host=None, port=5432, block=1):
    #sock = None
    err_msg = ''
    if host:
        for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error, msg:
                sock = None
                err_msg = msg
                continue
            try:
                sock.connect(sa)
            except socket.error, msg:
                sock.close()
                err_msg = msg
                sock = None
                continue
            break
    else:
        # we're assuming UNIX socket here
        paths = ['/tmp/.s.PGSQL.5432', '/var/run/postgresql/.s.PGSQL.5432'] # 
FIXME: list of paths used on various distros
        if port != 5432:
            paths.append (port)
        for path in paths:
            try:
                sock = socket.socket (socket.AF_UNIX, socket.SOCK_STREAM, 0)
                sock.connect ((path))
            except socket.error, msg:
                sock.close ()
                err_msg = msg
                sock = None
                continue
            break
    if sock is None:
        raise DatabaseError, err_msg
    return Connection (sock, user, database, password, block)


class Connection:
    def __init__ (self, sock, user, db, password, block):
        self.sock = sock
        self.block = block
        self.sock.set_blocking (block)
        self.__password = password
        self.__user = user
        self.__input = ''
        self.msg_len = 0
        self.__output = ''
        self.__outqueue = []
        self.__partial = ''
        self.copyIn = 0
        self.param = {}
        self.status = 'L'
        self.__listeners = {}
        self.__queryqueue = [Cursor (self)]
        self.handlers = {'R':self.AuthenticationRequest, 
'K':self.BackendKeyData, '2':self.BindComplete,
                         '3':self.CloseComplete, 'C':self.CommandComplete, 
'I':self.EmptyQueryResponse,
                         'H':self.CopyOutResponse, 'G':self.CopyInResponse, 
'c':self.CopyDone, 'n':self.NoData,
                         'd':self.CopyData, 'D': self.DataRow, 
'E':self.ErrorResponse,
                         'N':self.NoticeResponse, 
'A':self.NotificationResponse, 't':self.ParameterDescription,
                         'S':self.ParameterStatus, 'Z':self.ReadyForQuery, 
'T':self.RowDescription,
                         '1':self.ParseComplete, 's':self.PortalSuspended}
                         
        # send login message
        self.add_struct ("!L", 196608)
        self.add_str ("user")
        self.add_str (user)
        self.add_str ("database")
        self.add_str (db)
        self.add_str ('')
        self.send ('')
        if block:
            while self.__status == 'L':
                self.recv ()


    def set_blocking (self, block):
        if self.block != block:
            self.sock.set_blocking (block)
            self.block = block
            
    def Error (self, error):
        """
        Handle an error
        Descendants may override this.
        By default raises a DatabaseError
        """
        raise DatabaseError, "%(S)s: %(M)s" % error

    def Notice (self, error):
        """
        An SQL NOTICE message has been recieved
        """
        pass

    def send (self, message_type):
        if message_type:
            self.__outqueue += struct.pack ("!cL", message_type, len 
(self.__output)+4)
        else:
            self.__outqueue += struct.pack ("!L", len (self.__output) + 4)
        self.__outqueue += self.__output
        self.__output = ""

    def recv (self):
        """
        This is where we recieve processor time.
        Only one network packet is ever handled
        @return: true if a network packet was handled
        """
        if self.__partial:
            transmit = self.partial
        else:
            transmit = self.__outqueue[0]
        try:
            sent = self.sock.send (transmit)
        except socket.error, msg:
            if type (msg) == types.TupleType:
                err, msg = msg
                if err == errno.EWOULDBLOCK:
                    sent = 0
                else:
                    self.Error ({'S':'PANIC', 'M':msg, 'C':'08006'})
        if sent:
            if not self.__partial:
                del self.__outqueue[0]
            self.partial = transmit[sent:-1]
            if self.copyIn:
                # send some COPY OUT data
                data = self.queryqueue[0].callback ()
                if data is None:
                    self.add_str ("copy failed")
                    self.send ('f')
                elif data == '': # Copy is done
                    self.send ('c')
                    self.copyIn = 0
                else:
                    self.__output = data
                    self.send ('d')
            return 1
        try:
            self.__input += self.sock.recv ()
        except socket.error, msg:
            if type (msg) == types.TupleType:
                err, msg = msg
                if err == errno.EWOULDBLOCK:
                    return 0
            self.Error ({'S':'PANIC', 'M':msg, 'C':'08006'})
        while 1:
            if self.msg_len == 0:
                if len (self.__input) > 4:
                    self.message_type, self.msg_len = struct.unpack ("!cL", 
self.__input[0:5])
                else:
                    return 1
            self.__pos = 5
            if len (self.__input) >= self.msg_len + 1:
                excess = self._input[self.msg_len+1, -1]
                self.__input = self.__input[0:self.msg_len+1]
                if self.handlers.has_key (self.message_type):
                    self.handlers[self.message_type]
                self.__input = excess
                self.msg_len = 0
            else:
                return 1
            
    def add_struct (self, fmt, *args):
        self.__output += struct.pack (fmt, *args)

    def get_struct (self, fmt):
        length = struct.calcsize (fmt)
        if length+self.__pos > len (self.__input):
            return None
        ret = struct.unpack (fmt, self.__input[self.__pos:self.__pos+length])
        self.__pos += length
        return ret

    def add_str (self, string):
        if type (string) == types.UnicodeType:
            string = string.encode ('latin-1', 'replace')
        self.__output += string
        self.__output += '\000'

    def get_str (self):
        strend = self.__pos
        while strend < len (self.__input) and self.__input[strend] != '\000':
            strend += 1
        if strend == self.__pos:
            return ""
        else:
            ret = self.__input[self.__pos:strend]
            self.__pos = strend+1
            return ret

    def get_raw (self, length):
        end = self.__pos + length
        ret = self.__input[self.__pos:end]
        self.__pos = end
        return ret
            
                
    def AuthenticationRequest (self):
        mode, = self.get_struct ("!L")
        if mode == 0:
            self.__loggedin = 1
        elif mode == 1:
            self.Error ({'S':'PANIC', 'C':'08001', 'M':'Kerberos V4 
authentication not supported', 'H':'You need to remove this option from 
pg_hba.conf'})
        elif mode == 2:
            self.Error ({'S':'PANIC', 'C':'08001', 'M':'Kerberos V5 
authentication not supported', 'H':'You need to remove this option from 
pg_hba.conf'})
        elif mode == 3: # clear text password
            self.add_str (self.__password)
            self.send ('p')
        elif mode == 4: # crypt () password
            salt = self.get_struct ("2s")
            import crypt
            self.add_str (crypt.crypt (self.__password, salt))
            self.send('p')
        elif mode == 5: # md5 password
            salt = self.get_struct ("4s")
            import md5
            md5_1 = md5.new (self.__password + self.__user)
            md5_2 = md5.new (md5_1.hexdigest () + salt)
            self.add_str ("md5" + md5_2.hexdigest ())
            self.send('p')
        elif mode == 6:
            self.Error({'S':'PANIC', 'C':'08001', 'M':'SCM authentication not 
supported', 'H':'You need to remove this option from pg_hba.conf'})
        else:
            self.Error ({'S':'PANIC', 'C':'08001', 'M':'Unknown authentication 
mode %d' % mode, 'H':"You're screwed"})

    def BackendKeyData (self):
        self.__backend_pid, self.__backend_key = self.get_struct ("!LL")

    def CommandComplete (self):
        complete = self.get_str ()
        res = ['INSERT [0-9]+ ([0-9]+)', 'FETCH ([0-9]+)', 'DELETE ([0-9]+)', 
'MOVE ([0-9]+)', 'UPDATE ([0-9]+)']
        for r in res:
            match = re.match (r, comlete)
            if match:
                self.queryqueue[0].rowcount = int (match.group (1))
                return
        self.queryqueue[0].rowcount = 0
        

    def CopyData (self):
        data = self.get_raw (self.msg_len-4)
        self.queryqueue[0].callback (data)

    def CopyDone (self):
        self.queryqueue[0].rowcount = 0

    def CopyOutResponse (self):
        overall, num_cols = self.get_struct ("!BH")
        cols = self.get_struct("%dH" % num_cols)
        self.queryqueue[0].description = [('', c, None, None, None, None, None) 
for c in cols] 

    def CopyInResponse (self):
        self.copyIn = 1
        overall, num_cols = self.get_struct ("!BH")
        cols = self.get_struct("%dH" % num_cols)
        self.queryqueue[0].description = [('', c, None, None, None, None, None) 
for c in cols] 


    def BindComplete (self):
        pass

    def EmptyQueryResponse (self):
        self.queryqueue[0].rowcount = 0

    def CloseComplete (self):
        pass

    def NoData (self):
        pass

    def ParseComplete (self):
        pass

    def PortalSuspended (self):
        pass

    def DataRow (self):
        cols, = self.get_struct ("!h")
        row = []
        for i in range (0, cols):
            itemlen, = self.get_struct ("!l")
            if itemlen == -1:
                row.append (None) # map SQL NULL to Python None
            else:
                if self.description[i]['typeOID'] == OID_INT2:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("!h"))
                    else:
                        row.append (int (self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] in [OID_INT4, OID_OID]:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("!l"))
                    else:
                        row.append (int (self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] == OID_INT8:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("!q"))
                    else:
                        row.append (int (self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] == OID_BOOL:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("B"))
                    else:
                        row.append (float (self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] == OID_FLOAT4:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("f"))
                    else:
                        row.append (float (self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] == OID_FLOAT8:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("d"))
                    else:
                        row.append (float(self.get_raw (itemlen)))
                elif self.description[i]['typeOID'] == OID_DATE:
                    if self.description[i]['format'] == 1:
                        row.append (self.get_struct ("B"))
                    else:
                        row.append (self.get_raw (itemlen) == 't')
                else:
                    row.append (self.get_raw (itemlen))
        self.queryqueue[0].callback (row)

    def ErrorResponse (self):
        fieldtype, = self.get_struct ("c")
        error = {}
        while fieldtype != '\000':
            error[fieldtype] = self.get_str ()
            fieldtype, = self.get_struct ("c")
        self.__outqueue = []
        self.queryqueue = []
        self.status = 'E'
        self.copyIn = 0
        self.Error (error)

    def NoticeResponse (self):
        fieldtype = self.get_struct ("c")
        error = {}
        while fieldtype != '\000':
            error[fieldtype] = self.get_str ()
            fieldtype = self.get_struct ("c")
        self.Notice (error)
        
    def NotificationResponse (self):
        pid = self.get_struct ("!L")
        event = self.get_str ()
        param = self.get_str ()
        if self.__listeners.has_key (event):
            self.__listeners[event] (pid, param)

    def Listen (self, event, callback):
        """
        Listens for a  notification event
        @type callback: callable
        @param callback: the function called, must accept two parameters:
               1. the process ID of the backend raising this event
               2. the notification paramter (not yet supported)
        Should be None to cancel
        Note this won't issue the backend LISTEN command for you
        """
        if callback is None:
            del self.__listeners[event]
        else:
            if not callable (callback):
                raise InterfaceError, "notification callback must be callable"
            self.__listeners[event] = callback

    def ParameterDescription (self):
        pass

    def ParameterStatus (self):
        p = self.get_str ()
        self.param[p] = self.get_str ()

    def ReadyForQuery (self):
        self.status = self.get_struct ("c")
        self.queryqueue[0].finished = 1
        del self.queryqueue[0]
        if self.queryqueue and self.status != 'E':
            self.add_str (self.queryqueue[0].query)
            self.send ('Q')
            self.status = 'Q'

    def RowDescription (self):
        rows, = self.get_struct ("!h")
        self.description = []
        for i in range (0, rows):
            row = {}
            row['name'] = self.get_str ()
            row['tableOID'], row['col_no'], row['typeOID'], row['typlen'], 
row['modifier'], row['format'] = self.get_struct ("!lhlhlh")
            self.description.append (row)
        self.__queryqueue[0].description = [(i['name'], map_type 
(i['typeOID']), None, i['typlen'], None, None, None) for i in self.description]
        print self.description

    def close (self):
        self.send ('X')
        self.sock.close ()

    def parse (self, arg):
        if type (arg) == types.ListType or type (arg) == types.TupleType:
            return "{%s}" % ", ".join ([self.__parse (i) for i in arg])
        if type (arg) == types.BooleanType:
            return (arg or 'f') and 't'
        if isinstance (arg, types.StringTypes):

    def cursor (self):
        return Cursor (self)

    def commit (self):
        if self.state == 'I':
            return
        if self.state == 'E':
            raise DatabaseError, "in error mode, can't commit"
        self.set_blocking (1)
        while self.state != 'T' or self.state != 'I':
            self.recv ()
        if self.state == 'T':
            self.add_str ("COMMIT")
            self.send ("Q")
            while self.state != 'I':
                self.recv ()

    def rollback (self):
        if self.state == 'I':
            return
        self.set_blocking (1)
        while self.state != 'T' or self.state != 'I' or self.state != 'E':
            self.recv ()
        if self.state == 'T' or self.state == 'E':
            self.add_str ("ROLLBACK")
            self.send ("Q")
            while self.state != 'I':
                self.recv ()
            


class Cursor:

    def __init__ (self, conn):
        self.conn = conn
        self.finished = 1
        self.description = []
        self.arraysize = 10

    def execute (self, query, *args):
        if not self.finished:
            self.conn.set_blocking (1)
            while not self.finished:
                self.conn.recv ()
        if args:
            args = [self.conn.parse (i) for i in args]
            query = query % args
        self.__buffer = []
        self.callback = self.__buffer.append
        self.conn.queryqueue.append (self)
        self.query = query
        if len (self.conn.queryqueue) == 1:
            self.conn.add_str (query)
            self.conn.send ('Q')
            self.conn.status = 'Q'
        self.finished = 0
        self.command_clear = 0
        self.rowcount = -1

    def executeAsync (self, callback, query, *args):
        if not self.finished:
            raise InterfaceError, "query still running on this cursor"
        if args:
            args = [self.conn.parse (i) for i in args]
            query = query % args
        self.query = query
        self.callback = callback
        self.conn.queryqueue.append (self)
        if len (self.conn.queryqueue) == 1:
            self.conn.add_str (query)
            self.conn.send ('Q')
            self.conn.status = 'Q'
        self.finished = 0
        self.rowcount = -1
        self.conn.set_blocking (0)

    def fetchone (self):
        self.conn.set_blocking (1)
        while len (self.__buffer) < 1 and self.rowcount == -1:
            self.conn.recv ()
        if self.__buffer:
            ret = self.__buffer[0]
            del self.__buffer[0]
            return ret
        else:
            return []

    def fetchmany (size=self.arraysize):
        self.conn.set_blocking (1)
        while len (self.__buffer) < size and self.rowcount == -1:
            self.conn.recv ()
        if len (self.__buffer) >= size:
            ret = self.__buffer[0:size]
            del self.__buffer[0:size]
            return ret
        else:
            return self.__buffer

    def fetchall (self):
        self.conn.set_blocking (1)
        while self.rowcount == -1:
            self.conn.recv ()
        self.rowcount = len (self.__buffer)
        return self.__buffer

    def nextset (self):
        self.conn.set_blocking (1)
        while self.rowcount == -1:
            self.conn.recv ()
        self.__buffer = []

    def close (self):
        self.conn.set_blocking (1)
        while not self.finished:
            self.conn.recv ()

    def setinputsizes (self, sizes):
        pass


    def setoutputsize (self, size, column=0):
        pass
            
        
        
            
    

Attachment: signature.asc
Description: OpenPGP digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]