网站开发需要用什么市场调研的五个步骤
目录
- Day5-Part3:Zinx 的连接管理
- 创建连接管理模块
- 将连接管理模块集成到 Zinx 当中
- 将 ConnManager 集成到 Server 当中
- 在 Connection 的工厂函数中将连接添加到 ConnManager
- Server 中连接数量的判断
- 连接的删除
- 补充:连接的带缓冲发包方式
- 补充:注册连接启动/停止时自定义的 hook 方法
Day5-Part3:Zinx 的连接管理
现在我们要为 Zinx 框架增加连接个数的限定,如果与 Server 相连的 Client 超过了一定的个数,Zinx 为了保证后端的及时响应,应拒绝连接请求。
创建连接管理模块
我们分别在 ziface
和 znet
下建立 iconnmanager.go
和 connmanager.go
。
首先在 iconnmanager.go
当中定义接口:
package zifacetype IConnManager interface {Add(conn IConnection) // 添加连接Remove(conn IConnection) // 删除连接Get(connID uint32) (IConnection, error) // 利用 ConnID 获取连接Len() int // 获取当前连接数量ClearConn() // 删除并停止所有连接
}
之后我们在 connmanager.go
当中定义连接管理模块 ConnManager,并使其实现 IConnManager 接口:
package znetimport ("errors""fmt""sync""zinx/ziface"
)type ConnManager struct {connections map[uint32]ziface.IConnection // 管理连接的信息connLock sync.RWMutex // 读写连接的读写锁
}func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection),}
}// Add 添加连接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {// 保护共享资源 Map, 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()// 将 conn 连接添加到 ConnManagerconnMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}// Remove 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {// 保护共享资源 map, 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()// 删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove connID = ", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}// Get 利用 ConnID 获取连接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {// 保护共享资源 Map, 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}
}// Len 获取当前连接个数
func (connMgr *ConnManager) Len() int {return len(connMgr.connections)
}// ClearConn 停止并清除当前所有连接
func (connMgr *ConnManager) ClearConn() {// 保护共享资源 Map, 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()// 停止并删除全部的连接信息for connID, conn := range connMgr.connections {// 停止conn.Stop()// 删除delete(connMgr.connections, connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
在 ConnManager
中,用一个 map 来承载全部的连接信息,key 是 ConnID,它在 Server 的 Start 方法中,调用 NewConnection 之前确定,而 value 就是对连接的封装 Connection 本身。ConnManager 中还定义了一个 connLock,它的作用是对 map 在多任务修改场景下进行并发保护。
Remove()
方法用于删除指定的连接,而ClearConn()
则会首先停止所有的连接,再删除,应当在服务器停止前调用。
将连接管理模块集成到 Zinx 当中
将 ConnManager 集成到 Server 当中
现在我们先将 ConnManager 集成到 Server 当中,作为 Server 的成员:
type Server struct {Name string // Name 为服务器的名称IPVersion string // IPVersion: IPv4 or otherIP string // IP: 服务器绑定的 IP 地址Port int // Port: 服务器绑定的端口msgHandler ziface.IMsgHandle // 将 Router 替换为 MsgHandler, 绑定 MsgId 与对应的处理方法ConnMgr ziface.IConnManager // 当前 Server 的连接管理器
}
// NewServer 将创建一个服务器的 Handler
func NewServer() ziface.IServer {s := &Server{Name: settings.Conf.Name,IPVersion: "tcp4",IP: settings.Conf.Host,Port: settings.Conf.Port,msgHandler: NewMsgHandle(),ConnMgr: NewConnManager(),}return s
}
既然 Server 具备了 ConnManager 成员,我们为其设置一个方法,用于获取 ConnManager 对象(通过成员函数来获取类内的成员):
// zinx/ziface/iserver.go
// 定义服务器接口
type IServer interface {Start() // Start 启动服务器方法Stop() // Stop 停止服务器方法Serve() // Serve 开启服务器方法AddRouter(msgId uint32, router IRouter) // 路由功能: 给当前服务注册一个路由业务方法GetConnMgr() IConnManager // 得到连接管理器
}
// zinx/znet/server.go
func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}
我们可能会需要在 Connection 当中使用 ConnManager,毕竟 ConnManager 是对连接进行管理的模块,我们现在需要一种方法来使得 ConnManager 对 Connection 可见,Zinx 教程中采取的做法是将 Server 作为 Connection 成员的一部分,从而使得 Server 和 Connection 成为互相引用的关系,即:为 Connection 添加一个 Server 成员,使得 Connection 知晓自己隶属于哪个 Server 的管辖下:
type Connection struct {TCPServer ziface.IServer // 标记当前 Conn 属于哪个 ServerConn *net.TCPConn // 当前连接的 socket TCP 套接字ConnID uint32 // 当前连接的 ID, 也可称为 SessionID, 全局唯一isClosed bool // 当前连接的开启/关闭状态Msghandler ziface.IMsgHandle // 将 Router 替换为消息管理模块ExitBuffChan chan bool // 告知该连接一经退出/停止的 channelmsgChan chan []byte // 无缓冲 channel, 用于读/写两个 goroutine 之间的消息通信msgBuffChan chan []byte // 定义 msgBuffChan
}
当然,传递进来的是 Server 的指针,不可能在服务器应用的运行时再复制一个 Server 的对象。可以这样理解,Connection 应该对其所属 Server 的指针具有访问权。
在 Connection 的工厂函数中将连接添加到 ConnManager
在 NewConnection
工厂函数调用时,由于 Connection 对象对其隶属的 Server 可见,直接调用 Server 的 GetConnMgr 方法并使用 ConnManager 的 Add 方法将 Connection 添加即可:
// NewConnection 创建新的连接
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {c := &Connection{TCPServer: server,Conn: conn,ConnID: connID,isClosed: false,Msghandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan: make(chan []byte), // msgChan 初始化}// 将新创建的 Conn 添加到连接管理器中c.TCPServer.GetConnMgr().Add(c)return c
}
Server 中连接数量的判断
回顾我们引入连接管理模块的动机,我们建立连接管理模块,是为了控制连接的数量,以避免服务器同时与过多的客户端建立连接。因此,当连接建立之前,我们应该首先判断一下,当前与 Server 连接的 Client 的数量是否达到上限,如果达到了上限,那么就拒绝连接。实现的方法非常简单,在 Server 的 Start 方法下加入一个 if 条件句即可:
func (s *Server) Start() {fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)go func() {// ....// 3 启动server网络连接业务for {// 3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}// 3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= settings.Conf.MaxConn {conn.Close()continue}//=============// 3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid ++// 3.4 启动当前链接的处理业务go dealConn.Start()}}()
}
当然,我们应该在 yaml 中定义好我们期望服务器连接的最大数目。
还有一点就是,服务器的连接控制或许可以引入 LRU 这类算法,比如长时间没有消息收发的 Client 可以做断连处理,从而接入新的连接。
连接的删除
在连接停止时,我们应该将连接从 ConnManager 当中删除,因此在 Connection 的 Stop 方法中使用 ConnManager 的 Remove:
// Stop 停止连接, 结束当前连接状态
func (c *Connection) Stop() {fmt.Println("Conn Stop()... ConnID = ", c.ConnID)// 1. 如果当前连接已经关闭if c.isClosed == true {return}c.isClosed = truefmt.Println("Im here")// 关闭 socket 连接c.Conn.Close()// 通知从缓冲队列读数据的业务, 该链接已经关闭c.ExitBuffChan <- true// 将连接从管理器中删除c.TCPServer.GetConnMgr().Remove(c)// 关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}
在 Server Stop 的时候,应该将连接全部清空,调用 ClearConn:
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name ", s.Name)// Server.Stop() 将其它需要清理的连接信息或其他信息一并停止或清理s.ConnMgr.ClearConn()
}
至此我们成功地将连接管理模块集成到了 Zinx 当中。
补充:连接的带缓冲发包方式
这一部分与连接管理的关联不大,但是与 Zinx 的教学文档同时被列在第九节,因此作为补充写在连接管理的下方。
我们之前给 Connection 提供了一个 SendMsg()
方法,SendMsg()
当中,数据将会被发送到无缓冲的 msgChan
当中。如果客户端的连接较多,处理不及时,可能会产生短暂的阻塞,因此我们可以补充一个带有缓冲的消息发送方法,来做一些非阻塞的用户体验。
首先修改 Connection 的接口 IConnection,添加将消息发送到带缓冲区的通道的方法:
type IConnection interface {Start() // 启动连接Stop() // 停止连接GetConnID() uint32 // 获取远程客户端地址信息GetTCPConnection() *net.TCPConn // 从当前连接获取原始的 socket TCPConnRemoteAddr() net.Addr // 获取远程客户端地址信息SendMsg(msgId uint32, data []byte) error // 直接将 Message 数据发给远程的 TCP 客户端SendBuffMsg(msgId uint32, data []byte) error // 添加带缓冲的发送消息接口
}
之后为 Connection 结构加入带缓冲区的 msgChan,并补充 SendBuffMsg(...)
方法:
type Connection struct {TCPServer ziface.IServer // 标记当前 Conn 属于哪个 ServerConn *net.TCPConn // 当前连接的 socket TCP 套接字ConnID uint32 // 当前连接的 ID, 也可称为 SessionID, 全局唯一isClosed bool // 当前连接的开启/关闭状态Msghandler ziface.IMsgHandle // 将 Router 替换为消息管理模块ExitBuffChan chan bool // 告知该连接一经退出/停止的 channelmsgChan chan []byte // 无缓冲 channel, 用于读/写两个 goroutine 之间的消息通信msgBuffChan chan []byte // 定义 msgBuffChan
}// NewConnection 创建新的连接
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {c := &Connection{TCPServer: server,Conn: conn,ConnID: connID,isClosed: false,Msghandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan: make(chan []byte), // msgChan 初始化msgBuffChan: make(chan []byte, settings.Conf.MaxMsgChanLen),}// 将新创建的 Conn 添加到连接管理器中c.TCPServer.GetConnMgr().Add(c)return c
}func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}// 将 data 封包并发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg")}c.msgBuffChan <- msgreturn nil
}
我们在 Writer 中也要对 msgBuffChan 进行数据监控,如果有数据发送到这个 channel,应当将数据写给客户端,因此在 select 中添加一个接收 msgBuffChan 数据的 case 事件:
func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan:if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:", err, " Conn Writer exit~")return}case data, ok := <-c.msgBuffChan:if ok {if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:", err, " Conn Writer exit")return}} else {// 注意: 这里刚才写错了, else 的对象是 ok 的 if 语句而不是 err 的 if 语句fmt.Println("msgBuffChan is Closed")return}case <-c.ExitBuffChan:// conn 关闭return}}
}
补充:注册连接启动/停止时自定义的 hook 方法
有时在创建连接之后,或是在连接断开之前,我们希望执行一些用户自定义的业务,所以我们为 Zinx 新增两个连接断开前和创建后的回调函数,它们也成为 hook(钩子)函数。
我们通过 Server 来注册 conn 的 hook 方法,首先修改 IServer 接口:
// 定义服务器接口
type IServer interface {Start() // Start 启动服务器方法Stop() // Stop 停止服务器方法Serve() // Serve 开启服务器方法AddRouter(msgId uint32, router IRouter) // 路由功能: 给当前服务注册一个路由业务方法GetConnMgr() IConnManager // 得到连接管理器SetOnConnStart(func(IConnection)) // 设置该 Server 在连接创建时的 hook 函数SetOnConnStop(func(IConnection)) // 设置该 Server 在连接断开时的 hook 函数CallOnConnStart(conn IConnection) // 调用连接 onConnStart Hook 函数CallOnConnStop(conn IConnection) // 调用连接 onConnStop Hook 函数
}
再修改 Server:
type Server struct {Name string // Name 为服务器的名称IPVersion string // IPVersion: IPv4 or otherIP string // IP: 服务器绑定的 IP 地址Port int // Port: 服务器绑定的端口msgHandler ziface.IMsgHandle // 将 Router 替换为 MsgHandler, 绑定 MsgId 与对应的处理方法ConnMgr ziface.IConnManager // 当前 Server 的连接管理器onConnStart func(conn ziface.IConnection) // Server 在连接创建时的 Hook 函数onConnStop func(conn ziface.IConnection) // Server 在连接删除时的 Hook 函数
}
为 Server 实现 IServer 的方法:
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {s.onConnStart = hookFunc
}func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {s.onConnStop = hookFunc
}func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.onConnStart != nil {fmt.Println("---> CallOnConnStart ...")s.onConnStart(conn)}
}func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.onConnStop != nil {fmt.Println("---> CallOnConnStop ...")s.onConnStop(conn)}
}
在连接创建的后调用 hook 方法:
// Start 实现 IConnection 中的方法, 它启动连接并让当前连接开始工作
func (c *Connection) Start() {// 开启处理该连接读取到客户端数据之后的业务请求go c.StartWriter()go c.StartReader()c.TCPServer.CallOnConnStart(c)for {select {case <-c.ExitBuffChan:// 得到退出消息则不再阻塞return}}
}
停止连接前调用 hook 方法:
// Stop 停止连接, 结束当前连接状态
func (c *Connection) Stop() {fmt.Println("Conn Stop()... ConnID = ", c.ConnID)// 1. 如果当前连接已经关闭if c.isClosed == true {return}c.isClosed = truefmt.Println("Im here")// Connection Stop() 如果用户注册了该连接的关闭回调业务, 那么应该在此刻显式调用c.TCPServer.CallOnConnStop(c)// 关闭 socket 连接c.Conn.Close()// 通知从缓冲队列读数据的业务, 该链接已经关闭c.ExitBuffChan <- true// 将连接从管理器中删除c.TCPServer.GetConnMgr().Remove(c)// 关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}