Skip to content

Added smooth and non-smooth prediction functions with tests and comments #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions dadapy/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import scipy as sp

from dadapy._cython import cython_clustering as cf
from dadapy._cython import cython_density as cd
from dadapy._utils.density_estimation import (
return_not_normalised_density_kstarNN,
return_not_normalised_density_PAk,
)
from dadapy._utils.utils import compute_cross_nn_distances
from dadapy.density_estimation import DensityEstimation

cores = multiprocessing.cpu_count()
Expand Down Expand Up @@ -332,6 +338,116 @@ def compute_clustering_ADP_pure_python( # noqa: C901

return self.cluster_assignment

def predict_cluster_DP(self, X_new, Dthr=23.92812698, density_est="PAk"):
"""Compute clustering for points outside the initialization set using Density Peaks.

Args:
X_new (np.ndarray(float)): The points for which to predict cluster assignment
Dthr (float): Likelihood ratio parameter used to compute optimal k, the value of Dthr=23.92 corresponds
to a p-value of 1e-6.

Returns:
cluster_prediction (np.ndarray(int)): prediction of points to specific clusters
cluster_probability (np.ndarray(float)): probability of points to belong to specific clusters
"""
cross_distances, cross_dist_indices = compute_cross_nn_distances(
X_new, self.X, self.maxk, self.metric, self.period
)

kstar = cd._compute_kstar_interp(
self.intrinsic_dim,
X_new.shape[0],
self.maxk,
Dthr,
cross_dist_indices,
cross_distances,
self.distances,
)
if density_est == "PAk":
log_den, log_den_err, dc = return_not_normalised_density_PAk(
cross_distances,
self.intrinsic_dim,
kstar,
self.maxk,
interpolation=True,
)
elif density_est == "kstarNN":
log_den, log_den_err, dc = return_not_normalised_density_kstarNN(
cross_distances, self.intrinsic_dim, kstar, interpolation=True
)

log_den -= np.log(self.N)

cluster_probability = np.zeros((len(X_new), self.N_clusters))
for i in np.arange(len(X_new)):
higher_density_neighbours = (
self.log_den[cross_dist_indices][i]
- self.log_den_err[cross_dist_indices][i]
> log_den[i] - log_den_err[i]
)
try:
index_nearest_neighbour_higher_density = cross_dist_indices[i][
higher_density_neighbours
][0]
cluster_probability[
i, self.cluster_assignment[index_nearest_neighbour_higher_density]
] = 1
# If no data with higher density is found in the neighbourhood,
# predict the cluster of the closest data point
except IndexError:
cluster_probability[
i, self.cluster_assignment[cross_dist_indices[0]]
] = 1
cluster_prediction = np.argmax(cluster_probability, axis=-1)
return cluster_prediction, cluster_probability

def predict_cluster_inverse_distance_smooth(self, X_new, Dthr=23.92812698):
"""Compute clustering for points outside the initialization set using a smooth estimator.

Args:
X_new (np.ndarray(float)): The points for which to predict cluster assignment
Dthr (float): Likelihood ratio parameter used to compute optimal k, the value of Dthr=23.92 corresponds
to a p-value of 1e-6.

Returns:
cluster_prediction (np.ndarray(int)): prediction of points to specific clusters
cluster_probability (np.ndarray(float)): probability of points to belong to specific clusters
"""
cross_distances, cross_dist_indices = compute_cross_nn_distances(
X_new, self.X, self.maxk, self.metric, self.period
)

kstar = cd._compute_kstar_interp(
self.intrinsic_dim,
X_new.shape[0],
self.maxk,
Dthr,
cross_dist_indices,
cross_distances,
self.distances,
)

cluster_assignment_kstar = [
self.cluster_assignment[cross_dist_indices[i][: kstar[i]]]
for i in np.arange(len(X_new))
]
cross_distances_kstar = [
cross_distances[i][: kstar[i]] for i in np.arange(len(X_new))
]
cluster_probability = np.zeros((len(cluster_assignment_kstar), self.N_clusters))

for i in np.arange(len(X_new)):
if len(set(cluster_assignment_kstar[i])) == 1:
cluster_probability[i, cluster_assignment_kstar[i][0]] = 1
else:
cluster_probability[i] = _cluster_weight_smooth_assignment(
cross_distances_kstar[i],
cluster_assignment_kstar[i],
self.N_clusters,
)
cluster_prediction = np.argmax(cluster_probability, axis=-1)
return cluster_prediction, cluster_probability

# ------------ helper methods for compute_clustering_ADP_pure_python ------------ #

def _find_density_modes(self, g):
Expand Down Expand Up @@ -628,3 +744,19 @@ def _finalise_clustering( # noqa: C901
log_den_bord_err_m,
bord_indices_m,
)


def _cluster_weight_smooth_assignment(
cross_distances_kstar, cluster_assignment_kstar, N_clusters
):

weights_kstar = (
cross_distances_kstar[-1] - cross_distances_kstar
) ** 2 / cross_distances_kstar**2
normalization = np.sum(weights_kstar)
cluster_probability = [
np.sum(weights_kstar[cluster_assignment_kstar == c])
for c in np.arange(N_clusters)
]
cluster_probability = np.array(cluster_probability) / normalization
return cluster_probability
161 changes: 161 additions & 0 deletions tests/test_clustering/test_clustering_prediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Copyright 2021-2022 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Module for testing DP clustering."""

import os

import numpy as np

from dadapy import Clustering

filename = os.path.join(os.path.split(__file__)[0], "../2gaussians_in_2d.npy")

X = np.load(filename)

expected_cluster_assignment = np.array(
[
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]
)


def test_predict_DP_PAk():
"""Test the prediction using PAk clustering works correctly."""
cl = Clustering(coordinates=X[25:-25])
cl.compute_clustering_ADP(halo=False)
preds0, _ = cl.predict_cluster_DP(X[-25:], density_est="PAk")
preds1, _ = cl.predict_cluster_DP(X[:25], density_est="PAk")
assert (preds1 == cl.cluster_assignment[:25]).all()
assert (preds0 == cl.cluster_assignment[-25:]).all()


def test_predict_DP_kstarNN():
"""Test the prediction using kstarNN clustering works correctly."""
cl = Clustering(coordinates=X[25:-25])
cl.compute_clustering_ADP(halo=False)
preds0, _ = cl.predict_cluster_DP(X[-25:], density_est="kstarNN")
preds1, _ = cl.predict_cluster_DP(X[:25], density_est="kstarNN")
assert (preds1 == cl.cluster_assignment[:25]).all()
assert (preds0 == cl.cluster_assignment[-25:]).all()


def test_predict_smooth():
"""Test the smooth prediction using kstar neighbour distances works correctly."""
cl = Clustering(coordinates=X[25:-25])
cl.compute_clustering_ADP(halo=False)
preds0, _ = cl.predict_cluster_inverse_distance_smooth(X[-25:])
preds1, _ = cl.predict_cluster_inverse_distance_smooth(X[:25])
assert (preds1 == cl.cluster_assignment[:25]).all()
assert (preds0 == cl.cluster_assignment[-25:]).all()