1
0
mirror of https://github.com/charlienet/go-mixed.git synced 2025-07-17 16:12:42 +08:00
This commit is contained in:
2024-06-04 16:42:02 +08:00
parent b0ff4d6fd5
commit 9d12e7fedb
4 changed files with 185 additions and 64 deletions

View File

@ -52,3 +52,8 @@ redis.ParseURL
defer gClient.Close() defer gClient.Close()
``` ```
```
VersionConstraint(">=7.0")
```

View File

@ -2,17 +2,16 @@ package redis
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strings" "strings"
"time" "time"
"github.com/charlienet/go-mixed/expr" "github.com/hashicorp/go-version"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
const ( const (
defaultSeparator = ":"
blockingQueryTimeout = 5 * time.Second blockingQueryTimeout = 5 * time.Second
readWriteTimeout = 2 * time.Second readWriteTimeout = 2 * time.Second
defaultSlowThreshold = "5000" // 慢查询(单位微秒) defaultSlowThreshold = "5000" // 慢查询(单位微秒)
@ -63,68 +62,121 @@ func (clients Clients) LoadFunction(code string) {
type Client interface { type Client interface {
redis.UniversalClient redis.UniversalClient
LoadFunction(f string) // 加载函数脚本 LoadFunction(f string) // 加载函数脚本
Prefix() string // 统一前缀 Prefix() string // 统一前缀
Separator() string // 分隔符 Separator() string // 分隔符
JoinKeys(keys ...string) string // 连接KEY AddPrefix(prefix ...string) redisClient // 添加前缀
FormatKeys(keys ...string) []string // 格式化KEY JoinKeys(keys ...string) string // 连接KEY
FormatKeys(keys ...string) []string // 格式化KEY
ServerVersion() string
} }
type redisClient struct { type redisClient struct {
redis.UniversalClient redis.UniversalClient
prefix string prefix redisPrefix
separator string conf *redis.UniversalOptions
} }
func New(opt *RedisOption) redisClient { type constraintFunc func(redisClient) error
var rdb redisClient
func Ping() constraintFunc {
return func(rc redisClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return rc.Ping(ctx).Err()
}
}
func VersionConstraint(expended string) constraintFunc {
return func(rc redisClient) error {
v := rc.ServerVersion()
if len(v) == 0 {
return errors.New("version not obtained")
}
current, err := version.NewVersion(v)
if err != nil {
return err
}
constraint, err := version.NewConstraint(expended)
if err != nil {
return err
}
if !constraint.Check(current) {
return fmt.Errorf("the desired version is %v, which does not match the expected version %v", current, expended)
}
return nil
}
}
func New(opt *RedisOption, constraints ...constraintFunc) redisClient {
if len(opt.Addrs) == 0 && len(opt.Addr) > 0 { if len(opt.Addrs) == 0 && len(opt.Addr) > 0 {
opt.Addrs = []string{opt.Addr} opt.Addrs = []string{opt.Addr}
} }
separator := expr.Ternary(len(opt.Separator) == 0, defaultSeparator, opt.Separator) conf := &redis.UniversalOptions{
prefix := expr.Ternary(len(opt.Prefix) > 0, fmt.Sprintf("%s%s", opt.Prefix, separator), "") Addrs: opt.Addrs,
Password: opt.Password,
rdb = redisClient{ DB: opt.DB,
prefix: prefix,
separator: separator,
UniversalClient: redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: opt.Addrs,
Password: opt.Password,
DB: opt.DB, MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
MaxRetries: opt.MaxRetries, DialTimeout: opt.DialTimeout,
MinRetryBackoff: opt.MinRetryBackoff, ReadTimeout: opt.ReadTimeout,
MaxRetryBackoff: opt.MaxRetryBackoff, WriteTimeout: opt.WriteTimeout,
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
DialTimeout: opt.DialTimeout, PoolSize: opt.PoolSize,
ReadTimeout: opt.ReadTimeout, PoolTimeout: opt.PoolTimeout,
WriteTimeout: opt.WriteTimeout, MinIdleConns: opt.MinIdleConns,
ContextTimeoutEnabled: opt.ContextTimeoutEnabled, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
}
PoolSize: opt.PoolSize, rdb := new(conf, newPrefix(opt.Separator, opt.Prefix))
PoolTimeout: opt.PoolTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
})}
rdb.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold) if len(constraints) > 0 {
for _, f := range constraints {
if err := f(rdb); err != nil {
panic(err)
}
}
}
if len(opt.Prefix) > 0 { return rdb
rdb.AddHook(renameKey{ }
prefix: prefix,
}) func NewEnforceConstraints(opt *RedisOption, constraints ...constraintFunc) redisClient {
rdb := New(opt)
for _, f := range constraints {
if err := f(rdb); err != nil {
panic(err)
}
} }
return rdb return rdb
} }
func (rdb redisClient) Prefix() string { func (rdb redisClient) Prefix() string {
return rdb.prefix return rdb.prefix.Prefix()
}
func (rdb redisClient) Separator() string {
return rdb.prefix.Separator()
}
func (rdb redisClient) AddPrefix(prefixes ...string) redisClient {
old := rdb.prefix
p := newPrefix(old.separator, old.join(prefixes...))
return new(rdb.conf, p)
} }
func (rdb redisClient) LoadFunction(code string) { func (rdb redisClient) LoadFunction(code string) {
@ -134,23 +186,47 @@ func (rdb redisClient) LoadFunction(code string) {
} }
} }
func (rdb redisClient) Separator() string {
return rdb.separator
}
func (rdb redisClient) JoinKeys(keys ...string) string { func (rdb redisClient) JoinKeys(keys ...string) string {
return strings.Join(keys, rdb.separator) return rdb.prefix.join(keys...)
} }
func (rdb redisClient) FormatKeys(keys ...string) []string { func (rdb redisClient) FormatKeys(keys ...string) []string {
if len(rdb.prefix) == 0 { if !rdb.prefix.hasPrefix() {
return keys return keys
} }
re := make([]string, 0, len(keys)) re := make([]string, 0, len(keys))
for _, k := range keys { for _, k := range keys {
re = append(re, fmt.Sprintf("%s%s", rdb.prefix, k)) re = append(re, fmt.Sprintf("%s%s", rdb.prefix.Prefix(), k))
} }
return re return re
} }
func (rdb redisClient) ServerVersion() string {
info, err := rdb.Info(context.Background(), "server").Result()
if err != nil {
return ""
}
for _, line := range strings.Split(info, "\r\n") {
after, found := strings.CutPrefix(line, "redis_version:")
if found {
return after
}
}
return ""
}
func new(conf *redis.UniversalOptions, prefix redisPrefix) redisClient {
c := redis.NewUniversalClient(conf)
c.ConfigSet(context.Background(), "slowlog-log-slower-than", defaultSlowThreshold)
c.AddHook(renameHook{prefix: prefix})
return redisClient{
UniversalClient: c,
prefix: prefix,
conf: conf,
}
}

