Example Scripts

This page contains several example scripts. The curl examples can be run directly from the command line; the Python scripts are written for Python 3.8. Each example consists of a description and a code listing.

Retrieving datasets

To retrieve datasets, follow these steps:

  1. Create an account on our Developer Portal and use this account to request an API key with permissions to list and get files from a dataset, or use the anonymous API key listed in the Obtain an API key section;
  2. List the files in a dataset;
  3. Retrieve a temporary download URL for a file in the dataset (see the example below);
  4. Download the file using the retrieved URL.

List 10 files in the Actuele10mindataKNMIstations dataset

curl --location --request GET \
"https://api.dataplatform.knmi.nl/open-data/datasets/Actuele10mindataKNMIstations/versions/2/files" \
--header "Authorization: <API_KEY>"

List 15 files in the Actuele10mindataKNMIstations dataset

curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d maxKeys=15 \
--header "Authorization: <API_KEY>"

List the first 15 files that sort alphabetically after the given startAfterFilename value

curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d maxKeys=15 \
-d startAfterFilename=KMDS__OPER_P___10M_OBS_L2_202007162330.nc \
--header "Authorization: <API_KEY>"

Listing the first 10 files of today and retrieving the first one

from datetime import datetime
from pathlib import Path
import requests

api_url = "https://api.dataplatform.knmi.nl/open-data"


def main():
    # Parameters
    api_key = "<API_KEY>"
    dataset_name = "Actuele10mindataKNMIstations"
    dataset_version = "2"
    max_keys = "10"

    # Use list files request to request first 10 files of the day.
    timestamp = datetime.utcnow().date().strftime("%Y%m%d")
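    # Filenames in this dataset sort lexicographically by timestamp, so passing today's
    # date prefix as startAfterFilename skips all files from earlier days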
    start_after_filename_prefix = f"KMDS__OPER_P___10M_OBS_L2_{timestamp}"
    list_files_response = requests.get(f"{api_url}/datasets/{dataset_name}/versions/{dataset_version}/files",
                                       headers={"Authorization": api_key},
                                       params={"maxKeys": max_keys,
                                               "startAfterFilename": start_after_filename_prefix})
    list_files = list_files_response.json()
    dataset_files = list_files.get("files")

    # Retrieve first file in the list files response
    filename = dataset_files[0].get("filename")
    endpoint = f"{api_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
    get_file_response = requests.get(endpoint, headers={"Authorization": api_key})
    download_url = get_file_response.json().get("temporaryDownloadUrl")
    dataset_file_response = requests.get(download_url)
    
    # Write dataset file to disk
    p = Path(filename)
    p.write_bytes(dataset_file_response.content)



if __name__ == "__main__":
    main()

Retrieving the file from one hour ago

from datetime import datetime, timedelta
from pathlib import Path
import requests

api_url = "https://api.dataplatform.knmi.nl/open-data"


def main():
    # Parameters
    api_key = "<API_KEY>"
    dataset_name = "Actuele10mindataKNMIstations"
    dataset_version = "2"

    # Use get file to retrieve a file from one hour ago.
    # Filename format for this dataset equals KMDS__OPER_P___10M_OBS_L2_YYYYMMDDHHMM.nc,
    # where the minutes are increased in steps of 10.
    timestamp_now = datetime.utcnow()
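    # Go back one hour and round down to the previous 10-minute mark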
    timestamp_one_hour_ago = timestamp_now - timedelta(hours=1) - timedelta(minutes=timestamp_now.minute % 10)
    filename = f"KMDS__OPER_P___10M_OBS_L2_{timestamp_one_hour_ago.strftime('%Y%m%d%H%M')}.nc"
    endpoint = f"{api_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
    get_file_response = requests.get(endpoint, headers={"Authorization": api_key})
    download_url = get_file_response.json().get("temporaryDownloadUrl")
    dataset_file_response = requests.get(download_url)

    # Write dataset file to disk
    p = Path(filename)
    p.write_bytes(dataset_file_response.content)

if __name__ == "__main__":
    main()

Complete Dataset Download

Once you have obtained a dedicated API key to download a complete dataset, you are ready to download the corresponding dataset files. To retrieve these files efficiently, we provide an example script. This script shows how to download the complete EV24 dataset (version 2). The structure of the script is the same regardless of the dataset you want to download.

Make sure to change download_directory to an existing empty directory.
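
If the directory does not exist yet, you can create it up front; a minimal sketch using pathlib (the path matches the download_directory variable used in the script below):

from pathlib import Path

# Create the download directory (and any missing parents) if it does not exist yet
Path("./dataset-download").mkdir(parents=True, exist_ok=True)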

import asyncio
import logging
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path
from typing import Tuple, Dict, List, Any

import requests
from requests import Session

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("INFO")


def download_dataset_file(session: Session, base_url: str, dataset_name: str, dataset_version: str, filename: str,
                          directory: str) -> Tuple[bool, str]:
    endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
    get_file_response = session.get(endpoint)

    # retrieve download URL for dataset file
    if get_file_response.status_code != 200:
        logger.warning(f"Unable to get file: {filename}")
        logger.warning(get_file_response.content)
        return False, filename

    # use download URL to GET dataset file. We don't need to set the 'Authorization' header,
    # The presigned download URL already has permissions to GET the file contents
    download_url = get_file_response.json().get("temporaryDownloadUrl")
    download_dataset_file_response = requests.get(download_url)

    if download_dataset_file_response.status_code != 200:
        logger.warning(f"Unable to download file: {filename}")
        logger.warning(download_dataset_file_response.content)
        return False, filename

    # write dataset file to disk
    p = Path(f"{directory}/{filename}")
    p.write_bytes(download_dataset_file_response.content)

    logger.info(f"Downloaded dataset file '{filename}'")
    return True, filename


def list_dataset_files(session: Session, base_url: str, dataset_name: str, dataset_version: str,
                       params: Dict[str, str]) -> Tuple[List[str], Dict[str, Any]]:
    logger.info(f"Retrieve dataset files with query params: {params}")

    list_files_endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
    list_files_response = session.get(list_files_endpoint, params=params)

    if list_files_response.status_code != 200:
        raise Exception("Unable to list initial dataset files")

    try:
        list_files_response_json = list_files_response.json()
        dataset_files = list_files_response_json.get("files")
        dataset_filenames = list(map(lambda x: x.get("filename"), dataset_files))
        return dataset_filenames, list_files_response_json
    except Exception as e:
        logger.error(e)
        raise


async def main():
    api_key = "<API_KEY>"
    dataset_name = "EV24"
    dataset_version = "2"
    base_url = "https://api.dataplatform.knmi.nl/open-data"

    download_directory = "./dataset-download"

    # Make sure to send the API key with every HTTP request
    session = requests.Session()
    session.headers.update({"Authorization": api_key})

    # Verify that the download directory exists
    if not Path(download_directory).is_dir():
        raise Exception(f"Invalid or non-existing directory: {download_directory}")

    filenames = []

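    # A single space sorts before any regular filename, so the listing starts at the
    # first file in the dataset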
    start_after_filename = " "
    max_keys = 500

    # Use the API to get a list of all dataset filenames
    while True:
        # Retrieve dataset files after given filename
        dataset_filenames, response_json = list_dataset_files(session, base_url, dataset_name, dataset_version,
                                                              {"maxKeys": f"{max_keys}",
                                                               "startAfterFilename": start_after_filename})

        # Store filenames
        filenames += dataset_filenames

        # If the result is not truncated, we retrieved all filenames
        is_truncated = response_json.get("isTruncated")
        if not is_truncated:
            logger.info("Retrieved names of all dataset files")
            break

        start_after_filename = dataset_filenames[-1]

    loop = asyncio.get_event_loop()

    # Allow up to 20 separate processes to download dataset files concurrently
    executor = ProcessPoolExecutor(max_workers=20)
    futures = []

    # Create tasks that download the dataset files
    for dataset_filename in filenames:
        # Create future for dataset file
        future = loop.run_in_executor(
            executor,
            download_dataset_file,
            session,
            base_url,
            dataset_name,
            dataset_version,
            dataset_filename,
            download_directory
        )
        futures.append(future)

    # Wait for all tasks to complete and gather the results
    future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{dataset_name}' dataset download")

    failed_downloads = list(filter(lambda x: not x[0], future_results))

    if len(failed_downloads) > 0:
        logger.info("Failed to download the following dataset files")
        logger.info(list(map(lambda x: x[1], failed_downloads)))


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Dataset management

To upload files to a dataset, follow these steps:

  1. Create an account on our Developer Portal, and use this account to request an API key with permissions to upload a dataset;
  2. Retrieve an upload url for the specified dataset and version;
  3. Upload a file using the retrieved url.

The following script uploads all files that are located directly in the specified folder (subfolders are not traversed). To periodically upload newly available files, it is recommended to run this script as a scheduled task. Each operating system has a different built-in scheduler: on Windows use Task Scheduler, on Linux use cron jobs and on macOS use launchd.

Upload files in a folder to a dataset

import asyncio
import base64
import hashlib
import hmac
import logging
import urllib.parse
from concurrent.futures.process import ProcessPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Tuple

import requests

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("INFO")


def upload_file_to_dataset(base_url: str, api_key: str, api_secret: str, dataset_name: str, dataset_version: str,
                           filename: str, directory: str) -> Tuple[bool, str]:
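    # The content type and MD5 checksum are sent both when requesting the upload URL
    # and as headers when uploading the file itself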
    content_type = "application/text"

    with open(f"{directory}/{filename}", "rb") as f:
        dataset_file_contents = f.read()
    md5_hash_bytes = hashlib.md5(dataset_file_contents).digest()
    md5_hash_b64 = base64.b64encode(md5_hash_bytes).decode("utf-8")

    params = {"filename": filename, "datasetFileContentType": content_type, "md5": md5_hash_b64}
    endpoint = f"{base_url}/{dataset_name}/versions/{dataset_version}/files/uploadUrl"
    headers = generate_signature_headers(api_key, api_secret.encode("utf-8"))

    upload_url_response = requests.get(endpoint, headers=headers, params=params)

    # retrieve upload URL for dataset file
    if upload_url_response.status_code != 200:
        logger.warning(f"Unable to get upload url for :{filename}")
        logger.warning(upload_url_response.content)
        return False, filename

    upload_url = upload_url_response.json()["temporaryUploadUrl"]

    # The maximum file size supported by the Python requests library is about 2.14 GB;
    # in the future we will support larger files using multipart upload
    with open(f"{directory}/{filename}", "rb") as file_content:
        headers = {"Content-MD5": md5_hash_b64, "Content-Type": content_type}
        logger.info(f"Start file upload for: {filename}")
        upload_response = requests.put(upload_url, data=file_content, headers=headers)

        if upload_response.status_code != 200:
            logger.warning(f"Unable to upload file: {filename}")
            logger.warning(upload_response.content)
            return False, filename

        logger.info(f"Upload of '{filename}' successful")
        return True, filename


def generate_signature_headers(key_id: str, hmac_secret_key: bytes):
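    # Sign the current date with HMAC-SHA512 using the API secret, and send the result in an
    # HTTP "Signature" Authorization header together with the Date header that was signed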
    now_utc = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S %Z")

    signature_string = f"date: {now_utc}".encode("utf-8")

    hmac_digest = hmac.digest(hmac_secret_key, signature_string, hashlib.sha512)
    hmac_digest_b64 = base64.b64encode(hmac_digest).decode("utf-8")
    hmac_digest_b64_url_encoded = urllib.parse.quote_plus(hmac_digest_b64)

    return {
        "Date": now_utc,
        "Authorization": f"""Signature keyId="{key_id}",algorithm="hmac-sha512",signature="{hmac_digest_b64_url_encoded}" """
    }


async def main():
    api_key = "<API_KEY>"
    api_secret = "<API_SECRET>"
    dataset_name = "<DATASET_NAME>"
    dataset_version = "<DATASET_VERSION>"
    base_url = "https://api.dataplatform.knmi.nl/master-dataset-management/datasets"

    # folder that contains the files to be uploaded
    upload_directory = "./my-dataset-files"

    # Verify that the directory exists
    if not Path(upload_directory).is_dir():
        raise Exception(f"Invalid or non-existing directory: {upload_directory}")

    loop = asyncio.get_event_loop()

    # Allow up to 20 separate processes to upload dataset files concurrently
    executor = ProcessPoolExecutor(max_workers=20)
    futures = []

    # Create tasks that upload the dataset files
    folder_content = Path(upload_directory).glob('*')
    files_to_upload = [x for x in folder_content if x.is_file()]
    for file_to_upload in files_to_upload:
        # Create future for dataset file
        future = loop.run_in_executor(
            executor,
            upload_file_to_dataset,
            base_url,
            api_key,
            api_secret,
            dataset_name,
            dataset_version,
            file_to_upload.name,
            upload_directory
        )
        futures.append(future)

    # Wait for all tasks to complete and gather the results
    future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{dataset_name}' uploading")

    failed_uploads = list(filter(lambda x: not x[0], future_results))

    if len(failed_uploads) > 0:
        logger.info("Failed to upload the following dataset files")
        logger.info(list(map(lambda x: x[1], failed_uploads)))


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())
