diff --git a/vendor-log b/vendor-log
index d637f28..ca61de4 100644
--- a/vendor-log
+++ b/vendor-log
@@ -93,3 +93,4 @@ a1a5df8f92af764f378f07d6a3dd8eb3f7aa190a github.com/xtaci/smux
 6c23252515492caf9b228a9d5cabcdbde29f7f82 golang.org/x/net/internal/iana
 6c23252515492caf9b228a9d5cabcdbde29f7f82 golang.org/x/net/internal/netreflect
 6c23252515492caf9b228a9d5cabcdbde29f7f82 golang.org/x/net/ipv4
+8bf1e9bacbf65b10c81d0f4314cf2b1ebef728b5 github.com/streamrail/concurrent-map
diff --git a/vendor/github.com/streamrail/concurrent-map/concurrent_map.go b/vendor/github.com/streamrail/concurrent-map/concurrent_map.go
new file mode 100644
index 0000000..f6e5c06
--- /dev/null
+++ b/vendor/github.com/streamrail/concurrent-map/concurrent_map.go
@@ -0,0 +1,301 @@
+package cmap
+
+import (
+	"encoding/json"
+	"sync"
+)
+
+var SHARD_COUNT = 32
+
+// A "thread" safe map of type string:Anything.
+// To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
+type ConcurrentMap []*ConcurrentMapShared
+
+// A "thread" safe string to anything map.
+type ConcurrentMapShared struct {
+	items        map[string]interface{}
+	sync.RWMutex // Read Write mutex, guards access to internal map.
+}
+
+// Creates a new concurrent map.
+func New() ConcurrentMap {
+	m := make(ConcurrentMap, SHARD_COUNT)
+	for i := 0; i < SHARD_COUNT; i++ {
+		m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
+	}
+	return m
+}
+
+// Returns shard under given key
+func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
+	return m[uint(fnv32(key))%uint(SHARD_COUNT)]
+}
+
+func (m ConcurrentMap) MSet(data map[string]interface{}) {
+	for key, value := range data {
+		shard := m.GetShard(key)
+		shard.Lock()
+		shard.items[key] = value
+		shard.Unlock()
+	}
+}
+
+// Sets the given value under the specified key.
+func (m *ConcurrentMap) Set(key string, value interface{}) {
+	// Get map shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	shard.items[key] = value
+	shard.Unlock()
+}
+
+// Callback to return the new element to be inserted into the map.
+// It is called while the lock is held, therefore it MUST NOT
+// try to access other keys in the same map, as that can lead to deadlock since
+// Go sync.RWMutex is not reentrant.
+type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
+
+// Insert or Update - updates existing element or inserts a new one using UpsertCb
+func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
+	shard := m.GetShard(key)
+	shard.Lock()
+	v, ok := shard.items[key]
+	res = cb(ok, v, value)
+	shard.items[key] = res
+	shard.Unlock()
+	return res
+}
+
+// Sets the given value under the specified key if no value was associated with it.
+func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
+	// Get map shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	_, ok := shard.items[key]
+	if !ok {
+		shard.items[key] = value
+	}
+	shard.Unlock()
+	return !ok
+}
+
+// Retrieves an element from map under given key.
+func (m ConcurrentMap) Get(key string) (interface{}, bool) {
+	// Get shard
+	shard := m.GetShard(key)
+	shard.RLock()
+	// Get item from shard.
+	val, ok := shard.items[key]
+	shard.RUnlock()
+	return val, ok
+}
+
+// Returns the number of elements within the map.
+func (m ConcurrentMap) Count() int {
+	count := 0
+	for i := 0; i < SHARD_COUNT; i++ {
+		shard := m[i]
+		shard.RLock()
+		count += len(shard.items)
+		shard.RUnlock()
+	}
+	return count
+}
+
+// Looks up an item under the specified key.
+func (m *ConcurrentMap) Has(key string) bool {
+	// Get shard
+	shard := m.GetShard(key)
+	shard.RLock()
+	// See if element is within shard.
+	_, ok := shard.items[key]
+	shard.RUnlock()
+	return ok
+}
+
+// Removes an element from the map.
+func (m *ConcurrentMap) Remove(key string) {
+	// Try to get shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	delete(shard.items, key)
+	shard.Unlock()
+}
+
+// Removes an element from the map and returns it.
+func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
+	// Try to get shard.
+	shard := m.GetShard(key)
+	shard.Lock()
+	v, exists = shard.items[key]
+	delete(shard.items, key)
+	shard.Unlock()
+	return v, exists
+}
+
+// Checks if map is empty.
+func (m *ConcurrentMap) IsEmpty() bool {
+	return m.Count() == 0
+}
+
+// Used by the Iter & IterBuffered functions to wrap two variables together over a channel.
+type Tuple struct {
+	Key string
+	Val interface{}
+}
+
+// Returns an iterator which could be used in a for range loop.
+//
+// Deprecated: use IterBuffered() instead for better performance.
+func (m ConcurrentMap) Iter() <-chan Tuple {
+	ch := make(chan Tuple)
+	go func() {
+		wg := sync.WaitGroup{}
+		wg.Add(SHARD_COUNT)
+		// Foreach shard.
+		for _, shard := range m {
+			go func(shard *ConcurrentMapShared) {
+				// Foreach key, value pair.
+				shard.RLock()
+				for key, val := range shard.items {
+					ch <- Tuple{key, val}
+				}
+				shard.RUnlock()
+				wg.Done()
+			}(shard)
+		}
+		wg.Wait()
+		close(ch)
+	}()
+	return ch
+}
+
+// Returns a buffered iterator which could be used in a for range loop.
+func (m ConcurrentMap) IterBuffered() <-chan Tuple {
+	ch := make(chan Tuple, m.Count())
+	go func() {
+		wg := sync.WaitGroup{}
+		wg.Add(SHARD_COUNT)
+		// Foreach shard.
+		for _, shard := range m {
+			go func(shard *ConcurrentMapShared) {
+				// Foreach key, value pair.
+				shard.RLock()
+				for key, val := range shard.items {
+					ch <- Tuple{key, val}
+				}
+				shard.RUnlock()
+				wg.Done()
+			}(shard)
+		}
+		wg.Wait()
+		close(ch)
+	}()
+	return ch
+}
+
+// Returns all items as map[string]interface{}
+func (m ConcurrentMap) Items() map[string]interface{} {
+	tmp := make(map[string]interface{})
+
+	// Insert items to temporary map.
+	for item := range m.IterBuffered() {
+		tmp[item.Key] = item.Val
+	}
+
+	return tmp
+}
+
+// Iterator callback, called for every key/value pair found in
+// the map. RLock is held for all calls within a given shard,
+// therefore the callback sees a consistent view of that shard,
+// but not across the shards.
+type IterCb func(key string, v interface{})
+
+// Callback based iterator, cheapest way to read
+// all elements in a map.
+func (m *ConcurrentMap) IterCb(fn IterCb) {
+	for idx := range *m {
+		shard := (*m)[idx]
+		shard.RLock()
+		for key, value := range shard.items {
+			fn(key, value)
+		}
+		shard.RUnlock()
+	}
+}
+
+// Return all keys as []string
+func (m ConcurrentMap) Keys() []string {
+	count := m.Count()
+	ch := make(chan string, count)
+	go func() {
+		// Foreach shard.
+		wg := sync.WaitGroup{}
+		wg.Add(SHARD_COUNT)
+		for _, shard := range m {
+			go func(shard *ConcurrentMapShared) {
+				// Foreach key.
+				shard.RLock()
+				for key := range shard.items {
+					ch <- key
+				}
+				shard.RUnlock()
+				wg.Done()
+			}(shard)
+		}
+		wg.Wait()
+		close(ch)
+	}()
+
+	// Generate keys
+	keys := make([]string, count)
+	for i := 0; i < count; i++ {
+		keys[i] = <-ch
+	}
+	return keys
+}
+
+// Reveals ConcurrentMap "private" variables to json marshal.
+func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
+	// Create a temporary map, which will hold all items spread across shards.
+	tmp := make(map[string]interface{})
+
+	// Insert items to temporary map.
+	for item := range m.IterBuffered() {
+		tmp[item.Key] = item.Val
+	}
+	return json.Marshal(tmp)
+}
+
+func fnv32(key string) uint32 {
+	hash := uint32(2166136261)
+	const prime32 = uint32(16777619)
+	for i := 0; i < len(key); i++ {
+		hash *= prime32
+		hash ^= uint32(key[i])
+	}
+	return hash
+}
+
+// Concurrent map uses interface{} as its value, therefore JSON Unmarshal
+// probably won't know which type to unmarshal into; in such a case
+// we'll end up with a value of type map[string]interface{}. In most cases this isn't
+// our value type, which is why we've decided to remove this functionality.
+
+// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
+// 	// Reverse process of Marshal.
+
+// 	tmp := make(map[string]interface{})
+
+// 	// Unmarshal into a single map.
+// 	if err := json.Unmarshal(b, &tmp); err != nil {
+// 		return nil
+// 	}
+
+// 	// foreach key,value pair in temporary map insert into our concurrent map.
+// 	for key, val := range tmp {
+// 		m.Set(key, val)
+// 	}
+// 	return nil
+// }
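
The vendored package is a sharded map: GetShard hashes the key with fnv32 and takes the result modulo SHARD_COUNT, so each operation contends only on one shard's RWMutex instead of a single global lock. A minimal sketch of how the basic API is driven follows; the keys and values are illustrative only, not taken from this change.

package main

import (
	"fmt"

	cmap "github.com/streamrail/concurrent-map"
)

func main() {
	m := cmap.New()

	// Set/Get lock only the shard that fnv32 maps the key to.
	m.Set("session:42", "alice")
	if v, ok := m.Get("session:42"); ok {
		fmt.Println(v) // alice
	}

	// SetIfAbsent reports whether the value was actually inserted.
	inserted := m.SetIfAbsent("session:42", "bob") // false, key already present

	// Upsert runs the callback under the shard lock; the callback must not
	// touch other keys of the same map, since the lock is not reentrant.
	m.Upsert("hits", 1, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
		if !exists {
			return newValue
		}
		return valueInMap.(int) + newValue.(int)
	})

	fmt.Println(m.Count(), inserted) // 2 false
	m.Remove("session:42")
}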
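
Iteration comes in three flavors: IterCb walks the shards in place under their read locks, IterBuffered snapshots through a channel pre-sized with Count() so the per-shard goroutines never block, and Keys drains a similar channel into a slice. The view is consistent within a shard but not across shards. A short sketch with made-up data:

package main

import (
	"fmt"

	cmap "github.com/streamrail/concurrent-map"
)

func main() {
	m := cmap.New()
	m.MSet(map[string]interface{}{"a": 1, "b": 2, "c": 3}) // illustrative data

	// IterCb: no goroutines or channels; cheapest way to visit every entry.
	m.IterCb(func(key string, v interface{}) {
		fmt.Println("cb:", key, v)
	})

	// IterBuffered: channel capacity is Count(), so the producing goroutines
	// can finish even if the consumer stops reading early.
	for item := range m.IterBuffered() {
		fmt.Println("buffered:", item.Key, item.Val)
	}

	// Keys: slice sized by a Count() taken up front.
	fmt.Println("keys:", m.Keys())
}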
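
Two smaller points about the file: SHARD_COUNT is a package-level variable read by New() and GetShard(), so a caller that wants a different shard count has to set it before any map is created; and MarshalJSON flattens the shards via IterBuffered, so a ConcurrentMap serializes as an ordinary flat JSON object. A hypothetical sketch, with an illustrative key and value:

package main

import (
	"encoding/json"
	"fmt"

	cmap "github.com/streamrail/concurrent-map"
)

func main() {
	// Must happen before cmap.New(); changing it later would make GetShard
	// pick a different shard for keys stored under the old count.
	cmap.SHARD_COUNT = 64

	m := cmap.New()
	m.Set("addr", "127.0.0.1:4000") // illustrative value only

	// MarshalJSON gathers every shard into one temporary map, so the output
	// is a single flat object: {"addr":"127.0.0.1:4000"}
	b, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}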