
Use network loops to read/write packets in isolated goroutines #151

Closed

Conversation

vishnureddy17
Contributor

Here's a proposal to make the client.go read and write packets in isolated goroutines.

I think this makes things easier to work with and makes errors on the connection easier to deal with.

Thanks @BertKleewein for the inspiration!

@MattBrittan
Contributor

MattBrittan commented Jul 28, 2023

Thanks @vishnureddy17,

The new approach is somewhat similar to that used in the v3 Client (albeit this client waits for errors almost every time it sends to outgoingPackets rather than returning a token as the v3 client does).

Could you please add some detail as to the benefits that you believe this change delivers?

Moving the reads and writes off to separate goroutines does have some potential benefits but, in my opinion, comes at the cost of significantly increasing complexity. Just to be clear - I'm not saying that this is not the right approach; however at this point I'm not convinced the benefits outweigh the negatives.

Personally I find:

if _, err := ccp.WriteTo(c.Conn); err != nil {
	cleanup()
	return nil, err
}

easier to read/reason about than:

errChan := make(chan error)
c.outgoingPackets <- outgoing{
	packet: ccp,
	err:    errChan,
}
if err := <-errChan; err != nil {
	cleanup()
	return nil, err
}

Based on experience in the v3 client this approach makes it very easy to introduce races, deadlocks and leaking goroutines (I have put a lot of time into finding/resolving these in the v3 client). For example consider what happens (with your current implementation) when:

  1. A packets.PUBREC is received and is being processed in incoming()
  2. Concurrently the connection drops and readLoop() calls c.error(err) and returns.
  3. c.error closes c.stop
  4. writeLoop terminates due to c.stop
  5. incoming() (back to processing the packets.PUBREC from step 1) is now blocked at c.outgoingPackets <- outgoing{packet: pl,err: plErrChan}
  6. error(e error) will remain blocked on c.workers.Wait() (as incoming never exits, c.workers.Done() is not called)

Note: this is theoretical (I have not put a huge amount of time into confirming this can happen), but it looks like a realistic scenario.
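A stripped-down sketch of that pattern (hypothetical names, not the actual client code) shows how the wait ends up deadlocked:

package main

import (
	"sync"
	"time"
)

func main() {
	outgoingPackets := make(chan string) // unbuffered, like c.outgoingPackets
	stop := make(chan struct{})
	var workers sync.WaitGroup

	workers.Add(1)
	go func() { // stand-in for writeLoop: exits as soon as stop is closed
		defer workers.Done()
		select {
		case <-stop:
			return
		case <-outgoingPackets:
		}
	}()

	workers.Add(1)
	go func() { // stand-in for incoming() handling the PUBREC
		defer workers.Done()
		time.Sleep(10 * time.Millisecond) // writeLoop has already seen stop by now
		outgoingPackets <- "PUBREL"       // blocks forever: nothing is receiving
	}()

	close(stop)    // connection dropped; c.error closes c.stop
	workers.Wait() // never returns: incoming never calls Done()
}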

Edit: Here is another one (much more likely to be an issue).

  1. Connected with Manual ACKS so tracker goroutine is running.
  2. Some ACKS are queued (due to out of order acknowledgments)
  3. Error occurs resulting in c.stop closing
  4. We now have a race between the checks for c.stop in the acks tracker and in writeLoop() (if writeLoop() wins then the acknowledgments will be blocked due to the send).

These examples may seem convoluted but I hit plenty of similar problems in the v3 client that caused production issues (and these are very hard to track down, particularly when they are user reported and you cannot duplicate them!).

The above leak is not really an issue if the program exits at that point; however, when used with something like autopaho, I'd expect the application to continue running for years (I have v3 clients with uptimes well over a year running on routers with minimal RAM), so leaking goroutines become an issue (and debugging this is time consuming!). Edit: actually this is an issue because c.workers.Wait() will block, so we are likely to end up with a deadlock.

Anyway, I'm keen to discuss - I will add a few other comments to your PR (thanks for the submission by the way - it's great to see activity on this package picking up again!).

Contributor

@MattBrittan MattBrittan left a comment

This is in addition to my general comments.

c.publishPackets = make(chan *packets.Publish)
go c.incoming()
go c.PingHandler.Start(c.Conn, 30*time.Second)
fakeConnect(c)
Contributor

This triggers a warning when testing with the race detector (go test --race). This is not an issue for live use but needs to be resolved so there are no alerts when testing. To fix, swap this with the line below.

Contributor Author

@vishnureddy17 vishnureddy17 Jul 29, 2023

I think the tests need to be changed so that we aren't relying on manually initializing the client within the tests. I'd like to rewrite them so we just set up the test server to respond to CONNECT packets and call Client.connect() to initialize everything.

Contributor

"we aren't relying on manually initializing the client" - the way this currently works is generally pretty convenient (especially being able to tell the 'broker' exactly what response to send). The V3 client tests rely upon an external broker which works OK but it complicates things (need to start a broker, ensure the relevant port is free etc).
I have actually been working on a new TEST server that extends the current approach (it's more like a real broker but only supports a single connection). This is required because autopaho is currently not well tested and I'm working on a persistent session option (which will need to be thoroughly tested!).

}
}

func (c *Client) writeLoop() {
Contributor

@MattBrittan MattBrittan Jul 28, 2023

If we take this approach then I believe this function should range over c.outgoingPackets (with the channel being closed in c.stop()). The rationale for this is that it's too easy to leak goroutines when you have a channel with nothing processing its output (i.e. c.stop is closed and then something tries to send to c.outgoingPackets).
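A rough sketch of that shape (illustrative, not the PR's exact code):

func (c *Client) writeLoop() {
	defer c.workers.Done()
	// Range until c.outgoingPackets is closed (from c.stop()); the loop keeps
	// draining the channel right up to shutdown, so a sender is never left
	// blocked on a channel that nothing reads.
	for out := range c.outgoingPackets {
		_, err := out.packet.WriteTo(c.Conn)
		out.err <- err // per-packet error channel, as in the current PR
	}
	c.debug.Println("writeLoop done: outgoingPackets closed")
}

The trade-off, touched on below, is that anything that tries to send after the channel is closed will panic rather than block, so the shutdown order still has to be handled carefully.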

Contributor Author

I agree. I think a tricky part of this is avoiding write operations causing a panic. We could use mutexes, as is done elsewhere, but I want to think about a solution that doesn't use mutexes. I have an idea in mind for this that I might try.

Contributor

It's possible to avoid mutexes (mostly) by being very careful about shutdown order. However, this is tricky and, unless we are very careful with future PRs, it's easy to create issues later. This is the main reason that I believe we need to be sure this is the right approach before adopting it.

@@ -551,6 +596,73 @@ func (c *Client) close() {
c.debug.Println("acks tracker reset")
}

func (c *Client) readLoop() {
Contributor

As this is the only thing writing to c.incomingPackets it should close the channel when it exits (this allows anything listening on that channel to shut down cleanly). Other approaches tend to lead to things happening in an unexpected order, with the listeners exiting before the sender (leading to a goroutine leak, i.e. at c.incomingPackets <- packet).

Contributor Author

Agree

}
c.debug.Printf("received %s%s\n", packet.PacketType(), packetIdString)

incoming <- packet
Contributor

goroutine leak here.

c.stop is closed, the outer for exits so incoming <- packet will block (as nothing is receiving on incoming).

I don't really see the benefit of using the extra goroutines here; packets.ReadPacket(c.Conn) will return an error when c.Conn is closed (so readLoop() can be simplified to a few lines).
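Roughly (a sketch using the names mentioned in this thread, not code from the PR):

func (c *Client) readLoop() {
	defer c.workers.Done()
	// Close the channel on exit (per the comment above) so listeners can shut
	// down cleanly once the reader is gone.
	defer close(c.incomingPackets)
	for {
		// ReadPacket returns an error once c.Conn is closed, so there is no
		// need for an extra goroutine or a select on c.stop here.
		packet, err := packets.ReadPacket(c.Conn)
		if err != nil {
			c.error(err)
			return
		}
		c.incomingPackets <- packet
	}
}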

Contributor Author

agree

@@ -39,6 +39,7 @@ type (
Unpack(*bytes.Buffer) error
Buffers() net.Buffers
WriteTo(io.Writer) (int64, error)
ToControlPacket() *ControlPacket
Contributor

Adding this couples the interface to this specific implementation (prior to this change it was generic, only utilising the standard library). While it's not currently a goal, I suspect that at some point in the future we may want to add a very basic v3 implementation, and this change would complicate that (because it would limit the ability to implement Packet in a different package).
I believe that this is probably why the functions were implemented as they were (a bit of duplication keeps things loosely coupled).
We can make this change but do need to consider the potential future impact.

@@ -321,7 +355,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ack tracker routine")
defer c.debug.Println("ack tracker worker returned")
Contributor

This function will end up being a race (because sending the queued acks requires that writeLoop() is running but that also stops when c.stop is closed).

@vishnureddy17
Contributor Author

Thanks for taking a careful look at this. This is all excellent feedback!

I think a part of the difficulty here is that I'm trying to keep this pull request small and not change everything at once. However, this is one step in a sequence of changes I'm thinking of. My goal here was to make the fewest changes possible to have reads and writes in their own goroutines. Clearly, there are still issues. How about I continue working on my fork to see where I can take this? The end result will be a big change, but I'll do my best to make each commit understandable in isolation. I could submit a PR later with a more complete picture, and we may need to move the discussion off of GitHub once we get to that point. It's fine if it doesn't work out in the end, but I'd like to at least prove these ideas to myself.

Personally I find:

if _, err := ccp.WriteTo(c.Conn); err != nil {
	cleanup()
	return nil, err
}

easier to read/reason about than:

errChan := make(chan error)
c.outgoingPackets <- outgoing{
	packet: ccp,
	err:    errChan,
}
if err := <-errChan; err != nil {
	cleanup()
	return nil, err
}

I agree. I don't think error handling should actually be done in those goroutines. I'd like to see errors being dealt with more consistently in the client, and I see error handling being done this way:

  1. Worker goroutine (in this case, writeLoop()) encounters an error
  2. Worker goroutine invokes the client error handling mechanism (currently Client.error())
  3. Rest of the workers get cleaned up.

I don't think errors should be passed between worker goroutines. They should just be passed up to the client, which takes care of any necessary cleanup. I did not make that change because I was trying to make the minimal changes needed to communicate the network-loop idea.
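Roughly, the write side would then look something like this (a sketch of the flow above, assuming c.outgoingPackets would then just carry packets rather than packet/error pairs; not code from this PR):

func (c *Client) writeLoop() {
	defer c.workers.Done()
	for pkt := range c.outgoingPackets {
		if _, err := pkt.WriteTo(c.Conn); err != nil {
			c.error(err) // (2) report once via the client's error handling
			return       // (3) the remaining workers are cleaned up by the client
		}
	}
}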

Could you please add some detail as to the benefits that you believe this change delivers?

The two main benefits I see are:

  1. It makes error handling with the connection easier to deal with. Instead of having to handle errors everywhere there is an issue with the connection, the error handling would only happen in one place. Note that I do not take advantage of that in this PR, because I was trying to keep it reasonably small.
  2. MQTT has some pretty stringent packet ordering requirements, and using channels for packet reads/writes could make handling these ordering requirements easier to reason about, especially once retry delivery stuff comes into the picture.

@MattBrittan
Contributor

MattBrittan commented Jul 30, 2023

However, this is one step in a sequence of changes I'm thinking of.... How about I continue working on my fork to see where I can take this?

It may be worth raising an issue to discuss your longer term plan? I've used similar approaches successfully but have also been burnt due to this approach making it really easy to introduce deadlocks etc.

So a fork may be the way to go (but worth collaborating to ensure it's a worthwhile direction to take). Note that I am currently working on session persistence offline (may make a test release public on my repo, but will not push to this repo until it's fully working and tested).

The two main benefits I see are:

  1. It makes error handling with the connection easier to deal with. Instead of having to handle errors everywhere there is an issue with the connection, the error handling would only happen in one place. Note that I do not take advantage of that in this PR, because I was trying to keep it reasonably small.
  2. MQTT has some pretty stringent packet ordering requirements, and using channels for packet reads/writes could make handling these ordering requirements easier to reason about, especially once retry delivery stuff comes into the picture.

I can see some benefits re point 1. However, most of the time when we are sending data we do need to be aware of any issues at that point in the code so appropriate action can be taken (e.g. returning an error to the caller). I believe this reduces the benefit of your approach (and a simpler technique might be a custom io.Writer that triggers the shutdown if an error is returned).
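For example (a sketch of that idea; the type and field names here are made up for illustration):

import (
	"io"
	"sync"
)

// errTriggerWriter wraps the connection: writes pass straight through and any
// error is still returned to the caller, but the first error also triggers the
// client's shutdown path (onErr would typically be c.error).
type errTriggerWriter struct {
	w     io.Writer
	onErr func(error)
	once  sync.Once
}

func (e *errTriggerWriter) Write(p []byte) (n int, err error) {
	n, err = e.w.Write(p)
	if err != nil {
		e.once.Do(func() { e.onErr(err) })
	}
	return n, err
}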

Ref point 2 - the ordering requirements are mainly around PUBLISH and the relevant ACKs (MQTT-4.6.0-2 to MQTT-4.6.0-4). I believe the requirement exists (I asked the spec authors about this :-) ) to enable transactional logging of messages (e.g. run INSERT on PUBLISH and COMMIT on PUBREL). So my personal opinion is that, while it would be nice to follow the spec, it's not worth adding heaps of complexity to do so (I have not seen a broker that will error on out-of-order ACKs, and paho.Client does not currently provide a mechanism for the user to access a message before it's fully acknowledged).

@vishnureddy17
Contributor Author

vishnureddy17 commented Aug 1, 2023

Another thing: right now, there seem to be cases where packet reads and writes can block each other. It might be worth trying to avoid that.

It may be worth raising an issue to discuss your longer term plan? I've used similar approaches successfully but have also been burnt due to this approach making it really easy to introduce deadlocks etc.

I'm going to work on it offline to see where I can take it before I start discussing it on GitHub; the ideas I have are a bit nebulous right now, and I think getting something concrete will clarify things. Your feedback has been super helpful. And I see what you mean about how this approach can make it easy to introduce deadlocks, races, and goroutine leaks.

So a fork may be the way to go (but worth collaborating to ensure it's a worthwhile direction to take). Note that I am currently working on session persistence offline (may make a test release public on my repo, but will not push to this repo until it's fully working and tested).

This is great! If you decide to make a release on your repo, I'd love to check it out. Looking forward to seeing what comes out of this :)

I have not seen a broker that will error on out-of-order ACKs

In my testing, Azure Event Grid Namespaces (which is currently in public preview) disconnects clients that send out-of-order ACKs.

@vishnureddy17
Contributor Author

I've been working on this, and I have something working, including in-memory persistence. However, it only supports QoS 1 for the time being. I don't think it's quite ready to bring into this repo, but I thought I'd post it here in case anyone is curious about the direction I'm thinking of.

https://github.com/vishnureddy17/paho.golang/tree/persistence-network-loops

@MattBrittan
Contributor

Thanks @vishnureddy17 - I also hope to have a solution (quite a different direction that caters for QOS2) ready in a few days, so it will be interesting to compare approaches (I've tried a lot of approaches but feel I have something workable). Will be keeping things outside the paho.golang repo for now because whichever option we go for is going to be a big change (need to work out how to handle review/integration). It's very hard to do this without changes that will break some users' code, and it's likely that bugs will be introduced...

@vishnureddy17
Contributor Author

quite a different direction that caters for QOS2

To be clear, what I've done does not preclude QoS 2, I just chose to omit it for now.

whichever option we go for is going to be a big change (need to work out how to handle review/integration). Its very hard to do this without changes that will break some users code and it's likely that bugs will be introduced...

Yes, definitely. Good thing this is still in beta.

Looking forward to seeing what you have!

@MattBrittan
Contributor

My attempt is in this branch - I believe it's almost ready; it passes the tests I've put in place so far and seems to run OK with the docker test (to which I've added a disconnect so I can check for lost messages). It does need a further review and clean-up but I thought it was better to make it public so we can discuss the different approaches and decide the best way forward. Note that the readme contains a rationale for the decisions made (and info on todo's, breaking changes etc).

You will note that this is a major change - it decouples the state (including Mids and store) from paho. This means that you can call Publish and the call can block even if the connection drops, and is re-established, in the interim. My aim (nearly there!) is for autopaho to be fire and forget (just keep publishing and have confidence that autopaho will ensure the message is delivered regardless of any issues with the connection).

I'll add a comment to issue #25 with links for those watching that issue.

@MattBrittan
Contributor

Hi @vishnureddy17,
What are your current thoughts on this PR? #172 introduced major changes which mean that this PR would need a refresh (note that I did implement a couple of things you included here such as adding ToControlPacket() to some packets). We could take some elements of this (and your work in the separate repo) and implement them within the new structure if you feel they will bring benefits?

Matt

@vishnureddy17
Contributor Author

@MattBrittan, I'm going to go ahead and close this PR for now. I think the state of the repo has diverged so far from this PR at this point that it's not worth keeping around.

I'm busy with other work right now, but in the future I might look to contribute some of the ideas in this PR separately.

@vishnureddy17 vishnureddy17 deleted the network-loops branch January 12, 2024 14:20