// Sample client snippet for a message subscriber.
// This example code snippet uses the Eclipse Paho Java client. For more information, see https://eclipse.org/paho/clients/java/.
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SampleMQTTCBA implements MqttCallback {
public static final int KEEP_ALIVE_INTERVAL = 30;
public static void main(String[] args) {
String topic = "45a10ad4-ada7-4698-bc2b-d8ce1e1eb5c6/device/profile_inventory";
try {
MqttClient client = new MqttClient("ssl://<hostname>:8883",
"clientId",
new MemoryPersistence());
MqttConnectOptions options = getMqttConnectOptions();
client.setCallback(new SampleMQTTCBA());
client.connect(options);
client.subscribe(topic, 0);
if (client.isConnected()) {
System.out.println("Client connected");
}
client.unsubscribe(topic);
client.disconnect();
client.close();
} catch (Exception exception) {
exception.printStackTrace();
}
}
private static MqttConnectOptions getMqttConnectOptions() throws Exception {
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setCleanSession(false);
String caFile = "<ca-cert.pem file path>";
String p12File = "<End user's P12 file path>";
String p12Password = "<P12 password>";
final SSLContext sslContext = getSslContext(caFile, p12File, p12Password);
options.setSocketFactory(sslContext.getSocketFactory());
return options;
}
private static SSLContext getSslContext(final String caCrtFile, final String p12File, final String password) {
Security.addProvider(new BouncyCastleProvider());
try {
KeyManagerFactory kmf = getKeyManagerFactory(p12File, password);
TrustManagerFactory tmf = getTrustManagerFactory(caCrtFile);
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context;
} catch (Exception e) {
e.printStackTrace();
System.err.println("Could not create SSL context");
throw new IllegalArgumentException();
}
}
private static KeyManagerFactory getKeyManagerFactory(String p12File, String password) throws Exception {
KeyStore clientStore = KeyStore.getInstance("PKCS12");
clientStore.load(new FileInputStream(p12File), password.toCharArray());
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(clientStore, password.toCharArray());
return kmf;
}
private static TrustManagerFactory getTrustManagerFactory(String caCrtFile) throws Exception {
// load CA certificate
X509Certificate caCert = null;
FileInputStream fis = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(fis);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate)cf.generateCertificate(bis);
}
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
return tmf;
}
public void connectionLost(Throwable cause) {
System.out.println("Demo.connectionLost()");
cause.printStackTrace();
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Demo.messageArrived()" + topic + " " + message);
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("message id" + token.getMessageId());
}
}