Skip to content

Commit

Permalink
优化websocket,线程安全
Browse files Browse the repository at this point in the history
  • Loading branch information
donknap committed Nov 7, 2024
1 parent dc5bd6e commit 8f3a02a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion common/service/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewClient(ctx *gin.Context, options ClientOption) (*Client, error) {
}
collect.Join(client)

slog.Info("ws connect", "fd", client.Fd, "goroutine", runtime.NumGoroutine(), "total", collect.Total())
slog.Info("ws connect", "fd", client.Fd, "goroutine", runtime.NumGoroutine(), "client total", collect.Total(), "progress total", len(collect.progressPip))
return client, nil
}

Expand Down
31 changes: 19 additions & 12 deletions common/service/ws/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,26 @@ var (

func NewCollection() *Collection {
obj := &Collection{
clients: make(map[string]*Client),
clients: sync.Map{},
progressPip: make(map[string]*ProgressPip),
}
go obj.Broadcast()
return obj
}

type Collection struct {
clients map[string]*Client
clients sync.Map
progressPip map[string]*ProgressPip
ctx context.Context
}

func (self *Collection) Join(c *Client) {
self.clients[c.Fd] = c
self.clients.Store(c.Fd, c)
}

func (self *Collection) Leave(c *Client) {
if _, ok := self.clients[c.Fd]; ok {
delete(self.clients, c.Fd)
}
if len(self.clients) == 0 {
self.clients.Delete(c.Fd)
if self.Total() == 0 {
for key, pip := range self.progressPip {
pip.cancel()
delete(self.progressPip, key)
Expand All @@ -47,12 +45,14 @@ func (self *Collection) Leave(c *Client) {
func (self *Collection) sendMessage(message *RespMessage) {
lock.Lock()
lock.Unlock()
for _, client := range self.clients {
err := client.Conn.WriteMessage(websocket.TextMessage, message.ToJson())
self.clients.Range(func(key, value any) bool {
c := value.(*Client)
err := c.Conn.WriteMessage(websocket.TextMessage, message.ToJson())
if err != nil {
slog.Error("ws broadcast error", "fd", client.Fd, "error", err.Error())
slog.Error("ws broadcast error", "fd", c.Fd, "error", err.Error())
}
}
return true
})
}

func (self *Collection) Broadcast() {
Expand All @@ -72,5 +72,12 @@ func (self *Collection) Broadcast() {
}

func (self *Collection) Total() int {
return len(self.clients)
lock.Lock()
lock.Unlock()
count := 0
self.clients.Range(func(key, value any) bool {
count += 1
return true
})
return count
}

0 comments on commit 8f3a02a

Please sign in to comment.