1
0
mirror of https://github.com/charlienet/go-mixed.git synced 2025-07-18 00:22:41 +08:00
This commit is contained in:
2023-11-03 15:48:33 +08:00
parent bdbf18969e
commit 249d3b4682
8 changed files with 630 additions and 0 deletions

View File

@ -0,0 +1,75 @@
package delayqueue
import (
"context"
"time"
"github.com/charlienet/go-mixed/locker"
)
type store[T Delayed] interface {
Push(context.Context, T) error
Pop() (T, error)
Peek() (T, bool)
IsEmpty() bool // 队列是否为空
}
type delayQueue[T Delayed] struct {
mu locker.RWLocker
store store[T]
}
type Delayed interface {
Delay() time.Time
}
func New[T Delayed]() *delayQueue[T] {
return &delayQueue[T]{
mu: locker.NewRWLocker(),
store: newMemStore[T](),
}
}
func (q *delayQueue[T]) UseStore(s store[T]) *delayQueue[T] {
q.store = s
return q
}
func (q *delayQueue[T]) Push(task T) error {
q.mu.Lock()
defer q.mu.Unlock()
return q.store.Push(context.Background(), task)
}
func (q *delayQueue[T]) Peek() (T, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
return q.store.Peek()
}
func (q *delayQueue[T]) Pop() (T, error) {
q.mu.Lock()
defer q.mu.Unlock()
return q.store.Pop()
}
func (q *delayQueue[T]) Channel(size int) <-chan T {
out := make(chan T, size)
go func() {
for {
entry, _ := q.Pop()
out <- entry
}
}()
return out
}
func (q *delayQueue[T]) IsEmpty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.store.IsEmpty()
}

View File

@ -0,0 +1,48 @@
package delayqueue_test
import (
"testing"
"time"
delayqueue "github.com/charlienet/go-mixed/concurrent/delay_queue"
)
type delayTask struct {
message string
delay time.Time
}
func (t delayTask) Delay() time.Time {
return t.delay
}
func TestDelayQueue(t *testing.T) {
queue := delayqueue.New[delayTask]()
queue.Push(delayTask{})
}
func TestDelayedFunc(t *testing.T) {
q := delayqueue.New[delayTask]()
q.Push(delayTask{})
}
func TestDelayedChannel(t *testing.T) {
q := delayqueue.New[delayTask]()
c := q.Channel(10)
q.Push(delayTask{message: "abc", delay: time.Now().Add(time.Second)})
q.Push(delayTask{message: "abcaaa", delay: time.Now().Add(time.Second * 3)})
for {
if q.IsEmpty() {
t.Log("队列为空,退出")
break
}
select {
case task := <-c:
t.Log(task)
case <-time.After(time.Second * 2):
}
}
}

View File

@ -0,0 +1,37 @@
package delayqueue
import (
"context"
"github.com/charlienet/go-mixed/errors"
)
type kafkaStore[T Delayed] struct {
}
func (s *delayQueue[T]) UseKafka() *delayQueue[T] {
s.UseStore(newKafka[T]())
panic(errors.NotImplemented)
// return s.UseStore(newKafka[T]())
}
func newKafka[T Delayed]() *kafkaStore[T] {
return &kafkaStore[T]{}
}
func (*kafkaStore[T]) Push(context.Context, T) error {
return nil
}
func (*kafkaStore[T]) Pop() (T, error) {
return *new(T), nil
}
func (*kafkaStore[T]) Peek() (T, bool) {
return *new(T), false
}
func (*kafkaStore[T]) IsEmpty() bool {
return false
}

View File

@ -0,0 +1,88 @@
package delayqueue
import (
"container/heap"
"context"
"sync"
"time"
)
type delayedQueue []Delayed
type memStore[T Delayed] struct {
mu sync.Mutex
h *delayedQueue
wakeup chan struct{}
}
func (q delayedQueue) Len() int {
return len(q)
}
func (q delayedQueue) Less(i, j int) bool {
return q[i].Delay().Before(q[j].Delay())
}
func (q delayedQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
}
func (q *delayedQueue) Push(x any) {
*q = append(*q, x.(Delayed))
}
func (h *delayedQueue) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
func newMemStore[T Delayed]() *memStore[T] {
store := &memStore[T]{
h: new(delayedQueue),
wakeup: make(chan struct{}, 1),
}
heap.Init(store.h)
return store
}
func (s *memStore[T]) Push(ctx context.Context, v T) error {
s.mu.Lock()
defer s.mu.Unlock()
heap.Push(s.h, v)
return nil
}
func (s *memStore[T]) Pop() (T, error) {
for {
_, exist := s.Peek()
if exist {
return s.h.Pop().(T), nil
}
time.Sleep(time.Millisecond * 10)
}
}
func (s *memStore[T]) Peek() (T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.h.Len() > 0 {
return (*s.h)[0].(T), true
}
return *new(T), false
}
func (s *memStore[T]) Len() int {
return s.h.Len()
}
func (s *memStore[T]) IsEmpty() bool {
return s.Len() == 0
}

