[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: Heartbeat handling in modules added.
From: gnunet
Subject: [gnunet-go] branch master updated: Heartbeat handling in modules added.
Date: Sat, 04 Jun 2022 11:33:35 +0200
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new de57742 Heartbeat handling in modules added.
de57742 is described below
commit de577428a69a0002c3194afdf0562cf5a4dc1bdc
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 4 11:32:09 2022 +0200
Heartbeat handling in modules added.
---
src/gnunet/cmd/peer_mockup/main.go | 2 +-
src/gnunet/cmd/revoke-zonekey/main.go | 2 +-
src/gnunet/go.mod | 4 +-
src/gnunet/go.sum | 4 +-
src/gnunet/service/dht/module.go | 22 ++++-
src/gnunet/service/dht/routingtable.go | 120 ++++++++++++++++++++--------
src/gnunet/service/dht/routingtable_test.go | 3 +-
src/gnunet/service/gns/module.go | 2 +-
src/gnunet/service/module.go | 23 +++++-
src/gnunet/service/revocation/module.go | 2 +-
src/gnunet/util/time.go | 29 ++++++-
11 files changed, 162 insertions(+), 51 deletions(-)
diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go
index 4288fb1..58f4baf 100644
--- a/src/gnunet/cmd/peer_mockup/main.go
+++ b/src/gnunet/cmd/peer_mockup/main.go
@@ -71,7 +71,7 @@ func main() {
// handle messages coming from network
module := service.NewModuleImpl()
- listener := module.Run(ctx, process, nil)
+ listener := module.Run(ctx, process, nil, 0, nil)
c.Register("mockup", listener)
if !asServer {
diff --git a/src/gnunet/cmd/revoke-zonekey/main.go b/src/gnunet/cmd/revoke-zonekey/main.go
index 298a7e4..aab9602 100644
--- a/src/gnunet/cmd/revoke-zonekey/main.go
+++ b/src/gnunet/cmd/revoke-zonekey/main.go
@@ -299,7 +299,7 @@ func main() {
}
}
// update elapsed time
- rd.T.Add(util.AbsoluteTimeNow().Diff(startTime))
+ rd.T.Add(startTime.Elapsed())
rd.Last = last
log.Println("Writing revocation data to file...")
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 7383eaf..ad203ca 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
go 1.18
require (
- github.com/bfix/gospel v1.2.11
+ github.com/bfix/gospel v1.2.14
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
github.com/gorilla/mux v1.8.0
@@ -21,5 +21,3 @@ require (
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)
-
-replace github.com/bfix/gospel v1.2.11 => /vault/prj/libs/Go/Gospel
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index ea3c328..f2baf8e 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,5 @@
-github.com/bfix/gospel v1.2.11 h1:z/c6MFNq/lz4mO8+PK60a3NvH+lbTKAlLCShuFFZUvg=
-github.com/bfix/gospel v1.2.11/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
+github.com/bfix/gospel v1.2.14 h1:lIdagJvkebG+uYbVdfK6XbT1udnq/ezd/Gi54EaMtV0=
+github.com/bfix/gospel v1.2.14/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index d369f3f..5f04d54 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -26,6 +26,7 @@ import (
"gnunet/service"
"gnunet/service/dht/blocks"
"net/http"
+ "time"
)
//======================================================================
@@ -72,7 +73,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) {
rtable: rt,
}
// register as listener for core events
- listener := m.Run(ctx, m.event, m.Filter())
+ listener := m.Run(ctx, m.event, m.Filter(), 15*time.Minute, m.heartbeat)
c.Register("dht", listener)
return
@@ -119,9 +120,24 @@ func (m *Module) Filter() *core.EventFilter {
return f
}
-// Event handler
-func (nc *Module) event(ctx context.Context, ev *core.Event) {
+// Event handler for infrastructure signals
+func (m *Module) event(ctx context.Context, ev *core.Event) {
+ switch ev.ID {
+ // New peer connected:
+ case core.EV_CONNECT:
+ // Add peer to routing table
+ }
+
+}
+
+// Heartbeat handler for periodic tasks
+func (m *Module) heartbeat(ctx context.Context) {
+ // update the estimated network size
+ m.rtable.l2nse = m.core.L2NSE()
+
+ // run heartbeat for routing table
+ m.rtable.heartbeat(ctx)
}
//----------------------------------------------------------------------
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go
index 895a1b2..0078b71 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -20,12 +20,15 @@ package dht
import (
"bytes"
+ "context"
"crypto/sha512"
"encoding/hex"
"gnunet/util"
"math/rand"
"sync"
+ "time"
+ "github.com/bfix/gospel/logger"
"github.com/bfix/gospel/math"
)
@@ -48,7 +51,10 @@ const (
// PeerAddress is the identifier for a peer in the DHT network.
// It is the SHA-512 hash of the PeerID (public Ed25519 key).
type PeerAddress struct {
- addr [sizeAddr]byte
+ addr [sizeAddr]byte // hash value as bytes
+ connected bool // is peer connected?
+ lastSeen util.AbsoluteTime // time the peer was last seen
+ lastUsed util.AbsoluteTime // time the peer was last used
}
// NewPeerAddress returns the DHT address of a peer.
@@ -57,6 +63,8 @@ func NewPeerAddress(peer *util.PeerID) *PeerAddress {
h := rtHash()
h.Write(peer.Key)
copy(r.addr[:], h.Sum(nil))
+ r.lastSeen = util.AbsoluteTimeNow()
+ r.lastUsed = util.AbsoluteTimeNow()
return r
}
@@ -90,36 +98,51 @@ func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) {
// distance to the reference address, so smaller index means
// "nearer" to the reference address.
type RoutingTable struct {
- ref *PeerAddress // reference address for distance
- buckets []*Bucket // list of buckets
- list map[*PeerAddress]bool // keep list of peers
- rwlock sync.RWMutex // lock for write operations
- l2nse float64 // log2 of estimated network size
+ ref *PeerAddress // reference address for distance
+ buckets []*Bucket // list of buckets
+ list map[*PeerAddress]struct{} // keep list of peers
+ rwlock sync.RWMutex // lock for write operations
+ l2nse float64 // log2 of estimated network size
+ inProcess bool // flag if Process() is running
}
// NewRoutingTable creates a new routing table for the reference address.
func NewRoutingTable(ref *PeerAddress) *RoutingTable {
- rt := new(RoutingTable)
- rt.ref = ref
- rt.list = make(map[*PeerAddress]bool)
- rt.buckets = make([]*Bucket, numBuckets)
+ // create routing table
+ rt := &RoutingTable{
+ ref: ref,
+ list: make(map[*PeerAddress]struct{}),
+ buckets: make([]*Bucket, numBuckets),
+ l2nse: 0.,
+ inProcess: false,
+ }
+ // fill buckets
for i := range rt.buckets {
rt.buckets[i] = NewBucket(numK)
}
return rt
}
+//----------------------------------------------------------------------
+// Peer management
+//----------------------------------------------------------------------
+
// Add new peer address to routing table.
// Returns true if the entry was added, false otherwise.
-func (rt *RoutingTable) Add(p *PeerAddress, connected bool) bool {
+func (rt *RoutingTable) Add(p *PeerAddress) bool {
// ensure one write and no readers
rt.rwlock.Lock()
defer rt.rwlock.Unlock()
+ // check if peer is already known
+ if _, ok := rt.list[p]; ok {
+ return false
+ }
+
// compute distance (bucket index) and insert address.
_, idx := p.Distance(rt.ref)
- if rt.buckets[idx].Add(p, connected) {
- rt.list[p] = true
+ if rt.buckets[idx].Add(p) {
+ rt.list[p] = struct{}{}
return true
}
// Full bucket: we did not add the address to the routing table.
@@ -139,11 +162,23 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
delete(rt.list, p)
return true
}
+ // remove from internal list
+ delete(rt.list, p)
return false
}
//----------------------------------------------------------------------
-// routing functions
+
+// Process a function f in the locked context of a routing table
+func (rt *RoutingTable) Process(f func() error) error {
+ // ensure one write and no readers
+ rt.rwlock.Lock()
+ defer rt.rwlock.Unlock()
+ return f()
+}
+
+//----------------------------------------------------------------------
+// Routing functions
//----------------------------------------------------------------------
// SelectClosestPeer for a given peer address and bloomfilter.
@@ -160,6 +195,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (
n = k
}
}
+ // mark peer as used
+ n.lastUsed = util.AbsoluteTimeNow()
return
}
@@ -175,6 +212,8 @@ func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress {
idx := rand.Intn(size)
for k := range rt.list {
if idx == 0 {
+ // mark peer as used
+ k.lastUsed = util.AbsoluteTimeNow()
return k
}
idx--
@@ -221,33 +260,50 @@ func (rt *RoutingTable) ComputeOutDegree(repl, hop int) int {
return 1 + int(rm1/(rt.l2nse+rm1*hf))
}
+//----------------------------------------------------------------------
+
+// Heartbeat handler for periodic tasks
+func (rt *RoutingTable) heartbeat(ctx context.Context) {
+
+ // check for dead or expired peers
+ timeout := util.NewRelativeTime(3 * time.Hour)
+ if err := rt.Process(func() error {
+ for addr := range rt.list {
+ if addr.connected {
+ continue
+ }
+ // check if we can/need to drop a peer
+ drop := timeout.Compare(addr.lastSeen.Elapsed()) < 0
+ if drop || timeout.Compare(addr.lastUsed.Elapsed()) < 0 {
+ rt.Remove(addr)
+ }
+ }
+ return nil
+ }); err != nil {
+ logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error())
+ }
+}
+
//======================================================================
// Routing table buckets
//======================================================================
-// PeerEntry in a k-Bucket: use routing specific attributes
-// for book-keeping
-type PeerEntry struct {
- addr *PeerAddress // peer address
- connected bool // is peer connected?
-}
-
// Bucket holds peer entries with approx. same distance from node
type Bucket struct {
- list []*PeerEntry
+ list []*PeerAddress
rwlock sync.RWMutex
}
// NewBucket creates a new entry list of given size
func NewBucket(n int) *Bucket {
return &Bucket{
- list: make([]*PeerEntry, 0, n),
+ list: make([]*PeerAddress, 0, n),
}
}
// Add peer address to the bucket if there is free space.
// Returns true if entry is added, false otherwise.
-func (b *Bucket) Add(p *PeerAddress, connected bool) bool {
+func (b *Bucket) Add(p *PeerAddress) bool {
// only one writer and no readers
b.rwlock.Lock()
defer b.rwlock.Unlock()
@@ -255,11 +311,7 @@ func (b *Bucket) Add(p *PeerAddress, connected bool) bool {
// check for free space in bucket
if len(b.list) < numK {
// append entry at the end
- pe := &PeerEntry{
- addr: p,
- connected: connected,
- }
- b.list = append(b.list, pe)
+ b.list = append(b.list, p)
return true
}
return false
@@ -273,7 +325,7 @@ func (b *Bucket) Remove(p *PeerAddress) bool {
defer b.rwlock.Unlock()
for i, pe := range b.list {
- if pe.addr.Equals(p) {
+ if pe.Equals(p) {
// found entry: remove it
b.list = append(b.list[:i], b.list[i+1:]...)
return true
@@ -289,16 +341,16 @@ func (b *Bucket) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *Peer
b.rwlock.RLock()
defer b.rwlock.RUnlock()
- for _, pe := range b.list {
+ for _, addr := range b.list {
// skip addresses in bloomfilter
- if bf.Contains(pe.addr) {
+ if bf.Contains(addr) {
continue
}
// check for shorter distance
- if d, _ := p.Distance(pe.addr); n == nil || d.Cmp(dist) < 0 {
+ if d, _ := p.Distance(addr); n == nil || d.Cmp(dist) < 0 {
// remember best match
dist = d
- n = pe.addr
+ n = addr
}
}
return
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go
index 2579356..659f9d4 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -87,7 +87,8 @@ func TestRT(t *testing.T) {
// actions:
connected := func(task *Entry, e int64, msg string) {
- rt.Add(task.addr, true)
+ task.addr.connected = true
+ rt.Add(task.addr)
task.online = true
task.last = e
t.Logf("[%6d] %s %s\n", e, task.addr, msg)
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 31ad6c7..4878aa0 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -103,7 +103,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) {
ModuleImpl: *service.NewModuleImpl(),
}
// register as listener for core events
- listener := m.Run(ctx, m.event, m.Filter())
+ listener := m.Run(ctx, m.event, m.Filter(), 0, nil)
c.Register("gns", listener)
return
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 98307d6..5f95975 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -22,6 +22,7 @@ import (
"context"
"gnunet/core"
"net/http"
+ "time"
)
// Module is an interface for GNUnet service modules (workers).
@@ -36,6 +37,9 @@ type Module interface {
// EventHandler is a function prototype for event handling
type EventHandler func(context.Context, *core.Event)
+// Heartbeat is a function prototype for periodic tasks
+type Heartbeat func(context.Context)
+
// ModuleImpl is an event-handling type used by Module implementations.
type ModuleImpl struct {
ch chan *core.Event // channel for core events.
@@ -49,9 +53,19 @@ func NewModuleImpl() (m *ModuleImpl) {
}
// Run event handling loop
-func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.EventFilter) (listener *core.Listener) {
+func (m *ModuleImpl) Run(
+ ctx context.Context,
+ hdlr EventHandler, filter *core.EventFilter,
+ pulse time.Duration, heartbeat Heartbeat,
+) (listener *core.Listener) {
// listener for registration
listener = core.NewListener(m.ch, filter)
+
+ // if no heartbeat handler is defined, set pulse to near flatline.
+ if heartbeat == nil {
+ pulse = 365 * 24 * time.Hour // once a year
+ }
+ tick := time.Tick(pulse)
// run event loop
go func() {
for {
@@ -63,6 +77,13 @@ func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.Ev
// wait for terminate signal
case <-ctx.Done():
return
+
+ // handle heartbeat
+ case <-tick:
+ // check for defined heartbeat handler
+ if heartbeat != nil {
+ heartbeat(ctx)
+ }
}
}
}()
diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go
index 37b57ab..1f0ab48 100644
--- a/src/gnunet/service/revocation/module.go
+++ b/src/gnunet/service/revocation/module.go
@@ -74,7 +74,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) {
return nil
}
// register as listener for core events
- listener := m.Run(ctx, m.event, m.Filter())
+ listener := m.Run(ctx, m.event, m.Filter(), 0, nil)
c.Register("gns", listener)
return m
}
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go
index 70b91e1..53589b8 100644
--- a/src/gnunet/util/time.go
+++ b/src/gnunet/util/time.go
@@ -81,16 +81,28 @@ func (t AbsoluteTime) Add(d time.Duration) AbsoluteTime {
}
}
+// Elapsed time since 't'. Return 0 if 't' is in the future.
+func (t AbsoluteTime) Elapsed() RelativeTime {
+ dt, elapsed := t.Diff(AbsoluteTimeNow())
+ if !elapsed {
+ dt = NewRelativeTime(0)
+ }
+ return dt
+}
+
// Diff returns the relative time between two absolute times;
-// the ordering of the absolute times doesn't matter.
-func (t AbsoluteTime) Diff(t2 AbsoluteTime) RelativeTime {
+// returns true if t2 is after t1.
+func (t AbsoluteTime) Diff(t2 AbsoluteTime) (dt RelativeTime, elapsed bool) {
var d uint64
if t.Compare(t2) == 1 {
d = t.Val - t2.Val
+ elapsed = false
} else {
d = t2.Val - t.Val
+ elapsed = true
}
- return RelativeTime{d}
+ dt = RelativeTime{d}
+ return
}
// Expired returns true if the timestamp is in the past.
@@ -150,3 +162,14 @@ func (t RelativeTime) String() string {
func (t RelativeTime) Add(t2 RelativeTime) {
t.Val += t2.Val
}
+
+// Compare two durations
+func (t RelativeTime) Compare(t2 RelativeTime) int {
+ switch {
+ case t.Val < t2.Val:
+ return -1
+ case t.Val > t2.Val:
+ return 1
+ }
+ return 0
+}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] Current Thread [Next in Thread]
- [gnunet-go] branch master updated: Heartbeat handling in modules added., gnunet <=