diff --git a/mqtt-broker/pom.xml b/mqtt-broker/pom.xml
index 80d604185..4c09fd1be 100644
--- a/mqtt-broker/pom.xml
+++ b/mqtt-broker/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt
@@ -56,4 +56,4 @@
-
\ No newline at end of file
+
diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index 331fe1fce..51a09e369 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt-common
diff --git a/mqtt-proxy/pom.xml b/mqtt-proxy/pom.xml
index e0785503e..4de7cb39f 100644
--- a/mqtt-proxy/pom.xml
+++ b/mqtt-proxy/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt-proxy
diff --git a/pom.xml b/pom.xml
index 729db5836..48adc7d86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
io.streamnative.pulsar.handlers
pulsar-protocol-handler-mqtt-parent
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
StreamNative :: Pulsar Protocol Handler :: MoP Parent
Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler.
@@ -50,8 +50,8 @@
2.22.0
6.14.3
4.0.2
- 4.3.0-SNAPSHOT
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
2.18.0
1.16
1.2.2
diff --git a/tests/pom.xml b/tests/pom.xml
index 7779b2b04..d8d0b9215 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 4.3.0-SNAPSHOT
+ 5.0.0-M1-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt-tests
@@ -118,4 +118,4 @@
https://repo.eclipse.org/content/repositories/paho-releases/
-
\ No newline at end of file
+
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
index ac9a1d417..e10c81fda 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
@@ -278,8 +278,9 @@ protected void restartBroker() throws Exception {
}
protected void stopBroker() throws Exception {
- for (PulsarService pulsarService : pulsarServiceList) {
- pulsarService.close();
+ for (int i = 0; i < pulsarServiceList.size(); i++) {
+ mockZooKeeper.setSessionId(i + 1);
+ pulsarServiceList.get(i).close();
}
pulsarServiceList.clear();
brokerPortList.clear();
@@ -295,6 +296,7 @@ protected void stopBroker() throws Exception {
}
public void stopBroker(int brokerIndex) throws Exception {
+ mockZooKeeper.setSessionId(brokerIndex + 1);
pulsarServiceList.get(brokerIndex).close();
pulsarServiceList.remove(brokerIndex);
brokerPortList.remove(brokerIndex);
@@ -398,6 +400,7 @@ protected void startBroker(MQTTCommonConfiguration conf) throws Exception {
mqttProxyPort, mqttProxyTlsPort, mqttProxyTlsPskPort, mqttHttpPort);
ConfigurationUtils.extractFieldToProperties(conf);
setTLSConf(conf);
+ mockZooKeeper.setSessionId(pulsarServiceList.size() + 1);
this.pulsarServiceList.add(doStartBroker(conf));
}
@@ -424,7 +427,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
public static MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance();
- zk.setSessionId(-1);
+ zk.setSessionId(1);
List dummyAclList = new ArrayList<>(0);
ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java
index 4c2215ec3..8242a1aef 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java
@@ -22,7 +22,9 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.conscrypt.Conscrypt;
import org.fusesource.mqtt.client.MQTT;
+import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -96,7 +98,6 @@ protected void setup() throws Exception {
super.setup();
log.info("success internal setup");
setupClusterNamespaces();
- setPulsarServiceState();
}
protected void setupClusterNamespaces() throws Exception {
@@ -137,6 +138,12 @@ protected void setupClusterNamespaces() throws Exception {
}
}
+ protected void assumeConscryptAvailable() {
+ if (!Conscrypt.isAvailable()) {
+ throw new SkipException("Conscrypt native library is not available on this platform");
+ }
+ }
+
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java
index 2062748d0..a30190e0b 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java
@@ -442,6 +442,7 @@ public void testInvalidClientId() throws Exception {
@Test
@SneakyThrows
public void testTlsPskWithTlsv1() {
+ assumeConscryptAvailable();
Bootstrap client = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
client.group(group);
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java
index 2ab52df0b..9ea75b46e 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java
@@ -156,6 +156,7 @@ public void testSendAndConsumeWithFilter(String topic, String filter) {
@Test(timeOut = TIMEOUT, priority = 4)
@SneakyThrows
public void testTlsPsk() {
+ assumeConscryptAvailable();
Bootstrap client = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
client.group(group);
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java
index 8d6981e67..186f2409f 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java
@@ -260,6 +260,7 @@ public void testSendAndConsumeWithFilter(String topic, String filter) {
@Test
@SneakyThrows
public void testTlsPskWithTlsv1() {
+ assumeConscryptAvailable();
Bootstrap client = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
client.group(group);
@@ -501,6 +502,7 @@ public void testRetainedMessageInCluster() throws Exception {
@Test
@SneakyThrows
public void testAddPskIdentity() {
+ assumeConscryptAvailable();
HttpClient httpClient = HttpClientBuilder.create().build();
final String mopEndPoint = "http://localhost:" + brokerWebservicePortList.get(0) + "/mop/add_psk_identity";
HttpPost request = new HttpPost();