Add incomplete processing module

This commit is contained in:
coolneng 2020-12-09 22:49:18 +01:00
parent 1060677d1f
commit 0a54f7403b
Signed by: coolneng
GPG Key ID: 9893DA236405AF57
1 changed files with 195 additions and 0 deletions

195
src/P2/processing.py Normal file
View File

@ -0,0 +1,195 @@
import time
from sys import argv
from typing import Union

from matplotlib.pyplot import *
from pandas import DataFrame, concat
from seaborn import heatmap, set_style, set_theme, pairplot
from sklearn.cluster import KMeans, Birch, AffinityPropagation, MeanShift, DBSCAN
from sklearn.metrics import silhouette_score, calinski_harabasz_score

from preprocessing import parse_data


def choose_model(
    model,
) -> Union[KMeans, Birch, AffinityPropagation, MeanShift, DBSCAN, None]:
    """Instantiate the clustering estimator registered under a short name.

    Args:
        model: One of "kmeans", "birch", "affinity", "meanshift" or "dbscan".

    Returns:
        A freshly constructed estimator, or None when the name is not
        recognised. KMeans and AffinityPropagation are seeded with
        ``random_state=42``.
    """
    # Constructors are wrapped in callables so that only the requested
    # estimator is ever built.
    registry = (
        ("kmeans", lambda: KMeans(random_state=42)),
        ("birch", lambda: Birch()),
        ("affinity", lambda: AffinityPropagation(random_state=42)),
        ("meanshift", lambda: MeanShift()),
        ("dbscan", lambda: DBSCAN()),
    )
    for short_name, build in registry:
        if model == short_name:
            return build()
    return None
def predict_data(data, model, results, sample) -> DataFrame:
    """Fit one clustering model on *data* and record its scores in *results*.

    Args:
        data: Samples passed to the estimator's ``fit_predict`` and to both
            scoring functions.
        model: Short model name understood by ``choose_model``.
        results: Results table handed to ``populate_results``.
        sample: Sample size used for the silhouette coefficient. ``None``
            scores every sample; numeric strings (e.g. straight from the
            command line) are converted with ``int``.

    Returns:
        The table returned by ``populate_results`` for this run.

    Raises:
        ValueError: If *model* is not a known model name, or *sample* cannot
            be converted to an integer.
    """
    estimator = choose_model(model)
    if estimator is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError(f"Unknown clustering model: {model!r}")
    # silhouette_score needs an integer sample size, not the raw argv string.
    sample_size = None if sample is None else int(sample)

    start_time = time.perf_counter()
    prediction = estimator.fit_predict(data)
    execution_time = time.perf_counter() - start_time

    calinski = calinski_harabasz_score(X=data, labels=prediction)
    silhouette = silhouette_score(
        X=data,
        labels=prediction,
        metric="euclidean",
        sample_size=sample_size,
    )
    # Count distinct labels, not samples. -1 is the label DBSCAN assigns to
    # noise points, so it is not counted as a cluster.
    clusters = len(set(prediction) - {-1})
    return populate_results(
        df=results,
        model=estimator,
        prediction=prediction,
        clusters=clusters,
        calinski=calinski,
        silhouette=silhouette,
        time=execution_time,
    )
def plot_heatmap(results):
    """Render *results* as an annotated heatmap, display it and save it.

    The image is written to ``docs/assets`` under a file name derived from
    the figure title.
    """
    fig_title = "Heatmap"
    output_path = f"docs/assets/{fig_title.replace(' ', '_').lower()}.png"
    heatmap_options = {"cmap": "Blues", "square": True, "annot": True}

    canvas = figure(figsize=(20, 10))
    heatmap(data=results, **heatmap_options)
    title(fig_title)
    show()
    canvas.savefig(output_path)
