Source code for geogenie.outliers.detect_outliers

import logging
import time
from os import path
from pathlib import Path

import numpy as np
from pynndescent import NNDescent
from scipy.optimize import minimize
from scipy.spatial.distance import cdist
from scipy.stats import gamma

from geogenie.plotting.plotting import PlotGenIE
from geogenie.utils.scorers import calculate_r2_knn, haversine_distance
from geogenie.utils.utils import geo_coords_is_valid


[docs] class GeoGeneticOutlierDetector: """A class to detect outliers based on genomic SNPs and geographic coordinates. This class uses a K-nearest neighbors (KNN) approach to detect outliers based on the difference between predicted and actual data. Attributes: args (argparse.Namespace): Command line arguments. genetic_data (pd.DataFrame): The SNP data as a DataFrame. geographic_data (np.array): geographic coordinates as 2D array. output_dir (str): Output directory. prefix (str): Prefix for output files. n_jobs (int): Number of parallel jobs to run. seed (int): Random seed to use. url (str): url to download base map for plots. buffer (float): Buffer to put around sampling area on base map when plotting. show_plots (bool): Whether to show plots in-line. debug (bool): If True, writes genetic_data and geographic_data to file. verbose (int): Verbosity setting (0-2), least to most verbose. logger (logging.Logger): Logger object. plotting (PlotGenIE): Plotting object. """ def __init__( self, args, genetic_data, geographic_data, output_dir, prefix, n_jobs=-1, seed=None, url="https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_state_500k.zip", buffer=0.1, show_plots=False, debug=False, verbose=0, ): """Initialize GeoGeneticOutlierDetector. This class uses a K-nearest neighbors (KNN) approach to detect outliers based on the difference between predicted and actual data. Args: genetic_data (pd.DataFrame): The SNP data as a DataFrame. geographic_data (np.array): geographic coordinates as 2D array. output_dir (str): Output directory. prefix (str): Prefix for output files. seed (int): Random seed to use. If None, then the seed is random. url (str): url to download base map for plots. buffer (float): Buffer to put around sampling area on base map when plotting. show_plots (bool): Whether to show plots in-line. debug (bool): If True, writes genetic_data and geographic_data to file. verbose (int): Verbosity setting (0-2), least to most verbose. """ if debug: data_dir = Path(output_dir, "data") fn = data_dir / "debug_eigen.csv" genetic_data.to_csv(fn, header=False, index=False) tmp = geographic_data.reset_index() tmp.columns = ["sampleID", "x", "y"] fn2 = data_dir / "train_test_coords.csv" tmp.to_csv(fn2, header=True, index=False) self.genetic_data = genetic_data.to_numpy() self.geographic_data = geographic_data.to_numpy() geo_coords_is_valid(self.geographic_data) self.args = args self.output_dir = output_dir self.prefix = prefix self.n_jobs = n_jobs self.seed = seed self.url = url self.buffer = buffer self.show_plots = show_plots self.debug = debug self.verbose = verbose self.logger = logging.getLogger(__name__) self.plotting = PlotGenIE( "cpu", self.args.output_dir, self.args.prefix, self.args.basemap_fips, self.args.highlight_basemap_counties, self.args.shapefile, show_plots=False, fontsize=self.args.fontsize, filetype=self.args.filetype, dpi=self.args.plot_dpi, remove_splines=self.args.remove_splines, ) # Ensure correct coordinate projection and CRS. df_geo = self.plotting.processor.to_geopandas(self.geographic_data) self.geographic_data = self.plotting.processor.to_numpy(df_geo) geo_coords_is_valid(self.geographic_data)
[docs] def calculate_dgeo(self, pred_geo_coords, geo_coords, scalar): """Calculate the Dgeo statistic for geographic coordinates. This method calculates the geographic distance between predicted and actual geographic coordinates. Args: pred_geo_coords (np.array): Predicted geographic coordinates. geo_coords (np.array): Actual geographic coordinates. scalar (float): Scalar value to divide the distance. Returns: np.array: Calculated Dgeo values. """ if self.verbose >= 2 or self.debug: msg = "Estimating Dgeo statistic..." self.logger.debug(msg) if self.debug else self.logger.info(msg) if pred_geo_coords.shape[1] > 2: msg = f"Invalid shape for pred_geo_coords: {pred_geo_coords.shape}" self.logger.error(msg) raise ValueError(msg) if geo_coords.shape[1] > 2: msg = f"Invalid shape for geo_coords: {geo_coords.shape}" self.logger.error(msg) raise ValueError(msg) distances = ( np.diagonal(cdist(geo_coords, pred_geo_coords, metric=haversine_distance)) / scalar ) if self.verbose >= 2 or self.debug: msg = "Finished estimating Dgeo statistic." self.logger.debug(msg) if self.debug else self.logger.info(msg) return distances
[docs] def calculate_statistic( self, predicted_data, actual_data, is_genetic, min_nn_dist, scale_factor ): """Calculate the Dg or Dgeo statistic based on the difference between predicted and actual data. This method calculates the Dg or Dgeo statistic based on the mean squared error or geographic distance between predicted and actual data. Args: predicted_data (np.array): Predicted data from KNN. actual_data (np.array): Actual data. is_genetic (bool): Flag to determine if the calculation is for genetic data. min_nn_dist (float): The minimum distance to consider between geographic points. scale_factor (float): Scaling factor for geo_coords. Returns: np.array: Dg or Dgeo statistic for each sample. """ if is_genetic: # Dg calculation (mean squared error). d = np.mean((predicted_data - actual_data) ** 2, axis=1) eps = 1e-6 else: # Dgeo calculation (geographical distance) d = self.calculate_dgeo(predicted_data, actual_data, scale_factor) eps = 1e-8 min_nn_dist, scale_factor, recalculate = self.rescale_statistic( d, scale_factor, min_nn_dist ) if recalculate: # Recalculate Dgeo again with new scale. d = self.calculate_dgeo(predicted_data, actual_data, scale_factor) zero_indices = np.isclose(d, 0) if np.any(zero_indices): d[zero_indices] = eps # To avoid zeros in D statistic. pred_r2 = calculate_r2_knn(predicted_data, actual_data) return d, min_nn_dist, scale_factor, pred_r2
[docs] def rescale_statistic(self, Dgeo, s, orig_min_nn_dist, max_threshold=20): """Rescales the Dgeo array to avoid large values that might cause errors in maximum likelihood estimation. This method rescales the Dgeo array to avoid large values that might cause errors in maximum likelihood estimation. Args: Dgeo (np.ndarray): An array representing geographic distances or differences. s (float): A scalar value used in calculations. orig_min_nn_dist (float): Original minimum nearest neighbor distance. max_threshold (int): Maximum Dgeo value to trigger rescaling. Returns: float, float: adjusted scalar value, and adjusted minimum nearest neighbor distance. """ min_nn_dist = orig_min_nn_dist recalculate = False maxD = np.max(Dgeo) if maxD > max_threshold: recalculate = True tmps = 1 while maxD > 20: tmps *= 10 maxD /= 10 s *= tmps # new min_nn_dist adjusted according to new s min_nn_dist = orig_min_nn_dist / s return min_nn_dist, s, recalculate
[docs] def find_gen_knn(self, coords, k, scale_factor): """Find K-nearest neighbors for genetic data using PyNNDescent. This method finds the K-nearest neighbors for genetic data using PyNNDescent. Args: dist_matrix (np.array): Distance matrix. k (int): Number of neighbors. Returns: np.array: Indices of K-nearest neighbors. """ nnd = NNDescent( coords, metric="euclidean", n_neighbors=k, n_jobs=self.n_jobs, ) indices, distances = nnd.neighbor_graph # Exclude the first column which is the point itself return indices[:, 1 : k + 1], distances[:, 1 : k + 1]
[docs] def find_geo_knn(self, coords, k, min_nn_dist): """Find K-nearest neighbors for geographic data considering minimum neighbor distance using PyNNDescent. This method finds the K-nearest neighbors for geographic data using PyNNDescent. Args: dist_matrix (np.array): Distance matrix. k (int): Number of neighbors. min_nn_dist (float): Minimum distance to consider for neighbors. Returns: np.array: Indices of K-nearest neighbors. """ geo_coords_is_valid(coords) # PyNNDescent takes coordinates ordered: lat, lon geo_coords = np.flip(coords) # Initialize NNDescent with the precomputed distance matrix nnd = NNDescent( geo_coords, metric="haversine", n_neighbors=k, n_jobs=self.n_jobs, ) indices, distances = nnd.neighbor_graph # Exclude neighbors that are too close (less than min_nn_dist) valid_indices = distances >= min_nn_dist k_indices = np.where(valid_indices, indices, -1) # Truncate or pad the result_indices to ensure exactly k neighbors # (excluding the point itself which is at index 0) return k_indices[:, 1 : k + 1], distances[:, 1 : k + 1]
[docs] def find_optimal_k( self, geo_coords, gen_coords, klim, w_power, min_nn_dist, is_genetic, scale_factor, ): """Find optimal number of nearest neighbors for KNN. This method finds the optimal number of nearest neighbors for KNN based on the Dg or Dgeo statistic. Args: geo_coords (np.array): Geographic coordinates. gen_coords (np.array): Genetic coordinatees. dist_matrix (np.array): Distance matrix. klim (tuple): Range of K values to search (min_k, max_k). w_power (float): Power for distance weighting. min_nn_dist (int): Minimum nearest neighbor distance to consider points. is_genetic (bool): Flag to determine if the calculation is for genetic data as distance matrix. scale_factor (float): Factor to scale geo_coords by. Returns: int: Optimal K value. """ if self.verbose >= 1: self.logger.info("Finding optimal K for Nearest Neighbors...") min_k, max_k = klim all_D = [] for k in range(min_k, max_k + 1): if is_genetic: # Genetic coords, geo dist matrix. knn_indices, distances = self.find_geo_knn(geo_coords, k, min_nn_dist) # Genetic predictions. predictions = self.predict_coords_knn( gen_coords, distances, knn_indices, w_power ) # Genetic error. ( D_statistic, min_nn_dist, scale_factor, _, ) = self.calculate_statistic( predictions, gen_coords, is_genetic, min_nn_dist, scale_factor ) else: # Geo coords, genetic dist matrix. knn_indices, distances = self.find_gen_knn(gen_coords, k, scale_factor) # Geographic predictions. predictions = self.predict_coords_knn( geo_coords, distances, knn_indices, w_power ) # Geographic error. ( D_statistic, min_nn_dist, scale_factor, _, ) = self.calculate_statistic( predictions, geo_coords, is_genetic, min_nn_dist, scale_factor ) all_D.append(np.sum(D_statistic)) optimal_k = min_k + np.argmin(all_D) # Adjust index to actual K value if self.verbose >= 1: self.logger.info("Completed optimal K search.") return optimal_k
[docs] def predict_coords_knn(self, coords, knn_distances, knn_indices, w_power): """Predict coordinates data using weighted KNN. This method predicts coordinates data using weighted KNN based on the distances to the K-nearest neighbors. Args: coords (np.array): Array of genetic or geographic coordinates. knn_distances (np.array): Distances to K-nearest neighbors. knn_indices (np.array): Indices of K-nearest neighbors. w_power (float): Power of distance weight in prediction. Returns: np.array: Predicted coordinates using weighted KNN. """ predictions = np.zeros_like(coords) for i in range(len(coords)): neighbor_distances = knn_distances[i] neighbor_indices = knn_indices[i] # Compute weights inversely proportional to the distance weights = 1 / (neighbor_distances + 1e-8) ** w_power normalized_weights = weights / np.sum(weights) # Calculate the weighted average of the neighbors predictions[i] = np.average( coords[neighbor_indices], axis=0, weights=normalized_weights ) return predictions
[docs] def fit_gamma_mle(self, D_statistic, sig_level): """Detect outliers using a Gamma distribution fitted to the Dg or Dgeo statistic. This method detects outliers using a Gamma distribution fitted to the Dg or Dgeo statistic. Args: D_statistic (np.array): Dg or Dgeo statistic for each sample. Dgeo (np.array): For determining initial_shape and initial_rate. sig_level (float): Significance level for detecting outliers. Returns: tuple: Indices of outliers, p-values, and fitted Gamma parameters. """ initial_shape = (np.mean(D_statistic) ** 2) / np.var(D_statistic) initial_rate = np.mean(D_statistic) / np.var(D_statistic) result = minimize( self.gamma_neg_log_likelihood, x0=(initial_shape, initial_rate), args=(D_statistic,), bounds=[(1e-8, 1e8), (1e-8, 1e8)], method="L-BFGS-B", ) shape, rate = result.x scale = 1 / rate # Convert rate back to scale for gamma distribution p_values = 1 - gamma.cdf(D_statistic, a=shape, scale=scale) outliers = np.where(p_values < sig_level)[0] gamma_params = (shape, scale) return outliers, p_values, gamma_params
[docs] def gamma_neg_log_likelihood(self, params, data): """Negative log likelihood for gamma distribution. This method calculates the negative log likelihood value for the gamma distribution. Args: params (tuple): Contains the shape and rate parameters for the gamma distribution. data (np.array): Data to fit the gamma distribution to. Returns: float: Negative log likelihood value. """ shape, rate = params scale = 1 / rate # Convert rate to scale for gamma distribution return -np.sum(gamma.logpdf(data, a=shape, scale=scale))
[docs] def multi_stage_outlier_knn( self, geo_coords, gen_coords, analysis_type="composite", sig_level=0.05, maxk=50, w_power=2, min_nn_dist=1000, scale_factor=100, ): """Iterative Outlier Detection via KNN for genetic and geographic data. This method performs iterative outlier detection using KNN for genetic and geographic data. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. analysis_type (str): Type of analysis to perform (genetic, geographic, composite). sig_level (float): Significance level for detecting outliers. maxk (int): Maximum number of nearest neighbors to consider. w_power (float): Power of distance weight in KNN prediction. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. Returns: tuple: Indices of detected outliers for geographic and genetic data. """ if len(geo_coords) != len(gen_coords): msg = f"geographic and genetic coordinates must have the same number of samples: {len(geo_coords)}, {len(gen_coords)}" self.logger.error(msg) raise ValueError(msg) max_iter = len(geo_coords) // 2 self.orig_min_nn_dist = min_nn_dist self.orig_scale_factor = scale_factor min_nn_dist /= scale_factor time_durations, optk_gen, optk_geo = self.search_nn_optk( geo_coords, gen_coords, maxk, w_power, min_nn_dist, scale_factor, analysis_type, ) opt_ks = {"genetic": optk_gen, "geographic": optk_geo} if self.verbose >= 1: self.logger.info( f"Optimal K for Genetic Regression: {optk_gen}, Time taken: {time_durations['find_optimal_k_genetic']} seconds" ) self.logger.info( f"Optimal K for Geographic Regression: {optk_geo}, Time taken: {time_durations['find_optimal_k_geographic']} seconds" ) if analysis_type == "composite": res = {} for at in ["genetic", "geographic"]: res = self.analysis( geo_coords, gen_coords, at, sig_level, min_nn_dist, scale_factor, max_iter, opt_ks, res, at, time_durations, ) elif analysis_type == "genetic": at = "genetic" res = self.analysis( geo_coords, gen_coords, analysis_type, sig_level, min_nn_dist, scale_factor, max_iter, opt_ks, res, at, time_durations, ) elif analysis_type == "geographic": res = self.analysis( geo_coords, gen_coords, analysis_type, sig_level, min_nn_dist, scale_factor, max_iter, opt_ks, res, at, time_durations, ) # Return the indices of the detected outliers return ( np.where(res["geographic"]["outlier_flags"])[0], np.where(res["genetic"]["outlier_flags"])[0], res, )
[docs] def analysis( self, geo_coords, gen_coords, analysis_type, sig_level, min_nn_dist, scale_factor, max_iter, opt_ks, res, at, time_duration, ): """Perform outlier detection analysis for genetic or geographic data. This method performs outlier detection analysis for genetic or geographic data. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. analysis_type (str): Type of analysis to perform (genetic, geographic, composite). sig_level (float): Significance level for detecting outliers. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. max_iter (int): Maximum number of iterations. opt_ks (dict): Optimal K values for genetic and geographic data. res (dict): Dictionary to store results. at (str): Analysis type (genetic, geographic). time_duration (dict): Dictionary to store run times for each method. """ ( time_duration, outlier_flags, d_stats, p_values, r2_values, gamma_params, ) = self.run_multistage( geo_coords, gen_coords, sig_level, min_nn_dist, scale_factor, max_iter, opt_ks[at], analysis_type, time_duration, ) if self.verbose >= 1: self.logger.info("Completed multi-stage outlier detection.") # Plotting Gamma distribution start_time = time.time() self.plot_gamma_dist(sig_level, d_stats, gamma_params, at) end_time = time.time() time_duration[f"plot_gamma_distribution_{at}"] = end_time - start_time if self.verbose >= 2 or self.debug: key = f"plot_gamma_distribution_{at}" msg = f"Plotted Gamma distributions in {time_duration[key]} seconds" self.logger.debug(msg) if self.debug else self.logger.info(msg) res[at] = { "time_duration": time_duration, "outlier_flags": outlier_flags, "d_stats": d_stats, "p_values": p_values, "r2_values": r2_values, "gamma_params": gamma_params, } return res
[docs] def run_multistage( self, geo_coords, gen_coords, sig_level, min_nn_dist, scale_factor, max_iter, optk, analysis_type, time_durations, ): """Run multi-stage outlier detection using KNN. This method runs multi-stage outlier detection using KNN for genetic and geographic data. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. sig_level (float): Significance level for detecting outliers. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. max_iter (int): Maximum number of iterations. optk (int): Optimal K value for nearest neighbors. analysis_type (str): Type of analysis to perform (genetic, geographic, composite). time_durations (dict): Dictionary to store run times for each method. Returns: tuple: Time durations, outlier flags, D statistics, p-values, r-squared values, and gamma parameters """ outlier_flags = np.zeros(len(gen_coords), dtype=bool) allow = True break_msg = "No new outliers detected. Terminating iteration." iteration = 0 while iteration < max_iter: iteration += 1 new_outliers_detected = False if self.verbose >= 1: self.logger.info( f"Iteration {iteration} of multi-stage outlier detection.\n\n" ) ( time_durations, current_outliers, d_stats, p_values, r2_values, gamma_params, filtered_indices, ) = self.filter_and_detect( geo_coords, gen_coords, sig_level, min_nn_dist, scale_factor, optk, outlier_flags, time_durations, analysis_type, ) # Map indices from filtered to original if current_outliers is not None and current_outliers.size > 0: if np.all(p_values > sig_level): if self.verbose >= 1: self.logger.info(break_msg) break actual_outlier_indices = filtered_indices[current_outliers] # Update the flags in the original arrays outlier_flags[actual_outlier_indices] = True new_outliers_detected = True else: allow = False if not new_outliers_detected or not allow: if self.verbose >= 1: self.logger.info(break_msg) break # No significant outliers. if np.all(p_values) > sig_level: if self.verbose >= 1: self.logger.info(break_msg) break if self.verbose >= 2 or self.debug: msg = f"Finished iteration {iteration}." self.logger.debug(msg) if self.debug else self.logger.info(msg) return ( time_durations, outlier_flags, d_stats, p_values, r2_values, gamma_params, )
[docs] def filter_and_detect( self, geo_coords, gen_coords, sig_level, min_nn_dist, scale_factor, optk, outlier_flags, time_durations, analysis_type, ): """Filter outliers and detect new outliers using KNN. This method filters outliers and detects new outliers using KNN for genetic and geographic data. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. sig_level (float): Significance level for detecting outliers. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. optk (int): Optimal K value for nearest neighbors. outlier_flags (np.array): Array of outlier flags. time_durations (dict): Dictionary to store run times for each method. analysis_type (str): Type of analysis to perform (genetic, geographic, composite). Returns: tuple: Time durations, outlier flags, D statistics, p-values, r-squared values, gamma parameters, and filtered indices """ non_outlier_mask = ~outlier_flags # Keep track of original indices original_indices = np.arange(len(geo_coords)) filtered_indices = original_indices[non_outlier_mask] # Get only non-outliers. filtered_geo_coords = geo_coords[non_outlier_mask] filtered_gen_coords = gen_coords[non_outlier_mask] ( time_durations, current_outliers, d_stats, p_values, r2_values, gamma_params, ) = self.detect_outliers( filtered_geo_coords, filtered_gen_coords, optk, time_durations, w_power=2, sig_level=sig_level, min_nn_dist=min_nn_dist, scale_factor=scale_factor, analysis_type=analysis_type, ) return ( time_durations, current_outliers, d_stats, p_values, r2_values, gamma_params, filtered_indices, )
[docs] def search_nn_optk( self, geo_coords, gen_coords, maxk, w_power, min_nn_dist, scale_factor, analysis_type, ): """Search for optimal K for nearest neighbors. This method searches for the optimal K for nearest neighbors based on the Dg or Dgeo statistic. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. maxk (int): Maximum number of nearest neighbors to consider. w_power (float): Power of distance weight in KNN prediction. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. analysis_type (str): Type of analysis to perform (genetic, geographic, composite). Returns: tuple: Time durations, optimal K for genetic data, and optimal K for geographic data. """ time_durations = {} optk_gen = None optk_geo = None if analysis_type in ["genetic", "composite"]: # Step 1: Find optimal K for both genetic and geographic data start_time = time.time() optk_gen = self.find_optimal_k( geo_coords, gen_coords, (2, maxk), w_power, min_nn_dist, is_genetic=True, scale_factor=scale_factor, ) end_time = time.time() time_durations["find_optimal_k_genetic"] = end_time - start_time if analysis_type in ["geographic", "composite"]: start_time = time.time() optk_geo = self.find_optimal_k( geo_coords, gen_coords, (2, maxk), w_power, min_nn_dist, is_genetic=False, scale_factor=scale_factor, ) end_time = time.time() time_durations["find_optimal_k_geographic"] = end_time - start_time return time_durations, optk_gen, optk_geo
[docs] def plot_gamma_dist(self, sig_level, d_stats, gamma_params, dtype): """Plot the gamma distribution for detected outliers. This method plots the gamma distribution for detected outliers based on the Dg or Dgeo statistic. Args: sig_level (float): Significance level for detecting outliers. d_stats (np.array): Dg or Dgeo statistic for each sample. gamma_params (tuple): Parameters for the gamma distribution. dtype (str): Type of data (genetic, geographic """ outdir = path.join(self.output_dir, "plots") fn = path.join(outdir, f"{self.prefix}_gamma_{dtype}.png") self.plotting.plot_gamma_distribution( gamma_params[0], gamma_params[1], d_stats, sig_level, fn, f"{dtype.capitalize()} Outlier Gamma Distribution", )
[docs] def detect_outliers( self, geo_coords, gen_coords, optk, time_durations, w_power=2, sig_level=0.05, min_nn_dist=1000, scale_factor=100, analysis_type="genetic", ): """Detect outliers based on composite data using the KNN approach. This method detects outliers based on composite data using the KNN approach. Args: geo_coords (np.array): Array of geographic coordinates. gen_coords (np.array): Array of genetic data coordinates. optk (int): Optimal K for nearest neigbbors (geographic). time_durations (dict): Dictionary storing run times for each method. w_power (float): Power of distance weight in KNN prediction. sig_level (float): Significance level for detecting outliers. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. analysis_type (str): Either 'genetic' or 'geographic'. Returns: tuple: Indices of detected outliers, p-values, and gamma parameters for geographic and genetic outliers. Returns: tuple: Indices of detected outliers, p-values, and gamma parameters for geographic and genetic outliers. """ if analysis_type == "genetic": dependent_data = gen_coords independent_data = geo_coords knn_func = self.find_geo_knn is_genetic = True elif analysis_type == "geographic": dependent_data = geo_coords independent_data = gen_coords knn_func = self.find_gen_knn is_genetic = False else: msg = f"Invalid 'analysis_type' parameter provided: {analysis_type}" self.logger.error(msg) raise ValueError(msg) # Step 2: Find KNN based on distances start_time = time.time() knn_indices, dist = knn_func(independent_data, optk, min_nn_dist) end_time = time.time() time_durations[f"find_knn_{analysis_type}"] = end_time - start_time if self.verbose >= 2 or self.debug: msg = f"Found KNN indices in: {time_durations[key]} seconds" key = f"find_knn_{analysis_type}" self.logger.debug(msg) if self.debug else self.logger.info(msg) # Step 3: Predict using weighted KNN start_time = time.time() predictions = self.predict_coords_knn( dependent_data, dist, knn_indices, w_power ) end_time = time.time() time_durations[f"predict_knn_{analysis_type}"] = end_time - start_time if self.verbose >= 2 or self.debug: msg = f"Predicted using weighted KNN: {time_durations[key]} seconds" key = f"predict_knn_{analysis_type}" self.logger.debug(msg) if self.debug else self.logger.info(msg) # Step 4: Calculate D statistics and detect outliers start_time = time.time() d, min_nn_dist, scale_factor, r2_vals = self.calculate_statistic( predictions, dependent_data, is_genetic=is_genetic, min_nn_dist=min_nn_dist, scale_factor=scale_factor, ) end_time = time.time() time_durations[f"calculate_statistic_{analysis_type}"] = end_time - start_time if self.verbose >= 2 or self.debug: msg = f"Calculated D statistics in: {time_durations[key]} seconds" key = f"calculate_statistic_{analysis_type}" self.logger.debug(msg) if self.debug else self.logger.info(msg) if self.verbose >= 1: self.logger.info( f"r-squared for {analysis_type} outlier detection: {r2_vals}" ) # Step 5: Fit Gamma distribution and detect outliers start_time = time.time() outliers, p_values, gamma_params = self.fit_gamma_mle(d, sig_level) end_time = time.time() time_durations[f"fit_gamma_{analysis_type}"] = end_time - start_time if self.verbose >= 2 or self.debug: key = f"fit_gamma_{analysis_type}" msg = f"Fitted gamma distribution for {analysis_type} outliers in: {time_durations[key]} seconds" self.logger.debug(msg) if self.debug else self.logger.info(msg) return ( time_durations, outliers, d, p_values, r2_vals, gamma_params, )
[docs] def composite_outlier_detection( self, sig_level=0.05, maxk=50, min_nn_dist=1000, scale_factor=100, w_power=2 ): """Perform composite outlier detection using the KNN approach. This method performs composite outlier detection using the KNN approach. Args: sig_level (float): Significance level for detecting outliers. maxk (int): Maximum number of nearest neighbors to consider. min_nn_dist (int): Minimum distance required to consider points. scale_factor (int): Scaling factor for geo coordinates. w_power (float): Power of distance weight in KNN prediction. Returns: dict: Detected outliers for geographic and genetic data. """ if self.verbose >= 1: self.logger.info("Starting composite outlier detection...") dgen = self.genetic_data dgeo = self.geographic_data outliers = {} ( outliers["geographic"], outliers["genetic"], _, ) = self.multi_stage_outlier_knn( dgeo, dgen, analysis_type="composite", sig_level=sig_level, maxk=maxk, w_power=w_power, min_nn_dist=min_nn_dist, scale_factor=scale_factor, ) if self.verbose >= 1: self.logger.info("Outlier detection completed.") return outliers