Skip to content

Commit

Permalink
🔇 silent changes: refactor codebase #14
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Jan 14, 2024
1 parent 12ea69a commit 36386e4
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 415 deletions.
4 changes: 4 additions & 0 deletions example/ami_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ func TestAllEventConsume(t *testing.T) {
func TestDialOut(t *testing.T) {
// adding logic here
}

func TestChanspy(t *testing.T) {

}
78 changes: 43 additions & 35 deletions pkg/ami/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import (
"github.com/pnguyen215/voipkit/pkg/ami/config"
)

func (c *AMI) Socket() *AMISocket {
return c.socket
}

func (c *AMI) Message() *AMIMessage {
return c.message
}

// Action sends an AMI action message to the Asterisk server.
// If the action message does not have an ActionID, it adds one automatically.
// The method returns true if the action message is successfully sent, otherwise false.
Expand Down Expand Up @@ -57,19 +65,19 @@ func (c *AMI) Action(message *AMIMessage) bool {
// providing a channel through which you can receive and handle messages as they are received from the server.
// It is useful for capturing and reacting to various events happening within the Asterisk communication system.
func (c *AMI) AllEvents() <-chan *AMIMessage {
return c.Subs.Subscribe(config.AmiPubSubKeyRef)
return c.subs.Subscribe(config.AmiPubSubKeyRef)
}

// OnEvent subscribes by event name (case insensitive) and
// returns send-only channel or nil
func (c *AMI) OnEvent(name string) <-chan *AMIMessage {
return c.Subs.Subscribe(name)
return c.subs.Subscribe(name)
}

// OnEvents subscribes by events name (case insensitive) and
// return send-only channel or nil
func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage {
return c.Subs.Subscribes(keys...)
return c.subs.Subscribes(keys...)
}

// EmitError sends an error to the error channel (c.Err) in a non-blocking manner.
Expand All @@ -83,12 +91,12 @@ func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage {
// without blocking the main execution flow.
func (c *AMI) EmitError(err error) {
go func(err error) {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if err == nil || c.Err == nil {
c.mutex.Lock()
defer c.mutex.Unlock()
if err == nil || c.err == nil {
return
}
c.Err <- err
c.err <- err
}(err)
}

Expand All @@ -107,9 +115,9 @@ func (c *AMI) EmitError(err error) {
// Note: This method provides a non-blocking way to receive errors from the AMI client.
// It returns the error channel, allowing clients to listen for errors and take appropriate actions.
func (c *AMI) Error() <-chan error {
c.Mutex.RLock()
defer c.Mutex.RUnlock()
return c.Err
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.err
}

// Close closes the AMI client, terminating its connection and cleaning up resources.
Expand All @@ -123,13 +131,13 @@ func (c *AMI) Error() <-chan error {
// It terminates the connection, cancels the context, destroys event subscriptions, and closes the error channel.
// Once closed, the AMI client should not be used further, and a new instance may be created if needed.
func (c *AMI) Close() {
c.Mutex.Lock()
defer c.Mutex.Unlock()
c.Cancel()
c.Subs.Destroy()
c.Conn.Close()
close(c.Err)
c.Err = nil
c.mutex.Lock()
defer c.mutex.Unlock()
c.cancel()
c.subs.Destroy()
c.conn.Close()
close(c.err)
c.err = nil
}

// publish sends the provided AMI message to all subscribers based on event type and general subscriptions.
Expand All @@ -144,7 +152,7 @@ func (c *AMI) Close() {
// When a new AMI message is received, this method broadcasts it to relevant subscribers.
func (c *AMI) publish(message *AMIMessage) {
if message != nil {
c.Subs.Publish(message)
c.subs.Publish(message)
}
}

Expand Down Expand Up @@ -175,7 +183,7 @@ func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error {
go func() {
defer close(prompt)
defer close(fail)
line, err := c.Reader.ReadLine()
line, err := c.reader.ReadLine()
if err != nil {
fail <- err
return
Expand Down Expand Up @@ -213,15 +221,15 @@ func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error {
// and creates an AMIMessage struct to represent the received data. If an error occurs during the read
// operation, it is returned as an error.
func (c *AMI) read() (*AMIMessage, error) {
headers, err := c.Reader.ReadMIMEHeader()
headers, err := c.reader.ReadMIMEHeader()
if err != nil {
if err.Error() == ErrorEOF || err.Error() == ErrorIO {
return nil, ErrorAsteriskNetwork
}
return nil, err
}
message := ofMessage(headers)
c.Raw = message
c.message = message
return message, nil
}

Expand All @@ -241,13 +249,13 @@ func (c *AMI) read() (*AMIMessage, error) {
// connection to ensure thread safety and writes the bytes using the Writer. Any error during the
// write operation is returned as an error.
func (c *AMI) write(bytes []byte) error {
c.Mutex.Lock()
defer c.Mutex.Unlock()
_, err := c.Writer.Write(bytes)
c.mutex.Lock()
defer c.mutex.Unlock()
_, err := c.writer.Write(bytes)
if err != nil {
return err
}
err = c.Writer.Flush()
err = c.writer.Flush()
if err != nil {
return err
}
Expand All @@ -269,10 +277,10 @@ func (c *AMI) write(bytes []byte) error {
// continuously reading messages from the connection, publishing them to subscribers, and handling errors. It terminates
// when the provided context is canceled or an error occurs during the reading process.
func (c *AMI) release(ctx context.Context) {
c.Subs = NewPubSubQueue()
c.Err = make(chan error)
c.subs = NewPubSubQueue()
c.err = make(chan error)
go func() {
defer c.Subs.Disabled()
defer c.subs.Disabled()
for {
select {
case <-ctx.Done():
Expand All @@ -290,7 +298,7 @@ func (c *AMI) release(ctx context.Context) {
return
}
}
c.Raw = message
c.message = message
c.publish(message)
}
}
Expand Down Expand Up @@ -352,7 +360,7 @@ func (c *AMI) authenticate(_ctx context.Context, request AmiClient) error {
if !msg.IsSuccess() {
return ErrorAsteriskAuthenticated
} else {
c.Raw = msg
c.message = msg
}
}
return nil
Expand Down Expand Up @@ -383,16 +391,16 @@ func (c *AMI) authenticate(_ctx context.Context, request AmiClient) error {
func create(conn net.Conn) (*AMI, context.Context) {
ctx, cancel := context.WithCancel(context.Background())
c := &AMI{
Reader: textproto.NewReader(bufio.NewReader(conn)),
Writer: bufio.NewWriter(conn),
Conn: conn,
Cancel: cancel,
reader: textproto.NewReader(bufio.NewReader(conn)),
writer: bufio.NewWriter(conn),
conn: conn,
cancel: cancel,
}
if conn != nil {
addr := conn.RemoteAddr().String()
_socket, err := WithSocket(ctx, addr)
if err == nil {
c.Socket = _socket
c.socket = _socket
D().Info("Ami network cloning (addr: %v) socket connection succeeded", addr)
}
}
Expand Down
23 changes: 9 additions & 14 deletions pkg/ami/ami_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ami

import (
"fmt"
"strings"

"github.com/pnguyen215/voipkit/pkg/ami/config"
)
Expand All @@ -12,7 +11,7 @@ func NewAction() *AMIAction {
return a
}

func NewRevokeAction(cmd string, timeout int) *AMIAction {
func NewRevoke(cmd string, timeout int) *AMIAction {
cli := NewAction()
cli.Name = cmd
cli.Timeout = timeout
Expand All @@ -33,24 +32,20 @@ func (a *AMIAction) SetTimeout(timeout int) *AMIAction {

// Revoke run cli on asterisk server
func (c *AMIAction) Revoke(a *AMI, d *AMIDictionary, e *AMIMessage, deadlock bool) (*AMIResponse, error) {
D().Info("[>] Ami revoke action (state mutex opened lock~unlock) >>> '%v'", e.String())
D().Info("Ami revoking action (state mutex opened lock~unlock): '%v'", e.String())
var response AMIResponse
var _err error

if strings.EqualFold(c.Name, "") {
response.ErrorMessage = fmt.Sprintf(config.AmiErrorFieldRequired, "name")
if IsStringEmpty(c.Name) {
response.Message = fmt.Sprintf(config.AmiErrorFieldRequired, "name")
response.IsSuccess = false
_err = fmt.Errorf(response.ErrorMessage)
_err = fmt.Errorf(response.Message)
return &response, _err
}

a.Action(e)
all := a.AllEvents()

if deadlock {
defer a.Close()
}

for {
select {
case message := <-all:
Expand All @@ -59,17 +54,17 @@ func (c *AMIAction) Revoke(a *AMI, d *AMIDictionary, e *AMIMessage, deadlock boo
message.SetRegion(e.Region)
message.AddFieldDateReceivedAt()
if message.IsResponse() {
response.Event = message
response.event = message
response.IsSuccess = true
response.RawJson = message.JsonTranslator(d)
response.Json = message.JsonTranslator(d)
goto on_success
}
case err := <-a.Error():
a.Close()
_err = err
response.Event = nil
response.event = nil
response.IsSuccess = false
response.ErrorMessage = _err.Error()
response.Message = _err.Error()
goto on_failed
}
}
Expand Down
Loading

0 comments on commit 36386e4

Please sign in to comment.