/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ecf.provider.jms.mqtt.container;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.jms.JMSException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.core.util.Trace;
import org.eclipse.ecf.provider.jms.identity.JMSID;
import org.eclipse.ecf.provider.jms.mqtt.container.MQTTMessage;
import org.eclipse.ecf.provider.jms.mqtt.container.MqttChannelMessageHandler;
import org.eclipse.ecf.remoteservice.util.ObjectSerializationUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Channel that sends and receives serialized messages over a single MQTT topic
 * using a Paho {@link MqttAsyncClient}.
 *
 * <p>Incoming messages are decoded on the Paho callback thread and then handed to
 * the {@link MqttChannelMessageHandler} on a dedicated single daemon thread, so
 * handler code never runs on the client's callback thread.
 */
public class MqttChannel implements MqttCallbackExtended {
    /** Trace plugin id used for every trace statement of this class. */
    private static final String PLUGIN_ID = "org.eclipse.ecf.provider.jms.mqtt";

    static final ObjectSerializationUtil osu = new ObjectSerializationUtil();

    private MqttChannelMessageHandler handler;
    private MqttAsyncClient client;
    private String topic;
    private int qos;
    /** Single daemon thread that runs handler callbacks; set to {@code null} by {@link #disconnect()}. */
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory(){

        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Mqtt RemoteService Provider");
            t.setDaemon(true);
            return t;
        }
    });

    /**
     * Creates the channel and connects to the broker, blocking until the connect
     * completes or the connection timeout of {@code options} elapses.
     *
     * @param targetID identifies the broker and the topic to use
     * @param localID  its name is used as the MQTT client id
     * @param options  connect options, or {@code null} for defaults; automatic
     *                 reconnect is always switched on for the given instance
     * @param qos      quality of service used when subscribing to the topic
     * @param handler  receives decoded messages and connection-lost notifications
     * @throws ECFException if the client cannot be created or cannot connect
     */
    public MqttChannel(JMSID targetID, ID localID, MqttConnectOptions options, int qos, MqttChannelMessageHandler handler) throws ECFException {
        try {
            this.handler = handler;
            this.client = new MqttAsyncClient(targetID.getBroker(), localID.getName(), null);
            this.client.setCallback(this);
            if (options == null) {
                options = new MqttConnectOptions();
            }
            this.topic = targetID.getTopicOrQueueName();
            this.qos = qos;
            options.setAutomaticReconnect(true);
            // getConnectionTimeout() is in seconds, waitForCompletion expects milliseconds.
            this.client.connect(options).waitForCompletion(options.getConnectionTimeout() * 1000L);
        }
        catch (MqttException e) {
            // The instance never reaches the caller, so nobody could release the client later.
            releaseClientQuietly();
            throw new ECFException("MqttClient could not connect to broker at targetID=" + targetID.getName(), e);
        }
    }

    /**
     * Detaches this channel from the client and makes a best-effort attempt to
     * close it. Used on the constructor failure path, so a connect that completes
     * later cannot deliver messages to the handler of a channel that was never
     * handed out.
     */
    private void releaseClientQuietly() {
        if (this.client == null) {
            return;
        }
        this.client.setCallback(null);
        try {
            this.client.close();
        }
        catch (MqttException ignored) {
            // Best effort only: the original connect failure is what gets reported.
        }
    }

    /**
     * @return {@code true} if the underlying client reports being connected
     */
    public synchronized boolean isConnected() {
        return this.client.isConnected();
    }

    /**
     * Serializes {@code object} and sends it on this channel's topic.
     *
     * @param object           payload to serialize and send
     * @param jmsCorrelationId correlation id passed along with the payload
     * @throws JMSException if serialization fails (the {@link IOException} is
     *                      attached as cause and linked exception) or if sending fails
     */
    public synchronized void sendMessage(Serializable object, String jmsCorrelationId) throws JMSException {
        byte[] serializedMessage;
        try {
            serializedMessage = osu.serializeToBytes(object);
        }
        catch (IOException e) {
            JMSException jmse = new JMSException(e.getMessage());
            // Keep the real cause reachable instead of only copying its stack trace.
            jmse.setLinkedException(e);
            jmse.initCause(e);
            throw jmse;
        }
        MQTTMessage.send(this.client, this.topic, serializedMessage, jmsCorrelationId);
    }

    /**
     * Disconnects and closes the client and stops the handler thread. Safe to call
     * when the connection is already lost and safe to call more than once.
     */
    public synchronized void disconnect() {
        // Always detach first: a client that is currently disconnected may still
        // reconnect automatically and would otherwise call back into this channel.
        this.client.setCallback(null);
        try {
            if (this.client.isConnected()) {
                this.client.disconnect();
            }
            // Close even when not connected so the client is not left behind.
            this.client.close();
        }
        catch (MqttException e) {
            e.printStackTrace();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            this.executorService = null;
        }
    }

    /**
     * Forwards the connection-lost notification to the handler.
     *
     * @param arg0 the reason the connection was lost
     */
    public void connectionLost(Throwable arg0) {
        this.handler.connectionLost(arg0);
    }

    /**
     * Delivery notifications are not used by this channel.
     *
     * @param arg0 the delivery token of the completed message
     */
    public void deliveryComplete(IMqttDeliveryToken arg0) {
    }

    /**
     * Decodes an incoming message and schedules it for delivery to the handler.
     * Messages for another topic, retained messages, duplicates and payloads that
     * do not decode to an {@link MQTTMessage} are dropped.
     *
     * @param arg0    the topic the message was published on
     * @param message the received MQTT message
     */
    public void messageArrived(String arg0, MqttMessage message) throws Exception {
        Trace.trace(PLUGIN_ID, "handleMessageArrived topic=" + arg0 + ", message=" + message);
        String localTopic = this.topic;
        // Compare against the topic of the message, not against our own field.
        if (localTopic == null || !localTopic.equals(arg0)) {
            Trace.trace(PLUGIN_ID, "handleMessageArrived.  Our topic=" + localTopic + " message topic=" + arg0);
            return;
        }
        if (message.isRetained()) {
            Trace.trace(PLUGIN_ID, "handleMessageArrived.  Message=" + message + " retained, so not processing");
            return;
        }
        if (message.isDuplicate()) {
            Trace.trace(PLUGIN_ID, "handleMessageArrived.  Message=" + message + " is duplicate, so not processing");
            return;
        }
        final MQTTMessage m = MQTTMessage.receive(message.getPayload());
        if (m == null) {
            Trace.exiting(PLUGIN_ID, "exiting", this.getClass(), "handleMessageArrived");
            return;
        }
        // disconnect() nulls the field; snapshot it so a late delivery cannot hit a
        // NullPointerException on the client's callback thread.
        ExecutorService executor = this.executorService;
        if (executor == null || executor.isShutdown()) {
            Trace.trace(PLUGIN_ID, "handleMessageArrived.  Channel disconnected, so not processing message=" + message);
            return;
        }
        executor.submit(new Runnable(){

            public void run() {
                MqttChannel.this.handler.handleMqttChannelMessage(m.getData(), m.getCorrelationId());
            }
        });
    }

    /**
     * Subscribes to the channel topic whenever a connection is established.
     *
     * <p>The subscription is renewed after automatic reconnects as well: with a
     * clean session the broker does not keep subscriptions across connections, and
     * subscribing again is harmless when the session was kept.
     *
     * @param reconnect {@code true} if the connection results from an automatic reconnect
     * @param arg1      the URI of the server the connection was made to
     */
    public void connectComplete(boolean reconnect, String arg1) {
        try {
            this.client.subscribe(this.topic, this.qos);
        }
        catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

