From c64ba24072c1d33fcbcf47159d3e6e53a0bdac3b Mon Sep 17 00:00:00 2001 From: Daniel_M_Williams
Date: Fri, 9 Mar 2018 11:07:39 -0500 Subject: [PATCH] [feature] Client-side libraries may automatically reconnect - moved watch options to their own file to rationalize definition - default behavior is to exit-on-failure - activated by calling `gps.gps(..., reconnect=True)` - refactors stream(...) function to deduplicate processing - added dictionary methods to 'class dictwrapper' --- gps/client.py | 138 ++++++++++++++++++++++++++++++++++++++++++--------------- gps/gps.py | 94 ++++----------------------------------- gps/options.py | 65 +++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 122 deletions(-) create mode 100644 gps/options.py diff --git a/gps/client.py b/gps/client.py index c660d92..38469db 100644 --- a/gps/client.py +++ b/gps/client.py @@ -13,18 +13,26 @@ import time from .misc import polystr, polybytes -GPSD_PORT = "2947" +from .options import * +GPSD_PORT = "2947" class gpscommon(object): "Isolate socket handling and buffering from the protocol interpretation." - def __init__(self, host="127.0.0.1", port=GPSD_PORT, verbose=0): + def __init__(self, host="127.0.0.1", port=GPSD_PORT, verbose=0, should_reconnect=False): self.sock = None # in case we blow up in connect self.linebuffer = b'' self.verbose = verbose + self.stream_command = '' + self.reconnect = should_reconnect + self.last_read = time.time() # time of last successful non-zero read + self.timeout = 10.0 # after this many seconds, assume connection is dead + if host is not None: - self.connect(host, port) + self.host = host + self.port = port + self.connect(self.host, self.port) def connect(self, host, port): """Connect to a host on a given port. 
@@ -41,8 +49,8 @@ class gpscommon(object): port = int(port) except ValueError: raise socket.error("nonnumeric port") - # if self.verbose > 0: - # print 'connect:', (host, port) + + msg = "getaddrinfo returns an empty list" self.sock = None for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): @@ -51,14 +59,19 @@ class gpscommon(object): self.sock = socket.socket(af, socktype, proto) # if self.debuglevel > 0: print 'connect:', (host, port) self.sock.connect(sa) + if self.verbose > 0: + print('connected.... {}:{}... streaming '.format(host, port)) + self.stream() + break + except socket.error as e: - msg = str(e) - # if self.debuglevel > 0: print 'connect fail:', (host, port) + msg = str(e) + ' (to {}:{})'.format(host, port) + if self.verbose > 1: + sys.stderr.write("error: {}\n".format(msg.strip())) self.close() - continue - break - if not self.sock: - raise socket.error(msg) + + if not self.reconnect: + raise def close(self): if self.sock: @@ -78,24 +91,41 @@ class gpscommon(object): def read(self): "Wait for and read data being streamed from the daemon." 
- if self.verbose > 1: - sys.stderr.write("poll: reading from daemon...\n") + + age = time.time() - self.last_read + if age > self.timeout and self.sock: + if self.verbose > 1: + sys.stderr.write("disconnect: connection expired...\n") + self.close() + + if None is self.sock: + self.connect(self.host, self.port) + if None is self.sock: + return -1 + + self.stream() + eol = self.linebuffer.find(b'\n') if eol == -1: # RTCM3 JSON can be over 4.4k long, so go big + if self.verbose > 1: + sys.stderr.write("poll: reading from daemon...\n") + frag = self.sock.recv(8192) + self.linebuffer += frag - if self.verbose > 1: - sys.stderr.write("poll: read complete.\n") - if not self.linebuffer: + if 0 == len(self.linebuffer): if self.verbose > 1: - sys.stderr.write("poll: returning -1.\n") + sys.stderr.write("poll: no available data: returning -1.\n") # Read failed return -1 + + self.last_read = time.time() + eol = self.linebuffer.find(b'\n') if eol == -1: if self.verbose > 1: - sys.stderr.write("poll: returning 0.\n") + sys.stderr.write("poll: partial message: returning 0.\n") # Read succeeded, but only got a fragment self.response = '' # Don't duplicate last response return 0 @@ -132,20 +162,8 @@ class gpscommon(object): "Ship commands to the daemon." 
if not commands.endswith("\n"): commands += "\n" - self.sock.send(polybytes(commands)) - -WATCH_ENABLE = 0x000001 # enable streaming -WATCH_DISABLE = 0x000002 # disable watching -WATCH_JSON = 0x000010 # JSON output -WATCH_NMEA = 0x000020 # output in NMEA -WATCH_RARE = 0x000040 # output of packets in hex -WATCH_RAW = 0x000080 # output of raw packets -WATCH_SCALED = 0x000100 # scale output to floats -WATCH_TIMING = 0x000200 # timing information -WATCH_SPLIT24 = 0x001000 # split AIS Type 24s -WATCH_PPS = 0x002000 # enable PPS in raw/NMEA -WATCH_DEVICE = 0x000800 # watch specific device - + if None is not self.sock: + self.sock.send(polybytes(commands)) class json_error(BaseException): def __init__(self, data, explanation): @@ -171,8 +189,42 @@ class gpsjson(object): self.data.satellites = [dictwrapper(x) for x in self.data.satellites] - def stream(self, flags=0, devpath=None): - "Control streaming reports from the daemon," + def set_flags(self, flags=0, devpath=None): + self.stream_command = self.generate_stream_command( flags, devpath ) + self.stream() + + def generate_stream_command(self, flags=0, devpath=None): + + if flags & WATCH_OLDSTYLE: + cmd = self.generate_stream_command_old_style(flags) + else: + cmd = self.generate_stream_command_new_style(flags, devpath) + + if self.verbose > 1: + sys.stderr.write("set: flags (0x{}) => {}\n".format(hex(flags), cmd)) + + return cmd + + @staticmethod + def generate_stream_command_old_style(flags=0): + if flags & WATCH_DISABLE: + arg = "w-" + if flags & WATCH_NMEA: + arg += 'r-' + return arg + elif flags & WATCH_ENABLE: + arg = 'w+' + if flags & WATCH_NMEA: + arg += 'r+' + return arg + + @staticmethod + def generate_stream_command_new_style(flags=0, devpath=None): + + if (flags & (WATCH_JSON | WATCH_OLDSTYLE | WATCH_NMEA + | WATCH_RAW)) == 0: + flags |= WATCH_JSON + if flags & WATCH_DISABLE: arg = '?WATCH={"enable":false' if flags & WATCH_JSON: @@ -191,7 +243,8 @@ class gpsjson(object): arg += ',"split24":false' if flags & 
WATCH_PPS: arg += ',"pps":false' - else: # flags & WATCH_ENABLE: + return arg + "}\n" + elif flags & WATCH_ENABLE: arg = '?WATCH={"enable":true' if flags & WATCH_JSON: arg += ',"json":true' @@ -211,8 +264,18 @@ class gpsjson(object): arg += ',"pps":true' if flags & WATCH_DEVICE: arg += ',"device":"%s"' % devpath - return self.send(arg + "}") + return arg + "}\n" + else: + return "" + def stream(self, flags=0, devpath=None): + "Control streaming reports from the daemon," + + if 0 < flags: + self.set_flags(flags, devpath) + + if self.stream_command: + self.send(self.stream_command) class dictwrapper(object): "Wrapper that yields both class and dictionary behavior," @@ -241,6 +304,9 @@ class dictwrapper(object): return "