[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: Fixed DHT cancellation sequence and minor improvements.
From: gnunet
Subject: [gnunet-go] branch master updated: Fixed DHT cancellation sequence and minor improvements.
Date: Wed, 01 Apr 2020 12:28:31 +0200
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new db7da66 Fixed DHT cancellation sequence and minor improvements.
db7da66 is described below
commit db7da66be57c6a9df87f3ea1f3cd681539ad9b51
Author: Bernd Fix <address@hidden>
AuthorDate: Wed Apr 1 12:23:06 2020 +0200
Fixed DHT cancellation sequence and minor improvements.
---
src/gnunet/modules.go | 1 -
src/gnunet/service/client.go | 2 +-
src/gnunet/service/context.go | 21 ++++--
src/gnunet/service/dht/module.go | 4 --
src/gnunet/service/gns/module.go | 12 +---
src/gnunet/service/gns/service.go | 136 +++++++++++++++++++----------------
src/gnunet/transport/channel.go | 12 ----
src/gnunet/transport/channel_netw.go | 3 -
8 files changed, 90 insertions(+), 101 deletions(-)
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index b71eb40..063b914 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -60,6 +60,5 @@ func init() {
LookupLocal: Modules.Namecache.Get,
StoreLocal: Modules.Namecache.Put,
LookupRemote: Modules.DHT.Get,
- CancelRemote: Modules.DHT.Cancel,
}
}
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index fe3fa9e..b153bab 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -69,7 +69,7 @@ func ServiceRequestResponse(
req message.Message) (message.Message, error) {
// client-connect to the service
- logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller,
callee)
+ logger.Printf(logger.DBG, "[%s] Connecting to %s service...\n", caller,
callee)
cl, err := NewClient(endp)
if err != nil {
return nil, err
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
index 4896bd5..ffffafb 100644
--- a/src/gnunet/service/context.go
+++ b/src/gnunet/service/context.go
@@ -30,17 +30,19 @@ import (
// by a service; the session is handled by the 'ServeClient' method of a
// service implementation.
type SessionContext struct {
- Id int // session identifier
- wg *sync.WaitGroup // wait group for the session
- sig *concurrent.Signaller // signaller for the session
+ Id int // session identifier
+ wg *sync.WaitGroup // wait group for the session
+ sig *concurrent.Signaller // signaller for the session
+ pending int // number of pending go-routines
}
// NewSessionContext instantiates a new session context.
func NewSessionContext() *SessionContext {
return &SessionContext{
- Id: util.NextID(),
- wg: new(sync.WaitGroup),
- sig: concurrent.NewSignaller(),
+ Id: util.NextID(),
+ wg: new(sync.WaitGroup),
+ sig: concurrent.NewSignaller(),
+ pending: 0,
}
}
@@ -55,11 +57,18 @@ func (ctx *SessionContext) Cancel() {
// Add a go-routine to the wait group.
func (ctx *SessionContext) Add() {
ctx.wg.Add(1)
+ ctx.pending++
}
// Remove a go-routine from the wait group.
func (ctx *SessionContext) Remove() {
ctx.wg.Done()
+ ctx.pending--
+}
+
+// Waiting returns the number of waiting go-routines.
+func (ctx *SessionContext) Waiting() int {
+ return ctx.pending
}
// Signaller returns the working instance for the context.
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index a54d0eb..61c2540 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -40,10 +40,6 @@ func (nc *DHTModule) Get(ctx *service.SessionContext, query
*gns.Query) (*messag
return nil, nil
}
-func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query)
error {
- return nil
-}
-
func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock)
error {
return nil
}
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 43f7b7a..e885d2d 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -27,7 +27,6 @@ import (
"gnunet/enums"
"gnunet/message"
"gnunet/service"
- "gnunet/transport"
"gnunet/util"
"github.com/bfix/gospel/crypto/ed25519"
@@ -115,7 +114,6 @@ type GNSModule struct {
LookupLocal func(ctx *service.SessionContext, query *Query)
(*message.GNSBlock, error)
StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock)
error
LookupRemote func(ctx *service.SessionContext, query *Query)
(*message.GNSBlock, error)
- CancelRemote func(ctx *service.SessionContext, query *Query) error
}
// Resolve a GNS name with multiple labels. If pkey is not nil, the name
@@ -404,15 +402,7 @@ func (gns *GNSModule) Lookup(
// get the block from a remote lookup
if block, err = gns.LookupRemote(ctx, query); err !=
nil || block == nil {
if err != nil {
- // check for aborted remote lookup: we
need to cancel the query
- if err ==
transport.ErrChannelInterrupted {
- logger.Println(logger.WARN,
"[gns] remote Lookup aborted -- cleaning up.")
- if err = gns.CancelRemote(ctx,
query); err != nil {
-
logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n",
err.Error())
- }
- } else {
- logger.Printf(logger.ERROR,
"[gns] remote Lookup failed: %s\n", err.Error())
- }
+ logger.Printf(logger.ERROR, "[gns]
remote Lookup failed: %s\n", err.Error())
block = nil
} else {
logger.Println(logger.DBG, "[gns]
remote Lookup: no block found")
diff --git a/src/gnunet/service/gns/service.go
b/src/gnunet/service/gns/service.go
index 2526ae8..464e622 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -59,7 +59,6 @@ func NewGNSService() service.Service {
inst.LookupLocal = inst.LookupNamecache
inst.StoreLocal = inst.StoreNamecache
inst.LookupRemote = inst.LookupDHT
- inst.CancelRemote = inst.CancelDHT
return inst
}
@@ -76,45 +75,46 @@ func (s *GNSService) Stop() error {
// Serve a client channel.
func (s *GNSService) ServeClient(ctx *service.SessionContext, mc
*transport.MsgChannel) {
+ reqId := 0
loop:
for {
// receive next message from client
- logger.Printf(logger.DBG, "[gns] Waiting for message in session
'%d'...\n", ctx.Id)
+ reqId++
+ logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client
request...\n", ctx.Id, reqId)
msg, err := mc.Receive(ctx.Signaller())
if err != nil {
if err == io.EOF {
- logger.Println(logger.INFO, "[gns] Client
channel closed.")
+ logger.Printf(logger.INFO, "[gns:%d:%d] Client
channel closed.\n", ctx.Id, reqId)
} else if err == transport.ErrChannelInterrupted {
- logger.Println(logger.INFO, "[gns] Service
operation interrupted.")
+ logger.Printf(logger.INFO, "[gns:%d:%d] Service
operation interrupted.\n", ctx.Id, reqId)
} else {
- logger.Printf(logger.ERROR, "[gns]
Message-receive failed: %s\n", err.Error())
+ logger.Printf(logger.ERROR, "[gns:%d:%d]
Message-receive failed: %s\n", ctx.Id, reqId, err.Error())
}
break loop
}
- logger.Printf(logger.INFO, "[gns] Received msg: %v\n", msg)
+ logger.Printf(logger.INFO, "[gns:%d:%d] Received request:
%v\n", ctx.Id, reqId, msg)
// perform lookup
- var resp message.Message
switch m := msg.(type) {
case *message.GNSLookupMsg:
//----------------------------------------------------------
// GNS_LOOKUP
//----------------------------------------------------------
- logger.Println(logger.INFO, "[gns] Lookup request
received.")
- respX := message.NewGNSLookupResultMsg(m.Id)
- resp = respX
// perform lookup on block (locally and remote)
- go func() {
+ go func(id int, m *message.GNSLookupMsg) {
+ logger.Printf(logger.INFO, "[gns:%d:%d] Lookup
request received.\n", ctx.Id, id)
+ resp := message.NewGNSLookupResultMsg(m.Id)
ctx.Add()
defer func() {
// send response
if resp != nil {
if err := mc.Send(resp,
ctx.Signaller()); err != nil {
-
logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error())
+
logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n",
ctx.Id, id, err.Error())
}
}
// go-routine finished
+ logger.Printf(logger.DBG, "[gns:%d:%d]
Lookup request finished.\n", ctx.Id, id)
ctx.Remove()
}()
@@ -123,7 +123,7 @@ loop:
kind := NewRRTypeList(int(m.Type))
recset, err := s.Resolve(ctx, label, pkey,
kind, int(m.Options), 0)
if err != nil {
- logger.Printf(logger.ERROR, "[gns]
Failed to lookup block: %s\n", err.Error())
+ logger.Printf(logger.ERROR,
"[gns:%d:%d] Failed to lookup block: %s\n", ctx.Id, id, err.Error())
if err ==
transport.ErrChannelInterrupted {
resp = nil
}
@@ -131,40 +131,40 @@ loop:
}
// handle records
if recset != nil {
- logger.Printf(logger.DBG, "[gns]
Received record set with %d entries\n", recset.Count)
+ logger.Printf(logger.DBG, "[gns:%d:%d]
Received record set with %d entries\n", ctx.Id, id, recset.Count)
// get records from block
if recset.Count == 0 {
- logger.Println(logger.WARN,
"[gns] No records in block")
+ logger.Printf(logger.WARN,
"[gns:%d:%d] No records in block\n", ctx.Id, id)
return
}
// process records
for i, rec := range recset.Records {
- logger.Printf(logger.DBG,
"[gns] Record #%d: %v\n", i, rec)
+ logger.Printf(logger.DBG,
"[gns:%d:%d] Record #%d: %v\n", ctx.Id, id, i, rec)
// is this the record type we
are looking for?
if rec.Type == m.Type ||
int(m.Type) == enums.GNS_TYPE_ANY {
// add it to the
response message
- respX.AddRecord(rec)
+ resp.AddRecord(rec)
}
}
}
- }()
+ }(reqId, m)
default:
//----------------------------------------------------------
// UNKNOWN message type received
//----------------------------------------------------------
- logger.Printf(logger.ERROR, "[gns] Unhandled message of
type (%d)\n", msg.Header().MsgType)
+ logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled
message of type (%d)\n", ctx.Id, reqId, msg.Header().MsgType)
break loop
}
}
- // cancel all tasks running for this session/connection
- logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n",
ctx.Id)
- ctx.Cancel()
-
// close client connection
mc.Close()
+
+ // cancel all tasks running for this session/connection
+ logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n",
ctx.Id, ctx.Waiting())
+ ctx.Cancel()
}
// LookupNamecache
@@ -269,27 +269,64 @@ func (s *GNSService) StoreNamecache(ctx
*service.SessionContext, block *message.
// LookupDHT
func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query)
(block *message.GNSBlock, err error) {
logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n",
hex.EncodeToString(query.Key.Bits))
-
- // assemble DHT request
- req := message.NewDHTClientGetMsg(query.Key)
- req.Id = uint64(util.NextID())
- req.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
- req.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
- req.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
block = nil
- // get response from DHT service
- var resp message.Message
- if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT",
config.Cfg.DHT.Endpoint, req); err != nil {
- return
+ // client-connect to the DHT service
+ logger.Println(logger.DBG, "[gns] Connecting to DHT service...")
+ cl, err := service.NewClient(config.Cfg.DHT.Endpoint)
+ if err != nil {
+ return nil, err
+ }
+ defer func() {
+ logger.Println(logger.DBG, "[gns] Closing connection to DHT
service")
+ cl.Close()
+ }()
+
+ var (
+ // response received from service
+ resp message.Message
+
+ // request-response interaction with service
+ interact = func(req message.Message, withResponse bool) (err
error) {
+ // send request
+ logger.Println(logger.DBG, "[gns] Sending request to
DHT service")
+ if err = cl.SendRequest(ctx, req); err == nil &&
withResponse {
+ // wait for a single response
+ logger.Println(logger.DBG, "[gns] Waiting for
response from DHT service")
+ resp, err = cl.ReceiveResponse(ctx)
+ }
+ return
+ }
+ )
+
+ // send DHT GET request and wait for response
+ reqGet := message.NewDHTClientGetMsg(query.Key)
+ reqGet.Id = uint64(util.NextID())
+ reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
+ reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
+ reqGet.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
+
+ if err = interact(reqGet, true); err != nil {
+ // check for aborted remote lookup: we need to cancel the query
+ if err == transport.ErrChannelInterrupted {
+ logger.Println(logger.WARN, "[gns] remote Lookup
aborted -- cleaning up.")
+
+ // send DHT GET_STOP request and terminate
+ reqStop := message.NewDHTClientGetStopMsg(query.Key)
+ reqStop.Id = reqGet.Id
+ if err = interact(reqStop, false); err != nil {
+ logger.Printf(logger.ERROR, "[gns] remote
Lookup abort failed: %s\n", err.Error())
+ }
+ return nil, transport.ErrChannelInterrupted
+ }
}
- // handle message depending on its type
+ // handle response message depending on its type
logger.Println(logger.DBG, "[gns] Handling response from DHT service")
switch m := resp.(type) {
case *message.DHTClientResultMsg:
// check for matching IDs
- if m.Id != req.Id {
+ if m.Id != reqGet.Id {
logger.Println(logger.ERROR, "[gns] Got response for
unknown ID")
break
}
@@ -331,30 +368,3 @@ func (s *GNSService) LookupDHT(ctx
*service.SessionContext, query *Query) (block
}
return
}
-
-// CancelDHT
-func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err
error) {
- logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n",
hex.EncodeToString(query.Key.Bits))
-
- // assemble DHT request
- req := message.NewDHTClientGetStopMsg(query.Key)
- req.Id = uint64(util.NextID())
-
- // get response from DHT service
- var resp message.Message
- if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT",
config.Cfg.DHT.Endpoint, req); err != nil {
- return
- }
-
- // handle message depending on its type
- logger.Println(logger.DBG, "[gns] Handling response from DHT service")
- switch m := resp.(type) {
- case *message.DHTClientResultMsg:
- // check for matching IDs
- if m.Id != req.Id {
- logger.Println(logger.ERROR, "[gns] Got response for
unknown ID")
- break
- }
- }
- return
-}
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 4ba49ac..458a063 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -38,7 +38,6 @@ var (
ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
ErrChannelNotOpened = fmt.Errorf("Channel not opened")
ErrChannelInterrupted = fmt.Errorf("Channel interrupted")
- ErrChannelClosed = fmt.Errorf("Channel closed")
)
////////////////////////////////////////////////////////////////////////
@@ -145,12 +144,6 @@ func (c *MsgChannel) Close() error {
// Send a GNUnet message over a channel.
func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller)
error {
-
- // check for closed channel
- if !c.ch.IsOpen() {
- return ErrChannelClosed
- }
-
// convert message to binary data
data, err := data.Marshal(msg)
if err != nil {
@@ -181,11 +174,6 @@ func (c *MsgChannel) Send(msg message.Message, sig
*concurrent.Signaller) error
// Receive GNUnet messages over a plain Channel.
func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message,
error) {
- // check for closed channel
- if !c.ch.IsOpen() {
- return nil, ErrChannelClosed
- }
-
// get bytes from channel
get := func(pos, count int) error {
n, err := c.ch.Read(c.buf[pos:pos+count], sig)
diff --git a/src/gnunet/transport/channel_netw.go
b/src/gnunet/transport/channel_netw.go
index 4f0ba4a..b70faa4 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -121,8 +121,6 @@ func (c *NetworkChannel) Read(buf []byte, sig
*concurrent.Signaller) (int, error
switch val := x.(type) {
case bool:
if val {
- c.conn.Close()
- c.conn = nil
return 0, ErrChannelInterrupted
}
}
@@ -156,7 +154,6 @@ func (c *NetworkChannel) Write(buf []byte, sig
*concurrent.Signaller) (int, erro
switch val := x.(type) {
case bool:
if val {
- c.conn.Close()
return 0, ErrChannelInterrupted
}
}
--
To stop receiving notification emails like this one, please contact
address@hidden.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: Fixed DHT cancellation sequence and minor improvements.,
gnunet <=