def plot_scatter_plot(results):
    """Draw a pair plot of *results* coloured by prediction, show and save it.

    Every column except ``prediction`` is plotted against every other one;
    ``prediction`` is used only as the hue. The image is written to
    ``docs/assets`` under a file name derived from the figure title.

    Args:
        results: Table with a ``prediction`` column plus the columns to plot.
    """
    fig_title = "Scatter plot"
    # pairplot expects variable *names*; drop("prediction") would have looked
    # for a row label and handed a whole DataFrame to ``vars``.
    plotted_columns = [
        column for column in results.columns if column != "prediction"
    ]
    grid = pairplot(
        data=results,
        vars=plotted_columns,
        hue="prediction",
        palette="Paired",
        diag_kind="hist",
    )
    # pairplot builds its own figure, so title, size and saving must target
    # that figure rather than a separately created (and therefore blank) one.
    canvas = grid.figure if hasattr(grid, "figure") else grid.fig
    canvas.set_size_inches(20, 10)
    canvas.suptitle(fig_title)
    show()
    canvas.savefig(f"docs/assets/{fig_title.replace(' ', '_').lower()}.png")
def print_dataframe(df):
    """Print the summary metrics of a results table, one row per model.

    Only the ``clusters``, ``silhouette``, ``calinski-harabasz`` and ``time``
    columns are shown; columns missing from *df* are skipped.

    Args:
        df: Results table with the model name either in a ``model`` column
            or already used as the index. It is not modified.
    """
    # set_index returns a new frame, so keep the result. Skip it when there
    # is no "model" column, e.g. because the caller already indexed by it.
    indexed = df.set_index("model") if "model" in df.columns else df
    output_df = indexed.filter(
        items=["clusters", "silhouette", "calinski-harabasz", "time"]
    )
    print(output_df)
def show_results(results):
    """Apply the seaborn theme, draw both plots and print the summary table.

    Args:
        results: Results table passed unchanged to each plotting function
            and to ``print_dataframe``.
    """
    set_theme()
    set_style("white")
    for draw in (plot_heatmap, plot_scatter_plot):
        draw(results=results)
    print_dataframe(df=results)
def create_result_dataframes():
    """Create the two empty result tables used by the pipeline.

    Returns:
        A pair of empty DataFrames indexed by ``model`` with the columns
        ``clusters``, ``prediction``, ``silhouette``, ``calinski-harabasz``
        and ``time``. The two frames are independent objects, so an in-place
        change to one never leaks into the other.
    """
    column_names = [
        "clusters",
        "model",
        "prediction",
        "silhouette",
        "calinski-harabasz",
        "time",
    ]
    empty_results = DataFrame(columns=column_names).set_index("model")
    # Return a copy as the second element instead of aliasing the same frame.
    return empty_results, empty_results.copy()
def populate_results(
    df, model, clusters, prediction, calinski, silhouette, time
) -> DataFrame:
    """Return a copy of *df* with one extra row describing a model run.

    Args:
        df: Existing results table; it is not modified.
        model: The fitted estimator (or its textual form); its string
            representation is translated to a short name by ``rename_model``.
        clusters: Number of clusters found.
        prediction: Cluster labels produced by the model.
        calinski: Calinski-Harabasz score of the run.
        silhouette: Silhouette coefficient of the run.
        time: Execution time of the run.

    Returns:
        A new DataFrame holding the rows of *df* followed by the new row,
        with a fresh integer index.
    """
    new_row = {
        "model": rename_model(model=f"{model}"),
        "clusters": clusters,
        "prediction": prediction,
        "silhouette": silhouette,
        "calinski-harabasz": calinski,
        "time": time,
    }
    # DataFrame.append was removed in pandas 2.0; concat with a one-row frame
    # is the supported equivalent of append(..., ignore_index=True).
    return concat([df, DataFrame([new_row])], ignore_index=True)
def rename_model(model) -> str:
    """Translate an estimator's textual representation into its short name.

    Only the class name in front of the first ``(`` is used, so the result
    does not depend on which constructor parameters appear in the text.

    Args:
        model: Text such as ``"KMeans(random_state=42)"`` or ``"Birch()"``.

    Returns:
        One of "kmeans", "birch", "affinity", "meanshift" or "dbscan".

    Raises:
        KeyError: If the class name is not one of the supported estimators.
    """
    # Explicit pairs instead of zipping two lists: the zipped lists had
    # different lengths (no Birch entry) and a misspelt KMeans key, which
    # shifted every mapping onto the wrong short name.
    short_names = {
        "KMeans": "kmeans",
        "Birch": "birch",
        "AffinityPropagation": "affinity",
        "MeanShift": "meanshift",
        "DBSCAN": "dbscan",
    }
    class_name = model.split("(", 1)[0].strip()
    return short_names[class_name]
