Skip to content

Simulating and Running Statistics Workflows🔗

This tutorial gives a brief overview of the Apheris statistics workflow. You will define data preprocessing functions and submit a query to the platform. You will then examine a failing computation and how to debug it. In particular, Apheris Statistics comes with a simulator, which is useful for debugging on your local machine before deploying your workload on real data via the remote Apheris environment. You will learn how to simulate workflows with dummy data and simulate workflows with datasets on your local file system.

Installation instructions🔗

Your Apheris representative will have provided you with a number of wheel files, which need to be installed in a specific order. For this tutorial, you will use the apheris_auth, apheris_cli and apheris_statistics wheels. If the provided wheel files are zipped, please first unzip them.

Important

With Apheris 3.2, there have been some large changes to the underlying structure of the Apheris-provided packages.

  • Apheris SDK has been removed and should no longer be installed.
  • All Apheris packages are now prefixed with apheris- (e.g. cli -> apheris-cli).
  • The authentication and authorisation components have moved to the new apheris-auth package, interaction code has moved to apheris-cli and the low-level statistics functions have moved to apheris-statistics.

If your environment has the apheris and cli packages installed from prior versions of Apheris, please first uninstall them as follows:

$ pip uninstall apheris
$ pip uninstall cli
$ pip uninstall statistics

Next, create a new Python virtual environment in which to install the Apheris Statistics package. Statistics supports Python 3.8-3.10, so please make sure you're using a supported version of Python.

$ python3 -m venv apheris-statistics-venv
$ source apheris-statistics-venv/bin/activate

The apheris_auth wheel is the Apheris Authentication package, which contains the underlying logic used to authenticate with the Apheris environment. It is not expected for end-users to use this directly, but it is a required component for the Apheris CLI.

Install the apheris_auth wheel (replace x.y.z with the version number of your wheel file):

$ pip install apheris_auth-x.y.z-py3-none-any.whl

The apheris_cli wheel contains the apheris-cli package, which provides the mechanism to interact with the Orchestrator, create Compute Specs and run computation jobs. The Statistics package uses this internally to run federated statistics operations on Apheris.

Install the apheris_cli wheel (again replace x.y.z with the version number of your wheel file):

$ git pip install apheris_cli-x.y.z-py3-none-any.whl

Finally, install apheris_statistics wheel, which contains the Apheris Statistics model from the Model Registry. Once again, please replace x.y.z with the version number of your wheel file:

$ pip install apheris_statistics-x.y.z-py3-none-any.whl

Now you have installed all the necessary packages, you need to provide your environment configuration. This is done using a Python package called dotenv which is installed as a dependency of the CLI and allows easy configuration of environment variables from a file.

Please contact your Apheris representative to get access the dotenv file for your environment. This file contains environment variables for your specific environment, including which URL Statistics should connect to. You can set these environment variables by running dotenv -f <name_of_environment_file> run bash. If you are running Statistics from a conda environment, please note that you may need to reactivate the conda environment after running this command.

Now you have everything installed, let's work through the basic Statistics workflow.

The basic workflow🔗

First you log in to the Apheris platform.

import apheris
apheris.login()

Output:

Logging in to your company account...
Apheris:
Authenticating with Apheris Cloud Platform...
Please continue the authorization process in your browser.
Gateway:
Authenticating with Apheris Compute environments...
Please continue the authorization process in your browser.

Login was successful

You can check your login status at any time, using the Apheris CLI:

import aphcli.utils
aphcli.utils.get_login_status()

This will return a tuple containing 4 values:

(is_logged_in: bool, user_email: str, user_org: str, user_environment: str)

For example:

(True, 'user.name@apheris.com', 'Apheris', 'production')

A data custodian can enable access to their data using an Asset Policy. You can explore the datasets that you have access to:

from aphcli.api import datasets
datasets.list_datasets()

Output:

