Files
Synthese-PSM_LARRA/Scripts/pyThStat/pythStat.py
T
2026-01-08 18:55:48 +01:00

2816 lines
142 KiB
Python

# -*- coding: utf-8 -*-
########################################################################################################################################
# #
# Script pour calculer les statistiques des entités jonctionnées #
# d'un fichier database (.sql) produit par Therion #
# By Alexandre PONT alexandre.pont@yahoo.fr #
# #
# Utilisation: #
# Exporter le fichier sql avec therion, commande therion.thconfig : export database -o Outputs/database.sql #
# Commande : python pythStat.py ./chemin/fichier.sql #
# ou : python pythStat.py pour ouvrir une fenêtre #
# Résultat : fichiers dans le dossier crée du fichier source #
########################################################################################################################################
import sqlite3, sys, os, re, argparse
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import filedialog
from alive_progress import alive_bar # https://github.com/rsalmei/alive-progress
from datetime import datetime
Version ="2025.02.07"
"""#####################################################################################################################################
# Fonction pour importer un fichier SQL dans une base de données SQLite #
# #
#####################################################################################################################################"""
def importation_sql_data(fichier_sql):
"""
Fonction pour importer un fichier SQL dans une base de données SQLite
Args:
fichier_sql (_type_): _description_
"""
global error_count
try:
# Si la base de données existe, supprimez-la pour forcer l'écriture
print(f"\033[1;32mPhase 1: Importation de la base de données Therion \033[0m{safe_relpath(input_file_name)}\033[1;32m dans: \033[0m{safe_relpath(imported_database)}")
if os.path.exists(imported_database):
#print("Suppression de la Bd existante: " + imported_database)
os.remove(imported_database)
connection = sqlite3.connect(imported_database)
cursor = connection.cursor()
# Lecture du fichier SQL et exécution des commandes
with open(fichier_sql, 'r') as sql_file:
sql_script = sql_file.read()
# Séparation du script en commandes individuelles
sql_script = re.sub(r', nan', ', 0', sql_script, flags=re.IGNORECASE)
commandes = [cmd.strip() + ';\n' for cmd in sql_script.split(';\n') if cmd.strip()]
# Exécution des commandes avec une barre de progression
with alive_bar(len(commandes), title = "\x1b[32;1m\t Progression\x1b[0m", length = 20) as bar:
for commande in commandes:
cursor.execute(commande)
connection.commit()
bar()
connection.close()
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête importation_sql_data code:\033[0m {e}")
error_count += 1
sys.exit(1) # Arrête le programme en cas d'erreur
return
#####################################################################################################################################
# Fonction pour construire les tables JONCTION, SERIE, VISEE_FLAG et RESEAU #
# #
#####################################################################################################################################
def construction_tables():
"""
Fonction pour construire les tables JONCTION, SERIE, VISEE_FLAG et RESEAU
"""
global avt_compteur
global error_count
# Principales requêtes
#conn = sqlite3.connect(database) # Connexion à la base de données SQLite
#cursor = conn.cursor()
# print(f"\033[1;32mConstruction des tables dans {imported_database}\033[0m")
try :
print(f"\033[1;32mPhase 2: Création des nouvelles tables, indexation\033[0m")
cursor.execute("DROP TABLE IF EXISTS JONCTION") # Créer et initialiser une nouvelle table de jonctions
cursor.execute("""
CREATE TABLE JONCTION (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
STATION_ID INTEGER,
SERIE_ID INTEGER,
SERIE_RANG INTEGER,
SERIE_ENT INTEGER,
STATION_ENT INTEGER,
SERIE_JONC INTEGER,
STATION_JONC INTEGER,
STATION_TYPE varchar(4),
ENTREE_ID INTEGER,
RESEAU_ID INTEGER
)
""")
cursor.execute("select STATION.ID from STATION")
results = cursor.fetchall()
Next_Station_ID = cursor.fetchall() # Pour forcer le type
cursor.executemany("INSERT INTO JONCTION (STATION_ID) VALUES (?)", results)
conn.commit()
cursor.execute("DROP TABLE IF EXISTS SERIE") # Créer et initialiser une nouvelle table des Series
cursor.execute("""
CREATE TABLE SERIE (
SERIE_ID INTEGER PRIMARY KEY AUTOINCREMENT,
SERIE_DEP_ID INTEGER,
STATION_DEP_ID INTEGER,
SERIE_ARR_ID INTEGER,
STATION_ARR_ID INTEGER,
SERIE_NBRE_SHOT INTEGER,
SERIE_LENGHT REAL,
SERIE_LENGHT_SURFACE REAL,
SERIE_LENGHT_DUPLICATE REAL,
DIRECTION INTEGER,
STATION_ENT_ID INTEGER,
RESEAU_ID INTEGER)
""")
Current_Serie_ID = cursor.lastrowid
conn.commit()
cursor.execute("DROP TABLE IF EXISTS RESEAU")
cursor.execute("""
CREATE TABLE RESEAU (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
RESEAU_ID INTEGER,
STATION_JONC INTEGER,
ENT_1 INTEGER,
ENT_2 INTEGER)
""")
conn.commit()
SHOT_equates_station()
issue_SHOT()
duplicate_SHOT()
cursor.execute("DROP TABLE IF EXISTS VISEE_FLAG") # Créer et initialiser une nouvelle table VISEE_FLAG (suivi des visées lues)
cursor.execute("""
CREATE TABLE VISEE_FLAG (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
SHOT_ID INTEGER,
SERIE_ID INTEGER,
ENTREE_ID INTEGER,
RESEAU_ID INTEGER,
SERIE_RANG INTEGER
)
""")
cursor.execute("select SHOT.ID from SHOT")
results = cursor.fetchall()
cursor.executemany("INSERT INTO VISEE_FLAG (SHOT_ID) VALUES (?)", results) # type: ignore
conn.commit()
print(f"\033[0m\t Création de l'index des tables principales et optimisation de la mémoire\033[0m")
cursor.execute("CREATE INDEX INDEX_JONCTION_STATION_ID ON JONCTION(STATION_ID)")
cursor.execute("PRAGMA index_list(SHOT)")
result = cursor.fetchall()
marquage_visee_station_habillage()
# print(result)
if len(result)==0 :
cursor.execute("CREATE INDEX INDEX_SHOT_FROM_ID ON SHOT(FROM_ID)")
cursor.execute("CREATE INDEX INDEX_SHOT_TO_ID ON SHOT(TO_ID)")
cursor.execute("CREATE INDEX INDEX_SHOT_FLAG_SHOT_ID ON SHOT_FLAG(SHOT_ID)")
cursor.execute("CREATE INDEX INDEX_STATION_ID ON STATION(ID)")
cursor.execute("CREATE INDEX INDEX_VISEE_FLAG_SHOT_ID ON VISEE_FLAG(SHOT_ID)")
cursor.execute("CREATE INDEX INDEX_STATION_FLAG_STATION_ID ON STATION_FLAG(STATION_ID)")
cursor.execute("VACUUM")
conn.commit()
# A partir des entrées, remplir les tables des jonctions et des séries
results = sql_liste_entree()
print(f"\033[1;32mPhase 3: Remplissage des tables d'après les \033[0m{len(results)}\033[1;32m entrée(s)\033[0m") # type: ignore
for row in results: # type: ignore
# if row[0]==28548:
# print("debug point")
cursor.execute(f"""
INSERT INTO SERIE (
SERIE_DEP_ID,
STATION_DEP_ID,
SERIE_ARR_ID ,
STATION_ARR_ID,
SERIE_NBRE_SHOT,
SERIE_LENGHT,
SERIE_LENGHT_SURFACE,
SERIE_LENGHT_DUPLICATE,
DIRECTION,
STATION_ENT_ID,
RESEAU_ID)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (0, row[0], 0, row[0], -1, 0, 0, 0, 0, row[0], 0))
Current_Serie_ID = cursor.lastrowid
cursor.execute(f"""
UPDATE JONCTION SET
SERIE_ID = ?,
SERIE_ENT = ?,
STATION_ENT = ?,
SERIE_JONC = ?,
STATION_JONC = ?,
STATION_TYPE = ?,
SERIE_RANG = ?,
ENTREE_ID = ?,
RESEAU_ID = ?
WHERE STATION_ID = ?
""", (Current_Serie_ID, 0, row[0], 0, 0,'ent', 0, row[0], 0, row[0]))
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {Current_Serie_ID},
ENTREE_ID = {row[0]},
RESEAU_ID = {0}
WHERE SHOT_ID = {Current_Serie_ID}
""")
conn.commit()
# print(f"\tCréation Série: {Current_Serie_ID} depuis la station d'entrée Station_ID: {row[0]}")
# A partir des série vides, itération pour remplir les tables des JONCTION et des SERIE
print(f"\033[1;32mPhase 4: Remplissage des tables d'après les séries vides jonctionnées aux\033[0m {Current_Serie_ID}\033[1;32m entrée(s)\033[0m")
results = sql_serie_vides()
Count = 1
New_Serie_IDOld = 0
New_Serie_ID = cursor.lastrowid
Current_Station_ID_Old = 0
Current_Station_ID = 0
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
_compteur = cursor.fetchall()
compteur_ttl = int(_compteur[0][0])
avt_compteur = 0
with alive_bar(compteur_ttl, title = "\x1b[32;1m\t Progression\x1b[0m", length = 20) as bar:
while len(results) > 0: # type: ignore
# print(f"\033[1;32mPhase 4.{Count}: Remplissage des tables JONCTION et SERIE itération: {Count}, séries créée(s): {New_Serie_ID} ajoutée(s): {New_Serie_ID-New_Serie_IDOld} à traiter: {len(results)}\033[0m") # type: ignore
bar.text(f"itération: {Count}, série(s) créée(s): {New_Serie_ID}") # type: ignore
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
_compteur = cursor.fetchall()
compteur = int(_compteur[0][0])
if ( compteur_ttl - compteur ) > avt_compteur :
ajout = compteur_ttl - compteur - avt_compteur
bar(ajout)
avt_compteur = compteur_ttl - compteur
for row in results: # type: ignore
# Suivi de la série
Current_Serie_ID = int(row[0])
Current_Station_ID_Old = int(Current_Station_ID) # type: ignore
Current_Station_ID = int(row[2])
Current_Nre_Shot = int(row[5])
# Current_Serie_Lenght = 0.0 + float(row[6])
# Current_Serie_Lenght_Surface = 0.0 + float(row[7])
# Current_Serie_Lenght_Duplicate = 0.0 + float(row[8])
Current_Ent = int(row[10])
Direction = int(row[9])
#print(f"\tSerie courante {Current_Serie_ID} Station_ID: {Current_Station_ID} results: {results}")
Fin_Serie = False
while Fin_Serie is False:
if Direction == 1 :
Next_Station_ID = sql_station_depart(Current_Station_ID)
elif Direction == -1 :
Current_Station_ID = int(row[4])
Next_Station_ID = sql_station_arrivee(Current_Station_ID)
elif Direction == 0 :
Next_Station_ID_1 = sql_station_depart(Current_Station_ID)
Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID)
if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0: # type: ignore
# Entrée sans départ et sans d'arrivée : fin de la série
cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {Current_Serie_ID};")
cursor.execute(f"UPDATE JONCTION SET SERIE_ENT = -1 WHERE STATION_ID = {Current_Station_ID};")
conn.commit()
Next_Station_ID = Next_Station_ID_1
elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0: # type: ignore
# Un départ pas d'arrivée
Next_Station_ID = Next_Station_ID_1
Direction = 1
cursor.execute(f"UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = {Current_Serie_ID};")
conn.commit()
elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1: # type: ignore
# Une arrivée pas de départ
Next_Station_ID = Next_Station_ID_2
Direction = -1
cursor.execute(f"UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = {Current_Serie_ID};")
conn.commit()
else :
# A gérer nouvelles séries
nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, 1, Current_Ent) # type: ignore
# print (f"\033[34m\tA traiter création nouvelles séries inverses depuis l'entrée {Current_Station_ID} - {Next_Station_ID_2}, {Next_Station_ID_1}\033[0m")
nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, -1, Current_Ent)
Direction = 1
Next_Station_ID = Next_Station_ID_1
if len(Next_Station_ID) == 0 : # type: ignore
Next_Station_ID_1 = sql_station_depart(Current_Station_ID) # type: ignore
Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID) # type: ignore
# print(f"\033[34m\tA gérer, fin de la Série: {Current_Serie_ID} à la Station_ID: {Current_Station_ID} nbre: {Current_Nre_Shot} Next station: {Next_Station_ID}, départs directs {len(Next_Station_ID_1)}, départs inverses {len(Next_Station_ID_2)}\033[0m") # type: ignore
# cursor.execute(f"UPDATE SERIE SET SERIE_DEP_ID = {Current_Serie_ID} WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
# cursor.execute(f"UPDATE SERIE SET STATION_DEP_ID = {Current_Station_ID} WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
# cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
cursor.execute(f"DELETE FROM SERIE WHERE SERIE_ID = {Current_Serie_ID};")
conn.commit() # type: ignore
Fin_Serie = True
elif len(Next_Station_ID) == 1 : # type: ignore
suivi_serie(Current_Serie_ID, bar)
Fin_Serie = True
else :
#print(f"Station_ID {Current_Station_ID} de la Serie {Current_Serie_ID}, départs à gérer: {len(Next_Station_ID)}, Next station: {Next_Station_ID}") # type: ignore
suivi_serie(Current_Serie_ID, bar)
# Création de X nouvelles séries et mise à jour de la table des jonctions
#nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, 1, Current_Ent)
Fin_Serie = True
# Exécution de la requête SQL
resultsOld = results
results = sql_serie_vides() # type: ignore
New_Serie_IDOld = New_Serie_ID
New_Serie_ID = cursor.lastrowid # type: ignore
if resultsOld == results :
#print(f"Erreur sortie itération qté: {len(resultsOld)} - {resultsOld} - {results}")
print(f"\033[91mErreur sortie itération {Count}, séries restantes: {len(resultsOld)} - {results}\033[0m") # type: ignore
error_count += 1
break
Count += 1
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
_compteur = cursor.fetchall()
compteur = int(_compteur[0][0])
if ( compteur_ttl ) > avt_compteur :
ajout = compteur_ttl - avt_compteur
bar(ajout)
avt_compteur = compteur_ttl
orphelines_shot()
jonction_RESEAU()
if compteur > 0 :
print(f"\033[1;32mPhase 4: Fin du remplissage des tables,\033[91m attention \033[0m{compteur}\033[91m station(s) non comptabilisé(s)\033[0m")
error_count += 1
# else :
# print(f"\033[1;32mPhase 4: Fin du remplissage des tables voir {imported_database}\033[0m")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution d'une des requêtes (construction_tables) code:\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Création des séries entres les visées orphelines -> visées d'une visées entre 2 points marqués #
#####################################################################################################################################
def orphelines_shot():
global error_count
try:
cursor.execute("""
-- Visées orphelines
SELECT
VISEE_FLAG.SHOT_ID,
VISEE_FLAG.SERIE_ID,
VISEE_FLAG.ENTREE_ID,
SHOT_FLAG.FLAG,
SHOT.FROM_ID,
-- STATION_FROM.NAME,
JONCTION_FROM.SERIE_ID,
JONCTION_FROM.STATION_TYPE,
JONCTION_FROM.ENTREE_ID,
SHOT.TO_ID,
-- STATION_TO.NAME,
JONCTION_TO.SERIE_ID,
JONCTION_TO.STATION_TYPE,
JONCTION_TO.ENTREE_ID,
SHOT.LENGTH
--sum (SHOT.LENGTH)
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
WHERE VISEE_FLAG.SERIE_ID is NULL and JONCTION_TO.SERIE_ID is not null and JONCTION_FROM.SERIE_ID is not null
""")
conn.commit()
orphelines = cursor.fetchall()
print(f"\t Intégrations des visées orphelines (entre 2 stations existantes) nbre: {len(orphelines)}")
for row in orphelines:
_SERIE_LENGHT = 0
_SERIE_LENGHT_SURFACE = 0
_SERIE_LENGHT_DUPLICATE = 0
if row[3] == 'dpl':
_SERIE_LENGHT_DUPLICATE = row[12]
elif row[3] == 'srf':
_SERIE_LENGHT_SURFACE = row[12]
else :
_SERIE_LENGHT = row[12]
cursor.execute(f"""
INSERT INTO SERIE (
SERIE_DEP_ID,
STATION_DEP_ID,
SERIE_ARR_ID ,
STATION_ARR_ID,
SERIE_NBRE_SHOT,
SERIE_LENGHT,
SERIE_LENGHT_SURFACE,
SERIE_LENGHT_DUPLICATE,
DIRECTION,
STATION_ENT_ID)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( row[5], row[4], row[9], row[8], 1, _SERIE_LENGHT, _SERIE_LENGHT_SURFACE, _SERIE_LENGHT_DUPLICATE, 1, row[7] ))
_Current_Serie_ID = cursor.lastrowid
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {_Current_Serie_ID},
ENTREE_ID = {row[7]},
SERIE_RANG = {1}
WHERE SHOT_ID = {row[0]}
""")
if row[7] != row[11] :
cursor.execute(f"INSERT INTO RESEAU (STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)", (row[8], row[7], row[11]))
# print (f"\033[36m\t Jonction des entrées à la Station_ID: {row[8]} entre: {row[7]} et: {row[11]}\033[0m")
conn.commit()
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête orphelines_shot code:\033[0m {e}" )
error_count += 1
return
#####################################################################################################################################
# Fonction pour joindre les reseaux #
#####################################################################################################################################
def jonction_RESEAU():
global error_count
try:
cursor.execute("""
--Recherche des doublons dans les jonctions réseau
SELECT
ID
--GROUP_CONCAT(RESEAU.ID) as DUPP_SHOT
FROM RESEAU
GROUP BY RESEAU.ENT_1, RESEAU.ENT_2, RESEAU.STATION_JONC
HAVING COUNT(RESEAU.ENT_1)>1 AND COUNT(RESEAU.ENT_2)>1 AND COUNT(RESEAU.STATION_JONC)>1
""")
conn.commit()
doublons = cursor.fetchall()
# print(f"\t Table des RESEAUX doublons nbre: {len(doublons)}")
for row in doublons : cursor.execute(f"DELETE FROM RESEAU WHERE RESEAU.ID = {row[0]}")
conn.commit()
index_reseau = 0
while True:
index_reseau += 1
cursor.execute("""
-- Liste des entrée dans la table RESEAU
SELECT
RESEAU.ENT_1 AS ENT,
RESEAU.RESEAU_ID,
STATION.NAME,
STATION.Z
FROM RESEAU
JOIN STATION ON RESEAU.ENT_1 = STATION.ID
WHERE RESEAU.RESEAU_ID IS NULL
UNION --ALL
SELECT
RESEAU.ENT_2 AS ENT,
RESEAU.RESEAU_ID,
STATION.NAME,
STATION.Z
FROM RESEAU
JOIN STATION ON RESEAU.ENT_2 = STATION.ID
WHERE RESEAU.RESEAU_ID IS NULL
ORDER BY STATION.Z DESC
""")
conn.commit()
entrees = cursor.fetchall()
if len(entrees) == 0:
break # Sortie boucle while si plus d'entrées à traiter...
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_1 = {entrees[0][0]} ")
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_2 = {entrees[0][0]} ")
conn.commit()
liste_entrees_reseau = []
liste_entrees_reseau.append(entrees[0][0])
nbre_entree_reseau_old= 0
nbre_entree_reseau=len(liste_entrees_reseau)
while nbre_entree_reseau_old != nbre_entree_reseau :
nbre_entree_reseau_old = nbre_entree_reseau
for row in liste_entrees_reseau :
cursor.execute(f"""
-- Recherche entrée jonctionnées
SELECT RESEAU.ENT_1 FROM RESEAU WHERE RESEAU.ENT_2 = {row} -- AND RESEAU.RESEAU_ID IS NULL
UNION
SELECT RESEAU.ENT_2 FROM RESEAU WHERE RESEAU.ENT_1 = {row} -- AND RESEAU.RESEAU_ID IS NULL
""")
jonction = cursor.fetchall()
for row2 in jonction: # type: ignore
if row2[0] not in liste_entrees_reseau:
# print(f"\t Jonction de l'entrée: {row2[0]} au reseau ID: {index_reseau}")
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_1 = {row2[0]} ")
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_2 = {row2[0]} ")
liste_entrees_reseau.append(row2[0])
conn.commit()
nbre_entree_reseau=len(liste_entrees_reseau)
for row2 in liste_entrees_reseau :
cursor.execute(f"UPDATE JONCTION SET RESEAU_ID = {index_reseau} WHERE JONCTION.ENTREE_ID = {row2} ")
cursor.execute(f"UPDATE SERIE SET RESEAU_ID = {index_reseau} WHERE SERIE.STATION_ENT_ID = {row2} ")
cursor.execute(f"UPDATE VISEE_FLAG SET RESEAU_ID = {index_reseau} WHERE VISEE_FLAG.ENTREE_ID = {row2} ")
conn.commit()
print(f"\t Réseau: {index_reseau}, entrées jonctionnées: {len(liste_entrees_reseau)}, {liste_entrees_reseau}")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête Jonction_RESEAU code:\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour joindre les equates dans la table des SHOT # #
#####################################################################################################################################
def SHOT_equates_station():
global error_count
# Requête 8: recherche des equates
try:
cursor.execute(f"""
-- Requête 8, recherche des equates --
SELECT
-- STATION.ID,
-- STATION.NAME,
GROUP_CONCAT(STATION.ID) as ID_Group
-- GROUP_CONCAT(STATION.X) as X_Group,
-- GROUP_CONCAT(STATION.Y) as Y_Group,
-- GROUP_CONCAT(STATION.Z) as Z_Group,
-- COUNT(STATION.X) AS Qte_X,
-- COUNT(STATION.Y) AS Qte_Y,
-- COUNT(STATION.Z) AS Qte_Z
FROM STATION
WHERE STATION.NAME <> '.' AND STATION.NAME <> '-'
-- INNER JOIN STATION AS STATION_Bis ON STATION.X = STATION_Bis.X AND STATION.Y = STATION_Bis.Y AND STATION.Z = STATION_Bis.Z
--WHERE STATION.X = STATION_BIS.X
GROUP BY STATION.X, STATION.Y, STATION.Z
HAVING COUNT(STATION.X)>1 AND COUNT(STATION.Y)>1 AND COUNT(STATION.Y)>1
""")
equate = cursor.fetchall()
print(f"\t Jonction de SHOT equates nbre: {len(equate)}")
for row in equate :
sous_valeurs = row[0].split(',')
# print(f": {sous_valeurs[0]} = ", end="")
for val in range (1, len(sous_valeurs)) :
# print(f"{sous_valeurs[val]},", end=" ")
cursor.execute(f"SELECT SHOT.ID FROM SHOT WHERE SHOT.FROM_ID = {sous_valeurs[val]}")
filtre = cursor.fetchall()
for row in filtre :
cursor.execute(f"UPDATE SHOT SET FROM_ID = {sous_valeurs[0]} WHERE ID = {row[0]};")
cursor.execute(f"SELECT SHOT.ID FROM SHOT WHERE SHOT.TO_ID = {sous_valeurs[val]}")
filtre = cursor.fetchall()
for row in filtre :
cursor.execute(f"UPDATE SHOT SET TO_ID = {sous_valeurs[0]} WHERE ID = {row[0]};")
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('equ', sous_valeurs[val]))
cursor.execute(f"UPDATE JONCTION SET STATION_JONC = ? WHERE id = ?", (sous_valeurs[0], sous_valeurs[val]))
#print(" ", end="")
conn.commit()
# print("")
# if len(equate) == 0 : print(f"\tAucun 'equate'")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête 8 (sql_8_equates) code:\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour supprimer les visées en double (même départs arrivée lg, az et pente) #
#####################################################################################################################################
def duplicate_SHOT():
global error_count
# Requête 10, recherche des doublons de visées
try:
cursor.execute(f"""
-- Requête 10, recherche des doublons de visées --
SELECT
SHOT.FROM_ID,
SHOT.TO_ID,
SHOT.LENGTH,
-- SHOT.BEARING,
-- SHOT.GRADIENT,
GROUP_CONCAT(SHOT.ID) as DUPP_SHOT
FROM SHOT
GROUP BY SHOT.FROM_ID, SHOT.TO_ID --, SHOT.BEARING, SHOT.GRADIENT, SHOT.LENGTH
HAVING COUNT(SHOT.FROM_ID)>1 AND COUNT(SHOT.TO_ID)>1 --AND COUNT(SHOT.BEARING)>1 AND COUNT(SHOT.GRADIENT)>1 AND COUNT(SHOT.LENGTH)>1
""")
duplicate = cursor.fetchall()
_total_length_err = 0.0
for row in duplicate :
sous_valeurs = row[3].split(',')
cursor.execute(f"""
SELECT
SHOT.ID,
SHOT_FLAG.FLAG,
round(SHOT.LENGTH, 2)
FROM SHOT
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.ID = {int(sous_valeurs[0])}
""")
shot_flag = cursor.fetchall()
# print("\t " + str(shot_flag))
cursor.execute(f"""
SELECT
SHOT.ID,
SHOT_FLAG.FLAG,
round(SHOT.LENGTH, 2)
FROM SHOT
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.ID = {int(sous_valeurs[1])}
""")
shot_flag2 = cursor.fetchall()
# print("\t " + str(shot_flag2))
# _total_length += int(row[2])
if shot_flag[0][1] is None and shot_flag2[0][1]== 'dpl':
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
Current_Station_ID = cursor.fetchall()
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag2[0][0]}")
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
elif shot_flag2[0][1] is None and shot_flag[0][1]== 'dpl':
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
Current_Station_ID = cursor.fetchall()
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag[0][0]}")
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
else :
_total_length_err += float(shot_flag2[0][2])
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
Current_Station_ID = cursor.fetchall()
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag2[0][0]}")
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
print(f"\033[91m\t Table des SHOT, visées en double à traiter à la source : \033[0m{shot_flag}, {shot_flag2}" +
f"\033[91m, station crée : \033[0m{_Current_Station_ID}" +
f"\033[91m, long. en double : \033[0m{"{:.2f}".format(_total_length_err)} m")
conn.commit()
# for val in range (1, len(sous_valeurs)) :
# cursor.execute(f"DELETE FROM SHOT WHERE SHOT.ID = {sous_valeurs[val]}")
# filtre = cursor.fetchall()
if len(duplicate) > 0:
print(f"\t Table des SHOT, visées dupliquées traités nbre: {len(duplicate)}")
# print(f"\t Visées dupliqués supprimés {duplicate}")
else :
print(f"\t Table des SHOT, aucune visée dupliquée")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (duplicate_SHOT) code:\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour supprimer les visées en de longueur null sur elle même (même départ et arrivée et longueur nulle #
#####################################################################################################################################
def issue_SHOT():
global error_count
try:
cursor.execute(f"""
SELECT
SHOT.ID
--SHOT.FROM_ID,
--SHOT.TO_ID,
--SHOT.LENGTH
FROM SHOT
WHERE SHOT.FROM_ID = SHOT.TO_ID AND SHOT.LENGTH = 0
""")
issue = cursor.fetchall()
for row in issue :
cursor.execute(f"DELETE FROM SHOT WHERE ID = {row[0]}")
# cursor.execute(f"UPDATE SHOT SET LENGTH = 0.01 WHERE ID = {row[0]}")
conn.commit()
if len(issue) > 0:
print(f"\t Table des SHOT, visée(s) bloquante(s), même départ et arrivée, longueur nulle supprimée(s) nbre: {len(issue)}")
# print(f"\t Visée(s) bloquante(s) supprimée(s) {issue}")
else :
print(f"\t Table des SHOT, aucune visée bloquante")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (Issue_SHOT) code:\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour marquer les stations d'habillage #
#####################################################################################################################################
def marquage_visee_station_habillage() :
global error_count
try:
cursor.execute(f"""
SELECT
STATION.ID AS STATION_ID,
SHOT.ID AS SHOT_ID
FROM STATION
JOIN SHOT ON SHOT.TO_ID = STATION.ID
WHERE STATION.NAME = '.' or STATION.NAME = '-'
""" )
filtre = cursor.fetchall()
print(f"\t Marquage des visées et des stations d'habillage nbre: {len(filtre)}")
for row in filtre :
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = 'hab' WHERE STATION_ID = {row[0]}")
cursor.execute(f"UPDATE VISEE_FLAG SET SERIE_ID = -1 WHERE SHOT_ID = {row[1]}")
conn.commit()
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (marquage_station_habillage):\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour suivre une série jusqu'au prochain départ ou une jonction #
#####################################################################################################################################
def suivi_serie( _Current_Serie_ID, bar) :
global avt_compteur
global error_count
try:
cursor.execute(f"SELECT * FROM SERIE WHERE SERIE_ID = {_Current_Serie_ID}")
_Serie = cursor.fetchall()
# if _Current_Serie_ID == 2 or _Current_Serie_ID == 3:
# print("Debug, a suivre")
_Current_Next_Station = 0
_Current_Shot = 0
_Current_Nre_Shot = int(_Serie[0][5]) # type: ignore
_Current_Serie_Lenght = float(_Serie[0][6]) # type: ignore
_Current_Serie_Lenght_Surface = float(_Serie[0][7]) # type: ignore
_Current_Serie_Lenght_Duplicate = float(_Serie[0][8]) # type: ignore
_Direction = int(_Serie[0][9])
_Current_Ent = int(_Serie[0][10])
if _Direction == 0 :
# print(f"\t\033[34mA gérer séries sans direction station série: {_Current_Serie_ID}\033[0m") # type: ignore
_Current_Station_ID = int(_Serie[0][2])
Next_Station_ID_1 = sql_station_depart(_Current_Station_ID)
Next_Station_ID_2 = sql_station_arrivee(_Current_Station_ID)
if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0: # type: ignore
# Pas de départ fin et pas d'arrivée : fin de la série
Next_Station_ID = Next_Station_ID_1
cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {_Current_Serie_ID};")
conn.commit()
return
elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0: # type: ignore
# Un départ pas d'arrivée
Next_Station_ID = Next_Station_ID_1
Direction = 1
cursor.execute(f"UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = {_Current_Serie_ID};")
conn.commit()
elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1: # type: ignore
# Une arrivée pas de départ
Next_Station_ID = Next_Station_ID_2
_Serie[0][4] = _Serie[0][2]
Direction = -1
cursor.execute(f"UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = {_Current_Serie_ID};")
conn.commit()
else :
# A gérer nouvelles séries
# nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
# print (f"\033[34m\tA vérifier dans suivi_serie nouvelles séries inverses depuis {_Current_Station_ID} - {Next_Station_ID_2}, {Next_Station_ID_1}\033[0m")
# nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, -1, _Current_Ent)
return
elif _Direction == -1 :
_Current_Station_ID = int(_Serie[0][4])
_Next_Station_ID = sql_station_arrivee(_Current_Station_ID) # type: ignore
_Current_Nre_Shot = 0
#print(f"\tDébut suivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}") # type: ignore
_ID_Suite = 0
_Force = False
if len(_Next_Station_ID) > 1: # type: ignore
while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][2]) : # type: ignore
_ID_Suite += 1
if _ID_Suite >= len(_Next_Station_ID): # type: ignore
# print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie inverse: {_Current_Serie_ID} station:{_Current_Station_ID}, shot: {_Current_Shot}\033[0m")
#error_count += 1
return
while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore
# if _Current_Station_ID == 12074:
# print("Debug, a suivre")
_Force = True
_Current_Nre_Shot += 1
_Current_Serie_Lenght += _Next_Station_ID[_ID_Suite][1] # type: ignore
_Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore
_Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore
_Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore
_Current_Next_Station = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore
_Current_Old_Station = int(_Current_Station_ID)
test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore
cursor.execute(f"""
UPDATE SERIE SET
SERIE_DEP_ID = {_Current_Serie_ID},
STATION_DEP_ID = {_Current_Next_Station},
SERIE_NBRE_Shot = {_Current_Nre_Shot},
SERIE_LENGHT = {_Current_Serie_Lenght},
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
WHERE SERIE_ID = {_Current_Serie_ID};
""")
cursor.execute(f"""
UPDATE JONCTION SET
STATION_TYPE = 'inv',
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot},
SERIE_ID = {_Current_Serie_ID}
WHERE STATION_ID = {_Current_Station_ID}
""")
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {_Current_Serie_ID},
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot}
WHERE SHOT_ID = {_Current_Shot}
""")
#print(f"\tSuivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Next station: {_Next_Station_ID}")
conn.commit()
depart = sql_station_depart(_Current_Station_ID)
if (len(depart) >= 1 ) : # type: ignore
#print(f'Départ direct à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {depart}')
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
_Current_Station_ID = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore
_Next_Station_ID = sql_station_arrivee(_Current_Next_Station)
_ID_Suite = 0
cursor.execute(f"""
UPDATE SERIE SET
SERIE_DEP_ID = {_Current_Serie_ID},
STATION_DEP_ID = {_Current_Next_Station},
SERIE_NBRE_Shot = {_Current_Nre_Shot},
SERIE_LENGHT = {_Current_Serie_Lenght},
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
WHERE SERIE_ID = {_Current_Serie_ID};
""")
# if _Current_Station_ID == 5047 :
# print("debug point")
cursor.execute(f"""
UPDATE JONCTION SET
STATION_TYPE = 'arv',
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot},
SERIE_ID = {_Current_Serie_ID}
WHERE STATION_ID = {_Current_Station_ID}
""")
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {_Current_Serie_ID},
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot}
WHERE SHOT_ID = {_Current_Shot}
""")
conn.commit()
if _Current_Nre_Shot > 1 :
avt_compteur = avt_compteur + _Current_Nre_Shot - 1
bar(_Current_Nre_Shot-1)
#_Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore
depart = sql_station_depart(_Current_Station_ID) # type: ignore
if (len(_Next_Station_ID)==0): # type: ignore
# fin de la série
# print (f"\tFin de la série inverse: {_Current_Serie_ID} (pas de suite) station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
if (len(depart)>=1):# type: ignore
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
else :
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
# print (f"\tFin de la série inverse: {_Current_Serie_ID} station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
if (len(depart)>=1):# type: ignore
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
elif _Direction == 1 :
_Current_Station_ID = int(_Serie[0][2])
_Next_Station_ID = sql_station_depart(_Current_Station_ID)
_Current_Nre_Shot = 0
#print(f"\tDébut suivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}")
_ID_Suite = 0
_Force = False
if len(_Next_Station_ID) > 1: # type: ignore
while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][4]) : # type: ignore
_ID_Suite += 1
if _ID_Suite >= len(_Next_Station_ID): # type: ignore
# print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie directe: {_Current_Serie_ID}, Station: {_Current_Station_ID}, shot: {_Current_Shot}\033[0m")
# error_count += 1
return
while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore
# if _Current_Station_ID == 12074:
# print("Debug, a suivre")
_Force = True
_Current_Nre_Shot += 1
_Current_Serie_Lenght += _Next_Station_ID[0][1] # type: ignore
_Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore
_Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore
_Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore
_Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore
_Current_Old_Station = int(_Current_Station_ID)
test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore
cursor.execute(f"""
UPDATE SERIE SET
SERIE_DEP_ID = {_Current_Serie_ID},
STATION_DEP_ID = {_Current_Next_Station},
SERIE_NBRE_Shot = {_Current_Nre_Shot},
SERIE_LENGHT = {_Current_Serie_Lenght},
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
WHERE SERIE_ID = {_Current_Serie_ID};
""")
# if _Current_Station_ID == 5035 :
# print("debug point")
cursor.execute(f"""
UPDATE JONCTION SET
STATION_TYPE = 'dir',
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot},
SERIE_ID = {_Current_Serie_ID}
WHERE STATION_ID = {_Current_Station_ID}
""")
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {_Current_Serie_ID},
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot}
WHERE SHOT_ID = {_Current_Shot}
""")
conn.commit()
#print(f"\tSuivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}")
arrivee = sql_station_arrivee(_Current_Station_ID)
if (len(arrivee) >= 1 ) : # type: ignore
#print(f'Arrivée à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {arrivee}')
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
_Current_Station_ID = int(_Next_Station_ID[0][0]) # type: ignore
_Next_Station_ID = sql_station_depart(_Current_Next_Station)
_ID_Suite = 0
cursor.execute(f"""
UPDATE SERIE SET
SERIE_DEP_ID = {_Current_Serie_ID},
STATION_DEP_ID = {_Current_Next_Station},
SERIE_NBRE_Shot = {_Current_Nre_Shot},
SERIE_LENGHT = {_Current_Serie_Lenght},
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
WHERE SERIE_ID = {_Current_Serie_ID};
""") # type: ignore
cursor.execute(f"""
UPDATE JONCTION SET
STATION_TYPE = 'end',
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot},
SERIE_ID = {_Current_Serie_ID}
WHERE STATION_ID = {_Current_Station_ID}
""")
cursor.execute(f"""
UPDATE VISEE_FLAG SET
SERIE_ID = {_Current_Serie_ID},
ENTREE_ID = {_Current_Ent},
SERIE_RANG = {_Current_Nre_Shot}
WHERE SHOT_ID = {_Current_Shot}
""")
conn.commit()
if _Current_Nre_Shot > 1:
avt_compteur = avt_compteur + _Current_Nre_Shot - 1
bar(_Current_Nre_Shot-1)
arrivee = sql_station_arrivee(_Current_Next_Station) # type: ignore
if (len(_Next_Station_ID)==0): # type: ignore
# fin de la série
# print (f"\tFin de la série directe: {_Current_Serie_ID} (pas de suite) à la station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
if (len(arrivee)>=1): # type: ignore
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
else :
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
# print (f"\tFin de la série directe: {_Current_Serie_ID} station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
if (len(arrivee)>=1): # type: ignore
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête de lecture de la série\033[0m {_Current_Serie_ID}\033[91m, suivi_serie:\033[0m {e}")
error_count += 1
return
return
#####################################################################################################################################
# Fonction pour créer les nouvelles séries à une station #
#####################################################################################################################################
def nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, _DIRECTION, _STATION_ENT_ID) :
global error_count
# if _Current_Serie_ID == 2 or _Current_Serie_ID == 3:
# print("Debug, a suivre")
try:
if _DIRECTION == 1:
_Next_Station = sql_station_depart(_Current_Station_ID)
# cursor.execute(f"""
# SELECT SHOT.TO_ID as TO_ID_RESULT
# FROM SHOT
# WHERE SHOT.FROM_ID = {_Current_Station_ID}
# -- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
# """)
# _Next_Station = cursor.fetchall()
elif _DIRECTION == -1:
_Next_Station = sql_station_arrivee(_Current_Station_ID)
# cursor.execute(f"""
# SELECT SHOT.FROM_ID as FROM_ID_RESULT
# FROM SHOT
# WHERE SHOT.TO_ID = {_Current_Station_ID}
# -- AND ( SELECT SHOT.FROM_ID FROM SHOT WHERE SHOT.TO_ID = FROM_ID_RESULT )
# """)
# _Next_Station = cursor.fetchall()
if _Next_Station is None: # type: ignore
print(f"Pas de série crées à la station: {_Current_Station_ID}")
return
# boucle sur liste _Next_Station
for Depart in _Next_Station: # type: ignore
if _Current_Old_Station != Depart[0] : # type: ignore
cursor.execute(f"UPDATE JONCTION SET SERIE_JONC = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID};")
cursor.execute(f"UPDATE JONCTION SET ENTREE_ID = {_STATION_ENT_ID} WHERE STATION_ID = {_Current_Station_ID};")
cursor.execute(f"UPDATE JONCTION SET STATION_JONC = {Depart[0]} WHERE STATION_ID = {_Current_Station_ID};")
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', _Current_Station_ID))
if _DIRECTION == 1 :
cursor.execute(f"""
INSERT INTO SERIE (
SERIE_DEP_ID,
STATION_DEP_ID,
SERIE_ARR_ID ,
STATION_ARR_ID,
SERIE_NBRE_SHOT,
SERIE_LENGHT,
SERIE_LENGHT_SURFACE,
SERIE_LENGHT_DUPLICATE,
DIRECTION,
STATION_ENT_ID)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (_Current_Serie_ID, _Current_Station_ID, -1, Depart[0], -1, 0.0, 0.0, 0.0, 1,_STATION_ENT_ID))
conn.commit()
# print(f"\tCréation Série directe: {cursor.lastrowid} depuis la station: {_Current_Station_ID} vers {Depart[0]} ")
elif _DIRECTION == -1 :
cursor.execute(f"""
INSERT INTO SERIE (
SERIE_DEP_ID,
STATION_DEP_ID,
SERIE_ARR_ID ,
STATION_ARR_ID,
SERIE_NBRE_SHOT,
SERIE_LENGHT,
SERIE_LENGHT_SURFACE,
SERIE_LENGHT_DUPLICATE,
DIRECTION,
STATION_ENT_ID)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (-1, Depart[0], _Current_Serie_ID, _Current_Station_ID, -1, 0.0, 0.0, 0.0, -1,_STATION_ENT_ID))
conn.commit()
# print(f"\tCréation Série inv.: {cursor.lastrowid} depuis {Depart[0]} vers la station: {_Current_Station_ID}")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de la requête (nouvelle_serie):\033[0m {e}")
error_count += 1
return
#####################################################################################################################################
# Fonction pour tester si la station a déjà été lue #
#####################################################################################################################################
def test_jonction(station, serie, entree) :
global error_count
try:
cursor.execute(f"""
-- Requête 6: Détection des départs depuis une station (visée directe)
SELECT
SHOT.TO_ID as TO_ID_RESULT,
--JONCTION.STATION_TYPE,
SHOT.LENGTH,
CASE
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_SRF,
CASE
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_DPL,
SHOT.ID
-- SHOT_FLAG.FLAG as Type_Flag
FROM SHOT
--JOIN JONCTION ON SHOT.TO_ID = JONCTION.STATION_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.FROM_ID = {station} -- AND JONCTION.STATION_TYPE IS NULL
-- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
""")
depart = cursor.fetchall()
cursor.execute(f"""
-- Requête 7: Détection des arrivées depuis une station (Visée inverse)
SELECT
SHOT.FROM_ID as FROM_ID_RESULT,
--JONCTION.STATION_TYPE,
SHOT.LENGTH,
CASE
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_SRF,
CASE
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_DPL,
SHOT.ID
-- SHOT_FLAG.FLAG
FROM SHOT
--JOIN JONCTION ON SHOT.FROM_ID = JONCTION.STATION_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.TO_ID = {station} --AND JONCTION.STATION_TYPE IS NULL
--AND ( SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE SHOT.TO_ID = FROM_ID_RESULT
""")
arrivee = cursor.fetchall()
total = arrivee + depart # type: ignore
for row in total: # type: ignore
cursor.execute(f"SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
# if row[1] == 5017:
# print('Debug point')
retour = cursor.fetchall()
val = str(retour[0])
if val != '(None,)' :
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', row[0]))
cursor.execute(f"SELECT JONCTION.ENTREE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
retour = cursor.fetchall()
cursor.execute(f"SELECT JONCTION.SERIE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
_serie = cursor.fetchall()
# print (f"\tJonction à proximité de la Station_ID: {row[0]}, retour: {str(val)}, serie {serie} - {_serie[0][0]}, entrée {entree} - {retour[0][0]}")
if (retour[0][0] != entree) and (retour[0][0] != None) :
print (f"\033[36m\t Jonction à la Station_ID: {row[0]} entre les entrées {entree} et {retour[0][0]}\033[0m")
cursor.execute(f"INSERT INTO RESEAU ( STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)", (row[0], entree, retour[0][0]))
conn.commit()
# if _serie[0][0] != serie and (_serie[0][0] != None):
# print (f"\033[34m\tJonction à la Station_ID: {row[0]} entre les series {serie} et {_serie[0][0]}\033[0m")
cursor.execute(f"SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
retour = cursor.fetchall()
val = str(retour[0])
if val == '(None,)' :
#print (f"\tPas de jonction à la Station_ID: {station}, retour: {val}")
return False
else :
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', station))
cursor.execute(f"SELECT JONCTION.ENTREE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
retour = cursor.fetchall()
cursor.execute(f"SELECT JONCTION.SERIE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
_serie = cursor.fetchall()
# print (f"\tJonction à proximité de la Station_ID: {row[0]}, retour: {str(val)}, serie {serie} - {_serie[0][0]}, entrée {entree} - {retour[0][0]}")
if retour[0][0] != entree :
print (f"\033[0m\t Jonction à la Station_ID: {station} entre les entrées {entree} et {retour[0][0]}\033[0m")
if _serie[0][0] != serie :
print (f"\033[0m\t Jonction à la Station_ID: {station} entre les series {serie} et {_serie[0][0]}\033[0m")
return True
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requêtes (test_jonction): {e}\033[0m")
error_count += 1
return
#####################################################################################################################################
# Fonction pour exécuter une requête et sauvegarder les résultats dans un fichier texte #
# #
#####################################################################################################################################
def calcul_stats(output_file):
global error_count
global _largeurCol
global _largeurColTete
try:
print(f"\033[1;32mPhase 5: Écriture des statistiques dans \033[0m{safe_relpath(output_file)}")
# Enregistrement des résultats dans un fichier texte
output_file_ligne =[]
for i in range(9): output_file_ligne.append(titre[i].ljust(120)+"*\n")
sql_query1 = ("""
Select
round(sum(LENGTH), 2) as Lg,
round(sum(DUPLICATE_LENGTH),2) as Duplicate,
round(sum(SURFACE_LENGTH),2) as Surface
-- round(sum(LENGTH) + sum(DUPLICATE_LENGTH), 2) as Total
from CENTRELINE length
""")
cursor.execute(sql_query1)
results = cursor.fetchall()
vide ="-".ljust(_largeurCol)
# output_file_ligne.append(f"Développement total centerline:\t{"{:.2f}".format(results[0][0]).ljust(_largeurCol)}\t{"{:.2f}".format(results[0][1]).ljust(_largeurCol)}\t{vide}\t{vide}\t{vide}\tdev.(m), dupl.(m)\n")
#print('Développement total: ' + formatted_row + 'm')
# output_file_ligne.append(f"**Développement total centerline:**\t%s\t%s\t%s\t%s\t%s\tDev.(m), Dupl.(m), Surf.(m)\n" %(str("{:.2f}".format(results[0][0]).ljust(_largeurCol)),
# str("{:.2f}".format(results[0][1]).ljust(_largeurCol)),
# str("{:.2f}".format(results[0][2]).ljust(_largeurCol)),
# str(vide), str(vide)))
output_file_ligne.append(
f"**Développement total des centerlines (m):** "
f"**, Développement:** {results[0][0]:.2f} "
f"**, Dupliqué:** {results[0][1]:.2f} "
f"**, Surface:** {results[0][2]:.2f}\n"
)
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
_compteur = cursor.fetchall()
compteur = int(_compteur[0][0])
if compteur > 0 : # type: ignore
output_file_ligne.append(f"!!Attention, {compteur} station(s) non comptabilisée(s) et raccordée(s)\n")
results=sql_bilan_reseaux()
if results[0][0] != None :# type: ignore
output_file_ligne.append(f"\n--------------\n")
output_file_ligne.append("**Développement total par réseaux**\n")
for row in results: # type: ignore
# formatted_row = '\t'.join(map(str, row))
# output_file_ligne.append('\t' + formatted_row + '\n')
formatted_row = '| ' + ' | '.join(map(str, row)) + ' |'
output_file_ligne.append(formatted_row + '\n')
#print('Développement total: ' + formatted_row + 'm')
results=sql_bilan_annee()
if results[0][0] != None :# type: ignore
output_file_ligne.append(f"\n--------------\n")
output_file_ligne.append("**Développement total topographié par année(s)**\n")
for row in results: # type: ignore
if row[1].strip() != "0.00" or row[3].strip() != "0.00" or row[5].strip() != "0.00" :
# Formatage pour Markdown avec alignement simple
formatted_row = '| ' + ' | '.join(map(str, row)) + ' |'
output_file_ligne.append(formatted_row + '\n')
#print('Développement total: ' + formatted_row + 'm')
Rose(output_file_name_rose)
Shot_lengths_histogram(output_file_name_histo)
findetraitement = datetime.now()
duree = findetraitement - maintenant
jours, secondes = divmod(duree.seconds, 86400) # 86400 secondes dans une journée
heures, secondes = divmod(secondes, 3600) # 3600 secondes dans une heure
minutes, secondes = divmod(secondes, 60) # 60 secondes dans une minute
if duree.seconds > 3600:
duree_formatee = "{:02}(h){:02}(m){:02}(s)".format(heures, minutes, secondes)
elif duree.seconds > 60:
duree_formatee = "{:02}(m){:02}(s)".format(minutes, secondes)
else :
duree_formatee = "{:02}(s)".format(secondes)
if error_count == 0:
output_file_ligne[7] = "* Durée calcul: " + duree_formatee + " sans erreur"
output_file_ligne[7] = output_file_ligne[7].ljust(120)+"*\n"
else :
output_file_ligne[7] = "* !!!Durée calcul: " + duree_formatee + " avec erreur(s): " + str(error_count) + "!!!"
output_file_ligne[7] = output_file_ligne[7].ljust(120)+"*\n"
with open(output_file, 'w', encoding='utf-8') as file:
file.writelines(output_file_ligne)
if error_count == 0 :
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f"\033[1;32m, résultats enregistrés dans \033[0m{safe_relpath(output_file)}")
else :
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee
+ f",\033[91m avec \033[0m{error_count}\033[91m erreur(s), \033[1;32mrésultats enregistrés dans \033[0m{safe_relpath(output_file)}")
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution des requêtes calcul_stats:\033[0m {e}")
error_count += 1
output_file_ligne.append(f"Erreur lors de l'exécution des requêtes calcul_stats: {e}\n")
with open(output_file, 'w', encoding='utf-8') as file:
file.writelines(output_file_ligne)
except FileNotFoundError:
print(f"\033[91mErreur d'ouverture du fichier: \033[0m{safe_relpath(output_file)} ")
error_count += 1
except Exception as e:
print(f"\033[91mErreur lors de l'exécution de calcul_stats:\033[0m {e}")
error_count += 1
output_file_ligne.append(f"Erreur lors de l'exécution de calcul_stats: {e}\n")
with open(output_file, 'w', encoding='utf-8') as file:
file.writelines(output_file_ligne)
return
#####################################################################################################################################
# # Requête : Table des entrées (Liste des entrées avec coordonnées) #
#####################################################################################################################################
def sql_liste_entree():
global error_count
sql_query= ("""
select
STATION.ID,
STATION.NAME,
/*SURVEY.NAME, SURVEY.PARENT_ID, SURVEY.FULL_NAME, SURVEY.TITLE,*/
round(STATION.X, 1),
round(STATION.Y, 1),
round(STATION.Z, 1)
/*, STATION_FLAG.FLAG , count(STATION.NAME) AS Nombre_Occurrences */
from STATION
join STATION_FLAG on STATION_FLAG.STATION_ID = STATION.ID
join SURVEY on SURVEY.ID = STATION.SURVEY_ID
where STATION_FLAG.FLAG='ent' or STATION_FLAG.FLAG='fix' --and STATION.ID = 28548
group by STATION.NAME , STATION.Y, STATION.Z
order by STATION.NAME ASC
""")
# sql_query2 = ("""
# select
# STATION.ID,
# STATION.NAME,
# /*SURVEY.NAME, SURVEY.PARENT_ID, SURVEY.FULL_NAME, SURVEY.TITLE,*/
# round(STATION.X, 1),
# round(STATION.Y, 1),
# round(STATION.Z, 1)
# /*, STATION_FLAG.FLAG , count(STATION.NAME) AS Nombre_Occurrences */
# from STATION
# join STATION_FLAG on STATION_FLAG.STATION_ID = STATION.ID
# join SURVEY on SURVEY.ID = STATION.SURVEY_ID
# where STATION_FLAG.FLAG='fix' --and STATION.ID = 28548
# --group by STATION.NAME
# order by STATION.NAME ASC
# """)
try:
cursor.execute(sql_query)
result_ent = cursor.fetchall()
# cursor.execute(sql_query)
# result_fix = cursor.fetchall()
if len(result_ent) == 0 :
error_count += 1
print(f"\t \033[91mAttention aucune entrée ou point fix comptabilisé\033[0m")
else :
print(f"\t Table des STATION, entrée et fix nbre: {len(result_ent)}")
return result_ent
# if len(result_ent) == 0:
# print(f"\033[91mPas d'entrées\033[0m")
# if len(result_fix) == 0:
# print(f"\033[91mPas de points fixes\033[0m")
# return None
# else :
# print(f"\tTable des STATION, point fixe nbre: {len(result_fix)}")
# return result_fix
# elif len(result_ent) == len(result_fix) :
# print(f"\tTable des STATION, entrée nbre: {len(result_ent)}")
# # print(f"\tTable des STATION, point fixe nbre: {len(result_fix)}")
# return result_ent
# elif len(result_ent) > len(result_fix) :
# print(f"\033[91mA gérer Points fixes > entrées, traitement uniquement des entrées\033[0m")
# return result_ent
# elif len(result_ent) < len(result_fix) :
# print(f"\033[91mA gérer Points fixes < entrées, traitement uniquement des points fixes\033[0m")
# return result_fix
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête 4 (sql_liste_entree):\033[0m {e}")
error_count += 1
return None
return
#####################################################################################################################################
# # Requête : Table des séries vides #
#####################################################################################################################################
def sql_serie_vides():
global error_count
sql_query5 = ("""
SELECT *
FROM SERIE
WHERE SERIE.SERIE_NBRE_SHOT = -1""")
try:
cursor.execute(sql_query5) # Exécution de la requête SQL
retour = cursor.fetchall()
# if len(retour) == 0 :
# print(f"Aucune séries vides {len(retour)}")
return retour
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (sql_serie_vides): {e}\033[0m")
error_count += 1
return None
return
#####################################################################################################################################
# # Requête: From_To (recherche si il y a un départ dans le sens From vers To depuis la station Current_Station_ID) #
#####################################################################################################################################
def sql_station_depart(station):
global error_count
try:
cursor.execute(f"""
-- Requête 6: Détection des départs depuis une station (visée directe)
SELECT
SHOT.TO_ID as TO_ID_RESULT,
--JONCTION.STATION_TYPE,
SHOT.LENGTH,
CASE
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_SRF,
CASE
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_DPL,
-- SHOT_FLAG.FLAG as Type_Flag
SHOT.ID
FROM SHOT
JOIN JONCTION ON SHOT.TO_ID = JONCTION.STATION_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.FROM_ID = {station} AND JONCTION.STATION_TYPE IS NULL
-- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
""")
retour = cursor.fetchall()
# if len(retour) == 0 : print(f"\tAucun départ depuis la station: {station}")
return retour
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête 6 (sql_station_depart) code:\033[0m {e}")
error_count += 1
return None
return
#####################################################################################################################################
# # Requête : To_From (recherche si il y a un départ dans le sens To vers From depuis la station Current_Station_ID) #
#####################################################################################################################################
def sql_station_arrivee(station):
global error_count
try:
cursor.execute(f"""
-- Requête 7: Détection des arrivées depuis une station (Visée inverse)
SELECT
SHOT.FROM_ID as FROM_ID_RESULT,
--JONCTION.STATION_TYPE,
SHOT.LENGTH,
CASE
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_SRF,
CASE
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
ELSE 0
END AS LENGTH_DPL,
SHOT.ID
-- SHOT_FLAG.FLAG
FROM SHOT
JOIN JONCTION ON SHOT.FROM_ID = JONCTION.STATION_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE SHOT.TO_ID = {station} AND JONCTION.STATION_TYPE IS NULL
--AND ( SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE SHOT.TO_ID = FROM_ID_RESULT
""")
retour = cursor.fetchall()
# if len(retour) == 0 print(f"\tAucune arrivée depuis la station: {station}")
return retour
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête 7 (sql_station_arrivee) code:\033[0m {e}")
error_count += 1
return None
return
#####################################################################################################################################
# #-- Bilan table série #
#####################################################################################################################################
def sql_bilan_serie():
global error_count
try:
cursor.execute(f"""
-- Bilan table série
select
--STATION.NAME,
-- sum(SERIE.SERIE_LENGHT) as Total,
round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long,
round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate,
round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface,
sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot,
COUNT(*) AS Nbre_serie
FROM SERIE
JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID
""")
retour = cursor.fetchall()
return retour
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête 11 (sql_bilan_serie):\033[0m {e}")
error_count += 1
return None
return
#####################################################################################################################################
# #-- Bilan table série By Réseaux #
#####################################################################################################################################
def sql_bilan_reseaux():
global error_count
global _largeurCol
global _largeurColTete
retour = []
try:
###############################################################################################################
# Liste des réseaux
###############################################################################################################
cursor.execute(f"""
-- Bilan table série By Réseaux
select
--SURVEY_RESEAU.TITLE as Réseau,
--SURVEY_RESEAU.NAME,
--SURVEY_RESEAU.ID,
RESEAU_ID,
--STATION.NAME as Nom,
-- sum(SERIE.SERIE_LENGHT) as Total,
round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long,
round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate,
round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface,
sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot
--COUNT(*) AS Nbre_serie
--round(max(STATION.Z),2) as Max_Z,
--round(min(STATION.Z),2) as Min_Z,
--max(STATION.Z) - min(STATION.Z) as Delta_Z
FROM SERIE
JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID
JOIN SURVEY AS SURVEY_JONCTION ON STATION.SURVEY_ID = SURVEY_JONCTION.ID
JOIN SURVEY AS SURVEY_RESEAU ON SURVEY_JONCTION.PARENT_ID = SURVEY_RESEAU.ID
WHERE RESEAU_ID is not NULL and RESEAU_ID !=0
GROUP BY SERIE.RESEAU_ID
ORDER BY Long DESC
""")
result = cursor.fetchall()
if len(result) >0 :
# _ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
# _ligne[0]= 'Aucun réseau'.ljust(_largeurColTete)
# retour.append(_ligne)
# return retour
for row in result:
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
result_shot = 0
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.RESEAU_ID ={row[0]}
""")
_result_length = cursor.fetchall()
result_length = float(_result_length[0][0])
result_shot = int(_result_length[0][1])
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.RESEAU_ID ={row[0]}
""")
_result_length_dpl = cursor.fetchall()
result_length_dpl = float(_result_length_dpl[0][0])
result_shot += int(_result_length_dpl[0][1])
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.RESEAU_ID ={row[0]}
""")
_result_length_srf = cursor.fetchall()
result_length_srf = float(_result_length_srf[0][0])
result_shot += int(_result_length_srf[0][1])
# ligne = [ 'none', 0, 0, 0, 0, 0, 0, 'none', 0, 'none', 0, 0]
cursor.execute(f"""
-- Liste des entrée dans la table RESEAU
SELECT
--RESEAU.ENT_1 AS ENT_ID,
--RESEAU.RESEAU_ID,
STATION.NAME
--STATION.Z
FROM RESEAU
JOIN STATION ON RESEAU.ENT_1 = STATION.ID
WHERE RESEAU_ID = {row[0]}
UNION --ALL
SELECT
--RESEAU.ENT_2 AS ENT_ID,
--RESEAU.RESEAU_ID,
STATION.NAME
--STATION.Z
FROM RESEAU
JOIN STATION ON RESEAU.ENT_2 = STATION.ID
WHERE RESEAU_ID = {row[0]}
GROUP BY STATION.NAME
ORDER BY STATION.NAME
--ORDER BY STATION.Z DESC
""")
liste_entree = cursor.fetchall()
_liste_ent = liste_entree[0][0]
index = 1
while index < len(liste_entree):
_liste_ent += ", " + liste_entree[index][0]
index += 1
if len(_liste_ent) > _largeurColTete :
_largeurColTete = len(_liste_ent) + 2
ligne[0] =_liste_ent.ljust(_largeurColTete) # Liste Entrées
ligne[1] = str(len(liste_entree)) # Nre Ent.
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
ligne[6] = str(result_shot) # Visées
cursor.execute(f"""
-- Requête pour rechercher le point bas d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Min
from STATION
join (
select Min(STATION.Z) as Val_Min
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.RESEAU_ID = {row[0]}
) min on STATION.Z = min.Val_Min
LIMIT 1
""")
altitude_min = cursor.fetchall()
ligne[7] = altitude_min[0][0]
ligne[8] = str("{:.2f}".format(altitude_min[0][1]))
cursor.execute(f"""
-- Requête pour rechercher le point haut d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Max
from STATION
join (
select Max(STATION.Z) as Val_Max
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.RESEAU_ID = {row[0]}
) max on STATION.Z = max.Val_Max
LIMIT 1
""")
altitude_max = cursor.fetchall()
ligne[9] = altitude_max[0][0]
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
retour.append(ligne)
# print(f"Reseau num {row[0]}, {len(liste_entree)} entrée(s): {_liste_ent} ")
###############################################################################################################
# Liste des visées non raccordées
###############################################################################################################
cursor.execute(f"""
SELECT
sum (SHOT.LENGTH) as Long
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG is NULL
""")
result_long = cursor.fetchall()
if result_long[0][0] is None :
_result_long = 0.0
else :
_result_long = result_long[0][0]
cursor.execute(f"""
SELECT
sum (SHOT.LENGTH) as Long
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='dpl'
""")
result_dpl = cursor.fetchall()
if result_dpl[0][0] is None :
_result_dpl = 0.0
else :
_result_dpl = result_dpl[0][0]
cursor.execute(f"""
SELECT
sum (SHOT.LENGTH) as Long
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='srf'
""")
result_srf = cursor.fetchall()
if result_srf[0][0] is None :
_result_srf = 0.0
else :
_result_srf = result_srf[0][0]
cursor.execute(f"""
SELECT
count (SHOT.LENGTH) as Long
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
WHERE VISEE_FLAG.SERIE_ID is NULL --and SHOT_FLAG.FLAG ='srf'
""")
result_count = cursor.fetchall()
if result_count[0][0] is None :
_result_count = 0.0
else :
_result_count = result_count[0][0]
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
_liste_ent = "Visée(s) non raccordées"
_liste_ent = _liste_ent.ljust(_largeurColTete)
ligne[0] = _liste_ent
ligne[1] = str("0")
ligne[2] = str("{:.2f}".format(_result_long))
ligne[4] = str("{:.2f}".format(_result_dpl))
ligne[5] = str("{:.2f}".format(_result_srf))
ligne[6] = str(_result_count)
cursor.execute(f"""
-- Requête pour rechercher le point bas d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Min
from STATION
join (
select Min(STATION.Z) as Val_Min
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.SERIE_ID is null
) min on STATION.Z = min.Val_Min
LIMIT 1
""")
altitude_min = cursor.fetchall()
if len(altitude_min) == 0 :
_altitude_min = 0.0
_altitude_min_name = 'None'
else :
_altitude_min = altitude_min[0][1]
_altitude_min_name = str(altitude_min[0][0])
ligne[7] = str(_altitude_min_name)
ligne[8] = str("{:.2f}".format(_altitude_min))
cursor.execute(f"""
-- Requête pour rechercher le point haut d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Max
from STATION
join (
select Max(STATION.Z) as Val_Max
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.SERIE_ID is null
) max on STATION.Z = max.Val_Max
LIMIT 1
""")
altitude_max = cursor.fetchall()
if len(altitude_max) == 0 :
_altitude_max = 0.0
_altitude_max_name = 'None'
else :
_altitude_max = altitude_max[0][1]
_altitude_max_name = str(altitude_max[0][0])
ligne[7] = str(_altitude_max_name)
ligne[8] = str("{:.2f}".format(_altitude_max))
ligne[3] = "-" #"{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
if _result_long !=0 or _result_dpl != 0 or _result_srf !=0 or _result_count !=0:
retour.append(ligne)
###############################################################################################################
# Totaux
###############################################################################################################
result_shot = 0
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null
""")
_result_length = cursor.fetchall()
result_length = float(_result_length[0][0])
result_shot = int(_result_length[0][1])
cursor.execute(f"""
SELECT
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl'
""")
_result_length_dpl = cursor.fetchall()
result_length_dpl = float(_result_length_dpl[0][0])
result_shot += int(_result_length_dpl[0][1])
cursor.execute(f"""
SELECT
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
FROM VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf'
""")
_result_length_srf = cursor.fetchall()
result_length_srf = float(_result_length_srf[0][0])
result_shot += int(_result_length_srf[0][1])
cursor.execute(f"""
-- Bilan table VISEE_FLAG By entrées
select
ENTREE_ID
--STATION.NAME
FROM VISEE_FLAG
JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID
WHERE SERIE_ID >0
GROUP BY VISEE_FLAG.ENTREE_ID
""")
_result_entrees = cursor.fetchall()
_total_entrees_topo = len(_result_entrees)
result2 = sql_liste_entree()
_total_entrees_non_topo = len(result2) - _total_entrees_topo # type: ignore
if _result_length[0][1] != None :
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
ligne[0] = "Totaux (entrées et points fixes)".ljust(_largeurColTete) # Liste Entrées
ligne[1] = str("{:.0f}".format(len(result2))) # type: ignore # Nre Ent.
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
ligne[6] = str(result_shot) # Visées
cursor.execute(f"""
-- Requête pour rechercher le point bas d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Min
from STATION
join (
select Min(STATION.Z) as Val_Min
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
) min on STATION.Z = min.Val_Min
LIMIT 1
""")
altitude_min = cursor.fetchall()
ligne[7] = altitude_min[0][0]
ligne[8] = str(altitude_min[0][1])
cursor.execute(f"""
-- Requête pour rechercher le point haut d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Max
from STATION
join (
select Max(STATION.Z) as Val_Max
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
) max on STATION.Z = max.Val_Max
LIMIT 1
""")
altitude_max = cursor.fetchall()
ligne[9] = altitude_max[0][0]
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
retour.append(ligne)
else :
_total_entrees_non_topo = 0
_total_entrees_topo = 0
###############################################################################################################
# Liste des entrées uniques
###############################################################################################################
cursor.execute(f"""
-- Bilan table VISEE_FLAG By entrées
select
ENTREE_ID,
STATION.NAME
FROM VISEE_FLAG
JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID
WHERE RESEAU_ID ==0 or RESEAU_ID is null and SERIE_ID >0
GROUP BY VISEE_FLAG.ENTREE_ID
""")
result = cursor.fetchall()
for row in result :
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
result_shot = 0
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.ENTREE_ID ={row[0]}
""")
_result_length = cursor.fetchall()
result_length = float(_result_length[0][0])
result_shot = int(_result_length[0][1])
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.ENTREE_ID ={row[0]}
""")
_result_length_dpl = cursor.fetchall()
result_length_dpl = float(_result_length_dpl[0][0])
result_shot += int(_result_length_dpl[0][1])
cursor.execute(f"""
select
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
count(VISEE_FLAG.ID) as count,
STATION_TO.NAME as To_Name
from VISEE_FLAG
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.ENTREE_ID ={row[0]}
""")
_result_length_srf = cursor.fetchall()
result_length_srf = float(_result_length_srf[0][0])
result_shot += int(_result_length_srf[0][1])
if result_length_srf == 0.0 and result_length == 0.0 and result_length_dpl == 0.0 :
_total_entrees_non_topo+=1
else :
ligne[0] = str((row[1])).ljust(_largeurColTete)
ligne[1] = str("1")
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
ligne[6] = str(result_shot) # Visées
cursor.execute(f"""
-- Requête pour rechercher le point bas d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Min
from STATION
join (
select Min(STATION.Z) as Val_Min
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.ENTREE_ID = {row[0]}
) min on STATION.Z = min.Val_Min
LIMIT 1
""")
altitude_min = cursor.fetchall()
ligne[7] = altitude_min[0][0]
ligne[8] = str("{:.2f}".format(altitude_min[0][1]))
cursor.execute(f"""
-- Requête pour rechercher le point haut d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Max
from STATION
join (
select Max(STATION.Z) as Val_Max
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.ENTREE_ID = {row[0]}
) max on STATION.Z = max.Val_Max
LIMIT 1
""")
altitude_max = cursor.fetchall()
ligne[9] = altitude_max[0][0]
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
retour.append(ligne)
###############################################################################################################
# Entrées sans topo
###############################################################################################################
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
if _total_entrees_non_topo >=1 :
ligne[0] = "Entrée(s) sans topographie".ljust(_largeurColTete)
ligne[1] = str(_total_entrees_non_topo)
ligne[2] = "0.00"
ligne[3] = "0.00"
ligne[4] = "0.00"
ligne[5] = "0.00"
ligne[6] = "0"
cursor.execute(f"""
-- Requête pour rechercher le points bas d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Min
from STATION
join (
select Min(STATION.Z) as Val_Min
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent'
) min on STATION.Z = min.Val_Min
LIMIT 1
""")
altitude_min = cursor.fetchall()
ligne[7] = altitude_min[0][0]
ligne[8] = str(altitude_min[0][1])
cursor.execute(f"""
-- Requête pour rechercher le point haut d'un réseau / entrée
SELECT
STATION.name,
STATION.Z as Max
from STATION
join (
select Max(STATION.Z) as Val_Max
from STATION
join JONCTION on STATION.ID = JONCTION.STATION_ID
WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent'
) max on STATION.Z = max.Val_Max
LIMIT 1
""")
altitude_max = cursor.fetchall()
ligne[9] = altitude_max[0][0]
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
retour.append(ligne)
###############################################################################################################
# Tri et résultats
###############################################################################################################
entetes = [ 'Entrée(s)', "Nbre", "Dev.(m)", "Prof.(m)", "Dupl.(m)", "Surf.(m)", "Visées", "ID Sta.", "Alt. min(m)", "ID Sta.", "Alt. max(m)" ]
entetes[0] = entetes[0].ljust(_largeurColTete)
for i in range(9): entetes[i+1] = entetes[i+1].ljust(_largeurCol)
_corps_retour = sorted(retour, key=cle_tri, reverse=True)
_retour = [entetes] + _corps_retour
return _retour # type: ignore
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (sql_bilan_reseaux):\033[0m {e}")
error_count += 1
return retour
return
#####################################################################################################################################
# # Clé de tri #
#####################################################################################################################################
def cle_tri(element):
return float(element[2])
#####################################################################################################################################
# #-- Bilan topo par années #
#####################################################################################################################################
def sql_bilan_annee():
global error_count
global _largeurCol
global _largeurColTete
try:
retour = []
cursor.execute(f"""
select strftime('%Y', TOPO_DATE) as annee
from CENTRELINE
Where TOPO_DATE is not NULL
order by TOPO_DATE
""")
result = cursor.fetchall()
entetes = [ "Année" , "Dev.(m)", "Cumul (m)", "Dupl.(m)", "Cumul (m)", "Surf.(m)", "Cumul (m)" ]
cumul = 0.0
cumul_dpl = 0.0
cumul_srf = 0.0
for i in range(6): entetes[i] = entetes[i].ljust(_largeurCol)
retour.append(entetes)
# topo sans année
cursor.execute(f"""
select
COALESCE( sum(LENGTH), 0),
COALESCE (sum(DUPLICATE_LENGTH), 0),
COALESCE( sum(SURFACE_LENGTH), 0)
from CENTRELINE
where TOPO_DATE IS NULL
""")
bilan_annee = cursor.fetchall()
if ( len(bilan_annee) >= 1 ) and (( float(bilan_annee[0][0]) > 0.0) or (float(bilan_annee[0][1]) > 0.0 ) or (float(bilan_annee[0][2]) > 0.0 )):
ligne = [ " - ", " - ", " - ", " - ", " - ", " - ", " - " ]
cumul += bilan_annee[0][0]
cumul_dpl += bilan_annee[0][1]
cumul_srf += bilan_annee[0][2]
ligne[0] = "-"
ligne[1] = str("{:.2f}".format(bilan_annee[0][0]))
ligne[2] = str("{:.2f}".format(cumul))
ligne[3] = str("{:.2f}".format(bilan_annee[0][1]))
ligne[4] = str("{:.2f}".format(cumul_dpl))
ligne[5] = str("{:.2f}".format(bilan_annee[0][2]))
ligne[6] = str("{:.2f}".format(cumul_srf))
for i in range(6): ligne[i] = ligne[i].ljust(_largeurCol)
retour.append(ligne)
# années standard
debut = int(result[0][0])
fin = int(result[len(result)-1][0])
PlotExploYears(output_file_name_year, [debut, fin])
for annee in range(debut, fin + 1, 1 ):
cursor.execute(f"""
select
COALESCE (sum(LENGTH), 0),
COALESCE (sum(DUPLICATE_LENGTH), 0),
COALESCE (sum(SURFACE_LENGTH), 0)
from CENTRELINE
where TOPO_DATE between '{annee}-01-01' and '{annee}-12-31';
""")
bilan_annee = cursor.fetchall()
ligne = [ " - ", " - ", " - ", " - ", " - ", " - ", " - " ]
cumul += bilan_annee[0][0]
cumul_dpl += bilan_annee[0][1]
cumul_srf += bilan_annee[0][2]
ligne[0] = str("{:.0f}".format(annee))
ligne[1] = str("{:.2f}".format(bilan_annee[0][0]))
ligne[2] = str("{:.2f}".format(cumul))
ligne[3] = str("{:.2f}".format(bilan_annee[0][1]))
ligne[4] = str("{:.2f}".format(cumul_dpl))
ligne[5] = str("{:.2f}".format(bilan_annee[0][2]))
ligne[6] = str("{:.2f}".format(cumul_srf))
for i in range(6): ligne[i] = ligne[i].ljust(_largeurCol)
retour.append(ligne)
return retour # type: ignore
except sqlite3.Error as e:
print(f"\033[91mErreur lors de l'exécution de la requête (sql_bilan_annee):\033[0m {e}")
error_count += 1
return None
except Exception as e:
print(f"\033[91mErreur lors de l'exécution de sql_bilan_annee:\033[0m {e}")
error_count += 1
return None
#####################################################################################################################################
# diagramme de "rose" #
#####################################################################################################################################
def Rose(graph_name, bins = 72):
"""
Plot a Rose diagram of the entire database
Args:
conn (sqlite_db): database sqlite
graph_name (str): path and name of the graph to save
bins (int, optional): bins for the plot. Defaults to 72.
"""
# Extract the right data
df = pd.read_sql_query("select * from SHOT;", conn)
# Built the histogram
#h, e = np.histogram(df["BEARING"] * np.pi/180., weights = df["LENGTH"], bins = bins)
# Pour enlever les visées verticales (et donc de bearing systématiquement à 0°...)
h, e = np.histogram(df["BEARING"] * np.pi/180., weights = df["LENGTH"]* (90-np.abs(df["GRADIENT"]))/100, bins = bins)
# Plot the rose diagram
ax = plt.subplot(111, projection = "polar")
ax.set_theta_zero_location("N") # type: ignore
ax.set_theta_direction(-1) # type: ignore
ax.bar(e[:-1], h, align = "edge", width = e[1]-e[0])
# Save the rose diagram
plt.savefig(graph_name)
# Close the graph
plt.close(plt.figure(1))
return
#####################################################################################################################################
# diagramme de longueurs de visées #
#####################################################################################################################################
def Shot_lengths_histogram(graph_name, bins = 72, log = None):
"""
Plot the histogram of the lengths of the shots for the entire database
Args:
conn (sqlite_db): database sqlite
graph_name (str): path and name of the graph to save
bins (int, optional): bins for the plot. Defaults to 72.
log (str, optional): set it to 'log' to use a y-logscale. Defaults to None.
"""
# Extract the right data
df = pd.read_sql_query("select * from SHOT;", conn)
# plot the histogram
plt.hist(df["LENGTH"], bins = bins)
plt.xlabel("Longueur de visée (m)")
plt.ylabel("Nombre")
plt.xlim(0,50)
# If log y-scale, set it
if log:
plt.yscale("log")
# save
plt.savefig(graph_name)
plt.close(plt.figure(1))
return
#####################################################################################################################################
# diagrammes par années #
#####################################################################################################################################
def PlotExploYears(graph_name, rangeyear = [1959, datetime.now().year], systems = None):
"""
Args:
conn (sqlite_db): database sqlite
graph_name (str): path and name of the graph to save
rangeyear (np.array of integers, optional): 2 elements numpy array that gives the range of the years to analyse. Defaults to [1959, datetime.date.today().year].
systems (list of str, optional): list of specific systems to plot if needed. Defaults to None.
"""
# define colors to use; You may add colors if needed
colores = ['tab:blue', 'tab:red', 'tab:green', 'tab:orange', 'tab:purple',
'tab:marron', 'tab:olive', 'tab:pink', 'tab:cyan']
if systems:
# Initiate variables
#somme = pd.DataFrame(columns = ['System', 'Year', 'Longueur'])
Sy = []
Yr = []
Lg = []
# Loop on the systems and the years
for system in systems:
for date in range(rangeyear[0], rangeyear[1]+1):
# Define SQL query
lquery = "select sum(LENGTH) from CENTRELINE where SURVEY_ID in (select ID from SURVEY where FULL_NAME LIKE '%s%s%s') and TOPO_DATE between '%s-01-01' and '%s-12-31';" %(chr(37), str(system),chr(37), str(date), str(date))
junk = pd.read_sql_query(lquery, conn)
# Update the DataFrame line to line; DEPRECIATED since pandas 2.0
#somme = somme.append({'System' : system,
# 'Year' : int(date),
# 'Longueur' : junk.to_numpy()[0][0]}, ignore_index = True)
Sy.append(system)
Yr.append(int(date))
Lg.append(junk.to_numpy()[0][0])
#print(junk)
somme = pd.DataFrame(list(zip(Sy, Yr, Lg)), columns = ['System', 'Year', 'Longueur'])
print(max(somme['Longueur']))
# plot the histogram since the first survey
fig = plt.figure(1)
ax1 = fig.add_subplot(111)
fig2 = plt.figure(2)
ax2 = fig2.add_subplot(111)
# Extract the values for the first system
sommesys = somme[somme['System'] == systems[0]]
# Change None values to 0
sommeplot = sommesys.fillna(0)
# Remove the column with the names of the systems
del sommeplot["System"]
print(sommeplot)
ax1.bar(sommeplot["Year"],
sommeplot["Longueur"],
width = 0.5,
color = colores[0],
label = systems[0])
ax2.bar(sommeplot["Year"],
np.cumsum(sommeplot["Longueur"])/1000,
width = 0.5,
color = colores[0],
label = systems[0])
# Skip the loop on systems if there is only one system requested --> stacked barplot not needed
if len(systems) > 1:
# Check if the number of colors is enough for the number of systems
if len(systems)>len(colores):
raise NameError('\033[91mERROR:\033[00m Number of colors lower than the number of systems!\n\tedit the code to add colors in the list, or lower the number of systems to plot')
# Copy the length column in an other column to trace of it
sommeplot[systems[0]] = sommeplot["Longueur"]
for system in systems[1:]:
# Extract the length for the system
temp = somme[somme['System'] == system]
# Replace NaN values by 0 to avoid None values in the sums
tempplot = temp.fillna(0)
# Reset the indexes to permit the sum of the length per year
tempplot.reset_index(inplace = True)
del tempplot["System"]
# Update the barplot
ax1.bar(tempplot["Year"],
tempplot["Longueur"],
bottom = sommeplot["Longueur"],
width = 0.5,
color = colores[systems.index(system)],
label = system)
# Print the cumulative barplot
ax2.bar(tempplot["Year"],
np.cumsum(tempplot["Longueur"])/1000,
bottom = np.cumsum(sommeplot["Longueur"])/1000,
width = 0.5,
color = colores[systems.index(system)],
label = system)
# Do the sum of the length, and write it in the length column
sommeplot["Longueur"] = sommeplot["Longueur"] + tempplot["Longueur"]
# Copy the length of the system in a new column
sommeplot[systems[systems.index(system)]] = tempplot["Longueur"]
# Plot mean line
ax1.axhline(y = somme["Longueur"].mean(), color='red', linestyle='--', label = 'Moy. annuelle')
ax1.set_xlabel("Année")
ax1.set_ylabel("Longueur topographiée (m)")
ax1.legend(loc = 'best')
# Save the histogram
fig.savefig(graph_name + "_Reseau.pdf")
plt.close(plt.figure(1))
# plot the cumulative histogram since the first survey
ax2.set_xlabel("Année")
ax2.set_ylabel("Longueur topographiée cumulée (km)")
ax2.legend(loc = 'best')
# Save the cumulative histogram
fig2.savefig(graph_name + "Cum_Reseau.pdf")
plt.close(plt.figure(1))
else:
#somme = pd.DataFrame(columns = ['Year', 'Longueur'])
Yr = []
Lg = []
for date in range(rangeyear[0], rangeyear[1]):
lquery = "select sum(LENGTH) from CENTRELINE where TOPO_DATE between '%s-01-01' and '%s-12-31';" %(str(date), str(date))
junk = pd.read_sql_query(lquery, conn)
## Depreciated depuis Pandas 2.0
#somme = somme.append({'Year' : int(date),
# 'Longueur' : junk.to_numpy()[0][0]}, ignore_index = True)
Yr.append(int(date))
Lg.append(junk.to_numpy()[0][0])
somme = pd.DataFrame(list(zip(Yr, Lg)), columns = ['Year', 'Longueur'])
# plot the histogram since the first survey
plt.bar(somme["Year"], somme["Longueur"], width = 0.5)
# plot mean
plt.axhline(y = somme["Longueur"].mean(), color='red', linestyle='--', label = 'Moy. annuelle')
plt.xlabel("Année")
plt.ylabel("Longueur topographiée (m)")
# Save the histogram
plt.savefig(graph_name + ".pdf")
plt.close(plt.figure(1))
# plot the cumulative histogram since the first survey
plt.bar(somme["Year"], np.cumsum(somme["Longueur"].fillna(0))/1000, width = 0.5)
plt.xlabel("Année")
plt.ylabel("Longueur topographiée cumulée (km)")
# Save the cumulative histogram
plt.savefig(graph_name + "Cum.pdf")
plt.close(plt.figure(1))
return
#################################################################################################
# fonction pour réduire l'affichage des chemins long #
#################################################################################################
def safe_relpath(path, base_dir=None, max_depth=3, max_name_len=50, prefix="~"):
"""
Retourne un chemin lisible et sûr pour affichage (logs / UI).
- Compatible Windows / Linux / macOS
- Tronque la profondeur du chemin
- Tronque le nom de fichier si trop long
- Ne lève jamais d'exception
"""
try:
path = Path(path).expanduser().resolve()
except Exception:
return str(path)
try:
base = Path(base_dir).expanduser().resolve() if base_dir else Path.cwd().resolve()
except Exception:
base = None
name = path.name or str(path)
if len(name) > max_name_len:
stem = path.stem[: max(1, max_name_len - 6)]
name = f"{stem}...{path.suffix}"
try:
if base:
rel = path.relative_to(base)
parts = list(rel.parts)
else:
raise ValueError
except Exception:
parts = list(path.parts)
if not parts:
parts = ["."]
if isinstance(max_depth, int) and max_depth > 0 and len(parts) > max_depth:
parts = parts[-max_depth:]
parts.insert(0, prefix)
if parts and parts[-1] not in (".", os.sep):
parts[-1] = name
try:
return os.path.join(*parts)
except Exception:
return name
#################################################################################################
# Coloration des messages d'aide d'arg #
#################################################################################################
def colored_help(parser):
"""
Affiche l'aide colorée pour les arguments de la ligne de commande.
Args:
parser (argparse.ArgumentParser): Le parseur d'arguments.
Returns:
None
"""
# Captures the help output
help_text = parser.format_help()
# Coloration des différentes parties
colored_help_text = help_text.replace(
'usage:', f'\033[91musage:\033[0m'
).replace(
'options:', f'\033[92moptions:\033[0m'
).replace('positional arguments:', f'\033[94mpositional arguments:\033[0m'
).replace(', --help', f'\033[94m, --help:\033[0m'
).replace('elp:', f'\033[94melp\033[0m')
# Surligner les arguments
for action in parser._actions:
if action.option_strings:
# Colorer les options (--xyz)
for opt in action.option_strings:
colored_help_text = colored_help_text.replace(opt, f'\033[94m{opt}\033[0m').replace('--help', f'\033[94m--help:\033[0m')
# Imprimer le texte coloré
print(colored_help_text)
sys.exit(1)
#####################################################################################################################################
# #
# Main #
# #
#####################################################################################################################################
if __name__ == '__main__':
_largeurColTete = 30
_largeurCol = 10
avt_compteur = 0
error_count = 0
visee_suprimmees= [ 0.0, 0.0, 0.0, 0] # Lg, Lg dpl, Lg surf
input_file_name = ""
outputs_path = "./Test/"
inputs_path = "./Test/"
# if not os.path.exists(outputs_path): os.makedirs(outputs_path)
if os.name == 'posix': os.system('clear') # Linux, MacOS
elif os.name == 'nt': os.system('cls')# Windows
else: print("\n" * 100)
maintenant = datetime.now()
parser = argparse.ArgumentParser(description=f"Calcul des statistiques par entrées d'une BD Therion",
formatter_class=argparse.RawTextHelpFormatter)
parser.print_help = colored_help.__get__(parser)
parser.add_argument(
'--option',
default="sync",
choices=["sync", "update"],
help=(
f"Options d'execution de pythStat.py\nsync\t-> Synchronisation des données depuis une nouvelle base de données(défaut)\n"
f"update\t-> Mise à jour des statistiques de la base de données\n"
)
)
parser.add_argument("--file", help="Chemin vers le fichier SQL d'entrée (pas de d'option : fenêtre de choix)")
parser.epilog = (f"Commande therion (fichier .thconfig) : export database -o Outputs/database.sql")
# Analyser les arguments de ligne de commande
args = parser.parse_args()
if not args.file: # Si aucun fichier n'est fourni en ligne de commande, ouvrir une fenêtre Tkinter pour sélectionner un fichier
# input_file = "rabbit.sql" # Erreur car pas de point fix ou d'entrée python
# input_file_name = inputs_path + input_file
root = tk.Tk()
root.withdraw() # Cacher la fenêtre principale de Tkinter
input_file_name = filedialog.askopenfilename( title="Sélectionnez le fichier SQL", filetypes=(("Fichiers SQL", "*.sql"), ("Tous les fichiers", "*.*")) )
if not input_file_name:
print("Aucun fichier sélectionné. Le programme va se terminer.")
sys.exit()
outputs_path = os.path.dirname(input_file_name) + "/"
input_file = os.path.basename(input_file_name)
else : # Si le fichier est fourni en ligne de commande
input_file_name = args.file
# print("Le paramètre fourni est:", input_file_name)
if os.path.isfile(input_file_name) is False :
print(f"\033[91mErreur : fichier \033[0m{input_file_name}\033[91m inexistant\033[0m")
print(f"\033[92mCommande : \033[0mpython pythStat.py votre_fichier_therion.sql")
sys.exit()
else :
outputs_path = os.path.dirname(input_file_name) + "/"
input_file = os.path.basename(input_file_name)
if os.name == 'posix': os.system('clear') # Linux, MacOS
elif os.name == 'nt': os.system('cls')# Windows
else: print("\n" * 100)
outputfolder = outputs_path + "stat_" + input_file[:-4] + "_" + maintenant.strftime("%Y-%m-%d") + "/"
if not os.path.exists(outputfolder): os.makedirs(outputfolder)
output_file_name = outputfolder + input_file[:-4]+"_stats.md"
output_file_name_rose = outputfolder + input_file[:-4]+"_rose.pdf"
output_file_name_histo = outputfolder + input_file[:-4]+"_histo.pdf"
output_file_name_year = outputfolder + input_file[:-4]+"_year"
imported_database = outputfolder + input_file[:-4]+"_stats.db"
_titre =['\033[1;32m************************************************************************************************************************\033[0m',
'\033[1;32m* Calcul des statistiques par entrées d\'une BD Therion\033[0m',
'\033[1;32m* Script pythStat par alexandre.pont@yahoo.fr\033[0m',
'\033[1;32m* Version : \033[0m' + Version,
'\033[1;32m* Fichier source : \033[0m' + safe_relpath(input_file_name),
'\033[1;32m* Dossier destination : \033[0m' + safe_relpath(outputfolder),
'\033[1;32m* Date : \033[0m' + maintenant.strftime("%Y-%m-%d %H:%M:%S"),
'\033[1;32m* \033[0m',
'\033[1;32m************************************************************************************************************************\033[0m']
for i in range(9): print(_titre[i].ljust(131)+"\033[1;32m*\033[0m")
titre = [ligne.replace("\033[1;32m", "").replace("\033[0m", "") for ligne in _titre]
if args.option == "sync" :
importation_sql_data(input_file_name)
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
cursor = conn.cursor()
construction_tables()
calcul_stats(output_file_name)
elif args.option == "update" :
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
cursor = conn.cursor()
calcul_stats(output_file_name)
conn.close()