Notification Service


Welcome to the documentation about the KNMI Data Platform Notification Service. This page is meant to get you started with the service. The Notification Service allows you to receive timely updates about events of the KNMI Data Platform. For example, when a new file is created in a dataset.

More information on:

What is the Notification Service

The goal of the Notification Service is to allow users to receive timely updates about events of the KNMI Data Platform. For example, when a new file is created in a dataset. The Notification Service uses MQTT as underlying technology.

Available event topics

The topic structure is as follows:

dataplatform/file/v1/{dataset_name}/{dataset_version}/{created|updated}

To subscribe to notifications for new files of version 2 of the Actuele10mindataKNMIstations dataset, subscribe to the following topic:

dataplatform/file/v1/Actuele10mindataKNMIstations/2/created

It is possible to use wildcards in subscribing to topics. To get notifications for both new and updated files subscribe to:

dataplatform/file/v1/Actuele10mindataKNMIstations/2/#

The dataset name and version can be found in the metadata of the dataset in the KDP Data Catalog. The event topics and messages are described in more detail in the Technical Documentation.

Obtaining access In order to connect to our Notification Service you need a password. This password is an API key that you can request in the API Catalog. To request an API key, you will need to register for a Developer account. You can do this by clicking the “Register” button in the top right corner of the page. Once you have registered, you can request an API key by clicking on the appropriate “Request API Key” button on the API Catalog page. You will receive an email with your API key.

The authentication method to connect to the Notification Service is MQTT username/password. The username is not being used, the password is the API key you received.

How to use the notification service

The Notification Service uses MQTT as underlying technology. To connect you need to know:

Hostnamemqtt.dataplatform.knmi.nl
Port443
TransportWebsockets
MQTT version5.0 (Recommended)
3.1.1 also supported
UsernameNot being used
PasswordSee Obtaining access
Client IDSelf-generated


The MQTT client library paho is available for all popular languages. An example in Python is given below.

Some things to keep in mind when using the notification service:

Client ID: The client identifier is a string that identifies each MQTT client that connects to an MQTT server. Each client that connects to the MQTT server must have a unique client id as the server uses it to identify the state of the MQTT session between the client and the server. If you try to connect with a client id that already has a session present, the old one will be terminated. After a disconnect for whatever reason, by reconnecting with the same client ID, the session will resume seamlessly. This behaviour can be configured by the clean_session flag.

Clean Session: The clean session flag is used to control the behaviour of the MQTT session between the client and the server. If the clean session flag is set to true (1), the server must discard any previous session and start a new session. If the clean session flag is set to false (0), the server must resume any previous session based on the client id. Meaning the server stores all your subscriptions and all the messages that arrive with specific quality of service levels that match the subscriptions that the MQTT client had at the time the disconnection occurred. The messages are stored for a maximum of 1 hour.

Quality of Service: The QoS that you specify when you subscribe to a topic determines the level of assurance you require for receiving messages published to that topic. The supported QoS levels are:

  • QoS 0: At most once delivery. You may miss a message.
  • QoS 1: At least once delivery. You may receive duplicates of a message.

See this external blog post for more information about the use of QoS in MQTT.

Example script in Python

You need to install paho-mqtt version 2.0+ for this example to work.

import logging
import ssl

import paho.mqtt.client as mqtt_client
import paho.mqtt.properties as properties

BROKER_DOMAIN = "mqtt.dataplatform.knmi.nl"
# Client ID should be made static, it is used to identify your session, so that
# missed events can be replayed after a disconnect
# https://www.uuidgenerator.net/version4
CLIENT_ID = "<YOUR CLIENT UUID4>"
# Obtain your token at: https://developer.dataplatform.knmi.nl/notification-service
TOKEN = "<YOUR NOTIFICATION SERVICE TOKEN>"
# This will listen to both file creation and update events of this dataset:
# https://dataplatform.knmi.nl/dataset/radar-echotopheight-5min-1-0
# This topic should have one event every 5 minutes
TOPIC = "dataplatform/file/v1/radar_echotopheight_5min/1.0/#"
# Version 3.1.1 also supported
PROTOCOL = mqtt_client.MQTTv5

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("INFO")


def connect_mqtt() -> mqtt_client:
    def on_connect(c: mqtt_client, userdata, flags, reason_code, props=None):
        logger.info(f"Connected using client ID: {str(c._client_id)}")
        logger.info(f"Session present: {str(flags.session_present)}")
        logger.info(f"Connection result: {str(reason_code)}")
        # Subscribe here so it is automatically done after disconnect
        subscribe(c, TOPIC)

    client = mqtt_client.Client(
        mqtt_client.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID, protocol=PROTOCOL, transport="websockets"
    )
    client.tls_set(tls_version=ssl.PROTOCOL_TLS)
    connect_properties = properties.Properties(properties.PacketTypes.CONNECT)
    # Maximum is 3600
    connect_properties.SessionExpiryInterval = 3600

    # The MQTT username is not used for authentication, only the token
    username = "token"
    client.username_pw_set(username, TOKEN)
    client.on_connect = on_connect

    client.connect(host=BROKER_DOMAIN, port=443, keepalive=60, clean_start=False, properties=connect_properties)

    return client


def subscribe(client: mqtt_client, topic: str):
    def on_message(c: mqtt_client, userdata, message):
        # NOTE: Do NOT do slow processing in this function, as this will interfere with PUBACK messages for QoS=1.
        # A couple of seconds seems fine, a minute is definitely too long.
        logger.info(f"Received message on topic {message.topic}: {str(message.payload)}")

    def on_subscribe(c: mqtt_client, userdata, mid, reason_codes, properties):
        logger.info(f"Subscribed to topic '{topic}'")

    client.on_subscribe = on_subscribe
    client.on_message = on_message
    # A qos=1 will replay missed events when reconnecting with the same client ID. Use qos=0 to disable
    client.subscribe(topic, qos=1)


def run():
    client = connect_mqtt()
    client.enable_logger(logger=logger)
    client.loop_forever()


if __name__ == "__main__":
    run()

Navigation