View File

@ -0,0 +1,119 @@
package delayqueue
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/charlienet/go-mixed/calendar"
"github.com/charlienet/go-mixed/rand"
)
type delayTask struct {
Message string
At time.Time
}
func (t delayTask) Delay() time.Time {
return t.At
}
func (t delayTask) execute() {
println(t.Message)
}
// var _ encoding.BinaryMarshaler = new(myStruct)
// var _ encoding.BinaryUnmarshaler = new(myStruct)
func (t delayTask) BinaryUnmarshaler(data []byte, v any) {
json.Unmarshal(data, v)
}
func (t delayTask) MarshalBinary() (data []byte, err error) {
return json.Marshal(t)
}
func TestMemStore(t *testing.T) {
s := newMemStore[delayTask]()
for i := 0; i < 10; i++ {
s.Push(
context.Background(),
delayTask{
Message: "tesss",
At: time.Now().Add(-time.Minute * time.Duration(rand.Intn(20))),
})
}
t.Log("count:", s.Len())
v, exists := s.Peek()
t.Logf("Peek %v:%v %v", exists, v.Message, calendar.Create(v.Delay()).ToDateTimeString())
for i := 0; i < 10; i++ {
v, _ := s.Pop()
t.Logf("POP:%v %v", v.Message, calendar.Create(v.Delay()).ToDateTimeString())
}
v, exists = s.Peek()
t.Logf("Peek %v:%v %v", exists, v.Message, calendar.Create(v.At).ToDateTimeString())
}
func TestMemPush(t *testing.T) {
s := newMemStore[delayTask]()
for i := 0; i < 10; i++ {
s.Push(
context.Background(),
delayTask{
Message: fmt.Sprintf("abc:%d", i),
At: time.Now().Add(time.Second * time.Duration(rand.IntRange(5, 30))),
})
}
now := time.Now()
delay, _ := s.Pop()
after := delay.Delay().Sub(now)
t.Log("after:", calendar.String(now), calendar.String(delay.Delay()), after)
}
func TestExecute(t *testing.T) {
s := newMemStore[delayTask]()
s.Push(context.Background(),
delayTask{
Message: "这是消息",
At: time.Now().Add(time.Second * 2),
})
s.Push(context.Background(),
delayTask{
Message: "这是消息",
At: time.Now().Add(time.Second * 4),
})
t.Log("start:", calendar.String(time.Now()))
for {
if s.IsEmpty() {
break
}
task, _ := s.Pop()
for {
if task.Delay().Before(time.Now()) {
task.execute()
t.Log("end:", calendar.String(time.Now()))
break
}
time.Sleep(time.Millisecond * 20)
}
}
}

View File

