# -*- coding: utf-8 -*-
########################################################################################################################
#
#   Script automating the computation of statistics
#   for a database file (.sql) produced by Therion
#   By Alexandre PONT  alexandre.pont@yahoo.fr
#   February 2024
#
#   Usage:
#     Export the SQL file from Therion with the command: export database -o Outputs/database.sql
#     Select the database.sql file to process in main (line 2620)
#     Result: database_stats.csv and the following files in the /output folder
#
########################################################################################################################

import sqlite3, sys, os, re
from alive_progress import alive_bar  # https://github.com/rsalmei/alive-progress
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


########################################################################################################################
# Import a SQL file into a SQLite database
########################################################################################################################
def importation_sql_data(fichier_sql):
    """Import the SQL script *fichier_sql* into a fresh SQLite database.

    The target database path is read from the module-level name
    ``imported_database`` (``input_file_name`` is only used in the progress
    message); both are defined outside this function.  An existing database
    file is deleted first so the import always starts from scratch.

    Args:
        fichier_sql: path of the SQL script to execute.

    On any ``sqlite3.Error`` the global ``error_count`` is incremented and the
    program exits with status 1.
    """
    global error_count
    connection = None
    try:
        print(f"\033[1;32mPhase 1: Importation de la base de données Therion \033[0m{input_file_name}\033[1;32m dans: \033[0m{imported_database}")
        # If the database already exists, remove it to force a clean rewrite.
        if os.path.exists(imported_database):
            os.remove(imported_database)
        connection = sqlite3.connect(imported_database)
        cursor = connection.cursor()  # local cursor, distinct from the module-level one

        # Read the whole SQL script.
        with open(fichier_sql, 'r') as sql_file:
            sql_script = sql_file.read()

        # "nan" values are not valid SQL literals: replace them with 0.
        sql_script = re.sub(r', nan', ', 0', sql_script, flags=re.IGNORECASE)
        # Split the script into individual statements.
        commandes = [cmd.strip() + ';\n' for cmd in sql_script.split(';\n') if cmd.strip()]

        # Execute the statements one by one behind a progress bar.
        with alive_bar(len(commandes), title="\x1b[32;1m\t Progression\x1b[0m", length=20) as bar:
            for commande in commandes:
                cursor.execute(commande)
                connection.commit()
                bar()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête importation_sql_data code:\033[0m {e}")
        error_count += 1
        sys.exit(1)  # stop the program on error
    finally:
        # Runs on success and on sys.exit() alike, so the handle is never leaked.
        if connection is not None:
            connection.close()
    return


########################################################################################################################
# Build the JONCTION, SERIE, VISEE_FLAG and RESEAU tables
########################################################################################################################
def _count_untyped_stations():
    """Return the number of JONCTION rows whose STATION_TYPE is still NULL."""
    cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
    return int(cursor.fetchall()[0][0])


def construction_tables():
    """Create and fill the JONCTION, SERIE, VISEE_FLAG and RESEAU tables.

    Works on the module-level ``conn`` / ``cursor`` (created elsewhere in the
    file).  Phases, as printed to the console:

    * Phase 2 - (re)create the working tables, clean the SHOT table
      (equates, zero-length loops, duplicates) and create the indexes.
    * Phase 3 - create one SERIE row per entrance returned by
      ``sql_liste_entree()``.
    * Phase 4 - repeatedly process the rows returned by ``sql_serie_vides()``
      until none is left (or no progress is made), then integrate orphan
      shots and compute the networks.

    ``sqlite3.Error`` is caught and counted in the global ``error_count``.
    """
    global avt_compteur
    global error_count
    try:
        print("\033[1;32mPhase 2: Création des nouvelles tables, indexation\033[0m")

        # --- JONCTION: one row per station --------------------------------------------------
        cursor.execute("DROP TABLE IF EXISTS JONCTION")
        cursor.execute("""
            CREATE TABLE JONCTION (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                STATION_ID INTEGER,
                SERIE_ID INTEGER,
                SERIE_RANG INTEGER,
                SERIE_ENT INTEGER,
                STATION_ENT INTEGER,
                SERIE_JONC INTEGER,
                STATION_JONC INTEGER,
                STATION_TYPE varchar(4),
                ENTREE_ID INTEGER,
                RESEAU_ID INTEGER
            )
        """)
        cursor.execute("select STATION.ID from STATION")
        results = cursor.fetchall()
        Next_Station_ID = []  # always a list of rows from here on
        cursor.executemany("INSERT INTO JONCTION (STATION_ID) VALUES (?)", results)
        conn.commit()

        # --- SERIE ---------------------------------------------------------------------------
        cursor.execute("DROP TABLE IF EXISTS SERIE")
        cursor.execute("""
            CREATE TABLE SERIE (
                SERIE_ID INTEGER PRIMARY KEY AUTOINCREMENT,
                SERIE_DEP_ID INTEGER,
                STATION_DEP_ID INTEGER,
                SERIE_ARR_ID INTEGER,
                STATION_ARR_ID INTEGER,
                SERIE_NBRE_SHOT INTEGER,
                SERIE_LENGHT REAL,
                SERIE_LENGHT_SURFACE REAL,
                SERIE_LENGHT_DUPLICATE REAL,
                DIRECTION INTEGER,
                STATION_ENT_ID INTEGER,
                RESEAU_ID INTEGER)
        """)
        # Only guarantees the name exists if no entrance is found in phase 3.
        Current_Serie_ID = cursor.lastrowid
        conn.commit()

        # --- RESEAU --------------------------------------------------------------------------
        cursor.execute("DROP TABLE IF EXISTS RESEAU")
        cursor.execute("""
            CREATE TABLE RESEAU (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                RESEAU_ID INTEGER,
                STATION_JONC INTEGER,
                ENT_1 INTEGER,
                ENT_2 INTEGER)
        """)
        conn.commit()

        # Clean the SHOT table before it is mirrored into VISEE_FLAG.
        SHOT_equates_station()
        issue_SHOT()
        duplicate_SHOT()

        # --- VISEE_FLAG: tracks which shots have been read -------------------------------
        cursor.execute("DROP TABLE IF EXISTS VISEE_FLAG")
        cursor.execute("""
            CREATE TABLE VISEE_FLAG (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                SHOT_ID INTEGER,
                SERIE_ID INTEGER,
                ENTREE_ID INTEGER,
                RESEAU_ID INTEGER,
                SERIE_RANG INTEGER
            )
        """)
        cursor.execute("select SHOT.ID from SHOT")
        results = cursor.fetchall()
        cursor.executemany("INSERT INTO VISEE_FLAG (SHOT_ID) VALUES (?)", results)
        conn.commit()

        print("\033[0m\t Création de l'index des tables principales et optimisation de la mémoire\033[0m")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_JONCTION_STATION_ID ON JONCTION(STATION_ID)")
        marquage_visee_station_habillage()
        # IF NOT EXISTS makes index creation idempotent, so no prior
        # "PRAGMA index_list" probe is needed when the imported tables already carry them.
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_SHOT_FROM_ID ON SHOT(FROM_ID)")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_SHOT_TO_ID ON SHOT(TO_ID)")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_SHOT_FLAG_SHOT_ID ON SHOT_FLAG(SHOT_ID)")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_STATION_ID ON STATION(ID)")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_VISEE_FLAG_SHOT_ID ON VISEE_FLAG(SHOT_ID)")
        cursor.execute("CREATE INDEX IF NOT EXISTS INDEX_STATION_FLAG_STATION_ID ON STATION_FLAG(STATION_ID)")
        cursor.execute("VACUUM")
        conn.commit()

        # --- Phase 3: seed JONCTION / SERIE from the entrances ------------------------------
        results = sql_liste_entree()
        print(f"\033[1;32mPhase 3: Remplissage des tables d'après les \033[0m{len(results)}\033[1;32m entrée(s)\033[0m")
        for row in results:
            cursor.execute(
                """
                INSERT INTO SERIE (
                    SERIE_DEP_ID, STATION_DEP_ID, SERIE_ARR_ID, STATION_ARR_ID,
                    SERIE_NBRE_SHOT, SERIE_LENGHT, SERIE_LENGHT_SURFACE, SERIE_LENGHT_DUPLICATE,
                    DIRECTION, STATION_ENT_ID, RESEAU_ID)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (0, row[0], 0, row[0], -1, 0, 0, 0, 0, row[0], 0),
            )
            Current_Serie_ID = cursor.lastrowid
            cursor.execute(
                """
                UPDATE JONCTION
                SET SERIE_ID = ?, SERIE_ENT = ?, STATION_ENT = ?, SERIE_JONC = ?, STATION_JONC = ?,
                    STATION_TYPE = ?, SERIE_RANG = ?, ENTREE_ID = ?, RESEAU_ID = ?
                WHERE STATION_ID = ?
                """,
                (Current_Serie_ID, 0, row[0], 0, 0, 'ent', 0, row[0], 0, row[0]),
            )
            # NOTE(review): the filter compares SHOT_ID with a *series* ID; kept as in the
            # original, but this looks unintended - confirm with the author.
            cursor.execute(
                """
                UPDATE VISEE_FLAG
                SET SERIE_ID = ?, ENTREE_ID = ?, RESEAU_ID = ?
                WHERE SHOT_ID = ?
                """,
                (Current_Serie_ID, row[0], 0, Current_Serie_ID),
            )
            conn.commit()

        # --- Phase 4: iterate over the empty series until all are followed -----------------
        print(f"\033[1;32mPhase 4: Remplissage des tables d'après les séries vides jonctionnées aux\033[0m {Current_Serie_ID}\033[1;32m entrée(s)\033[0m")
        results = sql_serie_vides()
        Count = 1
        New_Serie_ID = cursor.lastrowid
        Current_Station_ID_Old = 0
        Current_Station_ID = 0
        compteur_ttl = _count_untyped_stations()
        avt_compteur = 0  # progress already reported to the bar (also advanced by suivi_serie)
        with alive_bar(compteur_ttl, title="\x1b[32;1m\t Progression\x1b[0m", length=20) as bar:
            while len(results) > 0:
                bar.text(f"itération: {Count}, série(s) créée(s): {New_Serie_ID}")
                compteur = _count_untyped_stations()
                if (compteur_ttl - compteur) > avt_compteur:
                    bar(compteur_ttl - compteur - avt_compteur)
                    avt_compteur = compteur_ttl - compteur

                for row in results:
                    # Row layout used here matches the SERIE column order:
                    # 0 SERIE_ID, 2 STATION_DEP_ID, 4 STATION_ARR_ID, 9 DIRECTION, 10 STATION_ENT_ID.
                    Current_Serie_ID = int(row[0])
                    Current_Station_ID_Old = int(Current_Station_ID)
                    Current_Station_ID = int(row[2])
                    Current_Ent = int(row[10])
                    Direction = int(row[9])

                    if Direction == 1:
                        Next_Station_ID = sql_station_depart(Current_Station_ID)
                    elif Direction == -1:
                        Current_Station_ID = int(row[4])
                        Next_Station_ID = sql_station_arrivee(Current_Station_ID)
                    elif Direction == 0:
                        Next_Station_ID_1 = sql_station_depart(Current_Station_ID)
                        Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID)
                        if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0:
                            # Entrance with neither departure nor arrival: end of the series.
                            cursor.execute("UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = ?", (Current_Serie_ID,))
                            cursor.execute("UPDATE JONCTION SET SERIE_ENT = -1 WHERE STATION_ID = ?", (Current_Station_ID,))
                            conn.commit()
                            Next_Station_ID = Next_Station_ID_1
                        elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0:
                            # One departure, no arrival.
                            Next_Station_ID = Next_Station_ID_1
                            Direction = 1
                            cursor.execute("UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = ?", (Current_Serie_ID,))
                            conn.commit()
                        elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1:
                            # One arrival, no departure.
                            Next_Station_ID = Next_Station_ID_2
                            Direction = -1
                            cursor.execute("UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = ?", (Current_Serie_ID,))
                            conn.commit()
                        else:
                            # Several candidates: spawn new series in both directions.
                            nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, 1, Current_Ent)
                            nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, -1, Current_Ent)
                            Direction = 1
                            Next_Station_ID = Next_Station_ID_1

                    if len(Next_Station_ID) == 0:
                        # NOTE(review): both results are unused; the calls are kept only because
                        # the helpers are not visible here - drop them if they are pure queries.
                        Next_Station_ID_1 = sql_station_depart(Current_Station_ID)
                        Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID)
                        # Nothing to follow: the empty series is discarded.
                        cursor.execute("DELETE FROM SERIE WHERE SERIE_ID = ?", (Current_Serie_ID,))
                        conn.commit()
                    else:
                        # One or several next stations: suivi_serie handles both cases.
                        suivi_serie(Current_Serie_ID, bar)

                # Next batch of empty series.
                resultsOld = results
                results = sql_serie_vides()
                New_Serie_ID = cursor.lastrowid
                if resultsOld == results:
                    # No progress since the previous iteration: bail out instead of looping forever.
                    print(f"\033[91mErreur sortie itération {Count}, séries restantes: {len(resultsOld)} - {results}\033[0m")
                    error_count += 1
                    break
                Count += 1

            # Fill the progress bar to 100 % whatever was actually processed.
            compteur = _count_untyped_stations()
            if compteur_ttl > avt_compteur:
                bar(compteur_ttl - avt_compteur)
                avt_compteur = compteur_ttl

        orphelines_shot()
        jonction_RESEAU()
        if compteur > 0:
            print(f"\033[1;32mPhase 4: Fin du remplissage des tables,\033[91m attention \033[0m{compteur}\033[91m station(s) non comptabilisé(s)\033[0m")
            error_count += 1
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution d'une des requêtes (construction_tables) code:\033[0m {e}")
        error_count += 1
    return


########################################################################################################################
# Create series for orphan shots -> a single shot between two already-marked stations
########################################################################################################################
def orphelines_shot():
    """Turn every still-unassigned shot joining two assigned stations into a one-shot series.

    A shot is an orphan when its VISEE_FLAG row has no SERIE_ID while the
    JONCTION rows of both its end stations already have one.  For each orphan
    a SERIE row is inserted, the VISEE_FLAG row is updated, and - when the two
    stations belong to different entrances - a RESEAU row records the junction.

    Uses the module-level ``conn`` / ``cursor``; ``sqlite3.Error`` is caught and
    counted in the global ``error_count``.
    """
    global error_count
    try:
        # JONCTION is joined on STATION_ID (the station key), not on its own
        # autoincrement ID, which only coincides with it for gap-free station IDs.
        cursor.execute("""
            SELECT
                VISEE_FLAG.SHOT_ID,
                VISEE_FLAG.SERIE_ID,
                VISEE_FLAG.ENTREE_ID,
                SHOT_FLAG.FLAG,
                SHOT.FROM_ID,
                JONCTION_FROM.SERIE_ID,
                JONCTION_FROM.STATION_TYPE,
                JONCTION_FROM.ENTREE_ID,
                SHOT.TO_ID,
                JONCTION_TO.SERIE_ID,
                JONCTION_TO.STATION_TYPE,
                JONCTION_TO.ENTREE_ID,
                SHOT.LENGTH
            FROM VISEE_FLAG
            JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
            LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
            JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
            JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
            JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.STATION_ID
            JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.STATION_ID
            WHERE VISEE_FLAG.SERIE_ID is NULL
              and JONCTION_TO.SERIE_ID is not null
              and JONCTION_FROM.SERIE_ID is not null
        """)
        orphelines = cursor.fetchall()
        print(f"\t Intégrations des visées orphelines (entre 2 stations existantes) nbre: {len(orphelines)}")

        for row in orphelines:
            (shot_id, _, _, flag,
             from_id, from_serie, _, from_entree,
             to_id, to_serie, _, to_entree,
             length) = row

            # The shot length goes to exactly one bucket depending on its flag.
            _SERIE_LENGHT = 0
            _SERIE_LENGHT_SURFACE = 0
            _SERIE_LENGHT_DUPLICATE = 0
            if flag == 'dpl':
                _SERIE_LENGHT_DUPLICATE = length
            elif flag == 'srf':
                _SERIE_LENGHT_SURFACE = length
            else:
                _SERIE_LENGHT = length

            cursor.execute(
                """
                INSERT INTO SERIE (
                    SERIE_DEP_ID, STATION_DEP_ID, SERIE_ARR_ID, STATION_ARR_ID,
                    SERIE_NBRE_SHOT, SERIE_LENGHT, SERIE_LENGHT_SURFACE, SERIE_LENGHT_DUPLICATE,
                    DIRECTION, STATION_ENT_ID)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (from_serie, from_id, to_serie, to_id, 1,
                 _SERIE_LENGHT, _SERIE_LENGHT_SURFACE, _SERIE_LENGHT_DUPLICATE, 1, from_entree),
            )
            _Current_Serie_ID = cursor.lastrowid
            # Bound parameters: a NULL entrance becomes SQL NULL instead of the text "None".
            cursor.execute(
                "UPDATE VISEE_FLAG SET SERIE_ID = ?, ENTREE_ID = ?, SERIE_RANG = ? WHERE SHOT_ID = ?",
                (_Current_Serie_ID, from_entree, 1, shot_id),
            )
            if from_entree != to_entree:
                # The shot links two different entrances: record the junction.
                cursor.execute(
                    "INSERT INTO RESEAU (STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)",
                    (to_id, from_entree, to_entree),
                )
        conn.commit()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête orphelines_shot code:\033[0m {e}")
        error_count += 1
    return


########################################################################################################################
# Join the networks
########################################################################################################################
def jonction_RESEAU():
    """Group the entrances connected through RESEAU rows into numbered networks.

    Steps:
      1. remove one row of each duplicated (ENT_1, ENT_2, STATION_JONC) group;
      2. repeatedly take the first entrance (ordered by ``STATION.Z`` descending)
         of a RESEAU row that has no RESEAU_ID yet, collect every entrance
         reachable from it through RESEAU rows, and stamp the network number on
         RESEAU, JONCTION, SERIE and VISEE_FLAG.

    Uses the module-level ``conn`` / ``cursor``; ``sqlite3.Error`` is caught and
    counted in the global ``error_count``.
    """
    global error_count
    try:
        # Duplicated junctions in RESEAU (one ID is returned per duplicated group).
        cursor.execute("""
            SELECT ID
            FROM RESEAU
            GROUP BY RESEAU.ENT_1, RESEAU.ENT_2, RESEAU.STATION_JONC
            HAVING COUNT(RESEAU.ENT_1)>1 AND COUNT(RESEAU.ENT_2)>1 AND COUNT(RESEAU.STATION_JONC)>1
        """)
        doublons = cursor.fetchall()
        cursor.executemany("DELETE FROM RESEAU WHERE RESEAU.ID = ?", doublons)
        conn.commit()

        index_reseau = 0
        while True:
            index_reseau += 1
            # Entrances of the RESEAU rows not yet attached to a network.
            cursor.execute("""
                SELECT RESEAU.ENT_1 AS ENT, RESEAU.RESEAU_ID, STATION.NAME, STATION.Z
                FROM RESEAU
                JOIN STATION ON RESEAU.ENT_1 = STATION.ID
                WHERE RESEAU.RESEAU_ID IS NULL
                UNION
                SELECT RESEAU.ENT_2 AS ENT, RESEAU.RESEAU_ID, STATION.NAME, STATION.Z
                FROM RESEAU
                JOIN STATION ON RESEAU.ENT_2 = STATION.ID
                WHERE RESEAU.RESEAU_ID IS NULL
                ORDER BY STATION.Z DESC
            """)
            entrees = cursor.fetchall()
            if len(entrees) == 0:
                break  # no entrance left to process

            premiere_entree = entrees[0][0]
            cursor.execute("UPDATE RESEAU SET RESEAU_ID = ? WHERE RESEAU.ENT_1 = ?", (index_reseau, premiere_entree))
            cursor.execute("UPDATE RESEAU SET RESEAU_ID = ? WHERE RESEAU.ENT_2 = ?", (index_reseau, premiere_entree))
            conn.commit()

            # Breadth-first walk: the list is both the visit queue and the result,
            # so entrances keep their discovery order.
            liste_entrees_reseau = [premiere_entree]
            position = 0
            while position < len(liste_entrees_reseau):
                entree = liste_entrees_reseau[position]
                position += 1
                cursor.execute(
                    """
                    SELECT RESEAU.ENT_1 FROM RESEAU WHERE RESEAU.ENT_2 = ?
                    UNION
                    SELECT RESEAU.ENT_2 FROM RESEAU WHERE RESEAU.ENT_1 = ?
                    """,
                    (entree, entree),
                )
                for (voisine,) in cursor.fetchall():
                    # A NULL entrance cannot be followed; skipping it avoids aborting the
                    # whole computation on a single incomplete junction row.
                    if voisine is None or voisine in liste_entrees_reseau:
                        continue
                    cursor.execute("UPDATE RESEAU SET RESEAU_ID = ? WHERE RESEAU.ENT_1 = ?", (index_reseau, voisine))
                    cursor.execute("UPDATE RESEAU SET RESEAU_ID = ? WHERE RESEAU.ENT_2 = ?", (index_reseau, voisine))
                    liste_entrees_reseau.append(voisine)
                conn.commit()

            # Propagate the network number to everything attached to these entrances.
            for entree in liste_entrees_reseau:
                cursor.execute("UPDATE JONCTION SET RESEAU_ID = ? WHERE JONCTION.ENTREE_ID = ?", (index_reseau, entree))
                cursor.execute("UPDATE SERIE SET RESEAU_ID = ? WHERE SERIE.STATION_ENT_ID = ?", (index_reseau, entree))
                cursor.execute("UPDATE VISEE_FLAG SET RESEAU_ID = ? WHERE VISEE_FLAG.ENTREE_ID = ?", (index_reseau, entree))
            conn.commit()
            print(f"\t Réseau: {index_reseau}, entrées jonctionnées: {len(liste_entrees_reseau)}, {liste_entrees_reseau}")
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête Jonction_RESEAU code:\033[0m {e}")
        error_count += 1
    return


########################################################################################################################
# Merge the equates in the SHOT table
########################################################################################################################
def SHOT_equates_station():
    """Merge stations that share identical coordinates ("equates").

    Stations named '.' or '-' are ignored.  For every group of stations with
    the same (X, Y, Z), the first ID of the group is kept: all shots starting
    from or ending at another member are re-pointed to it, and the JONCTION
    row of each other member is typed 'equ' with STATION_JONC set to the
    kept station.

    Uses the module-level ``conn`` / ``cursor``; ``sqlite3.Error`` is caught and
    counted in the global ``error_count``.
    """
    global error_count
    try:
        # Query 8: look for equates (the third HAVING term checks Z, not Y twice).
        cursor.execute("""
            SELECT GROUP_CONCAT(STATION.ID) as ID_Group
            FROM STATION
            WHERE STATION.NAME <> '.' AND STATION.NAME <> '-'
            GROUP BY STATION.X, STATION.Y, STATION.Z
            HAVING COUNT(STATION.X)>1 AND COUNT(STATION.Y)>1 AND COUNT(STATION.Z)>1
        """)
        equate = cursor.fetchall()
        print(f"\t Jonction de SHOT equates nbre: {len(equate)}")
        for (id_group,) in equate:
            station_gardee, *stations_equates = (int(valeur) for valeur in id_group.split(','))
            for station_id in stations_equates:
                cursor.execute("UPDATE SHOT SET FROM_ID = ? WHERE FROM_ID = ?", (station_gardee, station_id))
                cursor.execute("UPDATE SHOT SET TO_ID = ? WHERE TO_ID = ?", (station_gardee, station_id))
                # Keyed on STATION_ID: JONCTION.ID is an autoincrement that only
                # matches the station ID when station IDs are gap-free.
                cursor.execute(
                    "UPDATE JONCTION SET STATION_TYPE = ?, STATION_JONC = ? WHERE STATION_ID = ?",
                    ('equ', station_gardee, station_id),
                )
        conn.commit()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête 8 (sql_8_equates) code:\033[0m {e}")
        error_count += 1
    return


########################################################################################################################
# Handle duplicated shots (same departure and arrival stations)
########################################################################################################################
def _fetch_shot_flag(shot_id):
    """Return the ``(ID, FLAG, rounded LENGTH)`` rows of shot *shot_id* (FLAG is None when unflagged)."""
    cursor.execute(
        """
        SELECT SHOT.ID, SHOT_FLAG.FLAG, round(SHOT.LENGTH, 2)
        FROM SHOT
        LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
        WHERE SHOT.ID = ?
        """,
        (int(shot_id),),
    )
    return cursor.fetchall()


def _detach_shot_to_new_station(shot_id):
    """Point shot *shot_id* to a brand-new station named 'isu' and return that station's ID."""
    # MAX(ID) + 1 can never collide with an existing station, unlike COUNT(*) + 1
    # when the station IDs have gaps.
    cursor.execute("SELECT COALESCE(MAX(ID), 0) + 1 FROM STATION")
    new_station_id = int(cursor.fetchall()[0][0])
    cursor.execute("INSERT INTO STATION (ID, NAME) VALUES (?, 'isu')", (new_station_id,))
    cursor.execute("UPDATE SHOT SET TO_ID = ? WHERE ID = ?", (new_station_id, shot_id))
    cursor.execute("INSERT INTO JONCTION (STATION_ID) VALUES (?)", (new_station_id,))
    return new_station_id


def duplicate_SHOT():
    """Separate shots that share the same FROM and TO stations.

    For each duplicated (FROM_ID, TO_ID) pair, the first two shots of the
    group are compared:

    * if exactly one of them is flagged 'dpl' and the other is unflagged, the
      'dpl' shot is re-pointed to a new 'isu' station;
    * otherwise the second shot is re-pointed, its length is added to the
      running "duplicated length" total and a warning is printed so the
      problem can be fixed in the source survey.

    Uses the module-level ``conn`` / ``cursor``; ``sqlite3.Error`` is caught and
    counted in the global ``error_count``.
    """
    global error_count
    try:
        # Query 10: look for duplicated shots.
        cursor.execute("""
            SELECT
                SHOT.FROM_ID,
                SHOT.TO_ID,
                SHOT.LENGTH,
                GROUP_CONCAT(SHOT.ID) as DUPP_SHOT
            FROM SHOT
            GROUP BY SHOT.FROM_ID, SHOT.TO_ID
            HAVING COUNT(SHOT.FROM_ID)>1 AND COUNT(SHOT.TO_ID)>1
        """)
        duplicate = cursor.fetchall()
        _total_length_err = 0.0
        for row in duplicate:
            sous_valeurs = row[3].split(',')
            shot_flag = _fetch_shot_flag(sous_valeurs[0])
            shot_flag2 = _fetch_shot_flag(sous_valeurs[1])

            if shot_flag[0][1] is None and shot_flag2[0][1] == 'dpl':
                _detach_shot_to_new_station(shot_flag2[0][0])
            elif shot_flag2[0][1] is None and shot_flag[0][1] == 'dpl':
                _detach_shot_to_new_station(shot_flag[0][0])
            else:
                # Not a properly flagged duplicate: separate it anyway and report it.
                _total_length_err += float(shot_flag2[0][2])
                _Current_Station_ID = _detach_shot_to_new_station(shot_flag2[0][0])
                print(
                    f"\033[91m\t Table des SHOT, visées en double à traiter à la source : \033[0m{shot_flag}, {shot_flag2}"
                    + f"\033[91m, station crée : \033[0m{_Current_Station_ID}"
                    + f"\033[91m, long. en double : \033[0m{_total_length_err:.2f} m"
                )
            conn.commit()

        if len(duplicate) > 0:
            print(f"\t Table des SHOT, visées dupliquées traités nbre: {len(duplicate)}")
        else:
            print("\t Table des SHOT, aucune visée dupliquée")
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête (duplicate_SHOT) code:\033[0m {e}")
        error_count += 1
    return


########################################################################################################################
# Delete zero-length shots looping on themselves (same departure and arrival station, null length)
########################################################################################################################
def issue_SHOT():
    """Delete the shots whose FROM and TO stations are identical and whose length is 0.

    Uses the module-level ``conn`` / ``cursor``; ``sqlite3.Error`` is caught and
    counted in the global ``error_count``.
    """
    global error_count
    try:
        cursor.execute("""
            SELECT SHOT.ID
            FROM SHOT
            WHERE SHOT.FROM_ID = SHOT.TO_ID AND SHOT.LENGTH = 0
        """)
        issue = cursor.fetchall()
        cursor.executemany("DELETE FROM SHOT WHERE ID = ?", issue)
        conn.commit()
        if len(issue) > 0:
            print(f"\t Table des SHOT, visée(s) bloquante(s), même départ et arrivée, longueur nulle supprimée(s) nbre: {len(issue)}")
        else:
            print("\t Table des SHOT, aucune visée bloquante")
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête (Issue_SHOT) code:\033[0m {e}")
        error_count += 1
    return


#####################################################################################################################################
# Mark the splay ("habillage") stations                                                                                             #
#####################################################################################################################################
def marquage_visee_station_habillage():
    """Mark the stations named '.' or '-' and the shots that end on them.

    For every shot whose TO station is named '.' or '-':

    * the JONCTION row of that station gets ``STATION_TYPE = 'hab'``;
    * the VISEE_FLAG row of that shot gets ``SERIE_ID = -1``.

    Uses the module-level ``conn`` / ``cursor`` (created elsewhere in the
    file); ``sqlite3.Error`` is caught and counted in the global
    ``error_count``.
    """
    global error_count
    try:
        cursor.execute("""
            SELECT STATION.ID AS STATION_ID, SHOT.ID AS SHOT_ID
            FROM STATION
            JOIN SHOT ON SHOT.TO_ID = STATION.ID
            WHERE STATION.NAME = '.' or STATION.NAME = '-'
        """)
        filtre = cursor.fetchall()
        print(f"\t Marquage des visées et des stations d'habillage nbre: {len(filtre)}")
        # The two tables are independent, so each one is updated in a single batch.
        cursor.executemany(
            "UPDATE JONCTION SET STATION_TYPE = 'hab' WHERE STATION_ID = ?",
            [(station_id,) for station_id, _ in filtre],
        )
        cursor.executemany(
            "UPDATE VISEE_FLAG SET SERIE_ID = -1 WHERE SHOT_ID = ?",
            [(shot_id,) for _, shot_id in filtre],
        )
        conn.commit()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête (marquage_station_habillage):\033[0m {e}")
        error_count += 1
    return


##################################################################################################################################### # Fonction pour suivre une série jusqu'au prochain départ ou une jonction # ##################################################################################################################################### def suivi_serie( _Current_Serie_ID, bar) : global avt_compteur global error_count try: cursor.execute(f"SELECT * FROM SERIE WHERE SERIE_ID = {_Current_Serie_ID}") _Serie = cursor.fetchall() # if _Current_Serie_ID == 2 or _Current_Serie_ID == 3: # print("Debug, a suivre") _Current_Next_Station = 0 _Current_Shot = 0 _Current_Nre_Shot = int(_Serie[0][5]) # type: ignore _Current_Serie_Lenght = float(_Serie[0][6]) # type: ignore _Current_Serie_Lenght_Surface = float(_Serie[0][7]) # type: ignore _Current_Serie_Lenght_Duplicate = float(_Serie[0][8]) # type: ignore _Direction = int(_Serie[0][9]) _Current_Ent = int(_Serie[0][10]) if _Direction == 0 : # print(f"\t\033[34mA gérer séries sans direction station série: {_Current_Serie_ID}\033[0m") # type: ignore _Current_Station_ID = int(_Serie[0][2]) Next_Station_ID_1 = 
sql_station_depart(_Current_Station_ID) Next_Station_ID_2 = sql_station_arrivee(_Current_Station_ID) if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0: # type: ignore # Pas de départ fin et pas d'arrivée : fin de la série Next_Station_ID = Next_Station_ID_1 cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {_Current_Serie_ID};") conn.commit() return elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0: # type: ignore # Un départ pas d'arrivée Next_Station_ID = Next_Station_ID_1 Direction = 1 cursor.execute(f"UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = {_Current_Serie_ID};") conn.commit() elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1: # type: ignore # Une arrivée pas de départ Next_Station_ID = Next_Station_ID_2 _Serie[0][4] = _Serie[0][2] Direction = -1 cursor.execute(f"UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = {_Current_Serie_ID};") conn.commit() else : # A gérer nouvelles séries # nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, 1, _Current_Ent) # type: ignore # print (f"\033[34m\tA vérifier dans suivi_serie nouvelles séries inverses depuis {_Current_Station_ID} - {Next_Station_ID_2}, {Next_Station_ID_1}\033[0m") # nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, -1, _Current_Ent) return elif _Direction == -1 : _Current_Station_ID = int(_Serie[0][4]) _Next_Station_ID = sql_station_arrivee(_Current_Station_ID) # type: ignore _Current_Nre_Shot = 0 #print(f"\tDébut suivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}") # type: ignore _ID_Suite = 0 _Force = False if len(_Next_Station_ID) > 1: # type: ignore while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][2]) : # type: ignore _ID_Suite += 1 if _ID_Suite >= len(_Next_Station_ID): # type: ignore # print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie inverse: {_Current_Serie_ID} 
station:{_Current_Station_ID}, shot: {_Current_Shot}\033[0m") #error_count += 1 return while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore # if _Current_Station_ID == 12074: # print("Debug, a suivre") _Force = True _Current_Nre_Shot += 1 _Current_Serie_Lenght += _Next_Station_ID[_ID_Suite][1] # type: ignore _Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore _Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore _Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore _Current_Next_Station = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore _Current_Old_Station = int(_Current_Station_ID) test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore cursor.execute(f""" UPDATE SERIE SET SERIE_DEP_ID = {_Current_Serie_ID}, STATION_DEP_ID = {_Current_Next_Station}, SERIE_NBRE_Shot = {_Current_Nre_Shot}, SERIE_LENGHT = {_Current_Serie_Lenght}, SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface}, SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate} WHERE SERIE_ID = {_Current_Serie_ID}; """) cursor.execute(f""" UPDATE JONCTION SET STATION_TYPE = 'inv', ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot}, SERIE_ID = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID} """) cursor.execute(f""" UPDATE VISEE_FLAG SET SERIE_ID = {_Current_Serie_ID}, ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot} WHERE SHOT_ID = {_Current_Shot} """) #print(f"\tSuivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Next station: {_Next_Station_ID}") conn.commit() depart = sql_station_depart(_Current_Station_ID) if (len(depart) >= 1 ) : # type: ignore #print(f'Départ direct à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {depart}') nouvelles_series(_Current_Station_ID, 
_Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore _Current_Station_ID = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore _Next_Station_ID = sql_station_arrivee(_Current_Next_Station) _ID_Suite = 0 cursor.execute(f""" UPDATE SERIE SET SERIE_DEP_ID = {_Current_Serie_ID}, STATION_DEP_ID = {_Current_Next_Station}, SERIE_NBRE_Shot = {_Current_Nre_Shot}, SERIE_LENGHT = {_Current_Serie_Lenght}, SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface}, SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate} WHERE SERIE_ID = {_Current_Serie_ID}; """) # if _Current_Station_ID == 5047 : # print("debug point") cursor.execute(f""" UPDATE JONCTION SET STATION_TYPE = 'arv', ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot}, SERIE_ID = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID} """) cursor.execute(f""" UPDATE VISEE_FLAG SET SERIE_ID = {_Current_Serie_ID}, ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot} WHERE SHOT_ID = {_Current_Shot} """) conn.commit() if _Current_Nre_Shot > 1 : avt_compteur = avt_compteur + _Current_Nre_Shot - 1 bar(_Current_Nre_Shot-1) #_Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore depart = sql_station_depart(_Current_Station_ID) # type: ignore if (len(_Next_Station_ID)==0): # type: ignore # fin de la série # print (f"\tFin de la série inverse: {_Current_Serie_ID} (pas de suite) station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}") nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore if (len(depart)>=1):# type: ignore nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore else : nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore # print (f"\tFin de la série inverse: {_Current_Serie_ID} station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, 
long: {_Current_Serie_Lenght:.2f}") if (len(depart)>=1):# type: ignore nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore elif _Direction == 1 : _Current_Station_ID = int(_Serie[0][2]) _Next_Station_ID = sql_station_depart(_Current_Station_ID) _Current_Nre_Shot = 0 #print(f"\tDébut suivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}") _ID_Suite = 0 _Force = False if len(_Next_Station_ID) > 1: # type: ignore while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][4]) : # type: ignore _ID_Suite += 1 if _ID_Suite >= len(_Next_Station_ID): # type: ignore # print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie directe: {_Current_Serie_ID}, Station: {_Current_Station_ID}, shot: {_Current_Shot}\033[0m") # error_count += 1 return while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore # if _Current_Station_ID == 12074: # print("Debug, a suivre") _Force = True _Current_Nre_Shot += 1 _Current_Serie_Lenght += _Next_Station_ID[0][1] # type: ignore _Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore _Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore _Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore _Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore _Current_Old_Station = int(_Current_Station_ID) test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore cursor.execute(f""" UPDATE SERIE SET SERIE_DEP_ID = {_Current_Serie_ID}, STATION_DEP_ID = {_Current_Next_Station}, SERIE_NBRE_Shot = {_Current_Nre_Shot}, SERIE_LENGHT = {_Current_Serie_Lenght}, SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface}, SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate} WHERE SERIE_ID = {_Current_Serie_ID}; """) # if _Current_Station_ID == 5035 : # print("debug point") cursor.execute(f""" UPDATE JONCTION SET 
STATION_TYPE = 'dir', ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot}, SERIE_ID = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID} """) cursor.execute(f""" UPDATE VISEE_FLAG SET SERIE_ID = {_Current_Serie_ID}, ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot} WHERE SHOT_ID = {_Current_Shot} """) conn.commit() #print(f"\tSuivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}") arrivee = sql_station_arrivee(_Current_Station_ID) if (len(arrivee) >= 1 ) : # type: ignore #print(f'Arrivée à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {arrivee}') nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore _Current_Station_ID = int(_Next_Station_ID[0][0]) # type: ignore _Next_Station_ID = sql_station_depart(_Current_Next_Station) _ID_Suite = 0 cursor.execute(f""" UPDATE SERIE SET SERIE_DEP_ID = {_Current_Serie_ID}, STATION_DEP_ID = {_Current_Next_Station}, SERIE_NBRE_Shot = {_Current_Nre_Shot}, SERIE_LENGHT = {_Current_Serie_Lenght}, SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface}, SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate} WHERE SERIE_ID = {_Current_Serie_ID}; """) # type: ignore cursor.execute(f""" UPDATE JONCTION SET STATION_TYPE = 'end', ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot}, SERIE_ID = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID} """) cursor.execute(f""" UPDATE VISEE_FLAG SET SERIE_ID = {_Current_Serie_ID}, ENTREE_ID = {_Current_Ent}, SERIE_RANG = {_Current_Nre_Shot} WHERE SHOT_ID = {_Current_Shot} """) conn.commit() if _Current_Nre_Shot > 1: avt_compteur = avt_compteur + _Current_Nre_Shot - 1 bar(_Current_Nre_Shot-1) arrivee = sql_station_arrivee(_Current_Next_Station) # type: ignore if (len(_Next_Station_ID)==0): # type: ignore # fin de la série 
#####################################################################################################################################
#           Create the new series that start from (or arrive at) a station                                                          #
#####################################################################################################################################
def nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, _DIRECTION, _STATION_ENT_ID):
    """Register every remaining branch at a station as a new, still empty SERIE row.

    The neighbouring stations are read with ``sql_station_depart`` (direction 1)
    or ``sql_station_arrivee`` (direction -1).  For each neighbour other than
    ``_Current_Old_Station`` the JONCTION row of the current station is flagged
    ``'jon'`` and one SERIE row is inserted with ``SERIE_NBRE_SHOT = -1``.

    Args:
        _Current_Station_ID: station at which the branches are looked up.
        _Current_Old_Station: station we came from; a branch leading back to it
            is ignored.
        _Current_Serie_ID: series that reached the station (stored as parent).
        _DIRECTION: 1 to follow shots FROM the station, -1 to follow shots TO it.
            Any other value is ignored.
        _STATION_ENT_ID: entrance station id propagated to the new rows.

    Returns:
        None.  SQLite errors are printed and counted in the global ``error_count``.
    """
    global error_count

    if _DIRECTION == 1:
        voisins = sql_station_depart(_Current_Station_ID)
    elif _DIRECTION == -1:
        voisins = sql_station_arrivee(_Current_Station_ID)
    else:
        # Without this guard an unexpected direction left the neighbour list
        # unbound and crashed with UnboundLocalError.
        return

    if voisins is None:
        print(f"Pas de série crées à la station: {_Current_Station_ID}")
        return

    insertion_serie = """
        INSERT INTO SERIE (
            SERIE_DEP_ID, STATION_DEP_ID, SERIE_ARR_ID , STATION_ARR_ID, SERIE_NBRE_SHOT,
            SERIE_LENGHT, SERIE_LENGHT_SURFACE, SERIE_LENGHT_DUPLICATE, DIRECTION, STATION_ENT_ID)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""

    try:
        for ligne in voisins:
            voisin = ligne[0]
            if voisin == _Current_Old_Station:
                continue

            # Single bound UPDATE keyed on STATION_ID: the original issued four
            # statements and the last one filtered on JONCTION.ID instead.
            cursor.execute(
                """
                UPDATE JONCTION
                SET SERIE_JONC = ?, ENTREE_ID = ?, STATION_JONC = ?, STATION_TYPE = ?
                WHERE STATION_ID = ?
                """,
                (_Current_Serie_ID, _STATION_ENT_ID, voisin, 'jon', _Current_Station_ID),
            )

            if _DIRECTION == 1:
                valeurs = (_Current_Serie_ID, _Current_Station_ID, -1, voisin,
                           -1, 0.0, 0.0, 0.0, 1, _STATION_ENT_ID)
            else:
                valeurs = (-1, voisin, _Current_Serie_ID, _Current_Station_ID,
                           -1, 0.0, 0.0, 0.0, -1, _STATION_ENT_ID)
            cursor.execute(insertion_serie, valeurs)
            conn.commit()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de la requête (nouvelle_serie):\033[0m {e}")
        error_count += 1
    return
#####################################################################################################################################
#           Check whether a station (or one of its neighbours) has already been visited                                             #
#####################################################################################################################################
def test_jonction(station, serie, entree):
    """Flag already-visited stations around ``station`` as junctions.

    Every station linked to ``station`` by a shot (arrivals first, then
    departures) whose JONCTION.STATION_TYPE is already set is re-typed
    ``'jon'``; when such a neighbour belongs to a different, non-NULL entrance a
    row is added to RESEAU and committed.  The same check is then applied to
    ``station`` itself.

    Args:
        station: id of the station being reached.
        serie: id of the series being followed (only used in the message).
        entree: id of the entrance the current series belongs to.

    Returns:
        True when ``station`` already had a STATION_TYPE, False when it had none
        (or has no JONCTION row), None when a SQLite error occurred.
    """
    global error_count
    try:
        # The LEFT JOIN on SHOT_FLAG is kept so that a shot yields the same
        # number of rows (one per flag) as it did in the original queries.
        cursor.execute(
            """
            SELECT SHOT.FROM_ID
            FROM SHOT
            LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
            WHERE SHOT.TO_ID = ?
            """,
            (station,),
        )
        voisins = cursor.fetchall()
        cursor.execute(
            """
            SELECT SHOT.TO_ID
            FROM SHOT
            LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
            WHERE SHOT.FROM_ID = ?
            """,
            (station,),
        )
        voisins += cursor.fetchall()

        for (voisin,) in voisins:
            cursor.execute(
                "SELECT STATION_TYPE, ENTREE_ID FROM JONCTION WHERE STATION_ID = ?",
                (voisin,),
            )
            ligne = cursor.fetchone()
            if ligne is None or ligne[0] is None:
                # Neighbour not visited yet (or unknown): nothing to flag.
                continue
            entree_voisin = ligne[1]
            # Keyed on STATION_ID like every other JONCTION access; the original
            # filtered on the autoincrement ID column.
            cursor.execute(
                "UPDATE JONCTION SET STATION_TYPE = ? WHERE STATION_ID = ?",
                ('jon', voisin),
            )
            if entree_voisin is not None and entree_voisin != entree:
                print(f"\033[36m\t Jonction à la Station_ID: {voisin} entre les entrées {entree} et {entree_voisin}\033[0m")
                cursor.execute(
                    "INSERT INTO RESEAU ( STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)",
                    (voisin, entree, entree_voisin),
                )
                conn.commit()

        cursor.execute(
            "SELECT STATION_TYPE, ENTREE_ID, SERIE_ID FROM JONCTION WHERE STATION_ID = ?",
            (station,),
        )
        ligne = cursor.fetchone()
        if ligne is None or ligne[0] is None:
            return False

        _, entree_station, serie_station = ligne
        cursor.execute(
            "UPDATE JONCTION SET STATION_TYPE = ? WHERE STATION_ID = ?",
            ('jon', station),
        )
        if entree_station != entree:
            print(f"\033[0m\t Jonction à la Station_ID: {station} entre les entrées {entree} et {entree_station}\033[0m")
        if serie_station != serie:
            print(f"\033[0m\t Jonction à la Station_ID: {station} entre les series {serie} et {serie_station}\033[0m")
        return True
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requêtes (test_jonction): {e}\033[0m")
        error_count += 1
    return None
#####################################################################################################################################
#           Run the summary queries and save the results in a text file                                                             #
#####################################################################################################################################
def _format_duree(duree):
    """Format a timedelta as ``HH:MM:SS(s)``, ``MM:SS(s)`` or ``SS(s)``."""
    # total_seconds() keeps whole days, which timedelta.seconds silently drops.
    heures, reste = divmod(int(duree.total_seconds()), 3600)
    minutes, secondes = divmod(reste, 60)
    if heures:
        return "{:02}:{:02}:{:02}(s)".format(heures, minutes, secondes)
    if minutes:
        return "{:02}:{:02}(s)".format(minutes, secondes)
    return "{:02}(s)".format(secondes)


def _ajoute_bilan(lignes, titre_section, resultats):
    """Append a titled, tab-separated table to ``lignes`` unless it is empty."""
    if not resultats or resultats[0][0] is None:
        return
    lignes.append(titre_section)
    for row in resultats:
        lignes.append('\t' + '\t'.join(map(str, row)) + '\n')


def _ecrit_rapport(output_file, lignes):
    """Write the report lines to ``output_file`` (UTF-8, overwriting)."""
    with open(output_file, 'w', encoding='utf-8') as file:
        file.writelines(lignes)


def calcul_stats(output_file):
    """Compute the global statistics and write the report file.

    The report starts with the first nine entries of the module-level ``titre``
    sequence (entry 7 is overwritten with the run duration and error status),
    followed by the centreline totals, a warning when some JONCTION rows have no
    STATION_TYPE, then the tables returned by ``sql_bilan_reseaux`` and
    ``sql_bilan_annee``.  ``Rose`` and ``Shot_lengths_histogram`` are called
    before the file is written.

    Args:
        output_file: path of the text report to create.

    Returns:
        None.  Failures are printed, counted in the global ``error_count`` and,
        except for ``FileNotFoundError``, appended to the report.
    """
    global error_count

    # Bound before the try so the handlers below can always use it.
    output_file_ligne = []
    try:
        print(f"\033[1;32mPhase 5: Écriture des statistiques dans \033[0m{output_file}")
        for i in range(9):
            output_file_ligne.append(titre[i].ljust(90) + "*\n")

        cursor.execute("""
            Select round(sum(LENGTH), 2) as Lg,
                   round(sum(DUPLICATE_LENGTH),2) as Duplicate,
                   round(sum(SURFACE_LENGTH),2) as Surface
            from CENTRELINE length
        """)
        # sum() is NULL on an empty table; report 0.00 instead of failing.
        totaux = [0.0 if valeur is None else valeur for valeur in cursor.fetchone()]
        vide = "-".ljust(_largeurCol)
        colonnes = ["{:.2f}".format(valeur).ljust(_largeurCol) for valeur in totaux]
        colonnes += [vide, vide]
        output_file_ligne.append(
            "Développement total centerline:\t" + "\t".join(colonnes)
            + "\tDev.(m), Dupl.(m), Surf.(m)\n"
        )

        cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
        compteur = int(cursor.fetchone()[0])
        if compteur > 0:
            output_file_ligne.append(f"Attention, {compteur} station(s) non comptabilisée(s) et raccordée(s)\n")

        _ajoute_bilan(output_file_ligne, '\nDéveloppement total par réseaux\n', sql_bilan_reseaux())
        _ajoute_bilan(output_file_ligne, '\nDéveloppement total topographié par année(s)\n', sql_bilan_annee())

        Rose(output_file_name_rose)
        Shot_lengths_histogram(output_file_name_histo)

        duree_formatee = _format_duree(datetime.now() - maintenant)
        if error_count == 0:
            statut = " sans erreur"
        else:
            statut = " avec erreur(s): " + str(error_count)
        output_file_ligne[7] = ("* Durée calcul: " + duree_formatee + statut).ljust(90) + "*\n"

        _ecrit_rapport(output_file, output_file_ligne)

        if error_count == 0:
            print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f"\033[1;32m, résultats enregistrés dans \033[0m{output_file}")
        else:
            print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f",\033[91m avec \033[0m{error_count}\033[91m erreur(s), \033[1;32mrésultats enregistrés dans \033[0m{output_file}")
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution des requêtes calcul_stats:\033[0m {e}")
        error_count += 1
        output_file_ligne.append(f"Erreur lors de l'exécution des requêtes calcul_stats: {e}\n")
        _ecrit_rapport(output_file, output_file_ligne)
    except FileNotFoundError:
        print(f"\033[91mErreur d'ouverture du fichier: \033[0m{output_file} ")
        error_count += 1
    except Exception as e:
        print(f"\033[91mErreur lors de l'exécution de calcul_stats:\033[0m {e}")
        error_count += 1
        output_file_ligne.append(f"Erreur lors de l'exécution de calcul_stats: {e}\n")
        _ecrit_rapport(output_file, output_file_ligne)
    return
#####################################################################################################################################
#           Query: table of entrances (entrance and fixed stations with their coordinates)                                          #
#####################################################################################################################################
def sql_liste_entree():
    """Return the stations flagged as entrance (``'ent'``) or fixed point (``'fix'``).

    Returns:
        A list of ``(STATION.ID, STATION.NAME, X, Y, Z)`` tuples with the
        coordinates rounded to one decimal, grouped by name/Y/Z and ordered by
        name.  The list is empty (and ``error_count`` is incremented) when no
        such station exists.  None is returned when the query fails.
    """
    global error_count
    sql_query = """
        select STATION.ID,
               STATION.NAME,
               round(STATION.X, 1),
               round(STATION.Y, 1),
               round(STATION.Z, 1)
        from STATION
        join STATION_FLAG on STATION_FLAG.STATION_ID = STATION.ID
        join SURVEY on SURVEY.ID = STATION.SURVEY_ID
        where STATION_FLAG.FLAG='ent' or STATION_FLAG.FLAG='fix'
        group by STATION.NAME , STATION.Y, STATION.Z
        order by STATION.NAME ASC
    """
    try:
        cursor.execute(sql_query)
        result_ent = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête 4 (sql_liste_entree):\033[0m {e}")
        error_count += 1
        return None

    if not result_ent:
        # An export without any entrance/fixed point is counted as an error.
        error_count += 1
        print(f"\t \033[91mAttention aucune entrée ou point fix comptabilisé\033[0m")
    else:
        print(f"\t Table des STATION, entrée et fix nbre: {len(result_ent)}")
    return result_ent
#####################################################################################################################################
#           Query: table of the empty series                                                                                        #
#####################################################################################################################################
def sql_serie_vides():
    """Return every SERIE row that has not been followed yet.

    Returns:
        The full SERIE rows whose ``SERIE_NBRE_SHOT`` is -1, or None when the
        query fails (the error is printed and counted in ``error_count``).
    """
    global error_count
    try:
        cursor.execute("SELECT * FROM SERIE WHERE SERIE.SERIE_NBRE_SHOT = -1")
        return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête (sql_serie_vides): {e}\033[0m")
        error_count += 1
        return None


#####################################################################################################################################
#           Query: From_To (is there a departure, in the From -> To direction, from the given station?)                             #
#####################################################################################################################################
def sql_station_depart(station):
    """List the shots leaving ``station`` towards a station that has no type yet.

    Args:
        station: id of the station used as ``SHOT.FROM_ID``.

    Returns:
        A list of ``(TO_ID, LENGTH, LENGTH_SRF, LENGTH_DPL, SHOT.ID)`` tuples
        where ``LENGTH_SRF`` / ``LENGTH_DPL`` equal the shot length when the
        shot carries the ``'srf'`` / ``'dpl'`` flag and 0 otherwise; only
        targets whose JONCTION.STATION_TYPE is NULL are kept.  None is
        returned when the query fails.
    """
    global error_count
    try:
        # The station id is bound, not interpolated into the SQL text.
        cursor.execute(
            """
            SELECT SHOT.TO_ID as TO_ID_RESULT,
                   SHOT.LENGTH,
                   CASE WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH ELSE 0 END AS LENGTH_SRF,
                   CASE WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH ELSE 0 END AS LENGTH_DPL,
                   SHOT.ID
            FROM SHOT
            JOIN JONCTION ON SHOT.TO_ID = JONCTION.STATION_ID
            LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
            WHERE SHOT.FROM_ID = ? AND JONCTION.STATION_TYPE IS NULL
            """,
            (station,),
        )
        return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête 6 (sql_station_depart) code:\033[0m {e}")
        error_count += 1
        return None
#####################################################################################################################################
#           Query: To_From (is there a departure, in the To -> From direction, from the given station?)                             #
#####################################################################################################################################
def sql_station_arrivee(station):
    """List the shots arriving at ``station`` from a station that has no type yet.

    Args:
        station: id of the station used as ``SHOT.TO_ID``.

    Returns:
        A list of ``(FROM_ID, LENGTH, LENGTH_SRF, LENGTH_DPL, SHOT.ID)`` tuples
        where ``LENGTH_SRF`` / ``LENGTH_DPL`` equal the shot length when the
        shot carries the ``'srf'`` / ``'dpl'`` flag and 0 otherwise; only
        origins whose JONCTION.STATION_TYPE is NULL are kept.  None is
        returned when the query fails.
    """
    global error_count
    try:
        # The station id is bound, not interpolated into the SQL text.
        cursor.execute(
            """
            SELECT SHOT.FROM_ID as FROM_ID_RESULT,
                   SHOT.LENGTH,
                   CASE WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH ELSE 0 END AS LENGTH_SRF,
                   CASE WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH ELSE 0 END AS LENGTH_DPL,
                   SHOT.ID
            FROM SHOT
            JOIN JONCTION ON SHOT.FROM_ID = JONCTION.STATION_ID
            LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
            WHERE SHOT.TO_ID = ? AND JONCTION.STATION_TYPE IS NULL
            """,
            (station,),
        )
        return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête 7 (sql_station_arrivee) code:\033[0m {e}")
        error_count += 1
        return None


#####################################################################################################################################
#           Summary of the SERIE table                                                                                              #
#####################################################################################################################################
def sql_bilan_serie():
    """Aggregate the SERIE table over every series attached to a known station.

    Returns:
        A one-row list ``[(Long, Duplicate, Surface, Nbre_Shot, Nbre_serie)]``
        where ``Long`` is the total length minus the surface and duplicate
        lengths (rounded to 2 decimals), or None when the query fails.
    """
    global error_count
    try:
        cursor.execute("""
            select
                round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long,
                round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate,
                round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface,
                sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot,
                COUNT(*) AS Nbre_serie
            FROM SERIE
            JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID
        """)
        return cursor.fetchall()
    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête 11 (sql_bilan_serie):\033[0m {e}")
        error_count += 1
        return None
cursor.execute(f""" -- Bilan table série select --STATION.NAME, -- sum(SERIE.SERIE_LENGHT) as Total, round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long, round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate, round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface, sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot, COUNT(*) AS Nbre_serie FROM SERIE JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID """) retour = cursor.fetchall() return retour except sqlite3.Error as e: print(f"\033[91mErreur lors de l'exécution de la requête 11 (sql_bilan_serie):\033[0m {e}") error_count += 1 return None return ##################################################################################################################################### # #-- Bilan table série By Réseaux # ##################################################################################################################################### def sql_bilan_reseaux(): global error_count global _largeurCol global _largeurColTete retour = [] try: ############################################################################################################### # Liste des réseaux ############################################################################################################### cursor.execute(f""" -- Bilan table série By Réseaux select --SURVEY_RESEAU.TITLE as Réseau, --SURVEY_RESEAU.NAME, --SURVEY_RESEAU.ID, RESEAU_ID, --STATION.NAME as Nom, -- sum(SERIE.SERIE_LENGHT) as Total, round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long, round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate, round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface, sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot --COUNT(*) AS Nbre_serie --round(max(STATION.Z),2) as Max_Z, --round(min(STATION.Z),2) as Min_Z, --max(STATION.Z) - min(STATION.Z) as Delta_Z FROM SERIE JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID JOIN SURVEY AS 
SURVEY_JONCTION ON STATION.SURVEY_ID = SURVEY_JONCTION.ID JOIN SURVEY AS SURVEY_RESEAU ON SURVEY_JONCTION.PARENT_ID = SURVEY_RESEAU.ID WHERE RESEAU_ID is not NULL and RESEAU_ID !=0 GROUP BY SERIE.RESEAU_ID ORDER BY Long DESC """) result = cursor.fetchall() if len(result) >0 : # _ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] # _ligne[0]= 'Aucun réseau'.ljust(_largeurColTete) # retour.append(_ligne) # return retour for row in result: ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] result_shot = 0 cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.RESEAU_ID ={row[0]} """) _result_length = cursor.fetchall() result_length = float(_result_length[0][0]) result_shot = int(_result_length[0][1]) cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.RESEAU_ID ={row[0]} """) _result_length_dpl = cursor.fetchall() result_length_dpl = float(_result_length_dpl[0][0]) result_shot += int(_result_length_dpl[0][1]) cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' 
AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.RESEAU_ID ={row[0]} """) _result_length_srf = cursor.fetchall() result_length_srf = float(_result_length_srf[0][0]) result_shot += int(_result_length_srf[0][1]) # ligne = [ 'none', 0, 0, 0, 0, 0, 0, 'none', 0, 'none', 0, 0] cursor.execute(f""" -- Liste des entrée dans la table RESEAU SELECT --RESEAU.ENT_1 AS ENT_ID, --RESEAU.RESEAU_ID, STATION.NAME --STATION.Z FROM RESEAU JOIN STATION ON RESEAU.ENT_1 = STATION.ID WHERE RESEAU_ID = {row[0]} UNION --ALL SELECT --RESEAU.ENT_2 AS ENT_ID, --RESEAU.RESEAU_ID, STATION.NAME --STATION.Z FROM RESEAU JOIN STATION ON RESEAU.ENT_2 = STATION.ID WHERE RESEAU_ID = {row[0]} GROUP BY STATION.NAME ORDER BY STATION.NAME --ORDER BY STATION.Z DESC """) liste_entree = cursor.fetchall() _liste_ent = liste_entree[0][0] index = 1 while index < len(liste_entree): _liste_ent += ", " + liste_entree[index][0] index += 1 if len(_liste_ent) > _largeurColTete : _largeurColTete = len(_liste_ent) + 2 ligne[0] =_liste_ent.ljust(_largeurColTete) # Liste Entrées ligne[1] = str(len(liste_entree)) # Nre Ent. ligne[2] = str("{:.2f}".format(result_length)) # Dev. ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl. ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf. 
ligne[6] = str(result_shot) # Visées cursor.execute(f""" -- Requête pour rechercher le point bas d'un réseau / entrée SELECT STATION.name, STATION.Z as Min from STATION join ( select Min(STATION.Z) as Val_Min from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.RESEAU_ID = {row[0]} ) min on STATION.Z = min.Val_Min LIMIT 1 """) altitude_min = cursor.fetchall() ligne[7] = altitude_min[0][0] ligne[8] = str("{:.2f}".format(altitude_min[0][1])) cursor.execute(f""" -- Requête pour rechercher le point haut d'un réseau / entrée SELECT STATION.name, STATION.Z as Max from STATION join ( select Max(STATION.Z) as Val_Max from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.RESEAU_ID = {row[0]} ) max on STATION.Z = max.Val_Max LIMIT 1 """) altitude_max = cursor.fetchall() ligne[9] = altitude_max[0][0] ligne[10] = str("{:.2f}".format(altitude_max[0][1])) ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1]) for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol) retour.append(ligne) # print(f"Reseau num {row[0]}, {len(liste_entree)} entrée(s): {_liste_ent} ") ############################################################################################################### # Liste des visées non raccordées ############################################################################################################### cursor.execute(f""" SELECT sum (SHOT.LENGTH) as Long FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG is NULL """) result_long = cursor.fetchall() if result_long[0][0] is None : _result_long = 0.0 else : _result_long = result_long[0][0] 
cursor.execute(f""" SELECT sum (SHOT.LENGTH) as Long FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='dpl' """) result_dpl = cursor.fetchall() if result_dpl[0][0] is None : _result_dpl = 0.0 else : _result_dpl = result_dpl[0][0] cursor.execute(f""" SELECT sum (SHOT.LENGTH) as Long FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='srf' """) result_srf = cursor.fetchall() if result_srf[0][0] is None : _result_srf = 0.0 else : _result_srf = result_srf[0][0] cursor.execute(f""" SELECT count (SHOT.LENGTH) as Long FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID WHERE VISEE_FLAG.SERIE_ID is NULL --and SHOT_FLAG.FLAG ='srf' """) result_count = cursor.fetchall() if result_count[0][0] is None : _result_count = 0.0 else : _result_count = result_count[0][0] ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] _liste_ent = "Visée(s) non raccordées" _liste_ent = _liste_ent.ljust(_largeurColTete) ligne[0] = _liste_ent ligne[1] = str("0") 
ligne[2] = str("{:.2f}".format(_result_long)) ligne[4] = str("{:.2f}".format(_result_dpl)) ligne[5] = str("{:.2f}".format(_result_srf)) ligne[6] = str(_result_count) cursor.execute(f""" -- Requête pour rechercher le point bas d'un réseau / entrée SELECT STATION.name, STATION.Z as Min from STATION join ( select Min(STATION.Z) as Val_Min from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.SERIE_ID is null ) min on STATION.Z = min.Val_Min LIMIT 1 """) altitude_min = cursor.fetchall() if len(altitude_min) == 0 : _altitude_min = 0.0 _altitude_min_name = 'None' else : _altitude_min = altitude_min[0][1] _altitude_min_name = str(altitude_min[0][0]) ligne[7] = str(_altitude_min_name) ligne[8] = str("{:.2f}".format(_altitude_min)) cursor.execute(f""" -- Requête pour rechercher le point haut d'un réseau / entrée SELECT STATION.name, STATION.Z as Max from STATION join ( select Max(STATION.Z) as Val_Max from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.SERIE_ID is null ) max on STATION.Z = max.Val_Max LIMIT 1 """) altitude_max = cursor.fetchall() if len(altitude_max) == 0 : _altitude_max = 0.0 _altitude_max_name = 'None' else : _altitude_max = altitude_max[0][1] _altitude_max_name = str(altitude_max[0][0]) ligne[7] = str(_altitude_max_name) ligne[8] = str("{:.2f}".format(_altitude_max)) ligne[3] = "-" #"{:.2f}".format(altitude_max[0][1] - altitude_min[0][1]) for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol) if _result_long !=0 or _result_dpl != 0 or _result_srf !=0 or _result_count !=0: retour.append(ligne) ############################################################################################################### # Totaux ############################################################################################################### result_shot = 0 cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON 
SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null """) _result_length = cursor.fetchall() result_length = float(_result_length[0][0]) result_shot = int(_result_length[0][1]) cursor.execute(f""" SELECT COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' """) _result_length_dpl = cursor.fetchall() result_length_dpl = float(_result_length_dpl[0][0]) result_shot += int(_result_length_dpl[0][1]) cursor.execute(f""" SELECT COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name FROM VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' """) _result_length_srf = cursor.fetchall() result_length_srf = float(_result_length_srf[0][0]) result_shot += int(_result_length_srf[0][1]) cursor.execute(f""" -- Bilan table VISEE_FLAG By entrées select ENTREE_ID --STATION.NAME FROM VISEE_FLAG JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID WHERE SERIE_ID >0 GROUP BY VISEE_FLAG.ENTREE_ID """) _result_entrees = cursor.fetchall() _total_entrees_topo = len(_result_entrees) result2 = sql_liste_entree() _total_entrees_non_topo = len(result2) - _total_entrees_topo # type: ignore if _result_length[0][1] != None : ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] ligne[0] = "Totaux (entrées et points fixes)".ljust(_largeurColTete) # Liste Entrées ligne[1] = str("{:.0f}".format(len(result2))) # type: ignore # Nre Ent. 
ligne[2] = str("{:.2f}".format(result_length)) # Dev. ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl. ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf. ligne[6] = str(result_shot) # Visées cursor.execute(f""" -- Requête pour rechercher le point bas d'un réseau / entrée SELECT STATION.name, STATION.Z as Min from STATION join ( select Min(STATION.Z) as Val_Min from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID ) min on STATION.Z = min.Val_Min LIMIT 1 """) altitude_min = cursor.fetchall() ligne[7] = altitude_min[0][0] ligne[8] = str(altitude_min[0][1]) cursor.execute(f""" -- Requête pour rechercher le point haut d'un réseau / entrée SELECT STATION.name, STATION.Z as Max from STATION join ( select Max(STATION.Z) as Val_Max from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID ) max on STATION.Z = max.Val_Max LIMIT 1 """) altitude_max = cursor.fetchall() ligne[9] = altitude_max[0][0] ligne[10] = str("{:.2f}".format(altitude_max[0][1])) ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1]) for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol) retour.append(ligne) else : _total_entrees_non_topo = 0 _total_entrees_topo = 0 ############################################################################################################### # Liste des entrées uniques ############################################################################################################### cursor.execute(f""" -- Bilan table VISEE_FLAG By entrées select ENTREE_ID, STATION.NAME FROM VISEE_FLAG JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID WHERE RESEAU_ID ==0 or RESEAU_ID is null and SERIE_ID >0 GROUP BY VISEE_FLAG.ENTREE_ID """) result = cursor.fetchall() for row in result : ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] result_shot = 0 cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN 
SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.ENTREE_ID ={row[0]} """) _result_length = cursor.fetchall() result_length = float(_result_length[0][0]) result_shot = int(_result_length[0][1]) cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.ENTREE_ID ={row[0]} """) _result_length_dpl = cursor.fetchall() result_length_dpl = float(_result_length_dpl[0][0]) result_shot += int(_result_length_dpl[0][1]) cursor.execute(f""" select COALESCE(round(sum(SHOT.length), 2), 0) as ttl, count(VISEE_FLAG.ID) as count, STATION_TO.NAME as To_Name from VISEE_FLAG JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.ENTREE_ID ={row[0]} """) _result_length_srf = cursor.fetchall() result_length_srf = float(_result_length_srf[0][0]) result_shot += int(_result_length_srf[0][1]) if result_length_srf == 0.0 and result_length == 0.0 and result_length_dpl == 0.0 : _total_entrees_non_topo+=1 else : ligne[0] = str((row[1])).ljust(_largeurColTete) ligne[1] = str("1") ligne[2] = str("{:.2f}".format(result_length)) # Dev. ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl. ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf. 
ligne[6] = str(result_shot) # Visées cursor.execute(f""" -- Requête pour rechercher le point bas d'un réseau / entrée SELECT STATION.name, STATION.Z as Min from STATION join ( select Min(STATION.Z) as Val_Min from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.ENTREE_ID = {row[0]} ) min on STATION.Z = min.Val_Min LIMIT 1 """) altitude_min = cursor.fetchall() ligne[7] = altitude_min[0][0] ligne[8] = str("{:.2f}".format(altitude_min[0][1])) cursor.execute(f""" -- Requête pour rechercher le point haut d'un réseau / entrée SELECT STATION.name, STATION.Z as Max from STATION join ( select Max(STATION.Z) as Val_Max from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.ENTREE_ID = {row[0]} ) max on STATION.Z = max.Val_Max LIMIT 1 """) altitude_max = cursor.fetchall() ligne[9] = altitude_max[0][0] ligne[10] = str("{:.2f}".format(altitude_max[0][1])) ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1]) for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol) retour.append(ligne) ############################################################################################################### # Entrées sans topo ############################################################################################################### ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "] if _total_entrees_non_topo >=1 : ligne[0] = "Entrée(s) sans topographie".ljust(_largeurColTete) ligne[1] = str(_total_entrees_non_topo) ligne[2] = "0.00" ligne[3] = "0.00" ligne[4] = "0.00" ligne[5] = "0.00" ligne[6] = "0" cursor.execute(f""" -- Requête pour rechercher le points bas d'un réseau / entrée SELECT STATION.name, STATION.Z as Min from STATION join ( select Min(STATION.Z) as Val_Min from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent' ) min on STATION.Z = min.Val_Min LIMIT 1 """) altitude_min = cursor.fetchall() ligne[7] = 
altitude_min[0][0] ligne[8] = str(altitude_min[0][1]) cursor.execute(f""" -- Requête pour rechercher le point haut d'un réseau / entrée SELECT STATION.name, STATION.Z as Max from STATION join ( select Max(STATION.Z) as Val_Max from STATION join JONCTION on STATION.ID = JONCTION.STATION_ID WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent' ) max on STATION.Z = max.Val_Max LIMIT 1 """) altitude_max = cursor.fetchall() ligne[9] = altitude_max[0][0] ligne[10] = str("{:.2f}".format(altitude_max[0][1])) ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1]) for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol) retour.append(ligne) ############################################################################################################### # Tri et résultats ############################################################################################################### entetes = [ 'Entrée(s)', "Nbre", "Dev.(m)", "Prof.(m)", "Dupl.(m)", "Surf.(m)", "Visées", "ID Sta.", "Alt. min(m)", "ID Sta.", "Alt. 
#####################################################################################################################################
# Sort key                                                                                                                          #
#####################################################################################################################################
def cle_tri(element):
    """Return the numeric sort key of a report row.

    The key is the third cell of the row (index 2, the "Dev.(m)" column of
    the network report), which is stored as a padded string.

    Args:
        element (list of str): one report row.

    Returns:
        float: the parsed value, or 0.0 when the cell is missing or is not a
        number (for example the " - " placeholder), so that one incomplete
        row cannot abort the whole sort.
    """
    try:
        return float(element[2])
    except (IndexError, TypeError, ValueError):
        return 0.0


#####################################################################################################################################
# Survey summary per year                                                                                                           #
#####################################################################################################################################
def _ligne_bilan_annee(etiquette, valeurs, cumuls, largeur):
    """Format one row of the per-year report.

    Args:
        etiquette (str): text of the first column (a year, or "-").
        valeurs (sequence of float): length, duplicate length, surface length.
        cumuls (sequence of float): running totals matching ``valeurs``.
        largeur (int): width every column except the last one is padded to.

    Returns:
        list of str: label, then each value followed by its running total.
    """
    cellules = [etiquette]
    for valeur, cumul in zip(valeurs, cumuls):
        cellules.append("{:.2f}".format(valeur))
        cellules.append("{:.2f}".format(cumul))
    # The last column is deliberately left unpadded, as in the header row.
    return [cellule.ljust(largeur) for cellule in cellules[:-1]] + cellules[-1:]


def sql_bilan_annee():
    """Build the per-year survey report from the CENTRELINE table.

    Reads the module-level ``cursor``, ``_largeurCol`` and
    ``output_file_name_year``, and calls ``PlotExploYears`` for the range of
    years found in the database.

    Returns:
        list of list of str: header row, an optional "-" row for centrelines
        without a date, then one row per year from the first to the last
        dated centreline (inclusive). When no centreline carries a usable
        date, only the header and the optional "-" row are returned.
        ``None`` is returned if a query or the plotting fails; the error is
        printed and ``error_count`` is incremented.
    """
    global error_count

    try:
        entetes = ["Année", "Dev.(m)", "Cumul (m)", "Dupl.(m)", "Cumul (m)", "Surf.(m)", "Cumul (m)"]
        retour = [[titre.ljust(_largeurCol) for titre in entetes[:-1]] + entetes[-1:]]
        cumuls = [0.0, 0.0, 0.0]

        # Centrelines without a survey date.
        cursor.execute("""
            select COALESCE(sum(LENGTH), 0),
                   COALESCE(sum(DUPLICATE_LENGTH), 0),
                   COALESCE(sum(SURFACE_LENGTH), 0)
            from CENTRELINE
            where TOPO_DATE IS NULL
            """)
        sans_date = cursor.fetchone()
        if sans_date is not None and any(float(valeur) > 0.0 for valeur in sans_date):
            cumuls = [cumul + valeur for cumul, valeur in zip(cumuls, sans_date)]
            retour.append(_ligne_bilan_annee("-", sans_date, cumuls, _largeurCol))

        # First and last surveyed year; min/max ignore dates strftime cannot parse.
        cursor.execute("""
            select min(strftime('%Y', TOPO_DATE)), max(strftime('%Y', TOPO_DATE))
            from CENTRELINE
            where TOPO_DATE is not NULL
            """)
        bornes = cursor.fetchone()
        if bornes is None or bornes[0] is None or bornes[1] is None:
            # No dated centreline: there is nothing to plot or to list per year.
            return retour
        debut, fin = int(bornes[0]), int(bornes[1])

        PlotExploYears(output_file_name_year, [debut, fin])

        # One row per calendar year, including years without any survey.
        for annee in range(debut, fin + 1):
            cursor.execute("""
                select COALESCE(sum(LENGTH), 0),
                       COALESCE(sum(DUPLICATE_LENGTH), 0),
                       COALESCE(sum(SURFACE_LENGTH), 0)
                from CENTRELINE
                where TOPO_DATE between ? and ?
                """, (f"{annee}-01-01", f"{annee}-12-31"))
            bilan_annee = cursor.fetchone()
            cumuls = [cumul + valeur for cumul, valeur in zip(cumuls, bilan_annee)]
            retour.append(_ligne_bilan_annee("{:.0f}".format(annee), bilan_annee, cumuls, _largeurCol))

        return retour

    except sqlite3.Error as e:
        print(f"\033[91mErreur lors de l'exécution de la requête (sql_bilan_annee):\033[0m {e}")
        error_count += 1
        return None

    except Exception as e:
        print(f"\033[91mErreur lors de l'exécution de sql_bilan_annee:\033[0m {e}")
        error_count += 1
        return None
#####################################################################################################################################
# Rose diagram                                                                                                                      #
#####################################################################################################################################
def _rose_histogram(bearings, lengths, gradients, bins=72):
    """Compute the weighted bearing histogram drawn by ``Rose``.

    Args:
        bearings (array-like): shot bearings, in degrees.
        lengths (array-like): shot lengths.
        gradients (array-like): shot gradients, in degrees.
        bins (int, optional): number of angular sectors. Defaults to 72.

    Returns:
        tuple: ``(heights, edges)`` as returned by ``numpy.histogram``; the
        edges are in radians and always span the full circle [0, 2*pi].
    """
    bearings = np.asarray(bearings, dtype=float)
    lengths = np.asarray(lengths, dtype=float)
    gradients = np.asarray(gradients, dtype=float)

    # Shots with a missing value cannot be placed or weighted: leave them out.
    valides = np.isfinite(bearings) & np.isfinite(lengths) & np.isfinite(gradients)
    angles = np.mod(bearings[valides], 360.0) * np.pi / 180.0

    # The weight falls to zero for vertical shots, whose bearing carries no
    # information (it is systematically 0 degrees).
    poids = lengths[valides] * (90 - np.abs(gradients[valides])) / 100

    # A fixed range keeps the sectors aligned on north whatever the data are.
    return np.histogram(angles, bins=bins, range=(0.0, 2 * np.pi), weights=poids)


def Rose(graph_name, bins = 72):
    """Plot a rose diagram of all the shots of the database.

    The shots are read from the SHOT table through the module-level ``conn``
    connection.

    Args:
        graph_name (str): path and name of the graph to save
        bins (int, optional): number of angular sectors. Defaults to 72.
    """
    df = pd.read_sql_query("select BEARING, LENGTH, GRADIENT from SHOT;", conn)
    h, e = _rose_histogram(df["BEARING"], df["LENGTH"], df["GRADIENT"], bins = bins)

    # Draw on a dedicated figure so that exactly this figure is closed afterwards.
    fig = plt.figure()
    try:
        ax = fig.add_subplot(111, projection = "polar")
        ax.set_theta_zero_location("N")      # type: ignore
        ax.set_theta_direction(-1)           # type: ignore  (clockwise, like a compass)
        ax.bar(e[:-1], h, align = "edge", width = e[1] - e[0])
        fig.savefig(graph_name)
    finally:
        plt.close(fig)

    return
#####################################################################################################################################
# Histogram of the shot lengths                                                                                                     #
#####################################################################################################################################
def Shot_lengths_histogram(graph_name, bins = 72, log = None, max_length = 50):
    """Plot the histogram of the shot lengths of the whole database.

    The lengths are read from the SHOT table through the module-level
    ``conn`` connection.

    Args:
        graph_name (str): path and name of the graph to save
        bins (int, optional): number of bins over the displayed range. Defaults to 72.
        log (str, optional): set it to 'log' (any truthy value) to use a y-logscale. Defaults to None.
        max_length (float, optional): upper bound of the x axis, in metres. Defaults to 50.
    """
    df = pd.read_sql_query("select LENGTH from SHOT;", conn)
    longueurs = pd.to_numeric(df["LENGTH"], errors = "coerce").dropna()

    fig = plt.figure()
    try:
        ax = fig.add_subplot(111)
        # Bin over the displayed window, so that a few very long shots do not
        # stretch the bins and `bins` is the number of bars actually visible.
        ax.hist(longueurs, bins = bins, range = (0, max_length))
        ax.set_xlabel("Longueur de visée (m)")
        ax.set_ylabel("Nombre")
        ax.set_xlim(0, max_length)
        if log:
            ax.set_yscale("log")
        fig.savefig(graph_name)
    finally:
        plt.close(fig)

    return


#####################################################################################################################################
# Charts per year                                                                                                                   #
#####################################################################################################################################
# One colour per plotted system; add colours here if more systems are needed.
_COULEURS_SYSTEMES = ('tab:blue', 'tab:red', 'tab:green', 'tab:orange', 'tab:purple',
                      'tab:brown', 'tab:olive', 'tab:pink', 'tab:cyan')


def _annees_incluses(rangeyear):
    """Return the range of years from rangeyear[0] to rangeyear[1], both included."""
    return range(int(rangeyear[0]), int(rangeyear[1]) + 1)


def _longueur_annuelle(connexion, annee, system = None):
    """Return the length surveyed during ``annee``, or NaN when nothing was surveyed.

    Args:
        connexion: open database connection accepted by ``pandas.read_sql_query``.
        annee (int): calendar year.
        system (str, optional): when given, only the surveys whose FULL_NAME
            contains this text are summed.
    """
    requete = "select sum(LENGTH) from CENTRELINE where TOPO_DATE between ? and ?"
    parametres = [f"{annee}-01-01", f"{annee}-12-31"]
    if system is not None:
        requete += " and SURVEY_ID in (select ID from SURVEY where FULL_NAME LIKE ?)"
        parametres.append(f"%{system}%")
    valeur = pd.read_sql_query(requete, connexion, params = parametres).iloc[0, 0]
    # sum() over no row is NULL: keep it as NaN so that it is skipped by the mean.
    return np.nan if pd.isna(valeur) else float(valeur)


def PlotExploYears(graph_name, rangeyear = None, systems = None):
    """Plot the length surveyed per year, and its cumulative sum.

    The data are read through the module-level ``conn`` connection. Without
    ``systems`` the files ``<graph_name>.pdf`` and ``<graph_name>Cum.pdf``
    are written; with ``systems`` the stacked charts
    ``<graph_name>_Reseau.pdf`` and ``<graph_name>Cum_Reseau.pdf`` are written.

    Args:
        graph_name (str): path and base name (without extension) of the graphs to save
        rangeyear (sequence of 2 int, optional): first and last year to analyse,
            both included. Defaults to [1959, current year].
        systems (list of str, optional): list of specific systems to plot if needed.
            Defaults to None.

    Raises:
        NameError: when more systems are requested than colours are available.
    """
    if rangeyear is None:
        rangeyear = [1959, datetime.now().year]
    annees = list(_annees_incluses(rangeyear))

    if systems:
        systems = list(systems)
        # Fail before querying or drawing anything.
        if len(systems) > len(_COULEURS_SYSTEMES):
            raise NameError('\033[91mERROR:\033[00m Number of colors lower than the number of systems!\n\tedit the code to add colors in the list, or lower the number of systems to plot')

        somme = pd.DataFrame(
            [(system, annee, _longueur_annuelle(conn, annee, system))
             for system in systems for annee in annees],
            columns = ['System', 'Year', 'Longueur'])

        fig = plt.figure()
        fig2 = plt.figure()
        try:
            ax1 = fig.add_subplot(111)
            ax2 = fig2.add_subplot(111)

            # Stack the systems: each one is drawn on top of the previous ones.
            base = np.zeros(len(annees))
            base_cumul = np.zeros(len(annees))
            for couleur, system in zip(_COULEURS_SYSTEMES, systems):
                longueurs = somme.loc[somme['System'] == system, 'Longueur'].fillna(0).to_numpy()
                cumul = np.cumsum(longueurs) / 1000
                ax1.bar(annees, longueurs, bottom = base, width = 0.5, color = couleur, label = system)
                ax2.bar(annees, cumul, bottom = base_cumul, width = 0.5, color = couleur, label = system)
                base = base + longueurs
                base_cumul = base_cumul + cumul

            # Mean of the yearly lengths; years without survey (NaN) are skipped.
            ax1.axhline(y = somme["Longueur"].mean(), color = 'red', linestyle = '--', label = 'Moy. annuelle')
            ax1.set_xlabel("Année")
            ax1.set_ylabel("Longueur topographiée (m)")
            ax1.legend(loc = 'best')
            fig.savefig(graph_name + "_Reseau.pdf")

            ax2.set_xlabel("Année")
            ax2.set_ylabel("Longueur topographiée cumulée (km)")
            ax2.legend(loc = 'best')
            fig2.savefig(graph_name + "Cum_Reseau.pdf")
        finally:
            plt.close(fig)
            plt.close(fig2)

    else:
        longueurs = pd.Series([_longueur_annuelle(conn, annee) for annee in annees], dtype = float)

        # Histogram since the first survey.
        fig = plt.figure()
        try:
            ax = fig.add_subplot(111)
            ax.bar(annees, longueurs, width = 0.5)
            ax.axhline(y = longueurs.mean(), color = 'red', linestyle = '--', label = 'Moy. annuelle')
            ax.set_xlabel("Année")
            ax.set_ylabel("Longueur topographiée (m)")
            fig.savefig(graph_name + ".pdf")
        finally:
            plt.close(fig)

        # Cumulative histogram since the first survey.
        fig = plt.figure()
        try:
            ax = fig.add_subplot(111)
            ax.bar(annees, np.cumsum(longueurs.fillna(0)) / 1000, width = 0.5)
            ax.set_xlabel("Année")
            ax.set_ylabel("Longueur topographiée cumulée (km)")
            fig.savefig(graph_name + "Cum.pdf")
        finally:
            plt.close(fig)

    return
#####################################################################################################################################
# Main                                                                                                                              #
#####################################################################################################################################
if __name__ == '__main__':

    # Module-level settings and counters shared with the functions of this file.
    _largeurColTete = 30
    _largeurCol = 10
    avt_compteur = 0
    error_count = 0
    visee_suprimmees = [0.0, 0.0, 0.0, 0]   # length, duplicate length, surface length, (4th value: TODO confirm meaning)
    input_file_name = ""
    outputs_path = "./Outputs/"
    inputs_path = "./Inputs/"

    os.makedirs(outputs_path, exist_ok=True)

    maintenant = datetime.now()

    if len(sys.argv) < 2:
        # No file given on the command line: fall back to a built-in sample.
        # Other samples used by the author, with their status:
        #   "databaseBB26.sql"              OK
        #   "cave.sql"                      error: no fixed point or entrance
        #   "padavka.sql"                   error: no fixed point or entrance
        #   "rabbit.sql"                    OK
        #   "databaseCriou-2023.sql"        OK
        #   "databaseCriou-2024_09_18.sql"  OK
        #   "databaseKoytendag-2024.sql"
        #   "databaseJB-2023.sql"           4 duplicated shots, possibly a centreline error
        #   "CheddarCatchment.sql"          4 duplicated shots, many entrances
        #   "databaseMoilda-2021.sql"       seems to work
        #   "databaseSornin-2023.sql"       seems to work
        input_file_name = inputs_path + "databaseCriou-2023_12_26.sql"

        # Clear the terminal before printing the banner.
        if os.name == 'posix':
            os.system('clear')   # Linux, MacOS
        elif os.name == 'nt':
            os.system('cls')     # Windows
        else:
            print("\n" * 100)
    else:
        input_file_name = sys.argv[1]

    if not os.path.isfile(input_file_name):
        print(f"Erreur, fichier {input_file_name} inexistant")
        print("Commande : python pythStat.py votre_fichier_therion.sql")
        sys.exit(1)

    # Name of the input file without its directory; defined in both branches
    # so that the output names can also be built for a command-line path.
    input_file = os.path.basename(input_file_name)
    nom_base = os.path.splitext(input_file)[0]

    output_file_name = outputs_path + nom_base + "_stats.csv"
    output_file_name_rose = outputs_path + nom_base + "_rose.pdf"
    output_file_name_histo = outputs_path + nom_base + "_histo.pdf"
    output_file_name_year = outputs_path + nom_base + "_year"
    imported_database = outputs_path + nom_base + "_stats.db"

    titre = ['******************************************************************************************',
             '* Calcul des statistiques par entrées d\'une BD Therion',
             '* Script pythStat par alexandre.pont@yahoo.fr',
             '* Version Février 2024',
             '* Fichier source: ' + input_file_name,
             '* Fichier destination: ' + output_file_name,
             '* Date: ' + maintenant.strftime("%Y-%m-%d %H:%M:%S"),
             '* ',
             '******************************************************************************************']

    for ligne_titre in titre:
        print(ligne_titre.ljust(90) + "*")

    importation_sql_data(input_file_name)

    conn = sqlite3.connect(imported_database)   # connection to the SQLite database
    try:
        cursor = conn.cursor()
        construction_tables()
        calcul_stats(output_file_name)
    finally:
        # Release the database even if one of the steps raises.
        conn.close()