gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: Handle DHT lookup aborts.


From: gnunet
Subject: [gnunet-go] branch master updated: Handle DHT lookup aborts.
Date: Tue, 31 Mar 2020 13:19:02 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new 2a2a558  Handle DHT lookup aborts.
2a2a558 is described below

commit 2a2a558aa553aca3394513114d13d4888ae822db
Author: Bernd Fix <address@hidden>
AuthorDate: Tue Mar 31 13:12:30 2020 +0200

    Handle DHT lookup aborts.
---
 src/cmd/gnunet-service-gns-go/main.go           |   2 +
 src/cmd/peer_mockup/process.go                  |  26 +++---
 src/gnunet/go.mod                               |   4 +-
 src/gnunet/go.sum                               |   5 ++
 src/gnunet/message/factory.go                   |   2 +
 src/gnunet/message/msg_dht.go                   |  37 ++++++++
 src/gnunet/modules.go                           |  11 ++-
 src/gnunet/service/client.go                    |  35 +++++---
 src/gnunet/service/context.go                   |  68 +++++++++++++++
 src/gnunet/service/{namecache => dht}/module.go |  19 +++--
 src/gnunet/service/gns/dns.go                   |  13 ++-
 src/gnunet/service/gns/module.go                | 109 +++++++++++++++++++-----
 src/gnunet/service/gns/service.go               |  71 +++++++++++----
 src/gnunet/service/namecache/module.go          |   5 +-
 src/gnunet/service/service.go                   |  52 ++++++++---
 src/gnunet/transport/channel.go                 |  32 +++++--
 src/gnunet/transport/channel_netw.go            |  92 ++++++++++++++++++--
 src/gnunet/transport/connection.go              |  10 ++-
 src/gnunet/util/misc.go                         |   1 +
 src/gnunet/util/time.go                         |  19 +++++
 20 files changed, 510 insertions(+), 103 deletions(-)

diff --git a/src/cmd/gnunet-service-gns-go/main.go 
b/src/cmd/gnunet-service-gns-go/main.go
index 5f4062a..2815eaa 100644
--- a/src/cmd/gnunet-service-gns-go/main.go
+++ b/src/cmd/gnunet-service-gns-go/main.go
@@ -89,6 +89,8 @@ loop:
                                break loop
                        case syscall.SIGHUP:
                                logger.Println(logger.INFO, "[gns] SIGHUP")
+                       case syscall.SIGURG:
+                               // TODO: https://github.com/golang/go/issues/37942
                        default:
                                logger.Println(logger.INFO, "[gns] Unhandled 
signal: "+sig.String())
                        }
diff --git a/src/cmd/peer_mockup/process.go b/src/cmd/peer_mockup/process.go
index a26177b..510fc48 100644
--- a/src/cmd/peer_mockup/process.go
+++ b/src/cmd/peer_mockup/process.go
@@ -9,6 +9,12 @@ import (
        "gnunet/message"
        "gnunet/transport"
        "gnunet/util"
+
+       "github.com/bfix/gospel/concurrent"
+)
+
+var (
+       sig = concurrent.NewSignaller()
 )
 
 func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
@@ -20,7 +26,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
        in := make(chan message.Message)
        go func() {
                for {
-                       msg, err := c.Receive()
+                       msg, err := c.Receive(sig)
                        if err != nil {
                                fmt.Printf("Receive: %s\n", err.Error())
                                return
@@ -33,7 +39,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
        init := (from == p)
        if init {
                peerid := util.NewPeerID(p.GetID())
-               c.Send(message.NewTransportTcpWelcomeMsg(peerid))
+               c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
        }
 
        // remember peer addresses (only ONE!)
@@ -53,11 +59,11 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
                        case *message.TransportTcpWelcomeMsg:
                                peerid := util.NewPeerID(p.GetID())
                                if init {
-                                       c.Send(message.NewHelloMsg(peerid))
+                                       c.Send(message.NewHelloMsg(peerid), sig)
                                        target := util.NewPeerID(t.GetID())
-                                       
c.Send(message.NewTransportPingMsg(target, tAddr))
+                                       
c.Send(message.NewTransportPingMsg(target, tAddr), sig)
                                } else {
-                                       
c.Send(message.NewTransportTcpWelcomeMsg(peerid))
+                                       
c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
                                }
 
                        case *message.HelloMsg:
@@ -67,7 +73,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
                                if err := mOut.Sign(p.PrvKey()); err != nil {
                                        return err
                                }
-                               c.Send(mOut)
+                               c.Send(mOut, sig)
 
                        case *message.TransportPongMsg:
                                rc, err := msg.Verify(t.PubKey())
@@ -79,14 +85,14 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
                                }
                                send[message.TRANSPORT_PONG] = true
                                if mOut, ok := 
pending[message.TRANSPORT_SESSION_SYN]; ok {
-                                       c.Send(mOut)
+                                       c.Send(mOut, sig)
                                }
 
                        case *message.SessionSynMsg:
                                mOut := message.NewSessionSynAckMsg()
                                mOut.Timestamp = msg.Timestamp
                                if send[message.TRANSPORT_PONG] {
-                                       c.Send(mOut)
+                                       c.Send(mOut, sig)
                                } else {
                                        pending[message.TRANSPORT_SESSION_SYN] 
= mOut
                                }
@@ -97,7 +103,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
                        case *message.SessionAckMsg:
 
                        case *message.SessionKeepAliveMsg:
-                               
c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce))
+                               
c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce), sig)
 
                        case *message.EphemeralKeyMsg:
                                rc, err := msg.Verify(t.PubKey())