@ -0,0 +1,154 @@
package delayqueue
import (
"context"
"encoding"
"encoding/json"
"strconv"
"time"
goredis "github.com/redis/go-redis/v9"
"github.com/charlienet/go-mixed/hash"
"github.com/charlienet/go-mixed/redis"
)
// 使用Redis存储队列
type redisStore[T Delayed] struct {
rdb redis.Client
delayQueue string
executeQueue string
delayTaskSet string
}
func (q *delayQueue[T]) UseRedis(delayQueueName, executeQueueName, delayTaskName string, rdb redis.Client) *delayQueue[T] {
q.store = newRedisStroe[T](delayQueueName, executeQueueName, delayTaskName, rdb)
return q
}
func newRedisStroe[T Delayed](delayQueueName, executeQueueName, delayTaskName string, rdb redis.Client) *redisStore[T] {
store := &redisStore[T]{
delayQueue: delayQueueName,
executeQueue: executeQueueName,
delayTaskSet: delayTaskName,
rdb: rdb,
}
go func() {
for {
store.pushToExecute()
time.Sleep(time.Millisecond * 100)
}
}()
return store
}
func (s *redisStore[T]) Push(ctx context.Context, v T) error {
o := any(v).(encoding.BinaryMarshaler)
bytes, err := o.MarshalBinary()
if err != nil {
return err
}
tx := s.rdb.TxPipeline()
tx.HSet(context.Background(), s.delayTaskSet, hash.Sha1(bytes).Hex(), bytes)
tx.Exec(context.Background())
tx.HSet(context.Background(), s.delayTaskSet)
// tx.Exec()
ret := s.rdb.ZAdd(ctx, s.delayQueue, goredis.Z{
Score: float64(v.Delay().Unix()),
Member: v,
})
return ret.Err()
}
func (s *redisStore[T]) pushToExecute() error {
now := time.Now().Unix()
ret, err := s.rdb.ZRangeByScore(
context.Background(),
s.delayQueue,
&goredis.ZRangeBy{
Min: "-inf",
Max: strconv.FormatInt(now, 10),
}).Result()
if err != nil {
return err
}
if len(ret) > 0 {
pipe := s.rdb.TxPipeline()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
pipe.LPush(ctx, s.executeQueue, ret)
pipe.ZRem(ctx, s.delayQueue, ret)
if _, err := pipe.Exec(ctx); err != nil {
return err
}
}
return nil
}
func (s *redisStore[T]) Pop() (T, error) {
for {
v, err := s.rdb.RPop(context.Background(), s.executeQueue).Result()
if err != nil {
if err == redis.Nil {
time.Sleep(time.Millisecond * 10)
continue
}
return *new(T), err
}
if len(v) > 0 {
var task T
if err := json.Unmarshal([]byte(v), &task); err != nil {
return *new(T), err
}
return task, nil
}
}
}
func (s *redisStore[T]) Peek() (t T, r bool) {
m, err := s.rdb.ZRange(context.Background(), s.delayQueue, 0, 0).Result()
if err != nil {
return *new(T), false
}
if len(m) == 1 {
var t T
s := m[0]
if err := json.Unmarshal([]byte(s), &t); err != nil {
return *new(T), false
}
return t, true
}
return *new(T), false
}
func (s *redisStore[T]) Clear() {
s.rdb.Del(context.Background(), s.delayQueue)
s.rdb.Del(context.Background(), s.executeQueue)
}
func (s *redisStore[T]) IsEmpty() bool {
n, _ := s.rdb.LLen(context.Background(), s.executeQueue).Result()
m, _ := s.rdb.ZCard(context.Background(), s.delayQueue).Result()
return (m + n) == 0
}

View File

@ -0,0 +1,89 @@
package delayqueue
import (
"context"
"fmt"
"testing"
"time"
"github.com/charlienet/go-mixed/redis"
"github.com/charlienet/go-mixed/tests"
"github.com/stretchr/testify/assert"
)
const (
redisAddr = "192.168.123.100:6379"
delay_queue = "delay_queue"
execute_queue = "execute_queue"
delay_task_set = "task_set"
)
func TestRedis(t *testing.T) {
tests.RunOnRedis(t, func(client redis.Client) {
defer client.Close()
q := New[delayTask]().UseRedis(delay_queue, execute_queue, delay_task_set, client)
err := q.Push(delayTask{
Message: "abc1111111111111",
At: time.Now().Add(time.Second * 2)})
if err != nil {
t.Fatal(err)
}
t.Log(time.Now())
task, _ := q.Pop()
t.Logf("%+v", task)
t.Log(time.Now())
task.execute()
}, redis.RedisOption{Addr: redisAddr, Prefix: "redis_test"})
}
func TestMutiTask(t *testing.T) {
tests.RunOnRedis(t, func(client redis.Client) {
defer client.Close()
timer := time.NewTimer(time.Second)
ticker := time.NewTicker(time.Second)
timer.Reset(time.Microsecond)
ticker.Reset(time.Millisecond)
store := newRedisStroe[delayTask](delay_queue, execute_queue, delay_task_set, client)
for i := 1; i <= 5; i++ {
store.Push(context.Background(), delayTask{
Message: fmt.Sprintf("abc:%d", i),
At: time.Now().Add(time.Second * time.Duration(i)),
})
}
for !store.IsEmpty() {
v, err := store.Pop()
assert.Nil(t, err)
t.Log(time.Now(), v)
}
})
}
func TestIsEmpty(t *testing.T) {
tests.RunOnRedis(t, func(client redis.Client) {
defer client.Close()
store := newRedisStroe[delayTask](delay_queue, execute_queue, delay_task_set, client)
store.Clear()
assert.True(t, store.IsEmpty())
store.Push(context.Background(), delayTask{Message: "bbb", At: time.Now().Add(time.Second)})
assert.False(t, store.IsEmpty())
}, redis.RedisOption{
Addrs: []string{"redis-10448.c90.us-east-1-3.ec2.cloud.redislabs.com:10448"},
Password: "E7HFwvENEqimiB1EG4IjJSa2IUi0B22o",
})
}