wide/session/sessions.go

300 lines
6.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// 会话操作.
//
// Wide 服务器端需要维护两种会话:
//
// 1. HTTP 会话:主要用于验证登录
// 2. Wide 会话:浏览器 tab 打开/刷新会创建一个,并和 HTTP 会话进行关联
//
// 当会话失效时:释放所有和该会话相关的资源,例如运行中的程序进程、事件队列等.
package session
import (
"encoding/json"
"math/rand"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/b3log/wide/conf"
"github.com/b3log/wide/event"
"github.com/b3log/wide/util"
"github.com/golang/glog"
"github.com/gorilla/sessions"
"github.com/gorilla/websocket"
)
const (
SessionStateActive = iota // 会话状态:活的
SessionStateClosed // 会话状态:已关闭(这个状态目前暂时没有使用到)
)
var (
// 会话通道. <sid, *util.WSChannel>
SessionWS = map[string]*util.WSChannel{}
// 输出通道. <sid, *util.WSChannel>
OutputWS = map[string]*util.WSChannel{}
// 通知通道. <sid, *util.WSChannel>
NotificationWS = map[string]*util.WSChannel{}
)
// 用户 HTTP 会话,用于验证登录.
var HTTPSession = sessions.NewCookieStore([]byte("BEYOND"))
// Wide 会话,对应一个浏览器 tab.
type WideSession struct {
Id string // 唯一标识
Username string // 用户名
HTTPSession *sessions.Session // 关联的 HTTP 会话
Processes []*os.Process // 关联的进程集
EventQueue *event.UserEventQueue // 关联的事件队列
State int // 状态
Content *conf.LatestSessionContent // 最近一次会话内容
Created time.Time // 创建时间
Updated time.Time // 最近一次使用时间
}
// 会话集类型.
type Sessions []*WideSession
// 所有 Wide 会话集.
var WideSessions Sessions
// 排它锁,防止并发修改.
var mutex sync.Mutex
// 在一些特殊情况(例如浏览器不间断刷新/在源代码视图刷新)下 Wide 会话集内会出现无效会话该函数定时1 小时)检查并移除这些无效会话.
//
// 无效会话:在检查时间内 30 分钟都没有使用过的会话,参考 WideSession.Updated 字段.
func FixedTimeRelease() {
go func() {
for {
hour, _ := time.ParseDuration("-30m")
threshold := time.Now().Add(hour)
for _, s := range WideSessions {
if s.Updated.Before(threshold) {
glog.V(3).Infof("Removes a invalid session [%s]", s.Id)
WideSessions.Remove(s.Id)
}
}
time.Sleep(time.Hour)
}
}()
}
// 建立会话通道. 通道断开时销毁会话状态,回收相关资源.
func WSHandler(w http.ResponseWriter, r *http.Request) {
sid := r.URL.Query()["sid"][0]
wSession := WideSessions.Get(sid)
if nil == wSession {
glog.Errorf("Session [%s] not found", sid)
return
}
conn, _ := websocket.Upgrade(w, r, nil, 1024, 1024)
wsChan := util.WSChannel{Sid: sid, Conn: conn, Request: r, Time: time.Now()}
SessionWS[sid] = &wsChan
ret := map[string]interface{}{"output": "Ouput initialized", "cmd": "init-session"}
wsChan.Conn.WriteJSON(&ret)
glog.V(4).Infof("Open a new [Session Channel] with session [%s], %d", sid, len(SessionWS))
input := map[string]interface{}{}
for {
if err := wsChan.Conn.ReadJSON(&input); err != nil {
glog.V(3).Infof("[Session Channel] of session [%s] disconnected, releases all resources with it", sid)
WideSessions.Remove(sid)
return
}
ret = map[string]interface{}{"output": "", "cmd": "session-output"}
if err := wsChan.Conn.WriteJSON(&ret); err != nil {
glog.Error("Session WS ERROR: " + err.Error())
return
}
wsChan.Time = time.Now()
}
}
// 会话内容保存.
func SaveContent(w http.ResponseWriter, r *http.Request) {
data := map[string]interface{}{"succ": true}
defer util.RetJSON(w, r, data)
args := struct {
Sid string
*conf.LatestSessionContent
}{}
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
glog.Error(err)
data["succ"] = false
return
}
wSession := WideSessions.Get(args.Sid)
if nil == wSession {
data["succ"] = false
return
}
wSession.Content = args.LatestSessionContent
for _, user := range conf.Wide.Users {
if user.Name == wSession.Username {
// 更新配置内存变量conf.FixedTimeSave() 会负责定时持久化
user.LatestSessionContent = wSession.Content
wSession.Refresh()
return
}
}
}
// 设置会话关联的进程集.
func (s *WideSession) SetProcesses(ps []*os.Process) {
s.Processes = ps
s.Refresh()
}
// 刷新会话最近一次使用时间.
func (s *WideSession) Refresh() {
s.Updated = time.Now()
}
// 创建一个 Wide 会话.
func (sessions *Sessions) New(httpSession *sessions.Session) *WideSession {
mutex.Lock()
defer mutex.Unlock()
rand.Seed(time.Now().UnixNano())
id := strconv.Itoa(rand.Int())
now := time.Now()
// 创建用户事件队列
userEventQueue := event.UserEventQueues.New(id)
ret := &WideSession{
Id: id,
Username: httpSession.Values["username"].(string),
HTTPSession: httpSession,
EventQueue: userEventQueue,
State: SessionStateActive,
Content: &conf.LatestSessionContent{},
Created: now,
Updated: now,
}
*sessions = append(*sessions, ret)
return ret
}
// 获取 Wide 会话.
func (sessions *Sessions) Get(sid string) *WideSession {
mutex.Lock()
defer mutex.Unlock()
for _, s := range *sessions {
if s.Id == sid {
return s
}
}
return nil
}
// 移除 Wide 会话,释放相关资源.
//
// 会话相关资源:
//
// 1. 用户事件队列
// 2. 运行中的进程集
// 3. WebSocket 通道
func (sessions *Sessions) Remove(sid string) {
mutex.Lock()
defer mutex.Unlock()
for i, s := range *sessions {
if s.Id == sid {
// 从会话集中移除
*sessions = append((*sessions)[:i], (*sessions)[i+1:]...)
// 关闭用户事件队列
event.UserEventQueues.Close(sid)
// 杀进程
for _, p := range s.Processes {
if err := p.Kill(); nil != err {
glog.Errorf("Can't kill process [%d] of session [%s]", p.Pid, sid)
} else {
glog.V(3).Infof("Killed a process [%d] of session [%s]", p.Pid, sid)
}
}
// 回收所有通道
if ws, ok := OutputWS[sid]; ok {
ws.Close()
delete(OutputWS, sid)
}
if ws, ok := NotificationWS[sid]; ok {
ws.Close()
delete(NotificationWS, sid)
}
if ws, ok := SessionWS[sid]; ok {
ws.Close()
delete(SessionWS, sid)
}
glog.V(3).Infof("Removed a session [%s]", s.Id)
cnt := 0 // 统计当前 HTTP 会话关联的 Wide 会话数量
for _, s := range *sessions {
if s.HTTPSession.ID == s.HTTPSession.ID {
cnt++
}
}
glog.V(3).Infof("User [%s] has [%d] sessions", s.Username, cnt)
return
}
}
}
// 获取 username 指定的用户的所有 Wide 会话.
func (sessions *Sessions) GetByUsername(username string) []*WideSession {
mutex.Lock()
defer mutex.Unlock()
ret := []*WideSession{}
for _, s := range *sessions {
if s.Username == username {
ret = append(ret, s)
}
}
return ret
}