diff --git a/mqtt-broker/pom.xml b/mqtt-broker/pom.xml index 80d604185..4c09fd1be 100644 --- a/mqtt-broker/pom.xml +++ b/mqtt-broker/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt @@ -56,4 +56,4 @@ - \ No newline at end of file + diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml index 331fe1fce..51a09e369 100644 --- a/mqtt-common/pom.xml +++ b/mqtt-common/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-common diff --git a/mqtt-proxy/pom.xml b/mqtt-proxy/pom.xml index e0785503e..4de7cb39f 100644 --- a/mqtt-proxy/pom.xml +++ b/mqtt-proxy/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-proxy diff --git a/pom.xml b/pom.xml index 729db5836..48adc7d86 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> io.streamnative.pulsar.handlers pulsar-protocol-handler-mqtt-parent - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT StreamNative :: Pulsar Protocol Handler :: MoP Parent Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler. @@ -50,8 +50,8 @@ 2.22.0 6.14.3 4.0.2 - 4.3.0-SNAPSHOT - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT + 5.0.0-M1-SNAPSHOT 2.18.0 1.16 1.2.2 diff --git a/tests/pom.xml b/tests/pom.xml index 7779b2b04..d8d0b9215 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 4.3.0-SNAPSHOT + 5.0.0-M1-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-tests @@ -118,4 +118,4 @@ https://repo.eclipse.org/content/repositories/paho-releases/ - \ No newline at end of file + diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java index ac9a1d417..e10c81fda 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java @@ -278,8 +278,9 @@ protected void restartBroker() throws Exception { } protected void stopBroker() throws Exception { - for (PulsarService pulsarService : pulsarServiceList) { - pulsarService.close(); + for (int i = 0; i < pulsarServiceList.size(); i++) { + mockZooKeeper.setSessionId(i + 1); + pulsarServiceList.get(i).close(); } pulsarServiceList.clear(); brokerPortList.clear(); @@ -295,6 +296,7 @@ protected void stopBroker() throws Exception { } public void stopBroker(int brokerIndex) throws Exception { + mockZooKeeper.setSessionId(brokerIndex + 1); pulsarServiceList.get(brokerIndex).close(); pulsarServiceList.remove(brokerIndex); brokerPortList.remove(brokerIndex); @@ -398,6 +400,7 @@ protected void startBroker(MQTTCommonConfiguration conf) throws Exception { mqttProxyPort, mqttProxyTlsPort, mqttProxyTlsPskPort, mqttHttpPort); ConfigurationUtils.extractFieldToProperties(conf); setTLSConf(conf); + mockZooKeeper.setSessionId(pulsarServiceList.size() + 1); this.pulsarServiceList.add(doStartBroker(conf)); } @@ -424,7 +427,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { public static MockZooKeeper createMockZooKeeper() throws Exception { MockZooKeeper zk = MockZooKeeper.newInstance(); - zk.setSessionId(-1); + zk.setSessionId(1); List dummyAclList = new ArrayList<>(0); ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java index 4c2215ec3..8242a1aef 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java @@ -22,7 +22,9 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.conscrypt.Conscrypt; import org.fusesource.mqtt.client.MQTT; +import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -96,7 +98,6 @@ protected void setup() throws Exception { super.setup(); log.info("success internal setup"); setupClusterNamespaces(); - setPulsarServiceState(); } protected void setupClusterNamespaces() throws Exception { @@ -137,6 +138,12 @@ protected void setupClusterNamespaces() throws Exception { } } + protected void assumeConscryptAvailable() { + if (!Conscrypt.isAvailable()) { + throw new SkipException("Conscrypt native library is not available on this platform"); + } + } + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java index 2062748d0..a30190e0b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleIntegrationTest.java @@ -442,6 +442,7 @@ public void testInvalidClientId() throws Exception { @Test @SneakyThrows public void testTlsPskWithTlsv1() { + assumeConscryptAvailable(); Bootstrap client = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); client.group(group); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java index 2ab52df0b..9ea75b46e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTLSTest.java @@ -156,6 +156,7 @@ public void testSendAndConsumeWithFilter(String topic, String filter) { @Test(timeOut = TIMEOUT, priority = 4) @SneakyThrows public void testTlsPsk() { + assumeConscryptAvailable(); Bootstrap client = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); client.group(group); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index 8d6981e67..186f2409f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -260,6 +260,7 @@ public void testSendAndConsumeWithFilter(String topic, String filter) { @Test @SneakyThrows public void testTlsPskWithTlsv1() { + assumeConscryptAvailable(); Bootstrap client = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); client.group(group); @@ -501,6 +502,7 @@ public void testRetainedMessageInCluster() throws Exception { @Test @SneakyThrows public void testAddPskIdentity() { + assumeConscryptAvailable(); HttpClient httpClient = HttpClientBuilder.create().build(); final String mopEndPoint = "http://localhost:" + brokerWebservicePortList.get(0) + "/mop/add_psk_identity"; HttpPost request = new HttpPost();