Getting your own models on Apheris

Apheris uses NVIDIA FLARE to underpin its federation platform.

This means that if you already have a FLARE application, only minimal changes are required to make it run on Apheris. This tutorial focuses on what you need to do to integrate a working NVIDIA FLARE application with Apheris. For more information on how to integrate your existing application with FLARE, please see the NVIDIA FLARE documentation.

There are two key components that you need to consider when integrating with Apheris: the Data Access Layer (DAL) and the Secure Runtime. In this guide, we will explain what these are, and the steps you need to take to integrate with them.

The Data Access Layer (DAL)

The DAL is a service presented to the Compute Gateways, which allows secure access to data through the Apheris platform. It is presented as an HTTP service with endpoints for retrieving data and the corresponding asset policies.

It is important to note that the DAL is only available from inside a Compute Gateway and so data always remains on the Compute Gateway of the Data Custodian. A user cannot use the DAL to download the data to their own machine.

To retrieve data from the DAL, you simply perform GET requests against the HTTP endpoint, providing a bearer token in the headers to authenticate your request.

This token, as well as the internal URL for the DAL service and the datasets that you are allowed to access are provided as environment variables within the Compute Gateway.

The variables that you will need are:

  • APH_API_DAL_URL: The internal URL for the DAL service
  • APH_DAL_TOKEN: The bearer token that must be provided alongside any request
  • APH_DATA: A JSON string, mapping the dataset ID to a path for download

With these, it is simple to make a request on the Compute Gateway for a specific dataset file.
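As a minimal sketch, the three variables can be read and checked in one place before any request is made. The helper name `load_dal_env` and its error messages are hypothetical and not part of the Apheris API; the `environ` parameter exists only so the function can be exercised outside a Compute Gateway.

```python
import json
import os
from typing import Dict, Mapping, Optional, Tuple

REQUIRED_VARS = ("APH_API_DAL_URL", "APH_DAL_TOKEN", "APH_DATA")


def load_dal_env(
    environ: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str, Dict[str, str]]:
    """Return (dal_url, dal_token, datasets) read from the environment.

    `load_dal_env` is a hypothetical helper used for illustration only.
    """
    env = os.environ if environ is None else environ

    # Fail early, naming every variable that is absent or empty.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    try:
        datasets = json.loads(env["APH_DATA"])
    except json.JSONDecodeError as exc:
        raise RuntimeError("APH_DATA is not valid JSON") from exc
    if not isinstance(datasets, dict):
        raise RuntimeError("APH_DATA must map dataset IDs to paths")

    # Strip a trailing slash so the URL joins cleanly with "/datasets/...".
    return env["APH_API_DAL_URL"].rstrip("/"), env["APH_DAL_TOKEN"], datasets
```

Calling `load_dal_env()` with no arguments reads from `os.environ`, which is what you would do on the Compute Gateway.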

Downloading datasets

Imagine you are on the Compute Gateway and you are presented with the following environment variables (note that the following token is not a valid bearer token):

APH_DAL_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
APH_API_DAL_URL=http://gateway-agent-dal
APH_DATA={"medical-decathlon-task004-hippocampus-a_gateway-1_org-1":"s3://my-data-bucket/nnunetv2/dataset004_hippocampus/Dataset004_Hippocampus_A.zip"}

You can see that the dataset in this example is presented as a single zip file. This means that you can perform a single GET request against the DAL, using the URL and token from the environment, and write the resulting file to the Compute Gateway's file system.

It is also possible to work with a dataset registered as a directory and download specific files from within that structure. We will discuss that later in this tutorial.

import json
import os
import requests

dal_url = os.environ["APH_API_DAL_URL"]
dal_token = os.environ["APH_DAL_TOKEN"]
dal_data = json.loads(os.environ["APH_DATA"])

ds_path = dal_data["medical-decathlon-task004-hippocampus-a_gateway-1_org-1"]
ds_zip_response = requests.get(f"{dal_url}/datasets/{ds_path}", headers={'Authorization': f'Bearer {dal_token}'})

with open("hippocampus.zip", "wb") as fh:
    fh.write(ds_zip_response.content)

# unzip the data, pre-process, train ...
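The "unzip" step mentioned in the final comment could look like the sketch below, which uses only the standard library. The function name `extract_dataset` and the target directory are illustrative assumptions; the check on member paths is a defensive extra rather than something the DAL requires.

```python
import zipfile
from pathlib import Path
from typing import List


def extract_dataset(zip_path: Path, target_dir: Path) -> List[str]:
    """Extract `zip_path` into `target_dir` and return the archive member names.

    `extract_dataset` is a hypothetical helper used for illustration only.
    """
    target_dir = target_dir.resolve()
    target_dir.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(zip_path) as archive:
        names = archive.namelist()
        for name in names:
            # Reject members that would resolve outside the target directory.
            destination = (target_dir / name).resolve()
            if destination != target_dir and target_dir not in destination.parents:
                raise ValueError(f"Unsafe path in archive: {name}")
        archive.extractall(target_dir)
    return names
```

For the example above you might call `extract_dataset(Path("hippocampus.zip"), Path("data"))` before starting pre-processing.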

Using Pydantic to configure the DAL

Pydantic's BaseSettings object supports loading field values from environment variables, so you can use a Pydantic model to simplify your code:

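from typing import Dict

# Note: in Pydantic v2, BaseSettings lives in the separate pydantic-settings
# package (from pydantic_settings import BaseSettings).
from pydantic import AnyHttpUrl, BaseSettings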

class DALSettings(BaseSettings):
    api_dal_url: AnyHttpUrl
    dal_token: str
    data: Dict[str, str]
    timeout: int = 360

    class Config:
        env_prefix = "APH_"
        env_file_encoding = "utf-8"

    @property
    def active(self) -> bool:
        return bool(self.dal_token and self.data)

    @property
    def headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.dal_token}"}

    @property
    def endpoint(self) -> str:
        return f"{self.api_dal_url}/datasets/"

# ...

cfg = DALSettings()

Downloading datasets file-by-file

In the example above, the dataset was registered as a single file (in this case, a zip). Alternatively, you may register your dataset as a directory structure and navigate through it using the DAL.

In this case, you would see something slightly different in the APH_DATA environment variable:

APH_DATA={"medical-decathlon-task004-hippocampus_files-a_gateway-1_org-1":"s3://my-data-bucket/nnunetv2/dataset004_hippocampus_files/"}

The dataset has been registered as a directory, with a slash (/) at the end to indicate that. Now, when you query against the DAL for this path, the response is a JSON string containing the list of files that live within that directory.

ds_path = dal_data["medical-decathlon-task004-hippocampus_files-a_gateway-1_org-1"]
all_files = requests.get(f"{dal_url}/datasets/{ds_path}", headers={'Authorization': f'Bearer {dal_token}'})

# Check response code, etc.

print(all_files.json())
['s3://my-data-bucket/nnunetv2/dataset004_hippocampus_files/Dataset004_Hippocampus_A/dataset.json',
 's3://my-data-bucket/nnunetv2/dataset004_hippocampus_files/Dataset004_Hippocampus_A/imagesTr/',
 's3://my-data-bucket/nnunetv2/dataset004_hippocampus_files/Dataset004_Hippocampus_A/imagesTr/hippocampus_001_0000.nii.gz',
 's3://my-data-bucket/nnunetv2/dataset004_hippocampus_files/Dataset004_Hippocampus_A/imagesTr/hippocampus_003_0000.nii.gz',
...]

You can then either navigate further through the directory structure by querying listed directories, or download individual files by giving their full path in the DAL request. Directories are identified by a trailing slash (/); everything else is a file.

from pathlib import Path
from http import HTTPStatus


class DALException(Exception):
    """
    Raised when a request to the DAL does not succeed
    """


def remove_prefix(s: str, ds_path: str) -> Path:
    """
    Find the path relative to the dataset's root
    """
    return Path(s).relative_to(ds_path)


def local_path(url: str, local_root: Path, ds_path: str) -> Path:
    """
    Take the url of the file, extract its relative path against the dataset root and
    return the path in the local file system. Any parent directories are created first.
    """
    path = remove_prefix(url, ds_path)
    target = local_root / path
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


def download_file(url: str, local_path: Path, cfg: DALSettings) -> None:
    """
    Download the file from the DAL endpoint and write to the provided path
    """
    r = requests.get(f"{cfg.endpoint}{url}", headers=cfg.headers, timeout=cfg.timeout)
    if r.status_code != HTTPStatus.OK:
        raise DALException(f"Status code is not OK: {r.status_code}, error: {r.text}")
    with open(local_path, "wb") as file:
        file.write(r.content)


# Local directory to download into (illustrative value, choose your own)
download_root = Path("data")

for url in all_files.json():
    # Entries ending in "/" are directories; only files are downloaded
    if not url.endswith("/"):
        download_path = local_path(url, download_root, ds_path)
        download_file(url, download_path, cfg)
...
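Navigating further through the directory structure can be sketched as a small traversal that queries each listed directory in turn. The `list_dir` callable is a hypothetical stand-in for a function that performs the DAL GET request for a directory path and returns the parsed JSON list. The source does not state whether a listing already includes nested entries, so the sketch deduplicates paths to work in either case.

```python
from typing import Callable, Iterator, List


def iter_files(root: str, list_dir: Callable[[str], List[str]]) -> Iterator[str]:
    """Yield every file path below `root`, descending into sub-directories.

    `list_dir` is a hypothetical callable returning the DAL listing for one
    directory, for example a thin wrapper around `requests.get(...).json()`.
    """
    seen = {root}
    pending = [root]
    while pending:
        directory = pending.pop()
        for entry in list_dir(directory):
            if entry in seen:
                continue  # already handled via another listing
            seen.add(entry)
            if entry.endswith("/"):
                pending.append(entry)  # a directory: list it later
            else:
                yield entry  # a file: ready to download
```

Each yielded path can then be passed to a download function such as the one shown above.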

Retrieving asset policies for a dataset

You can also download the asset policy for this dataset using the policies endpoint. In this case, the data custodian has given you access to both the nnU-Net model and Apheris Statistics, so the asset policy includes permissions from both:

policies_response = requests.get(f"{dal_url}/policies/{ds_path}", headers={'Authorization': f'Bearer {dal_token}'})
print(policies_response.json())

{'permissions': {'explore': True, 'restrict_functions': True, 'allowed_function_names': ['max_column', 'max_aggregation']}, 'privacy': {}}

When integrating your model with Apheris, it is important to take into account the permissions you might receive from the asset policy, to ensure data safety.
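One way to take these permissions into account is to check them before running an operation. The sketch below is based only on the field names in the example response. The semantics are an assumption, not documented platform behaviour: when `restrict_functions` is true, only names in `allowed_function_names` are treated as permitted, and a missing field is treated as restrictive. The helper name `is_function_allowed` is hypothetical.

```python
from typing import Any, Dict


def is_function_allowed(policy: Dict[str, Any], function_name: str) -> bool:
    """Return True if `function_name` may run under `policy`.

    Assumed semantics (not confirmed by the platform documentation): when
    `restrict_functions` is true or absent, only names listed in
    `allowed_function_names` are permitted.
    """
    permissions = policy.get("permissions", {})
    # Fail closed: treat a missing flag as "restricted".
    if not permissions.get("restrict_functions", True):
        return True
    return function_name in permissions.get("allowed_function_names", [])
```

With the example policy above, `is_function_allowed(policy, "max_column")` would be true, while a name that is not listed would be rejected.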

The Secure Runtime

As mentioned at the start of this tutorial, Apheris uses NVIDIA FLARE as its federation engine.

To run a job with FLARE, you provide a set of files that define the configuration for that job. The configuration includes things like which components lie on the Orchestrator and the Compute Gateways, and any parameters that need to be passed into those.

You can see examples of these configuration files in the NVIDIA FLARE GitHub examples (client, server). There is also a meta.json file, which defines which configurations are sent to which Compute Gateway, allowing you to send different workloads to different Compute Gateways within a Compute Spec.

Apheris offers the capability to tightly control the workload submitted by end-users. In FLARE, users can modify the job configuration files before submitting a workload, which means that they could potentially disable components, change parameters outside of safe bounds, or add components that may pose a risk to your data.

Apheris includes the Secure Runtime, which sits between the user and the FLARE configuration files. It reads input from the user, validates it and uses it to build the FLARE config files in a reproducible, secure manner.

When you submit a job using the Apheris CLI, you provide the payload, which then serves as input to the Secure Runtime.

from aphcli.api import job
job.submit({"optimizer_name": "adam", "learning_rate": 0.01, "num_federation_rounds": 10})

You can think of it as something of a pipeline, like so:

[Figure: jobs-api-pipeline.svg]

To integrate with the Secure Runtime, you provide a Python file that defines the functions described below at a specific location within your Docker image. This file is dynamically loaded into the Secure Runtime when a Compute Spec is activated.

The Secure Runtime expects to find a single Python script, named secure_runtime_service.py at /usr/secure_runtime/src/secure_runtime/secure_runtime_service.py.

That script should contain three functions:

  • parse(body: str) -> dict : Convert a JSON payload into a Python object
  • validate(payload: dict) -> None : Validate the payload to ensure only the expected parameters are provided and align with the defined format
  • template(payload: dict, _dataset2gw: dict) -> dict[str, str] : Use the parameters provided in the payload to generate the FLARE configuration files.
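Read as a pipeline, the three functions listed above are applied in order: parse, then validate, then template. The sketch below only illustrates that ordering. How the Secure Runtime actually invokes your functions is not shown in this guide, and `run_pipeline` is a hypothetical name that can be handy for testing your own implementation locally.

```python
from typing import Any, Callable, Dict, Optional


def run_pipeline(
    body: str,
    parse: Callable[[str], dict],
    validate: Callable[[dict], None],
    template: Callable[[dict, Optional[dict]], Dict[str, Any]],
    dataset2gw: Optional[dict] = None,
) -> Dict[str, Any]:
    """Chain the three functions in the order described above.

    `run_pipeline` is a hypothetical local test helper, not platform code.
    """
    payload = parse(body)  # JSON string -> Python dictionary
    validate(payload)  # raises if the payload is not acceptable
    return template(payload, dataset2gw)  # file name -> file content
```

Because `validate` raises on invalid input, `template` is never reached with a payload that failed validation.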

The next part of the tutorial will walk through how to implement those functions.

Functions: Parse

Imagine you have a simple model, and you want to allow the user to specify three parameters: the optimizer ("adam" or "sgd"), the learning rate and the number of rounds of federation to perform.

You might define the expected payload as follows:

{
    "optimizer_name": "adam",
    "learning_rate": 0.01,
    "num_federation_rounds": 20
}

This payload will be delivered as a JSON string, which you will convert to a Python dictionary in the parse function.

The simplest way to achieve this is using the json library:

import json

def parse(body: str) -> dict:
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        # Handle malformed input body
        raise ValueError("Could not parse the request body as JSON")

Functions: Validation

Now that you have your payload as a Python dictionary, the next step is to validate that the parameters provided by the user match your expected specification.

You can implement these checks manually or you could use the Pydantic library, which is included in the Secure Runtime image.

Since the payload defined above is quite simple, this first example shows how you would do it manually:

def validate(payload: dict) -> None:
    """
    Validate the input payload provided by the user.

    `optimizer_name` is mandatory and must be one of "adam" or "sgd"
    `learning_rate` is an optional float and will default to 0.01.
    `num_federation_rounds` is an optional int and will default to 10.

    Raises a ValueError if an invalid payload is provided
    """
    supported_optimizers = ("adam", "sgd")

    optimizer = payload.get("optimizer_name")

    if optimizer not in supported_optimizers:
        raise ValueError(f"Invalid optimizer_name provided {optimizer}. Valid options are ('adam', 'sgd')")

    # Enforce learning_rate must be a float
    try:
        learning_rate = float(payload.get("learning_rate", 0.01))
    except (TypeError, ValueError):
        raise ValueError("Invalid value for learning_rate. Must be a number")

    # Enforce learning_rate be > 0
    if learning_rate <= 0:
        raise ValueError("Invalid value for learning_rate. Must be > 0")

    # Enforce num_federation_rounds must be an int, defaulting to 10
    try:
        num_federation_rounds = int(payload.get("num_federation_rounds", 10))
    except (TypeError, ValueError):
        raise ValueError("Invalid value for num_federation_rounds. Must be an integer")

    if num_federation_rounds <= 0:
        raise ValueError("Invalid value for num_federation_rounds. Must be > 0")

Note that the validate method doesn't return anything. Its only action is to raise a ValueError in the case of invalid input.

In a more realistic setting, this manual validation approach might become unmanageable. In such cases we recommend using Pydantic models, which are available in the Secure Runtime container, to perform the validation.

You could simply use this in the validate function to ensure the payload is valid, but it can be helpful to use it inside parse to ensure that defaults are correctly adhered to. While this will perform validation earlier, at the parse stage, any errors will be propagated to the user in exactly the same way as they would be under validate.

Below, you'll see the validate function re-implemented using Pydantic:

import json
from typing import Literal

from pydantic import BaseModel, ConfigDict, PositiveFloat, PositiveInt, ValidationError

class MyModelPayload(BaseModel):
    # Only allow the specific parameters below - error if additional are provided
    model_config = ConfigDict(extra="forbid")

    optimizer_name: Literal["adam", "sgd"]
    learning_rate: PositiveFloat = 0.01
    num_federation_rounds: PositiveInt = 10


def parse(body: str) -> dict:
    try:
        payload = json.loads(body)
        return MyModelPayload(**payload).model_dump()
    except json.JSONDecodeError:
        # Handle malformed input body
        raise ValueError("Could not parse the request body as JSON")


def validate(payload: dict) -> None:
    # Since validation is already covered by the `parse` function, you can either define
    # this as a no-op, or include additional validation here.
    pass

Functions: Template

The final function you need to implement is the template function. This takes the payload provided by the user and populates configuration files required for FLARE.

The input to template is the validated payload from the previous steps.

The output is a dictionary, whose keys are filenames within the job submission package, and whose values are the content that should go inside those files. The Secure Runtime will then write the files as defined in this dictionary, create the job package and submit it to FLARE.

As a reminder, the basic form of a job package looks like this:

example_job
├── app
│   └── config
│       ├── config_fed_client.json
│       └── config_fed_server.json
└── meta.json

  • config_fed_client.json defines the components for use on the Compute Gateways - these will typically be trainers / executors.
  • config_fed_server.json defines the components for use on the Orchestrator - these will typically be aggregators / persistors.
  • meta.json gives the resource mapping for the server and clients, so FLARE knows which application packages each should run. In this simple example, all Compute Gateways will run the same client config, but in more complex setups, you may wish to submit different client configurations to different Compute Gateways. This is beyond the scope of this tutorial, but for more information please look at the NVIDIA examples.

There are various ways to write this code, but it is important to ensure that any input from the user is carefully validated, and not added verbatim to the configuration files, as this may provide an opportunity for an injection attack. We recommend loading the template as a Python dictionary and modifying it in-place, though you might also consider packages like Jinja, which allow templating with escaped strings.
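To illustrate why verbatim insertion is risky, the sketch below contrasts pasting user input into JSON text with setting it on a dictionary. The key names and the malicious string are made up for the example and are not taken from any real FLARE template.

```python
import json

# A crafted "optimizer" value that tries to smuggle in an extra key.
malicious = 'adam", "learning_rate": 1000.0, "x": "y'

# Unsafe: user input is pasted straight into the JSON text.
unsafe_text = '{"learning_rate": 0.01, "optimizer": "%s"}' % malicious
unsafe_config = json.loads(unsafe_text)

# Safer: load the template as a dictionary, set the value, and let
# json.dumps take care of escaping when the file is written.
safe_config = {"learning_rate": 0.01, "optimizer": None}
safe_config["optimizer"] = malicious
safe_text = json.dumps(safe_config)
round_tripped = json.loads(safe_text)

# The injected key overrides the template value in the unsafe variant...
print(unsafe_config["learning_rate"])
# ...but stays an inert string in the dictionary-based variant.
print(round_tripped["learning_rate"])
```

In the unsafe variant the injected `learning_rate` replaces the template's value, because Python's `json` module keeps the last occurrence of a duplicated key. Modifying the dictionary avoids this, although the value should still be validated before it is used.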

Below, you'll see a simple example where the payload is used to populate the client, server and meta configuration files by loading a template JSON file in as a dictionary, adding the parameters from the payload to the configuration files, and writing them to the output dictionary.

A template can be as simple as using your existing FLARE job configuration file. You may wish to set to null any values that will not have a default when populating the template.

The following snippet will show the high-level function. Then in subsequent snippets you'll see how to perform the templating in more detail. You'll see there is a new parameter here that is passed into the template function alongside the payload - _dataset2gw. This is a dictionary which maps dataset IDs to Compute Gateway IDs, and can be used when you want to send different configurations to different Compute Gateways. In this example, it will only be used to calculate the number of sites (Compute Gateways) that need to be connected for a job to execute.

import json
from pathlib import Path
from typing import Optional


def template(payload: dict, _dataset2gw: Optional[dict] = None, *, template_folder: Optional[Path] = None) -> dict[str,str]:
    if template_folder is None:
        template_folder = Path(__file__).parent / "templates"

    num_clients = len(set(_dataset2gw.values())) if _dataset2gw else 1

    # Build the FLARE server config
    orchestrator_config = build_orchestrator_config(payload, num_clients, template_folder)

    # Build the FLARE client config
    gateway_config = build_gateway_config(payload, template_folder)

    # Build the meta config
    meta_config = build_meta_config(num_clients, template_folder)

    return {
        "app/config/config_fed_server.json": orchestrator_config,
        "app/config/config_fed_client.json": gateway_config,
        "meta.json": meta_config,
    }

At a high level, you can see that you'll create the three config files (Orchestrator, Compute Gateway and meta), and add them to the output dictionary, following the FLARE file system convention, shown above.

Next you'll see how to build the Orchestrator configuration file based on an existing JSON template. By default, the jobs service will look for templates in a directory called templates, alongside secure_runtime_service.py. While this is configurable, we recommend keeping the default option.

The important fields to change in the Orchestrator configuration are the number of clients (Compute Gateways) and the number of communication rounds. In practice, it would be better to do something more robust than directly accessing specific list indices, but in the interest of brevity, this example assumes the layout of the template won't change.

def build_orchestrator_config(payload: dict, num_clients: int, template_folder: Optional[Path] = None) -> dict:
    # Load in the template from JSON
    with open(template_folder / "config_fed_server.json") as fh:
        config_tpl = json.load(fh)

    # Set the fields we need to configure using the data from the payload.
    # To keep this simple, these are set directly here, but you will likely want to
    # search for the correct component using its name/path to make the templating more
    # robust to change.
    args = config_tpl["workflows"][1]["args"]
    args["min_clients"] = num_clients
    args["num_rounds"] = payload.get("num_federation_rounds")
    return config_tpl
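A more robust alternative to indexing by position, as the comments above suggest, is to look the component up by an identifier. The sketch assumes each entry in the template's list carries an `id` key, which is an assumption about your template layout; `find_component` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def find_component(components: List[Dict[str, Any]], component_id: str) -> Dict[str, Any]:
    """Return the single entry whose "id" equals `component_id`.

    Assumes every entry is a dictionary with an "id" key (an assumption
    about the template layout). Raises KeyError if there is no match or
    more than one.
    """
    matches = [c for c in components if c.get("id") == component_id]
    if len(matches) != 1:
        raise KeyError(
            f"Expected exactly one component with id {component_id!r}, found {len(matches)}"
        )
    return matches[0]
```

In `build_orchestrator_config`, this would replace the positional lookup with something like `find_component(config_tpl["workflows"], "my_workflow_id")["args"]`, where the id is whatever your own template uses.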

The code for the Compute Gateway and meta configurations is very similar:

def build_gateway_config(payload: dict, template_folder: Optional[Path] = None) -> dict:
    # Load in the template from JSON
    with open(template_folder / "config_fed_client.json") as fh:
        config_tpl = json.load(fh)

    args = config_tpl["executors"][0]["executor"]["args"]
    args["optimizer"] = payload.get("optimizer_name")
    args["learning_rate"] = payload.get("learning_rate")
    return config_tpl

def build_meta_config(num_clients: int, template_folder: Optional[Path] = None) -> dict:
    # Load in the template from JSON
    with open(template_folder / "meta.json") as fh:
        config_tpl = json.load(fh)

    config_tpl["min_clients"] = num_clients
    return config_tpl

The template function returns a dictionary that maps the paths of the NVIDIA FLARE configuration files to the job configuration dictionaries, which the Secure Runtime service will eventually write as JSON files for FLARE.

From here on, the job service will write the JSON files and execute the job.
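If you want to inspect the generated job package locally before building your image, the output dictionary can be written to disk with a few lines of standard-library code. This is only a sketch for local testing: it is not the job service's actual implementation, and `write_job_package` is a hypothetical name.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def write_job_package(files: Dict[str, Any], job_dir: Path) -> List[Path]:
    """Write each entry of `files` below `job_dir` and return the written paths.

    Hypothetical local helper; dictionaries are serialised as JSON and
    strings are written unchanged.
    """
    root = job_dir.resolve()
    written = []
    for relative_name, content in files.items():
        target = (root / relative_name).resolve()
        # Refuse file names that would land outside the job directory.
        if root not in target.parents:
            raise ValueError(f"Refusing to write outside job directory: {relative_name}")
        target.parent.mkdir(parents=True, exist_ok=True)
        text = content if isinstance(content, str) else json.dumps(content, indent=2)
        target.write_text(text, encoding="utf-8")
        written.append(target)
    return written
```

For example, `write_job_package(template(payload), Path("example_job"))` would reproduce the directory layout shown earlier, so you can compare it against a hand-written FLARE job.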

Adding the Secure Runtime to your model image

To enable the Secure Runtime to find your code and templates, they need to be added to the model Docker image in a specific location.

COPY secure_runtime/src /usr/secure_runtime/src

The Secure Runtime will dynamically load those functions, and delegate the parse, validate and template commands to the model-specific handling code.

Summary

In this guide, you have seen how to use two major components of Apheris: the DAL, which provides secure access to datasets on the Compute Gateways; and the Secure Runtime, which provides a mechanism for securely executing your workloads on the Apheris environment.

If you would like to learn more or see full examples of models using these components, please speak to your Apheris representative who will be able to share code examples with you.

Once you've finished building your model, you can add it to the Apheris solution by following the steps in the Custom models guide.