+-----+-------------------------+--------------+---------------------+
| idx |         dataset_id      | organization |   data custodian    |
+-----+-------------------------+--------------+---------------------+
|  0  |  whas2_gateway-2_org-2  |    Org 2     |     Orsino Hoek     |
|  1  |  whas1_gateway-1_org-1  |    Org 1     |  Agathe McFarland   |
| ... |          ...            |     ...      |         ...         |
+-----+-------------------------+--------------+---------------------+

In this tutorial you will work on the datasets whas1_gateway-1_org-1 and whas2_gateway-2_org-2. These are synthetically generated datasets that contain medical data. Both are CSV files and have same structure. One resides on the Gateway of Organization "Org 1" and the other is with "Org 2".

Important

Please note that a federated computation across multiple datasets requires that each dataset resides in a different Gateway.

In addition to the real data, a Data Custodian can attach dummy data to their dataset to allow you to test your operations locally before executing on real data. The dummy data should have the same structure as the real data, but is not sensitive (for example, it might be randomly generated or anonymized). You can easily test your workflow on this dummy data before the workflow is run on the actual data in an encapsulated environment.

The FederatedDataFrame is an object that behaves similarly to a Pandas DataFrame. It is used to record preprocessing operations. It can be submitted to a Compute Gateway and be re-played on its confidential data in an encapsulated environment. You can also replay it locally on local data.

You can initialize it with a dataset_id. This acts as a placeholder that tells the data that it should be re-played on.

from apheris_stats.simple_stats.util import FederatedDataFrame
fdf_1 = FederatedDataFrame("whas1_gateway-1_org-1")

When you run preprocess_on_dummy(), it will under the hood download dummy data and re-play the recorded operations on the dummy data. You can find the downloaded dummy data in your home folder ~/.apheris/RemoteData/.

fdf_1.preprocess_on_dummy()
afb age av3 bmi chf cvd diasbp gender hr los miord mitype sho sysbp fstat lenfol
0 0.0 73.0 0.0 28.45241 1.0 1.0 102.0 0.0 92.0 6.0 1.0 0.0 0.0 197.0 False 399.0
1 0.0 41.0 0.0 27.26234 0.0 1.0 60.0 0.0 64.0 1.0 0.0 1.0 0.0 110.0 False 2084.0
2 0.0 89.0 0.0 14.83911 1.0 0.0 76.0 1.0 89.0 5.0 1.0 0.0 0.0 125.0 True 19.0
3 0.0 70.0 0.0 41.00206 1.0 1.0 56.0 1.0 68.0 11.0 0.0 0.0 0.0 131.0 True 11.0
4 1.0 86.0 0.0 19.85515 0.0 1.0 62.0 1.0 93.0 8.0 0.0 0.0 0.0 107.0 True 465.0
5 0.0 45.0 0.0 37.06646 0.0 1.0 70.0 0.0 110.0 3.0 0.0 1.0 0.0 130.0 False 1262.0
6 0.0 82.0 0.0 23.88798 1.0 1.0 40.0 0.0 66.0 3.0 0.0 0.0 0.0 96.0 True 140.0
7 1.0 84.0 0.0 20.92089 0.0 0.0 88.0 0.0 92.0 6.0 0.0 1.0 0.0 138.0 False 1939.0
8 0.0 93.0 0.0 22.85147 0.0 1.0 80.0 1.0 88.0 7.0 0.0 0.0 0.0 136.0 True 442.0
9 0.0 65.0 0.0 16.99342 1.0 1.0 105.0 1.0 144.0 8.0 1.0 0.0 0.0 202.0 True 226.0
...

You can record a preprocessing in a pandas-like manner. As an example, you might want to filter for patients of age >= 70.

fdf_1_elderly = fdf_1[fdf_1["age"] >= 70]

When you run preprocess_on_dummy(), you see that the rows of younger patients are filtered out.

