Skip to content

Commit

Permalink
优化websocket,线程安全
Browse files Browse the repository at this point in the history
  • Loading branch information
donknap committed Nov 8, 2024
1 parent 8f3a02a commit 8396862
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 36 deletions.
63 changes: 63 additions & 0 deletions app/application/http/controller/compose-container.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package controller

import (
"bytes"
"errors"
"fmt"
"github.com/docker/docker/pkg/stdcopy"
"github.com/donknap/dpanel/app/application/logic"
logic2 "github.com/donknap/dpanel/app/common/logic"
"github.com/donknap/dpanel/common/accessor"
Expand Down Expand Up @@ -217,3 +219,64 @@ func (self Compose) ContainerProcessKill(http *gin.Context) {
self.JsonSuccessResponse(http)
return
}

func (self Compose) ContainerLog(http *gin.Context) {
type ParamsValidate struct {
Id int32 `json:"id" binding:"required"`
}
params := ParamsValidate{}
if !self.Validate(http, &params) {
return
}
composeRow, _ := dao.Compose.Where(dao.Compose.ID.Eq(params.Id)).First()
if composeRow == nil {
self.JsonResponseWithError(http, errors.New("任务不存在"), 500)
return
}
tasker, err := logic.Compose{}.GetTasker(composeRow)
if err != nil {
self.JsonResponseWithError(http, err, 500)
return
}
response, err := tasker.Logs()
if err != nil {
self.JsonResponseWithError(http, err, 500)
return
}
wsBuffer, err := ws.NewFdProgressPip(http, fmt.Sprintf(ws.MessageTypeComposeLog, composeRow.ID))
if err != nil {
self.JsonResponseWithError(http, err, 500)
return
}
defer wsBuffer.Close()
go func() {
select {
case <-wsBuffer.Done():
err = response.Close()
slog.Debug("compose", "run log response close", fmt.Sprintf(ws.MessageTypeComposeLog, composeRow.ID), "error", err)
if err != nil {
fmt.Printf("%v \n", err)
}
}
}()

wsBuffer.OnWrite = func(p string) error {
newReader := bytes.NewReader([]byte(p))
stdout := new(bytes.Buffer)
_, err = stdcopy.StdCopy(stdout, stdout, newReader)
if err != nil {
wsBuffer.BroadcastMessage(p)
} else {
wsBuffer.BroadcastMessage(stdout.String())
}
return nil
}
_, err = io.Copy(wsBuffer, response)
if err != nil {
self.JsonResponseWithError(http, err, 500)
return
}

self.JsonSuccessResponse(http)
return
}
1 change: 0 additions & 1 deletion app/application/http/controller/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (self Image) ImportByImageTar(http *gin.Context) {
imageTag := ""
buffer := new(bytes.Buffer)
wsBuffer.OnWrite = func(p string) error {
fmt.Printf("%v \n", p)
newReader := bufio.NewReader(bytes.NewReader([]byte(p)))
for {
line, _, err := newReader.ReadLine()
Expand Down
9 changes: 6 additions & 3 deletions app/application/http/controller/run-log.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ func (self RunLog) Run(http *gin.Context) {
http.Data(200, "text/plain", buffer.Bytes())
return
}
progress := ws.NewProgressPip(fmt.Sprintf(ws.MessageTypeContainerLog, params.Md5))
progress, err := ws.NewFdProgressPip(http, fmt.Sprintf(ws.MessageTypeContainerLog, params.Md5))
if err != nil {
self.JsonResponseWithError(http, err, 500)
return
}
progress.OnWrite = func(p string) error {
fmt.Printf("%v \n", p)
newReader := bytes.NewReader([]byte(p))
stdout := new(bytes.Buffer)
_, err = stdcopy.StdCopy(stdout, stdout, newReader)
Expand All @@ -70,7 +73,7 @@ func (self RunLog) Run(http *gin.Context) {
go func() {
select {
case <-progress.Done():
slog.Debug("container", "run log", "close")
slog.Debug("container", "run log response close", fmt.Sprintf(ws.MessageTypeContainerLog, params.Md5))
_ = response.Close()
}
}()
Expand Down
1 change: 1 addition & 0 deletions app/application/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (provider *Provider) Register(httpServer *http_server.Server) {
cors.POST("/app/compose/container-destroy", controller.Compose{}.ContainerDestroy)
cors.POST("/app/compose/container-ctrl", controller.Compose{}.ContainerCtrl)
cors.POST("/app/compose/container-process-kill", controller.Compose{}.ContainerProcessKill)
cors.POST("/app/compose/container-log", controller.Compose{}.ContainerLog)
},
)
}
9 changes: 8 additions & 1 deletion common/service/compose/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ func (self Task) Ctrl(op string) (io.Reader, error) {
return self.runCommand(cmd)
}

func (self Task) Logs() (io.ReadCloser, error) {
cmd := []string{
"--progress", "tty", "logs", "-f",
}
return self.runCommand(cmd)
}

func (self Task) OriginalYaml() ([]byte, error) {
return self.Original.Project.MarshalYAML()
}
Expand Down Expand Up @@ -118,7 +125,7 @@ func (self Task) Ps() []*composeContainerResult {
return result
}

func (self Task) runCommand(command []string) (io.Reader, error) {
func (self Task) runCommand(command []string) (io.ReadCloser, error) {
command = append(self.Composer.GetBaseCommand(), command...)
return exec.Command{}.RunInTerminal(&exec.RunCommandOption{
CmdName: "docker",
Expand Down
8 changes: 6 additions & 2 deletions common/service/exec/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type RunCommandOption struct {
CmdName string
CmdArgs []string
WindowSize *pty.Winsize
Follow bool
}

func (self Command) RunInTerminal(option *RunCommandOption) (io.Reader, error) {
func (self Command) RunInTerminal(option *RunCommandOption) (io.ReadCloser, error) {
slog.Debug("run command", option.CmdName, option.CmdArgs)

cmd = exec.Command(option.CmdName, option.CmdArgs...)
Expand All @@ -36,7 +37,10 @@ func (self Command) RunInTerminal(option *RunCommandOption) (io.Reader, error) {
if err != nil {
return nil, err
}
return out, err
return TerminalResult{
Conn: out,
cmd: cmd,
}, err
}

func (self Command) Run(option *RunCommandOption) (io.Reader, error) {
Expand Down
34 changes: 34 additions & 0 deletions common/service/exec/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package exec

import (
"errors"
"fmt"
"io"
"os"
"os/exec"
)

type TerminalResult struct {
Conn *os.File
cmd *exec.Cmd
}

func (self TerminalResult) Close() error {
var errCmd error
var errConn error
if self.cmd != nil {
errCmd = self.cmd.Process.Kill()
}
if self.Conn != nil {
errConn = self.Conn.Close()
}
return errors.Join(errCmd, errConn)
}

func (self TerminalResult) Read(p []byte) (n int, err error) {
fmt.Printf("%v \n", self.Conn)
if self.Conn == nil {
return 0, io.EOF
}
return self.Conn.Read(p)
}
14 changes: 13 additions & 1 deletion common/service/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ func NewClient(ctx *gin.Context, options ClientOption) (*Client, error) {
if options.RecvMessageHandler == nil {
options.RecvMessageHandler = map[string]RecvMessageHandlerFn{}
}
options.RecvMessageHandler[MessageTypeProgressClose] = func(message *RecvMessage) {
closeMessage := struct {
Type string `json:"type"`
Data string `json:"data"`
}{}
if err := json.Unmarshal(message.Message, &closeMessage); err == nil {
if p, exists := collect.progressPip.Load(closeMessage.Data); exists {
// 在 pip 的 close 方法中统一删除
p.(ProgressPip).Close()
}
}
}
ws := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
Expand All @@ -48,7 +60,7 @@ func NewClient(ctx *gin.Context, options ClientOption) (*Client, error) {
}
collect.Join(client)

slog.Info("ws connect", "fd", client.Fd, "goroutine", runtime.NumGoroutine(), "client total", collect.Total(), "progress total", len(collect.progressPip))
slog.Info("ws connect", "fd", client.Fd, "goroutine", runtime.NumGoroutine(), "client total", collect.Total(), "progress total", collect.ProgressTotal())
return client, nil
}

Expand Down
31 changes: 25 additions & 6 deletions common/service/ws/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ var (
func NewCollection() *Collection {
obj := &Collection{
clients: sync.Map{},
progressPip: make(map[string]*ProgressPip),
progressPip: sync.Map{},
}
go obj.Broadcast()
return obj
}

type Collection struct {
clients sync.Map
progressPip map[string]*ProgressPip
progressPip sync.Map
ctx context.Context
}

Expand All @@ -34,11 +34,19 @@ func (self *Collection) Join(c *Client) {

func (self *Collection) Leave(c *Client) {
self.clients.Delete(c.Fd)
if self.Total() == 0 {
for key, pip := range self.progressPip {
pip.cancel()
delete(self.progressPip, key)
self.progressPip.Range(func(key, value any) bool {
p := value.(ProgressPip)
if p.fd == c.Fd {
p.Close()
}
return true
})
if self.Total() == 0 {
self.progressPip.Range(func(key, value any) bool {
p := value.(ProgressPip)
p.Close()
return true
})
}
}

Expand Down Expand Up @@ -81,3 +89,14 @@ func (self *Collection) Total() int {
})
return count
}

func (self *Collection) ProgressTotal() int {
lock.Lock()
lock.Unlock()
count := 0
self.progressPip.Range(func(key, value any) bool {
count += 1
return true
})
return count
}
37 changes: 26 additions & 11 deletions common/service/ws/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,49 @@ package ws

import (
"context"
"errors"
"github.com/donknap/dpanel/app/common/logic"
"github.com/gin-gonic/gin"
"time"
)

type ProgressWrite func(p []byte) ([]byte, error)

func NewProgressPip(messageType string) *ProgressPip {
func NewProgressPip(messageType string) ProgressPip {
ctx, cancelFunc := context.WithCancel(context.Background())
process := &ProgressPip{
process := ProgressPip{
messageType: messageType,
ctx: ctx,
cancel: cancelFunc,
}
if progress, ok := collect.progressPip[messageType]; ok {
progress.cancel()
if p, exists := collect.progressPip.LoadOrStore(messageType, process); exists {
p.(ProgressPip).Close()
}
collect.progressPip[messageType] = process
return process
}

func NewFdProgressPip(http *gin.Context, messageType string) (ProgressPip, error) {
fd := ""
if data, exists := http.Get("userInfo"); exists {
userInfo := data.(logic.UserInfo)
fd = userInfo.Fd
} else {
return ProgressPip{}, errors.New("fd not found")
}
process := NewProgressPip(messageType)
process.fd = fd
return process, nil
}

type ProgressPip struct {
fd string
messageType string
ctx context.Context
cancel context.CancelFunc
OnWrite func(p string) error
}

func (self *ProgressPip) Write(p []byte) (n int, err error) {
func (self ProgressPip) Write(p []byte) (n int, err error) {
temp := string(p)
if self.OnWrite != nil {
err = self.OnWrite(temp)
Expand All @@ -41,18 +57,17 @@ func (self *ProgressPip) Write(p []byte) (n int, err error) {
return len(p), nil
}

func (self *ProgressPip) BroadcastMessage(data interface{}) {
func (self ProgressPip) BroadcastMessage(data interface{}) {
BroadcastMessage <- &RespMessage{
Type: self.messageType,
Data: data,
RespAt: time.Now(),
}
}

func (self *ProgressPip) Close() {
if pip, ok := collect.progressPip[self.messageType]; ok {
pip.cancel()
delete(collect.progressPip, self.messageType)
func (self ProgressPip) Close() {
if p, exists := collect.progressPip.LoadAndDelete(self.messageType); exists {
p.(ProgressPip).cancel()
}
}

Expand Down
23 changes: 12 additions & 11 deletions common/service/ws/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
)

const (
MessageTypeEvent = "event"
MessageTypeSignalAbort = "signal:abort"
MessageTypeEventFd = "event:fd"
MessageTypeCompose = "compose:%d"
MessageTypeConsole = "console:%s"
MessageTypeContainerLog = "container:log:%s"
MessageTypeImagePull = "image:pull:%s"
MessageTypeImageBuild = "image:build:%d"
MessageTypeImageImport = "image:import:%s"
MessageTypeEvent = "event"
MessageTypeSignalAbort = "signal:abort"
MessageTypeEventFd = "event:fd"
MessageTypeCompose = "compose:%d"
MessageTypeComposeLog = "compose:log:%d"
MessageTypeConsole = "console:%s"
MessageTypeContainerLog = "container:log:%s"
MessageTypeImagePull = "image:pull:%s"
MessageTypeImageBuild = "image:build:%d"
MessageTypeImageImport = "image:import:%s"
MessageTypeProgressClose = "progress:close"
)

type RespMessage struct {
Expand Down Expand Up @@ -46,8 +48,7 @@ func (self RecvMessage) IsPing() bool {
}

type recvMessageContent struct {
Type string
Content map[string]interface{}
Type string
}

type RecvMessageHandlerFn func(message *RecvMessage)
Expand Down

0 comments on commit 8396862

Please sign in to comment.