1
0
mirror of https://github.com/charlienet/go-mixed.git synced 2025-07-18 00:22:41 +08:00
This commit is contained in:
2022-06-10 17:04:34 +08:00
parent 853b19fb02
commit 9bb232be93
22 changed files with 641 additions and 147 deletions

38
cache/big_cache.go vendored
View File

@ -4,20 +4,42 @@ import (
"errors"
"time"
"github.com/allegro/bigcache"
"github.com/allegro/bigcache/v3"
"github.com/charlienet/go-mixed/logx"
)
var _ MemCache = &bigCacheClient{}
type BigCacheConfig struct {
Shards int
LifeWindow time.Duration
CleanWindow time.Duration
MaxEntriesInWindow int
MaxEntrySize int
HardMaxCacheSize int
log logx.Logger
}
type bigCacheClient struct {
cache *bigcache.BigCache
}
func NewBigCache(c *BigCacheConfig) (*bigCacheClient, error) {
bigCache, err := bigcache.NewBigCache(bigcache.Config{})
func NewBigCache(c BigCacheConfig) (*bigCacheClient, error) {
config := bigcache.DefaultConfig(time.Minute * 10)
config.LifeWindow = c.LifeWindow
config.LifeWindow = c.LifeWindow
config.CleanWindow = c.CleanWindow
config.MaxEntriesInWindow = c.MaxEntriesInWindow
config.MaxEntrySize = c.MaxEntrySize
config.HardMaxCacheSize = c.HardMaxCacheSize
config.Logger = c.log
if c.Shards > 0 {
config.Shards = c.Shards
}
bigCache, err := bigcache.NewBigCache(config)
if err != nil {
return nil, err
}
@ -35,8 +57,14 @@ func (c *bigCacheClient) Set(key string, entry []byte, expire time.Duration) err
return c.cache.Set(key, entry)
}
func (c *bigCacheClient) Delete(key string) error {
return c.cache.Delete(key)
func (c *bigCacheClient) Delete(keys ...string) error {
for _, k := range keys {
if err := c.cache.Delete(k); err != nil {
return err
}
}
return nil
}
func (c *bigCacheClient) Exist(key string) {

56
cache/cache.go vendored
View File

@ -1,36 +1,40 @@
package cache
import (
"context"
"errors"
"time"
"github.com/charlienet/go-mixed/bytesconv"
"github.com/charlienet/go-mixed/logx"
)
var ErrNotFound = errors.New("not found")
var ErrNotFound = errors.New("key not found")
type LoadFunc func() (any, error)
type LoadFunc func(context.Context) (any, error)
type Cache struct {
prefix string // 键前缀
retry int // 资源获取时的重试次数
mem MemCache // 内存缓存
distributdCache DistributdCache // 分布式缓存
publishSubscribe PublishSubscribe // 发布订阅
qps *qps
qps *qps //
logger logx.Logger // 日志记录
}
func NewCache(opts ...option) (*Cache, error) {
c := &Cache{
qps: NewQps(),
}
func NewCache(opts ...option) *Cache {
c := acquireDefaultCache()
for _, f := range opts {
f(c)
if err := f(c); err != nil {
return c
}
}
go c.subscribe()
return c, nil
return c
}
func (c *Cache) Set(key string, value any, expiration time.Duration) error {
@ -60,13 +64,16 @@ func (c *Cache) Get(key string, out any) error {
return nil
}
func (c *Cache) GetFn(key string, out any, fn LoadFunc, expiration time.Duration) (bool, error) {
ret, err := fn()
func (c *Cache) GetFn(ctx context.Context, key string, out any, fn LoadFunc, expiration time.Duration) (bool, error) {
c.Get(key, out)
// 多级缓存中未找到时,放置缓存对象
ret, err := fn(ctx)
if err != nil {
return false, err
}
_ = ret
c.Set(key, ret, expiration)
return false, nil
}
@ -75,20 +82,22 @@ func (c *Cache) Exist(key string) (bool, error) {
return false, nil
}
func (c *Cache) Delete(key string) error {
func (c *Cache) Delete(key ...string) error {
if c.mem != nil {
c.mem.Delete(key)
c.mem.Delete(key...)
}
if c.distributdCache != nil {
c.distributdCache.Delete(key)
c.distributdCache.Delete(key...)
}
return nil
}
func (c *Cache) getFromMem(key string, out any) error {
func (c *Cache) subscribe() {
}
func (c *Cache) getFromMem(key string, out any) error {
bytes, err := c.mem.Get(key)
if err != nil {
return err
@ -101,13 +110,16 @@ func (c *Cache) getFromMem(key string, out any) error {
return nil
}
func (c *Cache) subscribe() {
// 从缓存加载数据
func (c *Cache) getFromCache() {
}
func (c *Cache) genKey(key string) string {
if len(c.prefix) == 0 {
return key
}
// 从数据源加载数据
func (c *Cache) getFromSource(ctx context.Context, key string, fn LoadFunc) {
// 1. 尝试获取资源锁,如成功获取到锁加载数据
// 2. 未获取到锁,等待从缓存中获取
fn(ctx)
return c.prefix + "-" + key
}

99
cache/cache_builder.go vendored Normal file
View File

@ -0,0 +1,99 @@
package cache
import "github.com/charlienet/go-mixed/logx"
const defaultPrefix = "cache"
type option func(*Cache) error
type options struct {
Prefix string
}
func acquireDefaultCache() *Cache {
return &Cache{
prefix: defaultPrefix,
qps: NewQps(),
}
}
type cacheBuilder struct {
prefix string
redisOptions RedisConfig
bigCacheConfig BigCacheConfig
freeSize int
publishSubscribe PublishSubscribe
log logx.Logger
}
func NewCacheBuilder() *cacheBuilder {
return &cacheBuilder{}
}
func (b *cacheBuilder) WithLogger(log logx.Logger) *cacheBuilder {
b.log = log
return b
}
func (b *cacheBuilder) WithPrefix(prefix string) *cacheBuilder {
b.prefix = prefix
return b
}
func (b *cacheBuilder) WithRedis(opts RedisConfig) *cacheBuilder {
b.redisOptions = opts
return b
}
func (b *cacheBuilder) WithBigCache(opts BigCacheConfig) *cacheBuilder {
b.bigCacheConfig = opts
return b
}
func (b *cacheBuilder) WithFreeCache(size int) *cacheBuilder {
b.freeSize = size
return b
}
// 使用自定义分布式缓存
func WithDistributedCache(c DistributdCache) {
}
func (b *cacheBuilder) WithPublishSubscribe(p PublishSubscribe) *cacheBuilder {
b.publishSubscribe = p
return b
}
func (b cacheBuilder) Build() (*Cache, error) {
var err error
cache := acquireDefaultCache()
if len(b.prefix) > 0 {
cache.prefix = b.prefix
}
b.redisOptions.Prefix = cache.prefix
redis := NewRedis(b.redisOptions)
if err := redis.Ping(); err != nil {
return cache, err
}
var mem MemCache
if b.freeSize > 0 {
mem = NewFreeCache(b.freeSize)
} else {
if b.log != nil {
b.bigCacheConfig.log = b.log
}
mem, err = NewBigCache(b.bigCacheConfig)
}
cache.distributdCache = redis
cache.mem = mem
cache.publishSubscribe = b.publishSubscribe
cache.logger = b.log
return cache, err
}

29
cache/cache_builder_test.go vendored Normal file
View File

@ -0,0 +1,29 @@
package cache
import (
"testing"
"time"
"github.com/charlienet/go-mixed/logx"
)
func TestBuilder(t *testing.T) {
cache, err := NewCacheBuilder().
WithLogger(logx.NewLogrus(logx.WithFormatter(logx.NewNestedFormatter(logx.NestedFormatterOption{
Color: true,
})))).
WithRedis(RedisConfig{
Addrs: []string{"192.168.2.222:6379"},
Password: "123456",
}).
WithBigCache(BigCacheConfig{}).
// WithFreeCache(10 * 1024 * 1024).
Build()
if err != nil {
t.Fatal(err)
}
u := SimpleUser{FirstName: "Radomir", LastName: "Sohlich"}
t.Log(cache.Set(defaultKey, u, time.Minute*10))
}

124
cache/cache_test.go vendored
View File

@ -1,24 +1,40 @@
package cache
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/charlienet/go-mixed/bytesconv"
"github.com/charlienet/go-mixed/logx"
)
var (
defaultKey = "u-000"
)
func TestNewCache(t *testing.T) {
r := NewRedis(&RedisConfig{})
if err := r.Ping(); err != nil {
t.Fatal(err)
}
c, err := NewCacheBuilder().
WithRedis(RedisConfig{
Addrs: []string{"192.168.2.222:6379"},
Password: "123456",
}).
WithPrefix("cache_test").
WithLogger(logx.NewLogrus()).
Build()
c, err := NewCache(
WithDistributdCache(r),
WithPrefix("cache_test"))
if err != nil {
t.Fatal(err)
}
c.Set("abc", "value", time.Minute*10)
var s string
c.Get("abc", &s)
t.Log(s)
}
type SimpleUser struct {
@ -26,60 +42,108 @@ type SimpleUser struct {
LastName string
}
func TestMem(t *testing.T) {
c, err := NewCache(WithFreeCache(10 * 1024 * 1024))
if err != nil {
t.Fatal(err)
func TestMemCache(t *testing.T) {
b, _ := NewBigCache(BigCacheConfig{})
var mems = []MemCache{
NewFreeCache(10 * 1024 * 1024),
b,
}
key := "u-000"
u := SimpleUser{FirstName: "Radomir", LastName: "Sohlich"}
encoded, _ := bytesconv.Encode(u)
for _, m := range mems {
m.Set(defaultKey, encoded, time.Second)
ret, err := m.Get(defaultKey)
if err != nil {
t.Fatal(err)
}
c.Set(key, u, time.Second)
var u2 SimpleUser
c.Get(key, &u2)
t.Logf("%+v", u2)
var u2 SimpleUser
bytesconv.Decode(ret, &u2)
t.Log(u2)
}
}
func TestDistributedCache(t *testing.T) {
key := "key-001"
c := NewRedis(&RedisConfig{Addrs: []string{"192.168.2.222:6379"}, DB: 6, Password: "123456"})
c := NewRedis(RedisConfig{Addrs: []string{"192.168.2.222:6379"}, DB: 6, Password: "123456", Prefix: "abcdef"})
if err := c.Ping(); err != nil {
t.Fatal(err)
}
t.Log(c.Exist(defaultKey))
u := SimpleUser{FirstName: "redis client"}
c.Set(key, u, time.Second)
var u2 SimpleUser
if err := c.Get(key, &u2); err != nil {
c.Get(defaultKey, &u2)
c.Set(defaultKey, u, time.Minute*10)
t.Log(c.Exist(defaultKey))
if err := c.Get(defaultKey, &u2); err != nil {
t.Fatal("err:", err)
}
t.Logf("%+v", u2)
// c.Delete(defaultKey)
}
func TestGetFn(t *testing.T) {
c, err := NewCache(WithBigCache(&BigCacheConfig{}))
if err != nil {
t.Fatal(err)
}
key := "u-000"
c := buildCache()
var u2 SimpleUser
c.GetFn(key, &u2, func() (out any, err error) {
c.GetFn(context.Background(), defaultKey, &u2, func(ctx context.Context) (out any, err error) {
v := &u2
v.FirstName = "abc"
v.LastName = "aaaa"
return nil, nil
}, time.Second)
}, time.Minute*1)
t.Logf("%+v", u2)
}
func TestGetFromSource(t *testing.T) {
var count int32
n := 10
c := &Cache{}
wg := &sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
c.getFromSource(context.Background(), defaultKey, func(ctx context.Context) (any, error) {
atomic.AddInt32(&count, 1)
time.Sleep(time.Second)
return "abc", nil
})
wg.Done()
}()
}
wg.Wait()
t.Log("count:", count)
}
func BenchmarkMemCache(b *testing.B) {
}
func load() (any, error) {
return nil, nil
}
func buildCache() *Cache {
c, err := NewCacheBuilder().
WithFreeCache(10 * 1024 * 1024).
WithRedis(RedisConfig{Addrs: []string{"192.168.2.222:6379"}, DB: 6, Password: "123456"}).
Build()
if err != nil {
panic(err)
}
return c
}

View File

@ -4,7 +4,7 @@ import "time"
type DistributdCache interface {
Get(key string, out any) error
Set(key string, value any, expiration time.Duration)
Delete(key string) error
Set(key string, value any, expiration time.Duration) error
Delete(key ...string) error
Ping() error
}

12
cache/free_cache.go vendored
View File

@ -4,6 +4,7 @@ import (
"errors"
"time"
"github.com/charlienet/go-mixed/bytesconv"
"github.com/coocood/freecache"
)
@ -37,10 +38,13 @@ func (c *freeCache) Set(key string, value []byte, d time.Duration) error {
return c.cache.Set([]byte(key), value, s)
}
func (c *freeCache) Delete(key string) error {
affected := c.cache.Del([]byte(key))
if !affected {
return errors.New("不存在")
func (c *freeCache) Delete(keys ...string) error {
for _, k := range keys {
affected := c.cache.Del(bytesconv.StringToBytes(k))
if !affected {
return errors.New("不存在")
}
}
return nil

2
cache/mem.go vendored
View File

@ -5,5 +5,5 @@ import "time"
type MemCache interface {
Get(key string) ([]byte, error)
Set(key string, entry []byte, expire time.Duration) error
Delete(key string) error
Delete(key ...string) error
}

31
cache/options.go vendored
View File

@ -1,31 +0,0 @@
package cache
type option func(*Cache)
type options struct {
Prefix string
}
func WithPrefix(prefix string) option {
return func(o *Cache) { o.prefix = prefix }
}
func WithDistributdCache(d DistributdCache) option {
return func(o *Cache) { o.distributdCache = d }
}
func WithBigCache(config *BigCacheConfig) option {
return func(o *Cache) {
c, err := NewBigCache(config)
_ = err
o.mem = c
}
}
func WithFreeCache(size int) option {
return func(o *Cache) { o.mem = NewFreeCache(size) }
}
func WithPublishSubscribe(p PublishSubscribe) option {
return func(o *Cache) {}
}

13
cache/readme.md vendored Normal file
View File

@ -0,0 +1,13 @@
# 多级缓存模块
1. 一级缓存可使用freecache或bigcache作为本地缓存当数据在本地缓存不存在时会向二级缓存请求数据
2. 二级缓存使用redis作为缓存模块当数据在二级缓存不存在时向资源请求数据。
3. 更新数据时将二级分布式缓存中对应的数据删除,一级缓存使用订阅/发布机制进行删除。
## 功能列表
1. 支持自定义一级或二级缓存和订阅/发布机制。
2. 缓存击穿;单机资源互斥锁,集群环境中每台机器只有一次请求到资源层,其它请求等待数据同步到缓存后获取数据。
3. 缓存穿透;从数据源中未找到数据时,在缓存中缓存空值。
4. 缓存雪崩;为防止缓存雪崩将资源放入缓存时,对过期时间添加一个随机过期时间,防止缓存同时过期。
5. 自动续期;当访问二级缓存时对使用的资源进行延期。

54
cache/redis.go vendored
View File

@ -2,6 +2,7 @@ package cache
import (
"context"
"errors"
"fmt"
"time"
@ -11,8 +12,10 @@ import (
"github.com/go-redis/redis/v8"
)
const redisEmptyObject = "redis object not exist"
type RedisConfig struct {
Perfix string // key perfix
Prefix string // key perfix
Addrs []string
// Database to be selected after connecting to the server.
@ -33,10 +36,10 @@ type RedisConfig struct {
type redisClient struct {
client redis.UniversalClient
emptyStamp string // 空对象标识,每个实例隔离
perfix string // 缓存键前缀
prefix string // 缓存键前缀
}
func NewRedis(c *RedisConfig) *redisClient {
func NewRedis(c RedisConfig) *redisClient {
client := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: c.Addrs,
DB: c.DB,
@ -46,35 +49,48 @@ func NewRedis(c *RedisConfig) *redisClient {
return &redisClient{
emptyStamp: fmt.Sprintf("redis-empty-%d-%s", time.Now().Unix(), rand.Hex.Generate(6)),
perfix: c.Perfix,
prefix: c.Prefix,
client: client,
}
}
func (c *redisClient) Get(key string, out any) error {
cmd := c.client.Get(context.Background(), key)
str, err := cmd.Result()
val, err := c.client.Get(context.Background(), c.getKey(key)).Result()
if errors.Is(err, redis.Nil) {
return ErrNotFound
}
if err != nil {
return err
}
err = json.Unmarshal(bytesconv.StringToBytes(str), out)
return err
// redis 保存键为空值时返回键不存在错误
if val == redisEmptyObject {
return ErrNotFound
}
return json.Unmarshal(bytesconv.StringToBytes(val), out)
}
func (c *redisClient) Set(key string, value any, expiration time.Duration) {
func (c *redisClient) Set(key string, value any, expiration time.Duration) error {
j, _ := json.Marshal(value)
c.client.Set(context.Background(), key, j, expiration)
return c.client.Set(context.Background(), c.getKey(key), j, expiration).Err()
}
func (c *redisClient) Exist(key string) (bool, error) {
return false, nil
val, err := c.client.Exists(context.Background(), c.getKey(key)).Result()
return val > 0, err
}
func (c *redisClient) Delete(key string) error {
cmd := c.client.Del(context.Background(), key)
if cmd.Err() != nil {
return cmd.Err()
func (c *redisClient) Delete(key ...string) error {
keys := make([]string, 0, len(key))
for _, k := range key {
keys = append(keys, c.getKey(k))
}
_ , err := c.client.Del(context.Background(), keys...).Result()
if err != nil {
return err
}
return nil
@ -84,3 +100,11 @@ func (c *redisClient) Ping() error {
_, err := c.client.Ping(context.Background()).Result()
return err
}
func (c *redisClient) getKey(key string) string {
if c.prefix != "" {
return c.prefix + ":" + key
}
return key
}