fdf_1_elderly.preprocess_on_dummy()
afb age av3 bmi chf cvd diasbp gender hr los miord mitype sho sysbp fstat lenfol
0 0.0 73.0 0.0 28.45241 1.0 1.0 102.0 0.0 92.0 6.0 1.0 0.0 0.0 197.0 False 399.0
2 0.0 89.0 0.0 14.83911 1.0 0.0 76.0 1.0 89.0 5.0 1.0 0.0 0.0 125.0 True 19.0
3 0.0 70.0 0.0 41.00206 1.0 1.0 56.0 1.0 68.0 11.0 0.0 0.0 0.0 131.0 True 11.0
4 1.0 86.0 0.0 19.85515 0.0 1.0 62.0 1.0 93.0 8.0 0.0 0.0 0.0 107.0 True 465.0
6 0.0 82.0 0.0 23.88798 1.0 1.0 40.0 0.0 66.0 3.0 0.0 0.0 0.0 96.0 True 140.0
7 1.0 84.0 0.0 20.92089 0.0 0.0 88.0 0.0 92.0 6.0 0.0 1.0 0.0 138.0 False 1939.0
8 0.0 93.0 0.0 22.85147 0.0 1.0 80.0 1.0 88.0 7.0 0.0 0.0 0.0 136.0 True 442.0
10 0.0 102.0 0.0 22.27393 1.0 0.0 60.0 0.0 89.0 3.0 0.0 0.0 0.0 118.0 True 169.0
13 0.0 75.0 0.0 21.25718 0.0 1.0 56.0 1.0 56.0 2.0 1.0 0.0 0.0 209.0 True 289.0
15 0.0 95.0 0.0 27.98863 1.0 1.0 62.0 0.0 80.0 1.0 1.0 1.0 0.0 111.0 True 1.0
...

Let's define the same operation for the second dataset.

fdf_2 = FederatedDataFrame("whas2_gateway-2_org-2")
fdf_2_elderly = fdf_2[fdf_2["age"] >= 70]

Before you can submit a statistical query, you need to deploy an Apheris Client on each Gateway and a server (Apheris Aggregator) that orchestrates the computation. It is possible to specify hardware requirements for the machine, but for now you can stay with the default values. Launching these machine can take a few minutes.

from apheris_stats.simple_stats.util import provision

simple_stats_session = provision(
    dataset_ids=[
        "whas1_gateway-1_org-1", "whas2_gateway-2_org-2",
    ]
)

Output:

compute_spec_id: 3689fef6-b0a9-46ae-bbeb-eedec41f0143

Successfully activated ComputeSpec!

The provisioning returns a SimpleStatsSession object. It's "compute_spec_id" is an ID that refers to the cluster that has just been spun up. You could open a new Jupyter Notebook, and instantiate a new session with it. If you want to contact Apheris support, please provide this ID.

It is not possible to download the raw confidential data. You must apply an "aggregation" on the Gateway-level, and you can optionally run an aggregation on the global level.

Let's use fdf_1_elderly and fdf_2_elderly to pre-process the data on the corresponding Gateways. Then assume you are interested in the mean value of the bmi column, and you don't want global aggregation over the two Gateways.

from apheris_stats import simple_stats

result = simple_stats.mean_column(
    [fdf_1_elderly, fdf_2_elderly],
    column_name="bmi",
    aggregation=False,
    session=simple_stats_session
)

Your result will look like this (reformatted for readability):

{
    'whas1_gateway-1_org-1': {
        'results':
                    mean_column  count
        total bmi    24.236224     61
    }, 
    'whas2_gateway-2_org-2': {
        'results':
                    mean_column  count
        total bmi     25.02628    202
    }
}
result['whas1_gateway-1_org-1']['results']
mean_column count
total bmi 24.236224 61

Each Gateway returns a Pandas DataFrame. You see that the mean bmi of elderly people in dataset whas1_gateway-1_org-1 is 24.2, and it was calculated over 61 patients.

Debugging a remote query🔗

Let's have a look at how to debug a remote query! To do so, you can submit a query with a non-existing column.

