From 6f4f9f23473b060f2e972f9d88d23856b345794b Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 2 Dec 2024 18:51:03 +0800 Subject: [PATCH 1/7] 1 --- .../MQTTBrokerProtocolMethodProcessor.java | 3 ++ .../broker/qos/AbstractQosPublishHandler.java | 21 +++++++++++++- .../mqtt/broker/qos/MQTTProducer.java | 29 +++++++++++++++++++ .../mqtt/broker/qos/QosPublishHandler.java | 2 ++ .../mqtt/broker/qos/QosPublishHandlers.java | 1 - .../broker/qos/QosPublishHandlersImpl.java | 2 ++ .../handlers/mqtt/common/Connection.java | 11 +++++++ 7 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java index d8c20ae53..c4f6250b7 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java @@ -152,6 +152,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, .clientRestrictions(clientRestrictions) .serverRestrictions(serverRestrictions) .authData(authData) + .serverCnx(serverCnx) .channel(channel) .connectMessage(msg) .connectionManager(connectionManager) @@ -295,6 +296,7 @@ public void processDisconnect(MqttAdapterMessage adapterMsg) { if (log.isDebugEnabled()) { log.debug("[Disconnect] [{}] ", clientId); } + qosPublishHandlers.qos0().closeProducer(connection); // If client update session timeout interval property. Optional newSessionExpireInterval; if ((newSessionExpireInterval = MqttPropertyUtils @@ -328,6 +330,7 @@ public void processConnectionLost() { if (log.isDebugEnabled()) { log.debug("[Connection Lost] [{}] ", clientId); } + qosPublishHandlers.qos0().closeProducer(connection); metricsCollector.removeClient(NettyUtils.getAddress(channel)); WillMessage willMessage = connection.getWillMessage(); if (willMessage != null) { diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java index 716c126a9..5e118f322 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.mqtt.broker.qos; import static io.streamnative.pulsar.handlers.mqtt.broker.impl.PulsarMessageConverter.toPulsarMsg; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration; @@ -34,6 +35,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.MessageImpl; @@ -48,6 +50,7 @@ public abstract class AbstractQosPublishHandler implements QosPublishHandler { protected final RetainedMessageHandler retainedMessageHandler; protected final MQTTServerConfiguration configuration; private final ConcurrentHashMap sequenceIdMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap producerMap = new ConcurrentHashMap<>(); protected AbstractQosPublishHandler(MQTTService mqttService) { @@ -109,6 +112,12 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, } return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { long lastPublishedSequenceId = -1; + Producer producer = producerMap.compute(producerName, (k, v) -> { + if (v == null) { + v = MQTTProducer.create(topic, connection.getServerCnx(), producerName); + } + return v; + }); if (topic instanceof PersistentTopic) { final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName); lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> { @@ -121,12 +130,14 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, return id; }); } + final ByteBuf payload = msg.payload(); MessageImpl message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(), - msg.payload().nioBuffer()); + payload.nioBuffer()); CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, lastPublishedSequenceId, topic); message.recycle(); return ret.thenApply(position -> { + topic.incrementPublishCount(producer, 1, payload.readableBytes()); if (checkSubscription && topic.getSubscriptions().isEmpty()) { throw new MQTTNoMatchingSubscriberException(mqttTopicName); } @@ -135,4 +146,12 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, }).orElseGet(() -> FutureUtil.failedFuture( new BrokerServiceException.TopicNotFoundException(mqttTopicName)))); } + + @Override + public void closeProducer(Connection connection) { + final Producer producer = producerMap.remove(connection.getClientId()); + if (producer != null) { + producer.close(true); + } + } } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java new file mode 100644 index 000000000..de5f45eb9 --- /dev/null +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java @@ -0,0 +1,29 @@ +package io.streamnative.pulsar.handlers.mqtt.broker.qos; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.common.api.proto.ProducerAccessMode; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; + +public class MQTTProducer extends Producer { + + public static final AtomicLong PRODUCER_ID = new AtomicLong(); + + public MQTTProducer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, + boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, + boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional topicEpoch, + boolean supportsPartialProducer) { + super(topic, cnx, producerId, producerName, appId, isEncrypted, metadata, schemaVersion, epoch, + userProvidedProducerName, accessMode, topicEpoch, supportsPartialProducer); + } + + public static MQTTProducer create(Topic topic, TransportCnx cnx, String producerName) { + return new MQTTProducer(topic, cnx, PRODUCER_ID.incrementAndGet(), producerName, "", + false, null, SchemaVersion.Latest, 0, true, + ProducerAccessMode.Shared, Optional.empty(), true); + } +} diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java index 1007269d3..d0b413828 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java @@ -23,4 +23,6 @@ public interface QosPublishHandler { CompletableFuture publish(Connection connection, MqttAdapterMessage msg); + + void closeProducer(Connection connection); } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java index 48abab393..ee2197942 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt.broker.qos; - /** * Qos publish handlers for different Qos message publish. */ diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java index 99c1bde2e..111511f4c 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java @@ -44,4 +44,6 @@ public QosPublishHandler qos1() { public QosPublishHandler qos2() { return this.qos2Handler; } + + } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java index b631ec1a7..96cecfed1 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java @@ -50,6 +50,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.ServerCnx; /** * Value object to maintain the information of single connection, like ClientID, Channel, and clean @@ -90,6 +91,9 @@ public class Connection { @Getter private AuthenticationDataSource authData; + @Getter + private ServerCnx serverCnx; + @Getter private final boolean fromProxy; private volatile ConnectionState connectionState = DISCONNECTED; @@ -118,6 +122,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.authData = builder.authData; + this.serverCnx = builder.serverCnx; this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData); this.addIdleStateHandler(); this.manager.addConnection(this); @@ -309,6 +314,7 @@ public static class ConnectionBuilder { private boolean fromProxy; private AuthenticationDataSource authData; + private ServerCnx serverCnx; public ConnectionBuilder protocolVersion(int protocolVersion) { this.protocolVersion = protocolVersion; @@ -370,6 +376,11 @@ public ConnectionBuilder authData(AuthenticationDataSource authData) { return this; } + public ConnectionBuilder serverCnx(ServerCnx serverCnx) { + this.serverCnx = serverCnx; + return this; + } + public Connection build() { return new Connection(this); } From 9c1db5bb2d5dda1125f2a424df0900e284c78e44 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 2 Dec 2024 22:13:17 +0800 Subject: [PATCH 2/7] add log --- .../handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java | 1 + .../mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java | 1 + .../mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java | 1 + 3 files changed, 3 insertions(+) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java index 57a33b602..8c74bb848 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java @@ -47,6 +47,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object message) { checkArgument(message instanceof MqttAdapterMessage); MqttAdapterMessage adapterMsg = (MqttAdapterMessage) message; + log.info("broker channel read from client : {}", adapterMsg.getClientId()); processors.computeIfAbsent(adapterMsg.getClientId(), key -> { MQTTBrokerProtocolMethodProcessor p = new MQTTBrokerProtocolMethodProcessor(mqttService, ctx); CompletableFuture inactiveFuture = p.getInactiveFuture(); diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java index c4f6250b7..3300eeab5 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java @@ -140,6 +140,7 @@ public MQTTBrokerProtocolMethodProcessor(MQTTService mqttService, ChannelHandler public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { final MqttConnectMessage msg = (MqttConnectMessage) adapterMsg.getMqttMessage(); + log.info("broker process connect : {}", msg.payload().clientIdentifier()); ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(configuration.getReceiveMaximum()) .maximumPacketSize(configuration.getMqttMessageMaxLength()) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index 9d495418e..cf44232e5 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -126,6 +126,7 @@ public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHa public void doProcessConnect(MqttAdapterMessage adapter, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); + log.info("proxy process connect : {}", msg.payload().clientIdentifier()); final ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(proxyConfig.getReceiveMaximum()) .maximumPacketSize(proxyConfig.getMqttMessageMaxLength()) From ad1b5a9dc200160fda9f15eccd08f6c9e7f71df2 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 3 Dec 2024 12:17:35 +0800 Subject: [PATCH 3/7] fix --- .../handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java index 8c74bb848..57a33b602 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTBrokerInboundHandler.java @@ -47,7 +47,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object message) { checkArgument(message instanceof MqttAdapterMessage); MqttAdapterMessage adapterMsg = (MqttAdapterMessage) message; - log.info("broker channel read from client : {}", adapterMsg.getClientId()); processors.computeIfAbsent(adapterMsg.getClientId(), key -> { MQTTBrokerProtocolMethodProcessor p = new MQTTBrokerProtocolMethodProcessor(mqttService, ctx); CompletableFuture inactiveFuture = p.getInactiveFuture(); From 0f772e28b2b928d16c5c57ff6ff2012ef1d3d91a Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 3 Dec 2024 12:37:56 +0800 Subject: [PATCH 4/7] add producer to the topic --- .../mqtt/broker/qos/AbstractQosPublishHandler.java | 9 +++++++++ .../handlers/mqtt/broker/qos/MQTTProducer.java | 13 +++++++++++++ 2 files changed, 22 insertions(+) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java index 5e118f322..196547457 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; @@ -44,6 +45,7 @@ /** * Abstract class for publish handler. */ +@Slf4j public abstract class AbstractQosPublishHandler implements QosPublishHandler { protected final PulsarService pulsarService; @@ -115,6 +117,13 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, Producer producer = producerMap.compute(producerName, (k, v) -> { if (v == null) { v = MQTTProducer.create(topic, connection.getServerCnx(), producerName); + final CompletableFuture> producerFuture = + topic.addProducer(v, new CompletableFuture<>()); + producerFuture.whenComplete((r, e) -> { + if (e != null) { + log.error("Failed to add producer", e); + } + }); } return v; }); diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java index de5f45eb9..a71fe521c 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java @@ -1,3 +1,16 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.streamnative.pulsar.handlers.mqtt.broker.qos; import java.util.Map; From a81dba6d8801764b8fb9786b52681a1a319673e3 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 6 Dec 2024 09:58:45 +0800 Subject: [PATCH 5/7] fix test --- .../mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java index 36b394726..ab5b05424 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java @@ -85,7 +85,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception { .collect(Collectors.toList())) .build(); - admin.clusters().createNamespaceIsolationPolicy("test", "policy-1", isolationData); try { final Mqtt5PublishResult r2 = client.publishWith() .topic(topic1) @@ -104,7 +103,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception { .send(); Assert.assertFalse(r3.getError().isPresent()); client.disconnect(); - admin.clusters().deleteNamespaceIsolationPolicy("test", "policy-1"); } @Test(invocationCount = 2) From fac866145ad20194bc904d300c2a465c353ff9f2 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 10 Dec 2024 16:24:09 +0800 Subject: [PATCH 6/7] Fix proxy bug --- .../mqtt/proxy/channel/AdapterChannel.java | 14 ++++++++++++-- .../impl/MQTTProxyProtocolMethodProcessor.java | 9 ++++----- .../handlers/mqtt/broker/AdapterChannelTest.java | 12 ++++++++++-- .../mqtt/mqtt3/fusesource/proxy/ProxyTest.java | 9 ++++++--- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java index 54901fb39..fdbe75daf 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.channel.Channel; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils; @@ -39,14 +40,18 @@ public AdapterChannel(MQTTProxyAdapter adapter, this.channelFuture = channelFuture; } - public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg) { + public CompletableFuture writeAndFlush(final Connection connection, final MqttAdapterMessage adapterMsg) { checkArgument(StringUtils.isNotBlank(adapterMsg.getClientId()), "clientId is blank"); final String clientId = adapterMsg.getClientId(); adapterMsg.setEncodeType(MqttAdapterMessage.EncodeType.ADAPTER_MESSAGE); CompletableFuture future = channelFuture.thenCompose(channel -> { if (!channel.isActive()) { channelFuture = adapter.getChannel(broker); - return writeAndFlush(adapterMsg); + if (log.isDebugEnabled()) { + log.debug("channel is inactive, re-create channel to broker : {}", broker); + } + return writeConnectMessage(connection) + .thenCompose(__ -> writeAndFlush(connection, adapterMsg)); } return FutureUtils.completableFuture(channel.writeAndFlush(adapterMsg)); }); @@ -58,6 +63,11 @@ public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg return future; } + private CompletableFuture writeConnectMessage(final Connection connection) { + final MqttConnectMessage connectMessage = connection.getConnectMessage(); + return writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); + } + /** * When client subscribes, the adapter channel maybe close in exception, so register listener to close the * related client channel and trigger reconnection. diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index cf44232e5..03a89d2e5 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -246,7 +246,7 @@ public void processPingReq(final MqttAdapterMessage msg) { topicBrokers.values().forEach(adapterChannel -> { adapterChannel.thenAccept(channel -> { msg.setClientId(clientId); - channel.writeAndFlush(msg); + channel.writeAndFlush(connection, msg); }); }); } @@ -270,7 +270,7 @@ public void processDisconnect(final MqttAdapterMessage msg) { .filter(future -> !future.isCompletedExceptionally()) .map(CompletableFuture::join) .collect(Collectors.toSet()) - .forEach(channel -> channel.writeAndFlush(msg))); + .forEach(channel -> channel.writeAndFlush(connection, msg))); } else { if (log.isDebugEnabled()) { log.debug("Disconnect is already triggered, ignore"); @@ -465,7 +465,7 @@ public void processUnSubscribe(final MqttAdapterMessage adapter) { private CompletableFuture writeToBroker(final String topic, final MqttAdapterMessage msg) { CompletableFuture proxyExchanger = connectToBroker(topic); - return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(msg)); + return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(connection, msg)); } private CompletableFuture connectToBroker(final String topic) { @@ -474,8 +474,7 @@ private CompletableFuture connectToBroker(final String topic) { adapterChannels.computeIfAbsent(mqttBroker, key1 -> { AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); final MqttConnectMessage connectMessage = connection.getConnectMessage(); - - adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(), + adapterChannel.writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); return adapterChannel; }) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java index f8c4ba206..48954a746 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java @@ -21,8 +21,11 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTConnectionManager; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage; +import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.channel.AdapterChannel; @@ -38,7 +41,6 @@ import org.testng.annotations.Test; - public class AdapterChannelTest extends MQTTTestBase { @Override @@ -74,7 +76,13 @@ public void testAdapterChannelAutoGetConnection() throws InterruptedException { MqttConnectMessage fakeConnectMessage = MqttMessageBuilders.connect() .clientId(clientId).build(); MqttAdapterMessage mqttAdapterMessage = new MqttAdapterMessage(clientId, fakeConnectMessage); - adapterChannel.writeAndFlush(mqttAdapterMessage).join(); + Connection connection = Connection.builder() + .channel(channel) + .clientRestrictions(ClientRestrictions.builder().keepAliveTime(10).build()) + .connectionManager(Mockito.mock(MQTTConnectionManager.class)) + .clientId(clientId) + .connectMessage(fakeConnectMessage).build(); + adapterChannel.writeAndFlush(connection, mqttAdapterMessage).join(); CompletableFuture channelFutureAfterSend = brokerChannels.get(key); Channel channelAfterSend = channelFutureAfterSend.join(); assertNotEquals(channelAfterSend.id(), previousChannelId); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index cf99213d7..454d64f5b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -346,24 +346,27 @@ public void testPubAndSubWithDifferentTopics() { @Test public void testTopicUnload() throws Exception { MQTT mqttConsumer = createMQTTProxyClient(); + mqttConsumer.setClientId("client-consumer"); BlockingConnection consumer = mqttConsumer.blockingConnection(); consumer.connect(); String topicName1 = "topic-unload-1"; Topic[] topic1 = { new Topic(topicName1, QoS.AT_MOST_ONCE)}; consumer.subscribe(topic1); MQTT mqttProducer = createMQTTProxyClient(); + mqttProducer.setClientId("client-producer"); BlockingConnection producer = mqttProducer.blockingConnection(); producer.connect(); String msg1 = "hello topic1"; producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false); - Message receive1 = consumer.receive(); + Message receive1 = consumer.receive(10, TimeUnit.SECONDS); Assert.assertEquals(new String(receive1.getPayload()), msg1); Assert.assertEquals(receive1.getTopic(), topicName1); admin.topics().unload(topicName1); Thread.sleep(5000); + log.info("unloaded topic : {}", topicName1); producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false); - producer.disconnect(); - Message receive2 = consumer.receive(); +// producer.disconnect(); + Message receive2 = consumer.receive(10, TimeUnit.SECONDS); Assert.assertEquals(new String(receive2.getPayload()), msg1); Assert.assertEquals(receive2.getTopic(), topicName1); consumer.disconnect(); From 23cc245767a2f695d742d9ec2cb5ca453110e86c Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 11 Dec 2024 00:00:59 +0800 Subject: [PATCH 7/7] remove log --- .../mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java | 1 - .../mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java | 1 - 2 files changed, 2 deletions(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java index 3300eeab5..c4f6250b7 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java @@ -140,7 +140,6 @@ public MQTTBrokerProtocolMethodProcessor(MQTTService mqttService, ChannelHandler public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { final MqttConnectMessage msg = (MqttConnectMessage) adapterMsg.getMqttMessage(); - log.info("broker process connect : {}", msg.payload().clientIdentifier()); ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(configuration.getReceiveMaximum()) .maximumPacketSize(configuration.getMqttMessageMaxLength()) diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index 03a89d2e5..d7f138d3a 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -126,7 +126,6 @@ public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHa public void doProcessConnect(MqttAdapterMessage adapter, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); - log.info("proxy process connect : {}", msg.payload().clientIdentifier()); final ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(proxyConfig.getReceiveMaximum()) .maximumPacketSize(proxyConfig.getMqttMessageMaxLength())