Skip to content

Commit

Permalink
Wrap the interruption to a custom exception when a blocking API is in…
Browse files Browse the repository at this point in the history
…terrupted (#99)

### Motivation

Currently, when a blocking API is interrupted by a signal, `SystemError`
will be thrown. However, in this case, `PyErr_SetInterrupt` will be
called and next time a blocking API is called, `std::system_error` will
be somehow thrown.

The failure of
https://lists.apache.org/thread/cmzykd9qz9x1d0s35nc5912o3slwpxpv is
caused by this issue. The `SystemError` is not called, then
`client.close()` will be skipped, which leads to the `bad_weak_ptr`
error.

P.S. Currently we have to call `client.close()` on a `Client` instance,
otherwise, the `bad_weak_ptr` will be thrown.

However, even if we caught the `SystemError` like:

```python
    try:
        msg = consumer.receive()
        # ...
    except SystemError:
        break
```

we would still see the following error:

```
terminate called after throwing an instance of 'std::system_error'
  what():  Operation not permitted
Aborted
```

### Modifications

- Wrap `ResultInterrupted` into the `pulsar.Interrupted` exception.
- Refactor the `waitForAsyncValue` and `waitForAsyncResult` functions
  and raise `pulsar.Interrupted` when `PyErr_CheckSignals` detects a
  signal.
- Add `InterruptedTest` to cover this case.
- Remove `future.h` since we now use `std::future` instead of the
  manually implemented `Future`.
- Fix the `examples/consumer.py` to support stopping by Ctrl+C.

(cherry picked from commit ec05f50)
  • Loading branch information
BewareMyPower committed Mar 8, 2023
1 parent 9ed92ec commit b883f42
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 331 deletions.
10 changes: 7 additions & 3 deletions examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
})

while True:
msg = consumer.receive()
print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
consumer.acknowledge(msg)
try:
msg = consumer.receive()
print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
consumer.acknowledge(msg)
except pulsar.Interrupted:
print("Stop receiving messages")
break

client.close()
2 changes: 1 addition & 1 deletion pulsar/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull, Interrupted
61 changes: 13 additions & 48 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,73 +24,38 @@
namespace py = pybind11;

Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
Producer producer;

waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) {
client.createProducerAsync(topic, conf, callback);
}),
producer);

return producer;
return waitForAsyncValue<Producer>(
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
}

Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
Consumer consumer;

waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
client.subscribeAsync(topic, subscriptionName, conf, callback);
}),
consumer);

return consumer;
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
}

Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
Consumer consumer;

waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
client.subscribeAsync(topics, subscriptionName, conf, callback);
}),
consumer);

return consumer;
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
}

Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
Consumer consumer;

waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
}),
consumer);

return consumer;
return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
});
}

Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
Reader reader;

waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) {
client.createReaderAsync(topic, startMessageId, conf, callback);
}),
reader);

return reader;
return waitForAsyncValue<Reader>(
[&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
}

std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
std::vector<std::string> partitions;

waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) {
client.getPartitionsForTopicAsync(topic, callback);
}),
partitions);

return partitions;
return waitForAsyncValue<std::vector<std::string>>(
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
}

void Client_close(Client& client) {
Expand Down
23 changes: 6 additions & 17 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ void Consumer_unsubscribe(Consumer& consumer) {
}

Message Consumer_receive(Consumer& consumer) {
Message msg;

waitForAsyncValue(std::function<void(ReceiveCallback)>(
[&consumer](ReceiveCallback callback) { consumer.receiveAsync(callback); }),
msg);

return msg;
return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
}

Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
Expand All @@ -59,32 +53,27 @@ Messages Consumer_batch_receive(Consumer& consumer) {
void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }

void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeAsync(msgId, nullptr);
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}

void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS
consumer.negativeAcknowledge(msg);
Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg);
Py_END_ALLOW_THREADS
}

void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.negativeAcknowledge(msgId);
Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
Py_END_ALLOW_THREADS
}

void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeCumulativeAsync(msg, nullptr);
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
Py_END_ALLOW_THREADS
}

void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeCumulativeAsync(msgId, nullptr);
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}

Expand Down
181 changes: 0 additions & 181 deletions src/future.h

This file was deleted.

14 changes: 4 additions & 10 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,15 @@
namespace py = pybind11;

MessageId Producer_send(Producer& producer, const Message& message) {
MessageId messageId;

waitForAsyncValue(std::function<void(SendCallback)>(
[&](SendCallback callback) { producer.sendAsync(message, callback); }),
messageId);

return messageId;
return waitForAsyncValue<MessageId>(
[&](SendCallback callback) { producer.sendAsync(message, callback); });
}

void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
Py_BEGIN_ALLOW_THREADS
producer.sendAsync(msg, callback);
Py_BEGIN_ALLOW_THREADS producer.sendAsync(msg, callback);
Py_END_ALLOW_THREADS

if (PyErr_CheckSignals() == -1) {
if (PyErr_CheckSignals() == -1) {
PyErr_SetInterrupt();
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,8 @@ Message Reader_readNextTimeout(Reader& reader, int timeoutMs) {
}

bool Reader_hasMessageAvailable(Reader& reader) {
bool available = false;

waitForAsyncValue(
std::function<void(HasMessageAvailableCallback)>(
[&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); }),
available);

return available;
return waitForAsyncValue<bool>(
[&](HasMessageAvailableCallback callback) { reader.hasMessageAvailableAsync(callback); });
}

void Reader_close(Reader& reader) {
Expand Down
Loading

0 comments on commit b883f42

Please sign in to comment.