# Correlated synthetic data

October 9, 2024

Two ways to obtain synthetic data with a desired correlation structure: select a subset of an existing dataset whose correlation matrix approximates a target, or generate a fresh dataset directly from a target correlation matrix.

## Select Correlated Synthetic Data
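The idea is to treat subset selection as a search problem. Using the DEAP library, each individual in the genetic algorithm is a list of row indices into the source dataset, and its fitness is the mean absolute difference between the subset's correlation matrix and the target; evolving the population minimizes that difference.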
```python
import random

import numpy as np
import pandas as pd
from deap import algorithms, base, creator, tools


class CorrSelector:
    @staticmethod
    def select(src_data: pd.DataFrame, corr: np.ndarray, size: int, iterate_steps: int = 100) -> pd.DataFrame:
        """
        @param src_data: original dataset
        @param corr: target correlation matrix for the subset
        @param size: the number of rows to extract
        @param iterate_steps: number of generations; more steps bring the subset
            closer to the target correlation matrix
        @return: the selected subset of the original dataset
        """
        # Genetic Algorithm setup (guard so repeated calls don't re-create the classes)
        if not hasattr(creator, "FitnessMin"):
            creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
            creator.create("Individual", list, fitness=creator.FitnessMin)
        toolbox = base.Toolbox()

        # Attribute: randomly select a row index from the dataset
        toolbox.register("attr_item", random.randrange, len(src_data))

        # Structure initializers: an individual is a list of `size` row indices
        toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_item, size)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)

        # Operator registration. Mutation resamples a row index; merely shuffling
        # index positions would leave the subset (and its correlation) unchanged.
        toolbox.register("mate", tools.cxTwoPoint)
        toolbox.register("mutate", tools.mutUniformInt, low=0, up=len(src_data) - 1, indpb=0.05)
        toolbox.register("select", tools.selTournament, tournsize=3)

        # Fitness function: minimize the mean absolute difference between the
        # subset's correlation matrix and the target
        def evaluate(individual):
            subset = src_data.iloc[list(individual)]
            subset_corr = subset.corr().values
            return (np.abs(subset_corr - corr).mean(),)

        toolbox.register("evaluate", evaluate)

        # Evolutionary algorithm parameters
        population = toolbox.population(n=50)
        NGEN = iterate_steps    # number of generations
        CXPB, MUTPB = 0.5, 0.2  # crossover and mutation probabilities

        # Run the genetic algorithm
        for gen in range(NGEN):
            offspring = algorithms.varAnd(population, toolbox, CXPB, MUTPB)
            # Evaluate the fitness of the offspring
            fits = map(toolbox.evaluate, offspring)
            for fit, ind in zip(fits, offspring):
                ind.fitness.values = fit
            # Select the next generation of individuals
            population = toolbox.select(offspring, k=len(population))

        # Return the rows indexed by the best individual found
        return src_data.iloc[list(tools.selBest(population, 1)[0])]
```
### Test code
```python
def get_subset_data_and_target_corr():
    # Sample data - this is your larger dataset
    data = pd.DataFrame({
        'A': np.random.normal(size=1000),
        'B': np.random.normal(size=1000),
        'C': np.random.normal(size=1000),
        'D': np.random.normal(size=1000)
    })
    # Target correlation matrix
    target_corr = np.array([[1.0, 0.8, 0.5, 0.3],
                            [0.8, 1.0, 0.4, 0.2],
                            [0.5, 0.4, 1.0, 0.6],
                            [0.3, 0.2, 0.6, 1.0]])
    subset = CorrSelector.select(data, target_corr, 100)
    return subset, target_corr


def test_corr_selector():
    subset, target_corr = get_subset_data_and_target_corr()
    assert np.abs(subset.corr().values - target_corr).mean() < 0.5


if __name__ == "__main__":
    subset, target_corr = get_subset_data_and_target_corr()
    print("Mean absolute difference between subset and target correlation matrices:",
          np.abs(subset.corr().values - target_corr).mean())
    print()
    print("target correlation matrix")
    print(target_corr)
    print()
    print("subset correlation matrix")
    print(subset.corr().values)
```
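As the docstring notes, more generations should bring the subset correlation closer to the target. A minimal sketch to observe that trend, reusing the same sample data and target matrix as the test code above (the `data` and `target_corr` names here are illustration variables, and the exact numbers vary run to run):

```python
data = pd.DataFrame({c: np.random.normal(size=1000) for c in "ABCD"})
target_corr = np.array([[1.0, 0.8, 0.5, 0.3],
                        [0.8, 1.0, 0.4, 0.2],
                        [0.5, 0.4, 1.0, 0.6],
                        [0.3, 0.2, 0.6, 1.0]])
for steps in (10, 100, 300):
    subset = CorrSelector.select(data, target_corr, size=100, iterate_steps=steps)
    err = np.abs(subset.corr().values - target_corr).mean()
    print(f"iterate_steps={steps}: mean abs correlation error = {err:.3f}")
```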
## Generate Correlated Dataset
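The generator relies on a standard property of the Cholesky factorization. Write the target correlation matrix as $C = LL^{\top}$ and let $z$ be a vector of independent standard normal variables; then

$$
\operatorname{Cov}(Lz) = L\,\operatorname{Cov}(z)\,L^{\top} = L\,I\,L^{\top} = C.
$$

Because each component of $z$ has unit variance, covariance and correlation coincide, so `uncorrelated_data @ L.T` below yields rows whose correlation matrix is approximately `C`; the uniform noise and clipping applied afterwards perturb this slightly.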
```python
import numpy as np
import pandas as pd


class DataGenerator:
    def __init__(self, n=10000):
        self.n = n
        self.correlation_matrix = np.array([
            [1.00, 0.65, 0.25, 0.10, 0.30, 0.40, 0.30, 0.35, -0.25, 0.70, 0.20],           # A Correlation
            [0.65, 1.00, 0.35, 0.20, 0.40, 0.50, 0.45, 0.45, -0.30, 0.75, 0.25],           # B Correlation
            [0.25, 0.35, 1.00, 0.50, 0.25, 0.40, 0.30, 0.55, -0.60, 0.20, 0.10],           # C Correlation
            [0.10, 0.20, 0.50, 1.00, 0.10, 0.25, 0.15, 0.35, -0.50, 0.10, 0.05],           # D Correlation
            [0.30, 0.40, 0.25, 0.10, 1.00, 0.60, 0.50, 0.55, -0.20, 0.40, 0.50],           # E Correlation
            [0.40, 0.50, 0.40, 0.25, 0.60, 1.00, 0.65, 0.70, -0.30, 0.45, 0.55],           # F Correlation
            [0.30, 0.45, 0.30, 0.15, 0.50, 0.65, 1.00, 0.60, -0.25, 0.40, 0.50],           # G Correlation
            [0.35, 0.45, 0.55, 0.35, 0.55, 0.70, 0.60, 1.00, -0.55, 0.45, 0.30],           # H Correlation
            [-0.25, -0.30, -0.60, -0.50, -0.20, -0.30, -0.25, -0.55, 1.00, -0.30, -0.15],  # I Correlation
            [0.70, 0.75, 0.20, 0.10, 0.40, 0.45, 0.40, 0.45, -0.30, 1.00, 0.30],           # J Correlation
            [0.20, 0.25, 0.10, 0.05, 0.50, 0.55, 0.50, 0.30, -0.15, 0.30, 1.00]            # K Correlation
        ])

    def generate_correlated_data(self):
        # Perform Cholesky decomposition of the target correlation matrix
        L = np.linalg.cholesky(self.correlation_matrix)
        # Generate independent standard normal variables (uncorrelated)
        uncorrelated_data = np.random.normal(size=(self.n, 11))
        # Apply the Cholesky factor to introduce the target correlations
        correlated_data = uncorrelated_data @ L.T
        # Introduce extra randomness by adding uniform noise
        random_adjustment = np.random.uniform(-0.1, 0.1, size=(self.n, 11))
        correlated_data += random_adjustment
        # Scale and clip each indicator into [0, 1] according to the schema
        data = {
            'A': np.clip(0.5 + 0.2 * correlated_data[:, 0], 0, 1),
            'B': np.clip(0.6 + 0.2 * correlated_data[:, 1], 0, 1),
            'C': np.clip(0.5 + 0.15 * correlated_data[:, 2], 0, 1),
            'D': np.clip(0.6 + 0.2 * correlated_data[:, 3], 0, 1),
            'E': np.clip(0.5 + 0.2 * correlated_data[:, 4], 0, 1),
            'F': np.clip(0.5 + 0.2 * correlated_data[:, 5], 0, 1),
            'G': np.clip(0.5 + 0.2 * correlated_data[:, 6], 0, 1),
            'H': np.clip(0.6 + 0.15 * correlated_data[:, 7], 0, 1),
            'I': np.clip(0.4 + 0.15 * correlated_data[:, 8], 0, 1),
            'J': np.clip(0.5 + 0.2 * correlated_data[:, 9], 0, 1),
            'K': np.clip(0.5 + 0.15 * correlated_data[:, 10], 0, 1)
        }
        # Return as DataFrame
        return pd.DataFrame(data).round(2)


# Test code
data_generator = DataGenerator(n=50000)
indicator_df = data_generator.generate_correlated_data()
```
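A quick sanity check is to compare the empirical correlation matrix of the generated data against the target; expect some drift from the added uniform noise and the clipping (a sketch, run in the same session as the test code above):

```python
# Compare empirical correlations of the generated data to the target matrix
emp_corr = indicator_df.corr().values
target = data_generator.correlation_matrix
print("mean abs correlation error:", np.abs(emp_corr - target).mean())
```

For reproducible output, seed NumPy (e.g. `np.random.seed(42)`) before generating.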