Skip to content

Commit

Permalink
[python] Support CryptoKeyReader for Reader API in python clients (ap…
Browse files Browse the repository at this point in the history
…ache#11447)

### Motivation

The Reader API in the python client does not support reading an encrypted message. 
This PR adds the same and leverages existing  C++ Reader API which supports the same.

### Modifications

* Updated `pulsar.Client.create_reader` to accept  `crypto_key_reader` argument
* Update existing unit test for Python encryption.
  • Loading branch information
sanjivr authored Aug 21, 2021
1 parent 8126d98 commit cf7de12
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,8 @@ def create_reader(self, topic, start_message_id,
receiver_queue_size=1000,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False
is_read_compacted=False,
crypto_key_reader=None
):
"""
Create a reader on a particular topic
Expand Down Expand Up @@ -815,6 +816,9 @@ def my_listener(reader, message):
Sets the subscription role prefix.
* `is_read_compacted`:
Selects whether to read the compacted version of the topic
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
Expand All @@ -823,6 +827,7 @@ def my_listener(reader, message):
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

conf = _pulsar.ReaderConfiguration()
if reader_listener:
Expand All @@ -834,6 +839,8 @@ def my_listener(reader, message):
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,24 @@ def test_encryption(self):
producer = client.create_producer(topic=topic,
encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader)
reader = client.create_reader(topic=topic,
start_message_id=MessageId.earliest,
crypto_key_reader=crypto_key_reader)
producer.send('hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), 'hello')
consumer.unsubscribe()

msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

with self.assertRaises(pulsar.Timeout):
reader.read_next(100)

reader.close()

client.close()

def test_tls_auth3(self):
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
return conf;
}

static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
py::object cryptoKeyReader) {
CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
return conf;
}

class LoggerWrapper: public Logger {
PyObject* _pyLogger;
Logger* fallbackLogger;
Expand Down Expand Up @@ -292,5 +299,6 @@ void export_config() {
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>())
;
}

0 comments on commit cf7de12

Please sign in to comment.