gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: RC2 for Milestone 1 (NGI Assure)


From: gnunet
Subject: [gnunet-go] branch master updated: RC2 for Milestone 1 (NGI Assure)
Date: Sat, 11 Jun 2022 15:46:57 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new 8f8feaf  RC2 for Milestone 1 (NGI Assure)
8f8feaf is described below

commit 8f8feaf176e62f14a6d449c0a2fb6f0ca76b22b8
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 11 15:45:45 2022 +0200

    RC2 for Milestone 1 (NGI Assure)
---
 src/gnunet/cmd/gnunet-service-dht-go/main.go |  31 +++++-
 src/gnunet/core/core.go                      | 156 ++++++++++++++++++---------
 src/gnunet/core/core_test.go                 |   2 +-
 src/gnunet/go.mod                            |   5 +-
 src/gnunet/go.sum                            |   7 +-
 src/gnunet/message/msg_hello.go              |   9 +-
 src/gnunet/message/msg_hello_dht.go          |  75 ++++++++-----
 src/gnunet/service/dht/module.go             |  15 +++
 src/gnunet/service/dht/service.go            |   2 +-
 src/gnunet/service/rpc.go                    |  13 +--
 src/gnunet/test/gnunet-dhtu/main.go          |  80 ++++++--------
 src/gnunet/transport/endpoint.go             |  64 ++++++++++-
 src/gnunet/transport/reader_writer.go        |  23 ++++
 src/gnunet/util/address_test.go              |  54 ++++++++++
 14 files changed, 393 insertions(+), 143 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go 
b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index f9e24bc..ef6da91 100644
--- a/src/gnunet/cmd/gnunet-service-dht-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -32,6 +32,8 @@ import (
        "gnunet/core"
        "gnunet/service"
        "gnunet/service/dht"
+       "gnunet/service/dht/blocks"
+       "gnunet/util"
 
        "github.com/bfix/gospel/logger"
 )
@@ -72,7 +74,7 @@ func main() {
                socket = config.Cfg.GNS.Service.Socket
        }
        params := make(map[string]string)
