Overview
In the first part of this series, I explained how to get started with Cumulocity MQTT Service to connect devices. That article closed with a summary referencing tools like Dynamic Mapper and the option of implementing a custom microservice for integration with Cumulocity.
In this part, I will focus exactly on this task - building a custom microservice to:
Consume data from MQTT Service via Apache Pulsar
Transform & map the data to the Cumulocity Domain Model
Send the data to Cumulocity
For this article, an example microservice has been implemented, which can be found here:
(Not) Re-inventing the Wheel
Before we jump into the details of how to implement a custom microservice, I want to highlight once again that there are (open-source) tools available that handle the job of transformation & mapping quite well. These tools already leverage the same concepts I will explain shortly.
The most feature-rich open-source tool is currently Dynamic Mapper.
It not only connects to MQTT Service (via Apache Pulsar), but is also very powerful in terms of mapping logic. You can use either graphical mappings or JavaScript/Java code mappings for multiple payload formats, both with full AI Agent support, so you can enter your requirements in natural language and have the AI Agents generate the mappings for you. It performs the job in a highly optimized, scalable way and has all the design principles discussed in this article already implemented, so you can focus purely on your mapping logic.
Additionally, if you prefer a tool that is officially part of the product, there is currently a product in development that addresses some of the requirements Dynamic Mapper implements. Its working title is “Inbound Data Preparation” (IDP), and it will be available as a private preview in Q1 2026.
So, what you should take away before continuing: You don’t have to implement a custom microservice to integrate data from MQTT Service into Cumulocity, but in some use cases it is definitely valid to do so.
Understanding the Architecture
Let’s get started by understanding the architecture of MQTT Service.
If you haven’t already, please check the available documentation for connecting microservices & applications to MQTT Service.
To summarize:
Devices are connected via the MQTT protocol
Due to device isolation, applications cannot subscribe to messages from other devices via MQTT
Each device message is forwarded to the Pulsar messaging service
All MQTT topics & messages are merged into two Pulsar topics only
Applications must use a Pulsar client to consume/publish messages from/to MQTT Service
At the Pulsar messaging service level, only two topics are relevant:
- from-device - contains all messages coming from devices
- to-device - contains all messages that should be sent to devices
To implement a custom microservice, we need the following:
A Pulsar client
A Cumulocity client
Both wrapped in a microservice that can be deployed to Cumulocity
All of this is language-agnostic, so we can potentially use any programming language of choice.
For this article, I decided to use the Cumulocity Microservice Java SDK, which also includes a Cumulocity Client and the official Apache Pulsar Java client.
There is also another example implemented in Python, which can be found here:
Design Principles
Having understood the common architecture, the following design principles can be defined:
1. Scaling
Keeping in mind that we only have one topic to consume from and we potentially receive ALL messages from ALL devices there, scaling is a critical success criterion for our microservice.
Scaling can mean that our microservice uses available resources efficiently (vertical scaling) or that we spawn additional instances of our microservice (horizontal scaling). For horizontal scaling, we need to ensure that messages are distributed across instances (e.g. round-robin) and not delivered to every instance. This is achieved by using a Shared (or Key_Shared) subscription in Pulsar.
2. Don’t Block!
When receiving messages, we first need to filter them, directly acknowledging messages that we don’t need and asynchronously processing those that are relevant to our application. Whether a message is relevant can be defined on multiple levels: the MQTT topic, the payload, or even attributes like the clientId.
Most importantly, we must never block. We receive a message in a new thread and filter and further process it in another thread. When dealing with millions of concurrent messages, this could require millions of available threads.
Modern concepts like Virtual Threads seem to be a perfect match for our requirements, as we can spawn millions of them simultaneously (when not limited by available memory).
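To make this concrete, here is a minimal, self-contained sketch (independent of Pulsar) showing how a virtual-thread-per-task executor absorbs a large number of concurrent, blocking tasks — each task blocks briefly, which would be prohibitively expensive with one platform thread per task:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadDemo {

    // Spawns one cheap virtual thread per task (Java 21+); blocking
    // inside a task does not pin a scarce platform thread.
    static int runTasks(int tasks) {
        AtomicInteger completed = new AtomicInteger();
        try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(10); // simulate blocking I/O, e.g. an API call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
        } // close() waits for all submitted tasks to finish
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runTasks(10_000));
    }
}
```

Running ten thousand of these tasks on a fixed platform-thread pool would serialize heavily or exhaust memory with thread stacks; with virtual threads the same code completes in a fraction of the time.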
3. Acking, You Must!
I described it already in principle 2, but I want to mention it again: ALL messages must be acknowledged (acked), even those we’re not interested in. When filtering out messages, we directly acknowledge them. Since we have a durable subscription, failing to acknowledge will result in receiving the messages over and over again when re-establishing a connection.
For messages that are processed, we only acknowledge them when processing is finished—ideally, when the request to Cumulocity is successful. This may seem to contradict principle 2 (“Don’t block”), BUT blocking a virtual thread doesn’t hurt as much as blocking a platform thread.
4. Clean Up Your Consumers!
Since we have a durable subscription, it will persist even when a microservice is unavailable or removed from a tenant. When a consumer/microservice cannot consume messages, messages will queue in the backlog until the backlog reaches a limit, and then bad things happen.
When the backlog limit of a subscription is reached, producers (in our case, devices) can no longer publish new messages. They will receive an error from MQTT Service/Apache Pulsar until the backlog is cleaned up.
There is a TTL concept available, but in some heavy-load situations, the TTL might not be sufficient to clean up the backlog.
In short: A single unavailable consumer can prevent devices from publishing messages.
As a logical design principle, we must clean up our consumers when a microservice is shut down. Specifically, we need to call unsubscribe() on our consumer when a microservice is unsubscribed from a tenant.
5. Use Caches Where Possible!
When receiving a message, the worst thing you can do for throughput is making multiple synchronous calls to fetch additional data you need for processing.
A very common bad practice involves the External ID in Cumulocity. When creating any kind of data in Cumulocity, you might want to check first if the device already exists. To do this, you need to make a call to the Identity API. Just imagine you receive thousands of concurrent messages and the first thing you do is make an API call to the Identity API for each one… Yes, this will break the entire solution as the API call is very expensive.
Here is the solution: Use caching where possible! Keep frequently requested but infrequently changed data in memory. Device identifiers are an excellent example. Compared to measurements, events, and alarms, they are not changed very frequently—only when a device needs replacement, a new device is onboarded, or a device is removed.
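As a minimal sketch of this principle (the `lookupFromIdentityApi` method is a hypothetical stand-in for the real Identity API call, not the SDK's API), `ConcurrentHashMap.computeIfAbsent` gives us a thread-safe read-through cache in a few lines:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class IdentityCacheSketch {
    private final Map<String, String> externalIdCache = new ConcurrentHashMap<>();
    private final AtomicInteger apiCalls = new AtomicInteger();

    // Hypothetical stand-in for the expensive Identity API round trip.
    private String lookupFromIdentityApi(String externalId) {
        apiCalls.incrementAndGet();
        return "mo-" + externalId; // pretend the managed object ID comes back
    }

    // Only the first request per identifier triggers the lookup; all
    // later requests are served from memory. computeIfAbsent also blocks
    // concurrent callers for the same key while the lookup runs, so a
    // burst of messages for one device still causes a single API call.
    public String resolve(String externalId) {
        return externalIdCache.computeIfAbsent(externalId, this::lookupFromIdentityApi);
    }

    public int apiCallCount() {
        return apiCalls.get();
    }
}
```

The C8YClient section below shows the variant actually used in the example microservice, which additionally bounds the cache size.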
Setting Up the Project
Since we’re using Java/Spring Boot, we can leverage Maven for dependency management and reuse plugins to build & package the microservice.
The main dependencies we need are as follows:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<pulsar.version>4.0.6</pulsar.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.class>c8y.example.mqttservice.IntegrationMicroservice</main.class>
</properties>
...
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-bom</artifactId>
<version>${pulsar.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.nsn.cumulocity.clients-java</groupId>
<artifactId>microservice-dependencies</artifactId>
<version>${c8y.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
...
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
<dependency>
<groupId>com.nsn.cumulocity.clients-java</groupId>
<artifactId>microservice-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>com.nsn.cumulocity.model</groupId>
<artifactId>device-capability-model</artifactId>
</dependency>
</dependencies>
Since we want to leverage virtual threads, we need at least Java 21. As dependencies, we primarily need Apache Pulsar and the Cumulocity SDK.
Check out the full pom.xml here.
PulsarClientService
The main component of the microservice will be the PulsarClientService. It will create a Pulsar Consumer, register a callback, and process incoming messages. Let’s start from the beginning.
The first thing we need to manage is what should happen when a new tenant subscribes to our microservice. For this, we use @EventListener and the MicroserviceSubscriptionAddedEvent.
EventListener - What Happens When a Tenant Subscribes/Unsubscribes
Since we have a multi-tenant microservice, this EventListener is called for each tenant, meaning we need a consumer for each tenant as well. For robustness, we add a RetryTemplate in case the connection to Pulsar fails for any reason.
/* Will be executed each time a tenant subscribes and on microservice start */
@EventListener
public void subscribeTenant(MicroserviceSubscriptionAddedEvent event) {
String tenant = event.getCredentials().getTenant();
log.info("{} - Microservice subscribed", tenant);
try {
//Step 1: Initialize Pulsar Client per tenant
initializePulsarClientForTenant(tenant, event.getCredentials());
//Step 2: Create a consumer and subscribe to Pulsar
subscriptionRetryTemplate.execute(context -> {
if (context.getRetryCount() > 0)
log.info("{} - Retrying to subscribe to Pulsar...", tenant);
createConsumer(tenant, SUBSCRIPTION_NAME, clientMap.get(tenant), callbackMap.get(tenant));
return null;
});
} catch (Exception e) {
log.error("{} - Initialization error: {}", tenant, e.getMessage(), e);
}
}
At the same time, we need another EventListener for MicroserviceSubscriptionRemovedEvent, which ensures our consumer is correctly unsubscribed as mentioned in the Design Principles chapter.
@EventListener
public void removeTenant(MicroserviceSubscriptionRemovedEvent event) {
String tenant = event.getTenant();
try {
// Safely close all resources in reverse order of creation
Optional.ofNullable(consumerMap.remove(tenant))
.ifPresent(consumer -> {
try {
consumer.unsubscribe();
} catch (PulsarClientException e) {
log.warn("{} - Error unsubscribing consumer", tenant, e);
}
});
Optional.ofNullable(producerMap.remove(tenant))
.ifPresent(producer -> {
try {
producer.close();
} catch (PulsarClientException e) {
log.warn("{} - Error closing producer", tenant, e);
}
});
Optional.ofNullable(clientMap.remove(tenant))
.ifPresent(client -> {
try {
client.close();
} catch (PulsarClientException e) {
log.error("{} - Error closing Pulsar client", tenant, e);
}
});
callbackMap.remove(tenant);
log.info("{} - Tenant resources cleaned up successfully", tenant);
} catch (Exception e) {
log.error("{} - Unexpected error during tenant removal", tenant, e);
}
}
Initializing the Pulsar Client
Next, we need to initialize the Pulsar client independently for each tenant. This means using available environment variables to configure and authenticate the client.
You can get the information from the following variable:
C8Y_BASEURL_PULSAR - holds the base URL of Pulsar
Credentials are retrieved and provided by the MicroserviceSubscriptionAddedEvent:
//Retrieve service user credentials on microservice subscription
final AuthenticationBasic basicAuth = new AuthenticationBasic();
basicAuth.configure(Map.of(
"userId", "%s/%s".formatted(tenant, credentials.getUsername()),
"password", credentials.getPassword()
));
Finally, we create the client with default timeout configuration:
//Default configuration
private static final int DEFAULT_CONNECTION_TIMEOUT = 30;
private static final int DEFAULT_OPERATION_TIMEOUT = 30;
private static final int DEFAULT_KEEP_ALIVE = 30;
...
// Create a Pulsar client using the basic authentication credentials.
// The client will *not* try to connect and authenticate immediately.
final PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.authentication(basicAuth)
.connectionTimeout(DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS)
.operationTimeout(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS)
.keepAliveInterval(DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS)
.build();
We manage all tenant clients & callbacks in a Map so we can retrieve & manage them by tenantId key:
//This map is used to manage one client per tenant
private final Map<String, PulsarClient> clientMap = new ConcurrentHashMap<>();
//This map is used to manage one callback per tenant
private final Map<String, PulsarCallback> callbackMap = new ConcurrentHashMap<>();
...
clientMap.put(tenant, client);
PulsarCallback callback = new PulsarCallback(tenant, virtualThreadPool, this);
callbackMap.put(tenant, callback);
Defining a Callback
To receive messages, we need to provide a callback that implements MessageListener:
@Slf4j
@RequiredArgsConstructor
public class PulsarCallback implements MessageListener<byte[]> {
private final String tenant;
private final ExecutorService virtualThreadPool;
private final PulsarClientService pulsarClientService;
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
//This is in most cases "from-device" when the message originated from a device
String internalMQTTServiceTopic = msg.getTopicName();
//This is the MQTT topic used by the device and provided as a message property
String topic = msg.getProperty(PulsarClientService.PULSAR_PROPERTY_TOPIC);
//This is the clientId of the device that originally sent the message
String client = msg.getProperty(PulsarClientService.PULSAR_PROPERTY_CLIENT_ID);
//This is the raw payload, decoded from the message's byte array
String payload = new String(msg.getData(), StandardCharsets.UTF_8);
log.info("{} - Received message {} from MQTT device {} on MQTT topic {} with payload: {}", tenant, msg.getMessageId(), client, topic, payload);
// From here we should ideally process the message asynchronously and unblock the callback thread because we could receive many messages here
virtualThreadPool.submit(() -> {
pulsarClientService.processMessage(tenant, consumer, msg);
});
}
}
Here we retrieve relevant data from the received msg, such as topic (the MQTT topic used by the device), client (the clientID), and of course the payload.
As stated in the design principles, the callback should not block where possible, so we create a virtual thread to further process the message. The processMessage method will also include the acknowledgement.
Creating a Consumer
With the Pulsar client initialized and the callback defined for each tenant, we can create a consumer:
// Topic names
public static final String PULSAR_TO_DEVICE_TOPIC = "to-device";
public static final String PULSAR_FROM_DEVICE_TOPIC = "from-device";
public static final String PULSAR_NAMESPACE = "mqtt";
public static final String TOPIC_FORMAT = "persistent://%s/%s/%s";
//FIXME: Change this to a unique subscription name
private static final String SUBSCRIPTION_NAME = "MQTT_SERVICE_PULSAR_EXAMPLE_SUBSCRIPTION";
...
public Consumer<byte[]> createConsumer(String tenant, String subscriptionName, PulsarClient client, PulsarCallback callback) throws PulsarClientException {
log.info("{} - Creating and subscribing consumer to Pulsar ...", tenant);
String fromDevice = String.format(TOPIC_FORMAT, tenant, PULSAR_NAMESPACE, PULSAR_FROM_DEVICE_TOPIC);
final Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(fromDevice)
.subscriptionName(subscriptionName)
.messageListener(callback)
//Failover is worth using: during an update, the new instance won't be blocked by an exclusive-consumer exception while the old instance is still running
.subscriptionType(SubscriptionType.Failover)
.autoUpdatePartitions(false)
.subscribe();
log.info("{} - Subscription to Pulsar successful!", tenant);
consumerMap.put(tenant, consumer);
return consumer;
}
Please note the subscriptionType. In my example, I used Failover; if you want to scale horizontally, you should use Shared.
The subscriptionName should be a unique string for your application. Ideally, it should not change throughout the application lifecycle, so defining it as a constant as part of your application is a good idea.
Caution: Don’t add any UID or randomly generated numbers to the subscriptionName; otherwise, you will end up with tons of subscriptions that you’ll need to clean up.
Only when subscribe() is called on a consumer is a connection to Pulsar established.
Processing Messages
With the client, consumer, and callback defined and a connection to Pulsar established, let’s look at how incoming messages are processed.
In our callback, we simply call processMessage in a virtual thread.
Filtering
The first step is to filter the message. In my example below, I filter at the topic level to check if it matches device/sim/message, which is a topic I used in my previous article to simulate a device sending messages:
public void processMessage(String tenant, Consumer<byte[]> consumer, Message<byte[]> msg) {
/* Step 1: Filter the message */
String topic = msg.getProperty(PulsarClientService.PULSAR_PROPERTY_TOPIC);
String client = msg.getProperty(PulsarClientService.PULSAR_PROPERTY_CLIENT_ID);
try {
//For topics other than "device/sim/message" we just acknowledge the message without processing, as we are only interested in messages from devices
if (!"device/sim/message".equals(topic)) {
log.info("{} - Message {} will be ignored for processing", tenant, msg.getMessageId());
consumer.acknowledge(msg);
return;
}
log.debug("{} - Message {} is flagged to be processed", tenant, msg.getMessageId());
//Step 2: Transform message(s) to target format
//Step 3: Send message to target API(s)
try {
transformAndSendMessage(tenant, msg, client);
//Step 4: Acknowledge message after successful processing
log.info("{} - Processing of message {} successful!", tenant, msg.getMessageId());
consumer.acknowledge(msg);
} catch (SDKException e) {
log.error("{} - Error transforming and sending message", tenant, e);
//For temporary errors like 5xx we should negative ack for a potential retry
if (e.getHttpStatus() >= 500) {
consumer.negativeAcknowledge(msg);
} else {
consumer.acknowledge(msg);
}
} catch (Exception e) {
//For every other exception we ACK the message
log.error("{} - Generic error transforming and sending message", tenant, e);
consumer.acknowledge(msg);
}
} catch (PulsarClientException e) {
log.error("{} - Error acking message", tenant, e);
}
}
The transformation and sending are combined for simplicity in transformAndSendMessage.
If a message is not relevant, we directly acknowledge it.
In error cases, we distinguish between status codes >= 500 (where we assume a temporary platform issue and negatively acknowledge for a retry) and all other errors (where we assume the payload is incorrect, so we acknowledge the message anyway).
Transform & Send
Transforming a message means receiving a specific payload that must be mapped to a payload that the Cumulocity API understands.
In this example, we receive a message like this:
{
"temperature": {
"value": 19,
"unit": "°C"
},
"time": "2026-01-13T12:00:00.000Z",
"deviceId": "dev4711"
}
A measurement in Cumulocity is fortunately very similar, so the mapping logic is straightforward:
public void transformAndSendMessage(String tenant, Message<byte[]> msg, String clientId) throws SDKException {
if (msg.getData() == null || msg.getData().length == 0) {
log.error("{} - Measurement validation failed, no data provided!", tenant);
return;
}
try {
JsonObject jsonObject = JsonParser.parseString(new String(msg.getData(), StandardCharsets.UTF_8)).getAsJsonObject();
String type = "c8y_TemperatureMeasurement";
String name = "c8y_TemperatureMeasurement";
String extIdType = "c8y_Serial";
// Extract temperature data with null-safe defaults
String unit = null;
BigDecimal value = null;
if (jsonObject.has("temperature")) {
JsonObject temperatureObject = jsonObject.getAsJsonObject("temperature");
if (temperatureObject.has("unit")) {
unit = temperatureObject.get("unit").getAsString();
}
if (temperatureObject.has("value")) {
value = temperatureObject.get("value").getAsBigDecimal();
}
}
// Validation - check early for invalid data
if (value == null) {
log.error("{} - Measurement validation failed, no value provided!", tenant);
return;
}
// Extract time or use current time
DateTime time = jsonObject.has("time") ?
DateTime.parse(jsonObject.get("time").getAsString()) :
DateTime.now();
// Extract device ID or use client ID
String deviceId = jsonObject.has("deviceId") ?
jsonObject.get("deviceId").getAsString() :
clientId;
//This map is needed for producers using device isolation and sending messages to dedicated clients/devices only
deviceClientIdMap.put(deviceId, clientId);
// Use effectively final variables for lambda
final String finalUnit = unit;
final BigDecimal finalValue = value;
final DateTime finalTime = time;
subscriptionsService.runForTenant(tenant, () -> {
ExternalIDRepresentation extId = c8YClient.retrieveExternalId(tenant, extIdType, deviceId);
ManagedObjectRepresentation mor;
if (extId == null) {
log.info("{} - Device with id {} does not exist, creating it", tenant, deviceId);
mor = c8YClient.createDevice(tenant, "MQTT Service Example Device " + deviceId, deviceId, type, extIdType);
} else {
log.info("{} - Device with id {} already exists", tenant, deviceId);
mor = extId.getManagedObject();
}
MeasurementRepresentation measurement = c8YClient.createSimpleMeasurement(tenant, mor, name, type, finalTime, finalValue, finalUnit);
});
} catch (Exception e) {
log.error("{} - Error parsing or processing message", tenant, e);
throw e;
}
}
Note that in Java, we need to use the MicroserviceSubscriptionService to ensure we are correctly authenticated in each tenant before making a call to any Cumulocity API.
C8YClient
The Cumulocity Microservice SDK also comes with many APIs that are usable out of the box. Creating a device, measurement, alarm, or event is straightforward.
One best practice I want to highlight is the caching of External IDs according to Design Principle 5.
private static final int IDENTITY_CACHE_SIZE = 1000;
private final Map<ID, ExternalIDRepresentation> externalIdCache = Collections.synchronizedMap(new LinkedHashMap<ID, ExternalIDRepresentation>() {
//Removing oldest entries
@Override
protected boolean removeEldestEntry(Map.Entry<ID, ExternalIDRepresentation> eldest) {
return size() > IDENTITY_CACHE_SIZE;
}
});
//FIXME: Proper handling of updating cache when External IDs are updated/deleted in Cumulocity - currently not handled, can lead to stale data in cache
public ExternalIDRepresentation retrieveExternalId(String tenant, String type, String externalId) {
try {
ID id = new ID();
id.setType(type);
id.setValue(externalId);
//Step 1: Check cache to retrieve External ID
if(externalIdCache.get(id) != null) {
log.info("{} - External ID found in cache of type {} and value {}", tenant, type, externalId);
return externalIdCache.get(id);
} else {
//Step 2: Cache miss, call API
log.info("{} - External ID not found in cache of type {} and value {}, calling API", tenant, type, externalId);
ExternalIDRepresentation extId = identityApi.getExternalId(id);
externalIdCache.put(id, extId);
return extId;
}
} catch (SDKException e) {
log.info("{} - External ID could not be found of type {} and value {}", tenant, type, externalId);
}
return null;
}
The code above does exactly that. It defines a cache limited to 1000 entries. When a new message is received, it will first check if the identifier is part of the cache; if not, it will be retrieved ONCE from the API and added to the cache to ensure that all following requests will hit the cache for the same identifier.
What is missing for simplicity reasons is proper handling of updated/deleted devices in the cache. This can be achieved by invalidating the cache when it is detected that a source no longer exists and starting the process from the beginning: Check if the identifier is known; if not, create a new device for that identifier and add it to the cache.
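One possible shape for that invalidation, sketched with a stand-in lookup function (the `Optional`-based lookup and the "not found" signal are assumptions for illustration, not the SDK's API):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class InvalidatingCache {
    private static final int MAX_ENTRIES = 1000;

    // Bounded, insertion-ordered cache: oldest entries are evicted first.
    private final Map<String, String> cache =
        Collections.synchronizedMap(new LinkedHashMap<>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > MAX_ENTRIES;
            }
        });

    // lookup returns Optional.empty() when the source no longer exists
    // (e.g. the Identity API answered with 404).
    public Optional<String> resolve(String key, Function<String, Optional<String>> lookup) {
        String cached = cache.get(key);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<String> fresh = lookup.apply(key);
        fresh.ifPresent(v -> cache.put(key, v));
        return fresh;
    }

    // Called when a downstream request fails with "source not found":
    // drop the stale entry so the next message re-resolves the identifier
    // (and may re-create the device).
    public void invalidate(String key) {
        cache.remove(key);
    }
}
```

On a "source not found" error from the Measurement API, the processing code would call `invalidate(deviceId)` and retry, which restarts the flow described above.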
Manifest
As one of the final steps, we need to add the manifest cumulocity.json to the project. The roles ROLE_MQTT_SERVICE_MESSAGING_TOPICS_READ and ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE are especially important, as they are used to verify that the service user has permission to connect to Pulsar.
{
"apiVersion":"2",
"version":"@project.version@",
"provider": {
"name":"Cumulocity GmbH"
},
"isolation":"MULTI_TENANT",
"requiredRoles": [
"ROLE_MQTT_SERVICE_MESSAGING_TOPICS_READ",
"ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE",
"ROLE_INVENTORY_CREATE",
"ROLE_IDENTITY_READ",
"ROLE_IDENTITY_ADMIN",
"ROLE_MEASUREMENT_ADMIN"
],
"roles":[
],
"livenessProbe":{
"httpGet":{
"path": "/health",
"port": 80
},
"initialDelaySeconds": 360,
"periodSeconds": 10
},
"readinessProbe":{
"httpGet":{
"path": "/health",
"port": 80
},
"initialDelaySeconds": 30,
"periodSeconds": 10
}
}
Running the Microservice
After building the microservice via mvn package, you can deploy the generated ZIP file to a test tenant.
After deployment & subscription of the first tenant, you should see something like this in the microservice log:
2026-02-13T15:13:03.595Z INFO 17 --- [subscriptions-0] c.e.m.service.PulsarClientService : t14368213 - Microservice subscribed
2026-02-13T15:13:04.225Z INFO 17 --- [subscriptions-0] c.e.m.service.PulsarClientService : t14368213 - Creating and subscribing consumer to Pulsar ...
2026-02-13T15:13:05.064Z INFO 17 --- [subscriptions-0] c.e.m.service.PulsarClientService : t14368213 - Subscription to Pulsar successful!
Sending Device Messages
Next, we can send some messages to test if the processing works.
You can either use the scripts from Part 1 of this series, or you can use any MQTT tool/client of your choice and publish the following to MQTT Service:
Topic: device/sim/message
Payload:
{
"temperature": {
"value": 19,
"unit": "°C"
},
"deviceId": "dev4711"
}
In the log, you should see something like this:
2026-02-13T15:13:32.862Z INFO 17 --- [pulsar-external-listener-4-1] c.e.mqttservice.callback.PulsarCallback : t14368213 - Received message 1830392:0:0 from MQTT device mqttx_bde9fa2d on MQTT topic device/sim/message with payload: {
"temperature": {
"value": 19,
"unit": "°C"
},
"deviceId": "dev4711"
}
2026-02-13T15:13:32.869Z INFO 17 --- [virtThread-0] c.e.m.service.PulsarClientService : t14368213 - Message 1830392:0:0 is flagged to be processed
2026-02-13T15:13:32.874Z INFO 17 --- [virtThread-0] c.example.mqttservice.client.C8YClient : t14368213 - External ID not found in cache of type c8y_Serial and value dev4711, calling API
2026-02-13T15:13:33.193Z INFO 17 --- [virtThread-0] c.e.m.service.PulsarClientService : t14368213 - Device with id dev4711 already exists
2026-02-13T15:13:33.197Z INFO 17 --- [virtThread-0] c.example.mqttservice.client.C8YClient : t14368213 - Creating Measurement {"time":"2026-02-13T15:13:32.873Z","source":{"self":"http://t14368213.eu-latest.cumulocity.com/inventory/managedObjects/92121930503","id":"92121930503"},"type":"c8y_TemperatureMeasurement","c8y_TemperatureMeasurement":{"T":{"unit":"\u00b0C","value":19}}}
2026-02-13T15:13:33.436Z INFO 17 --- [virtThread-0] c.e.m.service.PulsarClientService : t14368213 - Processing of message 1830392:0:0 successful!
You can check your tenant to verify that the device and corresponding measurement have been created.
Summary
In this guide, I demonstrated how to properly implement a custom microservice for consuming, transforming, and sending messages to Cumulocity. Common design principles have been discussed and implemented with examples. This should serve as a reference to avoid pitfalls and bad practices.
As potential next steps, you can start writing your own microservice and adapt the logic to more comprehensive filtering & transformation requirements.
