mirror of
https://github.com/charlienet/go-mixed.git
synced 2025-07-18 08:32:40 +08:00
Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
9d12e7fedb | |||
b0ff4d6fd5 | |||
38f7cc75c9 |
@ -52,3 +52,8 @@ redis.ParseURL
|
||||
defer gClient.Close()
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
VersionConstraint(">=7.0")
|
||||
|
||||
```
|
172
redis/redis.go
172
redis/redis.go
@ -2,17 +2,16 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/charlienet/go-mixed/expr"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultSeparator = ":"
|
||||
|
||||
blockingQueryTimeout = 5 * time.Second
|
||||
readWriteTimeout = 2 * time.Second
|
||||
defaultSlowThreshold = "5000" // 慢查询(单位微秒)
|
||||
@ -63,68 +62,121 @@ func (clients Clients) LoadFunction(code string) {
|
||||
|
||||
type Client interface {
|
||||
redis.UniversalClient
|
||||
LoadFunction(f string) // 加载函数脚本
|
||||
Prefix() string // 统一前缀
|
||||
Separator() string // 分隔符
|
||||
JoinKeys(keys ...string) string // 连接KEY
|
||||
FormatKeys(keys ...string) []string // 格式化KEY
|
||||
LoadFunction(f string) // 加载函数脚本
|
||||
Prefix() string // 统一前缀
|
||||
Separator() string // 分隔符
|
||||
AddPrefix(prefix ...string) redisClient // 添加前缀
|
||||
JoinKeys(keys ...string) string // 连接KEY
|
||||
FormatKeys(keys ...string) []string // 格式化KEY
|
||||
ServerVersion() string
|
||||
}
|
||||
|
||||
type redisClient struct {
|
||||
redis.UniversalClient
|
||||
prefix string
|
||||
separator string
|
||||
prefix redisPrefix
|
||||
conf *redis.UniversalOptions
|
||||
}
|
||||
|
||||
func New(opt *RedisOption) redisClient {
|
||||
var rdb redisClient
|
||||
type constraintFunc func(redisClient) error
|
||||
|
||||
func Ping() constraintFunc {
|
||||
return func(rc redisClient) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
return rc.Ping(ctx).Err()
|
||||
}
|
||||
}
|
||||
|
||||
func VersionConstraint(expended string) constraintFunc {
|
||||
return func(rc redisClient) error {
|
||||
v := rc.ServerVersion()
|
||||
if len(v) == 0 {
|
||||
return errors.New("version not obtained")
|
||||
}
|
||||
current, err := version.NewVersion(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
constraint, err := version.NewConstraint(expended)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !constraint.Check(current) {
|
||||
return fmt.Errorf("the desired version is %v, which does not match the expected version %v", current, expended)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func New(opt *RedisOption, constraints ...constraintFunc) redisClient {
|
||||
if len(opt.Addrs) == 0 && len(opt.Addr) > 0 {
|
||||
opt.Addrs = []string{opt.Addr}
|
||||
}
|
||||
|
||||
separator := expr.Ternary(len(opt.Separator) == 0, defaultSeparator, opt.Separator)
|
||||
prefix := expr.Ternary(len(opt.Prefix) > 0, fmt.Sprintf("%s%s", opt.Prefix, separator), "")
|
||||
conf := &redis.UniversalOptions{
|
||||
Addrs: opt.Addrs,
|
||||
Password: opt.Password,
|
||||
|
||||
rdb = redisClient{
|
||||
prefix: prefix,
|
||||
separator: separator,
|
||||
UniversalClient: redis.NewUniversalClient(&redis.UniversalOptions{
|
||||
Addrs: opt.Addrs,
|
||||
Password: opt.Password,
|
||||
DB: opt.DB,
|
||||
|
||||
DB: opt.DB,
|
||||
MaxRetries: opt.MaxRetries,
|
||||
MinRetryBackoff: opt.MinRetryBackoff,
|
||||
MaxRetryBackoff: opt.MaxRetryBackoff,
|
||||
|
||||
MaxRetries: opt.MaxRetries,
|
||||
MinRetryBackoff: opt.MinRetryBackoff,
|
||||
MaxRetryBackoff: opt.MaxRetryBackoff,
|
||||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
||||
|
||||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
}
|
||||
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
})}
|
||||
rdb := new(conf, newPrefix(opt.Separator, opt.Prefix))
|
||||
|
||||
rdb.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold)
|
||||
if len(constraints) > 0 {
|
||||
for _, f := range constraints {
|
||||
if err := f(rdb); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(opt.Prefix) > 0 {
|
||||
rdb.AddHook(renameKey{
|
||||
prefix: prefix,
|
||||
})
|
||||
return rdb
|
||||
}
|
||||
|
||||
func NewEnforceConstraints(opt *RedisOption, constraints ...constraintFunc) redisClient {
|
||||
rdb := New(opt)
|
||||
for _, f := range constraints {
|
||||
if err := f(rdb); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return rdb
|
||||
}
|
||||
|
||||
func (rdb redisClient) Prefix() string {
|
||||
return rdb.prefix
|
||||
return rdb.prefix.Prefix()
|
||||
}
|
||||
|
||||
func (rdb redisClient) Separator() string {
|
||||
return rdb.prefix.Separator()
|
||||
}
|
||||
|
||||
func (rdb redisClient) AddPrefix(prefixes ...string) redisClient {
|
||||
old := rdb.prefix
|
||||
p := newPrefix(old.separator, old.join(prefixes...))
|
||||
|
||||
return new(rdb.conf, p)
|
||||
}
|
||||
|
||||
func (rdb redisClient) LoadFunction(code string) {
|
||||
@ -134,23 +186,47 @@ func (rdb redisClient) LoadFunction(code string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rdb redisClient) Separator() string {
|
||||
return rdb.separator
|
||||
}
|
||||
|
||||
func (rdb redisClient) JoinKeys(keys ...string) string {
|
||||
return strings.Join(keys, rdb.separator)
|
||||
return rdb.prefix.join(keys...)
|
||||
}
|
||||
|
||||
func (rdb redisClient) FormatKeys(keys ...string) []string {
|
||||
if len(rdb.prefix) == 0 {
|
||||
if !rdb.prefix.hasPrefix() {
|
||||
return keys
|
||||
}
|
||||
|
||||
re := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
re = append(re, fmt.Sprintf("%s%s", rdb.prefix, k))
|
||||
re = append(re, fmt.Sprintf("%s%s", rdb.prefix.Prefix(), k))
|
||||
}
|
||||
|
||||
return re
|
||||
}
|
||||
|
||||
func (rdb redisClient) ServerVersion() string {
|
||||
info, err := rdb.Info(context.Background(), "server").Result()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
for _, line := range strings.Split(info, "\r\n") {
|
||||
after, found := strings.CutPrefix(line, "redis_version:")
|
||||
if found {
|
||||
return after
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func new(conf *redis.UniversalOptions, prefix redisPrefix) redisClient {
|
||||
c := redis.NewUniversalClient(conf)
|
||||
c.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold)
|
||||
c.AddHook(renameHook{prefix: prefix})
|
||||
|
||||
return redisClient{
|
||||
UniversalClient: c,
|
||||
prefix: prefix,
|
||||
conf: conf,
|
||||
}
|
||||
}
|
||||
|
45
redis/redis_prefix.go
Normal file
45
redis/redis_prefix.go
Normal file
@ -0,0 +1,45 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/charlienet/go-mixed/expr"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultSeparator = ":"
|
||||
)
|
||||
|
||||
type redisPrefix struct {
|
||||
prefix string
|
||||
separator string
|
||||
}
|
||||
|
||||
func newPrefix(separator string, prefix ...string) redisPrefix {
|
||||
s := expr.Ternary(len(separator) == 0, defaultSeparator, separator)
|
||||
|
||||
return redisPrefix{
|
||||
separator: s,
|
||||
prefix: expr.Ternary(len(prefix) > 0, strings.Join(prefix, separator), ""),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *redisPrefix) Prefix() string {
|
||||
return p.prefix
|
||||
}
|
||||
|
||||
func (p *redisPrefix) Separator() string {
|
||||
return p.separator
|
||||
}
|
||||
|
||||
func (p *redisPrefix) hasPrefix() bool {
|
||||
return len(p.prefix) > 0
|
||||
}
|
||||
|
||||
func (p *redisPrefix) join(key ...string) string {
|
||||
s := make([]string, 0, len(key)+1)
|
||||
s = append(s, p.prefix)
|
||||
s = append(s, key...)
|
||||
|
||||
return strings.Join(s, p.separator)
|
||||
}
|
@ -8,18 +8,17 @@ import (
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type renameKey struct {
|
||||
prefix string
|
||||
separator string
|
||||
type renameHook struct {
|
||||
prefix redisPrefix
|
||||
}
|
||||
|
||||
func (r renameKey) DialHook(next redis.DialHook) redis.DialHook {
|
||||
func (r renameHook) DialHook(next redis.DialHook) redis.DialHook {
|
||||
return func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return next(ctx, network, addr)
|
||||
}
|
||||
}
|
||||
|
||||
func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
||||
func (r renameHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
||||
return func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
|
||||
// 对多个KEY进行更名操作
|
||||
@ -31,15 +30,15 @@ func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.Pro
|
||||
}
|
||||
}
|
||||
|
||||
func (r renameKey) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||
func (r renameHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
||||
return func(ctx context.Context, cmd redis.Cmder) error {
|
||||
r.renameKey(cmd)
|
||||
return next(ctx, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
func (r renameKey) renameKey(cmd redis.Cmder) {
|
||||
if len(r.prefix) == 0 {
|
||||
func (r renameHook) renameKey(cmd redis.Cmder) {
|
||||
if !r.prefix.hasPrefix() {
|
||||
return
|
||||
}
|
||||
|
||||
@ -49,7 +48,7 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
|
||||
}
|
||||
|
||||
switch strings.ToUpper(cmd.Name()) {
|
||||
case "SELECT", "FUNCTION":
|
||||
case "SELECT", "FUNCTION", "INFO":
|
||||
// 无KEY指令
|
||||
case
|
||||
"RENAME", "RENAMENX",
|
||||
@ -80,15 +79,11 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r renameKey) rename(args []any, indexes ...int) {
|
||||
func (r renameHook) rename(args []any, indexes ...int) {
|
||||
for _, i := range indexes {
|
||||
if key, ok := args[i].(string); ok {
|
||||
var builder strings.Builder
|
||||
builder.WriteString(r.prefix)
|
||||
builder.WriteString(r.separator)
|
||||
builder.WriteString(key)
|
||||
|
||||
args[i] = builder.String()
|
||||
newKey := r.prefix.join(key)
|
||||
args[i] = newKey
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,14 +13,13 @@ import (
|
||||
var _ Set[string] = &hash_set[string]{}
|
||||
|
||||
type hash_set[T constraints.Ordered] struct {
|
||||
m map[T]struct{}
|
||||
lock locker.RWLocker
|
||||
m map[T]struct{}
|
||||
locker locker.RWLocker
|
||||
}
|
||||
|
||||
func NewHashSet[T constraints.Ordered](values ...T) *hash_set[T] {
|
||||
set := hash_set[T]{
|
||||
m: make(map[T]struct{}, len(values)),
|
||||
lock: locker.EmptyLocker,
|
||||
m: make(map[T]struct{}, len(values)),
|
||||
}
|
||||
|
||||
set.Add(values...)
|
||||
@ -28,37 +27,41 @@ func NewHashSet[T constraints.Ordered](values ...T) *hash_set[T] {
|
||||
}
|
||||
|
||||
func (s *hash_set[T]) Sync() *hash_set[T] {
|
||||
s.lock = locker.NewRWLocker()
|
||||
s.locker.Synchronize()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Add(values ...T) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
func (s *hash_set[T]) Add(values ...T) Set[T] {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
|
||||
for _, v := range values {
|
||||
s.m[v] = struct{}{}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Remove(v T) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
func (s *hash_set[T]) Remove(v T) Set[T] {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
|
||||
delete(s.m, v)
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Contains(value T) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
func (s *hash_set[T]) Contains(value T) bool {
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
_, ok := s.m[value]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (s hash_set[T]) ContainsAny(values ...T) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
func (s *hash_set[T]) ContainsAny(values ...T) bool {
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
for _, v := range values {
|
||||
if _, ok := s.m[v]; ok {
|
||||
@ -69,9 +72,9 @@ func (s hash_set[T]) ContainsAny(values ...T) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s hash_set[T]) ContainsAll(values ...T) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
func (s *hash_set[T]) ContainsAll(values ...T) bool {
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
for _, v := range values {
|
||||
if _, ok := s.m[v]; !ok {
|
||||
@ -82,15 +85,15 @@ func (s hash_set[T]) ContainsAll(values ...T) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Asc() Set[T] {
|
||||
func (s *hash_set[T]) Asc() Set[T] {
|
||||
return s.copyToSorted().Asc()
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Desc() Set[T] {
|
||||
func (s *hash_set[T]) Desc() Set[T] {
|
||||
return s.copyToSorted().Desc()
|
||||
}
|
||||
|
||||
func (s hash_set[T]) copyToSorted() Set[T] {
|
||||
func (s *hash_set[T]) copyToSorted() Set[T] {
|
||||
orderd := NewSortedSet[T]()
|
||||
for k := range s.m {
|
||||
orderd.Add(k)
|
||||
@ -109,13 +112,13 @@ func (s *hash_set[T]) Clone() *hash_set[T] {
|
||||
return set
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Iterate(fn func(value T)) {
|
||||
func (s *hash_set[T]) Iterate(fn func(value T)) {
|
||||
for v := range s.m {
|
||||
fn(v)
|
||||
}
|
||||
}
|
||||
|
||||
func (s hash_set[T]) ToSlice() []T {
|
||||
func (s *hash_set[T]) ToSlice() []T {
|
||||
values := make([]T, 0, s.Size())
|
||||
s.Iterate(func(value T) {
|
||||
values = append(values, value)
|
||||
@ -124,15 +127,15 @@ func (s hash_set[T]) ToSlice() []T {
|
||||
return values
|
||||
}
|
||||
|
||||
func (s hash_set[T]) IsEmpty() bool {
|
||||
func (s *hash_set[T]) IsEmpty() bool {
|
||||
return len(s.m) == 0
|
||||
}
|
||||
|
||||
func (s hash_set[T]) Size() int {
|
||||
func (s *hash_set[T]) Size() int {
|
||||
return len(s.m)
|
||||
}
|
||||
|
||||
func (s hash_set[T]) MarshalJSON() ([]byte, error) {
|
||||
func (s *hash_set[T]) MarshalJSON() ([]byte, error) {
|
||||
items := make([]string, 0, s.Size())
|
||||
|
||||
for ele := range s.m {
|
||||
@ -147,7 +150,7 @@ func (s hash_set[T]) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf("[%s]", strings.Join(items, ", "))), nil
|
||||
}
|
||||
|
||||
func (s hash_set[T]) UnmarshalJSON(b []byte) error {
|
||||
func (s *hash_set[T]) UnmarshalJSON(b []byte) error {
|
||||
var i []any
|
||||
|
||||
d := json.NewDecoder(bytes.NewReader(b))
|
||||
@ -166,7 +169,7 @@ func (s hash_set[T]) UnmarshalJSON(b []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s hash_set[T]) String() string {
|
||||
func (s *hash_set[T]) String() string {
|
||||
l := make([]string, 0, len(s.m))
|
||||
for k := range s.m {
|
||||
l = append(l, fmt.Sprint(k))
|
||||
|
16
sets/set.go
16
sets/set.go
@ -1,15 +1,13 @@
|
||||
package sets
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/charlienet/go-mixed/locker"
|
||||
"golang.org/x/exp/constraints"
|
||||
)
|
||||
|
||||
type Set[T comparable] interface {
|
||||
Add(...T)
|
||||
Remove(v T)
|
||||
Add(...T) Set[T]
|
||||
Remove(v T) Set[T]
|
||||
Asc() Set[T]
|
||||
Desc() Set[T]
|
||||
Contains(T) bool
|
||||
@ -19,18 +17,16 @@ type Set[T comparable] interface {
|
||||
ToSlice() []T // 转换为切片
|
||||
}
|
||||
|
||||
var defaultOptions = option{locker: locker.NewEmptyLocker()}
|
||||
|
||||
type option struct {
|
||||
locker sync.Locker
|
||||
locker locker.Locker
|
||||
}
|
||||
|
||||
type setFunc func(option)
|
||||
type setFunc func(*option)
|
||||
|
||||
func WithSync() setFunc {
|
||||
|
||||
return func(o option) {
|
||||
o.locker = &sync.RWMutex{}
|
||||
return func(o *option) {
|
||||
o.locker.Synchronize()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,20 +14,23 @@ type sorted_set[T constraints.Ordered] struct {
|
||||
|
||||
func NewSortedSet[T constraints.Ordered](t ...T) *sorted_set[T] {
|
||||
return &sorted_set[T]{
|
||||
set: NewHashSet[T](),
|
||||
sorted: t,
|
||||
set: NewHashSet(t...),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sorted_set[T]) Add(values ...T) {
|
||||
func (s *sorted_set[T]) Add(values ...T) Set[T] {
|
||||
for _, v := range values {
|
||||
if !s.set.Contains(v) {
|
||||
s.sorted = append(s.sorted, v)
|
||||
s.set.Add(v)
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *sorted_set[T]) Remove(v T) {
|
||||
func (s *sorted_set[T]) Remove(v T) Set[T] {
|
||||
if s.set.Contains(v) {
|
||||
for index := range s.sorted {
|
||||
if s.sorted[index] == v {
|
||||
@ -38,6 +41,8 @@ func (s *sorted_set[T]) Remove(v T) {
|
||||
|
||||
s.set.Remove(v)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *sorted_set[T]) Asc() Set[T] {
|
||||
|
Reference in New Issue
Block a user