result2 = simple_stats.mean_column(
    [fdf_1_elderly, fdf_2_elderly],
    column_name="non-existing column",
    aggregation=False,
    session=simple_stats_session
)

This will trigger a computation on the Compute Gateways, attempting to perform a mean calculation over the non-existing column. As this column doesn't exist, the calculation will raise an error in the Gateway, which wil result in the computation failing.

You will see that manifests as a ResultsNotFound exception on your machine:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/apheris_stats/simple_stats/_core/simple_stats.py", line 68, in mean_column
    results = _run_simple_stats(
  File "/path/to/apheris_stats/simple_stats/_core/simple_stats.py", line 1146, in _run_simple_stats
    results = session.run(job_definition)
  File "/path/to/apheris_stats/simple_stats/_core/stats_session.py", line 508, in run
    raise ResultsNotFound(
apheris_stats.simple_stats._core.stats_session.ResultsNotFound: No results found. You can find the full logs at
`/path/to/.apheris/statistics/statistic_results/2fda941c-d807-4995-9471-028aa23dce93/a1eb7641-b379-4c1e-bbac-9133b629d340/workspace/log.txt`

Find a summary below:

 ######### SUMMARY OF RELEVANT LOGS #########


# ERROR MESSAGES OF CLIENTS

## sender: a6600818-fd6f-e994-2f7a-f687710d2021


## sender: b312cc05-da9d-70c2-d8c9-51af2f9f726a

2024-06-27 10:28:25,866 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Message from 'b312cc05-da9d-70c2-d8c9-51af2f9f726a' [ERROR]: Failed to apply the computation `mean_column`. Exception type: KeyError, Error message: 'The column name non-existing column was not found in the pandas dataframe.'


# ERROR MESSAGES OF SERVER

2024-06-27 10:28:25,870 - StatsController - ERROR - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller, peer=b312cc05-da9d-70c2-d8c9-51af2f9f726a, peer_run=a1eb7641-b379-4c1e-bbac-9133b629d340, peer_rc=EXECUTION_EXCEPTION, task_name=apheris_stats, task_id=1436e47b-e126-4c51-8e74-fb3570a7626f]: processing error in result_received_cb on task apheris_stats(1436e47b-e126-4c51-8e74-fb3570a7626f): RuntimeError: A client has returned empty results.

2024-06-27 10:28:25,871 - StatsController - ERROR - Traceback (most recent call last):

2024-06-27 10:28:26,036 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: task apheris_stats exit with status TaskCompletionStatus.ERROR

2024-06-27 10:28:26,037 - StatsController - ERROR - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Less than minimum number of clients have returned results!

2024-06-27 10:28:26,037 - ServerRunner - ERROR - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Exception in workflow fed_stats_controller: RuntimeError: Less than minimum number of clients have returned results!

2024-06-27 10:28:26,038 - ServerRunner - ERROR - Traceback (most recent call last):

2024-06-27 10:28:26,038 - ServerRunner - ERROR - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Aborting current RUN due to FATAL_SYSTEM_ERROR received: Exception in workflow fed_stats_controller: RuntimeError: Less than minimum number of clients have returned results!

# LAST 3 MESSAGES OF CLIENTS

## sender: a6600818-fd6f-e994-2f7a-f687710d2021

2024-06-27 10:28:25,385 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Message from 'a6600818-fd6f-e994-2f7a-f687710d2021' [INFO]: Start loading real data via Apheris Data Access Layer (DAL).


## sender: b312cc05-da9d-70c2-d8c9-51af2f9f726a

2024-06-27 10:28:25,866 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Message from 'b312cc05-da9d-70c2-d8c9-51af2f9f726a' [INFO]: Finished applying bounded privacy rule.

2024-06-27 10:28:25,866 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Message from 'b312cc05-da9d-70c2-d8c9-51af2f9f726a' [INFO]: Start applying computation.

2024-06-27 10:28:25,866 - StatsController - INFO - [identity=Unnamed_project_42f47214-5b6f-4d5f-9775-40555cebabc7, run=a1eb7641-b379-4c1e-bbac-9133b629d340, wf=fed_stats_controller]: Message from 'b312cc05-da9d-70c2-d8c9-51af2f9f726a' [ERROR]: Failed to apply the computation `mean_column`. Exception type: KeyError, Error message: 'The column name non-existing column was not found in the pandas dataframe.'


# LAST 3 MESSAGES OF SERVER

2024-06-27 10:28:34,042 - FederatedServer - INFO - Server app stopped.

2024-06-27 10:28:35,548 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00002 Not Connected] is closed PID: 113

