Source code for parepy_toolbox.common_library

"""Common library contains utility functions for PAREpy's framework.
"""
from typing import Optional, Callable

import scipy as sc
import numpy as np
import pandas as pd

import parepy_toolbox.distributions as parepydi


def std_matrix(std: list) -> tuple[np.ndarray, np.ndarray]:
    """
    Extracts the D matrix and D^-1 matrix from a list of variables. Used in the Y to X or X to Y transformation.

    :param std: Standard deviation parameters.

    :return: output[0] = D matrix (diagonal standard deviation matrix), output[1] = D^-1 matrix (inverse of the diagonal standard deviation matrix).
    """
    dneq = np.zeros((len(std), len(std)))
    dneq1 = np.zeros((len(std), len(std)))
    for i, sigma in enumerate(std):
        dneq[i, i] = sigma
        dneq1[i, i] = 1 / sigma

    return dneq, dneq1
def mu_matrix(mean: list) -> np.ndarray:
    """
    Extracts the mean matrix from a list of variables. Used in the Y to X or X to Y transformation.

    :param mean: Mean parameters.

    :return: Mean matrix.
    """
    mu_neq = np.zeros((len(mean), 1))
    for i, mu in enumerate(mean):
        mu_neq[i, 0] = mu

    return mu_neq
def x_to_y(x: np.ndarray, dneq1: np.ndarray, mu_neq: np.ndarray) -> np.ndarray:
    """
    Transforms a vector of random variables from the X space to the Y space.

    :param x: Random variables in the X space.
    :param dneq1: D^-1 matrix (inverse of the diagonal standard deviation matrix).
    :param mu_neq: Mean matrix.

    :return: Transformed random variables in the Y space.
    """
    return dneq1 @ (x - mu_neq)
def y_to_x(y: np.ndarray, dneq: np.ndarray, mu_neq: np.ndarray) -> np.ndarray:
    """
    Transforms a vector of random variables from the Y space to the X space.

    :param y: Random variables in the Y space.
    :param dneq: D matrix.
    :param mu_neq: Mean matrix.

    :return: Transformed random variables in the X space.
    """
    return dneq @ y + mu_neq
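A minimal round-trip sketch for the four helpers above (illustrative only; the means, standard deviations, and the sample point are assumed values):

>>> # pip install -U parepy-toolbox
>>> import numpy as np
>>> from parepy_toolbox import common_library
>>> dneq, dneq1 = common_library.std_matrix([0.1, 1.0])
>>> mu_neq = common_library.mu_matrix([1.0, 10.0])
>>> x = np.array([[1.2], [9.5]])
>>> y = common_library.x_to_y(x, dneq1, mu_neq)      # standardized point in Y space
>>> x_back = common_library.y_to_x(y, dneq, mu_neq)  # recovers x up to floating error
>>> np.allclose(x, x_back)
True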
def pf_equation(beta: float) -> float:
    """
    Calculates the probability of failure (pf) for a given reliability index (β), using the cumulative distribution function (CDF) of the standard normal distribution.

    :param beta: Reliability index (β).

    :return: Probability of failure (pf).

    Example
    ==============

    >>> # pip install -U parepy-toolbox
    >>> from parepy_toolbox import pf_equation
    >>> beta = 3.5
    >>> pf = pf_equation(beta)
    >>> print(f"Probability of failure: {pf:.5e}")
    Probability of failure: 2.32629e-04
    """
    return sc.stats.norm.cdf(-beta)
def beta_equation(pf: float) -> float:
    """
    Calculates the reliability index value for a given probability of failure (pf), using the inverse cumulative distribution function (ICDF) of the standard normal distribution.

    :param pf: Probability of failure (pf).

    :return: Reliability index (β).

    Example
    ==============

    >>> # pip install -U parepy-toolbox
    >>> from parepy_toolbox import beta_equation
    >>> pf = 2.32629e-04
    >>> beta = beta_equation(pf)
    >>> print(f"Reliability index: {beta:.5f}")
    Reliability index: 3.50000
    """
    return -sc.stats.norm.ppf(pf)
def first_second_order_derivative_numerical_differentiation_unidimensional(obj: Callable, x: list, pos: str, method: str, order: str = 'first', h: float = 1E-5, args: Optional[tuple] = None) -> float:
    """
    Computes the numerical derivative of a function at a given point in the given dimension using the central, backward, or forward difference method.

    :param obj: The objective function: obj(x, args) -> float or obj(x) -> float, where x is a list with shape n and args is a tuple of fixed parameters needed to completely specify the function.
    :param x: Point at which to evaluate the derivative.
    :param pos: Dimension in the list x where the derivative is to be calculated. When order is 'xy', pos is a str containing the first and second dimensions separated by a comma (e.g., '0,1' for the first and second dimensions).
    :param method: Method to use for differentiation. Supported values: 'center', 'forward', or 'backward'.
    :param order: Order of the derivative to compute (default is 'first' for the first-order derivative). Supported values: 'first', 'second', or 'xy'.
    :param h: Step size for the finite difference approximation (default is 1e-5).
    :param args: Extra arguments to pass to the objective function (optional).

    :return: Numerical derivative of order n of the function at point x in dimension pos.
    """
    if order == 'first':
        a = x.copy()
        b = x.copy()
        if method == "forward":
            a[int(pos)] += h
            den = h
        elif method == "backward":
            b[int(pos)] -= h
            den = h
        elif method == "center":
            a[int(pos)] += h
            b[int(pos)] -= h
            den = 2 * h
        fa = obj(a, args) if args is not None else obj(a)
        fb = obj(b, args) if args is not None else obj(b)
        diff = (fa - fb) / den
    elif order == 'second':
        a = x.copy()
        b = x.copy()
        x_aux = x.copy()
        den = h ** 2
        if method == "forward":
            a[int(pos)] += 2 * h
            x_aux[int(pos)] += h
        elif method == "backward":
            b[int(pos)] -= 2 * h
            x_aux[int(pos)] -= h
        elif method == "center":
            a[int(pos)] += h
            b[int(pos)] -= h
        fa = obj(a, args) if args is not None else obj(a)
        fx = obj(x_aux, args) if args is not None else obj(x_aux)
        fb = obj(b, args) if args is not None else obj(b)
        diff = (fa - 2 * fx + fb) / den
    elif order == 'xy':
        pos_x = int(pos.split(',')[0])
        pos_y = int(pos.split(',')[1])
        a = x.copy()
        a[pos_x] += h
        a[pos_y] += h
        b = x.copy()
        b[pos_x] += h
        b[pos_y] -= h
        c = x.copy()
        c[pos_x] -= h
        c[pos_y] += h
        d = x.copy()
        d[pos_x] -= h
        d[pos_y] -= h
        fa = obj(a, args) if args is not None else obj(a)
        fb = obj(b, args) if args is not None else obj(b)
        fc = obj(c, args) if args is not None else obj(c)
        fd = obj(d, args) if args is not None else obj(d)
        diff = (fa - fb - fc + fd) / (4 * h ** 2)

    return diff
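A quick sanity-check sketch (illustrative; the quadratic test function below is assumed, not part of the module): the central first difference of f(x) = x0**2 * x1 along dimension 0 at [2.0, 3.0] should approximate the analytic value 2 * x0 * x1 = 12.

>>> # pip install -U parepy-toolbox
>>> from parepy_toolbox import common_library
>>> f = lambda x: x[0] ** 2 * x[1]  # assumed test function
>>> d = common_library.first_second_order_derivative_numerical_differentiation_unidimensional(f, [2.0, 3.0], '0', 'center')
>>> round(d, 4)
12.0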
def jacobian_matrix(obj: Callable, x: list, method: str, h: float = 1E-5, args: Optional[tuple] = None) -> np.ndarray:
    """
    Computes the Jacobian matrix (gradient column vector) of a scalar-valued function using finite difference methods.

    :param obj: The objective function: obj(x, args) -> float or obj(x) -> float, where x is a list with shape n and args is a tuple of fixed parameters needed to completely specify the function.
    :param x: Point at which to evaluate the derivative.
    :param method: Method to use for differentiation. Supported values: 'center', 'forward', or 'backward'.
    :param h: Step size for the finite difference approximation (default is 1e-5).
    :param args: Extra arguments to pass to the objective function (optional).

    :return: Numerical Jacobian matrix at point x.
    """
    jacob = np.zeros((len(x), 1))
    for i in range(len(x)):
        jacob[i, 0] = first_second_order_derivative_numerical_differentiation_unidimensional(obj, x, str(i), method, 'first', h=h, args=args)

    return jacob
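A minimal sketch (illustrative; same assumed test function as above): for f(x) = x0**2 * x1 at [2.0, 3.0], the analytic gradient is [12, 4].

>>> import numpy as np
>>> from parepy_toolbox import common_library
>>> f = lambda x: x[0] ** 2 * x[1]
>>> grad = common_library.jacobian_matrix(f, [2.0, 3.0], 'center')  # shape (2, 1)
>>> np.allclose(grad, [[12.0], [4.0]])
True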
def hessian_matrix(obj: Callable, x: list, method: str, h: float = 1E-5, args: Optional[tuple] = None) -> np.ndarray:
    """
    Computes the Hessian matrix of a scalar-valued function using finite difference methods.

    :param obj: The objective function: obj(x, args) -> float or obj(x) -> float, where x is a list with shape n and args is a tuple of fixed parameters needed to completely specify the function.
    :param x: Point at which to evaluate the derivative.
    :param method: Method to use for differentiation. Supported values: 'center', 'forward', or 'backward'.
    :param h: Step size for the finite difference approximation (default is 1e-5).
    :param args: Extra arguments to pass to the objective function (optional).

    :return: Numerical Hessian matrix at point x.
    """
    hessian = np.zeros((len(x), len(x)))
    for i in range(len(x)):
        for j in range(len(x)):
            if i == j:
                hessian[i, j] = first_second_order_derivative_numerical_differentiation_unidimensional(obj, x, str(i), method, 'second', h=h, args=args)
            else:
                hessian[i, j] = first_second_order_derivative_numerical_differentiation_unidimensional(obj, x, f'{j},{i}', method, 'xy', h=h, args=args)

    return hessian
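A companion sketch (illustrative; same assumed test function): the analytic Hessian of f(x) = x0**2 * x1 at [2.0, 3.0] is [[6, 4], [4, 0]]. A loose tolerance absorbs the cancellation error inherent in second-order finite differences.

>>> import numpy as np
>>> from parepy_toolbox import common_library
>>> f = lambda x: x[0] ** 2 * x[1]
>>> hess = common_library.hessian_matrix(f, [2.0, 3.0], 'center')
>>> np.allclose(hess, [[6.0, 4.0], [4.0, 0.0]], atol=1e-3)
True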
def sampling_kernel_without_time(obj: Callable, random_var_settings: list, method: str, n_samples: int, number_of_limit_functions: int, args: Optional[tuple] = None) -> pd.DataFrame:
    """
    Generates random samples from the specified distributions and evaluates the limit state functions. This sampling kernel does not consider time series.

    :param obj: The objective function: :py:func:`obj(x, args) -> float` or :py:func:`obj(x) -> float`, where ``x`` is a list with shape *n* and args is a tuple of fixed parameters needed to completely specify the function.
    :param random_var_settings: Containing the distribution type and parameters. Example: ``{"type": "normal", "parameters": {"mean": 0, "std": 1}}``. Supported distributions (see details in Table 1): (a) ``"uniform"``: keys ``"min"`` and ``"max"``, (b) ``"normal"``: keys ``"mean"`` and ``"std"``, (c) ``"lognormal"``: keys ``"mean"`` and ``"std"``, (d) ``"gumbel max"``: keys ``"mean"`` and ``"std"``, (e) ``"gumbel min"``: keys ``"mean"`` and ``"std"``, (f) ``"triangular"``: keys ``"min"``, ``"mode"`` and ``"max"``, or (g) ``"gamma"``: keys ``"mean"`` and ``"std"``.
    :param method: Sampling method. Supported values: ``"lhs"`` (Latin Hypercube Sampling), ``"mcs"`` (Crude Monte Carlo Sampling) or ``"sobol"`` (Sobol Sampling).
    :param n_samples: Number of samples. For Sobol sequences, this variable represents the exponent *m* (:math:`n = 2^m`).
    :param number_of_limit_functions: Number of limit state functions or constraints.
    :param args: Extra arguments to pass to the objective function (optional).

    :return: Random samples, objective function evaluations and indicator functions.

    .. list-table:: **Table 1: Supported values**
       :widths: 20 40
       :header-rows: 1

       * - **Name**
         - **Expected parameters**
       * - ``"uniform"``
         - ``"min"`` and ``"max"``
       * - ``"normal"``
         - ``"mean"`` and ``"std"``
       * - ``"lognormal"``
         - ``"mean"`` and ``"std"``
       * - ``"gumbel max"``
         - ``"mean"`` and ``"std"``
       * - ``"gumbel min"``
         - ``"mean"`` and ``"std"``
       * - ``"triangular"``
         - ``"min"``, ``"mode"``, and ``"max"``
       * - ``"gamma"``
         - ``"mean"`` and ``"std"``

    Example
    ==============

    >>> # pip install -U parepy-toolbox
    >>> import time
    >>> from parepy_toolbox import sampling_kernel_without_time
    >>> def obj(x):  # We recommend defining this function in a separate .py file when using parallel processing from a .ipynb notebook
    ...     g_0 = 12.5 * x[0] ** 3 - x[1]
    ...     return [g_0]
    >>> d = {'type': 'normal', 'parameters': {'mean': 1.0, 'std': 0.1}}
    >>> l = {'type': 'normal', 'parameters': {'mean': 10.0, 'std': 1.0}}
    >>> var = [d, l]
    >>> number_of_limit_functions = 1
    >>> method = 'mcs'
    >>> n_samples = 10000
    >>> start = time.perf_counter()
    >>> df = sampling_kernel_without_time(obj, var, method, n_samples, number_of_limit_functions)
    >>> end = time.perf_counter()
    >>> print(f"Time elapsed: {(end-start):.5f}")
    >>> print(df)
               X_0        X_1        G_0  I_0
    0     1.193612  10.539209  10.717671  0.0
    1     1.041650  10.441663   3.686170  0.0
    2     1.133054   9.232075   8.950766  0.0
    3     0.983667  10.080005   1.817470  0.0
    4     0.908051  10.095981  -0.736729  1.0
    ...        ...        ...        ...  ...
    9995  1.016563   9.815083   3.316392  0.0
    9996  0.998764   9.623686   2.830013  0.0
    9997  0.826956   9.338711  -2.269712  1.0
    9998  1.060813   9.721774   5.200211  0.0
    9999  1.107219   9.239544   7.727668  0.0

    [10000 rows x 4 columns]
    """
    n_real_samples = 2 ** n_samples if method == 'sobol' else n_samples
    random_data = np.zeros((n_real_samples, len(random_var_settings)))

    # Generate random samples for each variable
    for i, dist_info in enumerate(random_var_settings):
        random_data[:, i] = parepydi.random_sampling(dist_info['type'], dist_info['parameters'], method, n_samples)

    # Evaluate objective function for each sample
    g_matrix = np.zeros((n_real_samples, number_of_limit_functions))
    indicator_matrix = np.zeros_like(g_matrix)
    for idx, sample in enumerate(random_data):
        g_values = obj(list(sample), args) if args is not None else obj(list(sample))
        g_matrix[idx, :] = g_values
        indicator_matrix[idx, :] = [1 if g <= 0 else 0 for g in g_values]

    # Build DataFrame
    df = pd.DataFrame(random_data, columns=[f'X_{i}' for i in range(len(random_var_settings))])
    for j in range(number_of_limit_functions):
        df[f'G_{j}'] = g_matrix[:, j]
        df[f'I_{j}'] = indicator_matrix[:, j]

    return df
def summarize_pf_beta(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Generates a summary DataFrame containing the probability of failure (pf) and reliability index (β) for each indicator function column in the input DataFrame.

    :param df: Random samples, objective function evaluations and indicator functions.

    :return: output[0] = Probability of failure values for each indicator function, output[1] = Reliability index values for each indicator function.
    """
    pf_values = {}
    beta_values = {}
    for col in df.columns:
        if col.startswith("I_"):
            idx = col.split("_")[1]
            pf = df[col].mean()
            beta = beta_equation(pf)
            pf_values[f"pf_{idx}"] = pf
            beta_values[f"beta_{idx}"] = beta
    pf_df = pd.DataFrame([pf_values])
    beta_df = pd.DataFrame([beta_values])

    return pf_df, beta_df
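A minimal sketch (illustrative; the hand-made indicator column below stands in for real sampler output):

>>> import pandas as pd
>>> from parepy_toolbox import common_library
>>> df = pd.DataFrame({'I_0': [0.0, 1.0, 0.0, 0.0]})
>>> pf_df, beta_df = common_library.summarize_pf_beta(df)
>>> float(pf_df.loc[0, 'pf_0'])
0.25
>>> round(float(beta_df.loc[0, 'beta_0']), 4)
0.6745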
def convergence_probability_failure(df: pd.DataFrame, column: str) -> tuple[list, list, list, list, list]:
    """
    Calculates the convergence rate of the probability of failure.

    :param df: Random samples, objective function evaluations and indicator functions.
    :param column: Name of the column to be analyzed. Supported values: ``"I_0"``, ``"I_1"``, ..., ``"I_n"`` where *n* is the number of limit state functions.

    :return: output[0] = Sample size, output[1] = Mean, output[2] = Lower confidence limit, output[3] = Upper confidence limit, output[4] = Variance.

    Example
    ==============

    >>> # pip install -U parepy-toolbox
    >>> import numpy as np
    >>> import matplotlib.pyplot as plt
    >>> from parepy_toolbox import sampling_algorithm_structural_analysis, common_library
    >>> def obj(x):
    ...     return [12.5 * x[0]**3 - x[1]]
    >>> d = {'type': 'normal', 'parameters': {'mean': 1., 'std': 0.1}}
    >>> l = {'type': 'normal', 'parameters': {'mean': 10., 'std': 1.}}
    >>> var = [d, l]
    >>> df, pf, beta = sampling_algorithm_structural_analysis(
    ...     obj, var, method='lhs', n_samples=50000,
    ...     number_of_limit_functions=1, parallel=False, verbose=False
    ... )
    >>> div, pf_mean, ci_lower, ci_upper, pf_var = common_library.convergence_probability_failure(df, 'I_0')
    >>> print("Sample sizes considered at each step:", div)
    >>> print("Estimated probability of failure (mean):", pf_mean)
    >>> print("Lower confidence interval values:", ci_lower)
    >>> print("Upper confidence interval values:", ci_upper)
    >>> print("Variance values:", pf_var)
    >>> plt.figure(figsize=(10, 6))
    >>> plt.plot(div, pf_mean, label='Probability of Failure (Mean)', color='blue')
    >>> plt.fill_between(div, ci_lower, ci_upper, color='lightblue', alpha=0.5, label='Confidence Interval (95%)')
    >>> plt.xlabel('Number of Samples')
    >>> plt.ylabel('Probability of Failure')
    >>> plt.title('Convergence of Probability of Failure')
    >>> plt.legend()
    >>> plt.grid(True)
    >>> plt.savefig('convergence_probability_failure.png')
    >>> plt.show()
    Sample sizes considered at each step: [1000, 2000, 3000, ..., 49000, 50000]
    Estimated probability of failure (mean): [0.242, 0.247, 0.247, ..., 0.24512, 0.24518]
    Lower confidence interval values: [0.21541, 0.22808, 0.23156, ..., 0.24131, 0.24141]
    Upper confidence interval values: [0.26859, 0.26592, 0.26244, ..., 0.24893, 0.24895]
    Variance values: [1.834e-04, 9.300e-05, 6.200e-05, ..., 3.776e-06, 3.701e-06]

    .. image:: _static/convergence_probability_failure.png
       :alt: Convergence of Probability of Failure
       :align: center
       :width: 700px
    """
    column_values = df[column].to_list()
    step = 1000
    div = [i for i in range(step, len(column_values), step)]
    m = []
    ci_u = []
    ci_l = []
    var = []
    for i in range(0, len(div) + 1):
        if i == len(div):
            aux = column_values.copy()
            div.append(len(column_values))
        else:
            aux = column_values[:div[i]]
        mean = np.mean(aux)
        std = np.std(aux, ddof=1)
        n = len(aux)
        confidence_level = 0.95
        t_critic = sc.stats.t.ppf((1 + confidence_level) / 2, df=n - 1)
        margin = t_critic * (std / np.sqrt(n))
        confidence_interval = (mean - margin, mean + margin)
        m.append(mean)
        ci_u.append(confidence_interval[1])
        ci_l.append(confidence_interval[0])
        var.append((mean * (1 - mean)) / n)

    return div, m, ci_l, ci_u, var
def calculate_weights(df: pd.DataFrame, random_var_settings: list, random_var_settings_importance_sampling: list) -> pd.DataFrame:
    """
    Computes importance sampling weights w = p(x) / q(x) for each sample, where p is the target density and q is the sampling (importance) density. Weighted indicator columns are set to zero wherever the corresponding indicator is zero.

    :param df: Random samples, objective function evaluations and indicator functions.
    :param random_var_settings: Containing the distribution type and parameters of the target density for each variable.
    :param random_var_settings_importance_sampling: Containing the distribution type and parameters of the importance sampling density for each variable.

    :return: Copy of the input DataFrame with the density evaluations, the weight column ``w``, and one weighted column per indicator function.
    """
    n_vars = len(random_var_settings)
    df_copy = df.copy()
    for j in range(n_vars):
        col = f"X_{j}"
        x_j = df_copy[col].tolist()
        p_info = random_var_settings[j]
        q_info = random_var_settings_importance_sampling[j]
        df_copy[f"p_X_{j}"] = parepydi.random_sampling_statistcs(p_info['type'], p_info['parameters'], x_j)
        df_copy[f"q_X_{j}"] = parepydi.random_sampling_statistcs(q_info['type'], q_info['parameters'], x_j)
    df_copy['num'] = 1
    df_copy['den'] = 1
    for col in df_copy.columns:
        if col.startswith('p_X_'):
            df_copy['num'] *= df_copy[col]
        elif col.startswith('q_X_'):
            df_copy['den'] *= df_copy[col]
    df_copy['w'] = [a / b for a, b in zip(df_copy['num'].tolist(), df_copy['den'].tolist())]
    cols_i = [col for col in df.columns if col.startswith('I_')]
    for j in cols_i:
        df_copy[f'w_{j}'] = np.where(df[j] == 0, 0, df_copy['w'])

    return df_copy
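A minimal importance-sampling sketch (illustrative only: it assumes the samples were drawn from the shifted densities ``q`` via ``sampling_kernel_without_time``, and that ``parepydi.random_sampling_statistcs`` evaluates density values at given points):

>>> from parepy_toolbox import common_library, sampling_kernel_without_time
>>> def obj(x):
...     return [12.5 * x[0] ** 3 - x[1]]
>>> p = [{'type': 'normal', 'parameters': {'mean': 1.0, 'std': 0.1}},
...      {'type': 'normal', 'parameters': {'mean': 10.0, 'std': 1.0}}]  # target densities
>>> q = [{'type': 'normal', 'parameters': {'mean': 0.9, 'std': 0.1}},
...      {'type': 'normal', 'parameters': {'mean': 11.0, 'std': 1.0}}]  # shifted toward the failure region
>>> df = sampling_kernel_without_time(obj, q, 'mcs', 10000, 1)
>>> df_w = common_library.calculate_weights(df, p, q)
>>> pf_is = df_w['w_I_0'].mean()  # importance sampling estimate of pf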