Dataset Content API

Welcome to the KNMI Data Platform (KDP) Dataset Content API. The Dataset Content API enables dataset managers to upload new dataset files to existing datasets. The maximum file size that can be uploaded via an upload URL is 5 GB.

Obtaining an API token

Access is granted on a case-by-case basis.

Step 1: Register for a Tyk Developer account. You can do this by clicking the “Register” button in the top right corner of the page.

Step 2: Request credentials for the Dataset Content API by contacting the KDP team. Further instructions will be sent to you.

This API makes use of the HMAC signature spec in addition to API tokens, adding another level of security.

The table below lists the rate limits and quotas for the API keys.

Access                 Rate Limit                Quota
Dataset Content API    100 requests per second   Unlimited

How to use the Dataset Content API

Creating the signature

Since this API makes use of the HMAC signature spec, the Authorization header contains a signature of the form: ‘Signature keyId="<key-id>",algorithm="<algorithm>",signature="<signature>"’. When access is granted to the API you will receive a key id and an HMAC secret via email. The secret is used to generate the signature and should be kept private.

The signature is based on a set of headers; the minimum is the Date header (which must therefore be added), containing a timestamp in the format: ‘Mon, 02 Jan 2006 15:04:05 MST’

The signature is calculated by building a signature string, e.g. “date: Mon, 02 Jan 2006 15:04:05 MST”, and computing a hash of that string with one of the supported algorithms, keyed with the secret.

Supported algorithms:

  • hmac-sha256
  • hmac-sha384
  • hmac-sha512
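As a sketch, the signing steps above look like this in Python (the key id and secret below are hypothetical placeholders, and hmac-sha256 is shown; substitute your own credentials and preferred algorithm):

```python
import base64
import hashlib
import hmac
from datetime import datetime, timezone

# Hypothetical credentials; use the key id and HMAC secret you received by email.
KEY_ID = "my-key-id"
HMAC_SECRET = b"my-hmac-secret"

# The Date header value, in the required format (UTC shown here).
date_value = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S %Z")

# The signature string is built from the signed headers; the minimum is the
# Date header, prefixed with its lowercase name.
signature_string = f"date: {date_value}".encode("utf-8")

# HMAC the signature string with the secret and Base64-encode the digest.
digest = hmac.new(HMAC_SECRET, signature_string, hashlib.sha256).digest()
signature = base64.b64encode(digest).decode("utf-8")

# The resulting Authorization header value.
authorization = (
    f'Signature keyId="{KEY_ID}",algorithm="hmac-sha256",signature="{signature}"'
)
```

The same Date value must be sent in the Date header, since the server recomputes the HMAC over it to verify the signature.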

Requesting the upload URL

In order to authenticate your API calls, add the signature to the Authorization header of the HTTP request and add the Date header as well. To create an upload URL for a file in a dataset, construct an API call against the endpoint (relative to the API base URL): {datasetName}/versions/{versionId}/files/uploadUrl

You need to supply the following query parameters to make the API call:

  • md5: MD5 hash digest of dataset file as Base64 string
  • filename: The filename of the dataset file
  • datasetFileContentType: Content Type of dataset file, e.g. NetCDF or HDF5.

The full documentation of these API endpoints can be found on the Technical Documentation (Swagger) page.
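As a sketch of the request construction, the endpoint and its query string can be assembled with the standard library like this (the base URL, dataset name, version, and filename below are placeholders, not real values):

```python
import base64
import hashlib
import urllib.parse

# Placeholder values; substitute the real API base URL and your dataset details.
base_url = "https://api.example.org/dataset-content/v1"
dataset_name = "my-dataset"
version_id = "1.0"

# Normally the bytes of the dataset file, e.g. Path(...).read_bytes().
file_bytes = b"example file contents"
md5_b64 = base64.b64encode(hashlib.md5(file_bytes).digest()).decode("utf-8")

# The three required query parameters.
query = urllib.parse.urlencode(
    {
        "filename": "data_20240101.nc",
        "datasetFileContentType": "application/netcdf",
        "md5": md5_b64,
    }
)
endpoint = f"{base_url}/{dataset_name}/versions/{version_id}/files/uploadUrl?{query}"
# A GET to this endpoint, with the Date and Authorization headers described
# above, returns a JSON body containing the temporary upload URL.
```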

Uploading the dataset file

In order to actually upload the file to the dataset, you need to make a PUT request to the upload URL you received in the previous step. The body of the request should contain the dataset file. You must also supply the following headers:

  • Content-Type: Content Type of dataset file
  • Content-MD5: MD5 hash digest of dataset file as Base64 string
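Put together, a minimal upload step might look like the following sketch, using the requests library as the example script on this page does (the function name is illustrative, not part of the API):

```python
import base64
import hashlib

import requests


def upload_to_url(upload_url: str, file_bytes: bytes, content_type: str) -> bool:
    """PUT the dataset file to the temporary upload URL with the two required headers."""
    md5_b64 = base64.b64encode(hashlib.md5(file_bytes).digest()).decode("utf-8")
    response = requests.put(
        upload_url,
        data=file_bytes,
        headers={"Content-Type": content_type, "Content-MD5": md5_b64},
    )
    return response.status_code == 200
```

The Content-MD5 value must match the md5 query parameter used when requesting the upload URL, since both are computed over the same file bytes.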

Python example: upload all dataset files in a directory

The script below uploads all files in the provided directory. To periodically upload newly available files, it is recommended to create a scheduled task that runs the script. Each operating system has a different built-in scheduler: for Windows use Task Scheduler, for Linux use cron jobs, and for macOS use launchd.

Upload files in a directory to a dataset

#!/usr/bin/env python3
import asyncio
import base64
import hashlib
import hmac
import logging
import os
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Tuple

import requests

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))

# Fill in the key id and HMAC secret you received by email
API_KEY = "<api-key-id>"
API_SECRET = "<hmac-secret>"

DATASET_NAME = "<dataset-name>"
DATASET_VERSION = "<dataset-version>"

# The type of data that is uploaded, e.g. NetCDF or HDF5
CONTENT_TYPE = "application/netcdf"

# The folder that contains the files to be uploaded
UPLOAD_DIRECTORY = "<path-to-upload-directory>"

def upload_file_to_dataset(
    base_url: str,
    api_key: str,
    api_secret: str,
    dataset_name: str,
    dataset_version: str,
    filename: str,
    directory: str,
) -> Tuple[bool, str]:
    dataset_file_content = Path(f"{directory}/{filename}").read_bytes()
    md5_hash_bytes = hashlib.md5(dataset_file_content).digest()
    md5_hash_b64 = base64.b64encode(md5_hash_bytes).decode("utf-8")

    params = {
        "filename": filename,
        "datasetFileContentType": CONTENT_TYPE,
        "md5": md5_hash_b64,
    }
    endpoint = f"{base_url}/{dataset_name}/versions/{dataset_version}/files/uploadUrl"
    headers = generate_signature_headers(api_key, api_secret.encode("utf-8"))

    # Retrieve upload URL for dataset file
    upload_url_response = requests.get(endpoint, headers=headers, params=params)

    if upload_url_response.status_code != 200:
        logger.warning(f"Unable to get upload url for: {filename}")
        return False, filename

    upload_url = upload_url_response.json()["temporaryUploadUrl"]

    # max file size supported by the Python requests library is ~2.14 GB
    # in the future we will support bigger files using multipart upload
    headers = {"Content-MD5": md5_hash_b64, "Content-Type": CONTENT_TYPE}
    logger.info(f"Start file upload for: {filename}")
    upload_response = requests.put(upload_url, data=dataset_file_content, headers=headers)

    if upload_response.status_code != 200:
        logger.warning(f"Unable to upload file: {filename}")
        return False, filename

    logger.info(f"Upload of '{filename}' successful")
    return True, filename

def generate_signature_headers(key_id: str, hmac_secret_key: bytes):
    now_utc = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S %Z")

    signature_string = f"date: {now_utc}".encode("utf-8")

    hmac_digest = hmac.new(hmac_secret_key, signature_string, hashlib.sha512).digest()
    hmac_digest_b64 = base64.b64encode(hmac_digest).decode("utf-8")
    hmac_digest_b64_url_encoded = urllib.parse.quote_plus(hmac_digest_b64)

    return {
        "Date": now_utc,
        "Authorization": f'Signature keyId="{key_id}",algorithm="hmac-sha512",'
        f'signature="{hmac_digest_b64_url_encoded}"',
    }

async def main():
    # Fill in the base URL of the Dataset Content API here
    base_url = ""

    # Verify that the directory exists
    if not Path(UPLOAD_DIRECTORY).is_dir():
        raise Exception(f"Invalid or non-existing directory: {UPLOAD_DIRECTORY}")

    loop = asyncio.get_event_loop()

    # Allow up to 20 separate threads to upload dataset files concurrently
    executor = ThreadPoolExecutor(max_workers=20)
    futures = []

    # Create tasks that upload the dataset files
    folder_content = Path(UPLOAD_DIRECTORY).glob("*")
    files_to_upload = [x for x in folder_content if x.is_file()]
    logger.info(f"Number of files to upload: {len(files_to_upload)}")
    for file_to_upload in files_to_upload:
        # Create future for dataset file
        future = loop.run_in_executor(
            executor,
            upload_file_to_dataset,
            base_url,
            API_KEY,
            API_SECRET,
            DATASET_NAME,
            DATASET_VERSION,
            file_to_upload.name,
            UPLOAD_DIRECTORY,
        )
        futures.append(future)

    # Wait for all tasks to complete and gather the results
    future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{DATASET_NAME}' uploading")

    failed_uploads = list(filter(lambda x: not x[0], future_results))

    if len(failed_uploads) > 0:
        logger.warning("Failed to upload the following dataset files")
        logger.warning(list(map(lambda x: x[1], failed_uploads)))

if __name__ == "__main__":
    asyncio.run(main())