From 4b7fee14f4dfe1ad09bddb3bdb675037f3d8e0ef Mon Sep 17 00:00:00 2001 From: Liang Ding Date: Sat, 20 Sep 2014 10:39:29 +0800 Subject: [PATCH] Fix #58 --- event/events.go | 10 ++++++ notification/notifications.go | 12 +++---- output/outputs.go | 28 +++++++--------- session/sessions.go | 62 ++++++++++++++++++++++++----------- util/websocket.go | 5 +++ 5 files changed, 74 insertions(+), 43 deletions(-) diff --git a/event/events.go b/event/events.go index 52b167e..979a9cf 100644 --- a/event/events.go +++ b/event/events.go @@ -88,6 +88,16 @@ func (ueqs Queues) New(sid string) *UserEventQueue { return q } +// 关闭一个用户事件队列. +func (ueqs Queues) Close(sid string) { + q := ueqs[sid] + if nil == q { + return + } + + delete(ueqs, sid) +} + // 事件处理接口. type Handler interface { Handle(event *Event) diff --git a/notification/notifications.go b/notification/notifications.go index e15860d..7081a28 100644 --- a/notification/notifications.go +++ b/notification/notifications.go @@ -31,18 +31,14 @@ type Notification struct { Message string `json:"message"` } -// 通知通道. -// -var notificationWSs = map[string]*util.WSChannel{} - // 用户事件处理:将事件转为通知,并通过通知通道推送给前端. // 当用户事件队列接收到事件时将会调用该函数进行处理. func event2Notification(e *event.Event) { - if nil == notificationWSs[e.Sid] { + if nil == session.NotificationWS[e.Sid] { return } - wsChannel := notificationWSs[e.Sid] + wsChannel := session.NotificationWS[e.Sid] var notification Notification @@ -79,12 +75,12 @@ func WSHandler(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024) wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()} - notificationWSs[sid] = &wsChan + session.NotificationWS[sid] = &wsChan ret := map[string]interface{}{"output": "Notification initialized", "cmd": "init-notification"} wsChan.Conn.WriteJSON(&ret) - glog.V(4).Infof("Open a new [Notification] with session [%s], %d", sid, len(notificationWSs)) + glog.V(4).Infof("Open a new [Notification] with session [%s], %d", sid, len(session.NotificationWS)) // 添加用户事件处理器 wSession.EventQueue.AddHandler(event.HandleFunc(event2Notification)) diff --git a/output/outputs.go b/output/outputs.go index 32af1aa..79975f3 100644 --- a/output/outputs.go +++ b/output/outputs.go @@ -20,10 +20,6 @@ import ( "github.com/gorilla/websocket" ) -// 输出通道. -// -var outputWS = map[string]*util.WSChannel{} - // 建立输出通道. func WSHandler(w http.ResponseWriter, r *http.Request) { // TODO: 会话校验 @@ -32,12 +28,12 @@ func WSHandler(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024) wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()} - outputWS[sid] = &wsChan + session.OutputWS[sid] = &wsChan ret := map[string]interface{}{"output": "Ouput initialized", "cmd": "init-output"} wsChan.Conn.WriteJSON(&ret) - glog.V(4).Infof("Open a new [Output] with session [%s], %d", sid, len(outputWS)) + glog.V(4).Infof("Open a new [Output] with session [%s], %d", sid, len(session.OutputWS)) } // 运行一个可执行文件. @@ -114,8 +110,8 @@ func RunHandler(w http.ResponseWriter, r *http.Request) { glog.V(3).Infof("Session [%s] 's running [id=%d, file=%s] has done", sid, runningId, filePath) - if nil != outputWS[sid] { - wsChannel := outputWS[sid] + if nil != session.OutputWS[sid] { + wsChannel := session.OutputWS[sid] channelRet["cmd"] = "run-done" channelRet["output"] = string(buf[:count]) @@ -131,8 +127,8 @@ func RunHandler(w http.ResponseWriter, r *http.Request) { break } else { - if nil != outputWS[sid] { - wsChannel := outputWS[sid] + if nil != session.OutputWS[sid] { + wsChannel := session.OutputWS[sid] channelRet["cmd"] = "run" channelRet["output"] = string(buf[:count]) @@ -299,10 +295,10 @@ func BuildHandler(w http.ResponseWriter, r *http.Request) { channelRet["lints"] = lints } - if nil != outputWS[sid] { + if nil != session.OutputWS[sid] { glog.V(3).Infof("Session [%s] 's build [id=%d, file=%s] has done", sid, runningId, filePath) - wsChannel := outputWS[sid] + wsChannel := session.OutputWS[sid] err := wsChannel.Conn.WriteJSON(&channelRet) if nil != err { glog.Error(err) @@ -426,10 +422,10 @@ func GoInstallHandler(w http.ResponseWriter, r *http.Request) { channelRet["lints"] = lints } - if nil != outputWS[sid] { + if nil != session.OutputWS[sid] { glog.V(3).Infof("Session [%s] 's running [go install] [id=%d, dir=%s] has done", sid, runningId, curDir) - wsChannel := outputWS[sid] + wsChannel := session.OutputWS[sid] err := wsChannel.Conn.WriteJSON(&channelRet) if nil != err { glog.Error(err) @@ -510,8 +506,8 @@ func GoGetHandler(w http.ResponseWriter, r *http.Request) { channelRet["output"] = string(buf[:count]) channelRet["cmd"] = "go get" - if nil != outputWS[sid] { - wsChannel := outputWS[sid] + if nil != session.OutputWS[sid] { + wsChannel := session.OutputWS[sid] err := wsChannel.Conn.WriteJSON(&channelRet) if nil != err { diff --git a/session/sessions.go b/session/sessions.go index f65b74b..673a637 100644 --- a/session/sessions.go +++ b/session/sessions.go @@ -3,7 +3,7 @@ // 1. HTTP 会话:主要用于验证登录 // 2. Wide 会话:浏览器 tab 打开/刷新会创建一个,并和 HTTP 会话进行关联 // -// TODO: 当 HTTP 会话失效时,关联的 Wide 会话也会做失效处理:释放所有和该会话相关的资源,例如运行中的程序进程、事件队列等 +// 当会话失效时:释放所有和该会话相关的资源,例如运行中的程序进程、事件队列等. package session import ( @@ -23,11 +23,19 @@ import ( const ( SessionStateActive = iota // 会话状态:活的 + SessionStateClosed // 会话状态:已关闭(这个状态目前暂时没有使用到) ) -// 输出通道. -// -var sessionWS = map[string]*util.WSChannel{} +var ( + // 会话通道. var + sessionWS = map[string]*util.WSChannel{} + + // 输出通道. + OutputWS = map[string]*util.WSChannel{} + + // 通知通道. + NotificationWS = map[string]*util.WSChannel{} +) // 用户 HTTP 会话,用于验证登录. var HTTPSession = sessions.NewCookieStore([]byte("BEYOND")) @@ -78,25 +86,40 @@ func WSHandler(w http.ResponseWriter, r *http.Request) { if err := wsChan.Conn.ReadJSON(&input); err != nil { glog.V(3).Infof("[Session Channel] of session [%s] disconnected, releases all resources with it", sid) - s := WideSessions.Get(sid) + for i, s := range WideSessions { + if s.Id == sid { + mutex.Lock() - // 关闭事件队列 - close(s.EventQueue.Queue) + // 从会话集中移除 + WideSessions = append(WideSessions[:i], WideSessions[i+1:]...) - // 杀进程 - for _, p := range s.Processes { - if err := p.Kill(); nil != err { - glog.Errorf("Can't kill process [%d] of session [%s]", p.Pid, sid) - } else { - glog.V(3).Infof("Killed a process [%d] of session [%s]", p.Pid, sid) + // 关闭用户事件队列 + event.UserEventQueues.Close(sid) + + // 杀进程 + for _, p := range s.Processes { + if err := p.Kill(); nil != err { + glog.Errorf("Can't kill process [%d] of session [%s]", p.Pid, sid) + } else { + glog.V(3).Infof("Killed a process [%d] of session [%s]", p.Pid, sid) + } + } + + // 回收所有通道 + OutputWS[sid].Close() + delete(OutputWS, sid) + + NotificationWS[sid].Close() + delete(NotificationWS, sid) + + sessionWS[sid].Close() + delete(sessionWS, sid) + + mutex.Unlock() + + return } } - - // TODO: 回收相关通道 - - WideSessions.Remove(sid) - - return } ret = map[string]interface{}{"output": "", "cmd": "session-output"} @@ -132,6 +155,7 @@ func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession { id := strconv.Itoa(rand.Int()) now := time.Now() + // 创建用户事件队列 userEventQueue := event.UserEventQueues.New(id) ret := &WideSession{ diff --git a/util/websocket.go b/util/websocket.go index 475de5c..d8dd7e3 100644 --- a/util/websocket.go +++ b/util/websocket.go @@ -14,3 +14,8 @@ type WSChannel struct { Request *http.Request // 关联的 HTTP 请求 Time time.Time // 该通道最近一次使用时间 } + +// 关闭通道. +func (c *WSChannel) Close() { + c.Conn.Close() +}