summaryrefslogtreecommitdiff
path: root/usagepool.go
diff options
context:
space:
mode:
Diffstat (limited to 'usagepool.go')
-rw-r--r--usagepool.go204
1 files changed, 157 insertions, 47 deletions
diff --git a/usagepool.go b/usagepool.go
index dd4f606..df69caf 100644
--- a/usagepool.go
+++ b/usagepool.go
@@ -21,23 +21,136 @@ import (
)
// UsagePool is a thread-safe map that pools values
-// based on usage; a LoadOrStore operation increments
-// the usage, and a Delete decrements from the usage.
-// If the usage count reaches 0, the value will be
-// removed from the map. There is no way to overwrite
-// existing keys in the pool without first deleting
-// it as many times as it was stored. Deleting too
-// many times will panic.
+// based on usage (reference counting). Values are
+// only inserted if they do not already exist. There
+// are two ways to add values to the pool:
+//
+// 1) LoadOrStore will increment usage and store the
+// value immediately if it does not already exist
+// 2) LoadOrNew will increment usage and construct the
+// value immediately if it does not already exist,
+// then store that value in the pool. When the
+// constructed value is finally deleted from the
+// pool (after its usage reaches 0), it will be
+// cleaned up by calling its Destruct method.
+//
+// The use of LoadOrNew allows values to be created
+// and reused and finally cleaned up only once, even
+// though they may have many references throughout
+// their lifespan. This is helpful, for example, when
+// sharing thread-safe io.Writers that you only want
+// to open and close once.
+//
+// There is no way to overwrite existing keys in the
+// pool without first deleting it as many times as it
+// was stored. Deleting too many times will panic.
+//
+// The implementation does not use a sync.Pool because
+// UsagePool needs additional atomicity to run the
+// constructor functions when creating a new value when
+// LoadOrNew is used. (We could probably use sync.Pool
+// but we'd still have to layer our own additional locks
+// on top.)
//
// An empty UsagePool is NOT safe to use; always call
-// NewUsagePool() to make a new value.
+// NewUsagePool() to make a new one.
type UsagePool struct {
- pool *sync.Map
+ sync.RWMutex
+ pool map[interface{}]*usagePoolVal
}
-// NewUsagePool returns a new usage pool.
+// NewUsagePool returns a new usage pool that is ready to use.
func NewUsagePool() *UsagePool {
- return &UsagePool{pool: new(sync.Map)}
+ return &UsagePool{
+ pool: make(map[interface{}]*usagePoolVal),
+ }
+}
+
+// LoadOrNew loads the value associated with key from the pool if it
+// already exists. If the key doesn't exist, it will call construct
+// to create a new value and then stores that in the pool. An error
+// is only returned if the constructor returns an error. The loaded
+// or constructed value is returned. The loaded return value is true
+// if the value already existed and was loaded, or false if it was
+// newly constructed.
+func (up *UsagePool) LoadOrNew(key interface{}, construct Constructor) (value interface{}, loaded bool, err error) {
+ var upv *usagePoolVal
+ up.Lock()
+ upv, loaded = up.pool[key]
+ if loaded {
+ atomic.AddInt32(&upv.refs, 1)
+ up.Unlock()
+ upv.RLock()
+ value = upv.value
+ err = upv.err
+ upv.RUnlock()
+ } else {
+ upv = &usagePoolVal{refs: 1}
+ upv.Lock()
+ up.pool[key] = upv
+ up.Unlock()
+ value, err = construct()
+ if err == nil {
+ upv.value = value
+ } else {
+ // TODO: remove error'ed entries from map
+ upv.err = err
+ }
+ upv.Unlock()
+ }
+ return
+}
+
+// LoadOrStore loads the value associated with key from the pool if it
+// already exists, or stores it if it does not exist. It returns the
+// value that was either loaded or stored, and true if the value already
+// existed and was
+func (up *UsagePool) LoadOrStore(key, val interface{}) (value interface{}, loaded bool) {
+ var upv *usagePoolVal
+ up.Lock()
+ upv, loaded = up.pool[key]
+ if loaded {
+ atomic.AddInt32(&upv.refs, 1)
+ up.Unlock()
+ upv.Lock()
+ if upv.err == nil {
+ value = upv.value
+ } else {
+ upv.value = val
+ upv.err = nil
+ }
+ upv.Unlock()
+ } else {
+ upv = &usagePoolVal{refs: 1, value: val}
+ up.pool[key] = upv
+ up.Unlock()
+ value = val
+ }
+ return
+}
+
+// Range iterates the pool similarly to how sync.Map.Range() does:
+// it calls f for every key in the pool, and if f returns false,
+// iteration is stopped. Ranging does not affect usage counts.
+//
+// This method is somewhat naive and acquires a read lock on the
+// entire pool during iteration, so do your best to make f() really
+// fast, m'kay?
+func (up *UsagePool) Range(f func(key, value interface{}) bool) {
+ up.RLock()
+ defer up.RUnlock()
+ for key, upv := range up.pool {
+ upv.RLock()
+ if upv.err != nil {
+ upv.RUnlock()
+ continue
+ }
+ val := upv.value
+ upv.RUnlock()
+ if !f(key, val) {
+ break
+ }
+ }
}
// Delete decrements the usage count for key and removes the
@@ -45,50 +158,47 @@ func NewUsagePool() *UsagePool {
// true if the usage count reached 0 and the value was deleted.
// It panics if the usage count drops below 0; always call
// Delete precisely as many times as LoadOrStore.
-func (up *UsagePool) Delete(key interface{}) (deleted bool) {
- usageVal, ok := up.pool.Load(key)
+func (up *UsagePool) Delete(key interface{}) (deleted bool, err error) {
+ up.Lock()
+ upv, ok := up.pool[key]
if !ok {
- return false
+ up.Unlock()
+ return false, nil
}
- upv := usageVal.(*usagePoolVal)
- newUsage := atomic.AddInt32(&upv.usage, -1)
- if newUsage == 0 {
- up.pool.Delete(key)
- return true
- } else if newUsage < 0 {
- panic(fmt.Sprintf("deleted more than stored: %#v (usage: %d)",
- upv.value, upv.usage))
- }
- return false
-}
-
-// LoadOrStore puts val in the pool and returns false if key does
-// not already exist; otherwise if the key exists, it loads the
-// existing value, increments the usage for that value, and returns
-// the value along with true.
-func (up *UsagePool) LoadOrStore(key, val interface{}) (actual interface{}, loaded bool) {
- usageVal := &usagePoolVal{
- usage: 1,
- value: val,
- }
- actual, loaded = up.pool.LoadOrStore(key, usageVal)
- if loaded {
- upv := actual.(*usagePoolVal)
- actual = upv.value
- atomic.AddInt32(&upv.usage, 1)
+ refs := atomic.AddInt32(&upv.refs, -1)
+ if refs == 0 {
+ delete(up.pool, key)
+ up.Unlock()
+ upv.RLock()
+ val := upv.value
+ upv.RUnlock()
+ if destructor, ok := val.(Destructor); ok {
+ err = destructor.Destruct()
+ }
+ deleted = true
+ } else {
+ up.Unlock()
+ if refs < 0 {
+ panic(fmt.Sprintf("deleted more than stored: %#v (usage: %d)",
+ upv.value, upv.refs))
+ }
}
return
}
-// Range iterates the pool the same way sync.Map.Range does.
-// This does not affect usage counts.
-func (up *UsagePool) Range(f func(key, value interface{}) bool) {
- up.pool.Range(func(key, value interface{}) bool {
- return f(key, value.(*usagePoolVal).value)
- })
+// Constructor is a function that returns a new value
+// that can destruct itself when it is no longer needed.
+type Constructor func() (Destructor, error)
+
+// Destructor is a value that can clean itself up when
+// it is deallocated.
+type Destructor interface {
+ Destruct() error
}
type usagePoolVal struct {
- usage int32 // accessed atomically; must be 64-bit aligned for 32-bit systems
+ refs int32 // accessed atomically; must be 64-bit aligned for 32-bit systems
value interface{}
+ err error
+ sync.RWMutex
}