From b4ac1cc4497263d1034284be47963fb39de03fa2 Mon Sep 17 00:00:00 2001 From: charlie <3140647@qq.com> Date: Tue, 28 May 2024 04:23:30 +0800 Subject: [PATCH] locker --- locker/chan_source_locker.go | 9 +- locker/chan_source_locker_test.go | 19 ++++ locker/distributed_locker.go | 7 ++ locker/distributed_locker_test.go | 13 +++ locker/readme.md | 22 ++++ locker/redis/redis_locker.lua | 33 ++++++ locker/redis/redis_store.go | 164 ++++++++++++++++++++++++++++++ locker/redis/redis_store_test.go | 31 ++++++ locker/source_locker.go | 58 +++++++---- locker/source_locker_test.go | 28 ++++- locker/spin_locker_test.go | 6 +- locker/synchronizeable_test.go | 44 ++++++++ 12 files changed, 405 insertions(+), 29 deletions(-) create mode 100644 locker/chan_source_locker_test.go create mode 100644 locker/distributed_locker.go create mode 100644 locker/distributed_locker_test.go create mode 100644 locker/readme.md create mode 100644 locker/redis/redis_locker.lua create mode 100644 locker/redis/redis_store.go create mode 100644 locker/redis/redis_store_test.go create mode 100644 locker/synchronizeable_test.go diff --git a/locker/chan_source_locker.go b/locker/chan_source_locker.go index 68fe22e..fca4214 100644 --- a/locker/chan_source_locker.go +++ b/locker/chan_source_locker.go @@ -6,10 +6,17 @@ type ChanLocker interface { } type chanSourceLock struct { - m RWLocker + m rwLocker content map[string]chan int } +func NewChanSourceLocker() *chanSourceLock { + return &chanSourceLock{ + m: NewRWLocker(), + content: make(map[string]chan int), + } +} + func (s *chanSourceLock) Get(key string) (ch <-chan int, ok bool) { s.m.RLock() ch, ok = s.content[key] diff --git a/locker/chan_source_locker_test.go b/locker/chan_source_locker_test.go new file mode 100644 index 0000000..433f3c8 --- /dev/null +++ b/locker/chan_source_locker_test.go @@ -0,0 +1,19 @@ +package locker_test + +import ( + "testing" + + "github.com/charlienet/go-mixed/locker" +) + +func TestChanSourceLocker(t *testing.T) { + l := locker.NewChanSourceLocker() + c, ok := l.Get("aaaa") + if ok { + <-c + + println("ok") + } + + println("fail") +} diff --git a/locker/distributed_locker.go b/locker/distributed_locker.go new file mode 100644 index 0000000..6a982c4 --- /dev/null +++ b/locker/distributed_locker.go @@ -0,0 +1,7 @@ +package locker + +import "context" + +type DistributedLocker interface { + Unlock(context.Context, string) +} diff --git a/locker/distributed_locker_test.go b/locker/distributed_locker_test.go new file mode 100644 index 0000000..857eaf0 --- /dev/null +++ b/locker/distributed_locker_test.go @@ -0,0 +1,13 @@ +package locker_test + +import ( + "testing" + + "github.com/charlienet/go-mixed/redis" + "github.com/charlienet/go-mixed/tests" +) + +func TestRedisDistrbutedLocker(t *testing.T) { + tests.RunOnDefaultRedis(t, func(rdb redis.Client) { + }) +} diff --git a/locker/readme.md b/locker/readme.md new file mode 100644 index 0000000..0ed0b6a --- /dev/null +++ b/locker/readme.md @@ -0,0 +1,22 @@ +同步锁 + +EmptyLocker, 空锁 +RWLocker, 读写锁 +SpinLocker, 旋转锁 + + +锁可以添加一个外部存储成为分布式锁。WithRedis, WithZookeeper + +单例锁 + + +资源锁 + + +分布式锁 +在锁的基础上添加分布式存储升级为分布式锁 + +locker.WithRedis() +locker.WithZookeeper() + + diff --git a/locker/redis/redis_locker.lua b/locker/redis/redis_locker.lua new file mode 100644 index 0000000..3bac375 --- /dev/null +++ b/locker/redis/redis_locker.lua @@ -0,0 +1,33 @@ +#!lua name=charlie_locker + +-- 安装命令 +-- cat redis_locker.lua | redis-cli -x --cluster-only-masters --cluster call 192.168.123.30:6379 FUNCTION LOAD REPLACE + +local function lock(keys, args) + if redis.call("GET", keys[1]) == args[1] then + redis.call("SET", keys[1], args[1], "PX", args[2]) + return "OK" + else + return redis.call("SET", keys[1], args[1], "NX", "PX", args[2]) + end +end + +local function del(keys, args) + if redis.call("GET", keys[1]) == args[1] then + return redis.call("DEL", keys[1]) + else + return '0' + end +end + +local function expire(keys, args) + if redis.call('get', keys[1]) == args[1] then + return redis.call('expire', keys[1], args[2]) + else + return '0' + end +end + +redis.register_function('locker_lock',lock) +redis.register_function('locker_unlock',del) +redis.register_function('locker_expire',expire) diff --git a/locker/redis/redis_store.go b/locker/redis/redis_store.go new file mode 100644 index 0000000..d81eae4 --- /dev/null +++ b/locker/redis/redis_store.go @@ -0,0 +1,164 @@ +package redis + +import ( + "context" + _ "embed" + "maps" + "strings" + "sync" + "time" + + "github.com/charlienet/go-mixed/rand" + "github.com/charlienet/go-mixed/redis" + goredis "github.com/redis/go-redis/v9" +) + +//go:embed redis_locker.lua +var redis_locker_function string + +const ( + defaultExpire = time.Second * 20 + retryInterval = time.Millisecond * 10 +) + +var once sync.Once + +type redis_locker_store struct { + key string + sources map[string]string + expire time.Duration // 过期时间 + mu sync.RWMutex + clients []redis.Client +} + +func NewRedisStore(key string, clients ...redis.Client) *redis_locker_store { + once.Do(func() { redis.Clients(clients).LoadFunction(redis_locker_function) }) + + locker := &redis_locker_store{ + key: key, + sources: make(map[string]string), + clients: clients, + expire: defaultExpire, + } + + go locker.expandLockTime() + + return locker +} + +func (l *redis_locker_store) Lock(ctx context.Context, sourceName string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if l.TryLock(ctx, sourceName) { + return nil + } + } + + time.Sleep(retryInterval) + } +} + +func (l *redis_locker_store) TryLock(ctx context.Context, sourceName string) bool { + value := l.getSourceValue(sourceName) + + results := l.fCall(ctx, "locker_lock", sourceName, value, l.expire.Milliseconds()) + + if !isSuccess(results) { + for _, r := range results { + if r.Err() != nil { + println("err:", r.Err().Error()) + } + } + + l.Unlock(ctx, sourceName) + return false + } + + return true +} + +func (locker *redis_locker_store) Unlock(ctx context.Context, sourceName string) { + value := locker.getSourceValue(sourceName) + locker.fCall(ctx, "locker_unlock", sourceName, value) + + locker.mu.Lock() + defer locker.mu.Unlock() + + delete(locker.sources, sourceName) +} + +func (l *redis_locker_store) expandLockTime() { + for { + time.Sleep(l.expire / 3) + + if len(l.sources) == 0 { + continue + } + + l.mu.RLock() + cloned := maps.Clone(l.sources) + l.mu.RUnlock() + + for k, v := range cloned { + results := l.fCall(context.Background(), "locker_expire", k, v, l.expire.Seconds()) + for _, r := range results { + if r.Err() != nil { + println("键延期失败:", r.Err().Error()) + } + } + } + } +} + +func (l *redis_locker_store) getSourceValue(name string) string { + l.mu.Lock() + defer l.mu.Unlock() + + if v, ok := l.sources[name]; ok { + return v + } + + v := rand.Hex.Generate(36) + l.sources[name] = v + return v +} + +func (locker *redis_locker_store) fCall(ctx context.Context, cmd string, key string, args ...any) []*goredis.Cmd { + results := make([]*goredis.Cmd, 0, len(locker.clients)) + + var wg sync.WaitGroup + wg.Add(len(locker.clients)) + for _, rdb := range locker.clients { + go func(rdb redis.Client) { + defer wg.Done() + + newKey := rdb.JoinKeys(locker.key, key) + results = append(results, rdb.FCall(ctx, cmd, []string{newKey}, args...)) + }(rdb) + } + + wg.Wait() + + return results +} + +func isSuccess(results []*goredis.Cmd) bool { + successCount := 0 + for _, ret := range results { + resp, err := ret.Result() + + if err != nil || resp == nil { + return false + } + + reply, ok := resp.(string) + if ok && strings.EqualFold(reply, "OK") { + successCount++ + } + } + + return successCount >= len(results)/2+1 +} diff --git a/locker/redis/redis_store_test.go b/locker/redis/redis_store_test.go new file mode 100644 index 0000000..31e4674 --- /dev/null +++ b/locker/redis/redis_store_test.go @@ -0,0 +1,31 @@ +package redis + +import ( + "context" + "testing" + "time" + + "github.com/charlienet/go-mixed/redis" + "github.com/charlienet/go-mixed/tests" +) + +func TestCreateRedisStore(t *testing.T) { + tests.RunOnDefaultRedis(t, func(rdb redis.Client) { + keyName := "source" + + l := NewRedisStore("locker_key", rdb) + ret := l.TryLock(context.Background(), keyName) + if !ret { + t.Log("加锁失败") + } + + l.Lock(context.Background(), keyName) + t.Log("锁重入完成") + + l.Unlock(context.Background(), keyName) + + time.Sleep(time.Second * 15) + + // l.Unlock(context.Background()) + }) +} diff --git a/locker/source_locker.go b/locker/source_locker.go index f743cf4..1f6625b 100644 --- a/locker/source_locker.go +++ b/locker/source_locker.go @@ -1,27 +1,45 @@ package locker import ( + "context" "fmt" "sync/atomic" + + redis_store "github.com/charlienet/go-mixed/locker/redis" + "github.com/charlienet/go-mixed/redis" ) // 带计数器锁 type countLocker struct { - Locker + rw rwLocker Count int32 } // SourceLocker 资源锁 type SourceLocker struct { - m RWLocker - locks map[string]*countLocker + m RWLocker + distributedLocker DistributedLocker + locks map[string]*countLocker + err error } func NewSourceLocker() *SourceLocker { - return &SourceLocker{ - m: NewRWLocker(), + l := &SourceLocker{ locks: make(map[string]*countLocker), } + + l.m.Synchronize() + return l +} + +func (s *SourceLocker) WithRedis(key string, clients ...redis.Client) *SourceLocker { + redisStore := redis_store.NewRedisStore(key, clients...) + return s.WithDistributedLocker(redisStore) +} + +func (s *SourceLocker) WithDistributedLocker(distributed DistributedLocker) *SourceLocker { + s.distributedLocker = distributed + return s } func (s *SourceLocker) Lock(key string) { @@ -31,7 +49,7 @@ func (s *SourceLocker) Lock(key string) { if ok { atomic.AddInt32(&l.Count, 1) - l.Lock() + l.rw.Lock() fmt.Println("加锁") } else { @@ -40,18 +58,15 @@ func (s *SourceLocker) Lock(key string) { if l2, ok := s.locks[key]; ok { s.m.Unlock() - l2.Lock() + l2.rw.Lock() fmt.Println("二次检查加锁") } else { - n := NewLocker() - s.locks[key] = &countLocker{Locker: n, Count: 1} + n := NewRWLocker() + s.locks[key] = &countLocker{rw: n, Count: 1} s.m.Unlock() - fmt.Printf("新锁准备加锁:%p\n", n) n.Lock() - - fmt.Println("初始加锁") } } } @@ -61,8 +76,11 @@ func (s *SourceLocker) Unlock(key string) { if l, ok := s.locks[key]; ok { atomic.AddInt32(&l.Count, -1) - fmt.Printf("解锁%p\n", l) - l.Unlock() + l.rw.Unlock() + + if s.distributedLocker != nil { + s.distributedLocker.Unlock(context.Background(), key) + } if l.Count == 0 { delete(s.locks, key) @@ -75,18 +93,14 @@ func (s *SourceLocker) TryLock(key string) bool { // 加读锁 s.m.RLock() l, ok := s.locks[key] - + s.m.RUnlock() if ok { - ret := l.TryLock() - s.m.RUnlock() - + ret := l.rw.TryLock() return ret } else { - s.m.RUnlock() - s.m.Lock() - n := NewLocker() - s.locks[key] = &countLocker{Locker: n, Count: 1} + n := NewRWLocker() + s.locks[key] = &countLocker{rw: n, Count: 1} s.m.Unlock() return n.TryLock() diff --git a/locker/source_locker_test.go b/locker/source_locker_test.go index dd080a0..53b2a52 100644 --- a/locker/source_locker_test.go +++ b/locker/source_locker_test.go @@ -1,19 +1,39 @@ -package locker +package locker_test import ( "sync" "testing" "time" + + "github.com/charlienet/go-mixed/locker" + "github.com/stretchr/testify/assert" ) var sourcekey = "u-0001" func TestTryLock(t *testing.T) { + l := locker.NewSourceLocker() + l.Lock("aa") + assert.False(t, l.TryLock("aa")) + assert.True(t, l.TryLock("bb")) + + defer l.Unlock("aa") +} + +func TestM(t *testing.T) { + l := locker.NewSourceLocker() + + for i := 0; i < 10000000; i++ { + l.Lock("aaa") + l.Unlock("aaa") + } + + t.Logf("%+v", l) } func TestSourceLocker(t *testing.T) { - l := NewSourceLocker() + l := locker.NewSourceLocker() c := 5 n := 0 @@ -41,7 +61,7 @@ func TestSourceTryLock(t *testing.T) { wg := new(sync.WaitGroup) wg.Add(c) - l := NewSourceLocker() + l := locker.NewSourceLocker() for i := 0; i < c; i++ { go func() { @@ -61,7 +81,7 @@ func TestSourceTryLock(t *testing.T) { } func BenchmarkSourceLocker(b *testing.B) { - l := NewSourceLocker() + l := locker.NewSourceLocker() b.RunParallel(func(p *testing.PB) { for p.Next() { diff --git a/locker/spin_locker_test.go b/locker/spin_locker_test.go index 229d615..ad4f190 100644 --- a/locker/spin_locker_test.go +++ b/locker/spin_locker_test.go @@ -1,12 +1,14 @@ -package locker +package locker_test import ( "sync" "testing" + + "github.com/charlienet/go-mixed/locker" ) func TestSpinLock(t *testing.T) { - l := NewSpinLocker() + l := locker.NewSpinLocker() n := 10 c := 0 diff --git a/locker/synchronizeable_test.go b/locker/synchronizeable_test.go new file mode 100644 index 0000000..dec8350 --- /dev/null +++ b/locker/synchronizeable_test.go @@ -0,0 +1,44 @@ +package locker_test + +import ( + "testing" + + "github.com/charlienet/go-mixed/locker" +) + +func TestLocker(t *testing.T) { + + var l locker.Locker + + l.Synchronize() + + l.Lock() + defer l.Unlock() +} + +func TestNew(t *testing.T) { + var a locker.RWLocker + a.Synchronize() + +} + +func TestSpinLocker(t *testing.T) { + var l locker.SpinLocker + l.Synchronize() + + l.Lock() + defer l.Unlock() +} + +func TestRWLocker(t *testing.T) { + var l locker.RWLocker + l.Lock() +} + +func TestPointLocker(t *testing.T) { + l := locker.NewLocker() + l.Lock() + l.Lock() + + defer l.Unlock() +}