mirror of
https://github.com/charlienet/go-mixed.git
synced 2025-07-18 00:22:41 +08:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
9d12e7fedb | |||
b0ff4d6fd5 | |||
38f7cc75c9 | |||
b4ac1cc449 | |||
1abde30d8f | |||
822932fe15 | |||
85c5a611e1 | |||
fe5c0b54b6 | |||
54fbe8eb0a |
@ -2,15 +2,15 @@ package bloom
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bitset"
|
"github.com/bits-and-blooms/bitset"
|
||||||
"github.com/charlienet/go-mixed/locker"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type memStore struct {
|
type memStore struct {
|
||||||
size uint
|
size uint
|
||||||
set *bitset.BitSet // 内存位图
|
set *bitset.BitSet // 内存位图
|
||||||
lock locker.RWLocker // 同步锁
|
lock sync.RWMutex // 同步锁
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMemStore(size uint) *memStore {
|
func newMemStore(size uint) *memStore {
|
||||||
|
@ -6,10 +6,17 @@ type ChanLocker interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type chanSourceLock struct {
|
type chanSourceLock struct {
|
||||||
m RWLocker
|
m rwLocker
|
||||||
content map[string]chan int
|
content map[string]chan int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewChanSourceLocker() *chanSourceLock {
|
||||||
|
return &chanSourceLock{
|
||||||
|
m: NewRWLocker(),
|
||||||
|
content: make(map[string]chan int),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *chanSourceLock) Get(key string) (ch <-chan int, ok bool) {
|
func (s *chanSourceLock) Get(key string) (ch <-chan int, ok bool) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
ch, ok = s.content[key]
|
ch, ok = s.content[key]
|
||||||
|
19
locker/chan_source_locker_test.go
Normal file
19
locker/chan_source_locker_test.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package locker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/locker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestChanSourceLocker(t *testing.T) {
|
||||||
|
l := locker.NewChanSourceLocker()
|
||||||
|
c, ok := l.Get("aaaa")
|
||||||
|
if ok {
|
||||||
|
<-c
|
||||||
|
|
||||||
|
println("ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
println("fail")
|
||||||
|
}
|
7
locker/distributed_locker.go
Normal file
7
locker/distributed_locker.go
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
package locker
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type DistributedLocker interface {
|
||||||
|
Unlock(context.Context, string)
|
||||||
|
}
|
13
locker/distributed_locker_test.go
Normal file
13
locker/distributed_locker_test.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package locker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/redis"
|
||||||
|
"github.com/charlienet/go-mixed/tests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRedisDistrbutedLocker(t *testing.T) {
|
||||||
|
tests.RunOnDefaultRedis(t, func(rdb redis.Client) {
|
||||||
|
})
|
||||||
|
}
|
@ -1,14 +1,9 @@
|
|||||||
package locker
|
package locker
|
||||||
|
|
||||||
var _ RWLocker = &emptyLocker{}
|
var _ rwLocker = &emptyLocker{}
|
||||||
var _ Locker = &emptyLocker{}
|
var _ locker = &emptyLocker{}
|
||||||
|
|
||||||
var EmptyLocker = &emptyLocker{}
|
type emptyLocker struct {
|
||||||
|
|
||||||
type emptyLocker struct{}
|
|
||||||
|
|
||||||
func NewEmptyLocker() *emptyLocker {
|
|
||||||
return &emptyLocker{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *emptyLocker) RLock() {}
|
func (l *emptyLocker) RLock() {}
|
||||||
|
@ -2,14 +2,16 @@ package locker
|
|||||||
|
|
||||||
import "sync"
|
import "sync"
|
||||||
|
|
||||||
type Locker interface {
|
type locker interface {
|
||||||
Lock()
|
Lock()
|
||||||
Unlock()
|
Unlock()
|
||||||
TryLock() bool
|
TryLock() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type RWLocker interface {
|
type rwLocker interface {
|
||||||
Locker
|
Lock()
|
||||||
|
Unlock()
|
||||||
|
TryLock() bool
|
||||||
RLock()
|
RLock()
|
||||||
RUnlock()
|
RUnlock()
|
||||||
TryRLock() bool
|
TryRLock() bool
|
||||||
@ -18,3 +20,7 @@ type RWLocker interface {
|
|||||||
func NewLocker() *sync.Mutex {
|
func NewLocker() *sync.Mutex {
|
||||||
return &sync.Mutex{}
|
return &sync.Mutex{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewRWLocker() *sync.RWMutex {
|
||||||
|
return &sync.RWMutex{}
|
||||||
|
}
|
||||||
|
22
locker/readme.md
Normal file
22
locker/readme.md
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
同步锁
|
||||||
|
|
||||||
|
EmptyLocker, 空锁
|
||||||
|
RWLocker, 读写锁
|
||||||
|
SpinLocker, 旋转锁
|
||||||
|
|
||||||
|
|
||||||
|
锁可以添加一个外部存储成为分布式锁。WithRedis, WithZookeeper
|
||||||
|
|
||||||
|
单例锁
|
||||||
|
|
||||||
|
|
||||||
|
资源锁
|
||||||
|
|
||||||
|
|
||||||
|
分布式锁
|
||||||
|
在锁的基础上添加分布式存储升级为分布式锁
|
||||||
|
|
||||||
|
locker.WithRedis()
|
||||||
|
locker.WithZookeeper()
|
||||||
|
|
||||||
|
|
33
locker/redis/redis_locker.lua
Normal file
33
locker/redis/redis_locker.lua
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
#!lua name=charlie_locker
|
||||||
|
|
||||||
|
-- 安装命令
|
||||||
|
-- cat redis_locker.lua | redis-cli -x --cluster-only-masters --cluster call 192.168.123.30:6379 FUNCTION LOAD REPLACE
|
||||||
|
|
||||||
|
local function lock(keys, args)
|
||||||
|
if redis.call("GET", keys[1]) == args[1] then
|
||||||
|
redis.call("SET", keys[1], args[1], "PX", args[2])
|
||||||
|
return "OK"
|
||||||
|
else
|
||||||
|
return redis.call("SET", keys[1], args[1], "NX", "PX", args[2])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local function del(keys, args)
|
||||||
|
if redis.call("GET", keys[1]) == args[1] then
|
||||||
|
return redis.call("DEL", keys[1])
|
||||||
|
else
|
||||||
|
return '0'
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local function expire(keys, args)
|
||||||
|
if redis.call('get', keys[1]) == args[1] then
|
||||||
|
return redis.call('expire', keys[1], args[2])
|
||||||
|
else
|
||||||
|
return '0'
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.register_function('locker_lock',lock)
|
||||||
|
redis.register_function('locker_unlock',del)
|
||||||
|
redis.register_function('locker_expire',expire)
|
164
locker/redis/redis_store.go
Normal file
164
locker/redis/redis_store.go
Normal file
@ -0,0 +1,164 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
_ "embed"
|
||||||
|
"maps"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/rand"
|
||||||
|
"github.com/charlienet/go-mixed/redis"
|
||||||
|
goredis "github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed redis_locker.lua
|
||||||
|
var redis_locker_function string
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultExpire = time.Second * 20
|
||||||
|
retryInterval = time.Millisecond * 10
|
||||||
|
)
|
||||||
|
|
||||||
|
var once sync.Once
|
||||||
|
|
||||||
|
type redis_locker_store struct {
|
||||||
|
key string
|
||||||
|
sources map[string]string
|
||||||
|
expire time.Duration // 过期时间
|
||||||
|
mu sync.RWMutex
|
||||||
|
clients []redis.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRedisStore(key string, clients ...redis.Client) *redis_locker_store {
|
||||||
|
once.Do(func() { redis.Clients(clients).LoadFunction(redis_locker_function) })
|
||||||
|
|
||||||
|
locker := &redis_locker_store{
|
||||||
|
key: key,
|
||||||
|
sources: make(map[string]string),
|
||||||
|
clients: clients,
|
||||||
|
expire: defaultExpire,
|
||||||
|
}
|
||||||
|
|
||||||
|
go locker.expandLockTime()
|
||||||
|
|
||||||
|
return locker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *redis_locker_store) Lock(ctx context.Context, sourceName string) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
if l.TryLock(ctx, sourceName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(retryInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *redis_locker_store) TryLock(ctx context.Context, sourceName string) bool {
|
||||||
|
value := l.getSourceValue(sourceName)
|
||||||
|
|
||||||
|
results := l.fCall(ctx, "locker_lock", sourceName, value, l.expire.Milliseconds())
|
||||||
|
|
||||||
|
if !isSuccess(results) {
|
||||||
|
for _, r := range results {
|
||||||
|
if r.Err() != nil {
|
||||||
|
println("err:", r.Err().Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
l.Unlock(ctx, sourceName)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (locker *redis_locker_store) Unlock(ctx context.Context, sourceName string) {
|
||||||
|
value := locker.getSourceValue(sourceName)
|
||||||
|
locker.fCall(ctx, "locker_unlock", sourceName, value)
|
||||||
|
|
||||||
|
locker.mu.Lock()
|
||||||
|
defer locker.mu.Unlock()
|
||||||
|
|
||||||
|
delete(locker.sources, sourceName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *redis_locker_store) expandLockTime() {
|
||||||
|
for {
|
||||||
|
time.Sleep(l.expire / 3)
|
||||||
|
|
||||||
|
if len(l.sources) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
l.mu.RLock()
|
||||||
|
cloned := maps.Clone(l.sources)
|
||||||
|
l.mu.RUnlock()
|
||||||
|
|
||||||
|
for k, v := range cloned {
|
||||||
|
results := l.fCall(context.Background(), "locker_expire", k, v, l.expire.Seconds())
|
||||||
|
for _, r := range results {
|
||||||
|
if r.Err() != nil {
|
||||||
|
println("键延期失败:", r.Err().Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *redis_locker_store) getSourceValue(name string) string {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
|
if v, ok := l.sources[name]; ok {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
v := rand.Hex.Generate(36)
|
||||||
|
l.sources[name] = v
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (locker *redis_locker_store) fCall(ctx context.Context, cmd string, key string, args ...any) []*goredis.Cmd {
|
||||||
|
results := make([]*goredis.Cmd, 0, len(locker.clients))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(locker.clients))
|
||||||
|
for _, rdb := range locker.clients {
|
||||||
|
go func(rdb redis.Client) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
newKey := rdb.JoinKeys(locker.key, key)
|
||||||
|
results = append(results, rdb.FCall(ctx, cmd, []string{newKey}, args...))
|
||||||
|
}(rdb)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
func isSuccess(results []*goredis.Cmd) bool {
|
||||||
|
successCount := 0
|
||||||
|
for _, ret := range results {
|
||||||
|
resp, err := ret.Result()
|
||||||
|
|
||||||
|
if err != nil || resp == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
reply, ok := resp.(string)
|
||||||
|
if ok && strings.EqualFold(reply, "OK") {
|
||||||
|
successCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return successCount >= len(results)/2+1
|
||||||
|
}
|
31
locker/redis/redis_store_test.go
Normal file
31
locker/redis/redis_store_test.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/redis"
|
||||||
|
"github.com/charlienet/go-mixed/tests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCreateRedisStore(t *testing.T) {
|
||||||
|
tests.RunOnDefaultRedis(t, func(rdb redis.Client) {
|
||||||
|
keyName := "source"
|
||||||
|
|
||||||
|
l := NewRedisStore("locker_key", rdb)
|
||||||
|
ret := l.TryLock(context.Background(), keyName)
|
||||||
|
if !ret {
|
||||||
|
t.Log("加锁失败")
|
||||||
|
}
|
||||||
|
|
||||||
|
l.Lock(context.Background(), keyName)
|
||||||
|
t.Log("锁重入完成")
|
||||||
|
|
||||||
|
l.Unlock(context.Background(), keyName)
|
||||||
|
|
||||||
|
time.Sleep(time.Second * 15)
|
||||||
|
|
||||||
|
// l.Unlock(context.Background())
|
||||||
|
})
|
||||||
|
}
|
@ -1,7 +0,0 @@
|
|||||||
package locker
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
func NewRWLocker() *sync.RWMutex {
|
|
||||||
return &sync.RWMutex{}
|
|
||||||
}
|
|
@ -1,12 +0,0 @@
|
|||||||
package locker
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TestRWLokcer(t *testing.T) {
|
|
||||||
l := NewRWLocker()
|
|
||||||
l.RLock()
|
|
||||||
|
|
||||||
t.Log(l.TryRLock())
|
|
||||||
|
|
||||||
l.RUnlock()
|
|
||||||
}
|
|
@ -1,27 +1,45 @@
|
|||||||
package locker
|
package locker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
redis_store "github.com/charlienet/go-mixed/locker/redis"
|
||||||
|
"github.com/charlienet/go-mixed/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 带计数器锁
|
// 带计数器锁
|
||||||
type countLocker struct {
|
type countLocker struct {
|
||||||
Locker
|
rw rwLocker
|
||||||
Count int32
|
Count int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// SourceLocker 资源锁
|
// SourceLocker 资源锁
|
||||||
type SourceLocker struct {
|
type SourceLocker struct {
|
||||||
m RWLocker
|
m RWLocker
|
||||||
locks map[string]*countLocker
|
distributedLocker DistributedLocker
|
||||||
|
locks map[string]*countLocker
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSourceLocker() *SourceLocker {
|
func NewSourceLocker() *SourceLocker {
|
||||||
return &SourceLocker{
|
l := &SourceLocker{
|
||||||
m: NewRWLocker(),
|
|
||||||
locks: make(map[string]*countLocker),
|
locks: make(map[string]*countLocker),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.m.Synchronize()
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SourceLocker) WithRedis(key string, clients ...redis.Client) *SourceLocker {
|
||||||
|
redisStore := redis_store.NewRedisStore(key, clients...)
|
||||||
|
return s.WithDistributedLocker(redisStore)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SourceLocker) WithDistributedLocker(distributed DistributedLocker) *SourceLocker {
|
||||||
|
s.distributedLocker = distributed
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceLocker) Lock(key string) {
|
func (s *SourceLocker) Lock(key string) {
|
||||||
@ -31,7 +49,7 @@ func (s *SourceLocker) Lock(key string) {
|
|||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
atomic.AddInt32(&l.Count, 1)
|
atomic.AddInt32(&l.Count, 1)
|
||||||
l.Lock()
|
l.rw.Lock()
|
||||||
|
|
||||||
fmt.Println("加锁")
|
fmt.Println("加锁")
|
||||||
} else {
|
} else {
|
||||||
@ -40,18 +58,15 @@ func (s *SourceLocker) Lock(key string) {
|
|||||||
if l2, ok := s.locks[key]; ok {
|
if l2, ok := s.locks[key]; ok {
|
||||||
s.m.Unlock()
|
s.m.Unlock()
|
||||||
|
|
||||||
l2.Lock()
|
l2.rw.Lock()
|
||||||
fmt.Println("二次检查加锁")
|
fmt.Println("二次检查加锁")
|
||||||
} else {
|
} else {
|
||||||
n := NewLocker()
|
n := NewRWLocker()
|
||||||
s.locks[key] = &countLocker{Locker: n, Count: 1}
|
s.locks[key] = &countLocker{rw: n, Count: 1}
|
||||||
|
|
||||||
s.m.Unlock()
|
s.m.Unlock()
|
||||||
|
|
||||||
fmt.Printf("新锁准备加锁:%p\n", n)
|
|
||||||
n.Lock()
|
n.Lock()
|
||||||
|
|
||||||
fmt.Println("初始加锁")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -61,8 +76,11 @@ func (s *SourceLocker) Unlock(key string) {
|
|||||||
|
|
||||||
if l, ok := s.locks[key]; ok {
|
if l, ok := s.locks[key]; ok {
|
||||||
atomic.AddInt32(&l.Count, -1)
|
atomic.AddInt32(&l.Count, -1)
|
||||||
fmt.Printf("解锁%p\n", l)
|
l.rw.Unlock()
|
||||||
l.Unlock()
|
|
||||||
|
if s.distributedLocker != nil {
|
||||||
|
s.distributedLocker.Unlock(context.Background(), key)
|
||||||
|
}
|
||||||
|
|
||||||
if l.Count == 0 {
|
if l.Count == 0 {
|
||||||
delete(s.locks, key)
|
delete(s.locks, key)
|
||||||
@ -75,18 +93,14 @@ func (s *SourceLocker) TryLock(key string) bool {
|
|||||||
// 加读锁
|
// 加读锁
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
l, ok := s.locks[key]
|
l, ok := s.locks[key]
|
||||||
|
s.m.RUnlock()
|
||||||
if ok {
|
if ok {
|
||||||
ret := l.TryLock()
|
ret := l.rw.TryLock()
|
||||||
s.m.RUnlock()
|
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
} else {
|
} else {
|
||||||
s.m.RUnlock()
|
|
||||||
|
|
||||||
s.m.Lock()
|
s.m.Lock()
|
||||||
n := NewLocker()
|
n := NewRWLocker()
|
||||||
s.locks[key] = &countLocker{Locker: n, Count: 1}
|
s.locks[key] = &countLocker{rw: n, Count: 1}
|
||||||
s.m.Unlock()
|
s.m.Unlock()
|
||||||
|
|
||||||
return n.TryLock()
|
return n.TryLock()
|
||||||
|
@ -1,19 +1,39 @@
|
|||||||
package locker
|
package locker_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/locker"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
var sourcekey = "u-0001"
|
var sourcekey = "u-0001"
|
||||||
|
|
||||||
func TestTryLock(t *testing.T) {
|
func TestTryLock(t *testing.T) {
|
||||||
|
l := locker.NewSourceLocker()
|
||||||
|
l.Lock("aa")
|
||||||
|
|
||||||
|
assert.False(t, l.TryLock("aa"))
|
||||||
|
assert.True(t, l.TryLock("bb"))
|
||||||
|
|
||||||
|
defer l.Unlock("aa")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestM(t *testing.T) {
|
||||||
|
l := locker.NewSourceLocker()
|
||||||
|
|
||||||
|
for i := 0; i < 10000000; i++ {
|
||||||
|
l.Lock("aaa")
|
||||||
|
l.Unlock("aaa")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("%+v", l)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSourceLocker(t *testing.T) {
|
func TestSourceLocker(t *testing.T) {
|
||||||
l := NewSourceLocker()
|
l := locker.NewSourceLocker()
|
||||||
|
|
||||||
c := 5
|
c := 5
|
||||||
n := 0
|
n := 0
|
||||||
@ -41,7 +61,7 @@ func TestSourceTryLock(t *testing.T) {
|
|||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
wg.Add(c)
|
wg.Add(c)
|
||||||
|
|
||||||
l := NewSourceLocker()
|
l := locker.NewSourceLocker()
|
||||||
|
|
||||||
for i := 0; i < c; i++ {
|
for i := 0; i < c; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
@ -61,7 +81,7 @@ func TestSourceTryLock(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSourceLocker(b *testing.B) {
|
func BenchmarkSourceLocker(b *testing.B) {
|
||||||
l := NewSourceLocker()
|
l := locker.NewSourceLocker()
|
||||||
|
|
||||||
b.RunParallel(func(p *testing.PB) {
|
b.RunParallel(func(p *testing.PB) {
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
package locker
|
package locker_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/locker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSpinLock(t *testing.T) {
|
func TestSpinLock(t *testing.T) {
|
||||||
l := NewSpinLocker()
|
l := locker.NewSpinLocker()
|
||||||
|
|
||||||
n := 10
|
n := 10
|
||||||
c := 0
|
c := 0
|
||||||
|
@ -3,94 +3,111 @@ package locker
|
|||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WithLocker struct {
|
var empty = &emptyLocker{}
|
||||||
once sync.Once
|
|
||||||
mu Locker
|
type Locker struct {
|
||||||
|
once sync.Once
|
||||||
|
distributedLocker DistributedLocker // 分布式锁
|
||||||
|
mu locker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithLocker) Synchronize() {
|
func (w *Locker) WithRedis(key string, rdb redis.Client) *Locker {
|
||||||
if w.mu == nil || w.mu == EmptyLocker {
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Locker) WithDistributedLocker(d DistributedLocker) *Locker {
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Locker) Synchronize() *Locker {
|
||||||
|
if w.mu == nil || w.mu == empty {
|
||||||
w.mu = NewLocker()
|
w.mu = NewLocker()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithLocker) Lock() {
|
func (w *Locker) Lock() {
|
||||||
w.ensureLocker().Lock()
|
w.ensureLocker().mu.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithLocker) Unlock() {
|
func (w *Locker) Unlock() {
|
||||||
w.ensureLocker().Unlock()
|
w.ensureLocker().mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithLocker) TryLock() bool {
|
func (w *Locker) TryLock() bool {
|
||||||
return w.ensureLocker().TryLock()
|
return w.ensureLocker().mu.TryLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithLocker) ensureLocker() Locker {
|
func (w *Locker) ensureLocker() *Locker {
|
||||||
w.once.Do(func() {
|
w.once.Do(func() {
|
||||||
if w.mu == nil {
|
if w.mu == nil {
|
||||||
w.mu = EmptyLocker
|
w.mu = empty
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return w.mu
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
type WithSpinLocker struct {
|
type SpinLocker struct {
|
||||||
WithLocker
|
Locker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithSpinLocker) Synchronize() {
|
func (w *SpinLocker) Synchronize() {
|
||||||
if w.mu == nil || w.mu == EmptyLocker {
|
if w.mu == nil || w.mu == empty {
|
||||||
w.mu = NewSpinLocker()
|
w.mu = NewSpinLocker()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type WithRWLocker struct {
|
type RWLocker struct {
|
||||||
once sync.Once
|
once sync.Once
|
||||||
mu RWLocker
|
mu rwLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) Synchronize() {
|
func (w *RWLocker) Synchronize() *RWLocker {
|
||||||
if w.mu == nil || w.mu == EmptyLocker {
|
if w.mu == nil || w.mu == empty {
|
||||||
log.Println("初始化有效锁")
|
|
||||||
w.mu = NewRWLocker()
|
w.mu = NewRWLocker()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) Lock() {
|
func (w *RWLocker) Lock() {
|
||||||
w.ensureLocker().Lock()
|
w.ensureLocker().mu.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) TryLock() bool {
|
func (w *RWLocker) TryLock() bool {
|
||||||
return w.ensureLocker().TryLock()
|
return w.ensureLocker().mu.TryLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) Unlock() {
|
func (w *RWLocker) Unlock() {
|
||||||
w.ensureLocker().Unlock()
|
w.ensureLocker().mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) RLock() {
|
func (w *RWLocker) RLock() {
|
||||||
w.ensureLocker().RLock()
|
w.ensureLocker().mu.RLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) TryRLock() bool {
|
func (w *RWLocker) TryRLock() bool {
|
||||||
return w.ensureLocker().TryRLock()
|
return w.ensureLocker().mu.TryRLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) RUnlock() {
|
func (w *RWLocker) RUnlock() {
|
||||||
w.ensureLocker().RUnlock()
|
w.ensureLocker().mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WithRWLocker) ensureLocker() RWLocker {
|
func (w *RWLocker) ensureLocker() *RWLocker {
|
||||||
w.once.Do(func() {
|
w.once.Do(func() {
|
||||||
if w.mu == nil {
|
if w.mu == nil {
|
||||||
log.Println("初始化一个空锁")
|
log.Println("初始化一个空锁")
|
||||||
w.mu = EmptyLocker
|
w.mu = empty
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
return w.mu
|
return w
|
||||||
}
|
}
|
||||||
|
44
locker/synchronizeable_test.go
Normal file
44
locker/synchronizeable_test.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package locker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/locker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLocker(t *testing.T) {
|
||||||
|
|
||||||
|
var l locker.Locker
|
||||||
|
|
||||||
|
l.Synchronize()
|
||||||
|
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNew(t *testing.T) {
|
||||||
|
var a locker.RWLocker
|
||||||
|
a.Synchronize()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSpinLocker(t *testing.T) {
|
||||||
|
var l locker.SpinLocker
|
||||||
|
l.Synchronize()
|
||||||
|
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRWLocker(t *testing.T) {
|
||||||
|
var l locker.RWLocker
|
||||||
|
l.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPointLocker(t *testing.T) {
|
||||||
|
l := locker.NewLocker()
|
||||||
|
l.Lock()
|
||||||
|
l.Lock()
|
||||||
|
|
||||||
|
defer l.Unlock()
|
||||||
|
}
|
@ -52,3 +52,8 @@ redis.ParseURL
|
|||||||
defer gClient.Close()
|
defer gClient.Close()
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
VersionConstraint(">=7.0")
|
||||||
|
|
||||||
|
```
|
172
redis/redis.go
172
redis/redis.go
@ -2,17 +2,16 @@ package redis
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/charlienet/go-mixed/expr"
|
"github.com/hashicorp/go-version"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultSeparator = ":"
|
|
||||||
|
|
||||||
blockingQueryTimeout = 5 * time.Second
|
blockingQueryTimeout = 5 * time.Second
|
||||||
readWriteTimeout = 2 * time.Second
|
readWriteTimeout = 2 * time.Second
|
||||||
defaultSlowThreshold = "5000" // 慢查询(单位微秒)
|
defaultSlowThreshold = "5000" // 慢查询(单位微秒)
|
||||||
@ -63,68 +62,121 @@ func (clients Clients) LoadFunction(code string) {
|
|||||||
|
|
||||||
type Client interface {
|
type Client interface {
|
||||||
redis.UniversalClient
|
redis.UniversalClient
|
||||||
LoadFunction(f string) // 加载函数脚本
|
LoadFunction(f string) // 加载函数脚本
|
||||||
Prefix() string // 统一前缀
|
Prefix() string // 统一前缀
|
||||||
Separator() string // 分隔符
|
Separator() string // 分隔符
|
||||||
JoinKeys(keys ...string) string // 连接KEY
|
AddPrefix(prefix ...string) redisClient // 添加前缀
|
||||||
FormatKeys(keys ...string) []string // 格式化KEY
|
JoinKeys(keys ...string) string // 连接KEY
|
||||||
|
FormatKeys(keys ...string) []string // 格式化KEY
|
||||||
|
ServerVersion() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type redisClient struct {
|
type redisClient struct {
|
||||||
redis.UniversalClient
|
redis.UniversalClient
|
||||||
prefix string
|
prefix redisPrefix
|
||||||
separator string
|
conf *redis.UniversalOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(opt *RedisOption) redisClient {
|
type constraintFunc func(redisClient) error
|
||||||
var rdb redisClient
|
|
||||||
|
|
||||||
|
func Ping() constraintFunc {
|
||||||
|
return func(rc redisClient) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
return rc.Ping(ctx).Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func VersionConstraint(expended string) constraintFunc {
|
||||||
|
return func(rc redisClient) error {
|
||||||
|
v := rc.ServerVersion()
|
||||||
|
if len(v) == 0 {
|
||||||
|
return errors.New("version not obtained")
|
||||||
|
}
|
||||||
|
current, err := version.NewVersion(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
constraint, err := version.NewConstraint(expended)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !constraint.Check(current) {
|
||||||
|
return fmt.Errorf("the desired version is %v, which does not match the expected version %v", current, expended)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(opt *RedisOption, constraints ...constraintFunc) redisClient {
|
||||||
if len(opt.Addrs) == 0 && len(opt.Addr) > 0 {
|
if len(opt.Addrs) == 0 && len(opt.Addr) > 0 {
|
||||||
opt.Addrs = []string{opt.Addr}
|
opt.Addrs = []string{opt.Addr}
|
||||||
}
|
}
|
||||||
|
|
||||||
separator := expr.Ternary(len(opt.Separator) == 0, defaultSeparator, opt.Separator)
|
conf := &redis.UniversalOptions{
|
||||||
prefix := expr.Ternary(len(opt.Prefix) > 0, fmt.Sprintf("%s%s", opt.Prefix, separator), "")
|
Addrs: opt.Addrs,
|
||||||
|
Password: opt.Password,
|
||||||
|
|
||||||
rdb = redisClient{
|
DB: opt.DB,
|
||||||
prefix: prefix,
|
|
||||||
separator: separator,
|
|
||||||
UniversalClient: redis.NewUniversalClient(&redis.UniversalOptions{
|
|
||||||
Addrs: opt.Addrs,
|
|
||||||
Password: opt.Password,
|
|
||||||
|
|
||||||
DB: opt.DB,
|
MaxRetries: opt.MaxRetries,
|
||||||
|
MinRetryBackoff: opt.MinRetryBackoff,
|
||||||
|
MaxRetryBackoff: opt.MaxRetryBackoff,
|
||||||
|
|
||||||
MaxRetries: opt.MaxRetries,
|
DialTimeout: opt.DialTimeout,
|
||||||
MinRetryBackoff: opt.MinRetryBackoff,
|
ReadTimeout: opt.ReadTimeout,
|
||||||
MaxRetryBackoff: opt.MaxRetryBackoff,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
||||||
|
|
||||||
DialTimeout: opt.DialTimeout,
|
PoolSize: opt.PoolSize,
|
||||||
ReadTimeout: opt.ReadTimeout,
|
PoolTimeout: opt.PoolTimeout,
|
||||||
WriteTimeout: opt.WriteTimeout,
|
MinIdleConns: opt.MinIdleConns,
|
||||||
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
MaxIdleConns: opt.MaxIdleConns,
|
||||||
|
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||||
|
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||||
|
}
|
||||||
|
|
||||||
PoolSize: opt.PoolSize,
|
rdb := new(conf, newPrefix(opt.Separator, opt.Prefix))
|
||||||
PoolTimeout: opt.PoolTimeout,
|
|
||||||
MinIdleConns: opt.MinIdleConns,
|
|
||||||
MaxIdleConns: opt.MaxIdleConns,
|
|
||||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
|
||||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
|
||||||
})}
|
|
||||||
|
|
||||||
rdb.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold)
|
if len(constraints) > 0 {
|
||||||
|
for _, f := range constraints {
|
||||||
|
if err := f(rdb); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(opt.Prefix) > 0 {
|
return rdb
|
||||||
rdb.AddHook(renameKey{
|
}
|
||||||
prefix: prefix,
|
|
||||||
})
|
func NewEnforceConstraints(opt *RedisOption, constraints ...constraintFunc) redisClient {
|
||||||
|
rdb := New(opt)
|
||||||
|
for _, f := range constraints {
|
||||||
|
if err := f(rdb); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rdb
|
return rdb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rdb redisClient) Prefix() string {
|
func (rdb redisClient) Prefix() string {
|
||||||
return rdb.prefix
|
return rdb.prefix.Prefix()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rdb redisClient) Separator() string {
|
||||||
|
return rdb.prefix.Separator()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rdb redisClient) AddPrefix(prefixes ...string) redisClient {
|
||||||
|
old := rdb.prefix
|
||||||
|
p := newPrefix(old.separator, old.join(prefixes...))
|
||||||
|
|
||||||
|
return new(rdb.conf, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rdb redisClient) LoadFunction(code string) {
|
func (rdb redisClient) LoadFunction(code string) {
|
||||||
@ -134,23 +186,47 @@ func (rdb redisClient) LoadFunction(code string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rdb redisClient) Separator() string {
|
|
||||||
return rdb.separator
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rdb redisClient) JoinKeys(keys ...string) string {
|
func (rdb redisClient) JoinKeys(keys ...string) string {
|
||||||
return strings.Join(keys, rdb.separator)
|
return rdb.prefix.join(keys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rdb redisClient) FormatKeys(keys ...string) []string {
|
func (rdb redisClient) FormatKeys(keys ...string) []string {
|
||||||
if len(rdb.prefix) == 0 {
|
if !rdb.prefix.hasPrefix() {
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
re := make([]string, 0, len(keys))
|
re := make([]string, 0, len(keys))
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
re = append(re, fmt.Sprintf("%s%s", rdb.prefix, k))
|
re = append(re, fmt.Sprintf("%s%s", rdb.prefix.Prefix(), k))
|
||||||
}
|
}
|
||||||
|
|
||||||
return re
|
return re
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rdb redisClient) ServerVersion() string {
|
||||||
|
info, err := rdb.Info(context.Background(), "server").Result()
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range strings.Split(info, "\r\n") {
|
||||||
|
after, found := strings.CutPrefix(line, "redis_version:")
|
||||||
|
if found {
|
||||||
|
return after
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func new(conf *redis.UniversalOptions, prefix redisPrefix) redisClient {
|
||||||
|
c := redis.NewUniversalClient(conf)
|
||||||
|
c.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold)
|
||||||
|
c.AddHook(renameHook{prefix: prefix})
|
||||||
|
|
||||||
|
return redisClient{
|
||||||
|
UniversalClient: c,
|
||||||
|
prefix: prefix,
|
||||||
|
conf: conf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
45
redis/redis_prefix.go
Normal file
45
redis/redis_prefix.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/charlienet/go-mixed/expr"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultSeparator = ":"
|
||||||
|
)
|
||||||
|
|
||||||
|
type redisPrefix struct {
|
||||||
|
prefix string
|
||||||
|
separator string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPrefix(separator string, prefix ...string) redisPrefix {
|
||||||
|
s := expr.Ternary(len(separator) == 0, defaultSeparator, separator)
|
||||||
|
|
||||||
|
return redisPrefix{
|
||||||
|
separator: s,
|
||||||
|
prefix: expr.Ternary(len(prefix) > 0, strings.Join(prefix, separator), ""),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *redisPrefix) Prefix() string {
|
||||||
|
return p.prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *redisPrefix) Separator() string {
|
||||||
|
return p.separator
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *redisPrefix) hasPrefix() bool {
|
||||||
|
return len(p.prefix) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *redisPrefix) join(key ...string) string {
|
||||||
|
s := make([]string, 0, len(key)+1)
|
||||||
|
s = append(s, p.prefix)
|
||||||
|
s = append(s, key...)
|
||||||
|
|
||||||
|
return strings.Join(s, p.separator)
|
||||||
|
}
|
@ -8,18 +8,17 @@ import (
|
|||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
type renameKey struct {
|
type renameHook struct {
|
||||||
prefix string
|
prefix redisPrefix
|
||||||
separator string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r renameKey) DialHook(next redis.DialHook) redis.DialHook {
|
func (r renameHook) DialHook(next redis.DialHook) redis.DialHook {
|
||||||
return func(ctx context.Context, network, addr string) (net.Conn, error) {
|
return func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
return next(ctx, network, addr)
|
return next(ctx, network, addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
func (r renameHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
||||||
return func(ctx context.Context, cmds []redis.Cmder) error {
|
return func(ctx context.Context, cmds []redis.Cmder) error {
|
||||||
|
|
||||||
// 对多个KEY进行更名操作
|
// 对多个KEY进行更名操作
|
||||||
@ -31,15 +30,15 @@ func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.Pro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r renameKey) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
func (r renameHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||||
return func(ctx context.Context, cmd redis.Cmder) error {
|
return func(ctx context.Context, cmd redis.Cmder) error {
|
||||||
r.renameKey(cmd)
|
r.renameKey(cmd)
|
||||||
return next(ctx, cmd)
|
return next(ctx, cmd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r renameKey) renameKey(cmd redis.Cmder) {
|
func (r renameHook) renameKey(cmd redis.Cmder) {
|
||||||
if len(r.prefix) == 0 {
|
if !r.prefix.hasPrefix() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +48,7 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch strings.ToUpper(cmd.Name()) {
|
switch strings.ToUpper(cmd.Name()) {
|
||||||
case "SELECT", "FUNCTION":
|
case "SELECT", "FUNCTION", "INFO":
|
||||||
// 无KEY指令
|
// 无KEY指令
|
||||||
case
|
case
|
||||||
"RENAME", "RENAMENX",
|
"RENAME", "RENAMENX",
|
||||||
@ -80,15 +79,11 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r renameKey) rename(args []any, indexes ...int) {
|
func (r renameHook) rename(args []any, indexes ...int) {
|
||||||
for _, i := range indexes {
|
for _, i := range indexes {
|
||||||
if key, ok := args[i].(string); ok {
|
if key, ok := args[i].(string); ok {
|
||||||
var builder strings.Builder
|
newKey := r.prefix.join(key)
|
||||||
builder.WriteString(r.prefix)
|
args[i] = newKey
|
||||||
builder.WriteString(r.separator)
|
|
||||||
builder.WriteString(key)
|
|
||||||
|
|
||||||
args[i] = builder.String()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,14 +13,13 @@ import (
|
|||||||
var _ Set[string] = &hash_set[string]{}
|
var _ Set[string] = &hash_set[string]{}
|
||||||
|
|
||||||
type hash_set[T constraints.Ordered] struct {
|
type hash_set[T constraints.Ordered] struct {
|
||||||
m map[T]struct{}
|
m map[T]struct{}
|
||||||
lock locker.RWLocker
|
locker locker.RWLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHashSet[T constraints.Ordered](values ...T) *hash_set[T] {
|
func NewHashSet[T constraints.Ordered](values ...T) *hash_set[T] {
|
||||||
set := hash_set[T]{
|
set := hash_set[T]{
|
||||||
m: make(map[T]struct{}, len(values)),
|
m: make(map[T]struct{}, len(values)),
|
||||||
lock: locker.EmptyLocker,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
set.Add(values...)
|
set.Add(values...)
|
||||||
@ -28,37 +27,41 @@ func NewHashSet[T constraints.Ordered](values ...T) *hash_set[T] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *hash_set[T]) Sync() *hash_set[T] {
|
func (s *hash_set[T]) Sync() *hash_set[T] {
|
||||||
s.lock = locker.NewRWLocker()
|
s.locker.Synchronize()
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Add(values ...T) {
|
func (s *hash_set[T]) Add(values ...T) Set[T] {
|
||||||
s.lock.Lock()
|
s.locker.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.locker.Unlock()
|
||||||
|
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
s.m[v] = struct{}{}
|
s.m[v] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Remove(v T) {
|
func (s *hash_set[T]) Remove(v T) Set[T] {
|
||||||
s.lock.Lock()
|
s.locker.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.locker.Unlock()
|
||||||
|
|
||||||
delete(s.m, v)
|
delete(s.m, v)
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Contains(value T) bool {
|
func (s *hash_set[T]) Contains(value T) bool {
|
||||||
s.lock.RLock()
|
s.locker.RLock()
|
||||||
defer s.lock.RUnlock()
|
defer s.locker.RUnlock()
|
||||||
|
|
||||||
_, ok := s.m[value]
|
_, ok := s.m[value]
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) ContainsAny(values ...T) bool {
|
func (s *hash_set[T]) ContainsAny(values ...T) bool {
|
||||||
s.lock.RLock()
|
s.locker.RLock()
|
||||||
defer s.lock.RUnlock()
|
defer s.locker.RUnlock()
|
||||||
|
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
if _, ok := s.m[v]; ok {
|
if _, ok := s.m[v]; ok {
|
||||||
@ -69,9 +72,9 @@ func (s hash_set[T]) ContainsAny(values ...T) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) ContainsAll(values ...T) bool {
|
func (s *hash_set[T]) ContainsAll(values ...T) bool {
|
||||||
s.lock.RLock()
|
s.locker.RLock()
|
||||||
defer s.lock.RUnlock()
|
defer s.locker.RUnlock()
|
||||||
|
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
if _, ok := s.m[v]; !ok {
|
if _, ok := s.m[v]; !ok {
|
||||||
@ -82,15 +85,15 @@ func (s hash_set[T]) ContainsAll(values ...T) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Asc() Set[T] {
|
func (s *hash_set[T]) Asc() Set[T] {
|
||||||
return s.copyToSorted().Asc()
|
return s.copyToSorted().Asc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Desc() Set[T] {
|
func (s *hash_set[T]) Desc() Set[T] {
|
||||||
return s.copyToSorted().Desc()
|
return s.copyToSorted().Desc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) copyToSorted() Set[T] {
|
func (s *hash_set[T]) copyToSorted() Set[T] {
|
||||||
orderd := NewSortedSet[T]()
|
orderd := NewSortedSet[T]()
|
||||||
for k := range s.m {
|
for k := range s.m {
|
||||||
orderd.Add(k)
|
orderd.Add(k)
|
||||||
@ -109,13 +112,13 @@ func (s *hash_set[T]) Clone() *hash_set[T] {
|
|||||||
return set
|
return set
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Iterate(fn func(value T)) {
|
func (s *hash_set[T]) Iterate(fn func(value T)) {
|
||||||
for v := range s.m {
|
for v := range s.m {
|
||||||
fn(v)
|
fn(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) ToSlice() []T {
|
func (s *hash_set[T]) ToSlice() []T {
|
||||||
values := make([]T, 0, s.Size())
|
values := make([]T, 0, s.Size())
|
||||||
s.Iterate(func(value T) {
|
s.Iterate(func(value T) {
|
||||||
values = append(values, value)
|
values = append(values, value)
|
||||||
@ -124,15 +127,15 @@ func (s hash_set[T]) ToSlice() []T {
|
|||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) IsEmpty() bool {
|
func (s *hash_set[T]) IsEmpty() bool {
|
||||||
return len(s.m) == 0
|
return len(s.m) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) Size() int {
|
func (s *hash_set[T]) Size() int {
|
||||||
return len(s.m)
|
return len(s.m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) MarshalJSON() ([]byte, error) {
|
func (s *hash_set[T]) MarshalJSON() ([]byte, error) {
|
||||||
items := make([]string, 0, s.Size())
|
items := make([]string, 0, s.Size())
|
||||||
|
|
||||||
for ele := range s.m {
|
for ele := range s.m {
|
||||||
@ -147,7 +150,7 @@ func (s hash_set[T]) MarshalJSON() ([]byte, error) {
|
|||||||
return []byte(fmt.Sprintf("[%s]", strings.Join(items, ", "))), nil
|
return []byte(fmt.Sprintf("[%s]", strings.Join(items, ", "))), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) UnmarshalJSON(b []byte) error {
|
func (s *hash_set[T]) UnmarshalJSON(b []byte) error {
|
||||||
var i []any
|
var i []any
|
||||||
|
|
||||||
d := json.NewDecoder(bytes.NewReader(b))
|
d := json.NewDecoder(bytes.NewReader(b))
|
||||||
@ -166,7 +169,7 @@ func (s hash_set[T]) UnmarshalJSON(b []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hash_set[T]) String() string {
|
func (s *hash_set[T]) String() string {
|
||||||
l := make([]string, 0, len(s.m))
|
l := make([]string, 0, len(s.m))
|
||||||
for k := range s.m {
|
for k := range s.m {
|
||||||
l = append(l, fmt.Sprint(k))
|
l = append(l, fmt.Sprint(k))
|
||||||
|
16
sets/set.go
16
sets/set.go
@ -1,15 +1,13 @@
|
|||||||
package sets
|
package sets
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/charlienet/go-mixed/locker"
|
"github.com/charlienet/go-mixed/locker"
|
||||||
"golang.org/x/exp/constraints"
|
"golang.org/x/exp/constraints"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Set[T comparable] interface {
|
type Set[T comparable] interface {
|
||||||
Add(...T)
|
Add(...T) Set[T]
|
||||||
Remove(v T)
|
Remove(v T) Set[T]
|
||||||
Asc() Set[T]
|
Asc() Set[T]
|
||||||
Desc() Set[T]
|
Desc() Set[T]
|
||||||
Contains(T) bool
|
Contains(T) bool
|
||||||
@ -19,18 +17,16 @@ type Set[T comparable] interface {
|
|||||||
ToSlice() []T // 转换为切片
|
ToSlice() []T // 转换为切片
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultOptions = option{locker: locker.NewEmptyLocker()}
|
|
||||||
|
|
||||||
type option struct {
|
type option struct {
|
||||||
locker sync.Locker
|
locker locker.Locker
|
||||||
}
|
}
|
||||||
|
|
||||||
type setFunc func(option)
|
type setFunc func(*option)
|
||||||
|
|
||||||
func WithSync() setFunc {
|
func WithSync() setFunc {
|
||||||
|
|
||||||
return func(o option) {
|
return func(o *option) {
|
||||||
o.locker = &sync.RWMutex{}
|
o.locker.Synchronize()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,20 +14,23 @@ type sorted_set[T constraints.Ordered] struct {
|
|||||||
|
|
||||||
func NewSortedSet[T constraints.Ordered](t ...T) *sorted_set[T] {
|
func NewSortedSet[T constraints.Ordered](t ...T) *sorted_set[T] {
|
||||||
return &sorted_set[T]{
|
return &sorted_set[T]{
|
||||||
set: NewHashSet[T](),
|
sorted: t,
|
||||||
|
set: NewHashSet(t...),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sorted_set[T]) Add(values ...T) {
|
func (s *sorted_set[T]) Add(values ...T) Set[T] {
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
if !s.set.Contains(v) {
|
if !s.set.Contains(v) {
|
||||||
s.sorted = append(s.sorted, v)
|
s.sorted = append(s.sorted, v)
|
||||||
s.set.Add(v)
|
s.set.Add(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sorted_set[T]) Remove(v T) {
|
func (s *sorted_set[T]) Remove(v T) Set[T] {
|
||||||
if s.set.Contains(v) {
|
if s.set.Contains(v) {
|
||||||
for index := range s.sorted {
|
for index := range s.sorted {
|
||||||
if s.sorted[index] == v {
|
if s.sorted[index] == v {
|
||||||
@ -38,6 +41,8 @@ func (s *sorted_set[T]) Remove(v T) {
|
|||||||
|
|
||||||
s.set.Remove(v)
|
s.set.Remove(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sorted_set[T]) Asc() Set[T] {
|
func (s *sorted_set[T]) Asc() Set[T] {
|
||||||
|
Reference in New Issue
Block a user