gnumed-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnumed-devel] gmTools.py: send_mail() test


From: gnumed
Subject: [Gnumed-devel] gmTools.py: send_mail() test
Date: Thu, 23 Mar 2017 13:12:16 +0100

To:address@hidden
From:address@hidden
Subject: gmTools test suite mail

This is a test mail from the gmTools.py module.
from __future__ import print_function

__doc__ = """GNUmed worker threads."""
#=====================================================================
__author__ = "K.Hilbert <address@hidden>"
__license__ = "GPL v2 or later"

import sys
import logging
import threading
import datetime as dt

# wx.CallAfter() does not seem to work with multiprocessing !
#import multiprocessing


# When this module is run directly as a script, put the directory two
# levels up at the front of the module search path.
# NOTE(review): presumably so that the "Gnumed" package imported by the
# self-test at the bottom of this file can be found -- confirm.
if __name__ == '__main__':
        sys.path.insert(0, '../../')


# module-level logger for the worker thread code
_log = logging.getLogger('gm.worker')

#=====================================================================
def execute_in_worker_thread(payload_function=None, payload_kwargs=None, completion_callback=None):
        """Create a thread and have it execute <payload_function>.

        <payload_function> - a callable; it is invoked inside the new
        thread, either without arguments or, if <payload_kwargs> is
        not None, as payload_function(**payload_kwargs).

        <payload_kwargs> - None, or a mapping of keyword arguments
        for <payload_function>.

        <completion_callback> - if not None - better be prepared to
        receive the result of <payload_function> as its sole positional
        argument. It is called from within the worker thread, after
        the payload has returned, and only if the payload did not raise.

        Exceptions raised by the payload or by the callback are logged
        and swallowed inside the worker thread; they never reach the
        caller of this function.

        Raises ValueError (in the calling thread, before any thread is
        created) if <payload_function> is not callable or if
        <completion_callback> is neither None nor callable.

        Returns None right after the thread has been started, without
        waiting for the payload to finish.
        """
        # validate before creating anything; the value is wrapped in a
        # 1-tuple so that tuple arguments are formatted rather than
        # being unpacked by the % operator
        if not callable(payload_function):
                raise ValueError(u'<%s> is not callable' % (payload_function,))
        if completion_callback is not None and not callable(completion_callback):
                raise ValueError(u'<%s> is not callable' % (completion_callback,))

        #-------------------------------
        def _run_payload():
                # runs inside the worker thread
                try:
                        if payload_kwargs is None:
                                payload_result = payload_function()
                        else:
                                payload_result = payload_function(**payload_kwargs)
                except Exception:
                        # "Exception" rather than "StandardError": the latter
                        # does not exist on Python 3
                        _log.exception('error running worker payload: %s', payload_function)
                        return
                _log.debug(u'finished running worker payload')
                if completion_callback is None:
                        return
                try:
                        completion_callback(payload_result)
                except Exception:
                        _log.exception('error running worker completion callback: %s', completion_callback)
                        # do not claim the callback "finished" when it failed
                        return
                _log.debug(u'finished running worker completion callback')
        #-------------------------------

        # the name starts with the microseconds so that threads created
        # within the same second still get distinguishable names
        thread_name = dt.datetime.now().strftime('%f-%H:%M:%S-%Y%m%d')
        _log.info(u'creating thread "%s"', thread_name)
        _log.debug(u' "%s" payload function: %s', thread_name, payload_function)
        _log.debug(u' "%s" results callback: %s', thread_name, completion_callback)

        # a thread, not a multiprocessing.Process: wx.CallAfter() does
        # not seem to work with multiprocessing
        worker = threading.Thread (
                target = _run_payload,
                name = thread_name
        )
        worker.start()
        # note: despite the "PID" label this is the thread identifier
        _log.debug(' "%s" PID: %s', worker.name, worker.ident)
        # from here on, another thread executes _run_payload()
        # which executes payload_function(), and possibly
        # completion_callback()

#=====================================================================
# main
#=====================================================================
if __name__ == "__main__":

        # the self-test only runs when invoked as: <script> test
        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        import time
        import random

        # not referenced by name below
        # NOTE(review): presumably imported for its side effect of
        # setting up GNUmed logging -- confirm
        from Gnumed.pycommon import gmLog2

        def test_print_dots(ident=None):
                """Start one worker thread which slowly prints 5 progress lines tagged with <ident>."""

                def slowly_print_dots(info=None):
                        # payload: print 5 lines, sleeping 1-5 seconds after each
                        for i in range(5):
                                print('* (#%s in %s)' % (i, info))
                                time.sleep(1 + (random.random()*4))
                        # time.localtime() returns a struct_time, which is a
                        # tuple subclass: "'%s' % time.localtime()" would unpack
                        # it and raise TypeError, so wrap it in a 1-tuple
                        return '%s' % (time.localtime(),)

                def print_dot_end_time(time_str):
                        # completion callback: receives the payload's return value
                        print('done: %s' % time_str)

                execute_in_worker_thread (
                        payload_function = slowly_print_dots,
                        payload_kwargs = {'info': ident},
                        completion_callback = print_dot_end_time
                )

        # two workers running concurrently, output interleaved
        test_print_dots('A')
        test_print_dots('B')

reply via email to

[Prev in Thread] Current Thread [Next in Thread]