#!/usr/local/bin/python2.6
#
# $HeadURL: https://svn.spodhuis.org/ksvn/websites/trunk/sks.spodhuis.org/wsgi/sks_peers.py $
# $Id: sks_peers.py 134 2009-03-24 21:32:47Z XXX $
#
# Author: Phil Pennock
# Copyright 2009; use/modify/copy freely with attribution.
# No warranties whatsoever; only use if you're able to assess this code.
#
"""
WSGI application to serve up a table of peers of this server.

Main entry point is 'application'; if 'main' is invoked then it can choose
to start a standalone webserver.
"""

import gc
from math import sqrt
import numbers
import os
import Queue
import random
import re
import signal
import socket
import sys
import syslog
import textwrap  # purely for /internalz
import threading
import time
import urllib2

from BeautifulSoup import BeautifulSoup
import Cheetah.Template
import dns.resolver  # dnspython
import ipaddr
import selector

sks_data = None
sks_data_lock = None
startup_lock = threading.Lock()  # see sks_peers_init() for rationale
wsgi_selector = None

kPRIVILEGED_ACCESS = ('94.142.240.6', '94.142.241.88/29',
                      '2a02:898:0:30::31:1', '2a02:898:31::/48')
kHOSTNAME = 'sks.spodhuis.org'
kHOSTNAME_ALT = 'sks-peer.spodhuis.org'
kMAINT_EMAIL = 'address@hidden'
kSKS_MEMBERSHIP = '/var/sks/membership'
kRESOLVE_THREADS = 20
kSKS_POLL_THREADS = 20
kRECON_PORT = 11370
kHKP_PORT = 11371
kSTATS_FETCH_TIMEOUT = 30
kCOUNTRIES_ZONE = 'zz.countries.nerd.dk.'
kIPLIST_SANITY_MIN = 2600000
kIPLIST_NEED_THRESHOLD = 500  # use the sks-keyservers.net value
kINTER_SCAN_INTERVAL_SECS = 6 * 3600
kINTER_SCAN_INTERVAL_JITTER = 120
kSYSLOG_NAME = 'sks-peers'
kSYSLOG_FACILITY = syslog.LOG_DAEMON
kSKIP_ENTRIES = ('localhost', '127.0.0.1', '::1')

# This one may *not* contain any variables or directives as it may be used
# by quick & dirty handlers, rather than the template system
kPAGE_TEMPLATE_BASIC_HEAD = \
"""
"""

kPAGE_TEMPLATE_BADUSER = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp
$summary

$summary

$error
"""

kPAGE_TEMPLATE_HEAD = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp
$my_hostname Peer Mesh

$my_hostname Peer Mesh

$warning $scanning_active
Entries at depth 0 are direct peers.  Others are seen by spidering the peers.
"""

kPAGE_TEMPLATE_FOOT = """#slurp
Host IP Geocoding Mutual Version Keys Distance
SKS has $peer_count peers of $mesh_count visible
""" kPAGE_TEMPLATE_HOST = """#slurp $html_link$host_aliases_text $ip $geo $mutual $version $keycount $distance """ kPAGE_TEMPLATE_HOSTERR = """#slurp $hostname Error: $error """ kPAGE_TEMPLATE_HOSTMORE = """#slurp $ip$geo """ class Error(Exception): """Generic top-level error exception for this tool.""" pass class InternalUsageError(Error): """We screwed the pooch so badly, we don't even call ourselves right.""" pass def debug(msg='?', *kwargs): syslog.syslog(syslog.LOG_DEBUG, msg % kwargs) def log(msg='?', *kwargs): syslog.syslog(syslog.LOG_INFO, msg % kwargs) def suicide_delayed(delay=3): def kill_me(): time.sleep(delay) os.kill(os.getpid(), signal.SIGTERM) threading.Thread(target=kill_me).start() # signal handler params must come first def act_reload_queue(signal=None, frame=None, note=None): global sks_data_lock, sks_data with sks_data_lock: if sks_data is None: return False if 'gatherer' not in sks_data: return False g = sks_data['gatherer'] if note is None and signal is not None: note = 'Signal %d received' % signal return g.rescan(note) def html_escape(s): """HTML Entity escaping.""" return str(s).replace('&', '&').replace('<', '<').replace('>', '>') def html_interp(*params): """HTML Entity escape each param, returning results as single tuple. Useful for interpolation by providing this as single % param. """ return tuple(map(html_escape, params)) def wrap_check_privileged(handler): """WSGI method decorator, check authorisation to access.""" def _wsgi_hook(environ, start_response): ip = environ.get('REMOTE_ADDR', None) if ip: ip = ipaddr.IP(ip) else: return return_error(environ, start_response) # Only IPv4 objects have .IsLoopback() -- *sigh* factories returning # objects with inconsistent APIs if hasattr(ip, 'IsLoopback'): if ip.IsLoopback(): return handler(environ, start_response) elif isinstance(ip, ipaddr.IPv6): loop = ipaddr.IP('::1') if ip == loop: return handler(environ, start_response) for block in kPRIVILEGED_ACCESS: net = ipaddr.IP(block) if net.Contains(ip): return handler(environ, start_response) return return_error(environ, start_response) pass return _wsgi_hook def _sort_hostnames(iterable): """Sort the iterable parameter as hostnames, least-specific label to most.""" keyed = [] for entry in iterable: key = entry.split('.') key.reverse() keyed.append( (key, entry) ) def _cmp_keyed(a, b): return cmp(a[0], b[0]) or cmp(a[1], b[1]) for entry in sorted(keyed, cmp=_cmp_keyed): yield entry[1] class AddressInfo(object): _all_lock = threading.Lock() _all_ai = [] NEW_ITEM = 1 NEW_NAME = 2 OLD_ITEM = 3 @classmethod def find_address_info(cls, obj, insert=False): newips = [x[0] for x in obj] #debug('AI.fai(%s): have IPs: %s', obj.hostname, ' '.join(newips)) with cls._all_lock: for ai in cls._all_ai: for ip in newips: if ip in ai.addresses: return ai if insert: cls._all_ai.append(obj) return None @classmethod def register_address_info(cls, obj): existing = cls.find_address_info(obj, insert=True) if not existing: return cls.NEW_ITEM added_name = existing.add_alt_name(obj.hostname) if added_name: return cls.NEW_NAME return cls.OLD_ITEM """Information about the IP addressing of an SKS peer.""" def __init__(self, hostname, walk_state, walker_cb=None, **kwargs): self.hostname = hostname self.portstr = '%d' % kRECON_PORT self._walk_state = walk_state self._walker_cb = walker_cb self.addresses = list() self.alt_names = list() self.geography = dict() #debug('AddressInfo(%s): created', hostname) self.update_lock = threading.Lock() with self.update_lock: self.resolve_ips() 
self.resolve_geographies() # resolve geography first, to populate the dict, to keep iter sane below status = self.__class__.register_address_info(self) self._walker_cb(self._walk_state, (self.hostname, self, status)) @classmethod def _resolve_txt(cls, searchname): answers = dns.resolver.query(searchname, 'TXT') if not answers: return [] res = [] for rdata in answers: res.extend(rdata.strings) return res @classmethod def _resolve_ip_geography(cls, ip): # This sucks, but even googleip doesn't have a 'reverse' method # suitable for use with DNS if ip.find(':') != -1: return None # XXX: Assumes either IPv6 or IPv4 search = ip.split('.') search.reverse() search = '.'.join(search) + '.' + kCOUNTRIES_ZONE return list(cls._resolve_txt(search)) def add_alt_name(self, newname): with self.update_lock: if newname == self.hostname: return False if newname in self.alt_names: return False if newname in self.addresses: return False self.alt_names.append(newname) return True def resolve_ips(self): addrs = [] for (family, socktype, proto, canonname, sockaddr) in socket.getaddrinfo( self.hostname, self.portstr, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG | socket.AI_NUMERICSERV): (addr, port) = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) addrs.append(addr) self.addresses.extend(addrs) def resolve_geographies(self): for ip in self.addresses: self.geography[ip] = self._resolve_ip_geography(ip) def __repr__(self): return 'AddressInfo(%s)' % self.hostname def __len__(self): return len(self.addresses) def __iter__(self): for ip in self.addresses: yield (ip, self.geography[ip]) def iter_all(self): yield self.hostname for x in self.alt_names: yield x for x in self.addresses: yield x class SksNode(object): """Information retrieved from an SKS peer.""" def __init__(self, hostname, walk_state, walker_cb=None, **kwargs): self.hostname = hostname self.portstr = '%d' % kHKP_PORT self._url = 'http://%s:%s/pks/lookup?op=stats' % (self.hostname, self.portstr) self.exception = None self._walk_state = walk_state self.distance = isinstance(walk_state, numbers.Integral) and str(walk_state) or '-1' self._walker_cb = walker_cb #debug('SksNode(%s): created', hostname) self.fetch_page() self.analyse() def fetch_page(self): try: self._data = urllib2.urlopen(self._url, timeout=kSTATS_FETCH_TIMEOUT).read() # urllib2 has URLError and HTTPError, subclassing from IOError; however, # it uses httplib internally which tends to throw BadStatusLine; all of # httplib's exceptions subclass from a local HTTPException class except (IOError, urllib2.httplib.HTTPException), e: self.exception = e def _table_following(self, search): s = self._soup.find(text=search).parent.nextSibling # straight s.name fails for text as it has no name attribute while getattr(s, 'name', '') != u'table': s = s.nextSibling return s def _plain_rows_of(self, search): table = self._table_following(search) return set([str(x.contents[0].strip()) for x in table.findAll('td')]) def _dict_from_plain_rows(self, search): table = self._table_following(search) return dict([x.contents[0].strip().split(None, 2) for x in table.findAll('td')]) def _kvdict_from_table(self, search): table = self._table_following(search) d = dict() for r in table.findAll('tr'): pair = [x.contents[0].strip() for x in r.findAll('td')] d[pair[0].rstrip(':')] = pair[1] return d def analyse(self): if self.exception is not None: return try: self._soup = BeautifulSoup(self._data) except Exception, e: self.exception = e return self.peers = 
self._dict_from_plain_rows(u'Gossip Peers') if self._walker_cb: for peer in self.peers: if peer in kSKIP_ENTRIES: #debug('Analyse(%s): skipping blacklisted item {%s}', self.hostname, peer) pass else: self._walker_cb(self._walk_state, str(peer)) else: debug('Analyse(%s): no walker', self.hostname) self.mutual = False for me in (kHOSTNAME, kHOSTNAME_ALT): try: if me in self.peers and int(self.peers[me]) == kRECON_PORT: self.mutual = True except ValueError, e: pass self.mailsync = self._plain_rows_of(u'Outgoing Mailsync Peers') self._settings = self._kvdict_from_table(u'Settings') # settings should have: Hostname Version 'HTTP port' 'Recon port' 'Debug level' self.version = self._settings['Version'] try: t = str(self._soup.find(text=u'Statistics').parent.nextSibling.string) if t.startswith('Total number of keys'): self.keycount = int(t.split(':')[1]) except AttributeError, e: self.keycount = -1 del self._data, self._soup def __nonzero__(self): if self.exception: return False return True def __str__(self): if self.exception: return str(self.exception) try: return str(self._settings['Hostname']) except Exception, e: return self.hostname class PoolWorkerFactory(object): """Make generic pool workers.""" @classmethod def make(cls, name, worker_class): """Our factory entry point. Generate a PoolWorker.""" def run_init(self, index, inq, outq, walker_cb): """PoolWorker __init__.""" threading.Thread.__init__(self) self.name = '%s[%d]' % (name, index) self.daemon = True self._in_queue = inq self._out_queue = outq self._walker_cb = walker_cb def do_worker(self, input_data, depth): """Invoke the wrapper class's constructor for the current data.""" try: return worker_class(input_data, walk_state=depth, walker_cb=self._walker_cb) except Exception, e: return e def do_run(self): """The thread run() method for the constructed classes.""" while True: depth, data = self._in_queue.get() self._out_queue.put( (data, self._do_worker(data, depth=depth)) ) self._in_queue.task_done() d = dict() d['__init__'] = run_init d['_do_worker'] = do_worker d['run'] = do_run return type(name, (threading.Thread,), d) class PoolWorkManager(object): """Run workers asynchronously, emitting results in a queue. Eg, with ThreadResolve, we create the resolver, pass it hostnames, pull out tuples of (hostname, AddressInfo) object in non-deterministic order. """ # There's not much point to getting a shutdown Event too, as we can't have # timeouts in Queue obj.join() so no way to sensibly interrupt a collect. def __init__(self, worker_name, data_class, thread_count, walker_cb=None): if not isinstance(thread_count, numbers.Integral) or thread_count < 1: raise InternalUsageError("Not a positive integer: %s" % str(thread_count)) worker_class = PoolWorkerFactory.make(worker_name, data_class) self._in_queue = Queue.Queue() self._out_queue = Queue.Queue() self._threads = list() self._previous_results = {} for i in range(0, thread_count): t = worker_class(i, self._in_queue, self._out_queue, walker_cb) t.start() self._threads.append(t) def __len__(self): """Number of results still to be returned. 
Includes pending.""" return self._in_queue.qsize() + self._out_queue.qsize() def add_depthed(self, depth, query): self._in_queue.put((depth, query)) def add(self, query): self.add_depthed(0, query) def join(self): self._in_queue.join() def get(self): res = self._out_queue.get() self._out_queue.task_done() return res def collect(self): results = self._previous_results self.join() while self: (k, d) = self.get() results[k] = d self._previous_results = results return results class RecursingWalkManager(threading.Thread): """Handle requests to see more data.""" def __init__(self): threading.Thread.__init__(self) self.name = 'RecursingWalkManager' self.daemon = True self._queue = Queue.Queue() self._managers = None self._have_managers = threading.Semaphore(0) self.okay_next_time = set() self.seen = { kHOSTNAME: 0, kHOSTNAME_ALT: 0 } # host -> depth self.pending = dict() # host -> depth self._hosts_lock = threading.RLock() def __len__(self): return self._queue.qsize() def set_managers(self, manager_dns, manager_node): """Sets the manager objects for handling work dispatch.""" self._manager_dns = manager_dns self._manager_node = manager_node self._have_managers.release() def add_dns(self, depth, entry): self._queue.put(('DNS', depth, entry)) def add_node(self, depth, entry): self._queue.put(('NODE', depth, entry)) def set_seen(self, name, depth=0): """We have seen an entry with this name, at this depth.""" with self._hosts_lock: if name not in self.seen: self.seen[name] = depth def _requeue_node(self, node, ai): """Shuffle node back from pending to try to reprocess (we have DNS).""" with self._hosts_lock: if node in self.pending: depth = self.pending[node] del self.pending[node] depth -= 1 # already incremented, will be re-incremented self._queue.put(('NODE+DNS', depth, (node, ai))) return True return False def _discard_node(self, node): """We have DNS and this is a dup!""" with self._hosts_lock: if node in self.pending: del self.pending[node] return True def _handle_dns_inthread(self, depth, entry): host, ai, status = entry with self._hosts_lock: if status == AddressInfo.NEW_ITEM: #debug('Walk: Seen DNS item for first time: %s', host) if not self._requeue_node(host, ai): self.okay_next_time.add(host) elif host in self.okay_next_time: #debug('Walk: Not first time seen DNS, but first time acting on it: %s', host) self._requeue_node(host, ai) self.okay_next_time.discard(host) else: if host == ai.hostname: #debug('Walk: skipping previously seen DNS: %s', host) pass else: #debug('Walk: discarding dup DNS: %s (%s)', host, ' '.join(ai.iter_all())) pass self._discard_node(host) for n in ai.iter_all(): if n not in self.seen or (depth < self.seen[n] and depth >= 0): self.seen[n] = depth def _handle_node_inthread(self, depth, entry, ai=None): if entry in kSKIP_ENTRIES: return # original caller passes in original depth, or the requeue decr's # before passing back in to compensate depth += 1 with self._hosts_lock: if entry in self.seen: #debug('Walk: already [%s]', entry) if depth < self.seen[entry] and depth >= 0: self.seen[entry] = depth return False if ai is None: if entry not in self.pending: #debug('Walk: deferring until have DNS [%s]', entry) self.pending[entry] = depth self._manager_dns.add_depthed(depth, entry) else: #debug('Walk: already deferred [%s]', entry) pass return False for v in ai.iter_all(): if v in self.seen: #debug('Walk: already [%s] under name [%s]', entry, v) return False if v in kSKIP_ENTRIES: return False self.seen[entry] = depth for v in ai.iter_all(): self.seen[v] = depth 
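      # Every name and address for this entry is now recorded in self.seen at
      # this depth, so hand the entry to the SksNode pool for an HKP stats fetch.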
#debug('Walk: new %s', entry) self._manager_node.add_depthed(depth, entry) return True def run(self): self._have_managers.acquire() while True: try: what, depth, entry = self._queue.get() #debug('Walk: [%s] [%d] [%s]', what, depth, repr(entry)) if what == 'DNS': self._handle_dns_inthread(depth, entry) continue if what == 'NODE': self._handle_node_inthread(depth, entry) continue if what == 'NODE+DNS': self._handle_node_inthread(depth, entry[0], entry[1]) continue raise InternalUsageError('Unknown queue tag type "%s"' % what) finally: self._queue.task_done() class DataGatherer(threading.Thread): """Master persistent thread responsible for keeping data current. This controls all data gathering threads and sleeps between collections. """ def __init__(self, notify_ev, skip_nodechecker=False): threading.Thread.__init__(self) self._notify_ev = notify_ev self._skip_nodechecker = skip_nodechecker self._cmd_queue = Queue.Queue() self.name = 'DataGatherer' self.daemon = False def _get_cmd(self): if not self._notify_ev.is_set(): return None self._notify_ev.clear() try: cmd = self._cmd_queue.get(block=False) except Queue.Empty: log('ProgError: notify with empty queue') return 'kill' if cmd in ('kill', 'scan'): return cmd raise InternalUsageError('Unknown DG queue cmd {%s}' % cmd) def _run(self): """The data gathering function. Does not return until shutdown. This manages collection threads, puts the data together in a top-level container and replaces the global object with that one, under a lock, before sleeping to re-do this. """ global sks_data, sks_data_lock walker = RecursingWalkManager() resolver = PoolWorkManager( 'Resolve', AddressInfo, kRESOLVE_THREADS, walker_cb=walker.add_dns) node_checker = PoolWorkManager( 'SksNodeCheck', SksNode, kSKS_POLL_THREADS, walker_cb=walker.add_node) walker.set_managers(resolver, node_checker) walker.start() peer_re = re.compile('^([A-Za-z0-9]\S+)\s+\d') while True: with sks_data_lock: if sks_data is not None: sks_data['updating'] = time.time() log('DG: Gathering data') start_over = False hostnames = set() with open(kSKS_MEMBERSHIP) as fn: config_stat = os.fstat(fn.fileno()) for line in fn: m = peer_re.match(line) if not m: continue hostnames.add(m.group(1)) self.hostnames = hostnames for h in hostnames: walker.set_seen(h) resolver.add(h) if not self._skip_nodechecker: node_checker.add(h) sizes = {} new_data = {} keep_checking = True final_safety = False while keep_checking: keep_checking = False for (target, pooled) in [ (None, None), # DNS is fast; we can finish collecting DNS after a subset, stalling on # the URL retrievals, then continue to walk further on and have more DNS # to do. This is a change in integrity semantics with the introduction of # walking. 
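          # Note: the (None, None) entry collects nothing; it appears to exist
          # so the kill/scan command check below also runs before the first
          # (potentially slow) collect() of each pass.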
('node_data', node_checker), ('addresses', resolver), ]: if target is not None: new_data[target] = pooled.collect() old_size = sizes.get(target, 0) new_size = len(new_data[target]) if new_size != old_size: sizes[target] = new_size keep_checking = True cmd = self._get_cmd() if cmd == 'kill': log('DG: termination request received (during scan), returning') return if cmd == 'scan': start_over = True break if len(walker) or len(node_checker) or len(resolver): keep_checking = True # If things completed before that len check, then we'd be incomplete # If we check "one more time" and find differences, then we need to redo # But if we saw empty and that last "one more time" saw no changes, then # we've shown that in the time between seeing no changes and the time # when the queue was empty, there were no changes because the next time # around there were no changes either. So we've covered that interval. if keep_checking: final_safety = False elif not final_safety: keep_checking = True final_safety = True # I *think* the above is correct but should look into formally proving it if start_over: log('DG: early termination of run, starting over') continue new_data.update({ 'mtime': config_stat.st_mtime, 'gatherer': self, }) with sks_data_lock: sks_data = new_data gc.collect() delay = float(random.randint( kINTER_SCAN_INTERVAL_SECS - kINTER_SCAN_INTERVAL_JITTER, kINTER_SCAN_INTERVAL_SECS + kINTER_SCAN_INTERVAL_JITTER)) log('DG: Gathering data complete, sleeping %.0f seconds', delay) sleeping = True while sleeping: sleep_starting = time.time() self._notify_ev.wait(delay) slept = time.time() - sleep_starting if float(slept) > (delay - 1): sleeping = False else: delay -= float(slept) cmd = self._get_cmd() if cmd == 'kill': log('DG: termination request received, returning') return if cmd == 'scan': log('DG: received scan request, starting now') sleeping = False continue def run(self): """Wrapper around _run to collect exceptions safely.""" global sks_data, sks_data_lock try: self._run() except Exception, e: with sks_data_lock: sks_data = { 'exception': e } def kill(self): """Returns true iff successfully killed.""" self._cmd_queue.put('kill') self._notify_ev.set() self.join(3.0) return not self.is_alive() def rescan(self, reason=None): """Cut short the sleep, scan now.""" global sks_data, sks_data_lock if reason is None: reason = 'no reason given for rescan' with sks_data_lock: if 'rescan_triggered' in sks_data: log('DG/rescan ignoring retrigger [%s]', reason) return False sks_data['rescan_triggered'] = time.time() log('DG/rescan asking for scan [%s]', reason) self._cmd_queue.put('scan') self._notify_ev.set() return True def gen_page_namespace(**kwdict): """Return a page namespace dictionary with standard values filled in.""" page_namespace = dict() #TODO: should really URL-encode maintainer too page_namespace['maintainer'] = html_escape(kMAINT_EMAIL) page_namespace['my_hostname'] = html_escape(kHOSTNAME) page_namespace.update(kwdict) return page_namespace def handle_peers_page(environ, start_response): """Write the peers page. 
WSGI.""" global sks_data, sks_data_lock still_gathering = True try: with sks_data_lock: _sks_data = sks_data if _sks_data is None: addresses = {} node_data = {} last_mtime = 0.0 gatherer = None updating_since = None elif 'addresses' in _sks_data: still_gathering = False addresses = _sks_data['addresses'] node_data = _sks_data['node_data'] elif 'exception' in _sks_data: data_gather_exception = _sks_data['exception'] addresses = {} else: raise InternalUsageError("sks_data is not None but lacks addresses field") if _sks_data is not None: last_mtime = _sks_data.get('mtime', 0.0) gatherer = _sks_data.get('gatherer') updating_since = _sks_data.get('updating') del _sks_data page_namespace = gen_page_namespace() page_head = Cheetah.Template.Template(kPAGE_TEMPLATE_HEAD, searchList=[page_namespace]) page_foot = Cheetah.Template.Template(kPAGE_TEMPLATE_FOOT, searchList=[page_namespace]) if updating_since is None: page_namespace['scanning_active'] = '' else: scanning_for = time.time() - updating_since page_namespace['scanning_active'] = '

Data gathering run in progress (for last %.0f seconds)' % scanning_for page_namespace['warning'] = still_gathering and '

Still gathering data!

' or '' if 'data_gather_exception' in locals(): page_namespace['warning'] = '

DATA GATHERING FAILED

%s
' % html_escape(data_gather_exception) if gatherer is not None: if not gatherer.is_alive(): page_namespace['warning'] += '

DATA GATHERING THREAD DEAD

' suicide_delayed() if addresses: config_stat = os.stat(kSKS_MEMBERSHIP) if config_stat.st_mtime != last_mtime: if gatherer.rescan('membership mtime update'): page_namespace['warning'] += '\n

Config changed, will rescan in background

' host_namespace = dict() host = Cheetah.Template.Template(kPAGE_TEMPLATE_HOST, searchList=[host_namespace, page_namespace]) hostmore = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTMORE, searchList=[host_namespace, page_namespace]) hosterr = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTERR, searchList=[host_namespace, page_namespace]) except Exception, e: start_response('500 Error', [('Content-Type', 'text/plain; charset=UTF-8')], sys.exc_info()) yield 'Bleh, we failed:\n\n' yield str(e) yield '\n' raise StopIteration() start_response('200 OK', [ ('Content-Type', 'text/html; charset=UTF-8'), ]) # PEP 333 is explicit that WSGI uses strings, not Unicode yield str(page_head) count_total = 0 count_direct = 0 ordered_entries = [] by_depth = {} # depth, hostname if node_data: for h, v in node_data.iteritems(): if hasattr(v, 'distance'): depth = int(v.distance) else: depth = -1 if depth not in by_depth: by_depth[depth] = [] by_depth[depth].append(h) if -1 in by_depth: shuffle = by_depth[-1] del by_depth[-1] shuffle_to = sorted(by_depth.keys()).pop() + 1 by_depth[shuffle_to] = shuffle for d in sorted(by_depth.keys()): ordered_entries.extend(_sort_hostnames(by_depth[d])) count_direct = len(by_depth[0]) else: ordered_entries = _sort_hostnames(addresses.keys()) count_direct = len(addresses) for h in ordered_entries: host_namespace.clear() host_namespace['hostname'] = html_escape(h) info = addresses.get(h, [('n/a', None)]) sksnode = node_data.get(h, False) count_total += 1 host_namespace['rowcount'] = count_total host_namespace['rowclass'] = count_total % 2 and 'odd' or 'even' if isinstance(info, Exception) or not info: host_namespace['error'] = str(info) yield str(hosterr) continue if len(info) > 1: host_namespace['rowspan'] = ' rowspan="%d"' % len(info) else: host_namespace['rowspan'] = '' sks_url = 'http://%s:11371/pks/lookup?op=stats' % h html_link = '%s' % html_interp(sks_url, h) host_namespace['sks_url'] = sks_url host_namespace['html_link'] = html_link if isinstance(sksnode, Exception) or not sksnode: host_namespace['mutual'] = 'Err' host_namespace['version'] = '?' host_namespace['keycount'] = '0' if hasattr(sksnode, 'distance'): host_namespace['distance'] = str(sksnode.distance) else: host_namespace['distance'] = '?' else: host_namespace['mutual'] = sksnode.mutual and 'Yes' or 'No' host_namespace['version'] = str(sksnode.version) host_namespace['keycount'] = str(sksnode.keycount) host_namespace['distance'] = str(sksnode.distance) if info.alt_names: host_namespace['host_aliases_text'] = \ ' %s' % ' '.join(info.alt_names) else: host_namespace['host_aliases_text'] = '' first = True for item in info: host_namespace['ip'] = html_escape(item[0]) geo = item[1] if geo is None: host_namespace['geo'] = '' else: host_namespace['geo'] = ' '.join(geo).upper() if first: yield str(host) first = False else: yield str(hostmore) page_namespace['peer_count'] = count_direct page_namespace['mesh_count'] = count_total yield str(page_foot) def sks_peers_init(skip_nodechecker=False): """Init global state, creating threads as needed. On first call, returns an object which has a kill() method, to try to shut things down cleanly. On subsequent calls, returns None. Normally we will only ever be called once, but if we're being served as an app so that we're launched on-demand then we'll be called via application(); if we are set to multi-threaded, then there can be concurrent calls to application() and thus a race. Break the race with a lock. It's okay to create a lock at top-level, but not threads. 
""" global startup_lock, sks_data, sks_data_lock, wsgi_selector with startup_lock: if sks_data_lock is not None: return None sks_data_lock = threading.Lock() dg_shutdown = threading.Event() gatherer = DataGatherer(dg_shutdown, skip_nodechecker) gatherer.start() s = selector.Selector() url_handlers = [] def _add_handler(url, func): url_handlers.append(url) s.add(url, GET=func) # No matter how I hook in, when using wsgiref.simple_server.make_server I # need '/' but with mod_wsgi it's ''. _add_handler('', handle_peers_page) _add_handler('/', handle_peers_page) _add_handler('/ip-valid', handle_ip_valid) _add_handler('/helpz', get_handler_helpz(url_handlers)) _add_handler('/threadz', handle_threadz) _add_handler('/environz', handle_environz) _add_handler('/rescanz', handle_rescanz) _add_handler('/internalz', handle_internalz) s.status404 = return_error wsgi_selector = s syslog.openlog(kSYSLOG_NAME, syslog.LOG_PID, kSYSLOG_FACILITY) signal.signal(signal.SIGUSR1, act_reload_queue) log('Initialised (uid=%d)', os.getuid()) return gatherer def return_error(environ, start_response, summary='Page not found', error='The URL which you entered is invalid', code='404'): page_namespace = gen_page_namespace( summary=html_escape(summary), error=html_escape(error)) error_page = Cheetah.Template.Template(kPAGE_TEMPLATE_BADUSER, searchList=[page_namespace]) start_response('%s nope' % code, [ ('Content-Type', 'text/html; charset=UTF-8'), ]) yield str(error_page) raise StopIteration() def get_handler_helpz(registered): @wrap_check_privileged def _handle_helpz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/html; charset=UTF-8'), ]) yield kPAGE_TEMPLATE_BASIC_HEAD + \ '/helpz

/helpz

\n' return _handle_helpz @wrap_check_privileged def handle_threadz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) yield 'Threads active:\n\n' for t in sorted(threading.enumerate()): aliveness = not t.is_alive() and ' DEAD' or '' ident = t.ident and ' [%d]' % t.ident or '' daemonic = not t.daemon and ' ' or '' yield '%s%s%s%s\n' % (t.name, aliveness, ident, daemonic) yield '.\n' @wrap_check_privileged def handle_environz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) for pair in sorted(environ.items()): yield '%s = {%s}\n' % pair yield '.\n' @wrap_check_privileged def handle_rescanz(environ, start_response): global sks_data, sks_data_lock start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) if act_reload_queue(note='rescanz hit'): yield 'rescan triggered\n' else: yield 'rescan not triggered\n' with sks_data_lock: d = sks_data if d is None: yield 'First scan not completed, have no sks_data\n' elif 'rescan_triggered' in d: timediff = time.time() - d['rescan_triggered'] yield 'Rescan in progress, triggered %.0f seconds ago\n' % timediff else: yield 'Reason unknown\n' @wrap_check_privileged def handle_internalz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) def _report_obj(depth, d, name): prefix = ' ' * depth + '* ' results = [] try: for i in sorted(d.keys()): n = name and name + '[' + i + ']' or i try: if isinstance(d[i], dict) and not i.startswith('__'): results.append(prefix + n + ':\n') results.extend(_report_obj(depth + 1, d[i], n)) elif isinstance(d[i], (set, list)) and len(d[i]) > 1: results.append(prefix + n + ':\n') indent = ' ' * (len(prefix) + 2) # textwrap drops trailing newline, even if in input data results.append(textwrap.fill(' '.join(map(str, d[i])), width=110, initial_indent=indent, subsequent_indent=indent) + '\n') else: results.append(prefix + n + ': ' + repr(d[i]) + '\n') if isinstance(d[i], object) and ( d[i].__class__.__module__ == '__main__' or d[i].__class__.__module__.startswith('_mod_wsgi_')): results.extend(_report_obj(depth + 1, d[i].__dict__, n)) elif isinstance(d[i], type) and d[i].__module__ == '__main__': results.extend(_report_obj(depth + 1, dict([(x, getattr(d[i], x)) for x in dir(d[i]) if not x.startswith('__')]), n)) except Exception, e: results.append('Walking dict %s failed: %s\n' % (n, e)) except Exception, e: results.append('Iter dict failed %s\n' % name) return results g = globals() return [x for x in _report_obj(1, g, '') if isinstance(x, str)] def handle_ip_valid(environ, start_response): """Emit a list of IPs, first line status/control, last line '.'""" global sks_data, sks_data_lock with sks_data_lock: data = sks_data start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) if data is None or 'node_data' not in data or not len(data['node_data']): yield 'IP-Gen/1: status=INVALID count=0 reason=first_scan\n.\n' raise StopIteration() ips_all = {} for name, node in data['node_data'].iteritems(): try: ip_geo_list = data['addresses'][name] if node and str(node.version) != '1.0.10' and int(node.keycount) > 1: for ip, geo in ip_geo_list: ips_all[ip] = int(node.keycount) except: continue # We want to discard statistic-distorting outliers, then of what remains, # discard those too far away from "normal", but we really want the "best" # servers to be our guide, so 1 std-dev of the second-highest remaining # value should be safe; in fact, we'll 
hardcode a limit of how far below. # To discard, find mode size (knowing that value can be split across two # buckets) and discard more than five stddevs from mode. The bucketing # should be larger than the distance from desired value so that the mode # is only split across two buckets, if we assume enough servers that a # small number will be down, most will be valid-if-large-enough, so that # splitting the count across two buckets won't let the third-best value win buckets = {} for ip, count in ips_all.iteritems(): bucket = int(count // 3000) if bucket not in buckets: buckets[bucket] = [] buckets[bucket].append(count) largest_bucket = max(buckets) first_n = len(buckets[largest_bucket]) first_mean = sum(buckets[largest_bucket]) / first_n first_sd = sqrt(sum((x-first_mean)**2 for x in buckets[largest_bucket]) / first_n) first_bounds = (int(first_mean - 5*first_sd), int(first_mean + 5*first_sd)) first_ips_list = filter(lambda x: first_bounds[0] <= ips_all[x] <= first_bounds[1], ips_all) first_ips = dict([(x, ips_all[x]) for x in first_ips_list]) second_mean = sum(first_ips.values()) / len(first_ips) if second_mean < kIPLIST_SANITY_MIN: yield 'IP-Gen/1: status=INVALID count=0 reason=broken_data\n.\n' raise StopIteration() threshold = sorted(first_ips.values())[-2] - kIPLIST_NEED_THRESHOLD ips = [x for x in first_ips if first_ips[x] >= threshold] count = len(ips) log('ip-valid: Yielding %d of %d values', count, len(ips_all)) yield 'IP-Gen/1: status=COMPLETE count=%d tags=skip_1010\n' % count yield '\n'.join(ips) yield '\n.\n' def application(environ, start_response): """The main entry point for serving as an application. WSGI interface. Will init global context if not already init'd. """ global sks_data_lock, wsgi_selector if sks_data_lock is None: # We're embedded in a web-server and need to bootstrap sks_peers_init() if wsgi_selector is None: return return_error(environ, start_response, 'Internal problem', 'Failed to initialise correctly, please report this', '500') i_am = environ.get('SCRIPT_NAME', None) if i_am is not None: t = environ.get('REQUEST_URI','').replace(i_am, '', 1) if not len(t): t = '/' environ['REQUEST_URI'] = t return wsgi_selector(environ, start_response) def main(argv=None): """Entry point when invoked as a normal binary.""" if argv is None: argv = [] if len(argv) and argv[0] == 'standalone': from wsgiref.simple_server import make_server argv.pop(0) if len(argv) and argv[0] == 'nodata': shutdown = sks_peers_init(skip_nodechecker=True) argv.pop(0) else: shutdown = sks_peers_init() if shutdown is None: raise InternalUsageError('init failed to return a shutdown object') if 'make_server' in locals(): server_host = 'localhost' port = 8080 if len(argv) >= 1: if argv[0].startswith(':'): argv.insert(0, 'localhost') argv[1] = argv[1][1:] else: server_host = argv[0] try: if len(argv) >= 2: port = int(argv[1]) if port < 2 or port > 65535: raise OverflowError('port out of range') except ValueError, e: print >>sys.stderr, 'Unable to parse as valid port: %s' % argv[1] except OverflowError, e: print >>sys.stderr, 'Port number %d out of range, resetting' % port port = 8080 global wsgi_selector server = make_server(server_host, port, wsgi_selector) print >>sys.stderr, 'Starting up webserver listening on {%s} %d' % ( server_host, port) try: server.serve_forever() except KeyboardInterrupt, e: print >>sys.stderr, 'Received keyboard interrupt, shutting down' if not shutdown.kill(): print >>sys.stderr, 'Shutdown failed, aargh.' 
        sys.exit(1)
  else:
    import wsgiref.handlers
    global sks_data, sks_data_lock
    scanning = True
    while scanning:
      with sks_data_lock:
        _sks_data = sks_data
      if _sks_data is None:
        time.sleep(1)
      else:
        scanning = False
    wsgiref.handlers.CGIHandler().run(handle_peers_page)
  if shutdown.kill():
    return 0
  return 1


if __name__ == '__main__':
  main(sys.argv[1:])

# vim: set filetype=python sw=2 expandtab :
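# Illustrative invocations (a sketch based on main()'s argument parsing; the
# host/port values are just examples):
#   python2.6 sks_peers.py standalone                # wsgiref server on localhost:8080
#   python2.6 sks_peers.py standalone :8081          # same host, alternate port
#   python2.6 sks_peers.py standalone nodata         # skip the SKS stats-polling threads
# Without 'standalone', the script waits for the first scan to finish and then
# answers a single request via wsgiref.handlers.CGIHandler (CGI mode).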