-       if len(param) == 0 {
+       if len(param) > 0 {
                for _, p := range strings.Split(param, ",") {
                        kv := strings.SplitN(p, "=", 2)
                        params[kv[0]] = kv[1]
@@ -121,6 +123,33 @@ func main() {
                dhtSrv.InitRPC(rpc)
        }
 
+       // handle bootstrap: collect known addresses
+       bsList := make([]*util.Address, 0)
+       for _, bs := range config.Cfg.Bootstrap.Nodes {
+               // check for HELLO URL
+               if strings.HasPrefix(bs, "gnunet://hello/") {
+                       var hb *blocks.HelloBlock
+                       if hb, err = blocks.ParseHelloURL(bs, true); err != nil 
{
+                               logger.Printf(logger.ERROR, "[dht] failed 
bootstrap HELLO URL %s: %s", bs, err.Error())
+                               continue
+                       }
+                       // append HELLO addresses
+                       bsList = append(bsList, hb.Addresses()...)
+               } else {
+                       // parse address directly
+                       var addr *util.Address
+                       if addr, err = util.ParseAddress(bs); err != nil {
+                               logger.Printf(logger.ERROR, "[dht] failed 
bootstrap address %s: %s", bs, err.Error())
+                               continue
+                       }
+                       bsList = append(bsList, addr)
+               }
+       }
+       // send HELLO to all bootstrap addresses
+       for _, addr := range bsList {
+               c.SendHello(ctx, addr)
+       }
+
        // handle OS signals
        sigCh := make(chan os.Signal, 5)
        signal.Notify(sigCh)
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index ee354e3..83da8b1 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -38,6 +38,7 @@ import (
 var (
        ErrCoreNoUpnpDyn  = errors.New("no dynamic port with UPnP")
        ErrCoreNoEndpAddr = errors.New("no endpoint for address")
+       ErrCoreNotSent    = errors.New("message not sent")
 )
 
 //----------------------------------------------------------------------
@@ -69,6 +70,9 @@ type Core struct {
 
        // List of registered endpoints
        endpoints map[string]*EndpointRef
+
+       // last HELLO message used; re-create if expired
+       lastHello *message.HelloDHTMsg
 }
 
 //----------------------------------------------------------------------
@@ -159,12 +163,13 @@ func (c *Core) pump(ctx context.Context) {
                select {
                // get (next) message from transport
                case tm := <-c.incoming:
-                       var ev *Event
+                       logger.Printf(logger.DBG, "[core] Message received from 
%s: %s", tm.Peer, transport.Dump(tm.Msg, "json"))
 
                        // inspect message for peer state events
+                       var ev *Event
                        switch msg := tm.Msg.(type) {
                        case *message.HelloDHTMsg:
-                               logger.Println(logger.INFO, "[core] Received 
HELLO message: "+msg.String())
+
                                // verify integrity of message
                                if ok, err := msg.Verify(tm.Peer); !ok || err 
!= nil {
                                        logger.Println(logger.WARN, "[core] 
Received invalid DHT_P2P_HELLO message")
@@ -176,9 +181,11 @@ func (c *Core) pump(ctx context.Context) {
                                        logger.Println(logger.WARN, "[core] 
Failed to parse addresses from DHT_P2P_HELLO message")
                                        break
                                }
-                               for _, addr := range aList {
-                                       c.Learn(ctx, tm.Peer, addr.Wrap())
+                               if err := c.Learn(ctx, tm.Peer, aList); err != 
nil {
+                                       logger.Println(logger.WARN, "[core] 
Failed to learn addresses from DHT_P2P_HELLO message: "+err.Error())
+                                       break
                                }
+
                                // generate EV_CONNECT event
                                ev = &Event{
                                        ID:   EV_CONNECT,
@@ -222,21 +229,34 @@ func (c *Core) Shutdown() {
 
 // Send is a function that allows the local peer to send a protocol
 // message to a remote peer.
-func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg 
message.Message) error {
-       // get peer label (id or "@")
-       label := "@"
-       if peer != nil {
-               label = peer.String()
-       }
+func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg 
message.Message) (err error) {
        // TODO: select best endpoint protocol for transport; now fixed to 
IP+UDP
        netw := "ip+udp"
-       addrs := c.peers.Get(label, netw)
-       if len(addrs) == 0 {
-               return ErrCoreNoEndpAddr
+
+       // try all addresses for peer
+       aList := c.peers.Get(peer.String(), netw)
+       maybe := false // message may be sent...
+       for _, addr := range aList {
+               logger.Printf(logger.WARN, "[core] Trying to send to %s", 
addr.URI())
+               // send message to address
+               if err = c.send(ctx, addr, msg); err != nil {
+                       // if it is possible that the message was not sent, try 
next address
+                       if err != transport.ErrEndpMaybeSent {
+                               logger.Printf(logger.WARN, "[core] Failed to 
send to %s: %s", addr.URI(), err.Error())
+                       } else {
+                               maybe = true
+                       }
+                       continue
+               }
+               // one successful send is enough
+               return
        }
-       // TODO: select best address; curently selects first
-       addr := addrs[0]
-       return c.send(ctx, addr, msg)
+       if maybe {
+               err = transport.ErrEndpMaybeSent
+       } else {
+               err = ErrCoreNotSent
+       }
+       return
 }
 
 // send message directly to address
@@ -247,40 +267,82 @@ func (c *Core) send(ctx context.Context, addr 
*util.Address, msg message.Message
        return c.trans.Send(ctx, addr, tm)
 }
 
-// Learn a (new) address for peer
-func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr 
*util.Address) (err error) {
-       // assemble our own HELLO message:
-       addrList := make([]*util.Address, 0)
-       for _, epRef := range c.endpoints {
-               addrList = append(addrList, epRef.addr)
-       }
-       node := c.local
-       var hello *blocks.HelloBlock
-       hello, err = node.HelloData(time.Hour, addrList)
-       if err != nil {
-               return
+// Learn (new) addresses for peer
+func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs 
[]*util.Address) (err error) {
+       // learn all addresses for peer
+       newPeer := false
+       for _, addr := range addrs {
+               logger.Printf(logger.INFO, "[core] Learning %s for %s (expires 
%s)", addr.URI(), peer, addr.Expires)
+               newPeer = (c.peers.Add(peer.String(), addr) == 1) || newPeer
        }
-       msg := message.NewHelloDHTMsg()
-       var aList []*message.HelloAddress
-       msg.NumAddr = uint16(len(hello.Addresses()))
-       for _, a := range hello.Addresses() {
-               ha := message.NewHelloAddress(a)
-               aList = append(aList, ha)
-       }
-       msg.SetAddresses(aList)
-
-       // if no peer is given, we send HELLO directly to address
-       if peer == nil {
-               return c.send(ctx, addr, msg)
-       }
-       // add peer address to address list
-       if c.peers.Add(peer.String(), addr) == 1 {
+       // new peer detected?
+       if newPeer {
                // we added a previously unknown peer: send a HELLO
+               var msg *message.HelloDHTMsg
+               if msg, err = c.getHello(); err != nil {
+                       return
+               }
+               logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s", 
peer, msg)
                err = c.Send(ctx, peer, msg)
+               // no error if the message might have been sent
+               if err == transport.ErrEndpMaybeSent {
+                       err = nil
+               }
        }
        return
 }
 
+// Send the currently active HELLO to given network address
+func (c *Core) SendHello(ctx context.Context, addr *util.Address) (err error) {
+       // get (buffered) HELLO
+       var msg *message.HelloDHTMsg
+       if msg, err = c.getHello(); err != nil {
+               return
+       }
+       logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s", 
addr.URI(), msg)
+       return c.send(ctx, addr, msg)
+}
+
+// get the recent HELLO if it is defined and not expired;
+// create a new HELLO otherwise.
+func (c *Core) getHello() (msg *message.HelloDHTMsg, err error) {
+       if c.lastHello == nil || c.lastHello.Expires.Expired() {
+               // assemble new HELLO message
+               addrList := make([]*util.Address, 0)
+               for _, epRef := range c.endpoints {
+                       addrList = append(addrList, epRef.addr)
+               }
+               node := c.local
+               var hello *blocks.HelloBlock
+               hello, err = node.HelloData(time.Hour, addrList)
+               if err != nil {
+                       return
+               }
+               msg = message.NewHelloDHTMsg()
+               msg.NumAddr = uint16(len(hello.Addresses()))
+               msg.SetAddresses(hello.Addresses())
+               if err = msg.Sign(c.local.prv); err != nil {
+                       return
+               }
+               // save for later use
+               c.lastHello = msg
+
+               // DEBUG
+               var ok bool
+               if ok, err = msg.Verify(c.PeerID()); !ok || err != nil {
+                       if !ok {
+                               err = errors.New("[core] failed to verify own 
HELLO")
+                       }
+                       logger.Println(logger.ERROR, err.Error())
+                       return
+               }
+               logger.Println(logger.DBG, "[core] New HELLO: 
"+transport.Dump(msg, "json"))
+               return
+       }
+       // we have a valid HELLO for re-use.
+       return c.lastHello, nil
+}
+
 // Addresses returns the list of listening endpoint addresses
 func (c *Core) Addresses() (list []*util.Address, err error) {
        for _, epRef := range c.endpoints {
@@ -308,14 +370,6 @@ func (c *Core) PeerID() *util.PeerID {
 // When the connection attempt is successful, information on the new
 // peer is offered through the PEER_CONNECTED signal.
 func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error {
-       // select endpoint for address
-       if ep := c.findEndpoint(peer, addr); ep == nil {
-               return transport.ErrTransNoEndpoint
-       }
-       return nil
-}
-
-func (c *Core) findEndpoint(peer *util.PeerID, addr net.Addr) 
transport.Endpoint {
        return nil
 }
 
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go
index 3c7d25a..29f740b 100644
--- a/src/gnunet/core/core_test.go
+++ b/src/gnunet/core/core_test.go
@@ -247,7 +247,7 @@ func (n *TestNode) Learn(ctx context.Context, peer 
*util.PeerID, addr *util.Addr
                label = peer.String()
        }
        n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), label)
-       if err := n.core.Learn(ctx, peer, addr); err != nil {
+       if err := n.core.Learn(ctx, peer, []*util.Address{addr}); err != nil {
                n.t.Log("Learn: " + err.Error())
        }
 }
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index eb17b30..bb2e58f 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -16,10 +16,9 @@ require (
        github.com/cespare/xxhash/v2 v2.1.2 // indirect
        github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // 
indirect
        github.com/huin/goupnp v1.0.0 // indirect
-       golang.org/x/mod v0.4.2 // indirect
+       golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
        golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
        golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
        golang.org/x/text v0.3.7 // indirect
-       golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
-       golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+       golang.org/x/tools v0.1.11 // indirect
 )
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index 17c2d00..2451a41 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -27,8 +27,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod 
h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
 golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898 
h1:SLP7Q4Di66FONjDJbCYrCRrh97focO6sLogHO7/g8F0=
 golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod 
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 
h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod 
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
 golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -59,11 +60,11 @@ golang.org/x/text v0.3.7 
h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 
h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0=
 golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY=
+golang.org/x/tools v0.1.11/go.mod 
h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go
index 407d0a0..ae4c0e2 100644
--- a/src/gnunet/message/msg_hello.go
+++ b/src/gnunet/message/msg_hello.go
@@ -24,6 +24,7 @@ import (
        "fmt"
        "gnunet/util"
        "io"
+       "time"
 )
 
 //----------------------------------------------------------------------
@@ -38,10 +39,16 @@ type HelloAddress struct {
 
 // NewHelloAddress create a new HELLO address from the given address
 func NewHelloAddress(a *util.Address) *HelloAddress {
+       // use default expiration time, but adjust it if address expires earlier
+       exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration))
+       if exp.Compare(a.Expires) > 0 {
+               exp = a.Expires
+       }
+       // convert address
        addr := &HelloAddress{
                transport: a.Netw,
                addrSize:  uint16(len(a.Address)),
-               expires:   a.Expires,
+               expires:   exp,
                address:   make([]byte, len(a.Address)),
        }
        copy(addr.address, a.Address)
diff --git a/src/gnunet/message/msg_hello_dht.go 
b/src/gnunet/message/msg_hello_dht.go
index a81d1fc..f51757c 100644
--- a/src/gnunet/message/msg_hello_dht.go
+++ b/src/gnunet/message/msg_hello_dht.go
@@ -25,7 +25,6 @@ import (
        "fmt"
        "gnunet/enums"
        "gnunet/util"
-       "io"
        "time"
 
        "github.com/bfix/gospel/crypto/ed25519"
@@ -67,49 +66,59 @@ func NewHelloDHTMsg() *HelloDHTMsg {
 }
 
 // Addresses returns the list of HelloAddress
-func (m *HelloDHTMsg) Addresses() (list []*HelloAddress, err error) {
-       rdr := bytes.NewReader(m.AddrList)
-       var addr *HelloAddress
-       num := 0
+func (m *HelloDHTMsg) Addresses() (list []*util.Address, err error) {
+       var addr *util.Address
+       var as string
+       num, pos := 0, 0
        for {
-               // parse address from stream
-               if addr, err = ParseHelloAddr(rdr); err != nil {
-                       // end of stream: no more addresses
-                       if err == io.EOF {
-                               err = nil
-                       }
-                       // check numbers
-                       if num != int(m.NumAddr) {
-                               logger.Printf(logger.WARN, "[HelloDHTMsg] 
Number of addresses doesn't match (got %d, expected %d)", num, m.NumAddr)
-                       }
+               // parse address string from stream
+               if as, pos = util.ReadCString(m.AddrList, pos); pos == -1 {
+                       break
+               }
+               if addr, err = util.ParseAddress(as); err != nil {
                        return
                }
+               addr.Expires = m.Expires
                list = append(list, addr)
                num++
        }
-}
-
-// String returns a human-readable representation of the message.
-func (m *HelloDHTMsg) String() string {
-       return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d}", m.Expires, 
m.NumAddr)
+       // check numbers
+       if num != int(m.NumAddr) {
+               logger.Printf(logger.WARN, "[HelloDHTMsg] Number of addresses 
does not match (got %d, expected %d)", num, m.NumAddr)
+       }
+       return
 }
 
 // SetAddresses adds addresses to the HELLO message.
-func (m *HelloDHTMsg) SetAddresses(list []*HelloAddress) {
+func (m *HelloDHTMsg) SetAddresses(list []*util.Address) {
        // write addresses as blob and track earliest expiration
-       exp := util.AbsoluteTimeNever()
+       exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration))
        wrt := new(bytes.Buffer)
        for _, addr := range list {
                // check if address expires before current expire
-               if _, after := exp.Diff(addr.expires); !after {
-                       exp = addr.expires
+               if exp.Compare(addr.Expires) > 0 {
+                       exp = addr.Expires
                }
-               n, _ := wrt.Write(addr.Bytes())
-               m.MsgSize += uint16(n)
-               m.NumAddr++
+               n, _ := wrt.Write([]byte(addr.URI()))
+               wrt.WriteByte(0)
+               m.MsgSize += uint16(n + 1)
        }
        m.AddrList = wrt.Bytes()
        m.Expires = exp
+       m.NumAddr = uint16(len(list))
+}
+
+// String returns a human-readable representation of the message.
+func (m *HelloDHTMsg) String() string {
+       addrs, _ := m.Addresses()
+       aList := ""
+       for i, a := range addrs {
+               if i > 0 {
+                       aList += ","
+               }
+               aList += a.URI()
+       }
+       return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d:[%s]}", m.Expires, 
m.NumAddr, aList)
 }
 
 // Header returns the message header in a separate instance.
@@ -129,6 +138,18 @@ func (m *HelloDHTMsg) Verify(peer *util.PeerID) (bool, 
error) {
        return pub.EdVerify(sd, sig)
 }
 
+// Sign the HELLO data with private key
+func (m *HelloDHTMsg) Sign(prv *ed25519.PrivateKey) error {
+       // assemble signed data
+       sd := m.signedData()
+       sig, err := prv.EdSign(sd)
+       if err != nil {
+               return err
+       }
+       m.Signature = sig.Bytes()
+       return nil
+}
+
 // signedData assembles a data block for sign and verify operations.
 func (m *HelloDHTMsg) signedData() []byte {
        // hash address block
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 323a4df..34bf020 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -26,6 +26,8 @@ import (
        "gnunet/service"
        "gnunet/service/dht/blocks"
        "time"
+
+       "github.com/bfix/gospel/logger"
 )
 
 //======================================================================
@@ -107,13 +109,23 @@ func (m *Module) Put(ctx context.Context, key 
blocks.Query, block blocks.Block)
 // Filter returns the event filter for the module
 func (m *Module) Filter() *core.EventFilter {
        f := core.NewEventFilter()
+       // events we are interested in
        f.AddEvent(core.EV_CONNECT)
        f.AddEvent(core.EV_DISCONNECT)
+
+       // messages we are interested in:
+       // (1) DHT messages
        f.AddMsgType(message.DHT_CLIENT_GET)
        f.AddMsgType(message.DHT_CLIENT_GET_RESULTS_KNOWN)
        f.AddMsgType(message.DHT_CLIENT_GET_STOP)
        f.AddMsgType(message.DHT_CLIENT_PUT)
        f.AddMsgType(message.DHT_CLIENT_RESULT)
+       // (2) DHT_P2P messages
+       f.AddMsgType(message.DHT_P2P_PUT)
+       f.AddMsgType(message.DHT_P2P_GET)
+       f.AddMsgType(message.DHT_P2P_RESULT)
+       f.AddMsgType(message.DHT_P2P_HELLO)
+
        return f
 }
 
@@ -123,15 +135,18 @@ func (m *Module) event(ctx context.Context, ev 
*core.Event) {
        // New peer connected:
        case core.EV_CONNECT:
                // Add peer to routing table
+               logger.Printf(logger.INFO, "[dht] Peer %s connected", ev.Peer)
                m.rtable.Add(NewPeerAddress(ev.Peer))
 
        // Peer disconnected:
        case core.EV_DISCONNECT:
                // Remove peer from routing table
+               logger.Printf(logger.INFO, "[dht] Peer %s disconnected", 
ev.Peer)
                m.rtable.Remove(NewPeerAddress(ev.Peer))
 
        // Message received.
        case core.EV_MESSAGE:
+               logger.Printf(logger.INFO, "[dht] Message received: %s", 
ev.Msg.String())
                // process message (if applicable)
                if m.ProcessFcn != nil {
                        m.ProcessFcn(ctx, ev.Msg, ev.Resp)
diff --git a/src/gnunet/service/dht/service.go 
b/src/gnunet/service/dht/service.go
index 82937b9..3cf216f 100644
--- a/src/gnunet/service/dht/service.go
+++ b/src/gnunet/service/dht/service.go
@@ -134,7 +134,7 @@ func (s *Service) HandleMessage(ctx context.Context, msg 
message.Message, back t
                //----------------------------------------------------------
                // UNKNOWN message type received
                //----------------------------------------------------------
-               logger.Printf(logger.ERROR, "[dht%s] Unhandled message of type 
(%d)\n", label, msg.Header().MsgType)
+               logger.Printf(logger.ERROR, "[dht-%s] Unhandled message of type 
(%d)\n", label, msg.Header().MsgType)
                return false
        }
        return true
diff --git a/src/gnunet/service/rpc.go b/src/gnunet/service/rpc.go
index a673c2b..de3a2c1 100644
--- a/src/gnunet/service/rpc.go
+++ b/src/gnunet/service/rpc.go
@@ -48,13 +48,14 @@ func StartRPC(ctx context.Context, endpoint string) (srvRPC 
*rpc.Server, err err
                WriteTimeout: 15 * time.Second,
                ReadTimeout:  15 * time.Second,
        }
+       // start listening
+       go func() {
+               if err := srv.ListenAndServe(); err != http.ErrServerClosed {
+                       logger.Printf(logger.WARN, "[RPC] Server listen failed: 
%s", err.Error())
+               }
+       }()
+       // wait for shutdown
        go func() {
-               // start listening
-               go func() {
-                       if err := srv.ListenAndServe(); err != nil {
-                               logger.Printf(logger.WARN, "[RPC] Server listen 
failed: %s", err.Error())
-                       }
-               }()
                select {
                case <-ctx.Done():
                        if err := srv.Shutdown(context.Background()); err != 
nil {
diff --git a/src/gnunet/test/gnunet-dhtu/main.go 
b/src/gnunet/test/gnunet-dhtu/main.go
index ce3651d..2a49b9c 100644
--- a/src/gnunet/test/gnunet-dhtu/main.go
+++ b/src/gnunet/test/gnunet-dhtu/main.go
@@ -20,13 +20,14 @@ package main
 
 import (
        "context"
-       "encoding/hex"
        "flag"
        "fmt"
        "gnunet/config"
        "gnunet/core"
+       "gnunet/message"
        "gnunet/service"
        "gnunet/service/dht"
+       "gnunet/transport"
        "gnunet/util"
        "log"
        "net/rpc"
@@ -37,41 +38,31 @@ import (
 
 //----------------------------------------------------------------------
 // Test Go node with DHTU GNUnet nodes
+//
+// N.B.: THIS TEST ONLY COVERS THE BASIC MESSAGE EXCHANGE LEVEL; NO
+// MESSAGE PROCESSING EXCEPT FOR HELLO MESSAGES WILL TAKE PLACE.
 //----------------------------------------------------------------------
 
 func main() {
        // handle command-line arguments
-       var (
-               remoteId   string
-               remoteAddr string
-               cfgFile    string
-       )
+       var remoteAddr string
+       var cfgFile string
        flag.StringVar(&cfgFile, "c", "gnunet-config.json", "configuration 
file")
-       flag.StringVar(&remoteId, "i", "", "peer id of remote node")
        flag.StringVar(&remoteAddr, "a", "", "address of remote node")
        flag.Parse()
 
        // read configuration file and set missing arguments.
        if err := config.ParseConfig(cfgFile); err != nil {
-               logger.Printf(logger.ERROR, "[gnunet-dhtu] Invalid 
configuration file: %s\n", err.Error())
+               logger.Printf(logger.ERROR, "[node] Invalid configuration file: 
%s\n", err.Error())
                return
        }
 
        // convert arguments
-       var (
-               rId   *util.PeerID
-               rAddr *util.Address
-               buf   []byte
-               err   error
-       )
+       var rAddr *util.Address
+       var err error
        if rAddr, err = util.ParseAddress(remoteAddr); err != nil {
-               log.Fatal(err)
-       }
-       if len(remoteId) > 0 {
-               if buf, err = util.DecodeStringToBinary(remoteId, 32); err != 
nil {
-                       log.Fatal(err)
-               }
-               rId = util.NewPeerID(buf)
+               logger.Println(logger.ERROR, err.Error())
+               return
        }
 
        // setup execution context
@@ -84,7 +75,8 @@ func main() {
        // create and run node
        node, err := NewTestNode(ctx)
        if err != nil {
-               log.Fatal(err)
+               logger.Println(logger.ERROR, err.Error())
+               return
        }
        defer node.Shutdown()
 
@@ -93,13 +85,17 @@ func main() {
        as := fmt.Sprintf("%s://%s:%d", ep.Network, ep.Address, ep.Port)
        listen, err := util.ParseAddress(as)
        if err != nil {
-               log.Fatal(err)
+               logger.Println(logger.ERROR, err.Error())
+               return
        }
        aList := []*util.Address{listen}
-       logger.Println(logger.INFO, "HELLO: "+node.HelloURL(aList))
+       logger.Println(logger.INFO, "[node] --> "+node.HelloURL(aList))
 
-       // learn bootstrap address (triggers HELLO)
-       node.Learn(ctx, rId, rAddr)
+       // send HELLO to bootstrap address
+       if err = node.SendHello(ctx, rAddr); err != nil && err != 
transport.ErrEndpMaybeSent {
+               logger.Println(logger.ERROR, "[node] failed to send HELLO: 
"+err.Error())
+               return
+       }
 
        // run forever
        var ch chan struct{}
@@ -121,22 +117,15 @@ func (n *TestNode) Shutdown() {
        n.core.Shutdown()
 }
 func (n *TestNode) HelloURL(a []*util.Address) string {
-       hd, err := n.peer.HelloData(time.Hour, a)
+       hd, err := n.peer.HelloData(message.HelloAddressExpiration, a)
        if err != nil {
                return ""
        }
        return hd.URL()
 }
 
-func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr 
*util.Address) {
-       label := "@"
-       if peer != nil {
-               label = peer.String()
-       }
-       log.Printf("[%d] Learning %s for %s", n.id, addr.StringAll(), label)
-       if err := n.core.Learn(ctx, peer, addr); err != nil {
-               log.Println("Learn: " + err.Error())
-       }
+func (n *TestNode) SendHello(ctx context.Context, addr *util.Address) error {
+       return n.core.SendHello(ctx, addr)
 }
 
 func NewTestNode(ctx context.Context) (node *TestNode, err error) {
@@ -150,8 +139,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err 
error) {
                return
        }
        node.peer = node.core.Peer()
-       log.Printf("[%d] Node %s starting", node.id, node.peer.GetID())
-       log.Printf("[%d]   --> %s", node.id, 
hex.EncodeToString(node.peer.GetID().Key))
+       logger.Printf(logger.INFO, "[node] Node %s starting", node.peer.GetID())
 
        // start a new DHT service
        dht, err := dht.NewService(ctx, node.core)
@@ -162,7 +150,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err 
error) {
        // start JSON-RPC server on request
        var rpc *rpc.Server
        if rpc, err = service.StartRPC(ctx, config.Cfg.RPC.Endpoint); err != 
nil {
-               logger.Printf(logger.ERROR, "[gnunet-dhtu] RPC failed to start: 
%s", err.Error())
+               logger.Printf(logger.ERROR, "[node] RPC failed to start: %s", 
err.Error())
                return
        }
        dht.InitRPC(rpc)
@@ -177,7 +165,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err 
error) {
                if node.addr, err = util.ParseAddress(s); err != nil {
                        continue
                }
-               log.Printf("[%d] Listening on %s", node.id, s)
+               logger.Printf(logger.INFO, "[node] Listening on %s", s)
        }
 
        // register as event listener
@@ -195,22 +183,22 @@ func NewTestNode(ctx context.Context) (node *TestNode, 
err error) {
                        case ev := <-incoming:
                                switch ev.ID {
                                case core.EV_CONNECT:
-                                       log.Printf("[%d] <<< Peer %s 
connected", node.id, ev.Peer)
+                                       logger.Printf(logger.INFO, "[node] <<< 
Peer %s connected", ev.Peer)
                                case core.EV_DISCONNECT:
-                                       log.Printf("[%d] <<< Peer %s 
diconnected", node.id, ev.Peer)
+                                       logger.Printf(logger.INFO, "[node] <<< 
Peer %s diconnected", ev.Peer)
                                case core.EV_MESSAGE:
-                                       log.Printf("[%d] <<< Msg from %s of 
type %d", node.id, ev.Peer, ev.Msg.Header().MsgType)
-                                       log.Printf("[%d] <<<    --> %s", 
node.id, ev.Msg.String())
+                                       logger.Printf(logger.INFO, "[node] <<< 
Msg from %s of type %d", ev.Peer, ev.Msg.Header().MsgType)
+                                       logger.Printf(logger.INFO, "[node] <<<  
  --> %s", ev.Msg.String())
                                }
 
                        // handle termination signal
                        case <-ctx.Done():
-                               log.Printf("[%d] Shutting down node", node.id)
+                               logger.Println(logger.INFO, "[node] Shutting 
down node")
                                return
 
                        // handle heart beat
                        case now := <-tick.C:
-                               log.Printf("[%d] Heart beat at %s", node.id, 
now.String())
+                               logger.Printf(logger.INFO, "[node] Heart beat 
at %s", now.String())
                        }
                }
        }()
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index 26e4463..d98776a 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -25,6 +25,11 @@ import (
        "gnunet/message"
        "gnunet/util"
        "net"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/bfix/gospel/logger"
 )
 
 var (
@@ -34,6 +39,8 @@ var (
        ErrEndpExists           = errors.New("endpoint exists")
        ErrEndpNoAddress        = errors.New("no address for endpoint")
        ErrEndpNoConnection     = errors.New("no connection on endpoint")
+       ErrEndpMaybeSent        = errors.New("message may have been sent - cant 
know")
+       ErrEndpWriteShort       = errors.New("write too short")
 )
 
 // Endpoint represents a local endpoint that can send and receive messages.
@@ -78,9 +85,11 @@ func NewEndpoint(addr net.Addr) (ep Endpoint, err error) {
 // PacketEndpoint for packet-oriented network protocols
 type PaketEndpoint struct {
        id   int            // endpoint identifier
+       netw string         // network identifier ("udp", "udp4", "udp6", ...)
        addr net.Addr       // endpoint address
        conn net.PacketConn // packet connection
        buf  []byte         // buffer for read/write operations
+       mtx  sync.Mutex     // mutex for send operations
 }
 
 // Run packet endpoint: send incoming messages to the handler.
@@ -94,9 +103,14 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan 
*TransportMessage) (
        // use the actual listening address
        ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
 
+       // save more information to detect compatible send-to addresses
+       ep.netw = ep.conn.LocalAddr().Network()
+
        // run watch dog for termination
+       active := true
        go func() {
                <-ctx.Done()
+               active = false
                ep.conn.Close()
        }()
        // run go routine to handle messages from clients
@@ -105,6 +119,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr 
chan *TransportMessage) (
                        // read next message
                        tm, err := ep.read()
                        if err != nil {
+                               // leave go routine if already dead
+                               if !active {
+                                       return
+                               }
+                               logger.Println(logger.WARN, "[pkt_ep] read 
failed: "+err.Error())
+                               // gracefully ignore unknown message types
+                               if strings.HasPrefix(err.Error(), "unknown 
message type") {
+                                       continue
+                               }
                                break
                        }
                        // label message
@@ -154,19 +177,27 @@ func (ep *PaketEndpoint) read() (tm *TransportMessage, 
err error) {
 
 // Send message to address from endpoint
 func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg 
*TransportMessage) (err error) {
+       // only one sender at a time
+       ep.mtx.Lock()
+       defer ep.mtx.Unlock()
+
        // check for valid connection
        if ep.conn == nil {
                return ErrEndpNoConnection
        }
+
        // resolve target address
        var a *net.UDPAddr
-       a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String())
+       if a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), 
addr.String()); err != nil {
+               return
+       }
 
        // get message content (TransportMessage)
        var buf []byte
        if buf, err = msg.Bytes(); err != nil {
                return
        }
+
        // handle extended protocol:
        switch ep.addr.Network() {
        case "ip+udp":
@@ -176,8 +207,18 @@ func (ep *PaketEndpoint) Send(ctx context.Context, addr 
net.Addr, msg *Transport
                // unknown protocol
                return ErrEndpProtocolUnknown
        }
-       _, err = ep.conn.WriteTo(buf, a)
-       return
+
+       // timeout after 1 second
+       if err = ep.conn.SetWriteDeadline(time.Now().Add(time.Second)); err != 
nil {
+               logger.Println(logger.DBG, "[pkt_ep] SetWriteDeadline failed: 
"+err.Error())
+               return
+       }
+       var n int
+       n, err = ep.conn.WriteTo(buf, a)
+       if n != len(buf) {
+               err = ErrEndpWriteShort
+       }
+       return ErrEndpMaybeSent
 }
 
 // Address returms the
@@ -188,6 +229,23 @@ func (ep *PaketEndpoint) Address() net.Addr {
 // CanSendTo returns true if the endpoint can sent to address
 func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
        ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
+       if ok {
+               // try to convert addr to compatible type
+               switch ep.netw {
+               case "udp", "udp4", "udp6":
+                       var ua *net.UDPAddr
+                       var err error
+                       if ua, err = net.ResolveUDPAddr(ep.netw, 
addr.String()); err != nil {
+                               ok = false
+                       }
+                       logger.Printf(logger.DBG, "[pkt_ep] %s + %v -> %v 
(%v)", ep.netw, addr, ua, ok)
+               default:
+                       logger.Printf(logger.DBG, "[pkt_ep] unknown network 
%s", ep.netw)
+                       ok = false
+               }
+       } else {
+               logger.Printf(logger.DBG, "[pkt_ep] protocol mismatch %s -- 
%s", EpProtocol(addr.Network()), EpProtocol(ep.addr.Network()))
+       }
        return
 }
 
diff --git a/src/gnunet/transport/reader_writer.go 
b/src/gnunet/transport/reader_writer.go
index db3527e..e99b3df 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -19,7 +19,10 @@
 package transport
 
 import (
+       "bytes"
        "context"
+       "encoding/hex"
+       "encoding/json"
        "errors"
        "fmt"
        "gnunet/message"
@@ -125,6 +128,26 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, 
buf []byte) (msg messag
        return msg, nil
 }
 
+//----------------------------------------------------------------------
+// Dump message
+func Dump(msg message.Message, format string) string {
+       switch format {
+       case "json":
+               buf, err := json.Marshal(msg)
+               if err != nil {
+                       return err.Error()
+               }
+               return string(buf)
+       case "hex":
+               buf := new(bytes.Buffer)
+               if err := WriteMessageDirect(buf, msg); err != nil {
+                       return err.Error()
+               }
+               return hex.EncodeToString(buf.Bytes())
+       }
+       return "unknown message dump format"
+}
+
 //----------------------------------------------------------------------
 // helper for wrapped ReadCloser/WriteCloser (close is nop)
 //----------------------------------------------------------------------
diff --git a/src/gnunet/util/address_test.go b/src/gnunet/util/address_test.go
new file mode 100644
index 0000000..d4936e8
--- /dev/null
+++ b/src/gnunet/util/address_test.go
@@ -0,0 +1,54 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix  >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package util
+
+import (
+       "testing"
+)
+
+func TestAddrList(t *testing.T) {
+       // list of addresses to check
+       addrS := []string{
+               "ip+udp://127.0.0.1:10000",
+               "ip+udp://172.17.0.4:10000",
+               "ip+udp://[::ffff:172.17.0.4]:10000",
+       }
+       // convert to util.Address
+       addrA := make([]*Address, len(addrS))
+       var err error
+       for i, as := range addrS {
+               if addrA[i], err = ParseAddress(as); err != nil {
+                       t.Fatal(err)
+               }
+       }
+       // allocate AddrList
+       addrL := NewPeerAddrList()
+       for _, addr := range addrA {
+               rc := 
addrL.Add("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", addr)
+               t.Logf("added %s (%d)", addr.URI(), rc)
+       }
+
+       // check list
+       t.Log("checking list...")
+       list := 
addrL.Get("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", "ip+udp")
+       t.Logf("got: %v", list)
+       if len(list) != len(addrS) {
+               t.Fatal("list size not matching")
+       }
+}

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]