首页 > golang > Go WebSocket + Redis 实现轻量级的订阅和实时消息推送
2019
07-30

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。

这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。

大致框图如下:

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送_websocket

涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。

关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。

内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。

消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送_websocket_02

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送_edn_03

Go WebSocket + Redis 实现轻量级的订阅和实时消息推送_websocket_04

收到的Publish事件,发布消息到Redis: 

// 接收到发布消息事件
	c.On("Publish", func(msg string) {
		// 将消息打印到控制台
		fmt.Printf("%s received publish: %sn", c.Context().ClientIP(), msg)
		pubMsg := websocket.PushMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &pubMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//发布消息到Redis
		websocket.Publish(pubMsg.Topic, pubMsg.Payload)
	})

 收到的订阅事件,发布消息到Redis:

// 接收到订阅的事件
	c.On("Subscribe", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received subscribe: %sn", c.Context().ClientIP(), msg)
		subMsg := websocket.SubMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &subMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//订阅到Redis
		sub.Subscribe(subMsg.Topic, subMsg.ID)
	})

开启一个Redis客户端,负责收到的消息:

func (c *Subscriber) Init(ws *Server) {

	conn := RedisClient.Get()

	c.client = redis.PubSubConn{conn}
	c.Ws = ws
	go func() {
		for {
			log.Println("redis wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				fmt.Printf("receive:%#vn", res)
				topic := res.Channel
				message := string(res.Data)
				fnSubReceived(c.cbMap, topic, message)
			case redis.Subscription:
				fmt.Printf("%s: %s %dn", res.Channel, res.Kind, res.Count)
			case error:
				log.Println("error handle", res)
				if IsConnError(res) {
					conn, err := RedisClient.Dial()
					if err != nil {
						log.Printf("err=%sn", err)
					}
					c.client = redis.PubSubConn{conn}
				}
				continue
			}
		}
	}()

}

 附完整实现:

package websocket

import (
	"log"
)

//订阅的消息格式定义
type SubMsg struct {
	ID    string `json:"id"`   //请求ID
	Type  string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
	Topic string `json:"topic"`
	Param string `json:"param"`
}

//平台推送消息定义
type PushMsg struct {
	ID      string `json:"id"`
	Type    string `json:"type"` //发布,类型为pub
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
	Result  string `json:"result"`
}

func Publish(topic string, msg string) (interface{}, error) {
	resp, err := Redo("Publish", topic, msg)
	if err != nil {
		log.Println(err)
	}
	return resp, err
}

package websocket

import (
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"
	//"unsafe"

	"github.com/gomodule/redigo/redis"
)

var (
	// RD redis全局client
	RedisClient *redis.Pool
)

// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {

	// 连接Redis
	RedisClient = &redis.Pool{
		MaxIdle:     3,
		MaxActive:   4000,
		IdleTimeout: 180 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
			if nil != err {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	rd := RedisClient.Get()
	defer rd.Close()

	c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
	defer c.Close()
	if err != nil {
		fmt.Println("Connect to redis error", err)
		return err
	}
	fmt.Println("Connect to redis ok")
	return nil

}

func IsConnError(err error) bool {
	var needNewConn bool

	if err == nil {
		return false
	}

	if err == io.EOF {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "use of closed network connection") {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "connect: connection refused") {
		needNewConn = true
	}
	if strings.Contains(err.Error(), "connection closed") {
		needNewConn = true
	}
	return needNewConn
}

// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
	if RedisClient == nil {
		return "", errors.New("error,redis client is null")
	}
	rd := RedisClient.Get()
	defer rd.Close()

	var conn redis.Conn
	var err error
	var maxretry = 3
	var needNewConn bool

	resp, err := rd.Do(command, opt...)
	needNewConn = IsConnError(err)
	if needNewConn == false {
		return resp, err
	} else {
		conn, err = RedisClient.Dial()
	}

	for index := 0; index < maxretry; index++ {
		if conn == nil && index+1 > maxretry {
			return resp, err
		}
		if conn == nil {
			conn, err = RedisClient.Dial()
		}
		if err != nil {
			continue
		}

		resp, err := conn.Do(command, opt...)
		needNewConn = IsConnError(err)
		if needNewConn == false {
			return resp, err
		} else {
			conn, err = RedisClient.Dial()
		}
	}

	conn.Close()
	return "", errors.New("redis error")
}

type SubscribeCallback func(topicMap sync.Map, topic, msg string)

type Subscriber struct {
	client   redis.PubSubConn
	Ws       *Server //websocket
	cbMap    sync.Map
	CallBack interface {
		OnReceive(SubscribeCallback)
	}
}