@@ -108,7 +114,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) 
(err error) {
                                        return errors.New("EPHKEY verification 
failed")
                                }
                                t.SetEphKeyMsg(msg)
-                               c.Send(p.EphKeyMsg())
+                               c.Send(p.EphKeyMsg(), sig)
                                secret := crypto.SharedSecret(p.EphPrvKey(), 
t.EphKeyMsg().Public())
                                c.SharedSecret(util.Clone(secret.Bits[:]))
 
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 83720fd..443666f 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
 go 1.12
 
 require (
-       github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449
+       github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46
        github.com/miekg/dns v1.1.26
-       golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
+       golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
 )
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index e9ece97..8c956b4 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,7 @@
 github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 
h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ=
 github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod 
h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
+github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 
h1:5aNd1/ISbO1ltgmyUGza7kdaN4fD/Qal6uKZk9goMhw=
+github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46/go.mod 
h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
 github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU=
 github.com/miekg/dns v1.1.26/go.mod 
h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -7,6 +9,8 @@ golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 
h1:Gv7RPwsi3eZ2Fgewe3CBsu
 golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 
h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
 golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod 
h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
+golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 
h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
+golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190923162816-aa69164e4478 
h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
@@ -19,6 +23,7 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod 
h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe 
h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M=
 golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
diff --git a/src/gnunet/message/factory.go b/src/gnunet/message/factory.go
index e6e6e73..33e806b 100644
--- a/src/gnunet/message/factory.go
+++ b/src/gnunet/message/factory.go
@@ -59,6 +59,8 @@ func NewEmptyMessage(msgType uint16) (Message, error) {
        //------------------------------------------------------------------
        case DHT_CLIENT_GET:
                return NewDHTClientGetMsg(nil), nil
+       case DHT_CLIENT_GET_STOP:
+               return NewDHTClientGetStopMsg(nil), nil
        case DHT_CLIENT_RESULT:
                return NewDHTClientResultMsg(nil), nil
 
diff --git a/src/gnunet/message/msg_dht.go b/src/gnunet/message/msg_dht.go
index 925cb71..62a233f 100644
--- a/src/gnunet/message/msg_dht.go
+++ b/src/gnunet/message/msg_dht.go
@@ -126,3 +126,40 @@ func (m *DHTClientResultMsg) String() string {
 func (msg *DHTClientResultMsg) Header() *MessageHeader {
        return &MessageHeader{msg.MsgSize, msg.MsgType}
 }
+
+//----------------------------------------------------------------------
+// DHT_CLIENT_GET_STOP
+//----------------------------------------------------------------------
+
+// DHTClientGetStopMsg
+type DHTClientGetStopMsg struct {
+       MsgSize  uint16           `order:"big"` // total size of message
+       MsgType  uint16           `order:"big"` // DHT_CLIENT_GET_STOP (144)
+       Reserved uint32           `order:"big"` // Reserved (0)
+       Id       uint64           `order:"big"` // Unique ID identifying this request
+       Key      *crypto.HashCode // The key to search for
+}
+
+// NewDHTClientGetStopMsg creates a new default DHTClientGetStopMsg object.
+func NewDHTClientGetStopMsg(key *crypto.HashCode) *DHTClientGetStopMsg {
+       if key == nil {
+               key = new(crypto.HashCode)
+       }
+       return &DHTClientGetStopMsg{
+               MsgSize:  80,
+               MsgType:  DHT_CLIENT_GET_STOP,
+               Reserved: 0, // mandatory
+               Id:       0,
+               Key:      key,
+       }
+}
+
+// String returns a human-readable representation of the message.
+func (m *DHTClientGetStopMsg) String() string {
+       return fmt.Sprintf("DHTClientGetStopMsg{Id:%d,Key=%s}", m.Id, 
hex.EncodeToString(m.Key.Bits))
+}
+
+// Header returns the message header in a separate instance.
+func (msg *DHTClientGetStopMsg) Header() *MessageHeader {
+       return &MessageHeader{msg.MsgSize, msg.MsgType}
+}
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index bc2993e..b71eb40 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -29,6 +29,7 @@
 package gnunet
 
 import (
+       "gnunet/service/dht"
        "gnunet/service/gns"
        "gnunet/service/namecache"
 )
@@ -37,6 +38,7 @@ import (
 type Instances struct {
        GNS       *gns.GNSModule
        Namecache *namecache.NamecacheModule
+       DHT       *dht.DHTModule
 }
 
 // Local reference to instance list
@@ -50,9 +52,14 @@ func init() {
        // Namecache (no calls to other modules)
        Modules.Namecache = new(namecache.NamecacheModule)
 
+       // DHT (no calls to other modules)
+       Modules.DHT = new(dht.DHTModule)
+
        // GNS (calls Namecache, DHT and Identity)
        Modules.GNS = &gns.GNSModule{
-               LookupLocal: Modules.Namecache.Get,
-               StoreLocal:  Modules.Namecache.Put,
+               LookupLocal:  Modules.Namecache.Get,
+               StoreLocal:   Modules.Namecache.Put,
+               LookupRemote: Modules.DHT.Get,
+               CancelRemote: Modules.DHT.Cancel,
        }
 }
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index 1e54c2d..fe3fa9e 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -25,36 +25,49 @@ import (
        "github.com/bfix/gospel/logger"
 )
 
-// Client
+// Client type: Use to perform client-side interactions with GNUnet services.
 type Client struct {
-       ch *transport.MsgChannel
+       ch *transport.MsgChannel // channel for message exchange
 }
 
-// NewClient
+// NewClient creates a new client instance for the given channel endpoint.
 func NewClient(endp string) (*Client, error) {
-       //
+       // create a new channel to endpoint.
        ch, err := transport.NewChannel(endp)
        if err != nil {
                return nil, err
        }
+       // wrap into a message channel for the client.
        return &Client{
                ch: transport.NewMsgChannel(ch),
        }, nil
 }
 
-func (c *Client) SendRequest(req message.Message) error {
-       return c.ch.Send(req)
+// SendRequest sends a given message to the service.
+func (c *Client) SendRequest(ctx *SessionContext, req message.Message) error {
+       return c.ch.Send(req, ctx.Signaller())
 }
 
-func (c *Client) ReceiveResponse() (message.Message, error) {
-       return c.ch.Receive()
+// ReceiveResponse waits for a response from the service; it can be interrupted
+// by a signal sent through the signaller of the session context.
+func (c *Client) ReceiveResponse(ctx *SessionContext) (message.Message, error) 
{
+       return c.ch.Receive(ctx.Signaller())
 }
 
+// Close a client; no further message exchange is possible.
 func (c *Client) Close() error {
        return c.ch.Close()
 }
 
-func ServiceRequestResponse(caller, callee, endp string, req message.Message) 
(message.Message, error) {
+// ServiceRequestResponse is a helper method for one-request/one-response
+// scenarios of client/service interactions.
+func ServiceRequestResponse(
+       ctx *SessionContext,
+       caller string,
+       callee string,
+       endp string,
+       req message.Message) (message.Message, error) {
+
        // client-connect to the service
        logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, 
callee)
        cl, err := NewClient(endp)
@@ -63,13 +76,13 @@ func ServiceRequestResponse(caller, callee, endp string, 
req message.Message) (m
        }
        // send request
        logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", 
caller, callee)
-       if err = cl.SendRequest(req); err != nil {
+       if err = cl.SendRequest(ctx, req); err != nil {
                return nil, err
        }
        // wait for a single response, then close the connection
        logger.Printf(logger.DBG, "[%s] Waiting for response from %s 
service\n", caller, callee)
        var resp message.Message
-       if resp, err = cl.ReceiveResponse(); err != nil {
+       if resp, err = cl.ReceiveResponse(ctx); err != nil {
                return nil, err
        }
        logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", 
caller, callee)
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
new file mode 100644
index 0000000..4896bd5
--- /dev/null
+++ b/src/gnunet/service/context.go
@@ -0,0 +1,68 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019, 2020 Bernd Fix  >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package service
+
+import (
+       "sync"
+
+       "gnunet/util"
+
+       "github.com/bfix/gospel/concurrent"
+)
+
+// SessionContext is used to set a context for each client connection handled
+// by a service; the session is handled by the 'ServeClient' method of a
+// service implementation.
+type SessionContext struct {
+       Id  int                   // session identifier
+       wg  *sync.WaitGroup       // wait group for the session
+       sig *concurrent.Signaller // signaller for the session
+}
+
+// NewSessionContext instantiates a new session context.
+func NewSessionContext() *SessionContext {
+       return &SessionContext{
+               Id:  util.NextID(),
+               wg:  new(sync.WaitGroup),
+               sig: concurrent.NewSignaller(),
+       }
+}
+
+// Cancel all go-routines associated with this context.
+func (ctx *SessionContext) Cancel() {
+       // send signal to terminate...
+       ctx.sig.Send(true)
+       // wait for session go-routines to finish
+       ctx.wg.Wait()
+}
+
+// Add a go-routine to the wait group.
+func (ctx *SessionContext) Add() {
+       ctx.wg.Add(1)
+}
+
+// Remove a go-routine from the wait group.
+func (ctx *SessionContext) Remove() {
+       ctx.wg.Done()
+}
+
+// Signaller returns the working instance for the context.
+func (ctx *SessionContext) Signaller() *concurrent.Signaller {
+       return ctx.sig
+}
diff --git a/src/gnunet/service/namecache/module.go 
b/src/gnunet/service/dht/module.go
similarity index 71%
copy from src/gnunet/service/namecache/module.go
copy to src/gnunet/service/dht/module.go
index 6a10625..a54d0eb 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -16,29 +16,34 @@
 //
 // SPDX-License-Identifier: AGPL3.0-or-later
 
-package namecache
+package dht
 
 import (
        "gnunet/message"
+       "gnunet/service"
        "gnunet/service/gns"
 )
 
 //======================================================================
-// "GNS name cache" implementation
+// "DHT" implementation
 //======================================================================
 
 //----------------------------------------------------------------------
-// Put and get GNS blocks into/from a cache (transient storage)
+// Put and get blocks into/from a DHT.
 //----------------------------------------------------------------------
 
-// Namecache handles the transient storage of GNS blocks under the query key.
-type NamecacheModule struct {
+// DHT handles the permanent storage of blocks under the query key.
+type DHTModule struct {
 }
 
-func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) {
+func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) 
(*message.GNSBlock, error) {
        return nil, nil
 }
 
-func (nc *NamecacheModule) Put(block *message.GNSBlock) error {
+func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) 
error {
+       return nil
+}
+
+func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) 
error {
        return nil
 }
diff --git a/src/gnunet/service/gns/dns.go b/src/gnunet/service/gns/dns.go
index 28e9813..7a9a30c 100644
--- a/src/gnunet/service/gns/dns.go
+++ b/src/gnunet/service/gns/dns.go
@@ -26,6 +26,7 @@ import (
 
        "gnunet/enums"
        "gnunet/message"
+       "gnunet/service"
        "gnunet/util"
 
        "github.com/bfix/gospel/crypto/ed25519"
@@ -202,10 +203,16 @@ func QueryDNS(id int, name string, server net.IP, kind 
RRTypeList) *message.GNSR
 // ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in
 // parallel; the first result delivered by any of the servers is returned
 // as the result list of matching resource records.
-func (gns *GNSModule) ResolveDNS(name string, servers []string, kind 
RRTypeList, pkey *ed25519.PublicKey, depth int) (set *message.GNSRecordSet, err 
error) {
-       logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", 
name)
+func (gns *GNSModule) ResolveDNS(
+       ctx *service.SessionContext,
+       name string,
+       servers []string,
+       kind RRTypeList,
+       pkey *ed25519.PublicKey,
+       depth int) (set *message.GNSRecordSet, err error) {
 
        // start DNS queries concurrently
+       logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", 
name)
        res := make(chan *message.GNSRecordSet)
        running := 0
        for _, srv := range servers {
@@ -215,7 +222,7 @@ func (gns *GNSModule) ResolveDNS(name string, servers 
[]string, kind RRTypeList,
                if addr == nil {
                        // no, it is a name... try to resolve an IP address 
from the name
                        query := NewRRTypeList(enums.GNS_TYPE_DNS_A, 
enums.GNS_TYPE_DNS_AAAA)
-                       if set, err = gns.ResolveUnknown(srv, nil, pkey, query, 
depth+1); err != nil {
+                       if set, err = gns.ResolveUnknown(ctx, srv, nil, pkey, 
query, depth+1); err != nil {
                                logger.Printf(logger.ERROR, "[dns] Can't 
resolve NS server '%s': %s\n", srv, err.Error())
                                continue
                        }
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index d067633..43f7b7a 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -26,6 +26,8 @@ import (
        "gnunet/crypto"
        "gnunet/enums"
        "gnunet/message"
+       "gnunet/service"
+       "gnunet/transport"
        "gnunet/util"
 
        "github.com/bfix/gospel/crypto/ed25519"
@@ -110,15 +112,22 @@ func NewQuery(pkey *ed25519.PublicKey, label string) 
*Query {
 // GNSModule handles the resolution of GNS names to RRs bundled in a block.
 type GNSModule struct {
        // Use function references for calls to methods in other modules:
-       //
-       LookupLocal  func(query *Query) (*message.GNSBlock, error)
-       StoreLocal   func(block *message.GNSBlock) error
-       LookupRemote func(query *Query) (*message.GNSBlock, error)
+       LookupLocal  func(ctx *service.SessionContext, query *Query) 
(*message.GNSBlock, error)
+       StoreLocal   func(ctx *service.SessionContext, block *message.GNSBlock) 
error
+       LookupRemote func(ctx *service.SessionContext, query *Query) 
(*message.GNSBlock, error)
+       CancelRemote func(ctx *service.SessionContext, query *Query) error
 }
 
 // Resolve a GNS name with multiple labels. If pkey is not nil, the name
 // is interpreted as "relative to current zone".
-func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind 
RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) {
+func (gns *GNSModule) Resolve(
+       ctx *service.SessionContext,
+       path string,
+       pkey *ed25519.PublicKey,
+       kind RRTypeList,
+       mode int,
+       depth int) (set *message.GNSRecordSet, err error) {
+
        // check for recursion depth
        if depth > config.Cfg.GNS.MaxDepth {
                return nil, ErrGNSRecursionExceeded
@@ -130,14 +139,20 @@ func (gns *GNSModule) Resolve(path string, pkey 
*ed25519.PublicKey, kind RRTypeL
        // check for relative path
        if pkey != nil {
                //resolve relative path
-               return gns.ResolveRelative(names, pkey, kind, mode, depth)
+               return gns.ResolveRelative(ctx, names, pkey, kind, mode, depth)
        }
        // resolve absolute path
-       return gns.ResolveAbsolute(names, kind, mode, depth)
+       return gns.ResolveAbsolute(ctx, names, kind, mode, depth)
 }
 
 // Resolve a fully qualified GNS absolute name (with multiple labels).
-func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode 
int, depth int) (set *message.GNSRecordSet, err error) {
+func (gns *GNSModule) ResolveAbsolute(
+       ctx *service.SessionContext,
+       labels []string,
+       kind RRTypeList,
+       mode int,
+       depth int) (set *message.GNSRecordSet, err error) {
+
        // get the zone key for the TLD
        pkey := gns.GetZoneKey(labels[0])
        if pkey == nil {
@@ -146,12 +161,19 @@ func (gns *GNSModule) ResolveAbsolute(labels []string, 
kind RRTypeList, mode int
                return
        }
        // continue with resolution relative to a zone.
-       return gns.ResolveRelative(labels[1:], pkey, kind, mode, depth)
+       return gns.ResolveRelative(ctx, labels[1:], pkey, kind, mode, depth)
 }
 
 // Resolve relative path (to a given zone) recursively by processing simple
 // (PKEY,Label) lookups in sequence and handle intermediate GNS record types
-func (gns *GNSModule) ResolveRelative(labels []string, pkey 
*ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set 
*message.GNSRecordSet, err error) {
+func (gns *GNSModule) ResolveRelative(
+       ctx *service.SessionContext,
+       labels []string,
+       pkey *ed25519.PublicKey,
+       kind RRTypeList,
+       mode int,
+       depth int) (set *message.GNSRecordSet, err error) {
+
        // Process all names in sequence
        var (
                records []*message.GNSResourceRecord // final resource records 
from resolution
@@ -162,7 +184,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey 
*ed25519.PublicKey,
 
                // resolve next level
                var block *message.GNSBlock
-               if block, err = gns.Lookup(pkey, labels[0], mode); err != nil {
+               if block, err = gns.Lookup(ctx, pkey, labels[0], mode); err != 
nil {
                        // failed to resolve name
                        return
                }
@@ -225,10 +247,23 @@ func (gns *GNSModule) ResolveRelative(labels []string, 
pkey *ed25519.PublicKey,
                                lbls += "."
                        }
                        fqdn := lbls + inst.Query
-                       if set, err = gns.ResolveDNS(fqdn, inst.Servers, kind, 
pkey, depth); err != nil {
+                       if set, err = gns.ResolveDNS(ctx, fqdn, inst.Servers, 
kind, pkey, depth); err != nil {
                                logger.Println(logger.ERROR, "[gns] GNS2DNS 
resolution failed.")
                                return
                        }
+                       // add synthetic LEHO record if we have results and are 
at the
+                       // end of the name (labels).
+                       if len(set.Records) > 0 && len(labels) == 1 {
+                               // add LEHO supplemental record: The TTL of the 
new record is
+                               // the longest-living record in the current set.
+                               expires := util.AbsoluteTimeNow()
+                               for _, rec := range set.Records {
+                                       if rec.Expires.Compare(expires) > 0 {
+                                               expires = rec.Expires
+                                       }
+                               }
+                               set.Records = append(set.Records, 
gns.newLEHORecord(inst.Query, expires))
+                       }
                        // we are done with resolution; pass on records to 
caller
                        records = set.Records
                        break
@@ -250,7 +285,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey 
*ed25519.PublicKey,
                                break
                        }
                        logger.Println(logger.DBG, "[gns] CNAME resolution 
required.")
-                       if set, err = gns.ResolveUnknown(inst.name, labels, 
pkey, kind, depth+1); err != nil {
+                       if set, err = gns.ResolveUnknown(ctx, inst.name, 
labels, pkey, kind, depth+1); err != nil {
                                logger.Println(logger.ERROR, "[gns] CNAME 
resolution failed.")
                                return
                        }
@@ -300,7 +335,14 @@ func (gns *GNSModule) ResolveRelative(labels []string, 
pkey *ed25519.PublicKey,
 // relative to the zone PKEY. If the name is an absolute GNS name (ending in
 // a PKEY TLD), it is also resolved with GNS. All other names are resolved
 // via DNS queries.
-func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey 
*ed25519.PublicKey, kind RRTypeList, depth int) (set *message.GNSRecordSet, err 
error) {
+func (gns *GNSModule) ResolveUnknown(
+       ctx *service.SessionContext,
+       name string,
+       labels []string,
+       pkey *ed25519.PublicKey,
+       kind RRTypeList,
+       depth int) (set *message.GNSRecordSet, err error) {
+
        // relative GNS-based server name?
        if strings.HasSuffix(name, ".+") {
                // resolve server name relative to current zone
@@ -308,14 +350,14 @@ func (gns *GNSModule) ResolveUnknown(name string, labels 
[]string, pkey *ed25519
                for _, label := range util.ReverseStringList(labels) {
                        name += "." + label
                }
-               if set, err = gns.Resolve(name, pkey, kind, 
enums.GNS_LO_DEFAULT, depth+1); err != nil {
+               if set, err = gns.Resolve(ctx, name, pkey, kind, 
enums.GNS_LO_DEFAULT, depth+1); err != nil {
                        return
                }
        } else {
                // check for absolute GNS name (with PKEY as TLD)
                if zk := gns.GetZoneKey(name); zk != nil {
                        // resolve absolute GNS name (name ends in a PKEY)
-                       if set, err = gns.Resolve(util.StripPathRight(name), 
zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil {
+                       if set, err = gns.Resolve(ctx, 
util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil 
{
                                return
                        }
                } else {
@@ -342,13 +384,17 @@ func (gns *GNSModule) GetZoneKey(path string) 
*ed25519.PublicKey {
 }
 
 // Lookup name in GNS.
-func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) 
(block *message.GNSBlock, err error) {
+func (gns *GNSModule) Lookup(
+       ctx *service.SessionContext,
+       pkey *ed25519.PublicKey,
+       label string,
+       mode int) (block *message.GNSBlock, err error) {
 
        // create query (lookup key)
        query := NewQuery(pkey, label)
 
        // try local lookup first
-       if block, err = gns.LookupLocal(query); err != nil {
+       if block, err = gns.LookupLocal(ctx, query); err != nil {
                logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", 
err.Error())
                block = nil
                return
@@ -356,9 +402,17 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, 
label string, mode int) (b
        if block == nil {
                if mode == enums.GNS_LO_DEFAULT {
                        // get the block from a remote lookup
-                       if block, err = gns.LookupRemote(query); err != nil || 
block == nil {
+                       if block, err = gns.LookupRemote(ctx, query); err != 
nil || block == nil {
                                if err != nil {
-                                       logger.Printf(logger.ERROR, "[gns] 
remote Lookup: %s\n", err.Error())
+                                       // check for aborted remote lookup: we 
need to cancel the query
+                                       if err == 
transport.ErrChannelInterrupted {
+                                               logger.Println(logger.WARN, 
"[gns] remote Lookup aborted -- cleaning up.")
+                                               if err = gns.CancelRemote(ctx, 
query); err != nil {
+                                                       
logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", 
err.Error())
+                                               }
+                                       } else {
+                                               logger.Printf(logger.ERROR, 
"[gns] remote Lookup failed: %s\n", err.Error())
+                                       }
                                        block = nil
                                } else {
                                        logger.Println(logger.DBG, "[gns] 
remote Lookup: no block found")
@@ -367,8 +421,21 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, 
label string, mode int) (b
                                return
                        }
                        // store RRs from remote locally.
-                       gns.StoreLocal(block)
+                       gns.StoreLocal(ctx, block)
                }
        }
        return
 }
+
+// newLEHORecord creates a new supplemental GNS record of type LEHO.
+func (gns *GNSModule) newLEHORecord(name string, expires util.AbsoluteTime) 
*message.GNSResourceRecord {
+       rr := new(message.GNSResourceRecord)
+       rr.Expires = expires
+       rr.Flags = uint32(enums.GNS_FLAG_SUPPL)
+       rr.Type = uint32(enums.GNS_TYPE_LEHO)
+       rr.Size = uint32(len(name) + 1)
+       rr.Data = make([]byte, rr.Size)
+       copy(rr.Data, []byte(name))
+       rr.Data[len(name)] = 0
+       return rr
+}
diff --git a/src/gnunet/service/gns/service.go 
b/src/gnunet/service/gns/service.go
index 1a62e77..2526ae8 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -22,7 +22,6 @@ import (
        "encoding/hex"
        "fmt"
        "io"
-       "sync"
 
        "gnunet/config"
        "gnunet/crypto"
@@ -60,6 +59,7 @@ func NewGNSService() service.Service {
        inst.LookupLocal = inst.LookupNamecache
        inst.StoreLocal = inst.StoreNamecache
        inst.LookupRemote = inst.LookupDHT
+       inst.CancelRemote = inst.CancelDHT
        return inst
 }
 
@@ -74,15 +74,18 @@ func (s *GNSService) Stop() error {
 }
 
 // Serve a client channel.
-func (s *GNSService) ServeClient(wg *sync.WaitGroup, mc *transport.MsgChannel) 
{
-       defer wg.Done()
+func (s *GNSService) ServeClient(ctx *service.SessionContext, mc 
*transport.MsgChannel) {
+
 loop:
        for {
                // receive next message from client
-               msg, err := mc.Receive()
+               logger.Printf(logger.DBG, "[gns] Waiting for message in session 
'%d'...\n", ctx.Id)
+               msg, err := mc.Receive(ctx.Signaller())
                if err != nil {
                        if err == io.EOF {
                                logger.Println(logger.INFO, "[gns] Client 
channel closed.")
+                       } else if err == transport.ErrChannelInterrupted {
+                               logger.Println(logger.INFO, "[gns] Service 
operation interrupted.")
                        } else {
                                logger.Printf(logger.ERROR, "[gns] 
Message-receive failed: %s\n", err.Error())
                        }
@@ -102,23 +105,28 @@ loop:
                        resp = respX
 
                        // perform lookup on block (locally and remote)
-                       wg.Add(1)
                        go func() {
+                               ctx.Add()
                                defer func() {
                                        // send response
-                                       if err := mc.Send(resp); err != nil {
-                                               logger.Printf(logger.ERROR, 
"[gns] Failed to send response: %s\n", err.Error())
+                                       if resp != nil {
+                                               if err := mc.Send(resp, 
ctx.Signaller()); err != nil {
+                                                       
logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error())
+                                               }
                                        }
                                        // go-routine finished
-                                       wg.Done()
+                                       ctx.Remove()
                                }()
 
                                pkey := ed25519.NewPublicKeyFromBytes(m.Zone)
                                label := m.GetName()
                                kind := NewRRTypeList(int(m.Type))
-                               recset, err := s.Resolve(label, pkey, kind, 
int(m.Options), 0)
+                               recset, err := s.Resolve(ctx, label, pkey, 
kind, int(m.Options), 0)
                                if err != nil {
                                        logger.Printf(logger.ERROR, "[gns] 
Failed to lookup block: %s\n", err.Error())
+                                       if err == 
transport.ErrChannelInterrupted {
+                                               resp = nil
+                                       }
                                        return
                                }
                                // handle records
@@ -151,12 +159,16 @@ loop:
                        break loop
                }
        }
+       // cancel all tasks running for this session/connection
+       logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", 
ctx.Id)
+       ctx.Cancel()
+
        // close client connection
        mc.Close()
 }
 
 // LookupNamecache
-func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, 
err error) {
+func (s *GNSService) LookupNamecache(ctx *service.SessionContext, query 
*Query) (block *message.GNSBlock, err error) {
        logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", 
hex.EncodeToString(query.Key.Bits))
 
        // assemble Namecache request
@@ -166,7 +178,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block 
*message.GNSBlock, err
 
        // get response from Namecache service
        var resp message.Message
-       if resp, err = service.ServiceRequestResponse("gns", "Namecache", 
config.Cfg.Namecache.Endpoint, req); err != nil {
+       if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", 
config.Cfg.Namecache.Endpoint, req); err != nil {
                return
        }
 
@@ -219,7 +231,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block 
*message.GNSBlock, err
 }
 
 // StoreNamecache
-func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) {
+func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block 
*message.GNSBlock) (err error) {
        logger.Println(logger.DBG, "[gns] StoreNamecache()...")
 
        // assemble Namecache request
@@ -228,7 +240,7 @@ func (s *GNSService) StoreNamecache(block 
*message.GNSBlock) (err error) {
 
        // get response from Namecache service
        var resp message.Message
-       if resp, err = service.ServiceRequestResponse("gns", "Namecache", 
config.Cfg.Namecache.Endpoint, req); err != nil {
+       if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", 
config.Cfg.Namecache.Endpoint, req); err != nil {
                return
        }
 
@@ -255,7 +267,7 @@ func (s *GNSService) StoreNamecache(block 
*message.GNSBlock) (err error) {
 }
 
 // LookupDHT
-func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err 
error) {
+func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) 
(block *message.GNSBlock, err error) {
        logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", 
hex.EncodeToString(query.Key.Bits))
 
        // assemble DHT request
@@ -268,7 +280,7 @@ func (s *GNSService) LookupDHT(query *Query) (block 
*message.GNSBlock, err error
 
        // get response from DHT service
        var resp message.Message
-       if resp, err = service.ServiceRequestResponse("gns", "DHT", 
config.Cfg.DHT.Endpoint, req); err != nil {
+       if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", 
config.Cfg.DHT.Endpoint, req); err != nil {
                return
        }
 
@@ -313,9 +325,36 @@ func (s *GNSService) LookupDHT(query *Query) (block 
*message.GNSBlock, err error
 
                // we got a result from DHT that was not in the namecache,
                // so store it there now.
-               if err = s.StoreNamecache(block); err != nil {
+               if err = s.StoreNamecache(ctx, block); err != nil {
                        logger.Printf(logger.ERROR, "[gns] can't store block in 
Namecache: %s\n", err.Error())
                }
        }
        return
 }
+
+// CancelDHT
+func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err 
error) {
+       logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", 
hex.EncodeToString(query.Key.Bits))
+
+       // assemble DHT request
+       req := message.NewDHTClientGetStopMsg(query.Key)
+       req.Id = uint64(util.NextID())
+
+       // get response from DHT service
+       var resp message.Message
+       if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", 
config.Cfg.DHT.Endpoint, req); err != nil {
+               return
+       }
+
+       // handle message depending on its type
+       logger.Println(logger.DBG, "[gns] Handling response from DHT service")
+       switch m := resp.(type) {
+       case *message.DHTClientResultMsg:
+               // check for matching IDs
+               if m.Id != req.Id {
+                       logger.Println(logger.ERROR, "[gns] Got response for 
unknown ID")
+                       break
+               }
+       }
+       return
+}
diff --git a/src/gnunet/service/namecache/module.go 
b/src/gnunet/service/namecache/module.go
index 6a10625..a205444 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -20,6 +20,7 @@ package namecache
 
 import (
        "gnunet/message"
+       "gnunet/service"
        "gnunet/service/gns"
 )
 
@@ -35,10 +36,10 @@ import (
 type NamecacheModule struct {
 }
 
-func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) {
+func (nc *NamecacheModule) Get(ctx *service.SessionContext, query *gns.Query) 
(*message.GNSBlock, error) {
        return nil, nil
 }
 
-func (nc *NamecacheModule) Put(block *message.GNSBlock) error {
+func (nc *NamecacheModule) Put(ctx *service.SessionContext, block 
*message.GNSBlock) error {
        return nil
 }
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go
index d484a95..1017e0b 100644
--- a/src/gnunet/service/service.go
+++ b/src/gnunet/service/service.go
@@ -33,19 +33,21 @@ import (
 // Channel semantics in the specification string.
 type Service interface {
        Start(spec string) error
-       ServeClient(wg *sync.WaitGroup, ch *transport.MsgChannel)
+       ServeClient(ctx *SessionContext, ch *transport.MsgChannel)
        Stop() error
 }
 
 // ServiceImpl is an implementation of generic service functionality.
 type ServiceImpl struct {
-       impl    Service
-       hdlr    chan transport.Channel
-       ctrl    chan bool
-       srvc    transport.ChannelServer
-       wg      *sync.WaitGroup
-       name    string
-       running bool
+       impl    Service                 // Specific service implementation
+       hdlr    chan transport.Channel  // Channel from listener
+       ctrl    chan bool               // Control channel
+       drop    chan int                // Channel to drop a session from 
pending list
+       srvc    transport.ChannelServer // multi-user service
+       wg      *sync.WaitGroup         // wait group for go routine 
synchronization
+       name    string                  // service name
+       running bool                    // service currently running?
+       pending map[int]*SessionContext // list of pending sessions
 }
 
 // NewServiceImpl instantiates a new ServiceImpl object.
@@ -54,10 +56,12 @@ func NewServiceImpl(name string, srv Service) *ServiceImpl {
                impl:    srv,
                hdlr:    make(chan transport.Channel),
                ctrl:    make(chan bool),
+               drop:    make(chan int),
                srvc:    nil,
                wg:      new(sync.WaitGroup),
                name:    name,
                running: false,
+               pending: make(map[int]*SessionContext),
        }
 }
 
@@ -83,6 +87,8 @@ func (si *ServiceImpl) Start(spec string) (err error) {
        loop:
                for si.running {
                        select {
+
+                       // handle incoming connections
                        case in := <-si.hdlr:
                                if in == nil {
                                        logger.Printf(logger.INFO, "[%s] 
Listener terminated.\n", si.name)
@@ -90,14 +96,38 @@ func (si *ServiceImpl) Start(spec string) (err error) {
                                }
                                switch ch := in.(type) {
                                case transport.Channel:
-                                       logger.Printf(logger.INFO, "[%s] Client 
connected.\n", si.name)
-                                       si.wg.Add(1)
-                                       go si.impl.ServeClient(si.wg, 
transport.NewMsgChannel(ch))
+                                       // run a new session with context
+                                       ctx := NewSessionContext()
+                                       sessId := ctx.Id
+                                       si.pending[sessId] = ctx
+                                       logger.Printf(logger.INFO, "[%s] 
Session '%d' started.\n", si.name, sessId)
+
+                                       go func() {
+                                               // serve client on the message 
channel
+                                               si.impl.ServeClient(ctx, 
transport.NewMsgChannel(ch))
+                                               // session is done now.
+                                               logger.Printf(logger.INFO, 
"[%s] Session with client '%d' ended.\n", si.name, sessId)
+                                               si.drop <- sessId
+                                       }()
                                }
+
+                       // handle session removal
+                       case sessId := <-si.drop:
+                               delete(si.pending, sessId)
+
+                       // handle cancelation signal on listener.
                        case <-si.ctrl:
                                break loop
                        }
                }
+
+               // terminate pending sessions
+               for _, ctx := range si.pending {
+                       logger.Printf(logger.DBG, "[%s] Session '%d' 
closing...\n", si.name, ctx.Id)
+                       ctx.Cancel()
+               }
+
+               // close-down service
                logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name)
                si.srvc.Close()
                si.running = false
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 092ab8a..4ba49ac 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -28,6 +28,7 @@ import (
        "gnunet/message"
        "gnunet/util"
 
+       "github.com/bfix/gospel/concurrent"
        "github.com/bfix/gospel/data"
        "github.com/bfix/gospel/logger"
 )
@@ -36,6 +37,8 @@ import (
 var (
        ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
        ErrChannelNotOpened      = fmt.Errorf("Channel not opened")
+       ErrChannelInterrupted    = fmt.Errorf("Channel interrupted")
+       ErrChannelClosed         = fmt.Errorf("Channel closed")
 )
 
 ////////////////////////////////////////////////////////////////////////
@@ -49,10 +52,11 @@ var (
 //     "tcp+1.2.3.4:5"       -- for TCP channels
 //     "udp+1.2.3.4:5"       -- for UDP channels
 type Channel interface {
-       Open(spec string) error
-       Close() error
-       Read([]byte) (int, error)
-       Write([]byte) (int, error)
+       Open(spec string) error                           // open channel (for 
read/write)
+       Close() error                                     // close open channel
+       IsOpen() bool                                     // check if channel 
is open
+       Read([]byte, *concurrent.Signaller) (int, error)  // read from channel
+       Write([]byte, *concurrent.Signaller) (int, error) // write to channel
 }
 
 // ChannelFactory instantiates specific Channel imülementations.
@@ -140,7 +144,12 @@ func (c *MsgChannel) Close() error {
 }
 
 // Send a GNUnet message over a channel.
-func (c *MsgChannel) Send(msg message.Message) error {
+func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) 
error {
+
+       // check for closed channel
+       if !c.ch.IsOpen() {
+               return ErrChannelClosed
+       }
 
        // convert message to binary data
        data, err := data.Marshal(msg)
@@ -160,7 +169,7 @@ func (c *MsgChannel) Send(msg message.Message) error {
        }
 
        // send packet
-       n, err := c.ch.Write(data)
+       n, err := c.ch.Write(data, sig)
        if err != nil {
                return err
        }
@@ -171,9 +180,15 @@ func (c *MsgChannel) Send(msg message.Message) error {
 }
 
 // Receive GNUnet messages over a plain Channel.
-func (c *MsgChannel) Receive() (message.Message, error) {
+func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, 
error) {
+       // check for closed channel
+       if !c.ch.IsOpen() {
+               return nil, ErrChannelClosed
+       }
+
+       // get bytes from channel
        get := func(pos, count int) error {
-               n, err := c.ch.Read(c.buf[pos : pos+count])
+               n, err := c.ch.Read(c.buf[pos:pos+count], sig)
                if err != nil {
                        return err
                }
@@ -182,6 +197,7 @@ func (c *MsgChannel) Receive() (message.Message, error) {
                }
                return nil
        }
+
        if err := get(0, 4); err != nil {
                return nil, err
        }
diff --git a/src/gnunet/transport/channel_netw.go 
b/src/gnunet/transport/channel_netw.go
index c56c089..4f0ba4a 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -24,9 +24,30 @@ import (
        "strconv"
        "strings"
 
+       "github.com/bfix/gospel/concurrent"
        "github.com/bfix/gospel/logger"
 )
 
+// ChannelResult for read/write operations on channels.
+type ChannelResult struct {
+       count int   // number of bytes read/written
+       err   error // error (or nil)
+}
+
+// NewChannelResult instantiates a new object with given attributes.
+func NewChannelResult(n int, err error) *ChannelResult {
+       return &ChannelResult{
+               count: n,
+               err:   err,
+       }
+}
+
+// Values() returns the attributes of a result instance (for passing up the
+// call stack).
+func (cr *ChannelResult) Values() (int, error) {
+       return cr.count, cr.err
+}
+
 ////////////////////////////////////////////////////////////////////////
 // Generic network-based Channel
 
@@ -64,27 +85,86 @@ func (c *NetworkChannel) Open(spec string) (err error) {
 // Close a network channel
 func (c *NetworkChannel) Close() error {
        if c.conn != nil {
-               return c.conn.Close()
+               rc := c.conn.Close()
+               c.conn = nil
+               return rc
        }
        return ErrChannelNotOpened
 }
 
+// IsOpen returns true if the channel is opened
+func (c *NetworkChannel) IsOpen() bool {
+       return c.conn != nil
+}
+
 // Read bytes from a network channel into buffer: Returns the number of read
 // bytes and an error code. Only works on open channels ;)
-func (c *NetworkChannel) Read(buf []byte) (int, error) {
+// The read can be aborted by sending 'true' on the signaller; the
+// channel is closed after such interruption.
+func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, 
error) {
+       // check if the channel is open
        if c.conn == nil {
                return 0, ErrChannelNotOpened
        }
-       return c.conn.Read(buf)
+       // perform operation in go-routine
+       result := make(chan *ChannelResult)
+       go func() {
+               result <- NewChannelResult(c.conn.Read(buf))
+       }()
+
+       listener := sig.Listen()
+       defer sig.Drop(listener)
+       for {
+               select {
+               // handle terminate command
+               case x := <-listener:
+                       switch val := x.(type) {
+                       case bool:
+                               if val {
+                                       c.conn.Close()
+                                       c.conn = nil
+                                       return 0, ErrChannelInterrupted
+                               }
+                       }
+               // handle result of read operation
+               case res := <-result:
+                       return res.Values()
+               }
+       }
 }
 
 // Write buffer to a network channel: Returns the number of written bytes and
-// an error code.
-func (c *NetworkChannel) Write(buf []byte) (int, error) {
+// an error code. The write operation can be aborted by sending 'true' on the
+// signaller; the network channel is closed after such an interrupt.
+func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, 
error) {
+       // check if we have an open channel to write to.
        if c.conn == nil {
                return 0, ErrChannelNotOpened
        }
-       return c.conn.Write(buf)
+       // perform operation in go-routine
+       result := make(chan *ChannelResult)
+       go func() {
+               result <- NewChannelResult(c.conn.Write(buf))
+       }()
+
+       listener := sig.Listen()
+       defer sig.Drop(listener)
+       for {
+               select {
+               // handle terminate command
+               case x := <-listener:
+                       switch val := x.(type) {
+                       case bool:
+                               if val {
+                                       c.conn.Close()
+                                       return 0, ErrChannelInterrupted
+                               }
+                       }
+               // handle result of write operation
+               case res := <-result:
+                       return res.Values()
+               }
+       }
 }
 
 ////////////////////////////////////////////////////////////////////////
diff --git a/src/gnunet/transport/connection.go 
b/src/gnunet/transport/connection.go
index e2cfae2..03549fc 100644
--- a/src/gnunet/transport/connection.go
+++ b/src/gnunet/transport/connection.go
@@ -21,6 +21,8 @@ package transport
 import (
        "gnunet/core"
        "gnunet/message"
+
+       "github.com/bfix/gospel/concurrent"
 )
 
 // Connection for communicating peers
@@ -67,11 +69,11 @@ func (c *Connection) Close() error {
 }
 
 // Send a message on the connection
-func (c *Connection) Send(msg message.Message) error {
-       return c.ch.Send(msg)
+func (c *Connection) Send(msg message.Message, sig *concurrent.Signaller) 
error {
+       return c.ch.Send(msg, sig)
 }
 
 // Receive a message on the connection
-func (c *Connection) Receive() (message.Message, error) {
-       return c.ch.Receive()
+func (c *Connection) Receive(sig *concurrent.Signaller) (message.Message, 
error) {
+       return c.ch.Receive(sig)
 }
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index afad974..80f5c84 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -22,6 +22,7 @@ import (
        "strings"
 )
 
+// CounterMap
 type CounterMap map[interface{}]int
 
 func (cm CounterMap) Add(i interface{}) int {
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go
index 3e09791..e1e0e30 100644
--- a/src/gnunet/util/time.go
+++ b/src/gnunet/util/time.go
@@ -78,6 +78,25 @@ func (t AbsoluteTime) Expired() bool {
        return t.Val < uint64(time.Now().Unix())
 }
 
+// Compare two times (-1 = (t < t2), 0 = (t == t2), 1 = (t > t2))
+func (t AbsoluteTime) Compare(t2 AbsoluteTime) int {
+       if t.Val == math.MaxUint64 {
+               if t2.Val == math.MaxUint64 {
+                       return 0
+               }
+               return 1
+       }
+       if t2.Val == math.MaxUint64 {
+               return -1
+       }
+       if t.Val < t2.Val {
+               return -1
+       } else if t.Val == t2.Val {
+               return 0
+       }
+       return 1
+}
+
 //----------------------------------------------------------------------
 // Relative time
 //----------------------------------------------------------------------

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]