2024-06-27 10:28:35,548 - MPM - INFO - MPM: Good Bye!    

You see the error message, that no results arrived from the Orchestrator. The server logs were downloaded to your machine, which contain the entire job log history from the Orchestrator. As well as messages generated by the Orchestrator itself, you will also see messages that have been received from the Compute Gateways. Apheris Statistics catches error messages on the gateways, truncates them to remove potential privacy violations and forwards them to the Orchestrator for logging.

To make it easier to find the source of the error, Apheris Statistics provides a summary of the last 3 errors that were shown from each Compute Gateway (client) and the Orchestrator (server).

Looking at the summary, you find that not all Compute Gateways have returned a result to the server (Less than minimum number of clients have returned results!). The problem on the Gateway, where the FederatedDataFrame was re-played, is: Failed to apply the computation 'mean_column'. Exception type: KeyError, Error message: 'The column name non-existing column was not found in the pandas dataframe.'

For now, you are done with work on remote queries. So, you can shut down all machines that are running on the Gateways and the server.

simple_stats_session.close()

Preprocessing on Dummy Data and Local Files🔗

The function FederatedDataFrame.preprocess_on_dummy() relies on the download of dummy data from an external service. As a reminder, dummy data can be uploaded by a Data Custodian when they register a dataset. It should have the same structure as the real data, but should not contain sensitive information (this responsibility lies with the Data Custodian).

For local testing or when dummy data is not available, you can re-play the FederatedDataFrame on alternative data from your local filesystem. You call preprocess_on_files and pass a dict that tells how to replace the "placeholders". (A FederatedDataFrame can contain multiple placeholders if it was created by merging of multiple FederatedDataFrames.)

local_fpath = "my_file.csv"
with open(local_fpath, "wt") as f:
    f.write("age,bmi\n69,20\n78,25\n83,30")

fdf_1_elderly.preprocess_on_files({"whas1_gateway-1_org-1": local_fpath})
age bmi
1 78 25
2 83 30

Simulating Statistics Workflows🔗

Before running your workloads on real data, you might find it easier to build your statistics locally, in which case you can use the local session objects in place of the SimpleStatsSession.

These are called LocalDummySimpleStatsSession and LocalDebugSimpleStatsSession and execute only on your machine, not on the Apheris environment. Similarly to the preprocessing functions outlined above, the former supports dummy data from an Apheris dataset and the latter allows you to use data stored on your local filesystem. The LocalDebugSimpleStatsSession is particularly useful in a closed environment where you don't have access to the Apheris environment to use dummy data.

Initialize a LocalDummySimpleStatsSession session with the dataset_ids that you want to work with, and use this session object as a replacement for your SimpleStatsSession. Your query will not be submitted to an external service. Instead, dummy data and the policies and permissions of the original data are downloaded, and the query is executed locally on your machine. You can use your IDE's debugger to step into the code. It is possible to overwrite the policies and permissions that come from the original datasets (for details run help(LocalDummySimpleStatsSession)).

from apheris_stats.simple_stats.util import LocalDummySimpleStatsSession

dummy_session = LocalDummySimpleStatsSession(
    dataset_ids=[
        "whas1_gateway-1_org-1", "whas2_gateway-2_org-2",
    ]
)
result = simple_stats.mean_column(
    [fdf_1_elderly, fdf_2_elderly],
    column_name="bmi",
    aggregation=False,
    session=dummy_session
)

