会话管理

This commit is contained in:
Liang Ding 2014-09-19 19:21:13 +08:00
parent 892a2aa48c
commit 387d04bd3d
6 changed files with 134 additions and 52 deletions

View File

@ -24,12 +24,17 @@ type Event struct {
var EventQueue = make(chan int, MaxQueueLength)
// 用户事件队列.
// <sid, chan>
var UserEventQueues = map[string]chan int{}
type UserEventQueue struct {
Sid string // 关联的会话 id
Queue chan int // 队列
Handlers []Handler // 事件处理器集
}
// 用户事件处理器集.
// <sid, *Handlers>
var UserEventHandlers = map[string]*Handlers{}
type Queues map[string]*UserEventQueue
// 用户事件队列集.
// <sid, *UserEventQueue>
var UserEventQueues = Queues{}
// 加载事件处理.
func Load() {
@ -39,43 +44,48 @@ func Load() {
// 将事件分发到每个用户的事件队列里
for _, userQueue := range UserEventQueues {
userQueue <- event
userQueue.Queue <- event
}
}
}()
}
// 为用户队列添加事件处理器.
func (uq *UserEventQueue) AddHandler(handlers ...Handler) {
for _, handler := range handlers {
uq.Handlers = append(uq.Handlers, handler)
}
}
// 初始化一个用户事件队列.
func InitUserQueue(sid string, handlers ...Handler) {
// FIXME: 会话过期后需要销毁对应的用户事件队列
q := UserEventQueues[sid]
func (ueqs Queues) New(sid string) *UserEventQueue {
q := ueqs[sid]
if nil != q {
return
glog.Warningf("Already exist a user queue in session [%s]", sid)
return q
}
q = make(chan int, MaxQueueLength)
UserEventQueues[sid] = q
if nil == UserEventHandlers[sid] {
UserEventHandlers[sid] = new(Handlers)
q = &UserEventQueue{
Sid: sid,
Queue: make(chan int, MaxQueueLength),
}
for _, handler := range handlers {
UserEventHandlers[sid].add(handler)
}
ueqs[sid] = q
go func() {
for evtCode := range q {
go func() { // 队列开始监听事件
for evtCode := range q.Queue {
glog.V(5).Infof("Session [%s] received a event [%d]", sid, evtCode)
// 将事件交给事件处理器进行处理
for _, handler := range *UserEventHandlers[sid] {
e := Event{Code: evtCode, Sid: sid}
handler.Handle(&e)
for _, handler := range q.Handlers {
handler.Handle(&Event{Code: evtCode, Sid: sid})
}
}
}()
return q
}
// 事件处理接口.
@ -90,9 +100,3 @@ type HandleFunc func(event *Event)
func (fn HandleFunc) Handle(event *Event) {
fn(event)
}
type Handlers []Handler
func (handlers *Handlers) add(handler Handler) {
*handlers = append(*handlers, handler)
}

View File

@ -9,6 +9,7 @@ import (
"strconv"
"github.com/b3log/wide/event"
"github.com/b3log/wide/i18n"
"github.com/b3log/wide/session"
"github.com/b3log/wide/util"
"github.com/golang/glog"
"github.com/gorilla/websocket"
@ -66,9 +67,15 @@ func event2Notification(e *event.Event) {
// 建立通知通道.
func WSHandler(w http.ResponseWriter, r *http.Request) {
// TODO: 会话校验
sid := r.URL.Query()["sid"][0]
wSession := session.WideSessions.Get(sid)
if nil == wSession {
glog.Errorf("Session [%s] not found", sid)
return
}
conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024)
wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()}
@ -79,8 +86,8 @@ func WSHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infof("Open a new [Notification] with session [%s], %d", sid, len(notificationWSs))
// 初始化用户事件队列
event.InitUserQueue(sid, event.HandleFunc(event2Notification))
// 添加用户事件处理器
wSession.EventQueue.AddHandler(event.HandleFunc(event2Notification))
input := map[string]interface{}{}

View File

@ -56,8 +56,13 @@ func RunHandler(w http.ResponseWriter, r *http.Request) {
return
}
// TODO: 会话校验
sid := args["sid"].(string)
wSession := session.WideSessions.Get(sid)
if nil == wSession {
data["succ"] = false
return
}
filePath := args["executable"].(string)
curDir := filePath[:strings.LastIndex(filePath, string(os.PathSeparator))]
@ -91,7 +96,7 @@ func RunHandler(w http.ResponseWriter, r *http.Request) {
}
// 添加到用户进程集中
processes.add(sid, cmd.Process)
processes.add(wSession, cmd.Process)
channelRet := map[string]interface{}{}
channelRet["pid"] = cmd.Process.Pid
@ -105,7 +110,7 @@ func RunHandler(w http.ResponseWriter, r *http.Request) {
if nil != err || 0 == count {
// 从用户进程集中移除这个执行完毕的进程
processes.remove(sid, cmd.Process)
processes.remove(wSession, cmd.Process)
glog.V(3).Infof("Session [%s] 's running [id=%d, file=%s] has done", sid, runningId, filePath)

View File

@ -2,7 +2,9 @@ package output
import (
"os"
"sync"
"github.com/b3log/wide/session"
"github.com/golang/glog"
)
@ -12,18 +14,33 @@ type procs map[string][]*os.Process
var processes = procs{}
// 排它锁,防止并发修改.
var mutex sync.Mutex
// 添加用户执行进程.
func (procs *procs) add(sid string, proc *os.Process) {
func (procs *procs) add(wSession *session.WideSession, proc *os.Process) {
mutex.Lock()
defer mutex.Unlock()
sid := wSession.Id
userProcesses := (*procs)[sid]
userProcesses = append(userProcesses, proc)
(*procs)[sid] = userProcesses
// 会话关联进程
wSession.SetProcesses(userProcesses)
glog.V(3).Infof("Session [%s] has [%d] processes", sid, len((*procs)[sid]))
}
// 移除用户执行进程.
func (procs *procs) remove(sid string, proc *os.Process) {
func (procs *procs) remove(wSession *session.WideSession, proc *os.Process) {
mutex.Lock()
defer mutex.Unlock()
sid := wSession.Id
userProcesses := (*procs)[sid]
var newProcesses []*os.Process
@ -32,6 +49,9 @@ func (procs *procs) remove(sid string, proc *os.Process) {
newProcesses = append(userProcesses[:i], userProcesses[i+1:]...)
(*procs)[sid] = newProcesses
// 会话关联进程
wSession.SetProcesses(newProcesses)
glog.V(3).Infof("Session [%s] has [%d] processes", sid, len((*procs)[sid]))
return
@ -40,14 +60,27 @@ func (procs *procs) remove(sid string, proc *os.Process) {
}
// 结束用户正在执行的进程.
func (procs *procs) kill(sid string, pid int) {
pros := (*procs)[sid]
func (procs *procs) kill(wSession *session.WideSession, pid int) {
mutex.Lock()
defer mutex.Unlock()
for _, p := range pros {
sid := wSession.Id
userProcesses := (*procs)[sid]
for i, p := range userProcesses {
if p.Pid == pid {
if err := p.Kill(); nil != err {
glog.Error("Kill a process [pid=%d] of session [%s] failed [error=%v]", pid, sid, err)
} else {
var newProcesses []*os.Process
newProcesses = append(userProcesses[:i], userProcesses[i+1:]...)
(*procs)[sid] = newProcesses
// 会话关联进程
wSession.SetProcesses(newProcesses)
glog.V(3).Infof("Killed a process [pid=%d] of session [%s]", pid, sid)
}
}

View File

@ -8,10 +8,12 @@ package session
import (
"math/rand"
"os"
"strconv"
"sync"
"time"
"github.com/b3log/wide/event"
"github.com/golang/glog"
"github.com/gorilla/sessions"
)
@ -27,6 +29,8 @@ var HTTPSession = sessions.NewCookieStore([]byte("BEYOND"))
type WideSession struct {
Id string // 唯一标识
HTTPSession *sessions.Session // 关联的 HTTP 会话
Processes []*os.Process // 关联的进程集
EventQueue *event.UserEventQueue // 关联的事件队列
State int // 状态
Created time.Time // 创建时间
Updated time.Time // 最近一次使用时间
@ -37,9 +41,21 @@ type Sessions []*WideSession
// 所有 Wide 会话集.
var WideSessions Sessions
// 排它锁,防止并发问题.
// 排它锁,防止并发修改.
var mutex sync.Mutex
// 设置会话关联的进程集.
func (s *WideSession) SetProcesses(ps []*os.Process) {
s.Processes = ps
s.Refresh()
}
// 刷新会话最近一次使用时间.
func (s *WideSession) Refresh() {
s.Updated = time.Now()
}
// 创建一个 Wide 会话.
func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession {
mutex.Lock()
@ -50,9 +66,12 @@ func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession {
id := strconv.Itoa(rand.Int())
now := time.Now()
userEventQueue := event.UserEventQueues.New(id)
ret := &WideSession{
Id: id,
HTTPSession: httpSession,
EventQueue: userEventQueue,
State: SessionStateActive,
Created: now,
Updated: now,
@ -63,6 +82,20 @@ func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession {
return ret
}
// 获取 Wide 会话.
func (sessions *Sessions) Get(sid string) *WideSession {
mutex.Lock()
defer mutex.Unlock()
for _, s := range *sessions {
if s.Id == sid {
return s
}
}
return nil
}
// 移除 Wide 会话.
func (sessions *Sessions) Remove(sid string) {
mutex.Lock()

View File

@ -74,7 +74,7 @@
<span>{{.i18n.help}}</span>
<div class="frame">
<ul>
<li onclick="window.open('/doc/{{.locale}}/index.html')">
<li onclick="window.open('https://www.gitbook.io/book/88250/wide-user-guide')">
<span>{{.i18n.wide_doc}}</span>
</li>
<li onclick="window.open('https://github.com/b3log/wide/issues/new')">