redigo的一些高级操作

redigo的doc链接

Pipelining(管道操作)

管道操作可以将多个独立的命令累积后一次性发送执行,然后同时返回所有命令的执行情况。通过管道操作可以减少网络通信次数,降低总的延迟。

客户端通过多次调用Send()方法发送命令到缓冲区,用Flush()方法将缓冲区的命令一次性发送到服务器,用Receive()方法按照FIFO顺序读取命令结果。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
main () {
	//...连接代码
	conn.Send("HSET", "person1", "name", "tom")
	conn.Send("HSET", "person1", "age", "100")
	conn.Send("HGETALL", "person1")
	conn.Flush()

	res1, err := conn.Receive()
	fmt.Printf("Receive res1:%v \n", res1)
	res2, err := conn.Receive()
	fmt.Printf("Receive res2:%v\n", res2)
	res3, err := conn.Receive()
	fmt.Printf("Receive res3:%s\n", res3)
}
/* 输出:
Receive res1:1 
Receive res2:1
Receive res3:[name tom age 100]
*/

发布/订阅

Redis可以订阅即时消息,要订阅者先订阅,之后发布者发布内容才能收到。

如果事先没有订阅,那么发布的消息就会被丢弃,不会保存。

由于订阅者要阻塞等待,所以通过goroutine启动一个订阅者。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//订阅者
func subs(wg *sync.WaitGroup) {
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		fmt.Println("connect redis error :", err)
		return
	}
	defer conn.Close()
	psc := redis.PubSubConn{conn}
	psc.Subscribe("channel1") //订阅channel1频道
	wg.Done()
	for {
		switch v := psc.Receive().(type) {
		case redis.Message:
			fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
		case error:
			fmt.Println(v)
			return
		}
	}
}

//发布者
func push(wg *sync.WaitGroup, message string) {
	// 稍等一下,保证订阅者先订阅
	wg.Wait()
	conn, _ := redis.Dial("tcp", "localhost:6379")
	_, err1 := conn.Do("PUBLISH", "channel1", message)
	if err1 != nil {
		fmt.Println("pub err: ", err1)
		return
	}
}

main() {
	wg := new(sync.WaitGroup)
	wg.Add(1)

	go subs(wg)
	go push(wg, "this is msg")

	time.Sleep(time.Second * 3)
}

事务操作

事务操作可以保证一系列命令连续执行,不被别的请求打断。类似pipelining操作。

示例:

1
2
3
4
5
6
7
8
9
10
11
main () {
    //...连接代码
    conn.Send("MULTI")
    conn.Send("INCR", "foo")
    conn.Send("INCR", "bar")
    r, err := conn.Do("EXEC")
    fmt.Println(r)
}
/* 输出:
[1, 1]
*/
  • 由于Redis是单线程的,所以执行lua脚本命令也可以被用来执行事务操作。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
main () {
	//...连接代码
	res, err := redis.String(conn.Do("eval", 
	"redis.call('set',KEYS[1],ARGV[1]) return redis.call('get',KEYS[1])", 
	1, "name1", "value你好"))
	if err != nil {
		fmt.Println("redis set error:", err)
	}
	fmt.Println("eval result: ", res)
}
/* 输出:
eval result:  value你好
*/

连接池

连接池如其名,可以帮助保持一定数量的连接,避免频繁建立、关闭连接带来的开销。

  • 连接池一般初始化要在包的初始化函数init中,在程序退出时自动释放。
  • 初始化时有很多参数,可以根据实际业务情况调整。
  • 可以用Pool.Get()获得一个连接,记得用完要Close把连接放入连接池。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var Pool redis.Pool
func init() {
	Pool = redis.Pool{
		MaxIdle:     16,
		MaxActive:   32,
		IdleTimeout: 120,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "localhost:6379")
		},
	}
}

func main() {
	conn := Pool.Get()
	defer conn.Close()
	
	_, err := conn.Do("SET", "name1", "value你好")
	if err != nil {
		fmt.Println("redis set error:", err)
	}
	
	value, err := redis.String(conn.Do("GET", "name1"))
	if err != nil {
		fmt.Println("redis get error:", err)
	} else {
		fmt.Printf("Got name1: %s \n", value)
	}
}
/* 输出:
Got name1: value你好
*/