
connection shutdown, channel close data race #196

Open · imkira wants to merge 3 commits into master from imkira:hotfix/connection-shutdown-race

Conversation

imkira (Contributor) commented Mar 28, 2016

I am running tests with go test -v -race -cpu=1,2,4 and I think I found the following data race (commit b4f3ceab0337f013208d31348b578d83c0064744):

==================
WARNING: DATA RACE
Read by goroutine 79:
  github.com/streadway/amqp.(*Channel).call()
      /opt/go/src/github.com/streadway/amqp/channel.go:144 +0x70
  github.com/streadway/amqp.(*Channel).Close()
      /opt/go/src/github.com/streadway/amqp/channel.go:406 +0x207
[omitted]

Previous write by goroutine 80:
  github.com/streadway/amqp.(*Channel).shutdown.func1()
      /opt/go/src/github.com/streadway/amqp/channel.go:104 +0x17b
  sync.(*Once).Do()
      /home/ubuntu/go/src/sync/once.go:44 +0xf6
  github.com/streadway/amqp.(*Channel).shutdown()
      /opt/go/src/github.com/streadway/amqp/channel.go:134 +0x98
  github.com/streadway/amqp.(*Connection).closeChannel()
      /opt/go/src/github.com/streadway/amqp/connection.go:580 +0x38
  github.com/streadway/amqp.(*Connection).shutdown.func1()
      /opt/go/src/github.com/streadway/amqp/connection.go:350 +0x1d5
  sync.(*Once).Do()
      /home/ubuntu/go/src/sync/once.go:44 +0xf6
  github.com/streadway/amqp.(*Connection).shutdown()
      /opt/go/src/github.com/streadway/amqp/connection.go:370 +0x93
  github.com/streadway/amqp.(*Connection).dispatch0()
      /opt/go/src/github.com/streadway/amqp/connection.go:394 +0x4d5
  github.com/streadway/amqp.(*Connection).demux()
      /opt/go/src/github.com/streadway/amqp/connection.go:377 +0x62
  github.com/streadway/amqp.(*Connection).reader()
      /opt/go/src/github.com/streadway/amqp/connection.go:471 +0x322

It appears that Connection.shutdown accesses me.channels to close them (goroutine 80) without any mutex locking, while Connection.releaseChannel (via Channel.Close) concurrently accesses me.channels too (that one does hold the connection lock).

After trying to fix this by protecting Connection.shutdown, I noticed another race condition, caused by Channel.call reading me.send while Channel.shutdown changes it to sendClosed.

Honestly, I don't know if this is the correct fix.
I wrote a simple test that triggers at least one of these issues, and enabled -race for travis builds.
There is another data race related to IO handling in shared_test.go.
I didn't understand exactly what the problem was, but it looks like a test-code-only problem, and since it is out of the scope of this PR I leave it open for you to take a look first.
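
Below is a minimal, self-contained sketch (simplified names, not the library's actual code) of the race shape the trace above points at: one goroutine iterates the shared channel map during shutdown without the lock, while another removes entries under the lock. Running it with go run -race flags the map access.

package main

import "sync"

type conn struct {
	m        sync.Mutex
	channels map[int]chan struct{}
}

// locked access, analogous to Connection.releaseChannel
func (c *conn) releaseChannel(id int) {
	c.m.Lock()
	defer c.m.Unlock()
	delete(c.channels, id)
}

// the reported bug: iterating without c.m held races with releaseChannel
func (c *conn) shutdown() {
	for _, ch := range c.channels {
		close(ch)
	}
}

func main() {
	c := &conn{channels: map[int]chan struct{}{1: make(chan struct{})}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.shutdown() }()
	go func() { defer wg.Done(); c.releaseChannel(1) }()
	wg.Wait()
}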

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch from ba1648b to bc44d43 Compare March 28, 2016 05:38
imkira (Contributor, Author) commented Mar 28, 2016

Please note, too, that I had to remove the locking around Channel.call in Channel.Confirm, because I now take the lock inside Channel.call when calling Channel.send. Therefore this PR also applies the fix proposed in #187.

Additionally, since adding -race made the build explode on travis due to other problems, I first fixed those in #197 and rebased that branch onto this PR's branch.
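
For context on why the outer lock had to go: Go's sync.Mutex is not reentrant, so keeping the lock in Confirm while also taking it inside call would self-deadlock. A tiny sketch of that shape (hypothetical names, not the library's code):

package main

import "sync"

var m sync.Mutex

// call now takes the lock itself, as in this PR
func call() {
	m.Lock()
	defer m.Unlock()
	// ... send the request ...
}

func confirm() {
	// m.Lock() // removed: sync.Mutex is not reentrant, so locking here
	//          // and again inside call would deadlock
	call()
}

func main() {
	confirm()
}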

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch 2 times, most recently from d785718 to d465039 Compare March 28, 2016 07:29
@imkira imkira force-pushed the hotfix/connection-shutdown-race branch 2 times, most recently from 3faff45 to 7bac62b Compare March 28, 2016 10:09
@imkira imkira changed the title connection shutdown, channel close data race [WIP] connection shutdown, channel close data race Mar 28, 2016
@imkira imkira force-pushed the hotfix/connection-shutdown-race branch from ea536fa to f997802 Compare March 28, 2016 11:36
@imkira imkira changed the title [WIP] connection shutdown, channel close data race connection shutdown, channel close data race Mar 28, 2016
@imkira imkira changed the title connection shutdown, channel close data race [WIP] connection shutdown, channel close data race Mar 28, 2016
@imkira imkira force-pushed the hotfix/connection-shutdown-race branch 2 times, most recently from dfd04b5 to f997802 Compare March 29, 2016 05:15
@imkira imkira changed the title [WIP] connection shutdown, channel close data race connection shutdown, channel close data race Mar 29, 2016
imkira (Contributor, Author) commented Mar 29, 2016

I am sorry, it took me a while to figure out that there was a deadlock in the use of pipes with go1.1. The last commit should work around it.

michaelklishin (Collaborator) commented:

Looks reasonable at first approximation, thank you.

michaelklishin (Collaborator) commented:

Note that this PR also seems to include #197.

michaelklishin (Collaborator) commented:

Please rebase this against master.

@@ -141,7 +141,11 @@ func (me *Channel) open() error {
 // Performs a request/response call for when the message is not NoWait and is
 // specified as Synchronous.
 func (me *Channel) call(req message, res ...message) error {
 	if err := me.send(me, req); err != nil {
+	me.m.Lock()
michaelklishin (Collaborator) commented:

There is a separate mutex for sending stuff, me.sendM. I think we should use it here.

imkira (Contributor, Author) replied Nov 24, 2016:

The original code is very misleading.
Please note that me.send starts as sendOpen and then switches to sendClosed:

https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L85
https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L104

Please also note that each of those two functions uses the mutex you mention (me.sendM) for actually sending data.

What I am doing here is protecting against a race condition that could happen if we used me.send directly.
That would collide with the following modification:

https://github.com/imkira/amqp/blob/0563adcdb2a14308873fce26b14d3637dc2ac685/channel.go#L104
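
To restate the point as code, here is a minimal sketch of that pattern (simplified names, not the actual channel.go): the send field is a function value that shutdown swaps from sendOpen to sendClosed, so the field itself must be read under the channel mutex even though both implementations take sendM internally.

package main

import "sync"

type channel struct {
	m     sync.Mutex // guards the send field itself
	sendM sync.Mutex // guards the actual wire writes
	send  func() error
}

func (ch *channel) sendOpen() error {
	ch.sendM.Lock()
	defer ch.sendM.Unlock()
	return nil // normal frame writes happen here
}

func (ch *channel) sendClosed() error {
	ch.sendM.Lock()
	defer ch.sendM.Unlock()
	return nil // after shutdown, only channel.close-ok may go out
}

// reading ch.send without ch.m would race with shutdown's write below
func (ch *channel) call() error {
	ch.m.Lock()
	send := ch.send
	ch.m.Unlock()
	return send()
}

func (ch *channel) shutdown() {
	ch.m.Lock()
	ch.send = ch.sendClosed // the write that call must not race with
	ch.m.Unlock()
}

func main() {
	ch := &channel{}
	ch.send = ch.sendOpen
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); _ = ch.call() }()
	go func() { defer wg.Done(); ch.shutdown() }()
	wg.Wait()
}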

connection.go (Outdated)

@@ -578,6 +577,8 @@ func (me *Connection) openChannel() (*Channel, error) {
 // this connection.
 func (me *Connection) closeChannel(ch *Channel, e *Error) {
 	ch.shutdown(e)
+	me.m.Lock()
+	defer me.m.Unlock()
michaelklishin (Collaborator) commented:

Wouldn't the intent be clearer if we explicitly unlocked after releaseChannel? Or are we guarding against a particular failure that warrants defer?

imkira (Contributor, Author) replied:

Yeah, I don't see any problem with that.
I will change it to that, then.
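
For illustration, the two variants under discussion look roughly like this (toy, self-contained code; the real method also calls ch.shutdown first):

package main

import "sync"

var (
	m        sync.Mutex
	channels = map[int]bool{1: true, 2: true}
)

func releaseChannel(id int) { delete(channels, id) }

// defer variant: the lock is held until the function returns, which also
// covers any code added later and any panic unwinding
func closeChannelDefer(id int) {
	m.Lock()
	defer m.Unlock()
	releaseChannel(id)
}

// explicit variant suggested in the review: the critical section visibly
// ends right after the map update
func closeChannelExplicit(id int) {
	m.Lock()
	releaseChannel(id)
	m.Unlock()
}

func main() {
	closeChannelDefer(1)
	closeChannelExplicit(2)
}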

imkira (Contributor, Author) commented Nov 24, 2016

> Note that this PR also seems to include #197.

Yes. I left a comment regarding that. Please check:
#196 (comment)

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch from 0563adc to 85f0c79 Compare November 25, 2016 05:44
imkira (Contributor, Author) commented Nov 25, 2016

Just committed one of the changes you asked for, plus the rebase against master.

michaelklishin (Collaborator) commented:

Note that #197, when merged, broke CI in ways that I cannot reproduce in other environments.

if err := me.call(
	&confirmSelect{Nowait: noWait},
	&confirmSelectOk{},
); err != nil {
	return err
}

me.m.Lock()
me.confirming = true
imkira (Contributor, Author) replied Nov 29, 2016:

Thanks for pointing that out. I fixed it in c6a5ab1

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch from 85f0c79 to eeb226c Compare November 29, 2016 14:35
imkira (Contributor, Author) commented Nov 29, 2016

Please note that travis is failing because it detects a data race in connection.go that was not fixed by #210; I resolved the merge conflict by taking the #210 version (thereby removing my original code).

https://api.travis-ci.org/jobs/179778933/log.txt?deansi=true

My original code, I believe, fixes this by protecting the whole shutdown.
@DmitriyMV @michaelklishin Please let me know what you think before I try to commit it:
https://github.com/imkira/amqp/blob/hotfix/connection-shutdown-race-orig/connection.go#L344

DmitriyMV (Contributor) commented Nov 29, 2016

@imkira According to Travis CI, it still doesn't pass the tests. I manually applied this pull request and reverted #210. Still 4 data races:
https://github.com/DmitriyMV/amqp/commits/pull/196
https://travis-ci.org/DmitriyMV/amqp/builds/179808513

Any advice?

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch 4 times, most recently from ff6f6b5 to 93127f4 Compare November 29, 2016 17:03
shared_test.go (Outdated)

func (me *logIO) logf(format string, args ...interface{}) {
	me.m.Lock()
	me.t.Logf(format, args...)
DmitriyMV (Contributor) commented Nov 29, 2016:

Still fails, according to https://api.travis-ci.org/jobs/179844976/log.txt?deansi=true. Only on go1.6, but still.

This one is quite elusive, isn't it? 😆

@imkira imkira force-pushed the hotfix/connection-shutdown-race branch 9 times, most recently from 42635f8 to 114eca9 Compare November 29, 2016 22:29
client test code was opening connections in separate goroutines but not
waiting for their termination. This is particularly dangerous if any calls
are made (asynchronously) to testing.*.Log* after the test function is
over. Not only is the behavior undefined, it was causing a data race in
go1.6 which is fixed in go1.7 by the following commit:

golang/go@5c83e65
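
A minimal sketch of the hazard that commit message describes (hypothetical test, not from this repository): logging from a goroutine that outlives its test is undefined, so the test must wait for its goroutines before returning.

package mypkg

import (
	"sync"
	"testing"
)

func TestConnectionsAreWaitedFor(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// ... open and exercise a connection here ...
			t.Logf("connection %d done", id) // safe only because of wg.Wait below
		}(i)
	}
	// without this wait, the t.Logf calls above may run after the test
	// returns: undefined behavior, and a data race under go1.6
	wg.Wait()
}
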
@imkira imkira force-pushed the hotfix/connection-shutdown-race branch from 114eca9 to 524f95a Compare November 29, 2016 22:50
imkira (Contributor, Author) commented Nov 29, 2016

@DmitriyMV OK, so I ended up removing all of the #210 code, which was just a subset of the problems I addressed originally with this PR.

The "elusiveness" of the bugs here is, in my opinion, a reflection of -race having been missing from this project for a long time: a huge amount of untested code committed to the package, tricky code lacking goroutine synchronization and finalizers, and, in general, IMHO, bad (too complex) synchronization practices in this library :(

I added 524f95a to try to fix some of these problems.

With this commit I am synchronizing Connection.reader() with Connection.Close and doing some other tricky things:

  • I found places where the reader and the heartbeater relied on each other, but if the heartbeater finished first (via NotifyClose), the reader would sometimes get stuck trying to send a heartbeat to a full channel (me.sends); see the sketch after this list.
  • I also noticed me.blocks was being used in Connection.dispatch0 without any mutex, despite being modifiable via NotifyBlocked and accessed during destruction (it is actually closed there, which is even worse).
  • I also force connections to be closed in tests so that, when the test function finishes, everything returns to a clean state.
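
Here is a self-contained sketch of the first bullet's hazard (simplified names, not the library's code): with the consumer gone, a plain blocking send on the full sends channel would hang forever, while selecting on a done channel lets the sender bail out.

package main

import "fmt"

func main() {
	sends := make(chan string, 1)
	done := make(chan struct{})

	// consumer that may quit early, like the heartbeater after NotifyClose
	go func() {
		close(done) // exits without draining sends
	}()

	sends <- "heartbeat" // fills the buffer

	select {
	case sends <- "heartbeat": // a bare send here would block forever
	case <-done:
		fmt.Println("consumer gone; dropping heartbeat instead of blocking")
	}
}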

Please let me know what you think.

By the way, I sometimes get the following error on travis https://api.travis-ci.org/jobs/179929218/log.txt?deansi=true:

--- FAIL: TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46 (0.09s)
	shared_test.go:68: issue46 close
	integration_test.go:1411: expected error only on publish, got error on channel.open: Exception (503) Reason: "COMMAND_INVALID - second 'channel.open' seen"

Do you have any idea why this may be happening?

DmitriyMV (Contributor) commented Nov 30, 2016

Sorry, no. I spent the last two days trying to figure out why this problem occurs (at random times, too). My current thought is that we get channelClose but don't wait for the channelCloseOk confirmation in

func (me *Channel) dispatch(msg message) {
	switch m := msg.(type) {
	case *channelClose:
		me.connection.closeChannel(me, newError(m.ReplyCode, m.ReplyText))
		me.send(me, &channelCloseOk{})
...

I'm not familiar with the AMQP protocol, though, so I'm left guessing.

michaelklishin (Collaborator) commented:

There is no confirmation of channel.close-ok. There is channel.close, which can be sent by both clients and the server, and there is channel.close-ok, which confirms it.

DmitriyMV (Contributor) commented Nov 30, 2016

@michaelklishin what if we immediately try to open a channel, with the same channel id, right after closing it?

Can you describe the second 'channel.open' seen error a bit more, and what its source could be? My google-fu showed numerous libraries dealing with this error from time to time, but the source of the problem differs in each case; every time it turned out to be wrong usage in user code. No exact specifics, though.

EDIT: Clarification.

imkira (Contributor, Author) commented Dec 1, 2016

@DmitriyMV I thought channel-id reuse could be a good explanation for what is happening, but if you look at TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46, the function uses only 1 connection, creates 100 channels, sends 10 messages on each serially, and doesn't close any of them. So I think a bug in the allocator is out of the question.

The error message received from the server mentions second 'channel.open' seen but, irrespective of that, the reference https://www.rabbitmq.com/amqp-0-9-1-reference.html#constant.channel-error says:

command-invalid (503, connection error): The client sent an invalid sequence of frames, attempting to perform an operation that was considered invalid by the server. This usually implies a programming error in the client.

It implies an "invalid sequence of frames". What if we are not waiting long enough between creating a channel and publishing to it? I looked at the code but couldn't find a reason for that, though I found this case block maybe "too inclusive":

amqp/channel.go, lines 298 to 299 in 1b88538:

default:
	me.rpc <- msg
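
To make "too inclusive" concrete, here is a toy version of such a dispatch (simplified types, not the actual channel.go): the default case forwards every message it does not recognize to the RPC reply channel, so a stray frame can be mistaken for the reply to a pending call.

package main

import "fmt"

type message interface{}
type channelClose struct{}
type basicDeliver struct{}

// the default case treats every unrecognized message as an RPC reply
func dispatch(rpc chan message, msg message) {
	switch msg.(type) {
	case *channelClose:
		fmt.Println("handle channel.close")
	default:
		rpc <- msg // "too inclusive": any stray frame lands here
	}
}

func main() {
	rpc := make(chan message, 1)
	dispatch(rpc, &basicDeliver{}) // not a reply, but ends up on rpc
	fmt.Printf("rpc received: %T\n", <-rpc)
}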

DmitriyMV (Contributor) commented Dec 5, 2016

But the other tests do not fail, so either they don't generate enough open/close channel sequences, or it has something to do with the situation where the channel gets closed by RabbitMQ due to an error.

@imkira I'm open to ideas on how to create a test that would show whether we actually send a wrong sequence of frames, without provoking an error from the server in the first place (as TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46 does).

michaelklishin (Collaborator) commented:
Hey folks,

I'm posting this on behalf of the core team.

As you have noticed, this client hasn't seen a lot of activity recently.
Many users are unhappy about that and we fully recognize that it's a popular
library that should be maintained more actively. There are also many community
members whose contributed pull requests haven't been merged for various reasons.

Because this client has a long tradition of "no breaking public API changes", certain
reasonable changes will likely never be accepted. This is frustrating to those who
have put their time and effort into trying to improve this library.

We would like to thank @streadway
for developing this client and maintaining it for a decade — that's a remarkable contribution
to the RabbitMQ ecosystem. We think now is a good time to get more contributors
involved.

Team RabbitMQ has adopted a "hard fork" of this client
in order to give the community a place to evolve the API. Several RabbitMQ core team members
will participate but we think it very much should be a community-driven effort.

What do we mean by "hard fork" and what does it mean for you? The entire history of the project
is retained in the new repository but it is not a GitHub fork by design. The license remains the same
2-clause BSD. The contribution process won't change much (except that we hope to review and accept PRs
reasonably quickly).

What does change is that this new fork will accept reasonable breaking API changes according
to Semantic Versioning (or at least our understanding of it). At the moment the API is identical
to that of streadway/amqp but the package name is different. We will begin reviewing PRs
and merging them if they make sense in the upcoming weeks.

If your PR hasn't been accepted or reviewed, you are welcome to re-submit it for rabbitmq/amqp091-go.
RabbitMQ core team members will evaluate the PRs currently open for streadway/amqp as time allows,
and pull in those that don't have any conflicts. We cannot promise that every PR will be accepted,
but at least we are open to changing the API going forward.

Note that it is a high season for holidays in some parts of the world, so we may be slower
to respond in the next few weeks but otherwise, we are eager to review as many currently open PRs
as practically possible soon.

Thank you for using RabbitMQ and contributing to this client. On behalf of the RabbitMQ core team,
@ChunyiLyu and @michaelklishin.
