mirror of
https://github.com/charlienet/go-mixed.git
synced 2025-07-18 00:22:41 +08:00
update
This commit is contained in:
170
cache/cache.go
vendored
170
cache/cache.go
vendored
@ -6,62 +6,73 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/charlienet/go-mixed/bytesconv"
|
||||
"github.com/charlienet/go-mixed/json"
|
||||
"github.com/charlienet/go-mixed/locker"
|
||||
"github.com/charlienet/go-mixed/logx"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("key not found")
|
||||
|
||||
// 数据加载函数定义
|
||||
type LoadFunc func(context.Context) (any, error)
|
||||
|
||||
type Cache struct {
|
||||
prefix string // 键前缀
|
||||
retry int // 资源获取时的重试次数
|
||||
mem MemCache // 内存缓存
|
||||
distributdCache DistributdCache // 分布式缓存
|
||||
publishSubscribe PublishSubscribe // 发布订阅
|
||||
lock locker.ChanLocker // 资源锁
|
||||
stats *Stats // 缓存命中计数
|
||||
qps *qps // 访问计数
|
||||
logger logx.Logger // 日志记录
|
||||
type ICache interface {
|
||||
}
|
||||
|
||||
func NewCache(opts ...option) *Cache {
|
||||
type Cache struct {
|
||||
prefix string // 键前缀
|
||||
retry int // 资源获取时的重试次数
|
||||
mem MemCache // 内存缓存
|
||||
rds DistributedCache // 远程缓存
|
||||
publishSubscribe PublishSubscribe // 发布订阅
|
||||
group singleflight.Group // singleflight.Group
|
||||
lock locker.ChanLocker // 资源锁
|
||||
stats *Stats // 缓存命中计数
|
||||
qps *qps // 访问计数
|
||||
logger logx.Logger // 日志记录
|
||||
}
|
||||
|
||||
func New(opts ...option) (*Cache, error) {
|
||||
|
||||
c := acquireDefaultCache()
|
||||
for _, f := range opts {
|
||||
if err := f(c); err != nil {
|
||||
return c
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
go c.subscribe()
|
||||
// 未设置内存缓存时,添加默认缓存
|
||||
if c.mem == nil {
|
||||
c.mem = NewTinyLFU(1<<12, time.Second*30)
|
||||
}
|
||||
|
||||
return c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Cache) Set(key string, value any, expiration time.Duration) error {
|
||||
if c.mem != nil {
|
||||
bytes, err := bytesconv.Encode(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (c *Cache) Set(ctx context.Context, key string, value any, expiration time.Duration) error {
|
||||
buf, err := Marshal(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.mem.Set(key, bytes, expiration)
|
||||
if c.mem != nil {
|
||||
c.mem.Set(key, buf, expiration)
|
||||
}
|
||||
|
||||
if c.rds != nil {
|
||||
c.rds.Set(ctx, key, buf, expiration)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) Get(key string, out any) error {
|
||||
func (c *Cache) Get(ctx context.Context, key string, out any) error {
|
||||
if c.mem != nil {
|
||||
c.getFromMem(key, out)
|
||||
c.getFromMem(key)
|
||||
}
|
||||
|
||||
if c.distributdCache != nil {
|
||||
if err := c.distributdCache.Get(key, out); err != nil {
|
||||
if c.rds != nil {
|
||||
if err := c.rds.Get(ctx, key, out); err != nil {
|
||||
|
||||
}
|
||||
}
|
||||
@ -70,7 +81,7 @@ func (c *Cache) Get(key string, out any) error {
|
||||
}
|
||||
|
||||
func (c *Cache) GetFn(ctx context.Context, key string, out any, fn LoadFunc, expiration time.Duration) (bool, error) {
|
||||
c.Get(key, out)
|
||||
c.Get(ctx, key, out)
|
||||
|
||||
// 多级缓存中未找到时,放置缓存对象
|
||||
ret, err := fn(ctx)
|
||||
@ -78,7 +89,7 @@ func (c *Cache) GetFn(ctx context.Context, key string, out any, fn LoadFunc, exp
|
||||
return false, err
|
||||
}
|
||||
|
||||
c.Set(key, ret, expiration)
|
||||
c.Set(ctx, key, ret, expiration)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
@ -87,32 +98,69 @@ func (c *Cache) Exist(key string) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *Cache) Delete(key ...string) error {
|
||||
func (c *Cache) Delete(ctx context.Context, key ...string) error {
|
||||
if c.mem != nil {
|
||||
c.mem.Delete(key...)
|
||||
}
|
||||
|
||||
if c.distributdCache != nil {
|
||||
c.distributdCache.Delete(key...)
|
||||
if c.rds != nil {
|
||||
c.rds.Delete(ctx, key...)
|
||||
}
|
||||
|
||||
for _, k := range key {
|
||||
c.group.Forget(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) subscribe() {
|
||||
// 清除本地缓存
|
||||
func (c *Cache) ClearMem() {
|
||||
if c.mem != nil {
|
||||
c.mem.Clear()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) getFromMem(key string, out any) error {
|
||||
bytes, err := c.mem.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (c *Cache) Clear() {
|
||||
|
||||
if err := bytesconv.Decode(bytes, out); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
func (c *Cache) Disable() {
|
||||
|
||||
}
|
||||
|
||||
func (c *Cache) Enable() {
|
||||
|
||||
}
|
||||
|
||||
func (c *Cache) getOnce(ctx context.Context, key string) (b []byte, cached bool, err error) {
|
||||
if c.mem != nil {
|
||||
b, ok := c.mem.Get(key)
|
||||
if ok {
|
||||
return b, true, nil
|
||||
}
|
||||
}
|
||||
c.group.Do(key, func() (any, error) {
|
||||
if c.mem != nil {
|
||||
b, ok := c.mem.Get(key)
|
||||
if ok {
|
||||
return b, nil
|
||||
}
|
||||
}
|
||||
|
||||
if c.rds != nil {
|
||||
c.rds.Get(ctx, key, nil)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Cache) getFromMem(key string) ([]byte, bool) {
|
||||
bytes, cached := c.mem.Get(key)
|
||||
return bytes, cached
|
||||
}
|
||||
|
||||
// 从缓存加载数据
|
||||
@ -125,8 +173,6 @@ func (c *Cache) getFromCache() {
|
||||
// 从数据源加载数据
|
||||
func (c *Cache) getFromSource(ctx context.Context, key string, fn LoadFunc) error {
|
||||
|
||||
// 1. 尝试获取资源锁,如成功获取到锁加载数据
|
||||
// 2. 未获取到锁,等待从缓存中获取
|
||||
ch, ok := c.lock.Get(key)
|
||||
if ok {
|
||||
defer c.lock.Release(key)
|
||||
@ -150,39 +196,3 @@ func (c *Cache) getFromSource(ctx context.Context, key string, fn LoadFunc) erro
|
||||
return c.getFromSource(ctx, key, fn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) marshal(value any) ([]byte, error) {
|
||||
switch value := value.(type) {
|
||||
case nil:
|
||||
return nil, nil
|
||||
case []byte:
|
||||
return value, nil
|
||||
case string:
|
||||
return []byte(value), nil
|
||||
}
|
||||
|
||||
b, err := json.Marshal(value)
|
||||
return b, err
|
||||
}
|
||||
|
||||
func (c *Cache) unmarshal(b []byte, value any) error {
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch value := value.(type) {
|
||||
case nil:
|
||||
return nil
|
||||
case *[]byte:
|
||||
clone := make([]byte, len(b))
|
||||
copy(clone, b)
|
||||
*value = clone
|
||||
return nil
|
||||
case *string:
|
||||
*value = string(b)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := json.Unmarshal(b, value)
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user