Let's build a simple MQTT echo on AWS IoT Core

An end-to-end IoT solution

Tamás Sallai

Building on AWS IoT Core

In this article we'll build a stack on AWS IoT Core that implements a simple echo functionality. Whenever a device sends a message via MQTT, the backend replies to it with another message.

By the end of the article, we'll have a client application that sends messages to AWS IoT Core via MQTT:

$ ./run.sh
Connected, send some messages by typing in the terminal (press enter to send)
Hello world!
[Message received]: {
  "reported": {
    "value": "Hello world!"
  }
}

Then a few seconds later, the backend publishes the echo (notice the desired.value):

[Message received]: {
  "desired": {
    "value": "Echo: Hello world!"
  },
  "reported": {
    "value": "Hello world!"
  }
}

The functionality here is simple, but we'll cover a lot of concepts while we're building it:

  • IoT certificates and policies
  • The device shadows service
  • Topic rules that call a Lambda function
  • MQTT.js and how to connect, publish, and subscribe to topics with it

Let's start!

Certificate and thing

IoT security is based on certificates. The device has a private key, often concealed in a secure element, and its certificate is added to AWS IoT. This way, when the device connects, AWS can trust it and allow the connection. There are no passwords or tokens involved, which makes the process more secure. Especially with tamper-resistant hardware and locked-down policies (which we'll talk about in a minute), this setup is the gold standard for IoT security.

Certificate added to IoT Core

The certificate is connected to a thing, which is AWS's concept of a connected device. Naming it this way makes it very hard to Google.

Thing added to IoT Core

Then the certificate has policies that are similar to IAM policies but for MQTT. With a policy you can define conditions for the MQTT connection and which topics the device can subscribe and publish to.

Policy added to IoT Core
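To make the policy idea concrete, here is a minimal sketch of a policy document for the echo client, built as a JavaScript object. The function name and its inputs (region, accountId, shadowName) are hypothetical and not taken from the article's stack; the policy actually used there may differ. The ${iot:Connection.Thing.ThingName} policy variable restricts the device to its own client ID and its own shadow topics.

```javascript
// Sketch only: builds an IoT policy document for a device that uses one named shadow.
// region, accountId and shadowName are hypothetical inputs.
function buildEchoPolicy({ region, accountId, shadowName }) {
	const arn = (type, path) => `arn:aws:iot:${region}:${accountId}:${type}/${path}`;
	// Policy variable, kept as literal text: AWS resolves it to the connecting thing's name.
	const thing = "${iot:Connection.Thing.ThingName}";
	const shadow = `$aws/things/${thing}/shadow/name/${shadowName}`;
	return {
		Version: "2012-10-17",
		Statement: [
			// The client ID must be the thing name.
			{ Effect: "Allow", Action: "iot:Connect", Resource: arn("client", thing) },
			// The device may publish updates to its own shadow only.
			{ Effect: "Allow", Action: "iot:Publish", Resource: arn("topic", `${shadow}/update`) },
			// Subscribing and receiving are separate permissions.
			{ Effect: "Allow", Action: "iot:Subscribe", Resource: arn("topicfilter", `${shadow}/update/documents`) },
			{ Effect: "Allow", Action: "iot:Receive", Resource: arn("topic", `${shadow}/update/documents`) },
		],
	};
}

console.log(JSON.stringify(buildEchoPolicy({ region: "eu-central-1", accountId: "123456789012", shadowName: "test" }), undefined, 2));
```

Note that subscribing uses a topicfilter resource while publishing and receiving use a topic resource.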

This forms the full MQTT security circle: the device proves its identity with a private key whose certificate AWS trusts, and AWS allows only the actions defined in the policy.

Device shadow

MQTT is like a blank canvas: topics and the message format are neither defined nor enforced, it's up to the application. This is good in many cases, but AWS also offers an opt-in, structured way of handling device state. This is the shadows service.

Each thing has a dedicated space, called a shadow, where it can publish its state and subscribe to updates. Each shadow has its own MQTT topics for changing and retrieving data. Devices and backend processes can use these dedicated topics to interact with the shadows.

MQTT topics for a shadow
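As an illustration, the topics of a named shadow follow a fixed pattern under the thing's name. The helper below is a small sketch that builds the most commonly used ones; the function name and the keys of the returned object are made up for this example.

```javascript
// Sketch: builds the reserved MQTT topics of a named shadow.
// shadowTopics and its property names are hypothetical helpers.
function shadowTopics(thingName, shadowName) {
	const base = `$aws/things/${thingName}/shadow/name/${shadowName}`;
	return {
		update: `${base}/update`,                    // publish here to change the state
		updateAccepted: `${base}/update/accepted`,   // the update was applied
		updateRejected: `${base}/update/rejected`,   // the update was refused
		updateDocuments: `${base}/update/documents`, // full document after each update
		get: `${base}/get`,                          // publish here to request the state
		getAccepted: `${base}/get/accepted`,         // response with the current state
		getRejected: `${base}/get/rejected`,         // the request was refused
	};
}

console.log(shadowTopics("thing1", "test").updateDocuments);
```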

Moreover, shadows define a message structure with a reported and a desired state. The former is the output of the device, while the latter is the input. The device only writes to its reported state, while the cloud (or other devices) writes to the desired one.

Thing shadow with a reported and a desired state
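The interplay of the two states can be sketched with a simplified merge. This is only a model for illustration: the real service also tracks versions and metadata and treats null values as deletions, none of which is modeled here. The function name is hypothetical.

```javascript
// Simplified model of a shadow update: merge the incoming sections into the stored state.
// Not the service's implementation; versions, metadata and null handling are left out.
function applyShadowUpdate(state, update) {
	return {
		desired: { ...state.desired, ...update.state.desired },
		reported: { ...state.reported, ...update.state.reported },
	};
}

// 1. The device reports its value.
let state = applyShadowUpdate({}, { state: { reported: { value: "Hello world!" } } });
// 2. The backend answers by writing the desired value.
state = applyShadowUpdate(state, { state: { desired: { value: "Echo: Hello world!" } } });

console.log(JSON.stringify(state, undefined, 2));
```

The final state has the same shape as the second message shown at the start of the article: the device's value under reported and the echo under desired.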

Topic rule

A topic rule is a listener for MQTT topics. When a message is published to a matching topic, the rule evaluates its SQL statement, and if the message satisfies the statement's conditions, the rule runs its actions. Topic rules form the basis of handling data in the cloud coming from connected devices.

An action can be a lot of things: writing the message to a database, calling an HTTP endpoint, or running a Lambda function. The most versatile is calling a Lambda, as that can then interact with all other services inside or outside AWS.

A topic rule for shadow messages

Since a topic rule listens for messages in MQTT topics, it can integrate with device shadows easily. A change to a shadow publishes a message in a specific topic, then a topic rule that matches that topic will process the message. In the above SQL, the $aws/things/+/shadow/name/+/update/documents topic filter gets all updates for named shadows.
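To show how the + wildcards in that filter behave, here is a small sketch of MQTT topic filter matching, along with a helper that pulls the thing name and the shadow name out of a matching topic. Both function names are hypothetical. The segment positions correspond to what the IoT SQL topic(3) and topic(6) functions would return; whether the article's rule uses them is an assumption.

```javascript
// Sketch of MQTT topic filter matching: "+" matches exactly one level, "#" matches the rest.
function matchesTopicFilter(filter, topic) {
	const f = filter.split("/");
	const t = topic.split("/");
	for (let i = 0; i < f.length; i++) {
		if (f[i] === "#") return true;                    // multi-level wildcard
		if (i >= t.length) return false;                  // topic is shorter than the filter
		if (f[i] !== "+" && f[i] !== t[i]) return false;  // literal level differs
	}
	return f.length === t.length;
}

// Extracts the 3rd and 6th topic levels (1-indexed) of a named shadow topic.
function parseNamedShadowTopic(topic) {
	const parts = topic.split("/");
	return { thingName: parts[2], shadowName: parts[5] };
}

const filter = "$aws/things/+/shadow/name/+/update/documents";
const topic = "$aws/things/thing1/shadow/name/test/update/documents";
console.log(matchesTopicFilter(filter, topic), parseNamedShadowTopic(topic));
```

With this filter, an unnamed (classic) shadow topic such as $aws/things/thing1/shadow/update/documents does not match, because it has fewer levels and no name segment.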

Finally, the Lambda function implements the core logic of the solution: it echoes the message back to the desired state:

import { IoTDataPlaneClient, UpdateThingShadowCommand } from "@aws-sdk/client-iot-data-plane";

export const handler = async (event) => {
	const {thingName, shadowName, current} = event;
	return new IoTDataPlaneClient().send(new UpdateThingShadowCommand({
		shadowName,
		thingName,
		payload: Buffer.from(JSON.stringify({
			state: {
				desired: {
					value: "Echo: " + current.state.reported.value,
				}
			}
		}))
	}));
};
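One way to try the echo logic without deploying anything is to pull the payload construction into a pure function and feed it a sample event. The sketch below assumes the event shape implied by the handler's destructuring (thingName, shadowName, and current with the shadow's state); the function name and the sample values are hypothetical.

```javascript
// Sketch: the echo logic as a pure function, so it can run without AWS.
// Returns undefined when there is no reported value to echo.
function buildEchoUpdate(event) {
	const reported = event.current?.state?.reported;
	if (reported?.value === undefined) return undefined;
	return { state: { desired: { value: "Echo: " + reported.value } } };
}

// Hypothetical sample event, shaped like the handler's input.
const sampleEvent = {
	thingName: "thing1",
	shadowName: "test",
	current: { state: { reported: { value: "Hello world!" } } },
};

console.log(JSON.stringify(buildEchoUpdate(sampleEvent)));
```

The handler could then pass the result through JSON.stringify and Buffer.from into UpdateThingShadowCommand as before.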

Connecting with MQTT.js

Now that the backend side is ready, let's move on to the client! We'll use the MQTT.js library to connect to MQTT and send/receive messages.

The client needs a couple of arguments for the connection. The most important is the IoT endpoint, which is the domain to connect to. You can view it on the Console:

The device endpoint on the Console

Or get it using the AWS CLI:

aws iot describe-endpoint --endpoint-type iot:Data-ATS

Or with Terraform:

data "aws_iot_endpoint" "iot_endpoint" {
	endpoint_type = "iot:Data-ATS"
}

The next required part is the thing name. IoT Core locates the thing that is connecting based on the client ID of the MQTT connection, so the client uses the thing name as its client ID.

Then it needs the device certificate and the private key. In a real-world scenario it's the other way around: the device has access to the private key and the certificate through the secure element. But since we are emulating a device here, these are inputs.

Finally, it needs the CA certificate of the AWS endpoint. This is to validate the server certificate presented by AWS. The CA is then embedded in the device code forming the trust anchor.

In our case, it is downloaded by Terraform from AWS:

data "http" "root_ca" {
	url = "https://www.amazontrust.com/repository/AmazonRootCA1.pem"
}

Arguments

To pass all this information from Terraform to the client code, we'll need a couple of outputs:

output "ca" {
	value = data.http.root_ca.response_body
}

output "iot_endpoint" {
	value = data.aws_iot_endpoint.iot_endpoint.endpoint_address
}

output "thing_name" {
	value = aws_iot_thing.thing.name
}

output "cert" {
	value = tls_self_signed_cert.cert.cert_pem
}

output "key" {
	value = tls_private_key.key.private_key_pem
	sensitive = true
}

Then pass them to the client:

CA=$(terraform output -raw ca) \
	IOT_ENDPOINT=$(terraform output -raw iot_endpoint) \
	CERT=$(terraform output -raw cert) \
	KEY=$(terraform output -raw key) \
	THING_NAME=$(terraform output -raw thing_name) \
	node index.js

And in the client code, extract them from the environment:

const {IOT_ENDPOINT, CA, CERT, KEY, THING_NAME} = process.env;
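Since a missing variable would only surface later as a confusing connection error, it can help to check the environment up front. The helper below is a hedged sketch and not part of the article's client; requireEnv is a hypothetical name.

```javascript
// Sketch: returns the requested variables or throws with the names of the missing ones.
function requireEnv(names, env = process.env) {
	const missing = names.filter((name) => !env[name]);
	if (missing.length > 0) {
		throw new Error(`Missing environment variables: ${missing.join(", ")}`);
	}
	return Object.fromEntries(names.map((name) => [name, env[name]]));
}

// Example with an explicit object instead of process.env:
console.log(requireEnv(["THING_NAME"], { THING_NAME: "thing1" }));
```

In the client it could replace the plain destructuring, for example by calling requireEnv(["IOT_ENDPOINT", "CA", "CERT", "KEY", "THING_NAME"]) and destructuring its result.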

Connecting to the MQTT endpoint

The connection needs all these arguments:

import {connect} from "mqtt";

const opt = {
	host: IOT_ENDPOINT,
	protocol: "mqtt",
	clientId: THING_NAME,
	clean: true,
	key: KEY,
	cert: CERT,
	ca: CA,
	reconnectPeriod: 0,
};

const client = connect(opt);

client.on("error", (e) => {
	console.log(e);
	process.exit(-1);
});

client.on("connect", () => {
	// connected
});

The error event is fired when there is a problem with the connection. If the connection succeeds, the connect event is fired.

Publishing messages

To publish a message to a topic use the publish method of the client:

client.publish(
	`$aws/things/${THING_NAME}/shadow/name/test/update`,
	JSON.stringify({state: {reported: {value: data}}})
);

The first argument is the topic name, which is the update topic for the test shadow. Then the second argument is the payload with a state.reported, as required by the shadows service.

Sending this message triggers the topic rule that in turn publishes another message.
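If the client should know whether the broker accepted a message, the publish call can be wrapped in a Promise and sent with QoS 1. This is a sketch under the assumption that the client object behaves like an MQTT.js client, whose publish method accepts an options object and a callback; publishReported is a hypothetical helper name.

```javascript
// Sketch: publishes a reported value to a named shadow and resolves once the
// publish callback fires without an error.
function publishReported(client, thingName, shadowName, value) {
	const topic = `$aws/things/${thingName}/shadow/name/${shadowName}/update`;
	const payload = JSON.stringify({ state: { reported: { value } } });
	return new Promise((resolve, reject) => {
		// qos: 1 asks the broker to acknowledge the message.
		client.publish(topic, payload, { qos: 1 }, (err) => (err ? reject(err) : resolve()));
	});
}

// Usage with a stand-in client that just logs what it would send:
const loggingClient = {
	publish(topic, payload, options, callback) {
		console.log(topic, payload, options);
		callback();
	},
};
publishReported(loggingClient, "thing1", "test", "Hello world!");
```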

Subscribing to topics

Subscriptions require two things: the subscription itself and a message handler.

To subscribe to a topic, use the subscribe method:

client.subscribe(`$aws/things/${THING_NAME}/shadow/name/test/update/documents`, (err) => {
	if (err) {
		// error
	} else {
		// subscribed
	}
});

Messages arrive separately from the subscription call. To add a listener for new ones, use the message event of the client:

client.on("message", (topic, message) => {
	console.log("[Message received]: " + JSON.stringify(JSON.parse(message.toString()).current.state, undefined, 2));
});

This separation makes it tricky to match a message to the subscription it belongs to, as the message event fires for every subscribed topic and MQTT.js does not offer per-subscription handlers.
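A common workaround is to keep a small lookup table from topic to handler and dispatch from the single message listener. The sketch below handles exact topic names only; subscriptions with wildcards would need filter matching on top. createTopicRouter is a hypothetical helper, not something MQTT.js provides.

```javascript
// Sketch: routes incoming messages to a handler registered for the exact topic.
function createTopicRouter() {
	const handlers = new Map();
	return {
		// Registers the handler for one topic (replacing any previous one).
		on(topic, handler) {
			handlers.set(topic, handler);
		},
		// Calls the matching handler; returns false when no handler is registered.
		dispatch(topic, message) {
			const handler = handlers.get(topic);
			if (!handler) return false;
			handler(message, topic);
			return true;
		},
	};
}

const router = createTopicRouter();
router.on("$aws/things/thing1/shadow/name/test/update/documents", (message) => {
	console.log("[Message received]: " + message.toString());
});
router.dispatch("$aws/things/thing1/shadow/name/test/update/documents", '{"current":{}}');
```

With a real client, the wiring would be a single listener such as client.on("message", (topic, message) => router.dispatch(topic, message)).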

Conclusion

IoT Core is a powerful platform to provide a backend for connected devices. It provides secure connectivity with the use of certificates and policies, an MQTT broker that handles communications, and topic rules to integrate messages with the rest of the platform.

June 13, 2023