mirror of
https://github.com/charlienet/go-mixed.git
synced 2025-07-17 16:12:42 +08:00
use redis function(Version 7 and above is required
)
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
package bloom
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/charlienet/go-mixed/bytesconv"
|
||||
@ -15,7 +16,7 @@ var seeds = []uint{7, 11, 13, 31, 37, 61}
|
||||
|
||||
type bitStore interface {
|
||||
Clear()
|
||||
Set(pos ...uint) error
|
||||
Set(ctx context.Context, pos ...uint) error
|
||||
Test(pos ...uint) (bool, error)
|
||||
}
|
||||
|
||||
@ -63,9 +64,9 @@ func New(expectedInsertions uint, fpp float64, opts ...option) *BloomFilter {
|
||||
return bf
|
||||
}
|
||||
|
||||
func (bf *BloomFilter) Add(data string) {
|
||||
func (bf *BloomFilter) Add(ctx context.Context, data string) {
|
||||
offsets := bf.geOffsets([]byte(data))
|
||||
bf.store.Set(offsets...)
|
||||
bf.store.Set(ctx, offsets...)
|
||||
}
|
||||
|
||||
func (bf *BloomFilter) ExistString(data string) (bool, error) {
|
||||
|
20
bloom/bloom.lua
Normal file
20
bloom/bloom.lua
Normal file
@ -0,0 +1,20 @@
|
||||
#!lua name=charlie_bloom
|
||||
|
||||
|
||||
local function set_bit(keys, args)
|
||||
for _, offset in ipairs(args) do
|
||||
redis.call("setbit", keys[1], offset, 1)
|
||||
end
|
||||
end
|
||||
|
||||
local function test_bit(keys, args)
|
||||
for _, offset in ipairs(args) do
|
||||
if tonumber(redis.call("getbit", keys[1], offset)) == 0 then
|
||||
return false
|
||||
end
|
||||
end
|
||||
return true
|
||||
end
|
||||
|
||||
redis.register_function('set_bit',set_bit)
|
||||
redis.register_function('test_bit',test_bit)
|
@ -1,6 +1,7 @@
|
||||
package bloom_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
@ -19,13 +20,13 @@ func TestBloom(t *testing.T) {
|
||||
b := bloom.New(1000, 0.03)
|
||||
|
||||
for i := 0; i < 1000000; i++ {
|
||||
b.Add(strconv.Itoa(i))
|
||||
b.Add(context.Background(), strconv.Itoa(i))
|
||||
}
|
||||
|
||||
v := "6943553521463296-1635402930"
|
||||
|
||||
t.Log(b.ExistString(v))
|
||||
b.Add(v)
|
||||
b.Add(context.Background(), v)
|
||||
t.Log(b.ExistString(v))
|
||||
|
||||
isSet, err := b.ExistString(strconv.Itoa(9999))
|
||||
@ -50,7 +51,7 @@ func TestOptimize(t *testing.T) {
|
||||
|
||||
func TestRedis(t *testing.T) {
|
||||
|
||||
client := redis.New(&redis.ReidsOption{
|
||||
client := redis.New(&redis.RedisOption{
|
||||
Addrs: []string{"192.168.2.222:6379"},
|
||||
Password: "123456",
|
||||
})
|
||||
@ -58,7 +59,7 @@ func TestRedis(t *testing.T) {
|
||||
bf := bloom.New(10000, 0.03, bloom.WithRedis(client, "bloom:test"))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
bf.Add(strconv.Itoa(i))
|
||||
bf.Add(context.Background(), strconv.Itoa(i))
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
@ -82,7 +83,7 @@ func TestClear(t *testing.T) {
|
||||
bf := bloom.New(1000, 0.03)
|
||||
|
||||
v := "abc"
|
||||
bf.Add(v)
|
||||
bf.Add(context.Background(), v)
|
||||
isSet, _ := bf.ExistString(v)
|
||||
assert.True(t, isSet)
|
||||
|
||||
@ -97,7 +98,7 @@ func TestParallel(t *testing.T) {
|
||||
for i := 0; i < 10000; i++ {
|
||||
v := rand.Hex.Generate(10)
|
||||
|
||||
f.Add(v)
|
||||
f.Add(context.Background(), v)
|
||||
isSet, _ := f.ExistString(v)
|
||||
|
||||
assert.True(t, isSet)
|
||||
@ -109,8 +110,9 @@ func BenchmarkFilter(b *testing.B) {
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
|
||||
v := rand.Hex.Generate(10)
|
||||
f.Add(v)
|
||||
f.Add(context.Background(), v)
|
||||
|
||||
f.ExistString(v)
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
package bloom
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/bits-and-blooms/bitset"
|
||||
"github.com/charlienet/go-mixed/locker"
|
||||
)
|
||||
@ -26,7 +28,7 @@ func (s *memStore) Clear() {
|
||||
s.set.ClearAll()
|
||||
}
|
||||
|
||||
func (s *memStore) Set(offsets ...uint) error {
|
||||
func (s *memStore) Set(ctx context.Context, offsets ...uint) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
|
@ -4,33 +4,18 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "embed"
|
||||
|
||||
"github.com/charlienet/go-mixed/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
// ARGV:偏移量offset数组
|
||||
// KYES[1]: setbit操作的key
|
||||
// 全部设置为1
|
||||
setScript = `
|
||||
for _, offset in ipairs(ARGV) do
|
||||
redis.call("setbit", KEYS[1], offset, 1)
|
||||
end
|
||||
`
|
||||
//go:embed bloom.lua
|
||||
var redis_bloom_function string
|
||||
|
||||
//ARGV:偏移量offset数组
|
||||
//KYES[1]: setbit操作的key
|
||||
//检查是否全部为1
|
||||
testScript = `
|
||||
for _, offset in ipairs(ARGV) do
|
||||
if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
|
||||
return false
|
||||
end
|
||||
end
|
||||
return true
|
||||
`
|
||||
)
|
||||
var once sync.Once
|
||||
|
||||
var ErrTooLargeOffset = errors.New("超出最大偏移量")
|
||||
|
||||
@ -44,6 +29,8 @@ type redisBitSet struct {
|
||||
}
|
||||
|
||||
func newRedisStore(store redis.Client, key string, bits uint) *redisBitSet {
|
||||
once.Do(func() { store.LoadFunction(redis_bloom_function) })
|
||||
|
||||
return &redisBitSet{
|
||||
store: store,
|
||||
key: key,
|
||||
@ -51,16 +38,13 @@ func newRedisStore(store redis.Client, key string, bits uint) *redisBitSet {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *redisBitSet) Set(offsets ...uint) error {
|
||||
func (s *redisBitSet) Set(ctx context.Context, offsets ...uint) error {
|
||||
args, err := s.buildOffsetArgs(offsets)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||
defer cancel()
|
||||
|
||||
_, err = s.store.Eval(ctx, setScript, []string{s.key}, args).Result()
|
||||
_, err = s.store.FCall(ctx, "set_bit", []string{s.key}, args...).Result()
|
||||
|
||||
//底层使用的是go-redis,redis.Nil表示操作的key不存在
|
||||
//需要针对key不存在的情况特殊判断
|
||||
@ -82,7 +66,7 @@ func (s *redisBitSet) Test(offsets ...uint) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||
defer cancel()
|
||||
|
||||
resp, err := s.store.Eval(ctx, testScript, []string{s.key}, args).Result()
|
||||
resp, err := s.store.FCall(ctx, "test_bit", []string{s.key}, args...).Result()
|
||||
|
||||
// key 不存在,表示还未存放任何数据
|
||||
if err == redis.Nil {
|
||||
@ -103,8 +87,8 @@ func (s *redisBitSet) Clear() {
|
||||
|
||||
}
|
||||
|
||||
func (r *redisBitSet) buildOffsetArgs(offsets []uint) ([]string, error) {
|
||||
args := make([]string, 0, len(offsets))
|
||||
func (r *redisBitSet) buildOffsetArgs(offsets []uint) ([]any, error) {
|
||||
args := make([]any, 0, len(offsets))
|
||||
for _, offset := range offsets {
|
||||
if offset >= r.bits {
|
||||
return nil, ErrTooLargeOffset
|
||||
|
@ -1,16 +1,22 @@
|
||||
package bloom
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/charlienet/go-mixed/redis"
|
||||
"github.com/charlienet/go-mixed/tests"
|
||||
)
|
||||
|
||||
func TestRedisStore(t *testing.T) {
|
||||
tests.RunOnRedis(t, func(client redis.Client) {
|
||||
tests.RunOnDefaultRedis(t, func(client redis.Client) {
|
||||
store := newRedisStore(client, "abcdef", 10000)
|
||||
err := store.Set(1, 2, 3, 9, 1223)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
err := store.Set(ctx, 1, 2, 3, 9, 1223)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user