1
0
mirror of https://github.com/charlienet/go-mixed.git synced 2025-07-17 16:12:42 +08:00
This commit is contained in:
2024-05-28 04:23:30 +08:00
parent 1abde30d8f
commit b4ac1cc449
12 changed files with 405 additions and 29 deletions

View File

@ -6,10 +6,17 @@ type ChanLocker interface {
} }
type chanSourceLock struct { type chanSourceLock struct {
m RWLocker m rwLocker
content map[string]chan int content map[string]chan int
} }
func NewChanSourceLocker() *chanSourceLock {
return &chanSourceLock{
m: NewRWLocker(),
content: make(map[string]chan int),
}
}
func (s *chanSourceLock) Get(key string) (ch <-chan int, ok bool) { func (s *chanSourceLock) Get(key string) (ch <-chan int, ok bool) {
s.m.RLock() s.m.RLock()
ch, ok = s.content[key] ch, ok = s.content[key]

View File

@ -0,0 +1,19 @@
package locker_test
import (
"testing"
"github.com/charlienet/go-mixed/locker"
)
func TestChanSourceLocker(t *testing.T) {
l := locker.NewChanSourceLocker()
c, ok := l.Get("aaaa")
if ok {
<-c
println("ok")
}
println("fail")
}

View File

@ -0,0 +1,7 @@
package locker
import "context"
type DistributedLocker interface {
Unlock(context.Context, string)
}

View File

@ -0,0 +1,13 @@
package locker_test
import (
"testing"
"github.com/charlienet/go-mixed/redis"
"github.com/charlienet/go-mixed/tests"
)
func TestRedisDistrbutedLocker(t *testing.T) {
tests.RunOnDefaultRedis(t, func(rdb redis.Client) {
})
}

22
locker/readme.md Normal file
View File

@ -0,0 +1,22 @@
同步锁
EmptyLocker 空锁
RWLocker, 读写锁
SpinLocker, 旋转锁
锁可以添加一个外部存储成为分布式锁。WithRedis, WithZookeeper
单例锁
资源锁
分布式锁
在锁的基础上添加分布式存储升级为分布式锁
locker.WithRedis()
locker.WithZookeeper()

View File

@ -0,0 +1,33 @@
#!lua name=charlie_locker
-- 安装命令
-- cat redis_locker.lua | redis-cli -x --cluster-only-masters --cluster call 192.168.123.30:6379 FUNCTION LOAD REPLACE
local function lock(keys, args)
if redis.call("GET", keys[1]) == args[1] then
redis.call("SET", keys[1], args[1], "PX", args[2])
return "OK"
else
return redis.call("SET", keys[1], args[1], "NX", "PX", args[2])
end
end
local function del(keys, args)
if redis.call("GET", keys[1]) == args[1] then
return redis.call("DEL", keys[1])
else
return '0'
end
end
local function expire(keys, args)
if redis.call('get', keys[1]) == args[1] then
return redis.call('expire', keys[1], args[2])
else
return '0'
end
end
redis.register_function('locker_lock',lock)
redis.register_function('locker_unlock',del)
redis.register_function('locker_expire',expire)

164
locker/redis/redis_store.go Normal file
View File

@ -0,0 +1,164 @@
package redis
import (
"context"
_ "embed"
"maps"
"strings"
"sync"
"time"
"github.com/charlienet/go-mixed/rand"
"github.com/charlienet/go-mixed/redis"
goredis "github.com/redis/go-redis/v9"
)
//go:embed redis_locker.lua
var redis_locker_function string
const (
defaultExpire = time.Second * 20
retryInterval = time.Millisecond * 10
)
var once sync.Once
type redis_locker_store struct {
key string
sources map[string]string
expire time.Duration // 过期时间
mu sync.RWMutex
clients []redis.Client
}
func NewRedisStore(key string, clients ...redis.Client) *redis_locker_store {
once.Do(func() { redis.Clients(clients).LoadFunction(redis_locker_function) })
locker := &redis_locker_store{
key: key,
sources: make(map[string]string),
clients: clients,
expire: defaultExpire,
}
go locker.expandLockTime()
return locker
}
func (l *redis_locker_store) Lock(ctx context.Context, sourceName string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if l.TryLock(ctx, sourceName) {
return nil
}
}
time.Sleep(retryInterval)
}
}
func (l *redis_locker_store) TryLock(ctx context.Context, sourceName string) bool {
value := l.getSourceValue(sourceName)
results := l.fCall(ctx, "locker_lock", sourceName, value, l.expire.Milliseconds())
if !isSuccess(results) {
for _, r := range results {
if r.Err() != nil {
println("err:", r.Err().Error())
}
}
l.Unlock(ctx, sourceName)
return false
}
return true
}
func (locker *redis_locker_store) Unlock(ctx context.Context, sourceName string) {
value := locker.getSourceValue(sourceName)
locker.fCall(ctx, "locker_unlock", sourceName, value)
locker.mu.Lock()
defer locker.mu.Unlock()
delete(locker.sources, sourceName)
}
func (l *redis_locker_store) expandLockTime() {
for {
time.Sleep(l.expire / 3)
if len(l.sources) == 0 {
continue
}
l.mu.RLock()
cloned := maps.Clone(l.sources)
l.mu.RUnlock()
for k, v := range cloned {
results := l.fCall(context.Background(), "locker_expire", k, v, l.expire.Seconds())
for _, r := range results {
if r.Err() != nil {
println("键延期失败:", r.Err().Error())
}
}
}
}
}
func (l *redis_locker_store) getSourceValue(name string) string {
l.mu.Lock()
defer l.mu.Unlock()
if v, ok := l.sources[name]; ok {
return v
}
v := rand.Hex.Generate(36)
l.sources[name] = v
return v
}
func (locker *redis_locker_store) fCall(ctx context.Context, cmd string, key string, args ...any) []*goredis.Cmd {
results := make([]*goredis.Cmd, 0, len(locker.clients))
var wg sync.WaitGroup
wg.Add(len(locker.clients))
for _, rdb := range locker.clients {
go func(rdb redis.Client) {
defer wg.Done()
newKey := rdb.JoinKeys(locker.key, key)
results = append(results, rdb.FCall(ctx, cmd, []string{newKey}, args...))
}(rdb)
}
wg.Wait()
return results
}
func isSuccess(results []*goredis.Cmd) bool {
successCount := 0
for _, ret := range results {
resp, err := ret.Result()
if err != nil || resp == nil {
return false
}
reply, ok := resp.(string)
if ok && strings.EqualFold(reply, "OK") {
successCount++
}
}
return successCount >= len(results)/2+1
}

