gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: File/block caching strategy improved.


From: gnunet
Subject: [gnunet-go] branch master updated: File/block caching strategy improved.
Date: Sat, 11 Jun 2022 20:22:52 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new 4261e07  File/block caching strategy improved.
4261e07 is described below

commit 4261e07def81e7c3eb183b9d5c4059a2e9c53759
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 11 20:21:38 2022 +0200

    File/block caching strategy improved.
---
 src/gnunet/config/config.go             |  33 +++-
 src/gnunet/config/gnunet-config.json    |  25 ++-
 src/gnunet/service/dht/dhtstore_test.go |  19 ++-
 src/gnunet/service/dht/module.go        |   4 -
 src/gnunet/service/store.go             | 293 +++++++++++++++++++++++---------
 5 files changed, 266 insertions(+), 108 deletions(-)

diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index a92bdd6..b4d2840 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -93,15 +93,34 @@ type GNSConfig struct {
        MaxDepth     int            `json:"maxDepth"`     // maximum recursion 
depth in resolution
 }
 
+//----------------------------------------------------------------------
+// Generic parameter configuration (handle any key/value settings)
+//----------------------------------------------------------------------
+
+// ParameterConfig handles arbitrary values for key strings. This is necessary
+// e.g. in the 'Storage' configuration, as custom storage implementations
+// require different sets of parameters.
+type ParameterConfig map[string]any
+
+// Get a parameter value with given type 'V'
+func GetParam[V any](params ParameterConfig, key string) (i V, ok bool) {
+       var v any
+       if v, ok = params[key]; ok {
+               if i, ok = v.(V); ok {
+                       return
+               }
+       }
+       return
+}
+
 //----------------------------------------------------------------------
 // DHT configuration
 //----------------------------------------------------------------------
 
 // DHTConfig contains parameters for the distributed hash table (DHT)
 type DHTConfig struct {
-       Service *ServiceConfig `json:"service"` // socket for DHT service
-       Storage string         `json:"storage"` // filesystem storage location
-       Cache   string         `json:"cache"`   // key/value cache
+       Service *ServiceConfig  `json:"service"` // socket for DHT service
+       Storage ParameterConfig `json:"storage"` // filesystem storage location
 }
 
 //----------------------------------------------------------------------
@@ -110,8 +129,8 @@ type DHTConfig struct {
 
 // NamecacheConfig contains parameters for the local name cache
 type NamecacheConfig struct {
-       Service *ServiceConfig `json:"service"` // socket for Namecache service
-       Storage string         `json:"storage"` // key/value cache
+       Service *ServiceConfig  `json:"service"` // socket for Namecache service
+       Storage ParameterConfig `json:"storage"` // key/value cache
 }
 
 //----------------------------------------------------------------------
@@ -120,8 +139,8 @@ type NamecacheConfig struct {
 
 // RevocationConfig contains parameters for the key revocation service
 type RevocationConfig struct {
-       Service *ServiceConfig `json:"service"` // socket for Revocation service
-       Storage string         `json:"storage"` // persistance mechanism for 
revocation data
+       Service *ServiceConfig  `json:"service"` // socket for Revocation 
service
+       Storage ParameterConfig `json:"storage"` // persistence mechanism for 
revocation data
 }
 
 //----------------------------------------------------------------------
diff --git a/src/gnunet/config/gnunet-config.json 
b/src/gnunet/config/gnunet-config.json
index 7927cda..167bfa0 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -11,7 +11,7 @@
             {
                 "id": "test",
                 "network": "ip+udp",
-                "address": "upnp:192.168.178.1",
+                "address": "upnp:192.168.134.1",
                 "port": 6666,
                 "ttl": 86400
             }
@@ -28,8 +28,12 @@
                 "perm": "0770"
             }
         },
-        "storage": "dht_file_store+/var/lib/gnunet/dht/store",
-        "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
+        "storage": {
+            "mode": "file",
+            "cache": false,
+            "path": "/var/lib/gnunet/dht/store",
+            "maxGB": 10
+        }
     },
     "gns": {
         "service": {
@@ -48,7 +52,13 @@
                 "perm": "0770"
             }
         },
