This page contains multiple example scripts, written in Python 3.10. Each example consists of a description and a code listing.
Each Python 3 script requires the following external Python package to be installed on the system:
requests
We refer to the Python 3 documentation for instructions on how to install Python packages.
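For example, the requests package can typically be installed with pip:
python3 -m pip install requests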
The following examples show how to list and retrieve dataset files.
List the files of version 2 of the Actuele10mindataKNMIstations dataset:
curl --location --request GET \
"https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files" \
--header "Authorization: <API_KEY>"
The default page size for listing files is 10.
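When a dataset version contains more files than fit on one page, the response includes a nextPageToken that can be passed along to fetch the next page, as the complete-dataset download script further below also does. A minimal Python sketch of this pagination loop, assuming the requests package and a valid <API_KEY>:
import requests

api_key = "<API_KEY>"
url = "https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files"
session = requests.Session()
session.headers.update({"Authorization": api_key})

filenames = []
next_page_token = None
while True:
    # requests omits parameters whose value is None, so the first request carries no token
    response = session.get(url, params={"maxKeys": 10, "nextPageToken": next_page_token}).json()
    filenames.extend(f["filename"] for f in response.get("files", []))
    next_page_token = response.get("nextPageToken")
    if not next_page_token:
        break

print(f"Listed {len(filenames)} files in total")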
List the files of the Actuele10mindataKNMIstations dataset with a page size of 15, sorted in ascending order:
curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d maxKeys=15 \
-d sorting=asc \
--header "Authorization: <API_KEY>"
The default sort direction is ascending. To sort the files in descending order:
curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d sorting=desc \
--header "Authorization: <API_KEY>"
To order the files by their last-modified timestamp:
curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d sorting=desc \
-d order_by=lastModified \
--header "Authorization: <API_KEY>"
The default attribute the list is ordered by is filename. Other allowed values are lastModified and created.
To begin listing files after a specific filename:
curl --location --request GET -G \
"https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files" \
-d maxKeys=15 \
-d order_by=filename \
-d begin=KMDS__OPER_P___10M_OBS_L2_202007162330.nc \
--header "Authorization: <API_KEY>"
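The same query parameters can be sent from Python. A minimal sketch using the requests package, where <API_KEY> is a placeholder for your own key (the full scripts below wrap this call in a small client class):
import requests

api_key = "<API_KEY>"
url = "https://api.dataplatform.knmi.nl/open-data/v1/datasets/Actuele10mindataKNMIstations/versions/2/files"
# List up to 15 files, ordered by their last-modified timestamp, newest first
params = {"maxKeys": 15, "order_by": "lastModified", "sorting": "desc"}
response = requests.get(url, headers={"Authorization": api_key}, params=params)
for dataset_file in response.json().get("files", []):
    print(dataset_file["filename"])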
The following example script downloads the first file of today from the Actuele10mindataKNMIstations dataset, using the begin parameter:
import logging
import sys
from datetime import datetime
from datetime import timezone
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel("INFO")
class OpenDataAPI:
def __init__(self, api_token: str):
self.base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
self.headers = {"Authorization": api_token}
def __get_data(self, url, params=None):
return requests.get(url, headers=self.headers, params=params).json()
def list_files(self, dataset_name: str, dataset_version: str, params: dict):
return self.__get_data(
f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files",
params=params,
)
def get_file_url(self, dataset_name: str, dataset_version: str, file_name: str):
return self.__get_data(
f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{file_name}/url"
)
def download_file_from_temporary_download_url(download_url, filename):
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(filename, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception:
logger.exception("Unable to download file using download URL")
sys.exit(1)
logger.info(f"Successfully downloaded dataset file to {filename}")
def main():
api_key = "<API_KEY>"
dataset_name = "Actuele10mindataKNMIstations"
dataset_version = "2"
api = OpenDataAPI(api_token=api_key)
# since the filenames are prefixed with a timestamp, we can use the current date to filter the files
# and start listing files after the first file of the day
timestamp = datetime.now(timezone.utc).date().strftime("%Y%m%d")
begin = f"KMDS__OPER_P___10M_OBS_L2_{timestamp}"
logger.info(f"Fetching first file of {dataset_name} version {dataset_version} on {timestamp}")
# order the files by filename and begin listing after the specified filename
params = {"order_by": "filename", "begin": begin}
response = api.list_files(dataset_name, dataset_version, params)
if "error" in response:
logger.error(f"Unable to retrieve list of files: {response['error']}")
sys.exit(1)
file_name = response["files"][0].get("filename")
logger.info(f"First file of {timestamp} is: {file_name}")
# fetch the download url and download the file
response = api.get_file_url(dataset_name, dataset_version, file_name)
download_file_from_temporary_download_url(response["temporaryDownloadUrl"], file_name)
if __name__ == "__main__":
main()
The following example script downloads the latest file of the Actuele10mindataKNMIstations dataset by sorting the files in descending order:
import logging
import os
import sys
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))
class OpenDataAPI:
def __init__(self, api_token: str):
self.base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
self.headers = {"Authorization": api_token}
def __get_data(self, url, params=None):
return requests.get(url, headers=self.headers, params=params).json()
def list_files(self, dataset_name: str, dataset_version: str, params: dict):
return self.__get_data(
f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files",
params=params,
)
def get_file_url(self, dataset_name: str, dataset_version: str, file_name: str):
return self.__get_data(
f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{file_name}/url"
)
def download_file_from_temporary_download_url(download_url, filename):
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(filename, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception:
logger.exception("Unable to download file using download URL")
sys.exit(1)
logger.info(f"Successfully downloaded dataset file to {filename}")
def main():
api_key = "<API_KEY>"
dataset_name = "Actuele10mindataKNMIstations"
dataset_version = "2"
logger.info(f"Fetching latest file of {dataset_name} version {dataset_version}")
api = OpenDataAPI(api_token=api_key)
# sort the files in descending order and only retrieve the first file
params = {"maxKeys": 1, "sorting": "desc"}
response = api.list_files(dataset_name, dataset_version, params)
if "error" in response:
logger.error(f"Unable to retrieve list of files: {response['error']}")
sys.exit(1)
latest_file = response["files"][0].get("filename")
logger.info(f"Latest file is: {latest_file}")
# fetch the download url and download the file
response = api.get_file_url(dataset_name, dataset_version, latest_file)
download_file_from_temporary_download_url(response["temporaryDownloadUrl"], latest_file)
if __name__ == "__main__":
main()
The following example script downloads the dataset file from one hour ago by constructing its filename from the current time:
import logging
import os
import sys
from datetime import datetime
from datetime import timedelta
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))
api_url = "https://api.dataplatform.knmi.nl/open-data"
api_version = "v1"
def main():
# Parameters
api_key = "<API_KEY>"
dataset_name = "Actuele10mindataKNMIstations"
dataset_version = "2"
# Use get file to retrieve a file from one hour ago.
# Filename format for this dataset equals KMDS__OPER_P___10M_OBS_L2_YYYYMMDDHHMM.nc,
# where the minutes are increased in steps of 10.
timestamp_now = datetime.utcnow()
timestamp_one_hour_ago = timestamp_now - timedelta(hours=1) - timedelta(minutes=timestamp_now.minute % 10)
filename = f"KMDS__OPER_P___10M_OBS_L2_{timestamp_one_hour_ago.strftime('%Y%m%d%H%M')}.nc"
logger.debug(f"Current time: {timestamp_now}")
logger.debug(f"One hour ago: {timestamp_one_hour_ago}")
logger.debug(f"Dataset file to download: {filename}")
endpoint = f"{api_url}/{api_version}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
get_file_response = requests.get(endpoint, headers={"Authorization": api_key})
if get_file_response.status_code != 200:
logger.error("Unable to retrieve download url for file")
logger.error(get_file_response.text)
sys.exit(1)
logger.info(f"Successfully retrieved temporary download URL for dataset file {filename}")
download_url = get_file_response.json().get("temporaryDownloadUrl")
# Check logging for deprecation
if "X-KNMI-Deprecation" in get_file_response.headers:
deprecation_message = get_file_response.headers.get("X-KNMI-Deprecation")
logger.warning(f"Deprecation message: {deprecation_message}")
download_file_from_temporary_download_url(download_url, filename)
def download_file_from_temporary_download_url(download_url, filename):
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(filename, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception:
logger.exception("Unable to download file using download URL")
sys.exit(1)
logger.info(f"Successfully downloaded dataset file to {filename}")
if __name__ == "__main__":
main()
Once you have obtained a dedicated API key to download a complete dataset, you are ready to download the corresponding dataset files. To retrieve these files efficiently, we provide an example script. This script shows how to download the complete EV24/2 dataset. The structure of this script is the same regardless of the dataset you want to download. Make sure to change download_directory to an existing empty directory.
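For example, the download directory can be created up front; a minimal sketch (the path must match the download_directory variable used in the script):
from pathlib import Path

# Create the target directory if it does not exist yet; it should be empty
Path("./dataset-download").mkdir(parents=True, exist_ok=True)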
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
import requests
from requests import Session
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))
def download_dataset_file(
session: Session,
base_url: str,
dataset_name: str,
dataset_version: str,
filename: str,
directory: str,
overwrite: bool,
) -> Tuple[bool, str]:
# if a file from this dataset already exists, skip downloading it.
file_path = Path(directory, filename).resolve()
if not overwrite and file_path.exists():
logger.info(f"Dataset file '{filename}' was already downloaded.")
return True, filename
endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
get_file_response = session.get(endpoint)
# retrieve download URL for dataset file
if get_file_response.status_code != 200:
logger.warning(f"Unable to get file: {filename}")
logger.warning(get_file_response.content)
return False, filename
    # Use the download URL to GET the dataset file. We don't need to set the 'Authorization' header:
    # the presigned download URL already grants permission to GET the file contents
download_url = get_file_response.json().get("temporaryDownloadUrl")
return download_file_from_temporary_download_url(download_url, directory, filename)
def download_file_from_temporary_download_url(download_url, directory, filename):
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(f"{directory}/{filename}", "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except Exception:
logger.exception("Unable to download file using download URL")
return False, filename
logger.info(f"Downloaded dataset file '{filename}'")
return True, filename
def list_dataset_files(
session: Session,
base_url: str,
dataset_name: str,
dataset_version: str,
params: Dict[str, str],
) -> Tuple[List[str], Dict[str, Any]]:
logger.info(f"Retrieve dataset files with query params: {params}")
list_files_endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
list_files_response = session.get(list_files_endpoint, params=params)
if list_files_response.status_code != 200:
raise Exception("Unable to list initial dataset files")
try:
list_files_response_json = list_files_response.json()
dataset_files = list_files_response_json.get("files")
dataset_filenames = list(map(lambda x: x.get("filename"), dataset_files))
return dataset_filenames, list_files_response_json
except Exception as e:
logger.exception(e)
raise Exception(e)
def get_max_worker_count(filesizes):
size_for_threading = 10_000_000 # 10 MB
average = sum(filesizes) / len(filesizes)
    # Use a single thread for large files, to avoid ending up with multiple half-downloaded files after a network failure
if average > size_for_threading:
threads = 1
else:
threads = 10
return threads
async def main():
api_key = "<API_KEY>"
dataset_name = "EV24"
dataset_version = "2"
base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
    # When set to True, a file that already exists with the same name is overwritten.
    # To prevent unnecessary bandwidth usage, leave it set to False.
overwrite = False
download_directory = "./dataset-download"
# Make sure to send the API key with every HTTP request
session = requests.Session()
session.headers.update({"Authorization": api_key})
# Verify that the download directory exists
    if not Path(download_directory).is_dir():
raise Exception(f"Invalid or non-existing directory: {download_directory}")
filenames = []
max_keys = 500
next_page_token = None
file_sizes = []
# Use the API to get a list of all dataset filenames
while True:
        # Retrieve the next page of dataset file names
dataset_filenames, response_json = list_dataset_files(
session,
base_url,
dataset_name,
dataset_version,
{"maxKeys": f"{max_keys}", "nextPageToken": next_page_token},
)
file_sizes.extend(file["size"] for file in response_json.get("files"))
# Store filenames
filenames += dataset_filenames
# If the result is not truncated, we retrieved all filenames
next_page_token = response_json.get("nextPageToken")
if not next_page_token:
logger.info("Retrieved names of all dataset files")
break
logger.info(f"Number of files to download: {len(filenames)}")
worker_count = get_max_worker_count(file_sizes)
loop = asyncio.get_event_loop()
    # Allow up to worker_count separate threads to download dataset files concurrently
executor = ThreadPoolExecutor(max_workers=worker_count)
futures = []
# Create tasks that download the dataset files
for dataset_filename in filenames:
# Create future for dataset file
future = loop.run_in_executor(
executor,
download_dataset_file,
session,
base_url,
dataset_name,
dataset_version,
dataset_filename,
download_directory,
overwrite,
)
futures.append(future)
    # Wait for all tasks to complete and gather the results
future_results = await asyncio.gather(*futures)
logger.info(f"Finished '{dataset_name}' dataset download")
failed_downloads = list(filter(lambda x: not x[0], future_results))
if len(failed_downloads) > 0:
logger.warning("Failed to download the following dataset files:")
logger.warning(list(map(lambda x: x[1], failed_downloads)))
if __name__ == "__main__":
asyncio.run(main())
To upload files to a dataset, follow these steps:
To complete your key request, a form appears where you provide information for the following input fields:
Dataset name: name of the dataset;
Data version: version of the dataset;
Comments (optional): additional information for your key request.
Submit the key request once the form contains the required information.
Once you have obtained an API key and HMAC secret, you are ready to upload dataset files to your requested dataset. The script below uploads all files in the provided directory. To periodically upload newly available files, it is recommended to create a scheduled task that runs the script. Each operating system has a different built-in scheduler: on Windows use Task Scheduler, on Linux use cron jobs, and on macOS use launchd.
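As an illustration, a crontab entry along the following lines would run the upload script at the start of every hour on Linux (the interpreter and script paths are placeholders to be adapted):
# minute hour day-of-month month day-of-week command
0 * * * * /usr/bin/python3 /path/to/upload_script.py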
import asyncio
import base64
import hashlib
import hmac
import logging
import os
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import Tuple
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))
def upload_file_to_dataset(
base_url: str,
api_key: str,
api_secret: str,
dataset_name: str,
dataset_version: str,
filename: str,
directory: str,
) -> Tuple[bool, str]:
content_type = "application/text"
dataset_file_content = Path(f"{directory}/{filename}").read_bytes()
md5_hash_bytes = hashlib.md5(dataset_file_content).digest()
md5_hash_b64 = base64.b64encode(md5_hash_bytes).decode("utf-8")
params = {
"filename": filename,
"datasetFileContentType": content_type,
"md5": md5_hash_b64,
}
endpoint = f"{base_url}/{dataset_name}/versions/{dataset_version}/files/uploadUrl"
headers = generate_signature_headers(api_key, api_secret.encode("utf-8"))
upload_url_response = requests.get(endpoint, headers=headers, params=params)
# retrieve upload URL for dataset file
if upload_url_response.status_code != 200:
        logger.warning(f"Unable to get upload URL for: {filename}")
logger.warning(upload_url_response.content)
return False, filename
upload_url = upload_url_response.json()["temporaryUploadUrl"]
    # The maximum file size supported by the Python requests library is about 2.14 GB;
    # in the future we will support bigger files using multipart upload
headers = {"Content-MD5": md5_hash_b64, "Content-Type": content_type}
logger.info(f"Start file upload for: {filename}")
upload_response = requests.put(upload_url, data=dataset_file_content, headers=headers)
if upload_response.status_code != 200:
logger.warning(f"Unable to upload file: {filename}")
logger.warning(upload_response.content)
return False, filename
logger.info(f"Upload of '{filename}' successful")
return True, filename
def generate_signature_headers(key_id: str, hmac_secret_key: bytes):
now_utc = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S %Z")
signature_string = f"date: {now_utc}".encode("utf-8")
hmac_digest = hmac.new(hmac_secret_key, signature_string, hashlib.sha512).digest()
hmac_digest_b64 = base64.b64encode(hmac_digest).decode("utf-8")
hmac_digest_b64_url_encoded = urllib.parse.quote_plus(hmac_digest_b64)
return {
"Date": now_utc,
"Authorization": f'Signature keyId="{key_id}",algorithm="hmac-sha512",'
f'signature="{hmac_digest_b64_url_encoded}" ',
}
async def main():
api_key = "<API_KEY>"
hmac_secret = "<API_SECRET>"
dataset_name = "<DATASET_NAME>"
dataset_version = "<DATASET_VERSION>"
base_url = "https://api.dataplatform.knmi.nl/dataset-content/v1/datasets"
# folder that contains the files to be uploaded
upload_directory = "./my-dataset-files"
# Verify that the directory exists
if not Path(upload_directory).is_dir():
raise Exception(f"Invalid or non-existing directory: {upload_directory}")
loop = asyncio.get_event_loop()
# Allow up to 20 separate threads to upload dataset files concurrently
executor = ThreadPoolExecutor(max_workers=20)
futures = []
# Create tasks that upload the dataset files
folder_content = Path(upload_directory).glob("*")
files_to_upload = [x for x in folder_content if x.is_file()]
logger.info(f"Number of files to upload: {len(files_to_upload)}")
for file_to_upload in files_to_upload:
# Create future for dataset file
future = loop.run_in_executor(
executor,
upload_file_to_dataset,
base_url,
api_key,
hmac_secret,
dataset_name,
dataset_version,
file_to_upload.name,
upload_directory,
)
futures.append(future)
# Wait for all tasks to complete and gather the results
future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{dataset_name}' dataset upload")
failed_uploads = list(filter(lambda x: not x[0], future_results))
if len(failed_uploads) > 0:
        logger.warning("Failed to upload the following dataset files:")
logger.warning(list(map(lambda x: x[1], failed_uploads)))
if __name__ == "__main__":
asyncio.run(main())