package io.moquette.spi.impl;

import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.ConnAckMessage;
import io.moquette.proto.messages.ConnectMessage;
import io.moquette.proto.messages.PubAckMessage;
import io.moquette.proto.messages.PubCompMessage;
import io.moquette.proto.messages.PubRecMessage;
import io.moquette.proto.messages.PubRelMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.proto.messages.SubAckMessage;
import io.moquette.proto.messages.SubscribeMessage;
import io.moquette.proto.messages.UnsubAckMessage;
import io.moquette.proto.messages.UnsubscribeMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMatchingCondition;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class ProtocolProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    private boolean allowAnonymous;
    private IAuthenticator m_authenticator;
    private IAuthorizator m_authorizator;
    protected ConcurrentMap<String, ConnectionDescriptor> m_clientIDs;
    private BrokerInterceptor m_interceptor;
    private IMessagesStore m_messagesStore;
    private ISessionsStore m_sessionsStore;
    private ConcurrentMap<String, WillMessage> m_willStore = new ConcurrentHashMap();
    private SubscriptionsStore subscriptions;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    public static final class WillMessage {
        private final ByteBuffer payload;
        private final AbstractMessage.QOSType qos;
        private final boolean retained;
        private final String topic;

        public WillMessage(String str, ByteBuffer byteBuffer, boolean z, AbstractMessage.QOSType qOSType) {
            this.topic = str;
            this.payload = byteBuffer;
            this.retained = z;
            this.qos = qOSType;
        }

        public ByteBuffer getPayload() {
            return this.payload;
        }

        public AbstractMessage.QOSType getQos() {
            return this.qos;
        }

        public String getTopic() {
            return this.topic;
        }

        public boolean isRetained() {
            return this.retained;
        }
    }

    private static IMessagesStore.StoredMessage asStoredMessage(PublishMessage publishMessage) {
        IMessagesStore.StoredMessage storedMessage = new IMessagesStore.StoredMessage(publishMessage.getPayload().array(), publishMessage.getQos(), publishMessage.getTopicName());
        storedMessage.setRetained(publishMessage.isRetainFlag());
        storedMessage.setMessageID(publishMessage.getMessageID());
        return storedMessage;
    }

    private static IMessagesStore.StoredMessage asStoredMessage(WillMessage willMessage) {
        IMessagesStore.StoredMessage storedMessage = new IMessagesStore.StoredMessage(willMessage.getPayload().array(), willMessage.getQos(), willMessage.getTopic());
        storedMessage.setRetained(willMessage.isRetained());
        return storedMessage;
    }

    private void failedCredentials(Channel channel) {
        ConnAckMessage connAckMessage = new ConnAckMessage();
        connAckMessage.setReturnCode((byte) 4);
        channel.writeAndFlush(connAckMessage);
    }

    private void forwardPublishWill(WillMessage willMessage, String str) {
        Integer valueOf = willMessage.getQos() != AbstractMessage.QOSType.MOST_ONE ? Integer.valueOf(this.m_sessionsStore.nextPacketID(str)) : null;
        IMessagesStore.StoredMessage asStoredMessage = asStoredMessage(willMessage);
        asStoredMessage.setClientID(str);
        asStoredMessage.setMessageID(valueOf);
        route2Subscribers(asStoredMessage);
    }

    private void republishStoredInSession(ClientSession clientSession) {
        LOG.trace("republishStoredInSession for client <{}>", clientSession);
        List<IMessagesStore.StoredMessage> storedMessages = clientSession.storedMessages();
        if (storedMessages.isEmpty()) {
            LOG.info("No stored messages for client <{}>", clientSession.clientID);
            return;
        }
        LOG.info("republishing stored messages to client <{}>", clientSession.clientID);
        for (IMessagesStore.StoredMessage storedMessage : storedMessages) {
            directSend(clientSession, storedMessage.getTopic(), storedMessage.getQos(), storedMessage.getMessage(), false, storedMessage.getMessageID());
            clientSession.removeEnqueued(storedMessage.getGuid());
        }
    }

    private void sendPubAck(String str, int i) {
        LOG.trace("sendPubAck invoked");
        PubAckMessage pubAckMessage = new PubAckMessage();
        pubAckMessage.setMessageID(Integer.valueOf(i));
        try {
            if (this.m_clientIDs == null) {
                throw new RuntimeException("Internal bad error, found m_clientIDs to null while it should be initialized, somewhere it's overwritten!!");
            }
            LOG.debug("clientIDs are {}", this.m_clientIDs);
            if (this.m_clientIDs.get(str) == null) {
                throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client %s in cache %s", str, this.m_clientIDs));
            }
            this.m_clientIDs.get(str).channel.writeAndFlush(pubAckMessage);
        } catch (Throwable th) {
            LOG.error((String) null, th);
        }
    }

    private void sendPubComp(String str, int i) {
        LOG.debug("PUB <--PUBCOMP-- SRV sendPubComp invoked for clientID {} ad messageID {}", str, Integer.valueOf(i));
        PubCompMessage pubCompMessage = new PubCompMessage();
        pubCompMessage.setMessageID(Integer.valueOf(i));
        this.m_clientIDs.get(str).channel.writeAndFlush(pubCompMessage);
    }

    private void sendPubRec(String str, int i) {
        LOG.trace("PUB <--PUBREC-- SRV sendPubRec invoked for clientID {} with messageID {}", str, Integer.valueOf(i));
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(Integer.valueOf(i));
        this.m_clientIDs.get(str).channel.writeAndFlush(pubRecMessage);
    }

    private void setIdleTime(ChannelPipeline channelPipeline, int i) {
        if (channelPipeline.names().contains("idleStateHandler")) {
            channelPipeline.remove("idleStateHandler");
        }
        channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, i));
    }

    private boolean subscribeSingleTopic(final Subscription subscription) {
        this.subscriptions.add(subscription.asClientTopicCouple());
        Collection<IMessagesStore.StoredMessage> searchMatching = this.m_messagesStore.searchMatching(new IMatchingCondition() { // from class: io.moquette.spi.impl.ProtocolProcessor.1
            @Override // io.moquette.spi.IMatchingCondition
            public boolean match(String str) {
                return SubscriptionsStore.matchTopics(str, subscription.getTopicFilter());
            }
        });
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(subscription.getClientId());
        verifyToActivate(subscription.getClientId(), sessionForClient);
        for (IMessagesStore.StoredMessage storedMessage : searchMatching) {
            LOG.debug("send publish message for topic {}", subscription.getTopicFilter());
            directSend(sessionForClient, storedMessage.getTopic(), storedMessage.getQos(), storedMessage.getPayload(), true, storedMessage.getQos() == AbstractMessage.QOSType.MOST_ONE ? null : Integer.valueOf(sessionForClient.nextPacketId()));
        }
        this.m_interceptor.notifyTopicSubscribed(subscription);
        return true;
    }

    private void verifyToActivate(String str, ClientSession clientSession) {
        if (this.m_clientIDs.containsKey(str)) {
            clientSession.activate();
        }
    }

    protected void directSend(ClientSession clientSession, String str, AbstractMessage.QOSType qOSType, ByteBuffer byteBuffer, boolean z, Integer num) {
        String str2 = clientSession.clientID;
        LOG.debug("directSend invoked clientId <{}> on topic <{}> QoS {} retained {} messageID {}", str2, str, qOSType, Boolean.valueOf(z), num);
        PublishMessage publishMessage = new PublishMessage();
        publishMessage.setRetainFlag(z);
        publishMessage.setTopicName(str);
        publishMessage.setQos(qOSType);
        publishMessage.setPayload(byteBuffer);
        LOG.info("send publish message to <{}> on topic <{}>", str2, str);
        if (LOG.isDebugEnabled()) {
            LOG.debug("content <{}>", DebugUtils.payload2Str(byteBuffer));
        }
        if (publishMessage.getQos() != AbstractMessage.QOSType.MOST_ONE) {
            publishMessage.setMessageID(num);
        } else if (num != null) {
            throw new RuntimeException("Internal bad error, trying to forwardPublish a QoS 0 message with PacketIdentifier: " + num);
        }
        if (this.m_clientIDs == null) {
            throw new RuntimeException("Internal bad error, found m_clientIDs to null while it should be initialized, somewhere it's overwritten!!");
        }
        LOG.debug("clientIDs are {}", this.m_clientIDs);
        if (this.m_clientIDs.get(str2) == null) {
            throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", str2, this.m_clientIDs));
        }
        Channel channel = this.m_clientIDs.get(str2).channel;
        LOG.debug("Session for clientId {} is {}", str2, channel);
        channel.writeAndFlush(publishMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void init(SubscriptionsStore subscriptionsStore, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor) {
        this.m_clientIDs = new ConcurrentHashMap();
        this.m_interceptor = brokerInterceptor;
        this.subscriptions = subscriptionsStore;
        this.allowAnonymous = z;
        this.m_authorizator = iAuthorizator;
        LOG.trace("subscription tree on init {}", subscriptionsStore.dumpTree());
        this.m_authenticator = iAuthenticator;
        this.m_messagesStore = iMessagesStore;
        this.m_sessionsStore = iSessionsStore;
    }

    public void internalPublish(PublishMessage publishMessage) {
        AbstractMessage.QOSType qos = publishMessage.getQos();
        String topicName = publishMessage.getTopicName();
        LOG.info("embedded PUBLISH on topic <{}> with QoS {}", topicName, qos);
        IMessagesStore.StoredMessage asStoredMessage = asStoredMessage(publishMessage);
        asStoredMessage.setClientID("BROKER_SELF");
        asStoredMessage.setMessageID(1);
        String storePublishForFuture = qos == AbstractMessage.QOSType.EXACTLY_ONCE ? this.m_messagesStore.storePublishForFuture(asStoredMessage) : null;
        route2Subscribers(asStoredMessage);
        if (publishMessage.isRetainFlag()) {
            if (qos == AbstractMessage.QOSType.MOST_ONE || !publishMessage.getPayload().hasRemaining()) {
                this.m_messagesStore.cleanRetained(topicName);
                return;
            }
            if (storePublishForFuture == null) {
                storePublishForFuture = this.m_messagesStore.storePublishForFuture(asStoredMessage);
            }
            this.m_messagesStore.storeRetained(topicName, storePublishForFuture);
        }
    }

    public void processConnect(Channel channel, ConnectMessage connectMessage) {
        LOG.debug("CONNECT for client <{}>", connectMessage.getClientID());
        if (connectMessage.getProtocolVersion() != 3 && connectMessage.getProtocolVersion() != 4) {
            ConnAckMessage connAckMessage = new ConnAckMessage();
            connAckMessage.setReturnCode((byte) 1);
            LOG.warn("processConnect sent bad proto ConnAck");
            channel.writeAndFlush(connAckMessage);
            channel.close();
            return;
        }
        if (connectMessage.getClientID() == null || connectMessage.getClientID().length() == 0) {
            ConnAckMessage connAckMessage2 = new ConnAckMessage();
            connAckMessage2.setReturnCode((byte) 2);
            channel.writeAndFlush(connAckMessage2);
            this.m_interceptor.notifyClientConnected(connectMessage);
            return;
        }
        if (connectMessage.isUserFlag()) {
            byte[] bArr = null;
            if (connectMessage.isPasswordFlag()) {
                bArr = connectMessage.getPassword();
            } else if (!this.allowAnonymous) {
                failedCredentials(channel);
                return;
            }
            if (!this.m_authenticator.checkValid(connectMessage.getUsername(), bArr)) {
                failedCredentials(channel);
                channel.close();
                return;
            }
            NettyUtils.userName(channel, connectMessage.getUsername());
        } else if (!this.allowAnonymous) {
            failedCredentials(channel);
            return;
        }
        if (this.m_clientIDs.containsKey(connectMessage.getClientID())) {
            LOG.info("Found an existing connection with same client ID <{}>, forcing to close", connectMessage.getClientID());
            Channel channel2 = this.m_clientIDs.get(connectMessage.getClientID()).channel;
            this.m_sessionsStore.sessionForClient(connectMessage.getClientID()).disconnect();
            NettyUtils.sessionStolen(channel2, true);
            channel2.close();
            LOG.debug("Existing connection with same client ID <{}>, forced to close", connectMessage.getClientID());
        }
        this.m_clientIDs.put(connectMessage.getClientID(), new ConnectionDescriptor(connectMessage.getClientID(), channel, connectMessage.isCleanSession()));
        int keepAlive = connectMessage.getKeepAlive();
        LOG.debug("Connect with keepAlive {} s", Integer.valueOf(keepAlive));
        NettyUtils.keepAlive(channel, keepAlive);
        NettyUtils.cleanSession(channel, connectMessage.isCleanSession());
        NettyUtils.clientID(channel, connectMessage.getClientID());
        LOG.debug("Connect create session <{}>", channel);
        setIdleTime(channel.pipeline(), Math.round(keepAlive * 1.5f));
        if (connectMessage.isWillFlag()) {
            AbstractMessage.QOSType valueOf = AbstractMessage.QOSType.valueOf(connectMessage.getWillQos());
            byte[] willMessage = connectMessage.getWillMessage();
            this.m_willStore.put(connectMessage.getClientID(), new WillMessage(connectMessage.getWillTopic(), (ByteBuffer) ByteBuffer.allocate(willMessage.length).put(willMessage).flip(), connectMessage.isWillRetain(), valueOf));
        }
        ConnAckMessage connAckMessage3 = new ConnAckMessage();
        connAckMessage3.setReturnCode((byte) 0);
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(connectMessage.getClientID());
        boolean z = sessionForClient != null;
        if (!connectMessage.isCleanSession() && z) {
            connAckMessage3.setSessionPresent(true);
        }
        if (z) {
            sessionForClient.cleanSession(connectMessage.isCleanSession());
        }
        channel.writeAndFlush(connAckMessage3);
        this.m_interceptor.notifyClientConnected(connectMessage);
        if (!z) {
            LOG.info("Create persistent session for clientID <{}>", connectMessage.getClientID());
            sessionForClient = this.m_sessionsStore.createNewSession(connectMessage.getClientID(), connectMessage.isCleanSession());
        }
        sessionForClient.activate();
        if (connectMessage.isCleanSession()) {
            sessionForClient.cleanSession();
        }
        LOG.info("Connected client ID <{}> with clean session {}", connectMessage.getClientID(), Boolean.valueOf(connectMessage.isCleanSession()));
        if (!connectMessage.isCleanSession()) {
            republishStoredInSession(sessionForClient);
        }
        LOG.info("CONNECT processed");
    }

    public void processConnectionLost(String str, boolean z, Channel channel) {
        this.m_clientIDs.remove(str, new ConnectionDescriptor(str, channel, true));
        if (z) {
            this.m_sessionsStore.sessionForClient(str).deactivate();
            LOG.info("Lost connection with client <{}>", str);
        }
        if (z || !this.m_willStore.containsKey(str)) {
            return;
        }
        forwardPublishWill(this.m_willStore.get(str), str);
        this.m_willStore.remove(str);
    }

    public void processDisconnect(Channel channel) throws InterruptedException {
        String clientID = NettyUtils.clientID(channel);
        boolean cleanSession = NettyUtils.cleanSession(channel);
        LOG.info("DISCONNECT client <{}> with clean session {}", clientID, Boolean.valueOf(cleanSession));
        this.m_sessionsStore.sessionForClient(clientID).disconnect();
        this.m_clientIDs.remove(clientID);
        channel.close();
        this.m_willStore.remove(clientID);
        this.m_interceptor.notifyClientDisconnected(clientID);
        LOG.info("DISCONNECT client <{}> finished", clientID, Boolean.valueOf(cleanSession));
    }

    public void processPubAck(Channel channel, PubAckMessage pubAckMessage) {
        String clientID = NettyUtils.clientID(channel);
        int intValue = pubAckMessage.getMessageID().intValue();
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        sessionForClient.inFlightAcknowledged(intValue);
    }

    public void processPubComp(Channel channel, PubCompMessage pubCompMessage) {
        String clientID = NettyUtils.clientID(channel);
        int intValue = pubCompMessage.getMessageID().intValue();
        LOG.debug("\t\tSRV <--PUBCOMP-- SUB processPubComp invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(intValue));
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        sessionForClient.secondPhaseAcknowledged(intValue);
    }

    public void processPubRec(Channel channel, PubRecMessage pubRecMessage) {
        String clientID = NettyUtils.clientID(channel);
        int intValue = pubRecMessage.getMessageID().intValue();
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        sessionForClient.inFlightAcknowledged(intValue);
        sessionForClient.secondPhaseAckWaiting(intValue);
        LOG.debug("\t\tSRV <--PUBREC-- SUB processPubRec invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(intValue));
        PubRelMessage pubRelMessage = new PubRelMessage();
        pubRelMessage.setMessageID(Integer.valueOf(intValue));
        pubRelMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
        channel.writeAndFlush(pubRelMessage);
    }

    public void processPubRel(Channel channel, PubRelMessage pubRelMessage) {
        String clientID = NettyUtils.clientID(channel);
        int intValue = pubRelMessage.getMessageID().intValue();
        LOG.debug("PUB --PUBREL--> SRV processPubRel invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(intValue));
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        IMessagesStore.StoredMessage storedMessage = sessionForClient.storedMessage(intValue);
        route2Subscribers(storedMessage);
        if (storedMessage.isRetained()) {
            String topic = storedMessage.getTopic();
            if (storedMessage.getMessage().hasRemaining()) {
                this.m_messagesStore.storeRetained(topic, storedMessage.getGuid());
            } else {
                this.m_messagesStore.cleanRetained(topic);
            }
        }
        sendPubComp(clientID, intValue);
    }

    public void processPublish(Channel channel, PublishMessage publishMessage) {
        LOG.trace("PUB --PUBLISH--> SRV executePublish invoked with {}", publishMessage);
        String clientID = NettyUtils.clientID(channel);
        String topicName = publishMessage.getTopicName();
        if (!this.m_authorizator.canWrite(topicName, NettyUtils.userName(channel), clientID)) {
            LOG.debug("topic {} doesn't have write credentials", topicName);
            return;
        }
        AbstractMessage.QOSType qos = publishMessage.getQos();
        Integer messageID = publishMessage.getMessageID();
        LOG.info("PUBLISH from clientID <{}> on topic <{}> with QoS {}", clientID, topicName, qos);
        String str = null;
        IMessagesStore.StoredMessage asStoredMessage = asStoredMessage(publishMessage);
        asStoredMessage.setClientID(clientID);
        if (qos == AbstractMessage.QOSType.MOST_ONE) {
            route2Subscribers(asStoredMessage);
        } else if (qos == AbstractMessage.QOSType.LEAST_ONE) {
            route2Subscribers(asStoredMessage);
            sendPubAck(clientID, messageID.intValue());
            LOG.debug("replying with PubAck to MSG ID {}", messageID);
        } else if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
            str = this.m_messagesStore.storePublishForFuture(asStoredMessage);
            sendPubRec(clientID, messageID.intValue());
        }
        if (publishMessage.isRetainFlag()) {
            if (qos == AbstractMessage.QOSType.MOST_ONE) {
                this.m_messagesStore.cleanRetained(topicName);
            } else if (publishMessage.getPayload().hasRemaining()) {
                if (str == null) {
                    str = this.m_messagesStore.storePublishForFuture(asStoredMessage);
                }
                this.m_messagesStore.storeRetained(topicName, str);
            } else {
                this.m_messagesStore.cleanRetained(topicName);
            }
        }
        this.m_interceptor.notifyTopicPublished(publishMessage, clientID);
    }

    public void processSubscribe(Channel channel, SubscribeMessage subscribeMessage) {
        String clientID = NettyUtils.clientID(channel);
        LOG.debug("SUBSCRIBE client <{}> packetID {}", clientID, subscribeMessage.getMessageID());
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageID(subscribeMessage.getMessageID());
        String userName = NettyUtils.userName(channel);
        ArrayList arrayList = new ArrayList();
        for (SubscribeMessage.Couple couple : subscribeMessage.subscriptions()) {
            if (this.m_authorizator.canRead(couple.topicFilter, userName, sessionForClient.clientID)) {
                AbstractMessage.QOSType valueOf = AbstractMessage.QOSType.valueOf(couple.qos);
                Subscription subscription = new Subscription(clientID, couple.topicFilter, valueOf);
                boolean subscribe = sessionForClient.subscribe(couple.topicFilter, subscription);
                subAckMessage.addType(subscribe ? valueOf : AbstractMessage.QOSType.FAILURE);
                if (subscribe) {
                    arrayList.add(subscription);
                }
            } else {
                LOG.debug("topic {} doesn't have read credentials", couple.topicFilter);
                subAckMessage.addType(AbstractMessage.QOSType.FAILURE);
            }
        }
        LOG.debug("SUBACK for packetID {}", subscribeMessage.getMessageID());
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscription tree {}", this.subscriptions.dumpTree());
        }
        channel.writeAndFlush(subAckMessage);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            subscribeSingleTopic((Subscription) it.next());
        }
    }

    public void processUnsubscribe(Channel channel, UnsubscribeMessage unsubscribeMessage) {
        List<String> list = unsubscribeMessage.topicFilters();
        int intValue = unsubscribeMessage.getMessageID().intValue();
        String clientID = NettyUtils.clientID(channel);
        LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", list, clientID);
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        verifyToActivate(clientID, sessionForClient);
        for (String str : list) {
            if (!SubscriptionsStore.validate(str)) {
                channel.close();
                LOG.warn("UNSUBSCRIBE found an invalid topic filter <{}> for clientID <{}>", str, clientID);
                return;
            } else {
                this.subscriptions.removeSubscription(str, clientID);
                sessionForClient.unsubscribeFrom(str);
                this.m_interceptor.notifyTopicUnsubscribed(str, clientID);
            }
        }
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageID(Integer.valueOf(intValue));
        LOG.info("replying with UnsubAck to MSG ID {}", Integer.valueOf(intValue));
        channel.writeAndFlush(unsubAckMessage);
    }

    void route2Subscribers(IMessagesStore.StoredMessage storedMessage) {
        AbstractMessage.QOSType qOSType;
        String topic = storedMessage.getTopic();
        AbstractMessage.QOSType qos = storedMessage.getQos();
        ByteBuffer message = storedMessage.getMessage();
        LOG.debug("route2Subscribers republishing to existing subscribers that matches the topic {}", topic);
        if (LOG.isTraceEnabled()) {
            LOG.trace("content <{}>", DebugUtils.payload2Str(message));
            LOG.trace("subscription tree {}", this.subscriptions.dumpTree());
        }
        String str = null;
        if (qos == AbstractMessage.QOSType.EXACTLY_ONCE || qos == AbstractMessage.QOSType.LEAST_ONE) {
            str = this.m_messagesStore.storePublishForFuture(storedMessage);
        }
        String str2 = str;
        Iterator<Subscription> it = this.subscriptions.matches(topic).iterator();
        while (true) {
            Iterator<Subscription> it2 = it;
            if (!it2.hasNext()) {
                return;
            }
            Subscription next = it2.next();
            AbstractMessage.QOSType qOSType2 = qos;
            if (qOSType2.byteValue() > next.getRequestedQos().byteValue()) {
                qOSType2 = next.getRequestedQos();
            }
            AbstractMessage.QOSType qOSType3 = qOSType2;
            ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(next.getClientId());
            verifyToActivate(next.getClientId(), sessionForClient);
            LOG.debug("Broker republishing to client <{}> topic <{}> qos <{}>, active {}", next.getClientId(), next.getTopicFilter(), qOSType3, Boolean.valueOf(sessionForClient.isActive()));
            ByteBuffer duplicate = message.duplicate();
            if (qOSType3 == AbstractMessage.QOSType.MOST_ONE && sessionForClient.isActive()) {
                qOSType = qos;
                directSend(sessionForClient, topic, qOSType3, duplicate, false, null);
            } else {
                qOSType = qos;
                if (!sessionForClient.isCleanSession() && !sessionForClient.isActive()) {
                    sessionForClient.enqueueToDeliver(str2);
                } else if (sessionForClient.isActive()) {
                    int nextPacketId = sessionForClient.nextPacketId();
                    sessionForClient.inFlightAckWaiting(str2, nextPacketId);
                    directSend(sessionForClient, topic, qOSType3, duplicate, false, Integer.valueOf(nextPacketId));
                }
            }
            it = it2;
            qos = qOSType;
        }
    }
}
