Regression Models Reference🔗

The Apheris Regression Models codebase is a toolkit for running regression models on Apheris.

It currently provides Cox proportional hazards and logistic regression models; future versions of Apheris are expected to add further regression models to this package.

regression.logistic_regression.api_client🔗

fit_lr(datasets, session, feature_cols, target_col, validation_set_col=None, validation_split=None, feature_selector_direction=None, num_rounds=5, num_steps_per_round=2) 🔗

Trains a logistic regression model using the specified datasets and session.

Parameters:

Name Type Description Default
datasets Union[Iterable[FederatedDataFrame], FederatedDataFrame]

The datasets to be used for training.

required
session Union[SupervisedMLSession, LocalDebugMLSession]

The session object that defines the compute spec and dataset IDs.

required
num_rounds int

The number of training rounds to perform.

5
feature_cols List[Union[int, float, str]]

Columns to be used as features in the model.

required
target_col Union[int, float, str]

Column to be used as the target variable.

required
validation_set_col Optional[Union[int, float, str]]

Column to be used for the validation set. Defaults to None.

None
validation_split Optional[float]

Fraction of the data to be used for validation. Defaults to None, in which case a split of 0.2 is used.

None
feature_selector_direction Optional[str]

Direction for feature selection ('forward' or 'backward'). Defaults to None.

None
num_steps_per_round int

Number of steps to perform per round. Defaults to 2.

2

Returns:

Name Type Description
results dict

Dictionary containing the model parameters resulting from the training process.
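
For orientation, a minimal usage sketch (session, the FederatedDataFrame objects, and the column names are placeholders, not values from this reference):

from regression.logistic_regression.api_client import fit_lr

# session: a SupervisedMLSession from regression.session.provision.provision,
# or a LocalDebugMLSession for local simulation (both documented below).
# fdf_a, fdf_b: FederatedDataFrame objects pointing at the training datasets.
model_params = fit_lr(
    datasets=[fdf_a, fdf_b],
    session=session,
    feature_cols=["age", "weight"],  # placeholder feature columns
    target_col="outcome",            # placeholder target column
    validation_split=0.2,
    num_rounds=5,
)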

Source code in .env/lib/python3.10/site-packages/regression/logistic_regression/api_client.py
def fit_lr(
    datasets: Union[Iterable[FederatedDataFrame], FederatedDataFrame],
    session: Union[SupervisedMLSession, LocalDebugMLSession],
    feature_cols: List[Union[int, float, str]],
    target_col: Union[int, float, str],
    validation_set_col: Optional[Union[int, float, str]] = None,
    validation_split: Optional[float] = None,
    feature_selector_direction: Optional[str] = None,
    num_rounds: int = 5,
    num_steps_per_round: int = 2,
) -> dict:
    """
    Trains a logistic regression model using the specified datasets and session.

    Args:
        datasets (Union[Iterable[FederatedDataFrame], FederatedDataFrame]):
            The datasets to be used for training.
        session (Union[SupervisedMLSession, LocalDebugMLSession]):
            The session object that defines compute_spec and dataset ids.
        num_rounds (int): The number of training rounds to perform.
        feature_cols (List[Union[int, float, str]]):
            Columns to be used as features in the model.
        target_col (Union[int, float, str]):
            Column to be used as the target variable.
        validation_set_col (Optional[Union[int, float, str]], optional):
            Column to be used for the validation set. Defaults to None.
        validation_split (Optional[float], optional):
            Fraction of the data to be used for validation. Defaults to None, in
            which case a split of 0.2 is used.
        feature_selector_direction (Optional[str], optional):
            Direction for feature selection ('forward' or 'backward'). Defaults to None.
        num_steps_per_round (int, optional):
            Number of steps to perform per round. Defaults to 2.

    Returns:
        results: Dictionary containing the model parameters as results of the training
            process.
    """

    training_params = {
        "model": "logistic_regression",
        "task": AppConstants.TASK_TRAIN,
        "num_rounds": num_rounds,
        "feature_cols": feature_cols,
        "target_col": target_col,
        "feature_selector_direction": feature_selector_direction,
        "num_steps_per_round": num_steps_per_round,
        "validation_split": validation_split,
    }
    # training_params is always a dict here, so only validation_set_col needs checking.
    if validation_set_col is not None:
        training_params["validation_set_col"] = validation_set_col

    validate_job_params(training_params)
    _validate_datasets(datasets)
    _validate_session(session)
    return _run_supervised_ml(
        datasets=datasets,
        session=session,
        job_params=training_params,
    )

validate_lr(datasets, session, feature_cols, ground_truth_col, modelparameter) 🔗

Validates the model based on the specified validation dataset.

Parameters:

Name Type Description Default
datasets Union[Iterable[FederatedDataFrame], FederatedDataFrame]

The dataset to be used for prediction.

required
session Union[SupervisedMLSession, LocalDebugMLSession]

The session object that defines the compute spec and dataset IDs.

required
feature_cols List[Union[int, float, str]]

Columns to be used as features in the model.

required
ground_truth_col Union[int, float, str]

Column to be used as the ground truth.

required
modelparameter dict

The model parameters to be used for prediction as a dictionary. This can be the output of the fit_lr function.

required

Returns:

Name Type Description
results dict

Dictionary containing the predicted values.
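
A minimal sketch of validating a fitted model (placeholder names as in the fit_lr example above; fdf_train and fdf_holdout are hypothetical FederatedDataFrame objects):

from regression.logistic_regression.api_client import fit_lr, validate_lr

model_params = fit_lr(
    datasets=fdf_train,
    session=session,
    feature_cols=["age", "weight"],
    target_col="outcome",
)
# Reuse the fitted parameters to score the model on held-out data.
results = validate_lr(
    datasets=fdf_holdout,
    session=session,
    feature_cols=["age", "weight"],
    ground_truth_col="outcome",
    modelparameter=model_params,
)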

Source code in .env/lib/python3.10/site-packages/regression/logistic_regression/api_client.py
def validate_lr(
    datasets: Union[Iterable[FederatedDataFrame], FederatedDataFrame],
    session: Union[SupervisedMLSession, LocalDebugMLSession],
    feature_cols: List[Union[int, float, str]],
    ground_truth_col: Union[int, float, str],
    modelparameter: dict,
) -> dict:
    """
    Validates the model based on the specified validation dataset.

    Args:
        datasets (FederatedDataFrame): The dataset to be used for prediction.
        session (Union[SupervisedMLSession, LocalDebugMLSession]): The session object
            that defines compute_spec and dataset ids.
        feature_cols (List[Union[int, float, str]]): Columns to be used as features in
            the model.
        ground_truth_col (Union[int, float, str]): Column to be used as the ground truth.
        modelparameter (dict): The model parameters to be used for prediction as a
            dictionary. This can be the output of the fit_lr function.
    Returns:
        results: Dictionary containing the predicted values.
    """
    _validate_datasets(datasets)
    _validate_session(session)
    validation_params = {
        "model": "logistic_regression",
        "task": AppConstants.TASK_VALIDATION,
        "feature_cols": feature_cols,
        "ground_truth_col": ground_truth_col,
        "modelparameter": modelparameter,
    }
    return _run_supervised_ml(
        datasets=datasets, session=session, job_params=validation_params
    )

regression.cox.api_client🔗

fit_coxph(datasets, session, time_col, target_col, validation_set_col=None, max_time=-1, num_rounds=5, num_steps_per_round=2) 🔗

Training job that fits a Cox proportional hazards regression model to the given federated datasets.

Parameters:

Name Type Description Default
datasets Union[Iterable[FederatedDataFrame], FederatedDataFrame]

List of FederatedDataFrame or single FederatedDataFrame that point to the datasets to be used for training.

required
session Union[SupervisedMLSession, LocalDebugMLSession]

The session object that defines the compute spec and dataset IDs.

required
num_rounds int

Number of training rounds.

5
time_col Union[int, float, str]

Column name of the integer-valued time column.

required
target_col Union[int, float, str]

Column name of the ground truth column.

required
validation_set_col Optional[Union[int, float, str]]

Column name of a boolean-valued validation set indicator. If None, a validation set is obtained by a 20% train/test split.

None
max_time int

Maximum time over all datasets. If not given, it is computed in a preliminary max computation.

-1
num_steps_per_round int

Number of steps per federated round.

2

Returns:

Type Description
dict

A dictionary with three keys:

• coef: The regression coefficients (betas) for each covariate in the model.
• baseline_hazard: The baseline hazard function, that is, the risk of the event occurring at a particular time point for a baseline individual (one with all covariates equal to zero).
• cumulative_hazard: The integral of the baseline hazard over time, representing the total accumulated risk of the event up to a specific time point.

Raises:

Type Description
RuntimeError

If the job cannot be created

TimeoutError

If the job takes longer than the supplied timeout

ResultsNotFound

If the job did not complete due to an error. In this case, please check the supplied logs for more details.
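
A minimal sketch, assuming a provisioned session and a FederatedDataFrame fdf whose time column is integer-valued (all column names are placeholders):

from regression.cox.api_client import fit_coxph

result = fit_coxph(
    datasets=fdf,
    session=session,
    time_col="time_to_event",  # placeholder, integer-valued
    target_col="event",        # placeholder ground truth column
    num_rounds=5,
)
coefs = result["coef"]                    # regression coefficients (betas)
baseline = result["baseline_hazard"]      # baseline hazard function
cumulative = result["cumulative_hazard"]  # accumulated risk over time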

Source code in .env/lib/python3.10/site-packages/regression/cox/api_client.py
def fit_coxph(
    datasets: Union[Iterable[FederatedDataFrame], FederatedDataFrame],
    session: Union[SupervisedMLSession, LocalDebugMLSession],
    time_col: Union[int, float, str],
    target_col: Union[int, float, str],
    validation_set_col: Optional[Union[int, float, str]] = None,
    max_time: int = -1,
    num_rounds: int = 5,
    num_steps_per_round: int = 2,
) -> dict:
    """
    Training job that fits a Cox proportional hazards regression model to the
    given federated datasets.

    Args:
        datasets: List of FederatedDataFrame or single FederatedDataFrame that point
            to the remote datasets to be used for training and contain possible
            preprocessing operations.
        session: The session object that defines compute_spec and dataset ids.
        num_rounds: Number of training rounds.
        time_col: Column name of the integer-valued time column.
        target_col: Column name of the ground truth column.
        validation_set_col: Column name of a boolean-valued validation set indicator.
            If None, a validation set is obtained by a 20% train/test split.
        max_time: Maximum time over all datasets. If not given, it is computed in a
            preliminary max computation.
        num_steps_per_round: Number of steps per federated round.

    Returns:
        A dictionary with three keys:

        * coef: The regression coefficients (betas) for each covariate in the model
        * baseline_hazard: The baseline hazard function, that is the risk of the event
            happening at a particular time point for a baseline individual (one with
            all covariates equal to zero).
        * cumulative_hazard: The integral of the baseline hazard over time, representing
            the total accumulated risk of the event up to a specific time point.

    Raises:
        RuntimeError: If the job cannot be created
        TimeoutError: If the job takes longer than the supplied timeout
        ResultsNotFound: If the job did not complete due to an error. In this case, please
            check the supplied logs for more details.
    """
    training_params = {
        "model": "coxph",
        "task": AppConstants.TASK_TRAIN,
        "num_rounds": num_rounds,
        "time_col": time_col,
        "target_col": target_col,
        "max_time": max_time,
        "num_steps_per_round": num_steps_per_round,
    }
    validate_job_params(training_params)
    _validate_datasets(datasets)
    _validate_session(session)
    if validation_set_col is not None:
        training_params["validation_set_col"] = validation_set_col
    return _run_supervised_ml(
        datasets=datasets,
        session=session,
        job_params=training_params,
    )

validate_cox(datasets, session, time_col, ground_truth_col, modelparameter) 🔗

Validates the model based on the specified validation datasets and the column ground_truth_col. For validation, the default scoring function from the lifelines CoxPH model is used, which is the average partial log-likelihood.

Parameters:

Name Type Description Default
datasets Union[Iterable[FederatedDataFrame], FederatedDataFrame]

The dataset to be used for prediction.

required
session Union[SupervisedMLSession, LocalDebugMLSession]

The session object that defines the compute spec and dataset IDs.

required
time_col Union[int, float, str]

Column to be used as the time column for the Cox inference. The time column should be integer-valued.

required
ground_truth_col Union[int, float, str]

Column to be used as the ground truth.

required
modelparameter dict

The model parameters to be used for prediction as a dictionary. This can be the output of the fit_coxph function.

required

Returns:

Name Type Description
results dict

Dictionary containing the predicted values.
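
A minimal sketch that feeds the output of fit_coxph back into validation (placeholder names as in the fit_coxph example above):

from regression.cox.api_client import fit_coxph, validate_cox

model_params = fit_coxph(
    datasets=fdf,
    session=session,
    time_col="time_to_event",
    target_col="event",
)
results = validate_cox(
    datasets=fdf_holdout,  # hypothetical held-out FederatedDataFrame
    session=session,
    time_col="time_to_event",
    ground_truth_col="event",
    modelparameter=model_params,
)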

Source code in .env/lib/python3.10/site-packages/regression/cox/api_client.py
def validate_cox(
    datasets: Union[Iterable[FederatedDataFrame], FederatedDataFrame],
    session: Union[SupervisedMLSession, LocalDebugMLSession],
    time_col: Union[int, float, str],
    ground_truth_col: Union[int, float, str],
    modelparameter: dict,
) -> dict:
    """
    Validates the model based on the specified validation datasets and the column
    ground_truth_col. For validation, the default scoring function from the
    lifelines CoxPH model, which is the average partial log-likelihood, is used.

    Args:
        datasets (FederatedDataFrame): The dataset to be used for prediction.
        session (Union[SupervisedMLSession, LocalDebugMLSession]): The session
            object that defines compute_spec and dataset ids.
        time_col (Union[int, float, str]): Column to be used as the time column for
            the Cox inference. The time column should be integer-valued.
        ground_truth_col (Union[int, float, str]): Column to be used as the ground
            truth.
        modelparameter (dict): The model parameters to be used for prediction as a
            dictionary. This can be the output of the fit_coxph function.

    Returns:
        results: Dictionary containing the predicted values.
    """
    _validate_datasets(datasets)
    _validate_session(session)
    validation_params = {
        "model": "cox",
        "task": AppConstants.TASK_VALIDATION,
        "time_col": time_col,
        "ground_truth_col": ground_truth_col,
        "modelparameter": modelparameter,
    }
    return _run_supervised_ml(
        datasets=datasets, session=session, job_params=validation_params
    )

regression.session🔗

LocalDebugMLSession 🔗

Bases: LocalDebugSimpleStatsSession

Local session object that connects the regression models with the NVFlare simulator and supports running a simulation of a SupervisedMLJobDefinition.

Parameters:

Name Type Description Default
datasets List[LocalDebugDataset]

A list of LocalDebugDatasets that should be included in the session. Each dataset will be assigned to its own simulated Compute Gateway.

required
workspace Optional[Union[str, Path]]

The path to the workspace where the results will be stored.

None
max_threads Optional[int]

The maximum number of threads to be used in the computation. By default this is set to the number of Compute Gateways in the computation, but can be set to 1 for improved debugging with PDB.

None
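
A minimal sketch of constructing a debug session, assuming LocalDebugDataset instances already exist (their construction is not covered by this reference):

from regression.session import LocalDebugMLSession

# debug_datasets: a list of LocalDebugDataset objects, one per simulated
# Compute Gateway.
session = LocalDebugMLSession(
    datasets=debug_datasets,
    workspace="./debug_workspace",  # optional; simulation results are kept here
    max_threads=1,                  # single-threaded for easier debugging with PDB
)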
Source code in .env/lib/python3.10/site-packages/regression/session/session.py
class LocalDebugMLSession(LocalDebugSimpleStatsSession):
    """
    Local session object that connects the regression models with the nvflare
    simulator and supports running a simulation of a SupervisedMLJobDefinition.

    Args:
        datasets: A list of LocalDebugDatasets that should be included in the session.
            Each dataset will be assigned to its own simulated Compute Gateway.
        workspace: The path to the workspace where the results will be stored.
        max_threads: The maximum number of threads to be used in the computation. By
            default this is set to the number of Compute Gateways in the computation,
            but can be set to 1 for improved debugging with PDB.
    """

    def __init__(
        self,
        datasets: List[LocalDebugDataset],
        workspace: Optional[Union[str, Path]] = None,
        max_threads: Optional[int] = None,
    ):
        secure.register_decomposers()
        super().__init__(datasets, workspace, max_threads=max_threads)

    def get_stats_session(self) -> LocalDebugSimpleStatsSession:
        return LocalDebugSimpleStatsSession(self.datasets)

    def run(self, job_definition: SupervisedMLJobDefinition):

        os.environ["aph_local_run"] = "1"
        os.environ["aph_dataset_paths"] = json.dumps(self.get_dataset_fpaths())
        os.environ["aph_client_names"] = json.dumps(self.get_client_names())
        os.environ["aph_policies"] = json.dumps(self.get_policies())
        os.environ["aph_permissions"] = json.dumps(self.get_permissions())

        ctx = nullcontext() if self.workspace else tempfile.TemporaryDirectory()

        with ctx as tmp_workspace, tempfile.TemporaryDirectory() as tmp_dir:
            job_dir = Path(tmp_dir) / "job"
            job_dir.mkdir(parents=True, exist_ok=True)
            create_job_dir(job_dir=Path(job_dir), **job_definition.to_dict())

            # The simulator requires all clients to be involved. So, we don't start a
            # client for each gateway from the session, but only for the ones that are
            # used in the FederatedDataFrames.
            clients = list(
                set(self.get_client_names()).intersection(
                    set(job_definition.mapped_fdfs.keys())
                )
            )

            if self.max_threads is None or self.max_threads > len(clients):
                num_threads = len(clients)
            else:
                num_threads = self.max_threads

            workspace = self.workspace if self.workspace else tmp_workspace
            simulator = SimulatorRunner(
                job_folder=str(job_dir),
                workspace=str(workspace),
                n_clients=len(clients),
                threads=num_threads,
                clients=",".join(clients),
            )
            _ = simulator.run()

            if job_definition.pre_run:
                result_path = Path(workspace) / "simulate_job/models/results.bin"
            elif job_definition.job_params.get("task") in ["predict", "validate"]:
                results = {}
                for client_name in job_definition.mapped_fdfs.keys():

                    result_path = (
                        Path(workspace) / f"simulate_job/app_server/{client_name}.secure"
                    )
                    results[client_name] = _load_result(result_path, job_definition)

                return results
            else:
                result_path = (
                    Path(workspace) / "simulate_job/app_server/model_param.secure"
                )

        return _load_result(result_path, job_definition)

SupervisedMLSession 🔗

Bases: SimpleStatsSession

Session object that connects the regression models with the job API and supports running a SupervisedMLJobDefinition.

Can be instantiated manually for a running compute spec, but typically will be created using the provision function from regression.session.provision.

Parameters:

Name Type Description Default
compute_spec_id UUID

The UUID of the compute spec running the Regression Models model.

required
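
A SupervisedMLSession is typically created via provision (documented below), but for an already-running compute spec it can be instantiated directly; a sketch with a placeholder ID:

from uuid import UUID
from regression.session import SupervisedMLSession

# Placeholder UUID; use the ID of your running compute spec.
session = SupervisedMLSession(UUID("12345678-1234-5678-1234-567812345678"))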
Source code in .env/lib/python3.10/site-packages/regression/session/session.py
class SupervisedMLSession(SimpleStatsSession):
    """
    Session object that connects the regression models with the job API and
    supports running a SupervisedMLJobDefinition.

    Can be instantiated manually for a running compute spec, but typically will be
    created using the `provision` function from `regression.session.provision`.

    Args:
        compute_spec_id: The UUID of the compute spec running the Regression Models
            model.
    """

    def __init__(self, compute_spec_id: UUID):
        """
        Create SupervisedMLSession for a given compute_spec_id.
        """
        self.compute_spec_id = compute_spec_id
        secure.register_decomposers()
        super().__init__(compute_spec_id)

    def get_stats_session(self) -> SimpleStatsSession:
        return SimpleStatsSession(self.compute_spec_id)

    def run(self, job_definition: SupervisedMLJobDefinition):
        from aphcli.api import job

        job_id = job.submit(job_definition.to_dict(), self.compute_spec_id)
        timeout_job_seconds = _get_job_timeout()

        print(f"Computation submitted under Job ID: {job_id}")

        start = time.time()
        while True:
            status = job.status(job_id, self.compute_spec_id)
            if "finished" in status.lower():
                break
            if time.time() - start > timeout_job_seconds:
                raise TimeoutError(
                    f"Computation did not finish within {timeout_job_seconds}s."
                )
            time.sleep(2)

        download_path = result_base_dir / str(self.compute_spec_id) / str(job_id)
        job.download_results(download_path, job_id, self.compute_spec_id)

        if job_definition.pre_run:
            result_path = download_path / "workspace" / "models" / "results.bin"

        elif job_definition.job_params.get("task") in ["predict", "validate"]:
            results = {}
            for client_name in job_definition.mapped_fdfs.keys():
                result_path = download_path / f"workspace/app_server/{client_name}.secure"
                results[client_name] = _load_result(result_path, job_definition)

            return results

        else:
            result_path = (
                download_path / "workspace" / "app_server" / "model_param.secure"
            )

        return _load_result(result_path, job_definition)

__init__(compute_spec_id) 🔗

Create SupervisedMLSession for a given compute_spec_id.

Source code in .env/lib/python3.10/site-packages/regression/session/session.py
def __init__(self, compute_spec_id: UUID):
    """
    Create SupervisedMLSession for a given compute_spec_id.
    """
    self.compute_spec_id = compute_spec_id
    secure.register_decomposers()
    super().__init__(compute_spec_id)

regression.session.provision🔗

provision(dataset_ids, client_n_cpu=0.5, client_memory=1000, server_n_cpu=0.5, server_memory=1000, modelversion=None) 🔗

Create and activate a compute spec to run remote regression models on Apheris.

Parameters:

Name Type Description Default
dataset_ids List[str]

A list of Apheris dataset IDs

required
client_n_cpu float

The fractional number of CPUs to request in the compute spec for the Compute Gateways. Consider increasing this if your computation takes too long.

0.5
client_memory int

The amount of client memory to request in the compute spec for the Compute Gateways. Consider increasing this if your computation runs out of memory.

1000
server_n_cpu float

The fractional number of CPUs to request in the compute spec for the Orchestrator. Consider increasing this if your computation takes too long during aggregation.

0.5
server_memory int

The amount of memory to request in the compute spec for the Orchestrator. Consider increasing this if your computation runs out of memory in the Orchestrator.

1000
modelversion Optional[str]

The version of regression models to use for this session. Defaults to the latest available version.

None

Returns:

Type Description
SupervisedMLSession

A SupervisedMLSession that should be used as input to the regression functions, such as fit_coxph.
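
A minimal sketch (the dataset IDs are placeholders for real Apheris dataset IDs):

from regression.session.provision import provision

session = provision(
    dataset_ids=["dataset-id-1", "dataset-id-2"],  # placeholder IDs
    client_n_cpu=1.0,
    client_memory=2000,
)
# The returned SupervisedMLSession can be passed to fit_lr, fit_coxph, etc.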

Source code in .env/lib/python3.10/site-packages/regression/session/provision.py
def provision(
    dataset_ids: List[str],
    client_n_cpu: float = 0.5,
    client_memory: int = 1000,
    server_n_cpu: float = 0.5,
    server_memory: int = 1000,
    modelversion: Optional[str] = None,
) -> SupervisedMLSession:
    """
    Create and activate a compute spec to run remote regression models on Apheris.

    Args:
        dataset_ids: A list of Apheris dataset IDs
        client_n_cpu: The fractional number of CPUs to request in the compute spec for
            the Compute Gateways. Consider increasing this if your computation takes too
            long.
        client_memory: The amount of client memory to request in the compute spec for the
            Compute Gateways. Consider increasing this if your computation runs out of
            memory.
        server_n_cpu: The fractional number of CPUs to request in the compute spec for
            the Orchestrator. Consider increasing this if your computation takes too
            long during aggregation.
        server_memory: The amount of memory to request in the compute spec for the
            Orchestrator. Consider increasing this if your computation runs out of
            memory in the Orchestrator.
        modelversion: The version of regression models to use for this session. Defaults
            to the latest available version.

    Returns:
        A `SupervisedMLSession` that should be used as input to the regression functions,
            such as `fit_coxph`.
    """
    from aphcli.api import compute
    from aphcli.api.compute import wait_until_running

    validate_login_status()
    validate_dataset_ids(dataset_ids)
    if not modelversion:
        modelversion = version("apheris-regression-models")

    logger.info(f"Create compute_spec for model {modelversion}")
    compute_spec_id = compute.create_from_args(
        dataset_ids=dataset_ids,
        client_n_cpu=client_n_cpu,
        client_n_gpu=0,
        client_memory=client_memory,
        server_n_cpu=server_n_cpu,
        server_n_gpu=0,
        server_memory=server_memory,
        model_id="apheris-regression-models",
        model_version=modelversion,
    )
    print(f"compute_spec_id: {compute_spec_id}")
    provisioning_timeout = int(
        os.environ.get(
            "APH_TIMEOUT_PROVISIONING_SECONDS", DEFAULT_TIMEOUT_PROVISIONING_SECONDS
        )
    )

    compute.activate(compute_spec_id)
    try:
        wait_until_running(compute_spec_id, timeout=provisioning_timeout)
    except TimeoutError as e:
        compute.deactivate(compute_spec_id)
        raise e
    print("\nSuccessfully activated ComputeSpec!")
    return SupervisedMLSession(compute_spec_id)

misc🔗

ResultsNotFound 🔗

Bases: Exception

Source code in .env/lib/python3.10/site-packages/apheris_stats/simple_stats/_core/stats_session.py
class ResultsNotFound(Exception):
    pass