var fnSubReceived SubscribeCallback

func (c *Subscriber) OnReceive(cb SubscribeCallback) {
	fnSubReceived = cb
}

func (c *Subscriber) Init(ws *Server) {

	conn := RedisClient.Get()

	c.client = redis.PubSubConn{conn}
	c.Ws = ws
	go func() {
		for {
			log.Println("redis wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				fmt.Printf("receive:%#vn", res)
				topic := res.Channel
				message := string(res.Data)
				fnSubReceived(c.cbMap, topic, message)
			case redis.Subscription:
				fmt.Printf("%s: %s %dn", res.Channel, res.Kind, res.Count)
			case error:
				log.Println("error handle", res)
				if IsConnError(res) {
					conn, err := RedisClient.Dial()
					if err != nil {
						log.Printf("err=%sn", err)
					}
					c.client = redis.PubSubConn{conn}
				}
				continue
			}
		}
	}()

}

func (c *Subscriber) Close() {
	err := c.client.Close()
	if err != nil {
		log.Println("redis close error.")
	}
}

func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
	err := c.client.Subscribe(channel)
	if err != nil {
		log.Println("redis Subscribe error.", err)
	}
	c.cbMap.Store(clientid, channel.(string))
}

func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
	err := c.client.PSubscribe(channel)
	if err != nil {
		log.Println("redis PSubscribe error.", err)
	}

	c.cbMap.Store(clientid, channel.(string))
}
package main

import (
	"fmt"
	"log"
	"sync"
	//"github.com/gin-contrib/cors"
	"encoding/json"
	"github.com/gin-gonic/gin"
	"net/http"
	"strings"
	"websockTest/websocket"
)

func main() {
	ws := websocket.New(websocket.Config{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	})

	ws.OnConnection(handleConnection)

	// 初始化连接redis
	var sub websocket.Subscriber
	err := websocket.InitRedis("127.0.0.1:6379", "", 0)
	if err != nil {
		fmt.Printf("InitRedis error: %sn", err)
	} else {
		sub.Init(ws)
		//redis client收到的消息分发到websocket
		sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
			fmt.Printf("sub msg received,topic=%s,msg=%sn", topic, msg)
			topicMap.Range(func(k, v interface{}) bool {
				fmt.Println("topicMap:", k, v)
				if v.(string) == topic {
					conn := sub.Ws.GetConnection(k.(string))
					if conn != nil {
						conn.Write(1, []byte(msg))
					}
				}
				return true
			})
		})

	}

	r := gin.Default()
	//允许跨域
	//config := cors.DefaultConfig()
	//config.AllowOrigins = []string{"http://127.0.0.1:9090"}
	//r.Use(Cors())
	//静态资源
	r.Static("/static", "./static")
	r.LoadHTMLGlob("views/*")
	r.GET("/ws", ws.Handler())
	r.GET("/api/v3/device", ws.Handler())
	r.GET("/test", func(c *gin.Context) {
		c.HTML(http.StatusOK, "test.html", gin.H{
			"title": "this is a test",
		})
	})
	r.Run(":9090")
}

func handleConnection(c websocket.Connection) {
	fmt.Println("client connected,id=", c.ID())
	c.Write(1, []byte("welcome client"))
	// 从浏览器中读取事件
	c.On("chat", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s chat sent: %sn", c.Context().ClientIP(), msg)
		// 将消息写回客户端消息所有者:
		// c.Emit("chat", msg)
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnMessage(func(msg []byte) {
		fmt.Println("received msg:", string(msg))
		c.Write(1, []byte("hello aa"))
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnDisconnect(func() {
		fmt.Println("client Disconnect,id=", c.ID())
	})

	// 接收到发布消息事件
	c.On("Publish", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received publish: %sn", c.Context().ClientIP(), msg)
		pubMsg := websocket.PushMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &pubMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//发布消息到Redis
		websocket.Publish(pubMsg.Topic, pubMsg.Payload)
	})

	// 接收到订阅的事件
	c.On("Subscribe", func(msg string) {
		// 将消息打印到控制台,c .Context()是iris的http上下文。
		fmt.Printf("%s received subscribe: %sn", c.Context().ClientIP(), msg)
		subMsg := websocket.SubMsg{ID: c.ID()}
		err := json.Unmarshal([]byte(msg), &subMsg)
		if err != nil {
			log.Printf("解析json串错误,err=", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}
		//订阅到Redis
		sub.Subscribe(subMsg.Topic, subMsg.ID)
	})
}

作者:golang中国
golang中国

本文》有 7588 条评论

留下一个回复