Sample client snippet for message subscriber
This example code snippet is for the Paho client. For more information, see https://eclipse.org/paho/clients/java/.
import java.io.BufferedInputStream; import java.io.FileInputStream; import java.security.KeyStore; import java.security.Security; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SampleMQTTCBA implements MqttCallback { public static final int KEEP_ALIVE_INTERVAL = 30; public static void main(String[] args) { String topic = "45a10ad4-ada7-4698-bc2b-d8ce1e1eb5c6/device/profile_inventory"; try { MqttClient client = new MqttClient("ssl://<hostname>:8883", "clientId", new MemoryPersistence()); MqttConnectOptions options = getMqttConnectOptions(); client.setCallback(new SampleMQTTCBA()); client.connect(options); client.subscribe(topic, 0); if (client.isConnected()) { System.out.println("Client connected"); } client.unsubscribe(topic); client.disconnect(); client.close(); } catch (Exception exception) { exception.printStackTrace(); } } private static MqttConnectOptions getMqttConnectOptions() throws Exception { MqttConnectOptions options = new MqttConnectOptions(); options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); options.setCleanSession(false); String caFile = "<ca-cert.pem file path>"; String p12File = "<End user's P12 file path>"; String p12Password = "<P12 password>"; final SSLContext sslContext = getSslContext(caFile, p12File, p12Password); options.setSocketFactory(sslContext.getSocketFactory()); return options; } private static SSLContext getSslContext(final String caCrtFile, final String p12File, final String password) { Security.addProvider(new BouncyCastleProvider()); try { KeyManagerFactory kmf = getKeyManagerFactory(p12File, password); TrustManagerFactory tmf = getTrustManagerFactory(caCrtFile); SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return context; } catch (Exception e) { e.printStackTrace(); System.err.println("Could not create SSL context"); throw new IllegalArgumentException(); } } private static KeyManagerFactory getKeyManagerFactory(String p12File, String password) throws Exception { KeyStore clientStore = KeyStore.getInstance("PKCS12"); clientStore.load(new FileInputStream(p12File), password.toCharArray()); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(clientStore, password.toCharArray()); return kmf; } private static TrustManagerFactory getTrustManagerFactory(String caCrtFile) throws Exception { // load CA certificate X509Certificate caCert = null; FileInputStream fis = new FileInputStream(caCrtFile); BufferedInputStream bis = new BufferedInputStream(fis); CertificateFactory cf = CertificateFactory.getInstance("X.509"); while (bis.available() > 0) { caCert = (X509Certificate)cf.generateCertificate(bis); } // CA certificate is used to authenticate server KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); caKs.load(null, null); caKs.setCertificateEntry("ca-certificate", caCert); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(caKs); return tmf; } public void connectionLost(Throwable cause) { System.out.println("Demo.connectionLost()"); cause.printStackTrace(); } public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Demo.messageArrived()" + topic + " " + message); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("message id" + token.getMessageId()); } }