[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: RC2 for Milestone 1 (NGI Assure)
From: gnunet
Subject: [gnunet-go] branch master updated: RC2 for Milestone 1 (NGI Assure)
Date: Sat, 11 Jun 2022 15:46:57 +0200
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new 8f8feaf RC2 for Milestone 1 (NGI Assure)
8f8feaf is described below
commit 8f8feaf176e62f14a6d449c0a2fb6f0ca76b22b8
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 11 15:45:45 2022 +0200
RC2 for Milestone 1 (NGI Assure)
---
src/gnunet/cmd/gnunet-service-dht-go/main.go | 31 +++++-
src/gnunet/core/core.go | 156 ++++++++++++++++++---------
src/gnunet/core/core_test.go | 2 +-
src/gnunet/go.mod | 5 +-
src/gnunet/go.sum | 7 +-
src/gnunet/message/msg_hello.go | 9 +-
src/gnunet/message/msg_hello_dht.go | 75 ++++++++-----
src/gnunet/service/dht/module.go | 15 +++
src/gnunet/service/dht/service.go | 2 +-
src/gnunet/service/rpc.go | 13 +--
src/gnunet/test/gnunet-dhtu/main.go | 80 ++++++--------
src/gnunet/transport/endpoint.go | 64 ++++++++++-
src/gnunet/transport/reader_writer.go | 23 ++++
src/gnunet/util/address_test.go | 54 ++++++++++
14 files changed, 393 insertions(+), 143 deletions(-)
diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index f9e24bc..ef6da91 100644
--- a/src/gnunet/cmd/gnunet-service-dht-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -32,6 +32,8 @@ import (
"gnunet/core"
"gnunet/service"
"gnunet/service/dht"
+ "gnunet/service/dht/blocks"
+ "gnunet/util"
"github.com/bfix/gospel/logger"
)
@@ -72,7 +74,7 @@ func main() {
socket = config.Cfg.GNS.Service.Socket
}
params := make(map[string]string)
- if len(param) == 0 {
+ if len(param) > 0 {
for _, p := range strings.Split(param, ",") {
kv := strings.SplitN(p, "=", 2)
params[kv[0]] = kv[1]
@@ -121,6 +123,33 @@ func main() {
dhtSrv.InitRPC(rpc)
}
+ // handle bootstrap: collect known addresses
+ bsList := make([]*util.Address, 0)
+ for _, bs := range config.Cfg.Bootstrap.Nodes {
+ // check for HELLO URL
+ if strings.HasPrefix(bs, "gnunet://hello/") {
+ var hb *blocks.HelloBlock
+ if hb, err = blocks.ParseHelloURL(bs, true); err != nil
{
+ logger.Printf(logger.ERROR, "[dht] failed
bootstrap HELLO URL %s: %s", bs, err.Error())
+ continue
+ }
+ // append HELLO addresses
+ bsList = append(bsList, hb.Addresses()...)
+ } else {
+ // parse address directly
+ var addr *util.Address
+ if addr, err = util.ParseAddress(bs); err != nil {
+ logger.Printf(logger.ERROR, "[dht] failed
bootstrap address %s: %s", bs, err.Error())
+ continue
+ }
+ bsList = append(bsList, addr)
+ }
+ }
+ // send HELLO to all bootstrap addresses
+ for _, addr := range bsList {
+ c.SendHello(ctx, addr)
+ }
+
// handle OS signals
sigCh := make(chan os.Signal, 5)
signal.Notify(sigCh)
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index ee354e3..83da8b1 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -38,6 +38,7 @@ import (
var (
ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP")
ErrCoreNoEndpAddr = errors.New("no endpoint for address")
+ ErrCoreNotSent = errors.New("message not sent")
)
//----------------------------------------------------------------------
@@ -69,6 +70,9 @@ type Core struct {
// List of registered endpoints
endpoints map[string]*EndpointRef
+
+ // last HELLO message used; re-create if expired
+ lastHello *message.HelloDHTMsg
}
//----------------------------------------------------------------------
@@ -159,12 +163,13 @@ func (c *Core) pump(ctx context.Context) {
select {
// get (next) message from transport
case tm := <-c.incoming:
- var ev *Event
+ logger.Printf(logger.DBG, "[core] Message received from
%s: %s", tm.Peer, transport.Dump(tm.Msg, "json"))
// inspect message for peer state events
+ var ev *Event
switch msg := tm.Msg.(type) {
case *message.HelloDHTMsg:
- logger.Println(logger.INFO, "[core] Received
HELLO message: "+msg.String())
+
// verify integrity of message
if ok, err := msg.Verify(tm.Peer); !ok || err
!= nil {
logger.Println(logger.WARN, "[core]
Received invalid DHT_P2P_HELLO message")
@@ -176,9 +181,11 @@ func (c *Core) pump(ctx context.Context) {
logger.Println(logger.WARN, "[core]
Failed to parse addresses from DHT_P2P_HELLO message")
break
}
- for _, addr := range aList {
- c.Learn(ctx, tm.Peer, addr.Wrap())
+ if err := c.Learn(ctx, tm.Peer, aList); err !=
nil {
+ logger.Println(logger.WARN, "[core]
Failed to learn addresses from DHT_P2P_HELLO message: "+err.Error())
+ break
}
+
// generate EV_CONNECT event
ev = &Event{
ID: EV_CONNECT,
@@ -222,21 +229,34 @@ func (c *Core) Shutdown() {
// Send is a function that allows the local peer to send a protocol
// message to a remote peer.
-func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg
message.Message) error {
- // get peer label (id or "@")
- label := "@"
- if peer != nil {
- label = peer.String()
- }
+func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg
message.Message) (err error) {
// TODO: select best endpoint protocol for transport; now fixed to
IP+UDP
netw := "ip+udp"
- addrs := c.peers.Get(label, netw)
- if len(addrs) == 0 {
- return ErrCoreNoEndpAddr
+
+ // try all addresses for peer
+ aList := c.peers.Get(peer.String(), netw)
+ maybe := false // message may be sent...
+ for _, addr := range aList {
+ logger.Printf(logger.WARN, "[core] Trying to send to %s",
addr.URI())
+ // send message to address
+ if err = c.send(ctx, addr, msg); err != nil {
+ // if it is possible that the message was not sent, try
next address
+ if err != transport.ErrEndpMaybeSent {
+ logger.Printf(logger.WARN, "[core] Failed to
send to %s: %s", addr.URI(), err.Error())
+ } else {
+ maybe = true
+ }
+ continue
+ }
+ // one successful send is enough
+ return
}
- // TODO: select best address; curently selects first
- addr := addrs[0]
- return c.send(ctx, addr, msg)
+ if maybe {
+ err = transport.ErrEndpMaybeSent
+ } else {
+ err = ErrCoreNotSent
+ }
+ return
}
// send message directly to address
@@ -247,40 +267,82 @@ func (c *Core) send(ctx context.Context, addr
*util.Address, msg message.Message
return c.trans.Send(ctx, addr, tm)
}
-// Learn a (new) address for peer
-func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr
*util.Address) (err error) {
- // assemble our own HELLO message:
- addrList := make([]*util.Address, 0)
- for _, epRef := range c.endpoints {
- addrList = append(addrList, epRef.addr)
- }
- node := c.local
- var hello *blocks.HelloBlock
- hello, err = node.HelloData(time.Hour, addrList)
- if err != nil {
- return
+// Learn (new) addresses for peer
+func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs
[]*util.Address) (err error) {
+ // learn all addresses for peer
+ newPeer := false
+ for _, addr := range addrs {
+ logger.Printf(logger.INFO, "[core] Learning %s for %s (expires
%s)", addr.URI(), peer, addr.Expires)
+ newPeer = (c.peers.Add(peer.String(), addr) == 1) || newPeer
}
- msg := message.NewHelloDHTMsg()
- var aList []*message.HelloAddress
- msg.NumAddr = uint16(len(hello.Addresses()))
- for _, a := range hello.Addresses() {
- ha := message.NewHelloAddress(a)
- aList = append(aList, ha)
- }
- msg.SetAddresses(aList)
-
- // if no peer is given, we send HELLO directly to address
- if peer == nil {
- return c.send(ctx, addr, msg)
- }
- // add peer address to address list
- if c.peers.Add(peer.String(), addr) == 1 {
+ // new peer detected?
+ if newPeer {
// we added a previously unknown peer: send a HELLO
+ var msg *message.HelloDHTMsg
+ if msg, err = c.getHello(); err != nil {
+ return
+ }
+ logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s",
peer, msg)
err = c.Send(ctx, peer, msg)
+ // no error if the message might have been sent
+ if err == transport.ErrEndpMaybeSent {
+ err = nil
+ }
}
return
}
+// Send the currently active HELLO to given network address
+func (c *Core) SendHello(ctx context.Context, addr *util.Address) (err error) {
+ // get (buffered) HELLO
+ var msg *message.HelloDHTMsg
+ if msg, err = c.getHello(); err != nil {
+ return
+ }
+ logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s",
addr.URI(), msg)
+ return c.send(ctx, addr, msg)
+}
+
+// get the recent HELLO if it is defined and not expired;
+// create a new HELLO otherwise.
+func (c *Core) getHello() (msg *message.HelloDHTMsg, err error) {
+ if c.lastHello == nil || c.lastHello.Expires.Expired() {
+ // assemble new HELLO message
+ addrList := make([]*util.Address, 0)
+ for _, epRef := range c.endpoints {
+ addrList = append(addrList, epRef.addr)
+ }
+ node := c.local
+ var hello *blocks.HelloBlock
+ hello, err = node.HelloData(time.Hour, addrList)
+ if err != nil {
+ return
+ }
+ msg = message.NewHelloDHTMsg()
+ msg.NumAddr = uint16(len(hello.Addresses()))
+ msg.SetAddresses(hello.Addresses())
+ if err = msg.Sign(c.local.prv); err != nil {
+ return
+ }
+ // save for later use
+ c.lastHello = msg
+
+ // DEBUG
+ var ok bool
+ if ok, err = msg.Verify(c.PeerID()); !ok || err != nil {
+ if !ok {
+ err = errors.New("[core] failed to verify own
HELLO")
+ }
+ logger.Println(logger.ERROR, err.Error())
+ return
+ }
+ logger.Println(logger.DBG, "[core] New HELLO:
"+transport.Dump(msg, "json"))
+ return
+ }
+ // we have a valid HELLO for re-use.
+ return c.lastHello, nil
+}
+
// Addresses returns the list of listening endpoint addresses
func (c *Core) Addresses() (list []*util.Address, err error) {
for _, epRef := range c.endpoints {
@@ -308,14 +370,6 @@ func (c *Core) PeerID() *util.PeerID {
// When the connection attempt is successful, information on the new
// peer is offered through the PEER_CONNECTED signal.
func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error {
- // select endpoint for address
- if ep := c.findEndpoint(peer, addr); ep == nil {
- return transport.ErrTransNoEndpoint
- }
- return nil
-}
-
-func (c *Core) findEndpoint(peer *util.PeerID, addr net.Addr)
transport.Endpoint {
return nil
}
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go
index 3c7d25a..29f740b 100644
--- a/src/gnunet/core/core_test.go
+++ b/src/gnunet/core/core_test.go
@@ -247,7 +247,7 @@ func (n *TestNode) Learn(ctx context.Context, peer
*util.PeerID, addr *util.Addr
label = peer.String()
}
n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), label)
- if err := n.core.Learn(ctx, peer, addr); err != nil {
+ if err := n.core.Learn(ctx, peer, []*util.Address{addr}); err != nil {
n.t.Log("Learn: " + err.Error())
}
}
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index eb17b30..bb2e58f 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -16,10 +16,9 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f //
indirect
github.com/huin/goupnp v1.0.0 // indirect
- golang.org/x/mod v0.4.2 // indirect
+ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
- golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
- golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+ golang.org/x/tools v0.1.11 // indirect
)
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index 17c2d00..2451a41 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -27,8 +27,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod
h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod
h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898
h1:SLP7Q4Di66FONjDJbCYrCRrh97focO6sLogHO7/g8F0=
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -59,11 +60,11 @@ golang.org/x/text v0.3.7
h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2
h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY=
+golang.org/x/tools v0.1.11/go.mod
h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go
index 407d0a0..ae4c0e2 100644
--- a/src/gnunet/message/msg_hello.go
+++ b/src/gnunet/message/msg_hello.go
@@ -24,6 +24,7 @@ import (
"fmt"
"gnunet/util"
"io"
+ "time"
)
//----------------------------------------------------------------------
@@ -38,10 +39,16 @@ type HelloAddress struct {
// NewHelloAddress create a new HELLO address from the given address
func NewHelloAddress(a *util.Address) *HelloAddress {
+ // use default expiration time, but adjust it if address expires earlier
+ exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration))
+ if exp.Compare(a.Expires) > 0 {
+ exp = a.Expires
+ }
+ // convert address
addr := &HelloAddress{
transport: a.Netw,
addrSize: uint16(len(a.Address)),
- expires: a.Expires,
+ expires: exp,
address: make([]byte, len(a.Address)),
}
copy(addr.address, a.Address)
diff --git a/src/gnunet/message/msg_hello_dht.go b/src/gnunet/message/msg_hello_dht.go
index a81d1fc..f51757c 100644
--- a/src/gnunet/message/msg_hello_dht.go
+++ b/src/gnunet/message/msg_hello_dht.go
@@ -25,7 +25,6 @@ import (
"fmt"
"gnunet/enums"
"gnunet/util"
- "io"
"time"
"github.com/bfix/gospel/crypto/ed25519"
@@ -67,49 +66,59 @@ func NewHelloDHTMsg() *HelloDHTMsg {
}
// Addresses returns the list of HelloAddress
-func (m *HelloDHTMsg) Addresses() (list []*HelloAddress, err error) {
- rdr := bytes.NewReader(m.AddrList)
- var addr *HelloAddress
- num := 0
+func (m *HelloDHTMsg) Addresses() (list []*util.Address, err error) {
+ var addr *util.Address
+ var as string
+ num, pos := 0, 0
for {
- // parse address from stream
- if addr, err = ParseHelloAddr(rdr); err != nil {
- // end of stream: no more addresses
- if err == io.EOF {
- err = nil
- }
- // check numbers
- if num != int(m.NumAddr) {
- logger.Printf(logger.WARN, "[HelloDHTMsg]
Number of addresses doesn't match (got %d, expected %d)", num, m.NumAddr)
- }
+ // parse address string from stream
+ if as, pos = util.ReadCString(m.AddrList, pos); pos == -1 {
+ break
+ }
+ if addr, err = util.ParseAddress(as); err != nil {
return
}
+ addr.Expires = m.Expires
list = append(list, addr)
num++
}
-}
-
-// String returns a human-readable representation of the message.
-func (m *HelloDHTMsg) String() string {
- return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d}", m.Expires,
m.NumAddr)
+ // check numbers
+ if num != int(m.NumAddr) {
+ logger.Printf(logger.WARN, "[HelloDHTMsg] Number of addresses
does not match (got %d, expected %d)", num, m.NumAddr)
+ }
+ return
}
// SetAddresses adds addresses to the HELLO message.
-func (m *HelloDHTMsg) SetAddresses(list []*HelloAddress) {
+func (m *HelloDHTMsg) SetAddresses(list []*util.Address) {
// write addresses as blob and track earliest expiration
- exp := util.AbsoluteTimeNever()
+ exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration))
wrt := new(bytes.Buffer)
for _, addr := range list {
// check if address expires before current expire
- if _, after := exp.Diff(addr.expires); !after {
- exp = addr.expires
+ if exp.Compare(addr.Expires) > 0 {
+ exp = addr.Expires
}
- n, _ := wrt.Write(addr.Bytes())
- m.MsgSize += uint16(n)
- m.NumAddr++
+ n, _ := wrt.Write([]byte(addr.URI()))
+ wrt.WriteByte(0)
+ m.MsgSize += uint16(n + 1)
}
m.AddrList = wrt.Bytes()
m.Expires = exp
+ m.NumAddr = uint16(len(list))
+}
+
+// String returns a human-readable representation of the message.
+func (m *HelloDHTMsg) String() string {
+ addrs, _ := m.Addresses()
+ aList := ""
+ for i, a := range addrs {
+ if i > 0 {
+ aList += ","
+ }
+ aList += a.URI()
+ }
+ return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d:[%s]}", m.Expires,
m.NumAddr, aList)
}
// Header returns the message header in a separate instance.
@@ -129,6 +138,18 @@ func (m *HelloDHTMsg) Verify(peer *util.PeerID) (bool,
error) {
return pub.EdVerify(sd, sig)
}
+// Sign the HELLO data with private key
+func (m *HelloDHTMsg) Sign(prv *ed25519.PrivateKey) error {
+ // assemble signed data
+ sd := m.signedData()
+ sig, err := prv.EdSign(sd)
+ if err != nil {
+ return err
+ }
+ m.Signature = sig.Bytes()
+ return nil
+}
+
// signedData assembles a data block for sign and verify operations.
func (m *HelloDHTMsg) signedData() []byte {
// hash address block
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 323a4df..34bf020 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -26,6 +26,8 @@ import (
"gnunet/service"
"gnunet/service/dht/blocks"
"time"
+
+ "github.com/bfix/gospel/logger"
)
//======================================================================
@@ -107,13 +109,23 @@ func (m *Module) Put(ctx context.Context, key
blocks.Query, block blocks.Block)
// Filter returns the event filter for the module
func (m *Module) Filter() *core.EventFilter {
f := core.NewEventFilter()
+ // events we are interested in
f.AddEvent(core.EV_CONNECT)
f.AddEvent(core.EV_DISCONNECT)
+
+ // messages we are interested in:
+ // (1) DHT messages
f.AddMsgType(message.DHT_CLIENT_GET)
f.AddMsgType(message.DHT_CLIENT_GET_RESULTS_KNOWN)
f.AddMsgType(message.DHT_CLIENT_GET_STOP)
f.AddMsgType(message.DHT_CLIENT_PUT)
f.AddMsgType(message.DHT_CLIENT_RESULT)
+ // (2) DHT_P2P messages
+ f.AddMsgType(message.DHT_P2P_PUT)
+ f.AddMsgType(message.DHT_P2P_GET)
+ f.AddMsgType(message.DHT_P2P_RESULT)
+ f.AddMsgType(message.DHT_P2P_HELLO)
+
return f
}
@@ -123,15 +135,18 @@ func (m *Module) event(ctx context.Context, ev
*core.Event) {
// New peer connected:
case core.EV_CONNECT:
// Add peer to routing table
+ logger.Printf(logger.INFO, "[dht] Peer %s connected", ev.Peer)
m.rtable.Add(NewPeerAddress(ev.Peer))
// Peer disconnected:
case core.EV_DISCONNECT:
// Remove peer from routing table
+ logger.Printf(logger.INFO, "[dht] Peer %s disconnected",
ev.Peer)
m.rtable.Remove(NewPeerAddress(ev.Peer))
// Message received.
case core.EV_MESSAGE:
+ logger.Printf(logger.INFO, "[dht] Message received: %s",
ev.Msg.String())
// process message (if applicable)
if m.ProcessFcn != nil {
m.ProcessFcn(ctx, ev.Msg, ev.Resp)
diff --git a/src/gnunet/service/dht/service.go b/src/gnunet/service/dht/service.go
index 82937b9..3cf216f 100644
--- a/src/gnunet/service/dht/service.go
+++ b/src/gnunet/service/dht/service.go
@@ -134,7 +134,7 @@ func (s *Service) HandleMessage(ctx context.Context, msg
message.Message, back t
//----------------------------------------------------------
// UNKNOWN message type received
//----------------------------------------------------------
- logger.Printf(logger.ERROR, "[dht%s] Unhandled message of type
(%d)\n", label, msg.Header().MsgType)
+ logger.Printf(logger.ERROR, "[dht-%s] Unhandled message of type
(%d)\n", label, msg.Header().MsgType)
return false
}
return true
diff --git a/src/gnunet/service/rpc.go b/src/gnunet/service/rpc.go
index a673c2b..de3a2c1 100644
--- a/src/gnunet/service/rpc.go
+++ b/src/gnunet/service/rpc.go
@@ -48,13 +48,14 @@ func StartRPC(ctx context.Context, endpoint string) (srvRPC
*rpc.Server, err err
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}
+ // start listening
+ go func() {
+ if err := srv.ListenAndServe(); err != http.ErrServerClosed {
+ logger.Printf(logger.WARN, "[RPC] Server listen failed:
%s", err.Error())
+ }
+ }()
+ // wait for shutdown
go func() {
- // start listening
- go func() {
- if err := srv.ListenAndServe(); err != nil {
- logger.Printf(logger.WARN, "[RPC] Server listen
failed: %s", err.Error())
- }
- }()
select {
case <-ctx.Done():
if err := srv.Shutdown(context.Background()); err !=
nil {
diff --git a/src/gnunet/test/gnunet-dhtu/main.go b/src/gnunet/test/gnunet-dhtu/main.go
index ce3651d..2a49b9c 100644
--- a/src/gnunet/test/gnunet-dhtu/main.go
+++ b/src/gnunet/test/gnunet-dhtu/main.go
@@ -20,13 +20,14 @@ package main
import (
"context"
- "encoding/hex"
"flag"
"fmt"
"gnunet/config"
"gnunet/core"
+ "gnunet/message"
"gnunet/service"
"gnunet/service/dht"
+ "gnunet/transport"
"gnunet/util"
"log"
"net/rpc"
@@ -37,41 +38,31 @@ import (
//----------------------------------------------------------------------
// Test Go node with DHTU GNUnet nodes
+//
+// N.B.: THIS TEST ONLY COVERS THE BASIC MESSAGE EXCHANGE LEVEL; NO
+// MESSAGE PROCESSING EXCEPT FOR HELLO MESSAGES WILL TAKE PLACE.
//----------------------------------------------------------------------
func main() {
// handle command-line arguments
- var (
- remoteId string
- remoteAddr string
- cfgFile string
- )
+ var remoteAddr string
+ var cfgFile string
flag.StringVar(&cfgFile, "c", "gnunet-config.json", "configuration
file")
- flag.StringVar(&remoteId, "i", "", "peer id of remote node")
flag.StringVar(&remoteAddr, "a", "", "address of remote node")
flag.Parse()
// read configuration file and set missing arguments.
if err := config.ParseConfig(cfgFile); err != nil {
- logger.Printf(logger.ERROR, "[gnunet-dhtu] Invalid
configuration file: %s\n", err.Error())
+ logger.Printf(logger.ERROR, "[node] Invalid configuration file:
%s\n", err.Error())
return
}
// convert arguments
- var (
- rId *util.PeerID
- rAddr *util.Address
- buf []byte
- err error
- )
+ var rAddr *util.Address
+ var err error
if rAddr, err = util.ParseAddress(remoteAddr); err != nil {
- log.Fatal(err)
- }
- if len(remoteId) > 0 {
- if buf, err = util.DecodeStringToBinary(remoteId, 32); err !=
nil {
- log.Fatal(err)
- }
- rId = util.NewPeerID(buf)
+ logger.Println(logger.ERROR, err.Error())
+ return
}
// setup execution context
@@ -84,7 +75,8 @@ func main() {
// create and run node
node, err := NewTestNode(ctx)
if err != nil {
- log.Fatal(err)
+ logger.Println(logger.ERROR, err.Error())
+ return
}
defer node.Shutdown()
@@ -93,13 +85,17 @@ func main() {
as := fmt.Sprintf("%s://%s:%d", ep.Network, ep.Address, ep.Port)
listen, err := util.ParseAddress(as)
if err != nil {
- log.Fatal(err)
+ logger.Println(logger.ERROR, err.Error())
+ return
}
aList := []*util.Address{listen}
- logger.Println(logger.INFO, "HELLO: "+node.HelloURL(aList))
+ logger.Println(logger.INFO, "[node] --> "+node.HelloURL(aList))
- // learn bootstrap address (triggers HELLO)
- node.Learn(ctx, rId, rAddr)
+ // send HELLO to bootstrap address
+ if err = node.SendHello(ctx, rAddr); err != nil && err !=
transport.ErrEndpMaybeSent {
+ logger.Println(logger.ERROR, "[node] failed to send HELLO:
"+err.Error())
+ return
+ }
// run forever
var ch chan struct{}
@@ -121,22 +117,15 @@ func (n *TestNode) Shutdown() {
n.core.Shutdown()
}
func (n *TestNode) HelloURL(a []*util.Address) string {
- hd, err := n.peer.HelloData(time.Hour, a)
+ hd, err := n.peer.HelloData(message.HelloAddressExpiration, a)
if err != nil {
return ""
}
return hd.URL()
}
-func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr
*util.Address) {
- label := "@"
- if peer != nil {
- label = peer.String()
- }
- log.Printf("[%d] Learning %s for %s", n.id, addr.StringAll(), label)
- if err := n.core.Learn(ctx, peer, addr); err != nil {
- log.Println("Learn: " + err.Error())
- }
+func (n *TestNode) SendHello(ctx context.Context, addr *util.Address) error {
+ return n.core.SendHello(ctx, addr)
}
func NewTestNode(ctx context.Context) (node *TestNode, err error) {
@@ -150,8 +139,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err
error) {
return
}
node.peer = node.core.Peer()
- log.Printf("[%d] Node %s starting", node.id, node.peer.GetID())
- log.Printf("[%d] --> %s", node.id,
hex.EncodeToString(node.peer.GetID().Key))
+ logger.Printf(logger.INFO, "[node] Node %s starting", node.peer.GetID())
// start a new DHT service
dht, err := dht.NewService(ctx, node.core)
@@ -162,7 +150,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err
error) {
// start JSON-RPC server on request
var rpc *rpc.Server
if rpc, err = service.StartRPC(ctx, config.Cfg.RPC.Endpoint); err !=
nil {
- logger.Printf(logger.ERROR, "[gnunet-dhtu] RPC failed to start:
%s", err.Error())
+ logger.Printf(logger.ERROR, "[node] RPC failed to start: %s",
err.Error())
return
}
dht.InitRPC(rpc)
@@ -177,7 +165,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err
error) {
if node.addr, err = util.ParseAddress(s); err != nil {
continue
}
- log.Printf("[%d] Listening on %s", node.id, s)
+ logger.Printf(logger.INFO, "[node] Listening on %s", s)
}
// register as event listener
@@ -195,22 +183,22 @@ func NewTestNode(ctx context.Context) (node *TestNode,
err error) {
case ev := <-incoming:
switch ev.ID {
case core.EV_CONNECT:
- log.Printf("[%d] <<< Peer %s
connected", node.id, ev.Peer)
+ logger.Printf(logger.INFO, "[node] <<<
Peer %s connected", ev.Peer)
case core.EV_DISCONNECT:
- log.Printf("[%d] <<< Peer %s
diconnected", node.id, ev.Peer)
+ logger.Printf(logger.INFO, "[node] <<<
Peer %s diconnected", ev.Peer)
case core.EV_MESSAGE:
- log.Printf("[%d] <<< Msg from %s of
type %d", node.id, ev.Peer, ev.Msg.Header().MsgType)
- log.Printf("[%d] <<< --> %s",
node.id, ev.Msg.String())
+ logger.Printf(logger.INFO, "[node] <<<
Msg from %s of type %d", ev.Peer, ev.Msg.Header().MsgType)
+ logger.Printf(logger.INFO, "[node] <<<
--> %s", ev.Msg.String())
}
// handle termination signal
case <-ctx.Done():
- log.Printf("[%d] Shutting down node", node.id)
+ logger.Println(logger.INFO, "[node] Shutting
down node")
return
// handle heart beat
case now := <-tick.C:
- log.Printf("[%d] Heart beat at %s", node.id,
now.String())
+ logger.Printf(logger.INFO, "[node] Heart beat
at %s", now.String())
}
}
}()
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index 26e4463..d98776a 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -25,6 +25,11 @@ import (
"gnunet/message"
"gnunet/util"
"net"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/bfix/gospel/logger"
)
var (
@@ -34,6 +39,8 @@ var (
ErrEndpExists = errors.New("endpoint exists")
ErrEndpNoAddress = errors.New("no address for endpoint")
ErrEndpNoConnection = errors.New("no connection on endpoint")
+ ErrEndpMaybeSent = errors.New("message may have been sent - cant
know")
+ ErrEndpWriteShort = errors.New("write too short")
)
// Endpoint represents a local endpoint that can send and receive messages.
@@ -78,9 +85,11 @@ func NewEndpoint(addr net.Addr) (ep Endpoint, err error) {
// PacketEndpoint for packet-oriented network protocols
type PaketEndpoint struct {
id int // endpoint identifier
+ netw string // network identifier ("udp", "udp4", "udp6", ...)
addr net.Addr // endpoint address
conn net.PacketConn // packet connection
buf []byte // buffer for read/write operations
+ mtx sync.Mutex // mutex for send operations
}
// Run packet endpoint: send incoming messages to the handler.
@@ -94,9 +103,14 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan
*TransportMessage) (
// use the actual listening address
ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
+ // save more information to detect compatible send-to addresses
+ ep.netw = ep.conn.LocalAddr().Network()
+
// run watch dog for termination
+ active := true
go func() {
<-ctx.Done()
+ active = false
ep.conn.Close()
}()
// run go routine to handle messages from clients
@@ -105,6 +119,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr
chan *TransportMessage) (
// read next message
tm, err := ep.read()
if err != nil {
+ // leave go routine if already dead
+ if !active {
+ return
+ }
+ logger.Println(logger.WARN, "[pkt_ep] read
failed: "+err.Error())
+ // gracefully ignore unknown message types
+ if strings.HasPrefix(err.Error(), "unknown
message type") {
+ continue
+ }
break
}
// label message
@@ -154,19 +177,27 @@ func (ep *PaketEndpoint) read() (tm *TransportMessage,
err error) {
// Send message to address from endpoint
func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg
*TransportMessage) (err error) {
+ // only one sender at a time
+ ep.mtx.Lock()
+ defer ep.mtx.Unlock()
+
// check for valid connection
if ep.conn == nil {
return ErrEndpNoConnection
}
+
// resolve target address
var a *net.UDPAddr
- a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String())
+ if a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()),
addr.String()); err != nil {
+ return
+ }
// get message content (TransportMessage)
var buf []byte
if buf, err = msg.Bytes(); err != nil {
return
}
+
// handle extended protocol:
switch ep.addr.Network() {
case "ip+udp":
@@ -176,8 +207,18 @@ func (ep *PaketEndpoint) Send(ctx context.Context, addr
net.Addr, msg *Transport
// unknown protocol
return ErrEndpProtocolUnknown
}
- _, err = ep.conn.WriteTo(buf, a)
- return
+
+ // timeout after 1 second
+ if err = ep.conn.SetWriteDeadline(time.Now().Add(time.Second)); err !=
nil {
+ logger.Println(logger.DBG, "[pkt_ep] SetWriteDeadline failed:
"+err.Error())
+ return
+ }
+ var n int
+ n, err = ep.conn.WriteTo(buf, a)
+ if n != len(buf) {
+ err = ErrEndpWriteShort
+ }
+ return ErrEndpMaybeSent
}
// Address returms the
@@ -188,6 +229,23 @@ func (ep *PaketEndpoint) Address() net.Addr {
// CanSendTo returns true if the endpoint can sent to address
func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
+ if ok {
+ // try to convert addr to compatible type
+ switch ep.netw {
+ case "udp", "udp4", "udp6":
+ var ua *net.UDPAddr
+ var err error
+ if ua, err = net.ResolveUDPAddr(ep.netw,
addr.String()); err != nil {
+ ok = false
+ }
+ logger.Printf(logger.DBG, "[pkt_ep] %s + %v -> %v
(%v)", ep.netw, addr, ua, ok)
+ default:
+ logger.Printf(logger.DBG, "[pkt_ep] unknown network
%s", ep.netw)
+ ok = false
+ }
+ } else {
+ logger.Printf(logger.DBG, "[pkt_ep] protocol mismatch %s --
%s", EpProtocol(addr.Network()), EpProtocol(ep.addr.Network()))
+ }
return
}
diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go
index db3527e..e99b3df 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -19,7 +19,10 @@
package transport
import (
+ "bytes"
"context"
+ "encoding/hex"
+ "encoding/json"
"errors"
"fmt"
"gnunet/message"
@@ -125,6 +128,26 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser,
buf []byte) (msg messag
return msg, nil
}
+//----------------------------------------------------------------------
+// Dump message
+func Dump(msg message.Message, format string) string {
+ switch format {
+ case "json":
+ buf, err := json.Marshal(msg)
+ if err != nil {
+ return err.Error()
+ }
+ return string(buf)
+ case "hex":
+ buf := new(bytes.Buffer)
+ if err := WriteMessageDirect(buf, msg); err != nil {
+ return err.Error()
+ }
+ return hex.EncodeToString(buf.Bytes())
+ }
+ return "unknown message dump format"
+}
+
//----------------------------------------------------------------------
// helper for wrapped ReadCloser/WriteCloser (close is nop)
//----------------------------------------------------------------------
diff --git a/src/gnunet/util/address_test.go b/src/gnunet/util/address_test.go
new file mode 100644
index 0000000..d4936e8
--- /dev/null
+++ b/src/gnunet/util/address_test.go
@@ -0,0 +1,54 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package util
+
+import (
+ "testing"
+)
+
+func TestAddrList(t *testing.T) {
+ // list of addresses to check
+ addrS := []string{
+ "ip+udp://127.0.0.1:10000",
+ "ip+udp://172.17.0.4:10000",
+ "ip+udp://[::ffff:172.17.0.4]:10000",
+ }
+ // convert to util.Address
+ addrA := make([]*Address, len(addrS))
+ var err error
+ for i, as := range addrS {
+ if addrA[i], err = ParseAddress(as); err != nil {
+ t.Fatal(err)
+ }
+ }
+ // allocate AddrList
+ addrL := NewPeerAddrList()
+ for _, addr := range addrA {
+ rc :=
addrL.Add("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", addr)
+ t.Logf("added %s (%d)", addr.URI(), rc)
+ }
+
+ // check list
+ t.Log("checking list...")
+ list :=
addrL.Get("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", "ip+udp")
+ t.Logf("got: %v", list)
+ if len(list) != len(addrS) {
+ t.Fatal("list size not matching")
+ }
+}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] | Current Thread | [Next in Thread]
- [gnunet-go] branch master updated: RC2 for Milestone 1 (NGI Assure), gnunet <=