From b84338bb87098206ead5c1ea102536b0d449f3c4 Mon Sep 17 00:00:00 2001 From: Liang Ding Date: Sun, 19 May 2019 01:20:47 +0800 Subject: [PATCH] =?UTF-8?q?:recycle:=20=E9=87=8D=E6=9E=84=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- output/processes.go | 105 ---------------- output/run.go | 188 +--------------------------- playground/run.go | 190 +--------------------------- session/processes.go | 291 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 299 insertions(+), 475 deletions(-) delete mode 100644 output/processes.go create mode 100644 session/processes.go diff --git a/output/processes.go b/output/processes.go deleted file mode 100644 index 1bbd12f..0000000 --- a/output/processes.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright (c) 2014-present, b3log.org -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package output - -import ( - "os" - "sync" - - "github.com/b3log/wide/session" -) - -// Type of process set. -type procs map[string][]*os.Process - -// Processse of all users. -// -// -var Processes = procs{} - -// Exclusive lock. -var mutex sync.Mutex - -// Add adds the specified process to the user process set. -func (procs *procs) Add(wSession *session.WideSession, proc *os.Process) { - mutex.Lock() - defer mutex.Unlock() - - sid := wSession.ID - userProcesses := (*procs)[sid] - - userProcesses = append(userProcesses, proc) - (*procs)[sid] = userProcesses - - // bind process with wide session - wSession.SetProcesses(userProcesses) - - logger.Tracef("Session [%s] has [%d] processes", sid, len((*procs)[sid])) -} - -// Remove removes the specified process from the user process set. -func (procs *procs) Remove(wSession *session.WideSession, proc *os.Process) { - mutex.Lock() - defer mutex.Unlock() - - sid := wSession.ID - - userProcesses := (*procs)[sid] - - var newProcesses []*os.Process - for i, p := range userProcesses { - if p.Pid == proc.Pid { - newProcesses = append(userProcesses[:i], userProcesses[i+1:]...) // remove it - (*procs)[sid] = newProcesses - - // bind process with wide session - wSession.SetProcesses(newProcesses) - - logger.Tracef("Session [%s] has [%d] processes", sid, len((*procs)[sid])) - - return - } - } -} - -// Kill kills a process specified by the given pid. -func (procs *procs) Kill(wSession *session.WideSession, pid int) { - mutex.Lock() - defer mutex.Unlock() - - sid := wSession.ID - - userProcesses := (*procs)[sid] - - for i, p := range userProcesses { - if p.Pid == pid { - if err := p.Kill(); nil != err { - logger.Errorf("Kill a process [pid=%d] of user [%s, %s] failed [error=%v]", pid, wSession.UserId, sid, err) - } else { - var newProcesses []*os.Process - - newProcesses = append(userProcesses[:i], userProcesses[i+1:]...) - (*procs)[sid] = newProcesses - - // bind process with wide session - wSession.SetProcesses(newProcesses) - - logger.Debugf("Killed a process [pid=%d] of user [%s, %s]", pid, wSession.UserId, sid) - } - - return - } - } -} diff --git a/output/run.go b/output/run.go index fa2b797..f7bc78b 100644 --- a/output/run.go +++ b/output/run.go @@ -15,196 +15,16 @@ package output import ( - "bufio" - "encoding/json" - "math/rand" - "net/http" - "os/exec" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/b3log/wide/conf" "github.com/b3log/wide/session" - "github.com/b3log/wide/util" + "net/http" ) // RunHandler handles request of executing a binary file. func RunHandler(w http.ResponseWriter, r *http.Request) { - result := util.NewResult() - defer util.RetResult(w, r, result) - - var args map[string]interface{} - - if err := json.NewDecoder(r.Body).Decode(&args); err != nil { - logger.Error(err) - result.Succ = false - } - - sid := args["sid"].(string) - wSession := session.WideSessions.Get(sid) - if nil == wSession { - result.Succ = false - } - - filePath := args["executable"].(string) - - randInt := rand.Int() - rid := strconv.Itoa(randInt) - var cmd *exec.Cmd - if conf.Docker { - fileName := filepath.Base(filePath) - cmd = exec.Command("docker", "run", "--rm", "--cpus", "0.1", "--name", rid, "-v", filePath+":/"+fileName, conf.DockerImageGo, "/"+fileName) - } else { - cmd = exec.Command(filePath) - curDir := filepath.Dir(filePath) - cmd.Dir = curDir - } - - stdout, err := cmd.StdoutPipe() - if nil != err { - logger.Error(err) - result.Succ = false - } - - stderr, err := cmd.StderrPipe() - if nil != err { - logger.Error(err) - result.Succ = false - } - - outReader := bufio.NewReader(stdout) - errReader := bufio.NewReader(stderr) - - if err := cmd.Start(); nil != err { - logger.Error(err) - result.Succ = false - } - wsChannel := session.OutputWS[sid] - channelRet := map[string]interface{}{} - if !result.Succ { - channelRet["cmd"] = "run-done" - channelRet["output"] = "" - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - - return - } - - done := make(chan error) - go func() { done <- cmd.Wait() }() - - channelRet["pid"] = cmd.Process.Pid - - // add the process to user's process set - Processes.Add(wSession, cmd.Process) - - // push once for front-end to get the 'run' state and pid - if nil != wsChannel { - channelRet["cmd"] = "run" - channelRet["output"] = "" - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - - go func() { - defer util.Recover() - - logger.Debugf("User [%s, %s] is running [id=%d, file=%s]", wSession.UserId, sid, rid, filePath) - - go func() { - defer util.Recover() - - for { - r, _, err := outReader.ReadRune() - if nil != err { - break - } - - oneRuneStr := string(r) - oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) - oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) - channelRet["cmd"] = "run" - channelRet["output"] = oneRuneStr - wsChannel := session.OutputWS[sid] - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - }() - - for { - r, _, err := errReader.ReadRune() - if nil != err { - break - } - - oneRuneStr := string(r) - oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) - oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) - channelRet["cmd"] = "run" - channelRet["output"] = "" + oneRuneStr + "" - wsChannel := session.OutputWS[sid] - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - }() - - after := time.After(5 * time.Second) - channelRet["cmd"] = "run-done" - select { - case <-after: - if conf.Docker { - killCmd := exec.Command("docker", "rm", "-f", rid) - if err := killCmd.Run(); nil != err { - logger.Errorf("executes [docker rm -f " + rid + "] failed [" + err.Error() + "], this will cause resource leaking") - } - } else { - cmd.Process.Kill() - } - - channelRet["output"] = "run program timeout in 5s\n" - case <-done: - channelRet["output"] = "\nrun program complete\n" - } - - Processes.Remove(wSession, cmd.Process) - logger.Debugf("User [%s, %s] done running [id=%s, file=%s]", wSession.UserId, sid, rid, filePath) - - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } + session.RunHandler(w, r, session.OutputWS) } -// StopHandler handles request of stoping a running process. +// StopHandler handles request of stopping a running process. func StopHandler(w http.ResponseWriter, r *http.Request) { - result := util.NewResult() - defer util.RetResult(w, r, result) - - var args map[string]interface{} - if err := json.NewDecoder(r.Body).Decode(&args); err != nil { - logger.Error(err) - result.Succ = false - - return - } - - sid := args["sid"].(string) - pid := int(args["pid"].(float64)) - - wSession := session.WideSessions.Get(sid) - if nil == wSession { - result.Succ = false - - return - } - - Processes.Kill(wSession, pid) + session.StopHandler(w, r) } diff --git a/playground/run.go b/playground/run.go index 8980015..994f51d 100644 --- a/playground/run.go +++ b/playground/run.go @@ -15,198 +15,16 @@ package playground import ( - "bufio" - "encoding/json" - "math/rand" - "net/http" - "os/exec" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/b3log/wide/conf" - "github.com/b3log/wide/output" "github.com/b3log/wide/session" - "github.com/b3log/wide/util" + "net/http" ) // RunHandler handles request of executing a binary file. func RunHandler(w http.ResponseWriter, r *http.Request) { - result := util.NewResult() - defer util.RetResult(w, r, result) - - var args map[string]interface{} - - if err := json.NewDecoder(r.Body).Decode(&args); err != nil { - logger.Error(err) - result.Succ = false - } - - sid := args["sid"].(string) - wSession := session.WideSessions.Get(sid) - if nil == wSession { - result.Succ = false - } - - filePath := args["executable"].(string) - - randInt := rand.Int() - rid := strconv.Itoa(randInt) - var cmd *exec.Cmd - if conf.Docker { - fileName := filepath.Base(filePath) - cmd = exec.Command("docker", "run", "--rm", "--cpus", "0.1", "--name", rid, "-v", filePath+":/"+fileName, conf.DockerImageGo, "/"+fileName) - } else { - cmd = exec.Command(filePath) - curDir := filepath.Dir(filePath) - cmd.Dir = curDir - } - - stdout, err := cmd.StdoutPipe() - if nil != err { - logger.Error(err) - result.Succ = false - } - - stderr, err := cmd.StderrPipe() - if nil != err { - logger.Error(err) - result.Succ = false - } - - outReader := bufio.NewReader(stdout) - errReader := bufio.NewReader(stderr) - - if err := cmd.Start(); nil != err { - logger.Error(err) - result.Succ = false - } - - wsChannel := session.PlaygroundWS[sid] - channelRet := map[string]interface{}{} - if !result.Succ { - channelRet["cmd"] = "run-done" - channelRet["output"] = "" - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - - return - } - - done := make(chan error) - go func() { done <- cmd.Wait() }() - - channelRet["pid"] = cmd.Process.Pid - - // add the process to user's process set - output.Processes.Add(wSession, cmd.Process) - - // push once for front-end to get the 'run' state and pid - if nil != wsChannel { - channelRet["cmd"] = "run" - channelRet["output"] = "" - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - - go func() { - defer util.Recover() - - logger.Debugf("User [%s, %s] is running [id=%s, file=%s]", wSession.UserId, sid, rid, filePath) - - go func() { - defer util.Recover() - - for { - r, _, err := outReader.ReadRune() - if nil != err { - break - } - - oneRuneStr := string(r) - oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) - oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) - channelRet["cmd"] = "run" - channelRet["output"] = oneRuneStr - wsChannel := session.PlaygroundWS[sid] - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - }() - - for { - r, _, err := errReader.ReadRune() - if nil != err { - break - } - - oneRuneStr := string(r) - oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) - oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) - channelRet["cmd"] = "run" - channelRet["output"] = oneRuneStr - wsChannel := session.PlaygroundWS[sid] - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } - } - }() - - after := time.After(5 * time.Second) - channelRet["cmd"] = "run-done" - select { - case <-after: - if conf.Docker { - killCmd := exec.Command("docker", "rm", "-f", rid) - if err := killCmd.Run(); nil != err { - logger.Errorf("executes [docker rm -f " + rid + "] failed [" + err.Error() + "], this will cause resource leaking") - } - } else { - cmd.Process.Kill() - } - - channelRet["output"] = "\nrun program timeout in 5s\n" - case <-done: - channelRet["output"] = "\nrun program complete\n" - } - - output.Processes.Remove(wSession, cmd.Process) - logger.Debugf("User [%s, %s] done running [id=%s, file=%s]", wSession.UserId, sid, rid, filePath) - - if nil != wsChannel { - wsChannel.WriteJSON(&channelRet) - wsChannel.Refresh() - } + session.RunHandler(w, r, session.PlaygroundWS) } -// StopHandler handles request of stoping a running process. +// StopHandler handles request of stopping a running process. func StopHandler(w http.ResponseWriter, r *http.Request) { - result := util.NewResult() - defer util.RetResult(w, r, result) - - var args map[string]interface{} - if err := json.NewDecoder(r.Body).Decode(&args); err != nil { - logger.Error(err) - result.Succ = false - - return - } - - sid := args["sid"].(string) - pid := int(args["pid"].(float64)) - - wSession := session.WideSessions.Get(sid) - if nil == wSession { - result.Succ = false - - return - } - - output.Processes.Kill(wSession, pid) + session.StopHandler(w, r) } diff --git a/session/processes.go b/session/processes.go new file mode 100644 index 0000000..6f18bcb --- /dev/null +++ b/session/processes.go @@ -0,0 +1,291 @@ +// Copyright (c) 2014-present, b3log.org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "bufio" + "encoding/json" + "github.com/b3log/wide/conf" + "github.com/b3log/wide/util" + "math/rand" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "time" +) + +// Type of process set. +type procs map[string][]*os.Process + +// Processse of all users. +// +// +var Processes = procs{} + +// Exclusive lock. +var procMutex sync.Mutex + +// RunHandler handles request of executing a binary file. +func RunHandler(w http.ResponseWriter, r *http.Request, channel map[string]*util.WSChannel) { + result := util.NewResult() + defer util.RetResult(w, r, result) + + var args map[string]interface{} + + if err := json.NewDecoder(r.Body).Decode(&args); err != nil { + logger.Error(err) + result.Succ = false + } + + sid := args["sid"].(string) + wSession := WideSessions.Get(sid) + if nil == wSession { + result.Succ = false + } + + filePath := args["executable"].(string) + + randInt := rand.Int() + rid := strconv.Itoa(randInt) + var cmd *exec.Cmd + if conf.Docker { + fileName := filepath.Base(filePath) + cmd = exec.Command("docker", "run", "--rm", "--cpus", "0.1", "--name", rid, "-v", filePath+":/"+fileName, conf.DockerImageGo, "/"+fileName) + } else { + cmd = exec.Command(filePath) + curDir := filepath.Dir(filePath) + cmd.Dir = curDir + } + + stdout, err := cmd.StdoutPipe() + if nil != err { + logger.Error(err) + result.Succ = false + } + + stderr, err := cmd.StderrPipe() + if nil != err { + logger.Error(err) + result.Succ = false + } + + outReader := bufio.NewReader(stdout) + errReader := bufio.NewReader(stderr) + + if err := cmd.Start(); nil != err { + logger.Error(err) + result.Succ = false + } + wsChannel := channel[sid] + channelRet := map[string]interface{}{} + if !result.Succ { + channelRet["cmd"] = "run-done" + channelRet["output"] = "" + wsChannel.WriteJSON(&channelRet) + wsChannel.Refresh() + + return + } + + done := make(chan error) + go func() { done <- cmd.Wait() }() + + channelRet["pid"] = cmd.Process.Pid + Processes.Add(wSession, cmd.Process) + + // push once for front-end to get the 'run' state and pid + if nil != wsChannel { + channelRet["cmd"] = "run" + channelRet["output"] = "" + if nil != wsChannel { + wsChannel.WriteJSON(&channelRet) + wsChannel.Refresh() + } + } + + go func() { + defer util.Recover() + + logger.Debugf("User [%s, %s] is running [id=%s, file=%s]", wSession.UserId, sid, rid, filePath) + + go func() { + defer util.Recover() + + for { + r, _, err := outReader.ReadRune() + if nil != err { + break + } + + oneRuneStr := string(r) + oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) + oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) + channelRet["cmd"] = "run" + channelRet["output"] = oneRuneStr + wsChannel := channel[sid] + if nil != wsChannel { + wsChannel.WriteJSON(&channelRet) + wsChannel.Refresh() + } + } + }() + + for { + r, _, err := errReader.ReadRune() + if nil != err { + break + } + + oneRuneStr := string(r) + oneRuneStr = strings.Replace(oneRuneStr, "<", "<", -1) + oneRuneStr = strings.Replace(oneRuneStr, ">", ">", -1) + channelRet["cmd"] = "run" + channelRet["output"] = "" + oneRuneStr + "" + wsChannel := channel[sid] + if nil != wsChannel { + wsChannel.WriteJSON(&channelRet) + wsChannel.Refresh() + } + } + }() + + after := time.After(5 * time.Second) + channelRet["cmd"] = "run-done" + select { + case <-after: + if conf.Docker { + killCmd := exec.Command("docker", "rm", "-f", rid) + if err := killCmd.Run(); nil != err { + logger.Errorf("executes [docker rm -f " + rid + "] failed [" + err.Error() + "], this will cause resource leaking") + } + } else { + cmd.Process.Kill() + } + + channelRet["output"] = "run program timeout in 5s\n" + case <-done: + channelRet["output"] = "\nrun program complete\n" + } + + Processes.Remove(wSession, cmd.Process) + logger.Debugf("User [%s, %s] done running [id=%s, file=%s]", wSession.UserId, sid, rid, filePath) + + if nil != wsChannel { + wsChannel.WriteJSON(&channelRet) + wsChannel.Refresh() + } +} + +// StopHandler handles request of stopping a running process. +func StopHandler(w http.ResponseWriter, r *http.Request) { + result := util.NewResult() + defer util.RetResult(w, r, result) + + var args map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&args); err != nil { + logger.Error(err) + result.Succ = false + + return + } + + sid := args["sid"].(string) + pid := int(args["pid"].(float64)) + + wSession := WideSessions.Get(sid) + if nil == wSession { + result.Succ = false + + return + } + + Processes.Kill(wSession, pid) +} + +// Add adds the specified process to the user process set. +func (procs *procs) Add(wSession *WideSession, proc *os.Process) { + procMutex.Lock() + defer procMutex.Unlock() + + sid := wSession.ID + userProcesses := (*procs)[sid] + + userProcesses = append(userProcesses, proc) + (*procs)[sid] = userProcesses + + // bind process with wide session + wSession.SetProcesses(userProcesses) + + logger.Tracef("Session [%s] has [%d] processes", sid, len((*procs)[sid])) +} + +// Remove removes the specified process from the user process set. +func (procs *procs) Remove(wSession *WideSession, proc *os.Process) { + procMutex.Lock() + defer procMutex.Unlock() + + sid := wSession.ID + + userProcesses := (*procs)[sid] + + var newProcesses []*os.Process + for i, p := range userProcesses { + if p.Pid == proc.Pid { + newProcesses = append(userProcesses[:i], userProcesses[i+1:]...) // remove it + (*procs)[sid] = newProcesses + + // bind process with wide session + wSession.SetProcesses(newProcesses) + + logger.Tracef("Session [%s] has [%d] processes", sid, len((*procs)[sid])) + + return + } + } +} + +// Kill kills a process specified by the given pid. +func (procs *procs) Kill(wSession *WideSession, pid int) { + procMutex.Lock() + defer procMutex.Unlock() + + sid := wSession.ID + + userProcesses := (*procs)[sid] + + for i, p := range userProcesses { + if p.Pid == pid { + if err := p.Kill(); nil != err { + logger.Errorf("Kill a process [pid=%d] of user [%s, %s] failed [error=%v]", pid, wSession.UserId, sid, err) + } else { + var newProcesses []*os.Process + + newProcesses = append(userProcesses[:i], userProcesses[i+1:]...) + (*procs)[sid] = newProcesses + + // bind process with wide session + wSession.SetProcesses(newProcesses) + + logger.Debugf("Killed a process [pid=%d] of user [%s, %s]", pid, wSession.UserId, sid) + } + + return + } + } +}