def construct_case(df, choice):
    """Return the rows of *df* that belong to the selected case study.

    Cases:
        case1: ``LUMINOSIDAD`` contains "NOCHE".
        case2: ``ISLA`` does not contain "NO_ES_ISLA" and
            ``FACTORES_ATMOSFERICOS`` contains "LLUVIA" or "LLOVIZNA".
        case3: ``HORA`` is greater than 19 and ``TIPO_VIA`` is "AUTOPISTA".
        case4: ``COMUNIDAD_AUTONOMA`` is "Andalucía" and ``LUMINOSIDAD``
            contains "SIN ILUMINACIÓN".
        case5: ``DIASEMANA`` is 7 and ``COMUNIDAD_AUTONOMA`` is
            "Madrid, Comunidad de".

    Args:
        df: Table holding the columns used by the chosen case.
        choice: Case identifier, "case1" through "case5".

    Returns:
        The subset of *df* matching the chosen case.

    Raises:
        KeyError: If *choice* is not a known case identifier.
    """
    # Each mask is built lazily so that only the requested filter is
    # evaluated; the other cases cannot fail or cost time.
    masks = {
        "case1": lambda: df["LUMINOSIDAD"].str.contains("NOCHE"),
        "case2": lambda: (
            # na=True followed by negation excludes missing values, exactly
            # as comparing the match result against False did.
            ~df["ISLA"].str.contains("NO_ES_ISLA", na=True)
            & df["FACTORES_ATMOSFERICOS"].str.contains("LLUVIA|LLOVIZNA")
        ),
        "case3": lambda: (df["HORA"] > 19) & (df["TIPO_VIA"] == "AUTOPISTA"),
        "case4": lambda: (
            (df["COMUNIDAD_AUTONOMA"] == "Andalucía")
            & df["LUMINOSIDAD"].str.contains("SIN ILUMINACIÓN")
        ),
        "case5": lambda: (
            (df["DIASEMANA"] == 7)
            & (df["COMUNIDAD_AUTONOMA"] == "Madrid, Comunidad de")
        ),
    }
    build_mask = masks[choice]
    return df.loc[build_mask()]
def usage():
    """Print the command-line help and terminate the program.

    Raises:
        SystemExit: Always, with exit status 1, because the help is shown
            only after an invalid invocation.
    """
    # A space now separates the program name from the argument summary.
    print(f"Usage: {argv[0]} <preprocessing action> <case> <sample size>")
    print("preprocessing actions:")
    print("fill: fills the na values with the mean")
    print("drop: drops the na values")
    print("cases: choice of case study")
    print("sample size: size of the sample when computing the Silhouette Coefficient")
    # Raise directly instead of calling the site-provided exit() helper,
    # which is not guaranteed to exist in every interpreter configuration.
    raise SystemExit(1)
def main():
    """Run every clustering model on the chosen case study and show results.

    Expects three command-line arguments: the preprocessing action, the case
    identifier and the silhouette sample size. With any other argument count,
    or a sample size that is not an integer, the usage text is printed and
    the program exits.
    """
    models = ["kmeans", "birch", "affinity", "meanshift", "dbscan"]
    if len(argv) != 4:
        usage()
    action, case = argv[1], argv[2]
    try:
        # Command-line arguments are strings; the scorer needs an integer.
        sample = int(argv[3])
    except ValueError:
        usage()  # never returns: usage() terminates the program
    data = parse_data(source="data/accidentes_2013.csv", action=action)
    individual_result, complete_results = create_result_dataframes()
    case_data = construct_case(df=data, choice=case)
    per_model_results = [
        predict_data(
            data=case_data,
            model=model,
            results=individual_result,
            sample=sample,
        )
        for model in models
    ]
    # DataFrame.append was removed in pandas 2.0; a single concat also avoids
    # re-copying the accumulated table on every iteration.
    complete_results = concat(
        [complete_results, *per_model_results], ignore_index=True
    )
    indexed_results = complete_results.set_index("model")
    show_results(results=indexed_results)
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()