From 249d3b46821dbdbcb15ab155e66ff0ab6f1a60c0 Mon Sep 17 00:00:00 2001 From: charlie <3140647@qq.com> Date: Fri, 3 Nov 2023 15:48:33 +0800 Subject: [PATCH] delay --- concurrent/delay_queue/delay_queue.go | 75 ++++++++++ concurrent/delay_queue/delay_queue_test.go | 48 +++++++ concurrent/delay_queue/kafka_store.go | 37 +++++ concurrent/delay_queue/mem_store.go | 88 ++++++++++++ concurrent/delay_queue/mem_store_test.go | 119 ++++++++++++++++ concurrent/delay_queue/redis_store.go | 154 +++++++++++++++++++++ concurrent/delay_queue/redis_store_test.go | 89 ++++++++++++ concurrent/readme.md | 20 +++ 8 files changed, 630 insertions(+) create mode 100644 concurrent/delay_queue/delay_queue.go create mode 100644 concurrent/delay_queue/delay_queue_test.go create mode 100644 concurrent/delay_queue/kafka_store.go create mode 100644 concurrent/delay_queue/mem_store.go create mode 100644 concurrent/delay_queue/mem_store_test.go create mode 100644 concurrent/delay_queue/redis_store.go create mode 100644 concurrent/delay_queue/redis_store_test.go create mode 100644 concurrent/readme.md diff --git a/concurrent/delay_queue/delay_queue.go b/concurrent/delay_queue/delay_queue.go new file mode 100644 index 0000000..726e87d --- /dev/null +++ b/concurrent/delay_queue/delay_queue.go @@ -0,0 +1,75 @@ +package delayqueue + +import ( + "context" + "time" + + "github.com/charlienet/go-mixed/locker" +) + +type store[T Delayed] interface { + Push(context.Context, T) error + Pop() (T, error) + Peek() (T, bool) + IsEmpty() bool // 队列是否为空 +} + +type delayQueue[T Delayed] struct { + mu locker.RWLocker + store store[T] +} + +type Delayed interface { + Delay() time.Time +} + +func New[T Delayed]() *delayQueue[T] { + return &delayQueue[T]{ + mu: locker.NewRWLocker(), + store: newMemStore[T](), + } +} + +func (q *delayQueue[T]) UseStore(s store[T]) *delayQueue[T] { + q.store = s + return q +} + +func (q *delayQueue[T]) Push(task T) error { + q.mu.Lock() + defer q.mu.Unlock() + + return q.store.Push(context.Background(), task) +} + +func (q *delayQueue[T]) Peek() (T, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + return q.store.Peek() +} + +func (q *delayQueue[T]) Pop() (T, error) { + q.mu.Lock() + defer q.mu.Unlock() + + return q.store.Pop() +} + +func (q *delayQueue[T]) Channel(size int) <-chan T { + out := make(chan T, size) + go func() { + for { + entry, _ := q.Pop() + out <- entry + } + }() + return out +} + +func (q *delayQueue[T]) IsEmpty() bool { + q.mu.RLock() + defer q.mu.RUnlock() + + return q.store.IsEmpty() +} diff --git a/concurrent/delay_queue/delay_queue_test.go b/concurrent/delay_queue/delay_queue_test.go new file mode 100644 index 0000000..566b87a --- /dev/null +++ b/concurrent/delay_queue/delay_queue_test.go @@ -0,0 +1,48 @@ +package delayqueue_test + +import ( + "testing" + "time" + + delayqueue "github.com/charlienet/go-mixed/concurrent/delay_queue" +) + +type delayTask struct { + message string + delay time.Time +} + +func (t delayTask) Delay() time.Time { + return t.delay +} + +func TestDelayQueue(t *testing.T) { + queue := delayqueue.New[delayTask]() + queue.Push(delayTask{}) +} + +func TestDelayedFunc(t *testing.T) { + q := delayqueue.New[delayTask]() + q.Push(delayTask{}) +} + +func TestDelayedChannel(t *testing.T) { + q := delayqueue.New[delayTask]() + c := q.Channel(10) + + q.Push(delayTask{message: "abc", delay: time.Now().Add(time.Second)}) + q.Push(delayTask{message: "abcaaa", delay: time.Now().Add(time.Second * 3)}) + + for { + if q.IsEmpty() { + t.Log("队列为空,退出") + break + } + + select { + case task := <-c: + t.Log(task) + case <-time.After(time.Second * 2): + } + } +} diff --git a/concurrent/delay_queue/kafka_store.go b/concurrent/delay_queue/kafka_store.go new file mode 100644 index 0000000..5cae77f --- /dev/null +++ b/concurrent/delay_queue/kafka_store.go @@ -0,0 +1,37 @@ +package delayqueue + +import ( + "context" + + "github.com/charlienet/go-mixed/errors" +) + +type kafkaStore[T Delayed] struct { +} + +func (s *delayQueue[T]) UseKafka() *delayQueue[T] { + s.UseStore(newKafka[T]()) + + panic(errors.NotImplemented) + // return s.UseStore(newKafka[T]()) +} + +func newKafka[T Delayed]() *kafkaStore[T] { + return &kafkaStore[T]{} +} + +func (*kafkaStore[T]) Push(context.Context, T) error { + return nil +} + +func (*kafkaStore[T]) Pop() (T, error) { + return *new(T), nil +} + +func (*kafkaStore[T]) Peek() (T, bool) { + return *new(T), false +} + +func (*kafkaStore[T]) IsEmpty() bool { + return false +} diff --git a/concurrent/delay_queue/mem_store.go b/concurrent/delay_queue/mem_store.go new file mode 100644 index 0000000..99f3d45 --- /dev/null +++ b/concurrent/delay_queue/mem_store.go @@ -0,0 +1,88 @@ +package delayqueue + +import ( + "container/heap" + "context" + "sync" + "time" +) + +type delayedQueue []Delayed + +type memStore[T Delayed] struct { + mu sync.Mutex + h *delayedQueue + wakeup chan struct{} +} + +func (q delayedQueue) Len() int { + return len(q) +} + +func (q delayedQueue) Less(i, j int) bool { + return q[i].Delay().Before(q[j].Delay()) +} + +func (q delayedQueue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} + +func (q *delayedQueue) Push(x any) { + *q = append(*q, x.(Delayed)) +} + +func (h *delayedQueue) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func newMemStore[T Delayed]() *memStore[T] { + store := &memStore[T]{ + h: new(delayedQueue), + wakeup: make(chan struct{}, 1), + } + + heap.Init(store.h) + return store +} + +func (s *memStore[T]) Push(ctx context.Context, v T) error { + s.mu.Lock() + defer s.mu.Unlock() + + heap.Push(s.h, v) + return nil +} + +func (s *memStore[T]) Pop() (T, error) { + for { + _, exist := s.Peek() + if exist { + return s.h.Pop().(T), nil + } + + time.Sleep(time.Millisecond * 10) + } +} + +func (s *memStore[T]) Peek() (T, bool) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.h.Len() > 0 { + return (*s.h)[0].(T), true + } + + return *new(T), false +} + +func (s *memStore[T]) Len() int { + return s.h.Len() +} + +func (s *memStore[T]) IsEmpty() bool { + return s.Len() == 0 +} diff --git a/concurrent/delay_queue/mem_store_test.go b/concurrent/delay_queue/mem_store_test.go new file mode 100644 index 0000000..09508a3 --- /dev/null +++ b/concurrent/delay_queue/mem_store_test.go @@ -0,0 +1,119 @@ +package delayqueue + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/charlienet/go-mixed/calendar" + "github.com/charlienet/go-mixed/rand" +) + +type delayTask struct { + Message string + At time.Time +} + +func (t delayTask) Delay() time.Time { + return t.At +} + +func (t delayTask) execute() { + println(t.Message) +} + +// var _ encoding.BinaryMarshaler = new(myStruct) +// var _ encoding.BinaryUnmarshaler = new(myStruct) + +func (t delayTask) BinaryUnmarshaler(data []byte, v any) { + json.Unmarshal(data, v) +} + +func (t delayTask) MarshalBinary() (data []byte, err error) { + return json.Marshal(t) +} + +func TestMemStore(t *testing.T) { + s := newMemStore[delayTask]() + + for i := 0; i < 10; i++ { + s.Push( + context.Background(), + delayTask{ + Message: "tesss", + At: time.Now().Add(-time.Minute * time.Duration(rand.Intn(20))), + }) + } + + t.Log("count:", s.Len()) + + v, exists := s.Peek() + t.Logf("Peek %v:%v %v", exists, v.Message, calendar.Create(v.Delay()).ToDateTimeString()) + + for i := 0; i < 10; i++ { + v, _ := s.Pop() + t.Logf("POP:%v %v", v.Message, calendar.Create(v.Delay()).ToDateTimeString()) + } + + v, exists = s.Peek() + t.Logf("Peek %v:%v %v", exists, v.Message, calendar.Create(v.At).ToDateTimeString()) +} + +func TestMemPush(t *testing.T) { + s := newMemStore[delayTask]() + + for i := 0; i < 10; i++ { + s.Push( + context.Background(), + delayTask{ + Message: fmt.Sprintf("abc:%d", i), + At: time.Now().Add(time.Second * time.Duration(rand.IntRange(5, 30))), + }) + } + + now := time.Now() + + delay, _ := s.Pop() + after := delay.Delay().Sub(now) + + t.Log("after:", calendar.String(now), calendar.String(delay.Delay()), after) +} + +func TestExecute(t *testing.T) { + s := newMemStore[delayTask]() + + s.Push(context.Background(), + delayTask{ + Message: "这是消息", + At: time.Now().Add(time.Second * 2), + }) + + s.Push(context.Background(), + delayTask{ + Message: "这是消息", + At: time.Now().Add(time.Second * 4), + }) + + t.Log("start:", calendar.String(time.Now())) + + for { + if s.IsEmpty() { + break + } + + task, _ := s.Pop() + + for { + if task.Delay().Before(time.Now()) { + task.execute() + t.Log("end:", calendar.String(time.Now())) + break + } + + time.Sleep(time.Millisecond * 20) + } + } + +} diff --git a/concurrent/delay_queue/redis_store.go b/concurrent/delay_queue/redis_store.go new file mode 100644 index 0000000..9f43c26 --- /dev/null +++ b/concurrent/delay_queue/redis_store.go @@ -0,0 +1,154 @@ +package delayqueue + +import ( + "context" + "encoding" + "encoding/json" + "strconv" + "time" + + goredis "github.com/redis/go-redis/v9" + + "github.com/charlienet/go-mixed/hash" + "github.com/charlienet/go-mixed/redis" +) + +// 使用Redis存储队列 + +type redisStore[T Delayed] struct { + rdb redis.Client + delayQueue string + executeQueue string + delayTaskSet string +} + +func (q *delayQueue[T]) UseRedis(delayQueueName, executeQueueName, delayTaskName string, rdb redis.Client) *delayQueue[T] { + q.store = newRedisStroe[T](delayQueueName, executeQueueName, delayTaskName, rdb) + return q +} +func newRedisStroe[T Delayed](delayQueueName, executeQueueName, delayTaskName string, rdb redis.Client) *redisStore[T] { + + store := &redisStore[T]{ + delayQueue: delayQueueName, + executeQueue: executeQueueName, + delayTaskSet: delayTaskName, + + rdb: rdb, + } + + go func() { + for { + store.pushToExecute() + time.Sleep(time.Millisecond * 100) + } + }() + + return store +} + +func (s *redisStore[T]) Push(ctx context.Context, v T) error { + o := any(v).(encoding.BinaryMarshaler) + bytes, err := o.MarshalBinary() + if err != nil { + return err + } + + tx := s.rdb.TxPipeline() + tx.HSet(context.Background(), s.delayTaskSet, hash.Sha1(bytes).Hex(), bytes) + tx.Exec(context.Background()) + + tx.HSet(context.Background(), s.delayTaskSet) + + // tx.Exec() + ret := s.rdb.ZAdd(ctx, s.delayQueue, goredis.Z{ + Score: float64(v.Delay().Unix()), + Member: v, + }) + + return ret.Err() +} + +func (s *redisStore[T]) pushToExecute() error { + now := time.Now().Unix() + + ret, err := s.rdb.ZRangeByScore( + context.Background(), + s.delayQueue, + &goredis.ZRangeBy{ + Min: "-inf", + Max: strconv.FormatInt(now, 10), + }).Result() + + if err != nil { + return err + } + + if len(ret) > 0 { + pipe := s.rdb.TxPipeline() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + pipe.LPush(ctx, s.executeQueue, ret) + pipe.ZRem(ctx, s.delayQueue, ret) + + if _, err := pipe.Exec(ctx); err != nil { + return err + } + } + + return nil +} + +func (s *redisStore[T]) Pop() (T, error) { + for { + v, err := s.rdb.RPop(context.Background(), s.executeQueue).Result() + if err != nil { + if err == redis.Nil { + time.Sleep(time.Millisecond * 10) + continue + } + + return *new(T), err + } + + if len(v) > 0 { + var task T + if err := json.Unmarshal([]byte(v), &task); err != nil { + return *new(T), err + } + + return task, nil + } + } +} + +func (s *redisStore[T]) Peek() (t T, r bool) { + m, err := s.rdb.ZRange(context.Background(), s.delayQueue, 0, 0).Result() + if err != nil { + return *new(T), false + } + + if len(m) == 1 { + var t T + + s := m[0] + if err := json.Unmarshal([]byte(s), &t); err != nil { + return *new(T), false + } + return t, true + } + + return *new(T), false +} + +func (s *redisStore[T]) Clear() { + s.rdb.Del(context.Background(), s.delayQueue) + s.rdb.Del(context.Background(), s.executeQueue) +} + +func (s *redisStore[T]) IsEmpty() bool { + n, _ := s.rdb.LLen(context.Background(), s.executeQueue).Result() + m, _ := s.rdb.ZCard(context.Background(), s.delayQueue).Result() + + return (m + n) == 0 +} diff --git a/concurrent/delay_queue/redis_store_test.go b/concurrent/delay_queue/redis_store_test.go new file mode 100644 index 0000000..fff502d --- /dev/null +++ b/concurrent/delay_queue/redis_store_test.go @@ -0,0 +1,89 @@ +package delayqueue + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/charlienet/go-mixed/redis" + "github.com/charlienet/go-mixed/tests" + "github.com/stretchr/testify/assert" +) + +const ( + redisAddr = "192.168.123.100:6379" + delay_queue = "delay_queue" + execute_queue = "execute_queue" + delay_task_set = "task_set" +) + +func TestRedis(t *testing.T) { + tests.RunOnRedis(t, func(client redis.Client) { + defer client.Close() + + q := New[delayTask]().UseRedis(delay_queue, execute_queue, delay_task_set, client) + + err := q.Push(delayTask{ + Message: "abc1111111111111", + At: time.Now().Add(time.Second * 2)}) + + if err != nil { + t.Fatal(err) + } + + t.Log(time.Now()) + + task, _ := q.Pop() + t.Logf("%+v", task) + + t.Log(time.Now()) + + task.execute() + + }, redis.RedisOption{Addr: redisAddr, Prefix: "redis_test"}) +} + +func TestMutiTask(t *testing.T) { + tests.RunOnRedis(t, func(client redis.Client) { + defer client.Close() + + timer := time.NewTimer(time.Second) + ticker := time.NewTicker(time.Second) + + timer.Reset(time.Microsecond) + ticker.Reset(time.Millisecond) + + store := newRedisStroe[delayTask](delay_queue, execute_queue, delay_task_set, client) + + for i := 1; i <= 5; i++ { + store.Push(context.Background(), delayTask{ + Message: fmt.Sprintf("abc:%d", i), + At: time.Now().Add(time.Second * time.Duration(i)), + }) + } + + for !store.IsEmpty() { + v, err := store.Pop() + assert.Nil(t, err) + t.Log(time.Now(), v) + } + }) +} + +func TestIsEmpty(t *testing.T) { + tests.RunOnRedis(t, func(client redis.Client) { + defer client.Close() + + store := newRedisStroe[delayTask](delay_queue, execute_queue, delay_task_set, client) + store.Clear() + + assert.True(t, store.IsEmpty()) + + store.Push(context.Background(), delayTask{Message: "bbb", At: time.Now().Add(time.Second)}) + assert.False(t, store.IsEmpty()) + }, redis.RedisOption{ + Addrs: []string{"redis-10448.c90.us-east-1-3.ec2.cloud.redislabs.com:10448"}, + Password: "E7HFwvENEqimiB1EG4IjJSa2IUi0B22o", + }) +} diff --git a/concurrent/readme.md b/concurrent/readme.md new file mode 100644 index 0000000..6fb803a --- /dev/null +++ b/concurrent/readme.md @@ -0,0 +1,20 @@ +延迟队列 + +包含以下实现模型 + +1. 内存模式。 +2. Redis模式 +3. MQ模式 + + +内存模式,在队列中放入。定时检查过期时间,过期时间到达时取出并使用协程执行。 + +Redis模式,使用ZSET存储任务队列。定时取出规则内的任务。取出后使用ZREM删除,并放入执行队列LPUSH。放入成功后在任务执行通道发送消息,执行通道使用LPOP取出并执行。 + + +``` +queue := delayqueue.New() + + +``` +