gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: Fixed DHT cancellation sequence and m


From: gnunet
Subject: [gnunet-go] branch master updated: Fixed DHT cancellation sequence and minor improvements.
Date: Wed, 01 Apr 2020 12:28:31 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new db7da66  Fixed DHT cancellation sequence and minor improvements.
db7da66 is described below

commit db7da66be57c6a9df87f3ea1f3cd681539ad9b51
Author: Bernd Fix <address@hidden>
AuthorDate: Wed Apr 1 12:23:06 2020 +0200

    Fixed DHT cancellation sequence and minor improvements.
---
 src/gnunet/modules.go                |   1 -
 src/gnunet/service/client.go         |   2 +-
 src/gnunet/service/context.go        |  21 ++++--
 src/gnunet/service/dht/module.go     |   4 --
 src/gnunet/service/gns/module.go     |  12 +---
 src/gnunet/service/gns/service.go    | 136 +++++++++++++++++++----------------
 src/gnunet/transport/channel.go      |  12 ----
 src/gnunet/transport/channel_netw.go |   3 -
 8 files changed, 90 insertions(+), 101 deletions(-)

diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index b71eb40..063b914 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -60,6 +60,5 @@ func init() {
                LookupLocal:  Modules.Namecache.Get,
                StoreLocal:   Modules.Namecache.Put,
                LookupRemote: Modules.DHT.Get,
-               CancelRemote: Modules.DHT.Cancel,
        }
 }
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index fe3fa9e..b153bab 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -69,7 +69,7 @@ func ServiceRequestResponse(
        req message.Message) (message.Message, error) {
 
        // client-connect to the service
-       logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, 
callee)
+       logger.Printf(logger.DBG, "[%s] Connecting to %s service...\n", caller, 
callee)
        cl, err := NewClient(endp)
        if err != nil {
                return nil, err
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
index 4896bd5..ffffafb 100644
--- a/src/gnunet/service/context.go
+++ b/src/gnunet/service/context.go
@@ -30,17 +30,19 @@ import (
 // by a service; the session is handled by the 'ServeClient' method of a
 // service implementation.
 type SessionContext struct {
-       Id  int                   // session identifier
-       wg  *sync.WaitGroup       // wait group for the session
-       sig *concurrent.Signaller // signaller for the session
+       Id      int                   // session identifier
+       wg      *sync.WaitGroup       // wait group for the session
+       sig     *concurrent.Signaller // signaller for the session
+       pending int                   // number of pending go-routines
 }
 
 // NewSessionContext instantiates a new session context.
 func NewSessionContext() *SessionContext {
        return &SessionContext{
-               Id:  util.NextID(),
-               wg:  new(sync.WaitGroup),
-               sig: concurrent.NewSignaller(),
+               Id:      util.NextID(),
+               wg:      new(sync.WaitGroup),
+               sig:     concurrent.NewSignaller(),
+               pending: 0,
        }
 }
 
@@ -55,11 +57,18 @@ func (ctx *SessionContext) Cancel() {
 // Add a go-routine to the wait group.
 func (ctx *SessionContext) Add() {
        ctx.wg.Add(1)
+       ctx.pending++
 }
 
 // Remove a go-routine from the wait group.
 func (ctx *SessionContext) Remove() {
        ctx.wg.Done()
+       ctx.pending--
+}
+
+// Waiting returns the number of waiting go-routines.
+func (ctx *SessionContext) Waiting() int {
+       return ctx.pending
 }
 
 // Signaller returns the working instance for the context.
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index a54d0eb..61c2540 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -40,10 +40,6 @@ func (nc *DHTModule) Get(ctx *service.SessionContext, query 
*gns.Query) (*messag
        return nil, nil
 }
 
-func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) 
error {
-       return nil
-}
-
 func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) 
error {
        return nil
 }
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 43f7b7a..e885d2d 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -27,7 +27,6 @@ import (
        "gnunet/enums"
        "gnunet/message"
        "gnunet/service"
-       "gnunet/transport"
        "gnunet/util"
 
        "github.com/bfix/gospel/crypto/ed25519"
@@ -115,7 +114,6 @@ type GNSModule struct {
        LookupLocal  func(ctx *service.SessionContext, query *Query) 
(*message.GNSBlock, error)
        StoreLocal   func(ctx *service.SessionContext, block *message.GNSBlock) 
error
        LookupRemote func(ctx *service.SessionContext, query *Query) 
(*message.GNSBlock, error)
-       CancelRemote func(ctx *service.SessionContext, query *Query) error
 }
 
 // Resolve a GNS name with multiple labels. If pkey is not nil, the name
@@ -404,15 +402,7 @@ func (gns *GNSModule) Lookup(
                        // get the block from a remote lookup
                        if block, err = gns.LookupRemote(ctx, query); err != 
nil || block == nil {
                                if err != nil {
-                                       // check for aborted remote lookup: we 
need to cancel the query
-                                       if err == 
transport.ErrChannelInterrupted {
-                                               logger.Println(logger.WARN, 
"[gns] remote Lookup aborted -- cleaning up.")
-                                               if err = gns.CancelRemote(ctx, 
query); err != nil {
-                                                       
logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", 
err.Error())
-                                               }
-                                       } else {
-                                               logger.Printf(logger.ERROR, 
"[gns] remote Lookup failed: %s\n", err.Error())
-                                       }
+                                       logger.Printf(logger.ERROR, "[gns] 
remote Lookup failed: %s\n", err.Error())
                                        block = nil
                                } else {
                                        logger.Println(logger.DBG, "[gns] 
remote Lookup: no block found")
diff --git a/src/gnunet/service/gns/service.go 
b/src/gnunet/service/gns/service.go
index 2526ae8..464e622 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -59,7 +59,6 @@ func NewGNSService() service.Service {
        inst.LookupLocal = inst.LookupNamecache
        inst.StoreLocal = inst.StoreNamecache
        inst.LookupRemote = inst.LookupDHT
-       inst.CancelRemote = inst.CancelDHT
        return inst
 }
 
@@ -76,45 +75,46 @@ func (s *GNSService) Stop() error {
 // Serve a client channel.
 func (s *GNSService) ServeClient(ctx *service.SessionContext, mc 
*transport.MsgChannel) {
 
+       reqId := 0
 loop:
        for {
                // receive next message from client
-               logger.Printf(logger.DBG, "[gns] Waiting for message in session 
'%d'...\n", ctx.Id)
+               reqId++
+               logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client 
request...\n", ctx.Id, reqId)
                msg, err := mc.Receive(ctx.Signaller())
                if err != nil {
                        if err == io.EOF {
-                               logger.Println(logger.INFO, "[gns] Client 
channel closed.")
+                               logger.Printf(logger.INFO, "[gns:%d:%d] Client 
channel closed.\n", ctx.Id, reqId)
                        } else if err == transport.ErrChannelInterrupted {
-                               logger.Println(logger.INFO, "[gns] Service 
operation interrupted.")
+                               logger.Printf(logger.INFO, "[gns:%d:%d] Service 
operation interrupted.\n", ctx.Id, reqId)
                        } else {
-                               logger.Printf(logger.ERROR, "[gns] 
Message-receive failed: %s\n", err.Error())
+                               logger.Printf(logger.ERROR, "[gns:%d:%d] 
Message-receive failed: %s\n", ctx.Id, reqId, err.Error())
                        }
                        break loop
                }
-               logger.Printf(logger.INFO, "[gns] Received msg: %v\n", msg)
+               logger.Printf(logger.INFO, "[gns:%d:%d] Received request: 
%v\n", ctx.Id, reqId, msg)
 
                // perform lookup
-               var resp message.Message
                switch m := msg.(type) {
                case *message.GNSLookupMsg:
                        
//----------------------------------------------------------
                        // GNS_LOOKUP
                        
//----------------------------------------------------------
-                       logger.Println(logger.INFO, "[gns] Lookup request 
received.")
-                       respX := message.NewGNSLookupResultMsg(m.Id)
-                       resp = respX
 
                        // perform lookup on block (locally and remote)
-                       go func() {
+                       go func(id int, m *message.GNSLookupMsg) {
+                               logger.Printf(logger.INFO, "[gns:%d:%d] Lookup 
request received.\n", ctx.Id, id)
+                               resp := message.NewGNSLookupResultMsg(m.Id)
                                ctx.Add()
                                defer func() {
                                        // send response
                                        if resp != nil {
                                                if err := mc.Send(resp, 
ctx.Signaller()); err != nil {
-                                                       
logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error())
+                                                       
logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n", 
ctx.Id, id, err.Error())
                                                }
                                        }
                                        // go-routine finished
+                                       logger.Printf(logger.DBG, "[gns:%d:%d] 
Lookup request finished.\n", ctx.Id, id)
                                        ctx.Remove()
                                }()
 
@@ -123,7 +123,7 @@ loop:
                                kind := NewRRTypeList(int(m.Type))
                                recset, err := s.Resolve(ctx, label, pkey, 
kind, int(m.Options), 0)
                                if err != nil {
-                                       logger.Printf(logger.ERROR, "[gns] 
Failed to lookup block: %s\n", err.Error())
+                                       logger.Printf(logger.ERROR, 
"[gns:%d:%d] Failed to lookup block: %s\n", ctx.Id, id, err.Error())
                                        if err == 
transport.ErrChannelInterrupted {
                                                resp = nil
                                        }
@@ -131,40 +131,40 @@ loop:
                                }
                                // handle records
                                if recset != nil {
-                                       logger.Printf(logger.DBG, "[gns] 
Received record set with %d entries\n", recset.Count)
+                                       logger.Printf(logger.DBG, "[gns:%d:%d] 
Received record set with %d entries\n", ctx.Id, id, recset.Count)
 
                                        // get records from block
                                        if recset.Count == 0 {
-                                               logger.Println(logger.WARN, 
"[gns] No records in block")
+                                               logger.Printf(logger.WARN, 
"[gns:%d:%d] No records in block\n", ctx.Id, id)
                                                return
                                        }
                                        // process records
                                        for i, rec := range recset.Records {
-                                               logger.Printf(logger.DBG, 
"[gns] Record #%d: %v\n", i, rec)
+                                               logger.Printf(logger.DBG, 
"[gns:%d:%d] Record #%d: %v\n", ctx.Id, id, i, rec)
 
                                                // is this the record type we 
are looking for?
                                                if rec.Type == m.Type || 
int(m.Type) == enums.GNS_TYPE_ANY {
                                                        // add it to the 
response message
-                                                       respX.AddRecord(rec)
+                                                       resp.AddRecord(rec)
                                                }
                                        }
                                }
-                       }()
+                       }(reqId, m)
 
                default:
                        
//----------------------------------------------------------
                        // UNKNOWN message type received
                        
//----------------------------------------------------------
-                       logger.Printf(logger.ERROR, "[gns] Unhandled message of 
type (%d)\n", msg.Header().MsgType)
+                       logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled 
message of type (%d)\n", ctx.Id, reqId, msg.Header().MsgType)
                        break loop
                }
        }
-       // cancel all tasks running for this session/connection
-       logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", 
ctx.Id)
-       ctx.Cancel()
-
        // close client connection
        mc.Close()
+
+       // cancel all tasks running for this session/connection
+       logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n", 
ctx.Id, ctx.Waiting())
+       ctx.Cancel()
 }
 
 // LookupNamecache
@@ -269,27 +269,64 @@ func (s *GNSService) StoreNamecache(ctx 
*service.SessionContext, block *message.
 // LookupDHT
 func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) 
(block *message.GNSBlock, err error) {
        logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", 
hex.EncodeToString(query.Key.Bits))
-
-       // assemble DHT request
-       req := message.NewDHTClientGetMsg(query.Key)
-       req.Id = uint64(util.NextID())
-       req.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
-       req.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
-       req.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
        block = nil
 
-       // get response from DHT service
-       var resp message.Message
-       if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", 
config.Cfg.DHT.Endpoint, req); err != nil {
-               return
+       // client-connect to the DHT service
+       logger.Println(logger.DBG, "[gns] Connecting to DHT service...")
+       cl, err := service.NewClient(config.Cfg.DHT.Endpoint)
+       if err != nil {
+               return nil, err
+       }
+       defer func() {
+               logger.Println(logger.DBG, "[gns] Closing connection to DHT 
service")
+               cl.Close()
+       }()
+
+       var (
+               // response received from service
+               resp message.Message
+
+               // request-response interaction with service
+               interact = func(req message.Message, withResponse bool) (err 
error) {
+                       // send request
+                       logger.Println(logger.DBG, "[gns] Sending request to 
DHT service")
+                       if err = cl.SendRequest(ctx, req); err == nil && 
withResponse {
+                               // wait for a single response
+                               logger.Println(logger.DBG, "[gns] Waiting for 
response from DHT service")
+                               resp, err = cl.ReceiveResponse(ctx)
+                       }
+                       return
+               }
+       )
+
+       // send DHT GET request and wait for response
+       reqGet := message.NewDHTClientGetMsg(query.Key)
+       reqGet.Id = uint64(util.NextID())
+       reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
+       reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
+       reqGet.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
+
+       if err = interact(reqGet, true); err != nil {
+               // check for aborted remote lookup: we need to cancel the query
+               if err == transport.ErrChannelInterrupted {
+                       logger.Println(logger.WARN, "[gns] remote Lookup 
aborted -- cleaning up.")
+
+                       // send DHT GET_STOP request and terminate
+                       reqStop := message.NewDHTClientGetStopMsg(query.Key)
+                       reqStop.Id = reqGet.Id
+                       if err = interact(reqStop, false); err != nil {
+                               logger.Printf(logger.ERROR, "[gns] remote 
Lookup abort failed: %s\n", err.Error())
+                       }
+                       return nil, transport.ErrChannelInterrupted
+               }
        }
 
-       // handle message depending on its type
+       // handle response message depending on its type
        logger.Println(logger.DBG, "[gns] Handling response from DHT service")
        switch m := resp.(type) {
        case *message.DHTClientResultMsg:
                // check for matching IDs
-               if m.Id != req.Id {
+               if m.Id != reqGet.Id {
                        logger.Println(logger.ERROR, "[gns] Got response for 
unknown ID")
                        break
                }
@@ -331,30 +368,3 @@ func (s *GNSService) LookupDHT(ctx 
*service.SessionContext, query *Query) (block
        }
        return
 }
-
-// CancelDHT
-func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err 
error) {
-       logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", 
hex.EncodeToString(query.Key.Bits))
-
-       // assemble DHT request
-       req := message.NewDHTClientGetStopMsg(query.Key)
-       req.Id = uint64(util.NextID())
-
-       // get response from DHT service
-       var resp message.Message
-       if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", 
config.Cfg.DHT.Endpoint, req); err != nil {
-               return
-       }
-
-       // handle message depending on its type
-       logger.Println(logger.DBG, "[gns] Handling response from DHT service")
-       switch m := resp.(type) {
-       case *message.DHTClientResultMsg:
-               // check for matching IDs
-               if m.Id != req.Id {
-                       logger.Println(logger.ERROR, "[gns] Got response for 
unknown ID")
-                       break
-               }
-       }
-       return
-}
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 4ba49ac..458a063 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -38,7 +38,6 @@ var (
        ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
        ErrChannelNotOpened      = fmt.Errorf("Channel not opened")
        ErrChannelInterrupted    = fmt.Errorf("Channel interrupted")
-       ErrChannelClosed         = fmt.Errorf("Channel closed")
 )
 
 ////////////////////////////////////////////////////////////////////////
@@ -145,12 +144,6 @@ func (c *MsgChannel) Close() error {
 
 // Send a GNUnet message over a channel.
 func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) 
error {
-
-       // check for closed channel
-       if !c.ch.IsOpen() {
-               return ErrChannelClosed
-       }
-
        // convert message to binary data
        data, err := data.Marshal(msg)
        if err != nil {
@@ -181,11 +174,6 @@ func (c *MsgChannel) Send(msg message.Message, sig 
*concurrent.Signaller) error
 
 // Receive GNUnet messages over a plain Channel.
 func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, 
error) {
-       // check for closed channel
-       if !c.ch.IsOpen() {
-               return nil, ErrChannelClosed
-       }
-
        // get bytes from channel
        get := func(pos, count int) error {
                n, err := c.ch.Read(c.buf[pos:pos+count], sig)
diff --git a/src/gnunet/transport/channel_netw.go 
b/src/gnunet/transport/channel_netw.go
index 4f0ba4a..b70faa4 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -121,8 +121,6 @@ func (c *NetworkChannel) Read(buf []byte, sig 
*concurrent.Signaller) (int, error
                        switch val := x.(type) {
                        case bool:
                                if val {
-                                       c.conn.Close()
-                                       c.conn = nil
                                        return 0, ErrChannelInterrupted
                                }
                        }
@@ -156,7 +154,6 @@ func (c *NetworkChannel) Write(buf []byte, sig 
*concurrent.Signaller) (int, erro
                        switch val := x.(type) {
                        case bool:
                                if val {
-                                       c.conn.Close()
                                        return 0, ErrChannelInterrupted
                                }
                        }

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]