View File

@ -0,0 +1,31 @@
package redis
import (
"context"
"testing"
"time"
"github.com/charlienet/go-mixed/redis"
"github.com/charlienet/go-mixed/tests"
)
func TestCreateRedisStore(t *testing.T) {
tests.RunOnDefaultRedis(t, func(rdb redis.Client) {
keyName := "source"
l := NewRedisStore("locker_key", rdb)
ret := l.TryLock(context.Background(), keyName)
if !ret {
t.Log("加锁失败")
}
l.Lock(context.Background(), keyName)
t.Log("锁重入完成")
l.Unlock(context.Background(), keyName)
time.Sleep(time.Second * 15)
// l.Unlock(context.Background())
})
}

View File

@ -1,27 +1,45 @@
package locker package locker
import ( import (
"context"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
redis_store "github.com/charlienet/go-mixed/locker/redis"
"github.com/charlienet/go-mixed/redis"
) )
// 带计数器锁 // 带计数器锁
type countLocker struct { type countLocker struct {
Locker rw rwLocker
Count int32 Count int32
} }
// SourceLocker 资源锁 // SourceLocker 资源锁
type SourceLocker struct { type SourceLocker struct {
m RWLocker m RWLocker
locks map[string]*countLocker distributedLocker DistributedLocker
locks map[string]*countLocker
err error
} }
func NewSourceLocker() *SourceLocker { func NewSourceLocker() *SourceLocker {
return &SourceLocker{ l := &SourceLocker{
m: NewRWLocker(),
locks: make(map[string]*countLocker), locks: make(map[string]*countLocker),
} }
l.m.Synchronize()
return l
}
func (s *SourceLocker) WithRedis(key string, clients ...redis.Client) *SourceLocker {
redisStore := redis_store.NewRedisStore(key, clients...)
return s.WithDistributedLocker(redisStore)
}
func (s *SourceLocker) WithDistributedLocker(distributed DistributedLocker) *SourceLocker {
s.distributedLocker = distributed
return s
} }
func (s *SourceLocker) Lock(key string) { func (s *SourceLocker) Lock(key string) {
@ -31,7 +49,7 @@ func (s *SourceLocker) Lock(key string) {
if ok { if ok {
atomic.AddInt32(&l.Count, 1) atomic.AddInt32(&l.Count, 1)
l.Lock() l.rw.Lock()
fmt.Println("加锁") fmt.Println("加锁")
} else { } else {
@ -40,18 +58,15 @@ func (s *SourceLocker) Lock(key string) {
if l2, ok := s.locks[key]; ok { if l2, ok := s.locks[key]; ok {
s.m.Unlock() s.m.Unlock()
l2.Lock() l2.rw.Lock()
fmt.Println("二次检查加锁") fmt.Println("二次检查加锁")
} else { } else {
n := NewLocker() n := NewRWLocker()
s.locks[key] = &countLocker{Locker: n, Count: 1} s.locks[key] = &countLocker{rw: n, Count: 1}
s.m.Unlock() s.m.Unlock()
fmt.Printf("新锁准备加锁:%p\n", n)
n.Lock() n.Lock()
fmt.Println("初始加锁")
} }
} }
} }
@ -61,8 +76,11 @@ func (s *SourceLocker) Unlock(key string) {
if l, ok := s.locks[key]; ok { if l, ok := s.locks[key]; ok {
atomic.AddInt32(&l.Count, -1) atomic.AddInt32(&l.Count, -1)
fmt.Printf("解锁%p\n", l) l.rw.Unlock()
l.Unlock()
if s.distributedLocker != nil {
s.distributedLocker.Unlock(context.Background(), key)
}
if l.Count == 0 { if l.Count == 0 {
delete(s.locks, key) delete(s.locks, key)
@ -75,18 +93,14 @@ func (s *SourceLocker) TryLock(key string) bool {
// 加读锁 // 加读锁
s.m.RLock() s.m.RLock()
l, ok := s.locks[key] l, ok := s.locks[key]
s.m.RUnlock()
if ok { if ok {
ret := l.TryLock() ret := l.rw.TryLock()
s.m.RUnlock()
return ret return ret
} else { } else {
s.m.RUnlock()
s.m.Lock() s.m.Lock()
n := NewLocker() n := NewRWLocker()
s.locks[key] = &countLocker{Locker: n, Count: 1} s.locks[key] = &countLocker{rw: n, Count: 1}
s.m.Unlock() s.m.Unlock()
return n.TryLock() return n.TryLock()

View File

@ -1,19 +1,39 @@
package locker package locker_test
import ( import (
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/charlienet/go-mixed/locker"
"github.com/stretchr/testify/assert"
) )
var sourcekey = "u-0001" var sourcekey = "u-0001"
func TestTryLock(t *testing.T) { func TestTryLock(t *testing.T) {
l := locker.NewSourceLocker()
l.Lock("aa")
assert.False(t, l.TryLock("aa"))
assert.True(t, l.TryLock("bb"))
defer l.Unlock("aa")
}
func TestM(t *testing.T) {
l := locker.NewSourceLocker()
for i := 0; i < 10000000; i++ {
l.Lock("aaa")
l.Unlock("aaa")
}
t.Logf("%+v", l)
} }
func TestSourceLocker(t *testing.T) { func TestSourceLocker(t *testing.T) {
l := NewSourceLocker() l := locker.NewSourceLocker()
c := 5 c := 5
n := 0 n := 0
@ -41,7 +61,7 @@ func TestSourceTryLock(t *testing.T) {
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(c) wg.Add(c)
l := NewSourceLocker() l := locker.NewSourceLocker()
for i := 0; i < c; i++ { for i := 0; i < c; i++ {
go func() { go func() {
@ -61,7 +81,7 @@ func TestSourceTryLock(t *testing.T) {
} }
func BenchmarkSourceLocker(b *testing.B) { func BenchmarkSourceLocker(b *testing.B) {
l := NewSourceLocker() l := locker.NewSourceLocker()
b.RunParallel(func(p *testing.PB) { b.RunParallel(func(p *testing.PB) {
for p.Next() { for p.Next() {

View File

@ -1,12 +1,14 @@
package locker package locker_test
import ( import (
"sync" "sync"
"testing" "testing"
"github.com/charlienet/go-mixed/locker"
) )
func TestSpinLock(t *testing.T) { func TestSpinLock(t *testing.T) {
l := NewSpinLocker() l := locker.NewSpinLocker()
n := 10 n := 10
c := 0 c := 0

View File

@ -0,0 +1,44 @@
package locker_test
import (
"testing"
"github.com/charlienet/go-mixed/locker"
)
func TestLocker(t *testing.T) {
var l locker.Locker
l.Synchronize()
l.Lock()
defer l.Unlock()
}
func TestNew(t *testing.T) {
var a locker.RWLocker
a.Synchronize()
}
func TestSpinLocker(t *testing.T) {
var l locker.SpinLocker
l.Synchronize()
l.Lock()
defer l.Unlock()
}
func TestRWLocker(t *testing.T) {
var l locker.RWLocker
l.Lock()
}
func TestPointLocker(t *testing.T) {
l := locker.NewLocker()
l.Lock()
l.Lock()
defer l.Unlock()
}