Synthèse 2025

This commit is contained in:
Alex38Lyon
2026-01-08 18:50:25 +01:00
parent 8a54cbfad4
commit c0dbe92755
48 changed files with 152294 additions and 151961 deletions
+164 -51
View File
@@ -15,6 +15,7 @@
import sqlite3, sys, os, re, argparse
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
@@ -41,7 +42,7 @@ def importation_sql_data(fichier_sql):
try:
# Si la base de données existe, supprimez-la pour forcer l'écriture
print(f"\033[1;32mPhase 1: Importation de la base de données Therion \033[0m{input_file_name}\033[1;32m dans: \033[0m{imported_database}")
print(f"\033[1;32mPhase 1: Importation de la base de données Therion \033[0m{safe_relpath(input_file_name)}\033[1;32m dans: \033[0m{safe_relpath(imported_database)}")
if os.path.exists(imported_database):
#print("Suppression de la Bd existante: " + imported_database)
os.remove(imported_database)
@@ -74,8 +75,6 @@ def importation_sql_data(fichier_sql):
return
#####################################################################################################################################
# Fonction pour construire les tables JONCTION, SERIE, VISEE_FLAG et RESEAU #
# #
@@ -614,8 +613,7 @@ def SHOT_equates_station():
error_count += 1
return
#####################################################################################################################################
# Fonction pour supprimer les visées en double (même départs arrivée lg, az et pente) #
#####################################################################################################################################
@@ -1163,8 +1161,7 @@ def nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_I
error_count += 1
return
#####################################################################################################################################
# Fonction pour tester si la station a déjà été lue #
#####################################################################################################################################
@@ -1283,11 +1280,11 @@ def calcul_stats(output_file):
global _largeurColTete
try:
print(f"\033[1;32mPhase 5: Écriture des statistiques dans \033[0m{output_file}")
print(f"\033[1;32mPhase 5: Écriture des statistiques dans \033[0m{safe_relpath(output_file)}")
# Enregistrement des résultats dans un fichier texte
output_file_ligne =[]
for i in range(9): output_file_ligne.append(titre[i].ljust(90)+"*\n")
for i in range(9): output_file_ligne.append(titre[i].ljust(120)+"*\n")
sql_query1 = ("""
Select
@@ -1304,43 +1301,53 @@ def calcul_stats(output_file):
# output_file_ligne.append(f"Développement total centerline:\t{"{:.2f}".format(results[0][0]).ljust(_largeurCol)}\t{"{:.2f}".format(results[0][1]).ljust(_largeurCol)}\t{vide}\t{vide}\t{vide}\tdev.(m), dupl.(m)\n")
#print('Développement total: ' + formatted_row + 'm')
output_file_ligne.append(f"Développement total centerline:\t%s\t%s\t%s\t%s\t%s\tDev.(m), Dupl.(m), Surf.(m)\n" %(str("{:.2f}".format(results[0][0]).ljust(_largeurCol)),
str("{:.2f}".format(results[0][1]).ljust(_largeurCol)),
str("{:.2f}".format(results[0][2]).ljust(_largeurCol)),
str(vide), str(vide)))
# output_file_ligne.append(f"**Développement total centerline:**\t%s\t%s\t%s\t%s\t%s\tDev.(m), Dupl.(m), Surf.(m)\n" %(str("{:.2f}".format(results[0][0]).ljust(_largeurCol)),
# str("{:.2f}".format(results[0][1]).ljust(_largeurCol)),
# str("{:.2f}".format(results[0][2]).ljust(_largeurCol)),
# str(vide), str(vide)))
output_file_ligne.append(
f"**Développement total des centerlines (m):** "
f"**, Développement:** {results[0][0]:.2f} "
f"**, Dupliqué:** {results[0][1]:.2f} "
f"**, Surface:** {results[0][2]:.2f}\n"
)
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
_compteur = cursor.fetchall()
compteur = int(_compteur[0][0])
if compteur > 0 : # type: ignore
output_file_ligne.append(f"Attention, {compteur} station(s) non comptabilisée(s) et raccordée(s)\n")
output_file_ligne.append(f"!!Attention, {compteur} station(s) non comptabilisée(s) et raccordée(s)\n")
results=sql_bilan_reseaux()
if results[0][0] != None :# type: ignore
output_file_ligne.append('\nDéveloppement total par réseaux\n')
output_file_ligne.append(f"\n--------------\n")
output_file_ligne.append("**Développement total par réseaux**\n")
for row in results: # type: ignore
formatted_row = '\t'.join(map(str, row))
output_file_ligne.append('\t' + formatted_row + '\n')
# formatted_row = '\t'.join(map(str, row))
# output_file_ligne.append('\t' + formatted_row + '\n')
formatted_row = '| ' + ' | '.join(map(str, row)) + ' |'
output_file_ligne.append(formatted_row + '\n')
#print('Développement total: ' + formatted_row + 'm')
results=sql_bilan_annee()
if results[0][0] != None :# type: ignore
output_file_ligne.append('\nDéveloppement total topographié par année(s)\n')
output_file_ligne.append(f"\n--------------")
output_file_ligne.append("**Développement total topographié par année(s)**\n")
for row in results: # type: ignore
formatted_row = '\t'.join(map(str, row))
output_file_ligne.append('\t' + formatted_row + '\n')
#print('Développement total: ' + formatted_row + 'm')
if row[1].strip() != "0.00" or row[3].strip() != "0.00" or row[5].strip() != "0.00" :
# Formatage pour Markdown avec alignement simple
formatted_row = '| ' + ' | '.join(map(str, row)) + ' |'
output_file_ligne.append(formatted_row + '\n')
#print('Développement total: ' + formatted_row + 'm')
Rose(output_file_name_rose)
Shot_lengths_histogram(output_file_name_histo)
findetraitement = datetime.now()
duree = findetraitement - maintenant
@@ -1348,27 +1355,27 @@ def calcul_stats(output_file):
heures, secondes = divmod(secondes, 3600) # 3600 secondes dans une heure
minutes, secondes = divmod(secondes, 60) # 60 secondes dans une minute
if duree.seconds > 3600:
duree_formatee = "{:02}:{:02}:{:02}(s)".format(heures, minutes, secondes)
duree_formatee = "{:02}(h){:02}(m){:02}(s)".format(heures, minutes, secondes)
elif duree.seconds > 60:
duree_formatee = "{:02}:{:02}(s)".format(minutes, secondes)
duree_formatee = "{:02}(m){:02}(s)".format(minutes, secondes)
else :
duree_formatee = "{:02}(s)".format(secondes)
if error_count == 0:
output_file_ligne[7] = "* Durée calcul: " + duree_formatee + " sans erreur"
output_file_ligne[7] = output_file_ligne[7].ljust(90)+"*\n"
output_file_ligne[7] = output_file_ligne[7].ljust(120)+"*\n"
else :
output_file_ligne[7] = "* Durée calcul: " + duree_formatee + " avec erreur(s): " + str(error_count)
output_file_ligne[7] = output_file_ligne[7].ljust(90)+"*\n"
output_file_ligne[7] = "* !!!Durée calcul: " + duree_formatee + " avec erreur(s): " + str(error_count) + "!!!"
output_file_ligne[7] = output_file_ligne[7].ljust(120)+"*\n"
with open(output_file, 'w', encoding='utf-8') as file:
file.writelines(output_file_ligne)
if error_count == 0 :
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f"\033[1;32m, résultats enregistrés dans \033[0m{output_file}")
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f"\033[1;32m, résultats enregistrés dans \033[0m{safe_relpath(output_file)}")
else :
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee
+ f",\033[91m avec \033[0m{error_count}\033[91m erreur(s), \033[1;32mrésultats enregistrés dans \033[0m{output_file}")
+ f",\033[91m avec \033[0m{error_count}\033[91m erreur(s), \033[1;32mrésultats enregistrés dans \033[0m{safe_relpath(output_file)}")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution des requêtes calcul_stats:\033[0m {e}")
@@ -1378,7 +1385,7 @@ def calcul_stats(output_file):
file.writelines(output_file_ligne)
except FileNotFoundError:
print(f"\033[91mErreur d'ouverture du fichier: \033[0m{output_file} ")
print(f"\033[91mErreur d'ouverture du fichier: \033[0m{safe_relpath(output_file)} ")
error_count += 1
except Exception as e:
@@ -2266,15 +2273,12 @@ def sql_bilan_reseaux():
return
#####################################################################################################################################
# # Clé de tri #
#####################################################################################################################################
def cle_tri(element):
return float(element[2])
#####################################################################################################################################
# #-- Bilan topo par années #
#####################################################################################################################################
@@ -2412,7 +2416,6 @@ def Rose(graph_name, bins = 72):
return
#####################################################################################################################################
# diagramme de longueurs de visées #
#####################################################################################################################################
@@ -2603,6 +2606,94 @@ def PlotExploYears(graph_name, rangeyear = [1959, datetime.now().year], systems
return
#################################################################################################
# fonction pour réduire l'affichage des chemins long #
#################################################################################################
def safe_relpath(path, base_dir=None, max_depth=3, max_name_len=50, prefix="~"):
"""
Retourne un chemin lisible et sûr pour affichage (logs / UI).
- Compatible Windows / Linux / macOS
- Tronque la profondeur du chemin
- Tronque le nom de fichier si trop long
- Ne lève jamais d'exception
"""
try:
path = Path(path).expanduser().resolve()
except Exception:
return str(path)
try:
base = Path(base_dir).expanduser().resolve() if base_dir else Path.cwd().resolve()
except Exception:
base = None
name = path.name or str(path)
if len(name) > max_name_len:
stem = path.stem[: max(1, max_name_len - 6)]
name = f"{stem}...{path.suffix}"
try:
if base:
rel = path.relative_to(base)
parts = list(rel.parts)
else:
raise ValueError
except Exception:
parts = list(path.parts)
if not parts:
parts = ["."]
if isinstance(max_depth, int) and max_depth > 0 and len(parts) > max_depth:
parts = parts[-max_depth:]
parts.insert(0, prefix)
if parts and parts[-1] not in (".", os.sep):
parts[-1] = name
try:
return os.path.join(*parts)
except Exception:
return name
#################################################################################################
# Coloration des messages d'aide d'arg #
#################################################################################################
def colored_help(parser):
"""
Affiche l'aide colorée pour les arguments de la ligne de commande.
Args:
parser (argparse.ArgumentParser): Le parseur d'arguments.
Returns:
None
"""
# Captures the help output
help_text = parser.format_help()
# Coloration des différentes parties
colored_help_text = help_text.replace(
'usage:', f'\033[91musage:\033[0m'
).replace(
'options:', f'\033[92moptions:\033[0m'
).replace('positional arguments:', f'\033[94mpositional arguments:\033[0m'
).replace(', --help', f'\033[94m, --help:\033[0m'
).replace('elp:', f'\033[94melp\033[0m')
# Surligner les arguments
for action in parser._actions:
if action.option_strings:
# Colorer les options (--xyz)
for opt in action.option_strings:
colored_help_text = colored_help_text.replace(opt, f'\033[94m{opt}\033[0m').replace('--help', f'\033[94m--help:\033[0m')
# Imprimer le texte coloré
print(colored_help_text)
sys.exit(1)
#####################################################################################################################################
# #
# Main #
@@ -2619,10 +2710,25 @@ if __name__ == '__main__':
inputs_path = "./Test/"
# if not os.path.exists(outputs_path): os.makedirs(outputs_path)
if os.name == 'posix': os.system('clear') # Linux, MacOS
elif os.name == 'nt': os.system('cls')# Windows
else: print("\n" * 100)
maintenant = datetime.now()
parser = argparse.ArgumentParser(description="Calcul des statistiques par entrées d'une BD Therion. ",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser = argparse.ArgumentParser(description=f"Calcul des statistiques par entrées d'une BD Therion",
formatter_class=argparse.RawTextHelpFormatter)
parser.print_help = colored_help.__get__(parser)
parser.add_argument(
'--option',
default="sync",
choices=["sync", "update"],
help=(
f"Options d'execution de pythStat.py\nsync\t-> Synchronisation des données depuis une nouvelle base de données(défaut)\n"
f"update\t-> Mise à jour des statistiques de la base de données\n"
)
)
parser.add_argument("--file", help="Chemin vers le fichier SQL d'entrée (pas de d'option : fenêtre de choix)")
parser.epilog = (f"Commande therion (fichier .thconfig) : export database -o Outputs/database.sql")
@@ -2633,9 +2739,7 @@ if __name__ == '__main__':
if not args.file: # Si aucun fichier n'est fourni en ligne de commande, ouvrir une fenêtre Tkinter pour sélectionner un fichier
# input_file = "rabbit.sql" # Erreur car pas de point fix ou d'entrée python
# input_file_name = inputs_path + input_file
if os.name == 'posix': os.system('clear') # Linux, MacOS
elif os.name == 'nt': os.system('cls')# Windows
else: print("\n" * 100)
root = tk.Tk()
root.withdraw() # Cacher la fenêtre principale de Tkinter
input_file_name = filedialog.askopenfilename( title="Sélectionnez le fichier SQL", filetypes=(("Fichiers SQL", "*.sql"), ("Tous les fichiers", "*.*")) )
@@ -2667,34 +2771,43 @@ if __name__ == '__main__':
if not os.path.exists(outputfolder): os.makedirs(outputfolder)
output_file_name = outputfolder + input_file[:-4]+"_stats.csv"
output_file_name = outputfolder + input_file[:-4]+"_stats.md"
output_file_name_rose = outputfolder + input_file[:-4]+"_rose.pdf"
output_file_name_histo = outputfolder + input_file[:-4]+"_histo.pdf"
output_file_name_year = outputfolder + input_file[:-4]+"_year"
imported_database = outputfolder + input_file[:-4]+"_stats.db"
_titre =['\033[1;32m******************************************************************************************\033[0m',
_titre =['\033[1;32m************************************************************************************************************************\033[0m',
'\033[1;32m* Calcul des statistiques par entrées d\'une BD Therion\033[0m',
'\033[1;32m* Script pythStat par alexandre.pont@yahoo.fr\033[0m',
'\033[1;32m* Version : \033[0m' + Version,
'\033[1;32m* Fichier source : \033[0m' + input_file_name,
'\033[1;32m* Dossier destination : \033[0m' + outputfolder,
'\033[1;32m* Fichier source : \033[0m' + safe_relpath(input_file_name),
'\033[1;32m* Dossier destination : \033[0m' + safe_relpath(outputfolder),
'\033[1;32m* Date : \033[0m' + maintenant.strftime("%Y-%m-%d %H:%M:%S"),
'\033[1;32m* \033[0m',
'\033[1;32m******************************************************************************************\033[0m']
'\033[1;32m************************************************************************************************************************\033[0m']
for i in range(9): print(_titre[i].ljust(101)+"\033[1;32m*\033[0m")
for i in range(9): print(_titre[i].ljust(131)+"\033[1;32m*\033[0m")
titre = [ligne.replace("\033[1;32m", "").replace("\033[0m", "") for ligne in _titre]
importation_sql_data(input_file_name)
if args.option == "sync" :
importation_sql_data(input_file_name)
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
cursor = conn.cursor()
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
cursor = conn.cursor()
construction_tables()
calcul_stats(output_file_name)
construction_tables()
calcul_stats(output_file_name)
elif args.option == "update" :
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
cursor = conn.cursor()
calcul_stats(output_file_name)
conn.close()