[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: File/block caching strategy improved.
From: |
gnunet |
Subject: |
[gnunet-go] branch master updated: File/block caching strategy improved. |
Date: |
Sat, 11 Jun 2022 20:22:52 +0200 |
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new 4261e07 File/block caching strategy improved.
4261e07 is described below
commit 4261e07def81e7c3eb183b9d5c4059a2e9c53759
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 11 20:21:38 2022 +0200
File/block caching strategy improved.
---
src/gnunet/config/config.go | 33 +++-
src/gnunet/config/gnunet-config.json | 25 ++-
src/gnunet/service/dht/dhtstore_test.go | 19 ++-
src/gnunet/service/dht/module.go | 4 -
src/gnunet/service/store.go | 293 +++++++++++++++++++++++---------
5 files changed, 266 insertions(+), 108 deletions(-)
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index a92bdd6..b4d2840 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -93,15 +93,34 @@ type GNSConfig struct {
MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution
}
+//----------------------------------------------------------------------
+// Generic parameter configuration (handle any key/value settings)
+//----------------------------------------------------------------------
+
+// ParameterConfig handles arbitrary values for string keys. This is necessary
+// e.g. in the 'Storage' configuration, as custom storage implementations
+// require different sets of parameters.
+type ParameterConfig map[string]any
+
+// Get a parameter value with given type 'V'
+func GetParam[V any](params ParameterConfig, key string) (i V, ok bool) {
+ var v any
+ if v, ok = params[key]; ok {
+ if i, ok = v.(V); ok {
+ return
+ }
+ }
+ return
+}
+
//----------------------------------------------------------------------
// DHT configuration
//----------------------------------------------------------------------
// DHTConfig contains parameters for the distributed hash table (DHT)
type DHTConfig struct {
- Service *ServiceConfig `json:"service"` // socket for DHT service
- Storage string `json:"storage"` // filesystem storage location
- Cache string `json:"cache"` // key/value cache
+ Service *ServiceConfig `json:"service"` // socket for DHT service
+ Storage ParameterConfig `json:"storage"` // filesystem storage location
}
//----------------------------------------------------------------------
@@ -110,8 +129,8 @@ type DHTConfig struct {
// NamecacheConfig contains parameters for the local name cache
type NamecacheConfig struct {
- Service *ServiceConfig `json:"service"` // socket for Namecache service
- Storage string `json:"storage"` // key/value cache
+ Service *ServiceConfig `json:"service"` // socket for Namecache service
+ Storage ParameterConfig `json:"storage"` // key/value cache
}
//----------------------------------------------------------------------
@@ -120,8 +139,8 @@ type NamecacheConfig struct {
// RevocationConfig contains parameters for the key revocation service
type RevocationConfig struct {
- Service *ServiceConfig `json:"service"` // socket for Revocation service
- Storage string `json:"storage"` // persistance mechanism for revocation data
+ Service *ServiceConfig `json:"service"` // socket for Revocation service
+ Storage ParameterConfig `json:"storage"` // persistance mechanism for revocation data
}
//----------------------------------------------------------------------
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json
index 7927cda..167bfa0 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -11,7 +11,7 @@
{
"id": "test",
"network": "ip+udp",
- "address": "upnp:192.168.178.1",
+ "address": "upnp:192.168.134.1",
"port": 6666,
"ttl": 86400
}
@@ -28,8 +28,12 @@
"perm": "0770"
}
},
- "storage": "dht_file_store+/var/lib/gnunet/dht/store",
- "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
+ "storage": {
+ "mode": "file",
+ "cache": false,
+ "path": "/var/lib/gnunet/dht/store",
+ "maxGB": 10
+ }
},
"gns": {
"service": {
@@ -48,7 +52,13 @@
"perm": "0770"
}
},
- "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
+ "storage": {
+ "mode": "file",
+ "cache": true,
+ "path": "/var/lib/gnunet/namecache",
+ "num": 1000,
+ "expire": 43200
+ }
},
"revocation": {
"service": {
@@ -57,7 +67,12 @@
"perm": "0770"
}
},
- "storage": "redis:localhost:6397::15"
+ "storage": {
+ "mode": "redis",
+ "addr": "localhost:6397",
+ "passwd": "",
+ "id": 15
+ }
},
"rpc": {
"endpoint": "tcp:127.0.0.1:80"
diff --git a/src/gnunet/service/dht/dhtstore_test.go b/src/gnunet/service/dht/dhtstore_test.go
index 3cb8080..d9fc1d0 100644
--- a/src/gnunet/service/dht/dhtstore_test.go
+++ b/src/gnunet/service/dht/dhtstore_test.go
@@ -20,9 +20,10 @@ package dht
import (
"encoding/hex"
+ "gnunet/config"
"gnunet/crypto"
"gnunet/service"
- blocks "gnunet/service/dht/blocks"
+ "gnunet/service/dht/blocks"
"math/rand"
"testing"
)
@@ -37,8 +38,15 @@ const (
// each block from storage and checks for matching hash.
func TestDHTFilesStore(t *testing.T) {
+ // test configuration
+ cfg := make(config.ParameterConfig)
+ cfg["mode"] = "file"
+ cfg["cache"] = false
+ cfg["path"] = "/var/lib/gnunet/dht/store"
+ cfg["maxGB"] = 10
+
// create file store
- fs, err := service.NewFileCache("/var/lib/gnunet/dht/cache", "100")
+ fs, err := service.NewFileStore(cfg)
if err != nil {
t.Fatal(err)
}
@@ -48,33 +56,30 @@ func TestDHTFilesStore(t *testing.T) {
// First round: save blocks
for i := 0; i < fsNumBlocks; i++ {
// generate random block
- size := 20 // 1024 + rand.Intn(62000)
+ size := 1024 + rand.Intn(62000)
buf := make([]byte, size)
rand.Read(buf)
val := blocks.NewGenericBlock(buf)
// generate associated key
k := crypto.Hash(buf).Bits
key := blocks.NewGenericQuery(k)
- t.Logf("> %d: %s -- %s", i, hex.EncodeToString(k), hex.EncodeToString(buf))
// store block
if err := fs.Put(key, val); err != nil {
t.Fatal(err)
}
-
// remember key
keys = append(keys, key)
}
// Second round: retrieve blocks and check
- for i, key := range keys {
+ for _, key := range keys {
// get block
val, err := fs.Get(key)
if err != nil {
t.Fatal(err)
}
buf := val.Data()
- t.Logf("< %d: %s -- %s", i, hex.EncodeToString(key.Key().Bits), hex.EncodeToString(buf))
// re-create key
k := crypto.Hash(buf)
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 34bf020..612d0ec 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -57,10 +57,6 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module, err error) {
if store, err = service.NewDHTStore(config.Cfg.DHT.Storage); err != nil {
return
}
- // create cache handler
- if cache, err = service.NewDHTStore(config.Cfg.DHT.Cache); err != nil {
- return
- }
// create routing table
rt := NewRoutingTable(NewPeerAddress(c.PeerID()))
diff --git a/src/gnunet/service/store.go b/src/gnunet/service/store.go
index c599c54..5de5415 100644
--- a/src/gnunet/service/store.go
+++ b/src/gnunet/service/store.go
@@ -22,17 +22,21 @@ import (
"context"
"database/sql"
"encoding/binary"
+ "encoding/gob"
"encoding/hex"
"errors"
"fmt"
+ "gnunet/config"
"gnunet/crypto"
"gnunet/service/dht/blocks"
"gnunet/util"
+ "io"
"io/ioutil"
"os"
- "strconv"
- "strings"
+ "sort"
+ "sync"
+ "github.com/bfix/gospel/logger"
redis "github.com/go-redis/redis/v8"
)
@@ -46,22 +50,20 @@ var (
//------------------------------------------------------------
// Generic storage interface. Can be used for persistent or
// transient (caching) storage of key/value data.
-// One set of methods (Get/Put) work on DHT queries and blocks,
-// the other set (GetS, PutS) work on key/value strings.
-// Each custom implementation can decide which sets to support.
//------------------------------------------------------------
// Store is a key/value storage where the type of the key is either
// a SHA512 hash value or a string and the value is either a DHT
-// block or a string.
+// block or a string. It is possible to mix any key/value types,
+// but this is not used in this implementation.
type Store[K, V any] interface {
- // Put block into storage under given key
+ // Put value into storage under given key
Put(key K, val V) error
- // Get block with given key from storage
+ // Get value with given key from storage
Get(key K) (V, error)
- // List all store queries
+ // List all store keys
List() ([]K, error)
}
@@ -78,22 +80,18 @@ type KVStore Store[string, string]
//------------------------------------------------------------
// NewDHTStore creates a new storage handler with given spec
// for use with DHT queries and blocks
-func NewDHTStore(spec string) (DHTStore, error) {
- specs := strings.Split(spec, ":")
- if len(specs) < 2 {
+func NewDHTStore(spec config.ParameterConfig) (DHTStore, error) {
+ // get the mode parameter
+ mode, ok := config.GetParam[string](spec, "mode")
+ if !ok {
return nil, ErrStoreInvalidSpec
}
- switch specs[0] {
+ switch mode {
//------------------------------------------------------------------
// File-base storage
//------------------------------------------------------------------
- case "file_store":
- return NewFileStore(specs[1])
- case "file_cache":
- if len(specs) < 3 {
- return nil, ErrStoreInvalidSpec
- }
- return NewFileCache(specs[1], specs[2])
+ case "file":
+ return NewFileStore(spec)
}
return nil, ErrStoreUnknown
}
@@ -101,29 +99,24 @@ func NewDHTStore(spec string) (DHTStore, error) {
//------------------------------------------------------------
// NewKVStore creates a new storage handler with given spec
// for use with key/value string pairs.
-func NewKVStore(spec string) (KVStore, error) {
- specs := strings.SplitN(spec, ":", 2)
- if len(specs) < 2 {
+func NewKVStore(spec config.ParameterConfig) (KVStore, error) {
+ // get the mode parameter
+ mode, ok := config.GetParam[string](spec, "mode")
+ if !ok {
return nil, ErrStoreInvalidSpec
}
- switch specs[0] {
+ switch mode {
//--------------------------------------------------------------
// Redis service
//--------------------------------------------------------------
case "redis":
- if len(specs) < 4 {
- return nil, ErrStoreInvalidSpec
- }
- return NewRedisStore(specs[1], specs[2], specs[3])
+ return NewRedisStore(spec)
//--------------------------------------------------------------
// SQL database service
//--------------------------------------------------------------
case "sql":
- if len(specs) < 4 {
- return nil, ErrStoreInvalidSpec
- }
- return NewSQLStore(specs[1])
+ return NewSQLStore(spec)
}
return nil, errors.New("unknown storage mechanism")
}
@@ -132,63 +125,121 @@ func NewKVStore(spec string) (KVStore, error) {
// Filesystem-based storage
//------------------------------------------------------------
+// FileHeader is the layout of a file managed by the storage handler.
+// On start-up the file store recreates the list of file entries from
+// traversing the actual filesystem. This is done in the background.
+type FileHeader struct {
+ key string // storage key
+ size uint64 // size of file
+ btype uint16 // block type
+ stored util.AbsoluteTime // time added to store
+ expires util.AbsoluteTime // expiration time
+ lastUsed util.AbsoluteTime // time last used
+ usedCount uint64 // usage count
+}
+
// FileStore implements a filesystem-based storage mechanism for
// DHT queries and blocks.
type FileStore struct {
- path string // storage path
- cached []*crypto.HashCode // list of cached entries (key)
- wrPos int // write position in cyclic list
+ path string // storage path
+ cache bool // storage works as cache
+ args config.ParameterConfig // arguments / settings
+
+ totalSize uint64 // total storage size (logical, not physical)
+ files map[string]*FileHeader // list of file headers
+ wrPos int // write position in cyclic list
+ mtx sync.Mutex // serialize operations (prune)
}
// NewFileStore instantiates a new file storage.
-func NewFileStore(path string) (DHTStore, error) {
- // create file store
- return &FileStore{
- path: path,
- }, nil
-}
-
-// NewFileCache instantiates a new file-based cache.
-func NewFileCache(path, param string) (DHTStore, error) {
+func NewFileStore(spec config.ParameterConfig) (DHTStore, error) {
+ // get path parameter
+ path, ok := config.GetParam[string](spec, "path")
+ if !ok {
+ return nil, ErrStoreInvalidSpec
+ }
+ isCache, ok := config.GetParam[bool](spec, "cache")
+ if !ok {
+ isCache = false
+ }
// remove old cache content
- os.RemoveAll(path)
+ if isCache {
+ os.RemoveAll(path)
+ }
+ // create file store handler
+ fs := &FileStore{
+ path: path,
+ args: spec,
+ cache: isCache,
+ files: make(map[string]*FileHeader),
+ }
+ // load file header list
+ if !isCache {
+ if fp, err := os.Open(path + "/files.db"); err == nil {
+ dec := gob.NewDecoder(fp)
+ for {
+ hdr := new(FileHeader)
+ if dec.Decode(hdr) != nil {
+ if err != io.EOF {
+ return nil, err
+ }
+ break
+ }
+ fs.files[hdr.key] = hdr
+ fs.totalSize += hdr.size
+ }
+ fp.Close()
+ }
+ }
+ return fs, nil
+}
- // get number of cache entries
- num, err := strconv.ParseUint(param, 10, 32)
- if err != nil {
- return nil, err
+// Close file storage: write metadata to file.
+func (s *FileStore) Close() (err error) {
+ if !s.cache {
+ if fp, err := os.Create(s.path + "/files.db"); err == nil {
+ defer fp.Close()
+ enc := gob.NewEncoder(fp)
+ for _, hdr := range s.files {
+ if err = enc.Encode(hdr); err != nil {
+ break
+ }
+ }
+ }
}
- // create file store
- return &FileStore{
- path: path,
- cached: make([]*crypto.HashCode, num),
- wrPos: 0,
- }, nil
+ return
}
// Put block into storage under given key
func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
+ // check for free space
+ if s.cache {
+ // caching is limited by explicit number of files
+ num, ok := config.GetParam[int](s.args, "num")
+ if !ok {
+ num = 100
+ }
+ if len(s.files) >= num {
+ // make space for at least one new entry
+ s.prune(1)
+ }
+ } else {
+ // normal storage is limited by quota (default: 10GB)
+ max, ok := config.GetParam[int](s.args, "maxGB")
+ if !ok {
+ max = 10
+ }
+ if int(s.totalSize>>30) > max {
+ // drop a significant number of blocks
+ s.prune(20)
+ }
+ }
// get query parameters for entry
- var (
- btype uint16 // block type
- expire util.AbsoluteTime // block expiration
- )
+ var btype uint16 // block type
query.Get("blkType", &btype)
+ var expire util.AbsoluteTime // block expiration
query.Get("expire", &expire)
- // are we in caching mode?
- if s.cached != nil {
- // release previous block if defined
- if key := s.cached[s.wrPos]; key != nil {
- // get path and filename from key
- path, fname := s.expandPath(key)
- if err = os.Remove(path + "/" + fname); err != nil {
- return
- }
- // free slot
- s.cached[s.wrPos] = nil
- }
- }
// get path and filename from key
path, fname := s.expandPath(query.Key())
// make sure the path exists
@@ -197,6 +248,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
}
// write to file for storage
var fp *os.File
+ var fpSize int
if fp, err = os.Create(path + "/" + fname); err == nil {
defer fp.Close()
// write block data
@@ -206,11 +258,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
}
}
}
- // update cache list
- if s.cached != nil {
- s.cached[s.wrPos] = query.Key()
- s.wrPos = (s.wrPos + 1) % len(s.cached)
+ // add header to internal list
+ now := util.AbsoluteTimeNow()
+ hdr := &FileHeader{
+ key: hex.EncodeToString(query.Key().Bits),
+ size: uint64(fpSize),
+ btype: btype,
+ expires: expire,
+ stored: now,
+ lastUsed: now,
+ usedCount: 1,
}
+ s.files[hdr.key] = hdr
return
}
@@ -258,6 +317,53 @@ func (s *FileStore) expandPath(key *crypto.HashCode) (string, string) {
return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:]
}
+// Prune list of file headers so we drop at least n entries.
+// returns number of removed entries.
+func (s *FileStore) prune(n int) (del int) {
+ // get list of headers; remove expired entries on the fly
+ list := make([]*FileHeader, 0)
+ for key, hdr := range s.files {
+ // remove expired entry
+ if hdr.expires.Expired() {
+ s.dropFile(key)
+ del++
+ }
+ // append to list
+ list = append(list, hdr)
+ }
+ // check if we are already done.
+ if del >= n {
+ return
+ }
+ // sort list by descending rate "(lifetime * size) / usedCount"
+ sort.Slice(list, func(i, j int) bool {
+ ri := (list[i].stored.Elapsed().Val * list[i].size) / list[i].usedCount
+ rj := (list[j].stored.Elapsed().Val * list[j].size) / list[j].usedCount
+ return ri > rj
+ })
+ // remove from start of list until prune limit is reached
+ for _, hdr := range list {
+ s.dropFile(hdr.key)
+ del++
+ if del == n {
+ break
+ }
+ }
+ return
+}
+
+// drop file removes a file from the internal list and the physical storage.
+func (s *FileStore) dropFile(key string) {
+ // remove from internal list
+ delete(s.files, key)
+ // remove from filesystem
+ path := fmt.Sprintf("%s/%s/%s/%s", s.path, key[:2], key[2:4], key[4:])
+ if err := os.Remove(path); err != nil {
+ logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error())
+ return
+ }
+}
+
//------------------------------------------------------------
// Redis: only use for caching purposes on key/value strings
//------------------------------------------------------------
@@ -269,16 +375,28 @@ type RedisStore struct {
}
// NewRedisStore creates a Redis service client instance.
-func NewRedisStore(addr, passwd, db string) (s KVStore, err error) {
- kvs := new(RedisStore)
- if kvs.db, err = strconv.Atoi(db); err != nil {
- err = ErrStoreInvalidSpec
- return
+func NewRedisStore(spec config.ParameterConfig) (s KVStore, err error) {
+ // get connection parameters
+ addr, ok := config.GetParam[string](spec, "addr")
+ if !ok {
+ return nil, ErrStoreInvalidSpec
+ }
+ passwd, ok := config.GetParam[string](spec, "passwd")
+ if !ok {
+ passwd = ""
}
+ db, ok := config.GetParam[int](spec, "db")
+ if !ok {
+ return nil, ErrStoreInvalidSpec
+ }
+
+ // create new Redis store
+ kvs := new(RedisStore)
+ kvs.db = db
kvs.client = redis.NewClient(&redis.Options{
Addr: addr,
Password: passwd,
- DB: kvs.db,
+ DB: db,
})
if kvs.client == nil {
err = ErrStoreNotAvailable
@@ -328,11 +446,17 @@ type SQLStore struct {
}
// NewSQLStore creates a new SQL-based key/value store.
-func NewSQLStore(spec string) (s KVStore, err error) {
+func NewSQLStore(spec config.ParameterConfig) (s KVStore, err error) {
+ // get connection parameters
+ connect, ok := config.GetParam[string](spec, "connect")
+ if !ok {
+ return nil, ErrStoreInvalidSpec
+ }
+ // create SQL store
kvs := new(SQLStore)
// connect to SQL database
- kvs.db, err = util.DbPool.Connect(spec)
+ kvs.db, err = util.DbPool.Connect(connect)
if err != nil {
return nil, err
}
@@ -343,7 +467,6 @@ func NewSQLStore(spec string) (s KVStore, err error) {
return nil, ErrStoreNotAvailable
}
return kvs, nil
-
}
// Put a key/value pair into the store
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: File/block caching strategy improved.,
gnunet <=