-        "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
+        "storage": {
+            "mode": "file",
+            "cache": true,
+            "path": "/var/lib/gnunet/namecache",
+            "num": 1000,
+            "expire": 43200
+        }
     },
     "revocation": {
         "service": {
@@ -57,7 +67,12 @@
                 "perm": "0770"
             }
         },
-        "storage": "redis:localhost:6397::15"
+        "storage": {
+            "mode": "redis",
+            "addr": "localhost:6397",
+            "passwd": "",
+            "id": 15
+        }
     },
     "rpc": {
         "endpoint": "tcp:127.0.0.1:80"
diff --git a/src/gnunet/service/dht/dhtstore_test.go 
b/src/gnunet/service/dht/dhtstore_test.go
index 3cb8080..d9fc1d0 100644
--- a/src/gnunet/service/dht/dhtstore_test.go
+++ b/src/gnunet/service/dht/dhtstore_test.go
@@ -20,9 +20,10 @@ package dht
 
 import (
        "encoding/hex"
+       "gnunet/config"
        "gnunet/crypto"
        "gnunet/service"
-       blocks "gnunet/service/dht/blocks"
+       "gnunet/service/dht/blocks"
        "math/rand"
        "testing"
 )
@@ -37,8 +38,15 @@ const (
 // each block from storage and checks for matching hash.
 func TestDHTFilesStore(t *testing.T) {
 
+       // test configuration
+       cfg := make(config.ParameterConfig)
+       cfg["mode"] = "file"
+       cfg["cache"] = false
+       cfg["path"] = "/var/lib/gnunet/dht/store"
+       cfg["maxGB"] = 10
+
        // create file store
-       fs, err := service.NewFileCache("/var/lib/gnunet/dht/cache", "100")
+       fs, err := service.NewFileStore(cfg)
        if err != nil {
                t.Fatal(err)
        }
@@ -48,33 +56,30 @@ func TestDHTFilesStore(t *testing.T) {
        // First round: save blocks
        for i := 0; i < fsNumBlocks; i++ {
                // generate random block
-               size := 20 // 1024 + rand.Intn(62000)
+               size := 1024 + rand.Intn(62000)
                buf := make([]byte, size)
                rand.Read(buf)
                val := blocks.NewGenericBlock(buf)
                // generate associated key
                k := crypto.Hash(buf).Bits
                key := blocks.NewGenericQuery(k)
-               t.Logf("> %d: %s -- %s", i, hex.EncodeToString(k), 
hex.EncodeToString(buf))
 
                // store block
                if err := fs.Put(key, val); err != nil {
                        t.Fatal(err)
                }
-
                // remember key
                keys = append(keys, key)
        }
 
        // Second round: retrieve blocks and check
-       for i, key := range keys {
+       for _, key := range keys {
                // get block
                val, err := fs.Get(key)
                if err != nil {
                        t.Fatal(err)
                }
                buf := val.Data()
-               t.Logf("< %d: %s -- %s", i, hex.EncodeToString(key.Key().Bits), 
hex.EncodeToString(buf))
 
                // re-create key
                k := crypto.Hash(buf)
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 34bf020..612d0ec 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -57,10 +57,6 @@ func NewModule(ctx context.Context, c *core.Core) (m 
*Module, err error) {
        if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil 
{
                return
        }
-       // create cache handler
-       if cache, err = service.NewDHTStore(config.Cfg.DHT.Cache); err != nil {
-               return
-       }
        // create routing table
        rt := NewRoutingTable(NewPeerAddress(c.PeerID()))
 
diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go
index c599c54..5de5415 100644
--- a/src/gnunet/service/store.go
+++ b/src/gnunet/service/store.go
@@ -22,17 +22,21 @@ import (
        "context"
        "database/sql"
        "encoding/binary"
+       "encoding/gob"
        "encoding/hex"
        "errors"
        "fmt"
+       "gnunet/config"
        "gnunet/crypto"
        "gnunet/service/dht/blocks"
        "gnunet/util"
+       "io"
        "io/ioutil"
        "os"
-       "strconv"
-       "strings"
+       "sort"
+       "sync"
 
+       "github.com/bfix/gospel/logger"
        redis "github.com/go-redis/redis/v8"
 )
 
@@ -46,22 +50,20 @@ var (
 //------------------------------------------------------------
 // Generic storage interface. Can be used for persistent or
 // transient (caching) storage of key/value data.
-// One set of methods (Get/Put) work on DHT queries and blocks,
-// the other set (GetS, PutS) work on key/value strings.
-// Each custom implementation can decide which sets to support.
 //------------------------------------------------------------
 
 // Store is a key/value storage where the type of the key is either
 // a SHA512 hash value or a string and the value is either a DHT
-// block or a string.
+// block or a string. It is possible to mix any key/value types,
+// but not used in this implementation.
 type Store[K, V any] interface {
-       // Put block into storage under given key
+       // Put value into storage under given key
        Put(key K, val V) error
 
-       // Get block with given key from storage
+       // Get value with given key from storage
        Get(key K) (V, error)
 
-       // List all store queries
+       // List all store keys
        List() ([]K, error)
 }
 
@@ -78,22 +80,18 @@ type KVStore Store[string, string]
 //------------------------------------------------------------
 // NewDHTStore creates a new storage handler with given spec
 // for use with DHT queries and blocks
-func NewDHTStore(spec string) (DHTStore, error) {
-       specs := strings.Split(spec, ":")
-       if len(specs) < 2 {
+func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) {
+       // get the mode parameter
+       mode, ok := config.GetParam[string](spec, "mode")
+       if !ok {
                return nil, ErrStoreInvalidSpec
        }
-       switch specs[0] {
+       switch mode {
        //------------------------------------------------------------------
        // File-base storage
        //------------------------------------------------------------------
-       case "file_store":
-               return NewFileStore(specs[1])
-       case "file_cache":
-               if len(specs) < 3 {
-                       return nil, ErrStoreInvalidSpec
-               }
-               return NewFileCache(specs[1], specs[2])
+       case "file":
+               return NewFileStore(spec)
        }
        return nil, ErrStoreUnknown
 }
@@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) {
 //------------------------------------------------------------
 // NewKVStore creates a new storage handler with given spec
 // for use with key/value string pairs.
-func NewKVStore(spec string) (KVStore, error) {
-       specs := strings.SplitN(spec, ":", 2)
-       if len(specs) < 2 {
+func NewKVStore(spec config.ParameterConfig) (KVStore, error) {
+       // get the mode parameter
+       mode, ok := config.GetParam[string](spec, "mode")
+       if !ok {
                return nil, ErrStoreInvalidSpec
        }
-       switch specs[0] {
+       switch mode {
        //--------------------------------------------------------------
        // Redis service
        //--------------------------------------------------------------
        case "redis":
-               if len(specs) < 4 {
-                       return nil, ErrStoreInvalidSpec
-               }
-               return NewRedisStore(specs[1], specs[2], specs[3])
+               return NewRedisStore(spec)
 
        //--------------------------------------------------------------
        // SQL database service
        //--------------------------------------------------------------
        case "sql":
-               if len(specs) < 4 {
-                       return nil, ErrStoreInvalidSpec
-               }
-               return NewSQLStore(specs[1])
+               return NewSQLStore(spec)
        }
        return nil, errors.New("unknown storage mechanism")
 }
@@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) {
 // Filesystem-based storage
 //------------------------------------------------------------
 
+// FileHeader is the layout of a file managed by the storage handler.
+// On start-up the file store recreates the list of file entries from
+// traversing the actual filesystem. This is done in the background.
+type FileHeader struct {
+       key       string            // storage key
+       size      uint64            // size of file
+       btype     uint16            // block type
+       stored    util.AbsoluteTime // time added to store
+       expires   util.AbsoluteTime // expiration time
+       lastUsed  util.AbsoluteTime // time last used
+       usedCount uint64            // usage count
+}
+
 // FileStore implements a filesystem-based storage mechanism for
 // DHT queries and blocks.
 type FileStore struct {
-       path   string             // storage path
-       cached []*crypto.HashCode // list of cached entries (key)
-       wrPos  int                // write position in cyclic list
+       path  string                 // storage path
+       cache bool                   // storage works as cache
+       args  config.ParameterConfig // arguments / settings
+
+       totalSize uint64                 // total storage size (logical, not 
physical)
+       files     map[string]*FileHeader // list of file headers
+       wrPos     int                    // write position in cyclic list
+       mtx       sync.Mutex             // serialize operations (prune)
 }
 
 // NewFileStore instantiates a new file storage.
-func NewFileStore(path string) (DHTStore, error) {
-       // create file store
-       return &FileStore{
-               path: path,
-       }, nil
-}
-
-// NewFileCache instantiates a new file-based cache.
-func NewFileCache(path, param string) (DHTStore, error) {
+func NewFileStore(spec config.ParameterConfig) (DHTStore, error) {
+       // get path parameter
+       path, ok := config.GetParam[string](spec, "path")
+       if !ok {
+               return nil, ErrStoreInvalidSpec
+       }
+       isCache, ok := config.GetParam[bool](spec, "cache")
+       if !ok {
+               isCache = false
+       }
        // remove old cache content
-       os.RemoveAll(path)
+       if isCache {
+               os.RemoveAll(path)
+       }
+       // create file store handler
+       fs := &FileStore{
+               path:  path,
+               args:  spec,
+               cache: isCache,
+               files: make(map[string]*FileHeader),
+       }
+       // load file header list
+       if !isCache {
+               if fp, err := os.Open(path + "/files.db"); err == nil {
+                       dec := gob.NewDecoder(fp)
+                       for {
+                               hdr := new(FileHeader)
+                               if dec.Decode(hdr) != nil {
+                                       if err != io.EOF {
+                                               return nil, err
+                                       }
+                                       break
+                               }
+                               fs.files[hdr.key] = hdr
+                               fs.totalSize += hdr.size
+                       }
+                       fp.Close()
+               }
+       }
+       return fs, nil
+}
 
-       // get number of cache entries
-       num, err := strconv.ParseUint(param, 10, 32)
-       if err != nil {
-               return nil, err
+// Close file storage; write metadata to file.
+func (s *FileStore) Close() (err error) {
+       if !s.cache {
+               if fp, err := os.Create(s.path + "/files.db"); err == nil {
+                       defer fp.Close()
+                       enc := gob.NewEncoder(fp)
+                       for _, hdr := range s.files {
+                               if err = enc.Encode(hdr); err != nil {
+                                       break
+                               }
+                       }
+               }
        }
-       // create file store
-       return &FileStore{
-               path:   path,
-               cached: make([]*crypto.HashCode, num),
-               wrPos:  0,
-       }, nil
+       return
 }
 
 // Put block into storage under given key
 func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
+       // check for free space
+       if s.cache {
+               // caching is limited by explicit number of files
+               num, ok := config.GetParam[int](s.args, "num")
+               if !ok {
+                       num = 100
+               }
+               if len(s.files) >= num {
+                       // make space for at least one new entry
+                       s.prune(1)
+               }
+       } else {
+               // normal storage is limited by quota (default: 10GB)
+               max, ok := config.GetParam[int](s.args, "maxGB")
+               if !ok {
+                       max = 10
+               }
+               if int(s.totalSize>>30) > max {
+                       // drop a significant number of blocks
+                       s.prune(20)
+               }
+       }
        // get query parameters for entry
-       var (
-               btype  uint16            // block type
-               expire util.AbsoluteTime // block expiration
-       )
+       var btype uint16 // block type
        query.Get("blkType", &btype)
+       var expire util.AbsoluteTime // block expiration
        query.Get("expire", &expire)
 
-       // are we in caching mode?
-       if s.cached != nil {
-               // release previous block if defined
-               if key := s.cached[s.wrPos]; key != nil {
-                       // get path and filename from key
-                       path, fname := s.expandPath(key)
-                       if err = os.Remove(path + "/" + fname); err != nil {
-                               return
-                       }
-                       // free slot
-                       s.cached[s.wrPos] = nil
-               }
-       }
        // get path and filename from key
        path, fname := s.expandPath(query.Key())
        // make sure the path exists
@@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block 
blocks.Block) (err error) {
        }
        // write to file for storage
        var fp *os.File
+       var fpSize int
        if fp, err = os.Create(path + "/" + fname); err == nil {
                defer fp.Close()
                // write block data
@@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block 
blocks.Block) (err error) {
                        }
                }
        }
-       // update cache list
-       if s.cached != nil {
-               s.cached[s.wrPos] = query.Key()
-               s.wrPos = (s.wrPos + 1) % len(s.cached)
+       // add header to internal list
+       now := util.AbsoluteTimeNow()
+       hdr := &FileHeader{
+               key:       hex.EncodeToString(query.Key().Bits),
+               size:      uint64(fpSize),
+               btype:     btype,
+               expires:   expire,
+               stored:    now,
+               lastUsed:  now,
+               usedCount: 1,
        }
+       s.files[hdr.key] = hdr
        return
 }
 
@@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) 
(string, string) {
        return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:]
 }
 
+// Prune list of file headers so we drop at least n entries.
+// returns number of removed entries.
+func (s *FileStore) prune(n int) (del int) {
+       // get list of headers; remove expired entries on the fly
+       list := make([]*FileHeader, 0)
+       for key, hdr := range s.files {
+               // remove expired entry
+               if hdr.expires.Expired() {
+                       s.dropFile(key)
+                       del++
+               }
+               // append to list
+               list = append(list, hdr)
+       }
+       // check if we are already done.
+       if del >= n {
+               return
+       }
+       // sort list by descending rate "(lifetime * size) / usedCount"
+       sort.Slice(list, func(i, j int) bool {
+               ri := (list[i].stored.Elapsed().Val * list[i].size) / 
list[i].usedCount
+               rj := (list[j].stored.Elapsed().Val * list[j].size) / 
list[j].usedCount
+               return ri > rj
+       })
+       // remove from start of list until prune limit is reached
+       for _, hdr := range list {
+               s.dropFile(hdr.key)
+               del++
+               if del == n {
+                       break
+               }
+       }
+       return
+}
+
+// dropFile removes a file from the internal list and the physical storage.
+func (s *FileStore) dropFile(key string) {
+       // remove from internal list
+       delete(s.files, key)
+       // remove from filesystem
+       path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:])
+       if err := os.Remove(path); err != nil {
+               logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", 
path, err.Error())
+               return
+       }
+}
+
 //------------------------------------------------------------
 // Redis: only use for caching purposes on key/value strings
 //------------------------------------------------------------
@@ -269,16 +375,28 @@ type RedisStore struct {
 }
 
 // NewRedisStore creates a Redis service client instance.
-func NewRedisStore(addr, passwd, db string) (s KVStore, err error) {
-       kvs := new(RedisStore)
-       if kvs.db, err = strconv.Atoi(db); err != nil {
-               err = ErrStoreInvalidSpec
-               return
+func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) {
+       // get connection parameters
+       addr, ok := config.GetParam[string](spec, "addr")
+       if !ok {
+               return nil, ErrStoreInvalidSpec
+       }
+       passwd, ok := config.GetParam[string](spec, "passwd")
+       if !ok {
+               passwd = ""
        }
+       db, ok := config.GetParam[int](spec, "db")
+       if !ok {
+               return nil, ErrStoreInvalidSpec
+       }
+
+       // create new Redis store
+       kvs := new(RedisStore)
+       kvs.db = db
        kvs.client = redis.NewClient(&redis.Options{
                Addr:     addr,
                Password: passwd,
-               DB:       kvs.db,
+               DB:       db,
        })
        if kvs.client == nil {
                err = ErrStoreNotAvailable
@@ -328,11 +446,17 @@ type SQLStore struct {
 }
 
 // NewSQLStore creates a new SQL-based key/value store.
-func NewSQLStore(spec string) (s KVStore, err error) {
+func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) {
+       // get connection parameters
+       connect, ok := config.GetParam[string](spec, "connect")
+       if !ok {
+               return nil, ErrStoreInvalidSpec
+       }
+       // create SQL store
        kvs := new(SQLStore)
 
        // connect to SQL database
-       kvs.db, err = util.DbPool.Connect(spec)
+       kvs.db, err = util.DbPool.Connect(connect)
        if err != nil {
                return nil, err
        }
@@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) {
                return nil, ErrStoreNotAvailable
        }
        return kvs, nil
-
 }
 
 // Put a key/value pair into the store

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]