This commit is contained in:
Liang Ding 2014-09-20 10:39:29 +08:00
parent 41d464237a
commit 4b7fee14f4
5 changed files with 74 additions and 43 deletions

View File

@ -88,6 +88,16 @@ func (ueqs Queues) New(sid string) *UserEventQueue {
return q
}
// 关闭一个用户事件队列.
func (ueqs Queues) Close(sid string) {
q := ueqs[sid]
if nil == q {
return
}
delete(ueqs, sid)
}
// 事件处理接口.
type Handler interface {
Handle(event *Event)

View File

@ -31,18 +31,14 @@ type Notification struct {
Message string `json:"message"`
}
// 通知通道.
// <sid, *util.WSChannel>
var notificationWSs = map[string]*util.WSChannel{}
// 用户事件处理:将事件转为通知,并通过通知通道推送给前端.
// 当用户事件队列接收到事件时将会调用该函数进行处理.
func event2Notification(e *event.Event) {
if nil == notificationWSs[e.Sid] {
if nil == session.NotificationWS[e.Sid] {
return
}
wsChannel := notificationWSs[e.Sid]
wsChannel := session.NotificationWS[e.Sid]
var notification Notification
@ -79,12 +75,12 @@ func WSHandler(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024)
wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()}
notificationWSs[sid] = &wsChan
session.NotificationWS[sid] = &wsChan
ret := map[string]interface{}{"output": "Notification initialized", "cmd": "init-notification"}
wsChan.Conn.WriteJSON(&ret)
glog.V(4).Infof("Open a new [Notification] with session [%s], %d", sid, len(notificationWSs))
glog.V(4).Infof("Open a new [Notification] with session [%s], %d", sid, len(session.NotificationWS))
// 添加用户事件处理器
wSession.EventQueue.AddHandler(event.HandleFunc(event2Notification))

View File

@ -20,10 +20,6 @@ import (
"github.com/gorilla/websocket"
)
// 输出通道.
// <sid, *util.WSChannel>
var outputWS = map[string]*util.WSChannel{}
// 建立输出通道.
func WSHandler(w http.ResponseWriter, r *http.Request) {
// TODO: 会话校验
@ -32,12 +28,12 @@ func WSHandler(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024)
wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()}
outputWS[sid] = &wsChan
session.OutputWS[sid] = &wsChan
ret := map[string]interface{}{"output": "Ouput initialized", "cmd": "init-output"}
wsChan.Conn.WriteJSON(&ret)
glog.V(4).Infof("Open a new [Output] with session [%s], %d", sid, len(outputWS))
glog.V(4).Infof("Open a new [Output] with session [%s], %d", sid, len(session.OutputWS))
}
// 运行一个可执行文件.
@ -114,8 +110,8 @@ func RunHandler(w http.ResponseWriter, r *http.Request) {
glog.V(3).Infof("Session [%s] 's running [id=%d, file=%s] has done", sid, runningId, filePath)
if nil != outputWS[sid] {
wsChannel := outputWS[sid]
if nil != session.OutputWS[sid] {
wsChannel := session.OutputWS[sid]
channelRet["cmd"] = "run-done"
channelRet["output"] = string(buf[:count])
@ -131,8 +127,8 @@ func RunHandler(w http.ResponseWriter, r *http.Request) {
break
} else {
if nil != outputWS[sid] {
wsChannel := outputWS[sid]
if nil != session.OutputWS[sid] {
wsChannel := session.OutputWS[sid]
channelRet["cmd"] = "run"
channelRet["output"] = string(buf[:count])
@ -299,10 +295,10 @@ func BuildHandler(w http.ResponseWriter, r *http.Request) {
channelRet["lints"] = lints
}
if nil != outputWS[sid] {
if nil != session.OutputWS[sid] {
glog.V(3).Infof("Session [%s] 's build [id=%d, file=%s] has done", sid, runningId, filePath)
wsChannel := outputWS[sid]
wsChannel := session.OutputWS[sid]
err := wsChannel.Conn.WriteJSON(&channelRet)
if nil != err {
glog.Error(err)
@ -426,10 +422,10 @@ func GoInstallHandler(w http.ResponseWriter, r *http.Request) {
channelRet["lints"] = lints
}
if nil != outputWS[sid] {
if nil != session.OutputWS[sid] {
glog.V(3).Infof("Session [%s] 's running [go install] [id=%d, dir=%s] has done", sid, runningId, curDir)
wsChannel := outputWS[sid]
wsChannel := session.OutputWS[sid]
err := wsChannel.Conn.WriteJSON(&channelRet)
if nil != err {
glog.Error(err)
@ -510,8 +506,8 @@ func GoGetHandler(w http.ResponseWriter, r *http.Request) {
channelRet["output"] = string(buf[:count])
channelRet["cmd"] = "go get"
if nil != outputWS[sid] {
wsChannel := outputWS[sid]
if nil != session.OutputWS[sid] {
wsChannel := session.OutputWS[sid]
err := wsChannel.Conn.WriteJSON(&channelRet)
if nil != err {

View File

@ -3,7 +3,7 @@
// 1. HTTP 会话:主要用于验证登录
// 2. Wide 会话:浏览器 tab 打开/刷新会创建一个,并和 HTTP 会话进行关联
//
// TODO: HTTP 会话失效时,关联的 Wide 会话也会做失效处理:释放所有和该会话相关的资源,例如运行中的程序进程、事件队列等
// 当会话失效时:释放所有和该会话相关的资源,例如运行中的程序进程、事件队列等.
package session
import (
@ -23,11 +23,19 @@ import (
const (
SessionStateActive = iota // 会话状态:活的
SessionStateClosed // 会话状态:已关闭(这个状态目前暂时没有使用到)
)
// 输出通道.
// <sid, *util.WSChannel>
var sessionWS = map[string]*util.WSChannel{}
var (
// 会话通道. <sid, *util.WSChannel>var
sessionWS = map[string]*util.WSChannel{}
// 输出通道. <sid, *util.WSChannel>
OutputWS = map[string]*util.WSChannel{}
// 通知通道. <sid, *util.WSChannel>
NotificationWS = map[string]*util.WSChannel{}
)
// 用户 HTTP 会话,用于验证登录.
var HTTPSession = sessions.NewCookieStore([]byte("BEYOND"))
@ -78,10 +86,15 @@ func WSHandler(w http.ResponseWriter, r *http.Request) {
if err := wsChan.Conn.ReadJSON(&input); err != nil {
glog.V(3).Infof("[Session Channel] of session [%s] disconnected, releases all resources with it", sid)
s := WideSessions.Get(sid)
for i, s := range WideSessions {
if s.Id == sid {
mutex.Lock()
// 关闭事件队列
close(s.EventQueue.Queue)
// 从会话集中移除
WideSessions = append(WideSessions[:i], WideSessions[i+1:]...)
// 关闭用户事件队列
event.UserEventQueues.Close(sid)
// 杀进程
for _, p := range s.Processes {
@ -92,12 +105,22 @@ func WSHandler(w http.ResponseWriter, r *http.Request) {
}
}
// TODO: 回收相关通道
// 回收所有通道
OutputWS[sid].Close()
delete(OutputWS, sid)
WideSessions.Remove(sid)
NotificationWS[sid].Close()
delete(NotificationWS, sid)
sessionWS[sid].Close()
delete(sessionWS, sid)
mutex.Unlock()
return
}
}
}
ret = map[string]interface{}{"output": "", "cmd": "session-output"}
@ -132,6 +155,7 @@ func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession {
id := strconv.Itoa(rand.Int())
now := time.Now()
// 创建用户事件队列
userEventQueue := event.UserEventQueues.New(id)
ret := &WideSession{

View File

@ -14,3 +14,8 @@ type WSChannel struct {
Request *http.Request // 关联的 HTTP 请求
Time time.Time // 该通道最近一次使用时间
}
// 关闭通道.
func (c *WSChannel) Close() {
c.Conn.Close()
}