While the computation runs, you'll see the logging from the from simulated Orchestrator and Gateways. Since the command above doesn't use aggregation, the final result will be returned as a dictionary mapping dataset names to each Compute Gateway's result, as Pandas DataFrames:

Output:

result
{
    'whas1_gateway-1_org-1': {
        'results':
                    mean_column  count
        total bmi    24.630376     34
    },
    'whas2_gateway-2_org-2': {
        'results':
                    mean_column  count
        total bmi    24.356071     29
    }
}

Simulating on Local Data🔗

The LocalDummySimpleStatsSession depends on an external service to download dummy data. There are situations where you want to be independent of such external services, for example for testing or when no dummy data is available. For these situations you can use the LocalDebugSimpleStatsSession. It does not depend on any external services and runs fully locally on your machine.

If you want to use it, you must define your datasets beforehand. First, create 2 small CSV files that will represent your local data:

from apheris_stats.simple_stats.util import LocalDebugDataset, LocalDebugSimpleStatsSession

local_fpath_1 = "my_ds1.csv"
with open(local_fpath_1, "wt") as f:
    f.write("age,bmi\n69,20\n78,21\n83,22")

local_fpath_2 = "my_ds2.csv"
with open(local_fpath_2, "wt") as f:
    f.write("age,bmi\n68,10\n77,12\n82,13")

Now, create LocalDebugDataset objects, which emulate the behaviour of datasets registered in the Apheris environment:

ds1 = LocalDebugDataset(
    dataset_id="whas1_gateway-1_org-1",
    gateway_id="gw1",
    dataset_fpath=local_fpath_1,
    policy={},
    permissions={"any_operation": True}
)
ds2 = LocalDebugDataset(
    dataset_id="whas2_gateway-2_org-2",
    gateway_id="gw2",
    dataset_fpath=local_fpath_2,
    policy={},
    permissions={"any_operation": True}
)

Now that the datasets have been created, you can create the LocalDebugSimpleStatsSession, referencing them:

debug_session = LocalDebugSimpleStatsSession(
    datasets=[ds1, ds2],
    max_threads=1
)

Finally, you can run a mean_column query on the local data like so:

result = simple_stats.mean_column(
    [fdf_1_elderly, fdf_2_elderly],
    column_name="bmi",
    aggregation=False,
    session=debug_session
)

Output:

result
{
    'whas2_gateway-2_org-2': {
        'results':            mean_column  count
                total bmi         12.5      2
    }, 
    'whas1_gateway-1_org-1': {
        'results':            mean_column  count
                total bmi         21.5      2
    }
}

The impact of multithreading on debugging🔗

By default, the LocalDebugSimpleStatsSession and LocalDummySimpleStatsSession sessions use one thread per simulated gateway. However if you want to step into the computations using the PDB debugger, you may find that it is not possible to connect to the computation due to the threaded execution.

Therefore, if you wish to use PDB with one of the local sessions, you can instantiate the session with the additional parameter; max_threads, set to 1. For example:

debug_session = LocalDebugSimpleStatsSession(
    datasets=[ds1, ds2],
    max_threads=1
)

or:

dummy_session = LocalDummySimpleStatsSession(
    dataset_ids=[
        "whas1_gateway-1_org-1", "whas2_gateway-2_org-2",
        max_threads=1
    ]
)

This will be functionally equivalent to the previous sessions, but you may find the computations take longer as each Compute Gateway will run sequentially rather than concurrently.

Summary🔗

In this guide, you have used the Apheris Statistics package to run some simple operations against real data in the Apheris solution.

You then saw how to interpret logs are returned when errors occur in remote execution, including how to use to use the summarised logs to find the source of errors.

Finally, you tried out the Apheris Statistics Simulator to run workloads on your local machine, both using dummy data and with files stored on your local machine.

For more information on the functions you can use with Apheris Statistcs, see the Statistics Reference document