Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Configuration Option for Dollar-Topic Publishing #567

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.hivemq.configuration.entity;

import com.hivemq.configuration.entity.mqtt.AllowDollarTopicsConfigEntity;
import com.hivemq.configuration.entity.mqtt.KeepAliveConfigEntity;
import com.hivemq.configuration.entity.mqtt.MessageExpiryConfigEntity;
import com.hivemq.configuration.entity.mqtt.PacketsConfigEntity;
Expand Down Expand Up @@ -48,6 +49,9 @@ public class MqttConfigEntity {
@XmlElementRef(required = false)
private @NotNull RetainedMessagesConfigEntity retainedMessagesConfigEntity = new RetainedMessagesConfigEntity();

@XmlElementRef(required = false)
private @NotNull AllowDollarTopicsConfigEntity allowDollarTopicsEntity = new AllowDollarTopicsConfigEntity();

@XmlElementRef(required = false)
private @NotNull WildcardSubscriptionsConfigEntity wildcardSubscriptionsConfigEntity =
new WildcardSubscriptionsConfigEntity();
Expand Down Expand Up @@ -101,6 +105,11 @@ public class MqttConfigEntity {
return retainedMessagesConfigEntity;
}

public @NotNull AllowDollarTopicsConfigEntity getAllowDollarTopicConfigEntity() {
return allowDollarTopicsEntity;
}


public @NotNull WildcardSubscriptionsConfigEntity getWildcardSubscriptionsConfigEntity() {
return wildcardSubscriptionsConfigEntity;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.configuration.entity.mqtt;

import com.hivemq.configuration.entity.EnabledEntity;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

/**
* @author Marcel Bartholet
* @since 2025.1
*/
@XmlRootElement(name = "allow-dollar-topics")
@XmlAccessorType(XmlAccessType.NONE)
@SuppressWarnings({"FieldMayBeFinal", "FieldCanBeLocal"})
public class AllowDollarTopicsConfigEntity extends EnabledEntity {
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class MqttConfigurationDefaults {
public static final long MAX_EXPIRY_INTERVAL_DEFAULT = UnsignedDataTypes.UNSIGNED_INT_MAX_VALUE + 1;

public static final boolean RETAINED_MESSAGES_ENABLED_DEFAULT = true;
public static final boolean ALLOW_DOLLAR_TOPICS_DEFAULT = false;

public static final QoS MAXIMUM_QOS_DEFAULT = QoS.EXACTLY_ONCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void setMqttConfig(@NotNull final MqttConfigEntity mqttConfigEntity) {
mqttConfigurationService.setRetainedMessagesEnabled(mqttConfigEntity.getRetainedMessagesConfigEntity()
.isEnabled());

mqttConfigurationService.setAllowDollarTopicsEnabled(mqttConfigEntity.getAllowDollarTopicConfigEntity()
.isEnabled());

mqttConfigurationService.setWildcardSubscriptionsEnabled(mqttConfigEntity.getWildcardSubscriptionsConfigEntity()
.isEnabled());
mqttConfigurationService.setSubscriptionIdentifierEnabled(mqttConfigEntity.getSubscriptionIdentifierConfigEntity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ public class InternalConfigurations {

public static final AtomicInteger INTERVAL_BETWEEN_CLEANUP_JOBS_SEC = new AtomicInteger(4);

public static final AtomicBoolean MQTT_ALLOW_DOLLAR_TOPICS = new AtomicBoolean(false);

public static final AtomicInteger MQTT_EVENT_EXECUTOR_THREAD_COUNT =
new AtomicInteger(AVAILABLE_PROCESSORS_TIMES_TWO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public int getIndex() {
*/
boolean retainedMessagesEnabled();

/**
* @return true if publishing to dollar topics is enabled, else false. Default false
*/
boolean allowDollarTopicsEnabled();

/**
* @return true if wildcard subscriptions are enabled, else false. Default true
*/
Expand Down Expand Up @@ -150,6 +155,8 @@ public int getIndex() {
void setMaxMessageExpiryInterval(final long maxMessageExpiryInterval);

void setRetainedMessagesEnabled(final boolean enabled);

void setAllowDollarTopicsEnabled(final boolean enabled);

void setWildcardSubscriptionsEnabled(final boolean enabled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAXIMUM_QOS_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_EXPIRY_INTERVAL_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_QUEUED_MESSAGES_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.ALLOW_DOLLAR_TOPICS_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.QUEUED_MESSAGES_STRATEGY_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.RETAINED_MESSAGES_ENABLED_DEFAULT;
import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.SERVER_RECEIVE_MAXIMUM_DEFAULT;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class MqttConfigurationServiceImpl implements MqttConfigurationService {

private final AtomicBoolean retainedMessagesEnabled = new AtomicBoolean(RETAINED_MESSAGES_ENABLED_DEFAULT);

private final AtomicBoolean allowDollarTopics = new AtomicBoolean(ALLOW_DOLLAR_TOPICS_DEFAULT);

private final AtomicBoolean wildcardSubscriptionsEnabled =
new AtomicBoolean(WILDCARD_SUBSCRIPTIONS_ENABLED_DEFAULT);

Expand Down Expand Up @@ -112,6 +115,9 @@ public boolean retainedMessagesEnabled() {
return retainedMessagesEnabled.get();
}

@Override
public boolean allowDollarTopicsEnabled() { return allowDollarTopics.get(); }

@Override
public boolean wildcardSubscriptionsEnabled() {
return wildcardSubscriptionsEnabled.get();
Expand Down Expand Up @@ -189,6 +195,12 @@ public void setRetainedMessagesEnabled(final boolean enabled) {
this.retainedMessagesEnabled.set(enabled);
}

@Override
public void setAllowDollarTopicsEnabled(final boolean enabled) {
log.debug("Setting allow dollar topics enabled to {}", enabled);
this.allowDollarTopics.set(enabled);
}

@Override
public void setWildcardSubscriptionsEnabled(final boolean enabled) {
log.debug("Setting wildcard subscriptions enabled to {}", enabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,22 @@
*/
package com.hivemq.extensions.handler;

import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientConnectionContext;
import com.hivemq.configuration.ConfigurationBootstrap;
import com.hivemq.configuration.info.SystemInformation;
import com.hivemq.configuration.info.SystemInformationImpl;
import com.hivemq.configuration.ioc.ConfigurationModule;
import com.hivemq.configuration.reader.MqttConfigurator;
import com.hivemq.configuration.service.FullConfigurationService;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.configuration.service.impl.ConfigurationServiceImpl;
import com.hivemq.configuration.service.impl.MqttConfigurationServiceImpl;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.auth.parameter.AuthorizerProviderInput;
import com.hivemq.extension.sdk.api.client.parameter.ServerInformation;
Expand Down Expand Up @@ -63,8 +73,6 @@
import java.util.List;
import java.util.Map;

import static com.hivemq.configuration.service.InternalConfigurations.MQTT_ALLOW_DOLLAR_TOPICS;

/**
* @author Florian Limpöck
* @since 4.1.0
Expand Down Expand Up @@ -92,7 +100,8 @@ public PluginAuthorizerServiceImpl(
final @NotNull HiveMQExtensions hiveMQExtensions,
final @NotNull MqttServerDisconnector mqttServerDisconnector,
final @NotNull IncomingPublishService incomingPublishService,
final @NotNull IncomingSubscribeService incomingSubscribeService) {
final @NotNull IncomingSubscribeService incomingSubscribeService,
final @NotNull MqttConfigurationService mqttConfigService) {

this.authorizers = authorizers;
this.asyncer = asyncer;
Expand All @@ -102,7 +111,7 @@ public PluginAuthorizerServiceImpl(
this.mqttServerDisconnector = mqttServerDisconnector;
this.extensionPriorityComparator = new ExtensionPriorityComparator(hiveMQExtensions);
this.incomingSubscribeService = incomingSubscribeService;
this.allowDollarTopics = MQTT_ALLOW_DOLLAR_TOPICS.get();
this.allowDollarTopics = mqttConfigService.allowDollarTopicsEnabled();
}

public void authorizePublish(final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void verify_mqtt_default_values() {
.getMaxInterval());
verify(mqttConfigurationService).setRetainedMessagesEnabled(defaultMqttValues.getRetainedMessagesConfigEntity()
.isEnabled());
verify(mqttConfigurationService).setAllowDollarTopicsEnabled(defaultMqttValues.getAllowDollarTopicConfigEntity()
.isEnabled());
verify(mqttConfigurationService).setWildcardSubscriptionsEnabled(defaultMqttValues.getWildcardSubscriptionsConfigEntity()
.isEnabled());
verify(mqttConfigurationService).setMaximumQos(QoS.valueOf(defaultMqttValues.getQoSConfigEntity().getMaxQos()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientConnectionContext;
import com.hivemq.common.shutdown.ShutdownHooks;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.client.parameter.ServerInformation;
import com.hivemq.extension.sdk.api.services.auth.provider.AuthorizerProvider;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class PluginAuthorizerServiceImplTest {
private final @NotNull EventLog eventLog = mock(EventLog.class);
private final @NotNull IncomingPublishService incomingPublishService = mock(IncomingPublishService.class);
private final @NotNull PublishFlushHandler publishFlushHandler = mock(PublishFlushHandler.class);
private final @NotNull MqttConfigurationService mqttConfigService = mock(MqttConfigurationService.class);

private @NotNull PluginTaskExecutor executor;
private @NotNull EmbeddedChannel channel;
Expand Down Expand Up @@ -139,7 +141,8 @@ public void setUp() throws Exception {
hiveMQExtensions,
mqttServerDisconnector,
incomingPublishService,
incomingSubscribeService);
incomingSubscribeService,
mqttConfigService);

eventsHandler = new CollectUserEventsHandler<>(AuthorizeWillResultEvent.class);
channel.pipeline().addLast(eventsHandler);
Expand Down