45
redis/redis_prefix.go Normal file
View File

@ -0,0 +1,45 @@
package redis
import (
"strings"
"github.com/charlienet/go-mixed/expr"
)
const (
defaultSeparator = ":"
)
type redisPrefix struct {
prefix string
separator string
}
func newPrefix(separator string, prefix ...string) redisPrefix {
s := expr.Ternary(len(separator) == 0, defaultSeparator, separator)
return redisPrefix{
separator: s,
prefix: expr.Ternary(len(prefix) > 0, strings.Join(prefix, separator), ""),
}
}
func (p *redisPrefix) Prefix() string {
return p.prefix
}
func (p *redisPrefix) Separator() string {
return p.separator
}
func (p *redisPrefix) hasPrefix() bool {
return len(p.prefix) > 0
}
func (p *redisPrefix) join(key ...string) string {
s := make([]string, 0, len(key)+1)
s = append(s, p.prefix)
s = append(s, key...)
return strings.Join(s, p.separator)
}

View File

@ -8,18 +8,17 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
type renameKey struct { type renameHook struct {
prefix string prefix redisPrefix
separator string
} }
func (r renameKey) DialHook(next redis.DialHook) redis.DialHook { func (r renameHook) DialHook(next redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) { return func(ctx context.Context, network, addr string) (net.Conn, error) {
return next(ctx, network, addr) return next(ctx, network, addr)
} }
} }
func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook { func (r renameHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error { return func(ctx context.Context, cmds []redis.Cmder) error {
// 对多个KEY进行更名操作 // 对多个KEY进行更名操作
@ -31,15 +30,15 @@ func (r renameKey) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.Pro
} }
} }
func (r renameKey) ProcessHook(next redis.ProcessHook) redis.ProcessHook { func (r renameHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error { return func(ctx context.Context, cmd redis.Cmder) error {
r.renameKey(cmd) r.renameKey(cmd)
return next(ctx, cmd) return next(ctx, cmd)
} }
} }
func (r renameKey) renameKey(cmd redis.Cmder) { func (r renameHook) renameKey(cmd redis.Cmder) {
if len(r.prefix) == 0 { if !r.prefix.hasPrefix() {
return return
} }
@ -49,7 +48,7 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
} }
switch strings.ToUpper(cmd.Name()) { switch strings.ToUpper(cmd.Name()) {
case "SELECT", "FUNCTION": case "SELECT", "FUNCTION", "INFO":
// 无KEY指令 // 无KEY指令
case case
"RENAME", "RENAMENX", "RENAME", "RENAMENX",
@ -80,15 +79,11 @@ func (r renameKey) renameKey(cmd redis.Cmder) {
} }
} }
func (r renameKey) rename(args []any, indexes ...int) { func (r renameHook) rename(args []any, indexes ...int) {
for _, i := range indexes { for _, i := range indexes {
if key, ok := args[i].(string); ok { if key, ok := args[i].(string); ok {
var builder strings.Builder newKey := r.prefix.join(key)
builder.WriteString(r.prefix) args[i] = newKey
builder.WriteString(r.separator)
builder.WriteString(key)
args[i] = builder.String()
} }
} }
} }