Sample client snippet for message subscriber

This example code snippet is for the Paho client. For more information, see https://eclipse.org/paho/clients/java/.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
 
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
 
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
public class SampleMQTTCBA implements MqttCallback {
    public static final int KEEP_ALIVE_INTERVAL = 30;
 
    public static void main(String[] args) {
        String topic = "45a10ad4-ada7-4698-bc2b-d8ce1e1eb5c6/device/profile_inventory";
        try {
            MqttClient client = new MqttClient("ssl://<hostname>:8883",
                    "clientId",
                    new MemoryPersistence());
 
            MqttConnectOptions options = getMqttConnectOptions();
            client.setCallback(new SampleMQTTCBA());
 
            client.connect(options);
            client.subscribe(topic, 0);
 
            if (client.isConnected()) {
                System.out.println("Client connected");
            }
            client.unsubscribe(topic);
            client.disconnect();
            client.close();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
 
    private static MqttConnectOptions getMqttConnectOptions() throws Exception {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        options.setCleanSession(false);
 
        String caFile = "<ca-cert.pem file path>";
        String p12File = "<End user's P12 file path>";
        String p12Password = "<P12 password>";
 
        final SSLContext sslContext = getSslContext(caFile, p12File, p12Password);
        options.setSocketFactory(sslContext.getSocketFactory());
        return options;
    }
 
 
    private static SSLContext getSslContext(final String caCrtFile, final String p12File, final String password) {
        Security.addProvider(new BouncyCastleProvider());
        try {
            KeyManagerFactory kmf = getKeyManagerFactory(p12File, password);
            TrustManagerFactory tmf = getTrustManagerFactory(caCrtFile);
 
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Could not create SSL context");
            throw new IllegalArgumentException();
        }
    }
 
    private static KeyManagerFactory getKeyManagerFactory(String p12File, String password) throws Exception {
        KeyStore clientStore = KeyStore.getInstance("PKCS12");
        clientStore.load(new FileInputStream(p12File), password.toCharArray());
 
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(clientStore, password.toCharArray());
        return kmf;
    }
 
    private static TrustManagerFactory getTrustManagerFactory(String caCrtFile) throws Exception {
        // load CA certificate
        X509Certificate caCert = null;
 
        FileInputStream fis = new FileInputStream(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(fis);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
 
        while (bis.available() > 0) {
            caCert = (X509Certificate)cf.generateCertificate(bis);
        }
 
        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);
        return tmf;
    }
 
    public void connectionLost(Throwable cause) {
        System.out.println("Demo.connectionLost()");
        cause.printStackTrace();
    }
 
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Demo.messageArrived()" + topic + "   " + message);
    }
 
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("message id" + token.getMessageId());
    }
}