mirror of
https://github.com/Alex38Lyon/Synthese-PSM_LARRA.git
synced 2026-06-01 13:59:13 +00:00
2675 lines
136 KiB
Python
2675 lines
136 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
########################################################################################################################################
|
|
# #
|
|
# Script pour automatiser le calcul des statistiques #
|
|
# d'un fichier database (.sql) produit par Therion #
|
|
# By Alexandre PONT alexandre.pont@yahoo.fr #
|
|
# Février 2024 #
|
|
# #
|
|
# Utilisation: #
|
|
# Exporter le fichier sql avec therion, commande : export database -o Outputs/database.sql #
|
|
# Sélectionner le fichier database.sql à calculer dans main (ligne 2620) #
|
|
# Résultat : fichiers database_stats.csv et suivants dans dossier /output #
|
|
########################################################################################################################################
|
|
|
|
|
|
import sqlite3, sys, os, re
|
|
from alive_progress import alive_bar # https://github.com/rsalmei/alive-progress
|
|
from datetime import datetime
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
"""#####################################################################################################################################
|
|
# Fonction pour importer un fichier SQL dans une base de données SQLite #
|
|
# #
|
|
#####################################################################################################################################"""
|
|
def importation_sql_data(fichier_sql):
|
|
"""
|
|
Fonction pour importer un fichier SQL dans une base de données SQLite
|
|
|
|
Args:
|
|
fichier_sql (_type_): _description_
|
|
"""
|
|
|
|
global error_count
|
|
|
|
try:
|
|
# Si la base de données existe, supprimez-la pour forcer l'écriture
|
|
print(f"\033[1;32mPhase 1: Importation de la base de données Therion \033[0m{input_file_name}\033[1;32m dans: \033[0m{imported_database}")
|
|
if os.path.exists(imported_database):
|
|
#print("Suppression de la Bd existante: " + imported_database)
|
|
os.remove(imported_database)
|
|
|
|
connection = sqlite3.connect(imported_database)
|
|
cursor = connection.cursor()
|
|
|
|
# Lecture du fichier SQL et exécution des commandes
|
|
with open(fichier_sql, 'r') as sql_file:
|
|
sql_script = sql_file.read()
|
|
|
|
|
|
# Séparation du script en commandes individuelles
|
|
sql_script = re.sub(r', nan', ', 0', sql_script, flags=re.IGNORECASE)
|
|
commandes = [cmd.strip() + ';\n' for cmd in sql_script.split(';\n') if cmd.strip()]
|
|
|
|
|
|
# Exécution des commandes avec une barre de progression
|
|
with alive_bar(len(commandes), title = "\x1b[32;1m\t Progression\x1b[0m", length = 20) as bar:
|
|
for commande in commandes:
|
|
cursor.execute(commande)
|
|
connection.commit()
|
|
bar()
|
|
|
|
connection.close()
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête importation_sql_data code:\033[0m {e}")
|
|
error_count += 1
|
|
sys.exit(1) # Arrête le programme en cas d'erreur
|
|
|
|
return
|
|
|
|
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour construire les tables JONCTION, SERIE, VISEE_FLAG et RESEAU #
|
|
# #
|
|
#####################################################################################################################################
|
|
def construction_tables():
|
|
"""
|
|
Fonction pour construire les tables JONCTION, SERIE, VISEE_FLAG et RESEAU
|
|
"""
|
|
global avt_compteur
|
|
global error_count
|
|
# Principales requêtes
|
|
|
|
#conn = sqlite3.connect(database) # Connexion à la base de données SQLite
|
|
#cursor = conn.cursor()
|
|
# print(f"\033[1;32mConstruction des tables dans {imported_database}\033[0m")
|
|
|
|
try :
|
|
print(f"\033[1;32mPhase 2: Création des nouvelles tables, indexation\033[0m")
|
|
|
|
cursor.execute("DROP TABLE IF EXISTS JONCTION") # Créer et initialiser une nouvelle table de jonctions
|
|
cursor.execute("""
|
|
CREATE TABLE JONCTION (
|
|
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
STATION_ID INTEGER,
|
|
SERIE_ID INTEGER,
|
|
SERIE_RANG INTEGER,
|
|
SERIE_ENT INTEGER,
|
|
STATION_ENT INTEGER,
|
|
SERIE_JONC INTEGER,
|
|
STATION_JONC INTEGER,
|
|
STATION_TYPE varchar(4),
|
|
ENTREE_ID INTEGER,
|
|
RESEAU_ID INTEGER
|
|
)
|
|
""")
|
|
cursor.execute("select STATION.ID from STATION")
|
|
results = cursor.fetchall()
|
|
Next_Station_ID = cursor.fetchall() # Pour forcer le type
|
|
cursor.executemany("INSERT INTO JONCTION (STATION_ID) VALUES (?)", results)
|
|
conn.commit()
|
|
|
|
cursor.execute("DROP TABLE IF EXISTS SERIE") # Créer et initialiser une nouvelle table des Series
|
|
cursor.execute("""
|
|
CREATE TABLE SERIE (
|
|
SERIE_ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
SERIE_DEP_ID INTEGER,
|
|
STATION_DEP_ID INTEGER,
|
|
SERIE_ARR_ID INTEGER,
|
|
STATION_ARR_ID INTEGER,
|
|
SERIE_NBRE_SHOT INTEGER,
|
|
SERIE_LENGHT REAL,
|
|
SERIE_LENGHT_SURFACE REAL,
|
|
SERIE_LENGHT_DUPLICATE REAL,
|
|
DIRECTION INTEGER,
|
|
STATION_ENT_ID INTEGER,
|
|
RESEAU_ID INTEGER)
|
|
""")
|
|
Current_Serie_ID = cursor.lastrowid
|
|
conn.commit()
|
|
|
|
cursor.execute("DROP TABLE IF EXISTS RESEAU")
|
|
cursor.execute("""
|
|
CREATE TABLE RESEAU (
|
|
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
RESEAU_ID INTEGER,
|
|
STATION_JONC INTEGER,
|
|
ENT_1 INTEGER,
|
|
ENT_2 INTEGER)
|
|
""")
|
|
conn.commit()
|
|
|
|
SHOT_equates_station()
|
|
|
|
issue_SHOT()
|
|
duplicate_SHOT()
|
|
|
|
cursor.execute("DROP TABLE IF EXISTS VISEE_FLAG") # Créer et initialiser une nouvelle table VISEE_FLAG (suivi des visées lues)
|
|
cursor.execute("""
|
|
CREATE TABLE VISEE_FLAG (
|
|
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
SHOT_ID INTEGER,
|
|
SERIE_ID INTEGER,
|
|
ENTREE_ID INTEGER,
|
|
RESEAU_ID INTEGER,
|
|
SERIE_RANG INTEGER
|
|
)
|
|
""")
|
|
cursor.execute("select SHOT.ID from SHOT")
|
|
results = cursor.fetchall()
|
|
cursor.executemany("INSERT INTO VISEE_FLAG (SHOT_ID) VALUES (?)", results) # type: ignore
|
|
conn.commit()
|
|
|
|
print(f"\033[0m\t Création de l'index des tables principales et optimisation de la mémoire\033[0m")
|
|
|
|
cursor.execute("CREATE INDEX INDEX_JONCTION_STATION_ID ON JONCTION(STATION_ID)")
|
|
cursor.execute("PRAGMA index_list(SHOT)")
|
|
result = cursor.fetchall()
|
|
|
|
marquage_visee_station_habillage()
|
|
|
|
# print(result)
|
|
if len(result)==0 :
|
|
cursor.execute("CREATE INDEX INDEX_SHOT_FROM_ID ON SHOT(FROM_ID)")
|
|
cursor.execute("CREATE INDEX INDEX_SHOT_TO_ID ON SHOT(TO_ID)")
|
|
cursor.execute("CREATE INDEX INDEX_SHOT_FLAG_SHOT_ID ON SHOT_FLAG(SHOT_ID)")
|
|
cursor.execute("CREATE INDEX INDEX_STATION_ID ON STATION(ID)")
|
|
cursor.execute("CREATE INDEX INDEX_VISEE_FLAG_SHOT_ID ON VISEE_FLAG(SHOT_ID)")
|
|
cursor.execute("CREATE INDEX INDEX_STATION_FLAG_STATION_ID ON STATION_FLAG(STATION_ID)")
|
|
|
|
cursor.execute("VACUUM")
|
|
conn.commit()
|
|
|
|
# A partir des entrées, remplir les tables des jonctions et des séries
|
|
results = sql_liste_entree()
|
|
print(f"\033[1;32mPhase 3: Remplissage des tables d'après les \033[0m{len(results)}\033[1;32m entrée(s)\033[0m") # type: ignore
|
|
for row in results: # type: ignore
|
|
# if row[0]==28548:
|
|
# print("debug point")
|
|
|
|
cursor.execute(f"""
|
|
INSERT INTO SERIE (
|
|
SERIE_DEP_ID,
|
|
STATION_DEP_ID,
|
|
SERIE_ARR_ID ,
|
|
STATION_ARR_ID,
|
|
SERIE_NBRE_SHOT,
|
|
SERIE_LENGHT,
|
|
SERIE_LENGHT_SURFACE,
|
|
SERIE_LENGHT_DUPLICATE,
|
|
DIRECTION,
|
|
STATION_ENT_ID,
|
|
RESEAU_ID)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (0, row[0], 0, row[0], -1, 0, 0, 0, 0, row[0], 0))
|
|
Current_Serie_ID = cursor.lastrowid
|
|
|
|
cursor.execute(f"""
|
|
UPDATE JONCTION SET
|
|
SERIE_ID = ?,
|
|
SERIE_ENT = ?,
|
|
STATION_ENT = ?,
|
|
SERIE_JONC = ?,
|
|
STATION_JONC = ?,
|
|
STATION_TYPE = ?,
|
|
SERIE_RANG = ?,
|
|
ENTREE_ID = ?,
|
|
RESEAU_ID = ?
|
|
WHERE STATION_ID = ?
|
|
""", (Current_Serie_ID, 0, row[0], 0, 0,'ent', 0, row[0], 0, row[0]))
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {Current_Serie_ID},
|
|
ENTREE_ID = {row[0]},
|
|
RESEAU_ID = {0}
|
|
WHERE SHOT_ID = {Current_Serie_ID}
|
|
""")
|
|
conn.commit()
|
|
|
|
# print(f"\tCréation Série: {Current_Serie_ID} depuis la station d'entrée Station_ID: {row[0]}")
|
|
|
|
|
|
# A partir des série vides, itération pour remplir les tables des JONCTION et des SERIE
|
|
print(f"\033[1;32mPhase 4: Remplissage des tables d'après les séries vides jonctionnées aux\033[0m {Current_Serie_ID}\033[1;32m entrée(s)\033[0m")
|
|
|
|
results = sql_serie_vides()
|
|
Count = 1
|
|
New_Serie_IDOld = 0
|
|
New_Serie_ID = cursor.lastrowid
|
|
Current_Station_ID_Old = 0
|
|
Current_Station_ID = 0
|
|
|
|
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
|
|
_compteur = cursor.fetchall()
|
|
compteur_ttl = int(_compteur[0][0])
|
|
avt_compteur = 0
|
|
|
|
with alive_bar(compteur_ttl, title = "\x1b[32;1m\t Progression\x1b[0m", length = 20) as bar:
|
|
while len(results) > 0: # type: ignore
|
|
# print(f"\033[1;32mPhase 4.{Count}: Remplissage des tables JONCTION et SERIE itération: {Count}, séries créée(s): {New_Serie_ID} ajoutée(s): {New_Serie_ID-New_Serie_IDOld} à traiter: {len(results)}\033[0m") # type: ignore
|
|
bar.text(f"itération: {Count}, série(s) créée(s): {New_Serie_ID}") # type: ignore
|
|
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
|
|
_compteur = cursor.fetchall()
|
|
compteur = int(_compteur[0][0])
|
|
if ( compteur_ttl - compteur ) > avt_compteur :
|
|
ajout = compteur_ttl - compteur - avt_compteur
|
|
bar(ajout)
|
|
avt_compteur = compteur_ttl - compteur
|
|
for row in results: # type: ignore
|
|
# Suivi de la série
|
|
Current_Serie_ID = int(row[0])
|
|
Current_Station_ID_Old = int(Current_Station_ID) # type: ignore
|
|
Current_Station_ID = int(row[2])
|
|
Current_Nre_Shot = int(row[5])
|
|
# Current_Serie_Lenght = 0.0 + float(row[6])
|
|
# Current_Serie_Lenght_Surface = 0.0 + float(row[7])
|
|
# Current_Serie_Lenght_Duplicate = 0.0 + float(row[8])
|
|
Current_Ent = int(row[10])
|
|
Direction = int(row[9])
|
|
#print(f"\tSerie courante {Current_Serie_ID} Station_ID: {Current_Station_ID} results: {results}")
|
|
Fin_Serie = False
|
|
while Fin_Serie is False:
|
|
if Direction == 1 :
|
|
Next_Station_ID = sql_station_depart(Current_Station_ID)
|
|
elif Direction == -1 :
|
|
Current_Station_ID = int(row[4])
|
|
Next_Station_ID = sql_station_arrivee(Current_Station_ID)
|
|
elif Direction == 0 :
|
|
Next_Station_ID_1 = sql_station_depart(Current_Station_ID)
|
|
Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID)
|
|
if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0: # type: ignore
|
|
# Entrée sans départ et sans d'arrivée : fin de la série
|
|
cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {Current_Serie_ID};")
|
|
cursor.execute(f"UPDATE JONCTION SET SERIE_ENT = -1 WHERE STATION_ID = {Current_Station_ID};")
|
|
conn.commit()
|
|
Next_Station_ID = Next_Station_ID_1
|
|
elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0: # type: ignore
|
|
# Un départ pas d'arrivée
|
|
Next_Station_ID = Next_Station_ID_1
|
|
Direction = 1
|
|
cursor.execute(f"UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = {Current_Serie_ID};")
|
|
conn.commit()
|
|
elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1: # type: ignore
|
|
# Une arrivée pas de départ
|
|
Next_Station_ID = Next_Station_ID_2
|
|
Direction = -1
|
|
cursor.execute(f"UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = {Current_Serie_ID};")
|
|
conn.commit()
|
|
else :
|
|
# A gérer nouvelles séries
|
|
nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, 1, Current_Ent) # type: ignore
|
|
# print (f"\033[34m\tA traiter création nouvelles séries inverses depuis l'entrée {Current_Station_ID} - {Next_Station_ID_2}, {Next_Station_ID_1}\033[0m")
|
|
nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, -1, Current_Ent)
|
|
Direction = 1
|
|
Next_Station_ID = Next_Station_ID_1
|
|
|
|
if len(Next_Station_ID) == 0 : # type: ignore
|
|
Next_Station_ID_1 = sql_station_depart(Current_Station_ID) # type: ignore
|
|
Next_Station_ID_2 = sql_station_arrivee(Current_Station_ID) # type: ignore
|
|
# print(f"\033[34m\tA gérer, fin de la Série: {Current_Serie_ID} à la Station_ID: {Current_Station_ID} nbre: {Current_Nre_Shot} Next station: {Next_Station_ID}, départs directs {len(Next_Station_ID_1)}, départs inverses {len(Next_Station_ID_2)}\033[0m") # type: ignore
|
|
# cursor.execute(f"UPDATE SERIE SET SERIE_DEP_ID = {Current_Serie_ID} WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
|
|
# cursor.execute(f"UPDATE SERIE SET STATION_DEP_ID = {Current_Station_ID} WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
|
|
# cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {Current_Serie_ID};") # type: ignore
|
|
cursor.execute(f"DELETE FROM SERIE WHERE SERIE_ID = {Current_Serie_ID};")
|
|
conn.commit() # type: ignore
|
|
Fin_Serie = True
|
|
elif len(Next_Station_ID) == 1 : # type: ignore
|
|
suivi_serie(Current_Serie_ID, bar)
|
|
Fin_Serie = True
|
|
else :
|
|
#print(f"Station_ID {Current_Station_ID} de la Serie {Current_Serie_ID}, départs à gérer: {len(Next_Station_ID)}, Next station: {Next_Station_ID}") # type: ignore
|
|
suivi_serie(Current_Serie_ID, bar)
|
|
# Création de X nouvelles séries et mise à jour de la table des jonctions
|
|
#nouvelles_series(Current_Station_ID, Current_Station_ID_Old, Current_Serie_ID, 1, Current_Ent)
|
|
Fin_Serie = True
|
|
# Exécution de la requête SQL
|
|
resultsOld = results
|
|
results = sql_serie_vides() # type: ignore
|
|
New_Serie_IDOld = New_Serie_ID
|
|
New_Serie_ID = cursor.lastrowid # type: ignore
|
|
if resultsOld == results :
|
|
#print(f"Erreur sortie itération qté: {len(resultsOld)} - {resultsOld} - {results}")
|
|
print(f"\033[91mErreur sortie itération {Count}, séries restantes: {len(resultsOld)} - {results}\033[0m") # type: ignore
|
|
error_count += 1
|
|
break
|
|
|
|
Count += 1
|
|
|
|
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
|
|
_compteur = cursor.fetchall()
|
|
compteur = int(_compteur[0][0])
|
|
|
|
if ( compteur_ttl ) > avt_compteur :
|
|
ajout = compteur_ttl - avt_compteur
|
|
bar(ajout)
|
|
avt_compteur = compteur_ttl
|
|
|
|
|
|
orphelines_shot()
|
|
jonction_RESEAU()
|
|
|
|
if compteur > 0 :
|
|
print(f"\033[1;32mPhase 4: Fin du remplissage des tables,\033[91m attention \033[0m{compteur}\033[91m station(s) non comptabilisé(s)\033[0m")
|
|
error_count += 1
|
|
# else :
|
|
# print(f"\033[1;32mPhase 4: Fin du remplissage des tables voir {imported_database}\033[0m")
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution d'une des requêtes (construction_tables) code:\033[0m {e}")
|
|
error_count += 1
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Création des séries entres les visées orphelines -> visées d'une visées entre 2 points marqués #
|
|
#####################################################################################################################################
|
|
def orphelines_shot():
|
|
global error_count
|
|
|
|
try:
|
|
|
|
cursor.execute("""
|
|
-- Visées orphelines
|
|
SELECT
|
|
VISEE_FLAG.SHOT_ID,
|
|
VISEE_FLAG.SERIE_ID,
|
|
VISEE_FLAG.ENTREE_ID,
|
|
SHOT_FLAG.FLAG,
|
|
SHOT.FROM_ID,
|
|
-- STATION_FROM.NAME,
|
|
JONCTION_FROM.SERIE_ID,
|
|
JONCTION_FROM.STATION_TYPE,
|
|
JONCTION_FROM.ENTREE_ID,
|
|
SHOT.TO_ID,
|
|
-- STATION_TO.NAME,
|
|
JONCTION_TO.SERIE_ID,
|
|
JONCTION_TO.STATION_TYPE,
|
|
JONCTION_TO.ENTREE_ID,
|
|
SHOT.LENGTH
|
|
--sum (SHOT.LENGTH)
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
|
|
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
|
|
WHERE VISEE_FLAG.SERIE_ID is NULL and JONCTION_TO.SERIE_ID is not null and JONCTION_FROM.SERIE_ID is not null
|
|
""")
|
|
conn.commit()
|
|
orphelines = cursor.fetchall()
|
|
|
|
print(f"\t Intégrations des visées orphelines (entre 2 stations existantes) nbre: {len(orphelines)}")
|
|
|
|
for row in orphelines:
|
|
_SERIE_LENGHT = 0
|
|
_SERIE_LENGHT_SURFACE = 0
|
|
_SERIE_LENGHT_DUPLICATE = 0
|
|
|
|
if row[3] == 'dpl':
|
|
_SERIE_LENGHT_DUPLICATE = row[12]
|
|
elif row[3] == 'srf':
|
|
_SERIE_LENGHT_SURFACE = row[12]
|
|
else :
|
|
_SERIE_LENGHT = row[12]
|
|
|
|
cursor.execute(f"""
|
|
INSERT INTO SERIE (
|
|
SERIE_DEP_ID,
|
|
STATION_DEP_ID,
|
|
SERIE_ARR_ID ,
|
|
STATION_ARR_ID,
|
|
SERIE_NBRE_SHOT,
|
|
SERIE_LENGHT,
|
|
SERIE_LENGHT_SURFACE,
|
|
SERIE_LENGHT_DUPLICATE,
|
|
DIRECTION,
|
|
STATION_ENT_ID)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( row[5], row[4], row[9], row[8], 1, _SERIE_LENGHT, _SERIE_LENGHT_SURFACE, _SERIE_LENGHT_DUPLICATE, 1, row[7] ))
|
|
_Current_Serie_ID = cursor.lastrowid
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {_Current_Serie_ID},
|
|
ENTREE_ID = {row[7]},
|
|
SERIE_RANG = {1}
|
|
WHERE SHOT_ID = {row[0]}
|
|
""")
|
|
|
|
if row[7] != row[11] :
|
|
cursor.execute(f"INSERT INTO RESEAU (STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)", (row[8], row[7], row[11]))
|
|
# print (f"\033[36m\t Jonction des entrées à la Station_ID: {row[8]} entre: {row[7]} et: {row[11]}\033[0m")
|
|
|
|
conn.commit()
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête orphelines_shot code:\033[0m {e}" )
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour joindre les reseaux #
|
|
#####################################################################################################################################
|
|
def jonction_RESEAU():
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute("""
|
|
--Recherche des doublons dans les jonctions réseau
|
|
SELECT
|
|
ID
|
|
--GROUP_CONCAT(RESEAU.ID) as DUPP_SHOT
|
|
FROM RESEAU
|
|
GROUP BY RESEAU.ENT_1, RESEAU.ENT_2, RESEAU.STATION_JONC
|
|
HAVING COUNT(RESEAU.ENT_1)>1 AND COUNT(RESEAU.ENT_2)>1 AND COUNT(RESEAU.STATION_JONC)>1
|
|
""")
|
|
conn.commit()
|
|
doublons = cursor.fetchall()
|
|
|
|
# print(f"\t Table des RESEAUX doublons nbre: {len(doublons)}")
|
|
for row in doublons : cursor.execute(f"DELETE FROM RESEAU WHERE RESEAU.ID = {row[0]}")
|
|
conn.commit()
|
|
|
|
index_reseau = 0
|
|
|
|
while True:
|
|
index_reseau += 1
|
|
cursor.execute("""
|
|
-- Liste des entrée dans la table RESEAU
|
|
SELECT
|
|
RESEAU.ENT_1 AS ENT,
|
|
RESEAU.RESEAU_ID,
|
|
STATION.NAME,
|
|
STATION.Z
|
|
FROM RESEAU
|
|
JOIN STATION ON RESEAU.ENT_1 = STATION.ID
|
|
WHERE RESEAU.RESEAU_ID IS NULL
|
|
UNION --ALL
|
|
SELECT
|
|
RESEAU.ENT_2 AS ENT,
|
|
RESEAU.RESEAU_ID,
|
|
STATION.NAME,
|
|
STATION.Z
|
|
FROM RESEAU
|
|
JOIN STATION ON RESEAU.ENT_2 = STATION.ID
|
|
WHERE RESEAU.RESEAU_ID IS NULL
|
|
ORDER BY STATION.Z DESC
|
|
""")
|
|
conn.commit()
|
|
entrees = cursor.fetchall()
|
|
|
|
if len(entrees) == 0:
|
|
break # Sortie boucle while si plus d'entrées à traiter...
|
|
|
|
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_1 = {entrees[0][0]} ")
|
|
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_2 = {entrees[0][0]} ")
|
|
conn.commit()
|
|
|
|
liste_entrees_reseau = []
|
|
liste_entrees_reseau.append(entrees[0][0])
|
|
|
|
nbre_entree_reseau_old= 0
|
|
nbre_entree_reseau=len(liste_entrees_reseau)
|
|
|
|
while nbre_entree_reseau_old != nbre_entree_reseau :
|
|
nbre_entree_reseau_old = nbre_entree_reseau
|
|
for row in liste_entrees_reseau :
|
|
cursor.execute(f"""
|
|
-- Recherche entrée jonctionnées
|
|
SELECT RESEAU.ENT_1 FROM RESEAU WHERE RESEAU.ENT_2 = {row} -- AND RESEAU.RESEAU_ID IS NULL
|
|
UNION
|
|
SELECT RESEAU.ENT_2 FROM RESEAU WHERE RESEAU.ENT_1 = {row} -- AND RESEAU.RESEAU_ID IS NULL
|
|
""")
|
|
jonction = cursor.fetchall()
|
|
|
|
for row2 in jonction: # type: ignore
|
|
if row2[0] not in liste_entrees_reseau:
|
|
# print(f"\t Jonction de l'entrée: {row2[0]} au reseau ID: {index_reseau}")
|
|
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_1 = {row2[0]} ")
|
|
cursor.execute(f"UPDATE RESEAU SET RESEAU_ID = {index_reseau} WHERE RESEAU.ENT_2 = {row2[0]} ")
|
|
liste_entrees_reseau.append(row2[0])
|
|
conn.commit()
|
|
nbre_entree_reseau=len(liste_entrees_reseau)
|
|
for row2 in liste_entrees_reseau :
|
|
cursor.execute(f"UPDATE JONCTION SET RESEAU_ID = {index_reseau} WHERE JONCTION.ENTREE_ID = {row2} ")
|
|
cursor.execute(f"UPDATE SERIE SET RESEAU_ID = {index_reseau} WHERE SERIE.STATION_ENT_ID = {row2} ")
|
|
cursor.execute(f"UPDATE VISEE_FLAG SET RESEAU_ID = {index_reseau} WHERE VISEE_FLAG.ENTREE_ID = {row2} ")
|
|
conn.commit()
|
|
|
|
print(f"\t Réseau: {index_reseau}, entrées jonctionnées: {len(liste_entrees_reseau)}, {liste_entrees_reseau}")
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête Jonction_RESEAU code:\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour joindre les equates dans la table des SHOT # #
|
|
#####################################################################################################################################
|
|
def SHOT_equates_station():
|
|
global error_count
|
|
# Requête 8: recherche des equates
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
-- Requête 8, recherche des equates --
|
|
SELECT
|
|
-- STATION.ID,
|
|
-- STATION.NAME,
|
|
GROUP_CONCAT(STATION.ID) as ID_Group
|
|
-- GROUP_CONCAT(STATION.X) as X_Group,
|
|
-- GROUP_CONCAT(STATION.Y) as Y_Group,
|
|
-- GROUP_CONCAT(STATION.Z) as Z_Group,
|
|
-- COUNT(STATION.X) AS Qte_X,
|
|
-- COUNT(STATION.Y) AS Qte_Y,
|
|
-- COUNT(STATION.Z) AS Qte_Z
|
|
FROM STATION
|
|
WHERE STATION.NAME <> '.' AND STATION.NAME <> '-'
|
|
-- INNER JOIN STATION AS STATION_Bis ON STATION.X = STATION_Bis.X AND STATION.Y = STATION_Bis.Y AND STATION.Z = STATION_Bis.Z
|
|
--WHERE STATION.X = STATION_BIS.X
|
|
GROUP BY STATION.X, STATION.Y, STATION.Z
|
|
HAVING COUNT(STATION.X)>1 AND COUNT(STATION.Y)>1 AND COUNT(STATION.Y)>1
|
|
""")
|
|
equate = cursor.fetchall()
|
|
|
|
print(f"\t Jonction de SHOT equates nbre: {len(equate)}")
|
|
for row in equate :
|
|
sous_valeurs = row[0].split(',')
|
|
# print(f": {sous_valeurs[0]} = ", end="")
|
|
for val in range (1, len(sous_valeurs)) :
|
|
# print(f"{sous_valeurs[val]},", end=" ")
|
|
cursor.execute(f"SELECT SHOT.ID FROM SHOT WHERE SHOT.FROM_ID = {sous_valeurs[val]}")
|
|
filtre = cursor.fetchall()
|
|
|
|
for row in filtre :
|
|
cursor.execute(f"UPDATE SHOT SET FROM_ID = {sous_valeurs[0]} WHERE ID = {row[0]};")
|
|
|
|
cursor.execute(f"SELECT SHOT.ID FROM SHOT WHERE SHOT.TO_ID = {sous_valeurs[val]}")
|
|
filtre = cursor.fetchall()
|
|
|
|
for row in filtre :
|
|
cursor.execute(f"UPDATE SHOT SET TO_ID = {sous_valeurs[0]} WHERE ID = {row[0]};")
|
|
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('equ', sous_valeurs[val]))
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_JONC = ? WHERE id = ?", (sous_valeurs[0], sous_valeurs[val]))
|
|
#print(" ", end="")
|
|
conn.commit()
|
|
|
|
# print("")
|
|
|
|
# if len(equate) == 0 : print(f"\tAucun 'equate'")
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête 8 (sql_8_equates) code:\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour supprimer les visées en double (même départs arrivée lg, az et pente) #
|
|
#####################################################################################################################################
|
|
def duplicate_SHOT():
|
|
global error_count
|
|
# Requête 10, recherche des doublons de visées
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
-- Requête 10, recherche des doublons de visées --
|
|
SELECT
|
|
SHOT.FROM_ID,
|
|
SHOT.TO_ID,
|
|
SHOT.LENGTH,
|
|
-- SHOT.BEARING,
|
|
-- SHOT.GRADIENT,
|
|
GROUP_CONCAT(SHOT.ID) as DUPP_SHOT
|
|
FROM SHOT
|
|
GROUP BY SHOT.FROM_ID, SHOT.TO_ID --, SHOT.BEARING, SHOT.GRADIENT, SHOT.LENGTH
|
|
HAVING COUNT(SHOT.FROM_ID)>1 AND COUNT(SHOT.TO_ID)>1 --AND COUNT(SHOT.BEARING)>1 AND COUNT(SHOT.GRADIENT)>1 AND COUNT(SHOT.LENGTH)>1
|
|
""")
|
|
duplicate = cursor.fetchall()
|
|
|
|
_total_length_err = 0.0
|
|
for row in duplicate :
|
|
sous_valeurs = row[3].split(',')
|
|
cursor.execute(f"""
|
|
SELECT
|
|
SHOT.ID,
|
|
SHOT_FLAG.FLAG,
|
|
round(SHOT.LENGTH, 2)
|
|
FROM SHOT
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.ID = {int(sous_valeurs[0])}
|
|
""")
|
|
shot_flag = cursor.fetchall()
|
|
# print("\t " + str(shot_flag))
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
SHOT.ID,
|
|
SHOT_FLAG.FLAG,
|
|
round(SHOT.LENGTH, 2)
|
|
FROM SHOT
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.ID = {int(sous_valeurs[1])}
|
|
""")
|
|
shot_flag2 = cursor.fetchall()
|
|
# print("\t " + str(shot_flag2))
|
|
# _total_length += int(row[2])
|
|
if shot_flag[0][1] is None and shot_flag2[0][1]== 'dpl':
|
|
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
|
|
Current_Station_ID = cursor.fetchall()
|
|
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
|
|
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
|
|
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag2[0][0]}")
|
|
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
|
|
elif shot_flag2[0][1] is None and shot_flag[0][1]== 'dpl':
|
|
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
|
|
Current_Station_ID = cursor.fetchall()
|
|
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
|
|
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
|
|
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag[0][0]}")
|
|
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
|
|
else :
|
|
_total_length_err += float(shot_flag2[0][2])
|
|
cursor.execute("SELECT COUNT(*) AS nombre_enregistrements FROM STATION")
|
|
Current_Station_ID = cursor.fetchall()
|
|
_Current_Station_ID = int(Current_Station_ID[0][0]) + 1
|
|
cursor.execute(f"INSERT INTO STATION (ID, NAME) VALUES ({_Current_Station_ID}, 'isu')")
|
|
cursor.execute(f"UPDATE SHOT SET TO_ID = {_Current_Station_ID} WHERE SHOT.ID = {shot_flag2[0][0]}")
|
|
cursor.execute(f"INSERT INTO JONCTION (STATION_ID) VALUES ({_Current_Station_ID})")
|
|
print(f"\033[91m\t Table des SHOT, visées en double à traiter à la source : \033[0m{shot_flag}, {shot_flag2}" +
|
|
f"\033[91m, station crée : \033[0m{_Current_Station_ID}" +
|
|
f"\033[91m, long. en double : \033[0m{"{:.2f}".format(_total_length_err)} m")
|
|
|
|
conn.commit()
|
|
|
|
|
|
# for val in range (1, len(sous_valeurs)) :
|
|
# cursor.execute(f"DELETE FROM SHOT WHERE SHOT.ID = {sous_valeurs[val]}")
|
|
# filtre = cursor.fetchall()
|
|
|
|
if len(duplicate) > 0:
|
|
print(f"\t Table des SHOT, visées dupliquées traités nbre: {len(duplicate)}")
|
|
# print(f"\t Visées dupliqués supprimés {duplicate}")
|
|
else :
|
|
print(f"\t Table des SHOT, aucune visée dupliquée")
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (duplicate_SHOT) code:\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour supprimer les visées en de longueur null sur elle même (même départ et arrivée et longueur nulle #
|
|
#####################################################################################################################################
|
|
def issue_SHOT():
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
SELECT
|
|
SHOT.ID
|
|
--SHOT.FROM_ID,
|
|
--SHOT.TO_ID,
|
|
--SHOT.LENGTH
|
|
FROM SHOT
|
|
WHERE SHOT.FROM_ID = SHOT.TO_ID AND SHOT.LENGTH = 0
|
|
""")
|
|
issue = cursor.fetchall()
|
|
|
|
for row in issue :
|
|
cursor.execute(f"DELETE FROM SHOT WHERE ID = {row[0]}")
|
|
# cursor.execute(f"UPDATE SHOT SET LENGTH = 0.01 WHERE ID = {row[0]}")
|
|
conn.commit()
|
|
|
|
|
|
if len(issue) > 0:
|
|
print(f"\t Table des SHOT, visée(s) bloquante(s), même départ et arrivée, longueur nulle supprimée(s) nbre: {len(issue)}")
|
|
# print(f"\t Visée(s) bloquante(s) supprimée(s) {issue}")
|
|
else :
|
|
print(f"\t Table des SHOT, aucune visée bloquante")
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (Issue_SHOT) code:\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour marquer les stations d'habillage #
|
|
#####################################################################################################################################
|
|
def marquage_visee_station_habillage() :
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
SELECT
|
|
STATION.ID AS STATION_ID,
|
|
SHOT.ID AS SHOT_ID
|
|
FROM STATION
|
|
JOIN SHOT ON SHOT.TO_ID = STATION.ID
|
|
WHERE STATION.NAME = '.' or STATION.NAME = '-'
|
|
""" )
|
|
|
|
filtre = cursor.fetchall()
|
|
print(f"\t Marquage des visées et des stations d'habillage nbre: {len(filtre)}")
|
|
for row in filtre :
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = 'hab' WHERE STATION_ID = {row[0]}")
|
|
cursor.execute(f"UPDATE VISEE_FLAG SET SERIE_ID = -1 WHERE SHOT_ID = {row[1]}")
|
|
conn.commit()
|
|
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (marquage_station_habillage):\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour suivre une série jusqu'au prochain départ ou une jonction #
|
|
#####################################################################################################################################
|
|
def suivi_serie( _Current_Serie_ID, bar) :
|
|
global avt_compteur
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"SELECT * FROM SERIE WHERE SERIE_ID = {_Current_Serie_ID}")
|
|
_Serie = cursor.fetchall()
|
|
|
|
|
|
# if _Current_Serie_ID == 2 or _Current_Serie_ID == 3:
|
|
# print("Debug, a suivre")
|
|
|
|
_Current_Next_Station = 0
|
|
_Current_Shot = 0
|
|
_Current_Nre_Shot = int(_Serie[0][5]) # type: ignore
|
|
_Current_Serie_Lenght = float(_Serie[0][6]) # type: ignore
|
|
_Current_Serie_Lenght_Surface = float(_Serie[0][7]) # type: ignore
|
|
_Current_Serie_Lenght_Duplicate = float(_Serie[0][8]) # type: ignore
|
|
_Direction = int(_Serie[0][9])
|
|
_Current_Ent = int(_Serie[0][10])
|
|
|
|
if _Direction == 0 :
|
|
# print(f"\t\033[34mA gérer séries sans direction station série: {_Current_Serie_ID}\033[0m") # type: ignore
|
|
_Current_Station_ID = int(_Serie[0][2])
|
|
Next_Station_ID_1 = sql_station_depart(_Current_Station_ID)
|
|
Next_Station_ID_2 = sql_station_arrivee(_Current_Station_ID)
|
|
if len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 0: # type: ignore
|
|
# Pas de départ fin et pas d'arrivée : fin de la série
|
|
Next_Station_ID = Next_Station_ID_1
|
|
cursor.execute(f"UPDATE SERIE SET SERIE_NBRE_SHOT = 0 WHERE SERIE_ID = {_Current_Serie_ID};")
|
|
conn.commit()
|
|
return
|
|
elif len(Next_Station_ID_1) == 1 and len(Next_Station_ID_2) == 0: # type: ignore
|
|
# Un départ pas d'arrivée
|
|
Next_Station_ID = Next_Station_ID_1
|
|
Direction = 1
|
|
cursor.execute(f"UPDATE SERIE SET DIRECTION = 1 WHERE SERIE_ID = {_Current_Serie_ID};")
|
|
conn.commit()
|
|
elif len(Next_Station_ID_1) == 0 and len(Next_Station_ID_2) == 1: # type: ignore
|
|
# Une arrivée pas de départ
|
|
Next_Station_ID = Next_Station_ID_2
|
|
_Serie[0][4] = _Serie[0][2]
|
|
Direction = -1
|
|
cursor.execute(f"UPDATE SERIE SET DIRECTION = -1 WHERE SERIE_ID = {_Current_Serie_ID};")
|
|
conn.commit()
|
|
else :
|
|
# A gérer nouvelles séries
|
|
# nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
# print (f"\033[34m\tA vérifier dans suivi_serie nouvelles séries inverses depuis {_Current_Station_ID} - {Next_Station_ID_2}, {Next_Station_ID_1}\033[0m")
|
|
# nouvelles_series(_Current_Station_ID, _Current_Station_ID_Old, _Current_Serie_ID, -1, _Current_Ent)
|
|
return
|
|
|
|
elif _Direction == -1 :
|
|
_Current_Station_ID = int(_Serie[0][4])
|
|
_Next_Station_ID = sql_station_arrivee(_Current_Station_ID) # type: ignore
|
|
_Current_Nre_Shot = 0
|
|
#print(f"\tDébut suivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}") # type: ignore
|
|
|
|
_ID_Suite = 0
|
|
_Force = False
|
|
if len(_Next_Station_ID) > 1: # type: ignore
|
|
while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][2]) : # type: ignore
|
|
_ID_Suite += 1
|
|
if _ID_Suite >= len(_Next_Station_ID): # type: ignore
|
|
# print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie inverse: {_Current_Serie_ID} station:{_Current_Station_ID}, shot: {_Current_Shot}\033[0m")
|
|
#error_count += 1
|
|
return
|
|
|
|
while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore
|
|
# if _Current_Station_ID == 12074:
|
|
# print("Debug, a suivre")
|
|
|
|
_Force = True
|
|
_Current_Nre_Shot += 1
|
|
_Current_Serie_Lenght += _Next_Station_ID[_ID_Suite][1] # type: ignore
|
|
_Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore
|
|
_Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore
|
|
_Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore
|
|
_Current_Next_Station = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore
|
|
_Current_Old_Station = int(_Current_Station_ID)
|
|
test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore
|
|
|
|
cursor.execute(f"""
|
|
UPDATE SERIE SET
|
|
SERIE_DEP_ID = {_Current_Serie_ID},
|
|
STATION_DEP_ID = {_Current_Next_Station},
|
|
SERIE_NBRE_Shot = {_Current_Nre_Shot},
|
|
SERIE_LENGHT = {_Current_Serie_Lenght},
|
|
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
|
|
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
|
|
WHERE SERIE_ID = {_Current_Serie_ID};
|
|
""")
|
|
|
|
|
|
cursor.execute(f"""
|
|
UPDATE JONCTION SET
|
|
STATION_TYPE = 'inv',
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot},
|
|
SERIE_ID = {_Current_Serie_ID}
|
|
WHERE STATION_ID = {_Current_Station_ID}
|
|
""")
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {_Current_Serie_ID},
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot}
|
|
WHERE SHOT_ID = {_Current_Shot}
|
|
""")
|
|
|
|
#print(f"\tSuivi serie inverse {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Next station: {_Next_Station_ID}")
|
|
conn.commit()
|
|
depart = sql_station_depart(_Current_Station_ID)
|
|
|
|
if (len(depart) >= 1 ) : # type: ignore
|
|
#print(f'Départ direct à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {depart}')
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
|
|
_Current_Station_ID = int(_Next_Station_ID[_ID_Suite][0]) # type: ignore
|
|
_Next_Station_ID = sql_station_arrivee(_Current_Next_Station)
|
|
_ID_Suite = 0
|
|
|
|
cursor.execute(f"""
|
|
UPDATE SERIE SET
|
|
SERIE_DEP_ID = {_Current_Serie_ID},
|
|
STATION_DEP_ID = {_Current_Next_Station},
|
|
SERIE_NBRE_Shot = {_Current_Nre_Shot},
|
|
SERIE_LENGHT = {_Current_Serie_Lenght},
|
|
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
|
|
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
|
|
WHERE SERIE_ID = {_Current_Serie_ID};
|
|
""")
|
|
|
|
# if _Current_Station_ID == 5047 :
|
|
# print("debug point")
|
|
|
|
cursor.execute(f"""
|
|
UPDATE JONCTION SET
|
|
STATION_TYPE = 'arv',
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot},
|
|
SERIE_ID = {_Current_Serie_ID}
|
|
WHERE STATION_ID = {_Current_Station_ID}
|
|
""")
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {_Current_Serie_ID},
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot}
|
|
WHERE SHOT_ID = {_Current_Shot}
|
|
""")
|
|
|
|
conn.commit()
|
|
|
|
if _Current_Nre_Shot > 1 :
|
|
avt_compteur = avt_compteur + _Current_Nre_Shot - 1
|
|
bar(_Current_Nre_Shot-1)
|
|
|
|
#_Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore
|
|
depart = sql_station_depart(_Current_Station_ID) # type: ignore
|
|
if (len(_Next_Station_ID)==0): # type: ignore
|
|
# fin de la série
|
|
# print (f"\tFin de la série inverse: {_Current_Serie_ID} (pas de suite) station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
if (len(depart)>=1):# type: ignore
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
|
|
else :
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
|
|
# print (f"\tFin de la série inverse: {_Current_Serie_ID} station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
|
|
if (len(depart)>=1):# type: ignore
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
elif _Direction == 1 :
|
|
_Current_Station_ID = int(_Serie[0][2])
|
|
_Next_Station_ID = sql_station_depart(_Current_Station_ID)
|
|
_Current_Nre_Shot = 0
|
|
#print(f"\tDébut suivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}")
|
|
|
|
_ID_Suite = 0
|
|
_Force = False
|
|
if len(_Next_Station_ID) > 1: # type: ignore
|
|
while int(_Next_Station_ID[_ID_Suite][0]) != int(_Serie[0][4]) : # type: ignore
|
|
_ID_Suite += 1
|
|
if _ID_Suite >= len(_Next_Station_ID): # type: ignore
|
|
# print(f"\t \033[34mA vérifier, pas de suite trouvée à la serie directe: {_Current_Serie_ID}, Station: {_Current_Station_ID}, shot: {_Current_Shot}\033[0m")
|
|
# error_count += 1
|
|
return
|
|
|
|
while (len(_Next_Station_ID)==1) or (_Force == False) : # type: ignore
|
|
# if _Current_Station_ID == 12074:
|
|
# print("Debug, a suivre")
|
|
_Force = True
|
|
_Current_Nre_Shot += 1
|
|
_Current_Serie_Lenght += _Next_Station_ID[0][1] # type: ignore
|
|
_Current_Serie_Lenght_Surface += _Next_Station_ID[_ID_Suite][2] # type: ignore
|
|
_Current_Serie_Lenght_Duplicate += _Next_Station_ID[_ID_Suite][3] # type: ignore
|
|
_Current_Shot= _Next_Station_ID[_ID_Suite][4] # type: ignore
|
|
_Current_Next_Station = int(_Next_Station_ID[0][0]) # type: ignore
|
|
_Current_Old_Station = int(_Current_Station_ID)
|
|
test_jonction(_Current_Next_Station, _Current_Serie_ID, _Current_Ent) # type: ignore
|
|
|
|
cursor.execute(f"""
|
|
UPDATE SERIE SET
|
|
SERIE_DEP_ID = {_Current_Serie_ID},
|
|
STATION_DEP_ID = {_Current_Next_Station},
|
|
SERIE_NBRE_Shot = {_Current_Nre_Shot},
|
|
SERIE_LENGHT = {_Current_Serie_Lenght},
|
|
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
|
|
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
|
|
WHERE SERIE_ID = {_Current_Serie_ID};
|
|
""")
|
|
|
|
# if _Current_Station_ID == 5035 :
|
|
# print("debug point")
|
|
|
|
|
|
cursor.execute(f"""
|
|
UPDATE JONCTION SET
|
|
STATION_TYPE = 'dir',
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot},
|
|
SERIE_ID = {_Current_Serie_ID}
|
|
WHERE STATION_ID = {_Current_Station_ID}
|
|
""")
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {_Current_Serie_ID},
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot}
|
|
WHERE SHOT_ID = {_Current_Shot}
|
|
""")
|
|
|
|
conn.commit()
|
|
#print(f"\tSuivi serie directe {_Current_Serie_ID} Station_ID: {_Current_Station_ID} Nre: {_Current_Nre_Shot} Next station: {_Next_Station_ID}")
|
|
|
|
arrivee = sql_station_arrivee(_Current_Station_ID)
|
|
|
|
if (len(arrivee) >= 1 ) : # type: ignore
|
|
#print(f'Arrivée à gérer station: {_Current_Station_ID} série: {_Current_Serie_ID}, nbre shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}, Arrivée(s) {arrivee}')
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
|
|
|
|
_Current_Station_ID = int(_Next_Station_ID[0][0]) # type: ignore
|
|
_Next_Station_ID = sql_station_depart(_Current_Next_Station)
|
|
_ID_Suite = 0
|
|
|
|
cursor.execute(f"""
|
|
UPDATE SERIE SET
|
|
SERIE_DEP_ID = {_Current_Serie_ID},
|
|
STATION_DEP_ID = {_Current_Next_Station},
|
|
SERIE_NBRE_Shot = {_Current_Nre_Shot},
|
|
SERIE_LENGHT = {_Current_Serie_Lenght},
|
|
SERIE_LENGHT_SURFACE = {_Current_Serie_Lenght_Surface},
|
|
SERIE_LENGHT_DUPLICATE = {_Current_Serie_Lenght_Duplicate}
|
|
WHERE SERIE_ID = {_Current_Serie_ID};
|
|
""") # type: ignore
|
|
|
|
cursor.execute(f"""
|
|
UPDATE JONCTION SET
|
|
STATION_TYPE = 'end',
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot},
|
|
SERIE_ID = {_Current_Serie_ID}
|
|
WHERE STATION_ID = {_Current_Station_ID}
|
|
""")
|
|
|
|
cursor.execute(f"""
|
|
UPDATE VISEE_FLAG SET
|
|
SERIE_ID = {_Current_Serie_ID},
|
|
ENTREE_ID = {_Current_Ent},
|
|
SERIE_RANG = {_Current_Nre_Shot}
|
|
WHERE SHOT_ID = {_Current_Shot}
|
|
""")
|
|
|
|
conn.commit()
|
|
|
|
if _Current_Nre_Shot > 1:
|
|
avt_compteur = avt_compteur + _Current_Nre_Shot - 1
|
|
bar(_Current_Nre_Shot-1)
|
|
|
|
arrivee = sql_station_arrivee(_Current_Next_Station) # type: ignore
|
|
if (len(_Next_Station_ID)==0): # type: ignore
|
|
# fin de la série
|
|
# print (f"\tFin de la série directe: {_Current_Serie_ID} (pas de suite) à la station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
|
|
if (len(arrivee)>=1): # type: ignore
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
else :
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, 1, _Current_Ent) # type: ignore
|
|
# print (f"\tFin de la série directe: {_Current_Serie_ID} station: {_Current_Station_ID}, nbre de shot: {_Current_Nre_Shot}, long: {_Current_Serie_Lenght:.2f}")
|
|
if (len(arrivee)>=1): # type: ignore
|
|
nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, -1, _Current_Ent) # type: ignore
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête de lecture de la série\033[0m {_Current_Serie_ID}\033[91m, suivi_serie:\033[0m {e}")
|
|
error_count += 1
|
|
return
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour créer les nouvelles séries à une station #
|
|
#####################################################################################################################################
|
|
def nouvelles_series(_Current_Station_ID, _Current_Old_Station, _Current_Serie_ID, _DIRECTION, _STATION_ENT_ID) :
|
|
global error_count
|
|
|
|
# if _Current_Serie_ID == 2 or _Current_Serie_ID == 3:
|
|
# print("Debug, a suivre")
|
|
|
|
|
|
try:
|
|
if _DIRECTION == 1:
|
|
_Next_Station = sql_station_depart(_Current_Station_ID)
|
|
# cursor.execute(f"""
|
|
# SELECT SHOT.TO_ID as TO_ID_RESULT
|
|
# FROM SHOT
|
|
# WHERE SHOT.FROM_ID = {_Current_Station_ID}
|
|
# -- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
|
|
# """)
|
|
# _Next_Station = cursor.fetchall()
|
|
elif _DIRECTION == -1:
|
|
_Next_Station = sql_station_arrivee(_Current_Station_ID)
|
|
# cursor.execute(f"""
|
|
# SELECT SHOT.FROM_ID as FROM_ID_RESULT
|
|
# FROM SHOT
|
|
# WHERE SHOT.TO_ID = {_Current_Station_ID}
|
|
# -- AND ( SELECT SHOT.FROM_ID FROM SHOT WHERE SHOT.TO_ID = FROM_ID_RESULT )
|
|
# """)
|
|
# _Next_Station = cursor.fetchall()
|
|
|
|
if _Next_Station is None: # type: ignore
|
|
print(f"Pas de série crées à la station: {_Current_Station_ID}")
|
|
return
|
|
# boucle sur liste _Next_Station
|
|
for Depart in _Next_Station: # type: ignore
|
|
if _Current_Old_Station != Depart[0] : # type: ignore
|
|
cursor.execute(f"UPDATE JONCTION SET SERIE_JONC = {_Current_Serie_ID} WHERE STATION_ID = {_Current_Station_ID};")
|
|
cursor.execute(f"UPDATE JONCTION SET ENTREE_ID = {_STATION_ENT_ID} WHERE STATION_ID = {_Current_Station_ID};")
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_JONC = {Depart[0]} WHERE STATION_ID = {_Current_Station_ID};")
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', _Current_Station_ID))
|
|
if _DIRECTION == 1 :
|
|
cursor.execute(f"""
|
|
INSERT INTO SERIE (
|
|
SERIE_DEP_ID,
|
|
STATION_DEP_ID,
|
|
SERIE_ARR_ID ,
|
|
STATION_ARR_ID,
|
|
SERIE_NBRE_SHOT,
|
|
SERIE_LENGHT,
|
|
SERIE_LENGHT_SURFACE,
|
|
SERIE_LENGHT_DUPLICATE,
|
|
DIRECTION,
|
|
STATION_ENT_ID)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (_Current_Serie_ID, _Current_Station_ID, -1, Depart[0], -1, 0.0, 0.0, 0.0, 1,_STATION_ENT_ID))
|
|
conn.commit()
|
|
# print(f"\tCréation Série directe: {cursor.lastrowid} depuis la station: {_Current_Station_ID} vers {Depart[0]} ")
|
|
elif _DIRECTION == -1 :
|
|
cursor.execute(f"""
|
|
INSERT INTO SERIE (
|
|
SERIE_DEP_ID,
|
|
STATION_DEP_ID,
|
|
SERIE_ARR_ID ,
|
|
STATION_ARR_ID,
|
|
SERIE_NBRE_SHOT,
|
|
SERIE_LENGHT,
|
|
SERIE_LENGHT_SURFACE,
|
|
SERIE_LENGHT_DUPLICATE,
|
|
DIRECTION,
|
|
STATION_ENT_ID)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (-1, Depart[0], _Current_Serie_ID, _Current_Station_ID, -1, 0.0, 0.0, 0.0, -1,_STATION_ENT_ID))
|
|
conn.commit()
|
|
|
|
# print(f"\tCréation Série inv.: {cursor.lastrowid} depuis {Depart[0]} vers la station: {_Current_Station_ID}")
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de la requête (nouvelle_serie):\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour tester si la station a déjà été lue #
|
|
#####################################################################################################################################
|
|
def test_jonction(station, serie, entree) :
|
|
global error_count
|
|
|
|
try:
|
|
|
|
cursor.execute(f"""
|
|
-- Requête 6: Détection des départs depuis une station (visée directe)
|
|
SELECT
|
|
SHOT.TO_ID as TO_ID_RESULT,
|
|
--JONCTION.STATION_TYPE,
|
|
SHOT.LENGTH,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_SRF,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_DPL,
|
|
SHOT.ID
|
|
-- SHOT_FLAG.FLAG as Type_Flag
|
|
FROM SHOT
|
|
--JOIN JONCTION ON SHOT.TO_ID = JONCTION.STATION_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.FROM_ID = {station} -- AND JONCTION.STATION_TYPE IS NULL
|
|
-- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
|
|
""")
|
|
depart = cursor.fetchall()
|
|
|
|
cursor.execute(f"""
|
|
-- Requête 7: Détection des arrivées depuis une station (Visée inverse)
|
|
SELECT
|
|
SHOT.FROM_ID as FROM_ID_RESULT,
|
|
--JONCTION.STATION_TYPE,
|
|
SHOT.LENGTH,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_SRF,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_DPL,
|
|
SHOT.ID
|
|
-- SHOT_FLAG.FLAG
|
|
FROM SHOT
|
|
--JOIN JONCTION ON SHOT.FROM_ID = JONCTION.STATION_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.TO_ID = {station} --AND JONCTION.STATION_TYPE IS NULL
|
|
--AND ( SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE SHOT.TO_ID = FROM_ID_RESULT
|
|
""")
|
|
arrivee = cursor.fetchall()
|
|
|
|
total = arrivee + depart # type: ignore
|
|
for row in total: # type: ignore
|
|
cursor.execute(f"SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
|
|
|
|
|
|
# if row[1] == 5017:
|
|
# print('Debug point')
|
|
|
|
retour = cursor.fetchall()
|
|
val = str(retour[0])
|
|
|
|
if val != '(None,)' :
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', row[0]))
|
|
cursor.execute(f"SELECT JONCTION.ENTREE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
|
|
retour = cursor.fetchall()
|
|
cursor.execute(f"SELECT JONCTION.SERIE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {row[0]}")
|
|
_serie = cursor.fetchall()
|
|
# print (f"\tJonction à proximité de la Station_ID: {row[0]}, retour: {str(val)}, serie {serie} - {_serie[0][0]}, entrée {entree} - {retour[0][0]}")
|
|
if (retour[0][0] != entree) and (retour[0][0] != None) :
|
|
print (f"\033[36m\t Jonction à la Station_ID: {row[0]} entre les entrées {entree} et {retour[0][0]}\033[0m")
|
|
cursor.execute(f"INSERT INTO RESEAU ( STATION_JONC, ENT_1, ENT_2) VALUES (?, ?, ?)", (row[0], entree, retour[0][0]))
|
|
conn.commit()
|
|
# if _serie[0][0] != serie and (_serie[0][0] != None):
|
|
# print (f"\033[34m\tJonction à la Station_ID: {row[0]} entre les series {serie} et {_serie[0][0]}\033[0m")
|
|
|
|
|
|
cursor.execute(f"SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
|
|
retour = cursor.fetchall()
|
|
val = str(retour[0])
|
|
|
|
if val == '(None,)' :
|
|
#print (f"\tPas de jonction à la Station_ID: {station}, retour: {val}")
|
|
return False
|
|
else :
|
|
cursor.execute(f"UPDATE JONCTION SET STATION_TYPE = ? WHERE id = ?", ('jon', station))
|
|
cursor.execute(f"SELECT JONCTION.ENTREE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
|
|
retour = cursor.fetchall()
|
|
cursor.execute(f"SELECT JONCTION.SERIE_ID FROM JONCTION WHERE JONCTION.STATION_ID = {station}")
|
|
_serie = cursor.fetchall()
|
|
# print (f"\tJonction à proximité de la Station_ID: {row[0]}, retour: {str(val)}, serie {serie} - {_serie[0][0]}, entrée {entree} - {retour[0][0]}")
|
|
if retour[0][0] != entree :
|
|
print (f"\033[0m\t Jonction à la Station_ID: {station} entre les entrées {entree} et {retour[0][0]}\033[0m")
|
|
if _serie[0][0] != serie :
|
|
print (f"\033[0m\t Jonction à la Station_ID: {station} entre les series {serie} et {_serie[0][0]}\033[0m")
|
|
return True
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requêtes (test_jonction): {e}\033[0m")
|
|
error_count += 1
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Fonction pour exécuter une requête et sauvegarder les résultats dans un fichier texte #
|
|
# #
|
|
#####################################################################################################################################
|
|
def calcul_stats(output_file):
|
|
global error_count
|
|
global _largeurCol
|
|
global _largeurColTete
|
|
|
|
try:
|
|
print(f"\033[1;32mPhase 5: Écriture des statistiques dans \033[0m{output_file}")
|
|
# Enregistrement des résultats dans un fichier texte
|
|
output_file_ligne =[]
|
|
|
|
for i in range(9): output_file_ligne.append(titre[i].ljust(90)+"*\n")
|
|
|
|
sql_query1 = ("""
|
|
Select
|
|
round(sum(LENGTH), 2) as Lg,
|
|
round(sum(DUPLICATE_LENGTH),2) as Duplicate,
|
|
round(sum(SURFACE_LENGTH),2) as Surface
|
|
-- round(sum(LENGTH) + sum(DUPLICATE_LENGTH), 2) as Total
|
|
from CENTRELINE length
|
|
""")
|
|
|
|
cursor.execute(sql_query1)
|
|
results = cursor.fetchall()
|
|
vide ="-".ljust(_largeurCol)
|
|
|
|
# output_file_ligne.append(f"Développement total centerline:\t{"{:.2f}".format(results[0][0]).ljust(_largeurCol)}\t{"{:.2f}".format(results[0][1]).ljust(_largeurCol)}\t{vide}\t{vide}\t{vide}\tdev.(m), dupl.(m)\n")
|
|
#print('Développement total: ' + formatted_row + 'm')
|
|
|
|
output_file_ligne.append(f"Développement total centerline:\t%s\t%s\t%s\t%s\t%s\tDev.(m), Dupl.(m), Surf.(m)\n" %(str("{:.2f}".format(results[0][0]).ljust(_largeurCol)),
|
|
str("{:.2f}".format(results[0][1]).ljust(_largeurCol)),
|
|
str("{:.2f}".format(results[0][2]).ljust(_largeurCol)),
|
|
str(vide), str(vide)))
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) AS nbre FROM JONCTION WHERE STATION_TYPE IS NULL")
|
|
_compteur = cursor.fetchall()
|
|
compteur = int(_compteur[0][0])
|
|
|
|
if compteur > 0 : # type: ignore
|
|
output_file_ligne.append(f"Attention, {compteur} station(s) non comptabilisée(s) et raccordée(s)\n")
|
|
|
|
|
|
results=sql_bilan_reseaux()
|
|
if results[0][0] != None :# type: ignore
|
|
output_file_ligne.append('\nDéveloppement total par réseaux\n')
|
|
for row in results: # type: ignore
|
|
formatted_row = '\t'.join(map(str, row))
|
|
output_file_ligne.append('\t' + formatted_row + '\n')
|
|
#print('Développement total: ' + formatted_row + 'm')
|
|
|
|
results=sql_bilan_annee()
|
|
if results[0][0] != None :# type: ignore
|
|
output_file_ligne.append('\nDéveloppement total topographié par année(s)\n')
|
|
for row in results: # type: ignore
|
|
formatted_row = '\t'.join(map(str, row))
|
|
output_file_ligne.append('\t' + formatted_row + '\n')
|
|
#print('Développement total: ' + formatted_row + 'm')
|
|
|
|
Rose(output_file_name_rose)
|
|
|
|
Shot_lengths_histogram(output_file_name_histo)
|
|
|
|
|
|
|
|
findetraitement = datetime.now()
|
|
|
|
duree = findetraitement - maintenant
|
|
jours, secondes = divmod(duree.seconds, 86400) # 86400 secondes dans une journée
|
|
heures, secondes = divmod(secondes, 3600) # 3600 secondes dans une heure
|
|
minutes, secondes = divmod(secondes, 60) # 60 secondes dans une minute
|
|
if duree.seconds > 3600:
|
|
duree_formatee = "{:02}:{:02}:{:02}(s)".format(heures, minutes, secondes)
|
|
elif duree.seconds > 60:
|
|
duree_formatee = "{:02}:{:02}(s)".format(minutes, secondes)
|
|
else :
|
|
duree_formatee = "{:02}(s)".format(secondes)
|
|
|
|
if error_count == 0:
|
|
output_file_ligne[7] = "* Durée calcul: " + duree_formatee + " sans erreur"
|
|
output_file_ligne[7] = output_file_ligne[7].ljust(90)+"*\n"
|
|
else :
|
|
output_file_ligne[7] = "* Durée calcul: " + duree_formatee + " avec erreur(s): " + str(error_count)
|
|
output_file_ligne[7] = output_file_ligne[7].ljust(90)+"*\n"
|
|
|
|
with open(output_file, 'w', encoding='utf-8') as file:
|
|
file.writelines(output_file_ligne)
|
|
|
|
if error_count == 0 :
|
|
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee + f"\033[1;32m, résultats enregistrés dans \033[0m{output_file}")
|
|
else :
|
|
print(f"\033[1;32mPhase 6: Fin de traitement en \033[0m" + duree_formatee
|
|
+ f",\033[91m avec \033[0m{error_count}\033[91m erreur(s), \033[1;32mrésultats enregistrés dans \033[0m{output_file}")
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution des requêtes calcul_stats:\033[0m {e}")
|
|
error_count += 1
|
|
output_file_ligne.append(f"Erreur lors de l'exécution des requêtes calcul_stats: {e}\n")
|
|
with open(output_file, 'w', encoding='utf-8') as file:
|
|
file.writelines(output_file_ligne)
|
|
|
|
except FileNotFoundError:
|
|
print(f"\033[91mErreur d'ouverture du fichier: \033[0m{output_file} ")
|
|
error_count += 1
|
|
|
|
except Exception as e:
|
|
print(f"\033[91mErreur lors de l'exécution de calcul_stats:\033[0m {e}")
|
|
error_count += 1
|
|
output_file_ligne.append(f"Erreur lors de l'exécution de calcul_stats: {e}\n")
|
|
with open(output_file, 'w', encoding='utf-8') as file:
|
|
file.writelines(output_file_ligne)
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# # Requête : Table des entrées (Liste des entrées avec coordonnées) #
|
|
#####################################################################################################################################
|
|
def sql_liste_entree():
|
|
global error_count
|
|
|
|
sql_query= ("""
|
|
select
|
|
STATION.ID,
|
|
STATION.NAME,
|
|
/*SURVEY.NAME, SURVEY.PARENT_ID, SURVEY.FULL_NAME, SURVEY.TITLE,*/
|
|
round(STATION.X, 1),
|
|
round(STATION.Y, 1),
|
|
round(STATION.Z, 1)
|
|
/*, STATION_FLAG.FLAG , count(STATION.NAME) AS Nombre_Occurrences */
|
|
from STATION
|
|
join STATION_FLAG on STATION_FLAG.STATION_ID = STATION.ID
|
|
join SURVEY on SURVEY.ID = STATION.SURVEY_ID
|
|
where STATION_FLAG.FLAG='ent' or STATION_FLAG.FLAG='fix' --and STATION.ID = 28548
|
|
group by STATION.NAME , STATION.Y, STATION.Z
|
|
order by STATION.NAME ASC
|
|
""")
|
|
|
|
# sql_query2 = ("""
|
|
# select
|
|
# STATION.ID,
|
|
# STATION.NAME,
|
|
# /*SURVEY.NAME, SURVEY.PARENT_ID, SURVEY.FULL_NAME, SURVEY.TITLE,*/
|
|
# round(STATION.X, 1),
|
|
# round(STATION.Y, 1),
|
|
# round(STATION.Z, 1)
|
|
# /*, STATION_FLAG.FLAG , count(STATION.NAME) AS Nombre_Occurrences */
|
|
# from STATION
|
|
# join STATION_FLAG on STATION_FLAG.STATION_ID = STATION.ID
|
|
# join SURVEY on SURVEY.ID = STATION.SURVEY_ID
|
|
# where STATION_FLAG.FLAG='fix' --and STATION.ID = 28548
|
|
# --group by STATION.NAME
|
|
# order by STATION.NAME ASC
|
|
# """)
|
|
try:
|
|
cursor.execute(sql_query)
|
|
result_ent = cursor.fetchall()
|
|
# cursor.execute(sql_query)
|
|
# result_fix = cursor.fetchall()
|
|
if len(result_ent) == 0 :
|
|
error_count += 1
|
|
print(f"\t \033[91mAttention aucune entrée ou point fix comptabilisé\033[0m")
|
|
else :
|
|
print(f"\t Table des STATION, entrée et fix nbre: {len(result_ent)}")
|
|
|
|
return result_ent
|
|
|
|
# if len(result_ent) == 0:
|
|
# print(f"\033[91mPas d'entrées\033[0m")
|
|
# if len(result_fix) == 0:
|
|
# print(f"\033[91mPas de points fixes\033[0m")
|
|
# return None
|
|
# else :
|
|
# print(f"\tTable des STATION, point fixe nbre: {len(result_fix)}")
|
|
# return result_fix
|
|
# elif len(result_ent) == len(result_fix) :
|
|
# print(f"\tTable des STATION, entrée nbre: {len(result_ent)}")
|
|
# # print(f"\tTable des STATION, point fixe nbre: {len(result_fix)}")
|
|
# return result_ent
|
|
# elif len(result_ent) > len(result_fix) :
|
|
# print(f"\033[91mA gérer Points fixes > entrées, traitement uniquement des entrées\033[0m")
|
|
# return result_ent
|
|
# elif len(result_ent) < len(result_fix) :
|
|
# print(f"\033[91mA gérer Points fixes < entrées, traitement uniquement des points fixes\033[0m")
|
|
# return result_fix
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête 4 (sql_liste_entree):\033[0m {e}")
|
|
error_count += 1
|
|
return None
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# # Requête : Table des séries vides #
|
|
#####################################################################################################################################
|
|
def sql_serie_vides():
|
|
global error_count
|
|
|
|
sql_query5 = ("""
|
|
SELECT *
|
|
FROM SERIE
|
|
WHERE SERIE.SERIE_NBRE_SHOT = -1""")
|
|
try:
|
|
cursor.execute(sql_query5) # Exécution de la requête SQL
|
|
retour = cursor.fetchall()
|
|
# if len(retour) == 0 :
|
|
# print(f"Aucune séries vides {len(retour)}")
|
|
return retour
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (sql_serie_vides): {e}\033[0m")
|
|
error_count += 1
|
|
return None
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# # Requête: From_To (recherche si il y a un départ dans le sens From vers To depuis la station Current_Station_ID) #
|
|
#####################################################################################################################################
|
|
def sql_station_depart(station):
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
-- Requête 6: Détection des départs depuis une station (visée directe)
|
|
SELECT
|
|
SHOT.TO_ID as TO_ID_RESULT,
|
|
--JONCTION.STATION_TYPE,
|
|
SHOT.LENGTH,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_SRF,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_DPL,
|
|
-- SHOT_FLAG.FLAG as Type_Flag
|
|
SHOT.ID
|
|
FROM SHOT
|
|
JOIN JONCTION ON SHOT.TO_ID = JONCTION.STATION_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.FROM_ID = {station} AND JONCTION.STATION_TYPE IS NULL
|
|
-- AND ( SELECT SHOT.TO_ID FROM SHOT WHERE SHOT.FROM_ID = TO_ID_RESULT)
|
|
""")
|
|
retour = cursor.fetchall()
|
|
# if len(retour) == 0 : print(f"\tAucun départ depuis la station: {station}")
|
|
return retour
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête 6 (sql_station_depart) code:\033[0m {e}")
|
|
error_count += 1
|
|
return None
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# # Requête : To_From (recherche si il y a un départ dans le sens To vers From depuis la station Current_Station_ID) #
|
|
#####################################################################################################################################
|
|
def sql_station_arrivee(station):
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
-- Requête 7: Détection des arrivées depuis une station (Visée inverse)
|
|
SELECT
|
|
SHOT.FROM_ID as FROM_ID_RESULT,
|
|
--JONCTION.STATION_TYPE,
|
|
SHOT.LENGTH,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'srf' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_SRF,
|
|
CASE
|
|
WHEN SHOT_FLAG.FLAG = 'dpl' THEN SHOT.LENGTH
|
|
ELSE 0
|
|
END AS LENGTH_DPL,
|
|
SHOT.ID
|
|
-- SHOT_FLAG.FLAG
|
|
FROM SHOT
|
|
JOIN JONCTION ON SHOT.FROM_ID = JONCTION.STATION_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE SHOT.TO_ID = {station} AND JONCTION.STATION_TYPE IS NULL
|
|
--AND ( SELECT JONCTION.STATION_TYPE FROM JONCTION WHERE SHOT.TO_ID = FROM_ID_RESULT
|
|
""")
|
|
retour = cursor.fetchall()
|
|
# if len(retour) == 0 print(f"\tAucune arrivée depuis la station: {station}")
|
|
return retour
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête 7 (sql_station_arrivee) code:\033[0m {e}")
|
|
error_count += 1
|
|
return None
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# #-- Bilan table série #
|
|
#####################################################################################################################################
|
|
def sql_bilan_serie():
|
|
global error_count
|
|
|
|
try:
|
|
cursor.execute(f"""
|
|
-- Bilan table série
|
|
select
|
|
--STATION.NAME,
|
|
-- sum(SERIE.SERIE_LENGHT) as Total,
|
|
round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long,
|
|
round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate,
|
|
round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface,
|
|
sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot,
|
|
COUNT(*) AS Nbre_serie
|
|
FROM SERIE
|
|
JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID
|
|
""")
|
|
retour = cursor.fetchall()
|
|
return retour
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête 11 (sql_bilan_serie):\033[0m {e}")
|
|
error_count += 1
|
|
return None
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# #-- Bilan table série By Réseaux #
|
|
#####################################################################################################################################
|
|
def sql_bilan_reseaux():
|
|
global error_count
|
|
global _largeurCol
|
|
global _largeurColTete
|
|
retour = []
|
|
|
|
try:
|
|
|
|
|
|
###############################################################################################################
|
|
# Liste des réseaux
|
|
###############################################################################################################
|
|
|
|
cursor.execute(f"""
|
|
-- Bilan table série By Réseaux
|
|
select
|
|
--SURVEY_RESEAU.TITLE as Réseau,
|
|
--SURVEY_RESEAU.NAME,
|
|
--SURVEY_RESEAU.ID,
|
|
RESEAU_ID,
|
|
--STATION.NAME as Nom,
|
|
-- sum(SERIE.SERIE_LENGHT) as Total,
|
|
round(sum(SERIE.SERIE_LENGHT) - sum(SERIE.SERIE_LENGHT_SURFACE)- sum(SERIE.SERIE_LENGHT_DUPLICATE), 2) as Long,
|
|
round(sum(SERIE.SERIE_LENGHT_DUPLICATE),2) as Duplicate,
|
|
round(sum(SERIE.SERIE_LENGHT_SURFACE),2) as Surface,
|
|
sum(SERIE.SERIE_NBRE_SHOT) as Nbre_Shot
|
|
--COUNT(*) AS Nbre_serie
|
|
--round(max(STATION.Z),2) as Max_Z,
|
|
--round(min(STATION.Z),2) as Min_Z,
|
|
--max(STATION.Z) - min(STATION.Z) as Delta_Z
|
|
FROM SERIE
|
|
JOIN STATION ON SERIE.STATION_ENT_ID = STATION.ID
|
|
JOIN SURVEY AS SURVEY_JONCTION ON STATION.SURVEY_ID = SURVEY_JONCTION.ID
|
|
JOIN SURVEY AS SURVEY_RESEAU ON SURVEY_JONCTION.PARENT_ID = SURVEY_RESEAU.ID
|
|
WHERE RESEAU_ID is not NULL and RESEAU_ID !=0
|
|
GROUP BY SERIE.RESEAU_ID
|
|
ORDER BY Long DESC
|
|
""")
|
|
result = cursor.fetchall()
|
|
|
|
if len(result) >0 :
|
|
# _ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
# _ligne[0]= 'Aucun réseau'.ljust(_largeurColTete)
|
|
# retour.append(_ligne)
|
|
# return retour
|
|
|
|
for row in result:
|
|
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
|
|
result_shot = 0
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.RESEAU_ID ={row[0]}
|
|
""")
|
|
_result_length = cursor.fetchall()
|
|
result_length = float(_result_length[0][0])
|
|
result_shot = int(_result_length[0][1])
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.RESEAU_ID ={row[0]}
|
|
""")
|
|
|
|
_result_length_dpl = cursor.fetchall()
|
|
result_length_dpl = float(_result_length_dpl[0][0])
|
|
result_shot += int(_result_length_dpl[0][1])
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.RESEAU_ID ={row[0]}
|
|
""")
|
|
|
|
_result_length_srf = cursor.fetchall()
|
|
result_length_srf = float(_result_length_srf[0][0])
|
|
result_shot += int(_result_length_srf[0][1])
|
|
|
|
|
|
# ligne = [ 'none', 0, 0, 0, 0, 0, 0, 'none', 0, 'none', 0, 0]
|
|
cursor.execute(f"""
|
|
-- Liste des entrée dans la table RESEAU
|
|
SELECT
|
|
--RESEAU.ENT_1 AS ENT_ID,
|
|
--RESEAU.RESEAU_ID,
|
|
STATION.NAME
|
|
--STATION.Z
|
|
FROM RESEAU
|
|
JOIN STATION ON RESEAU.ENT_1 = STATION.ID
|
|
WHERE RESEAU_ID = {row[0]}
|
|
UNION --ALL
|
|
SELECT
|
|
--RESEAU.ENT_2 AS ENT_ID,
|
|
--RESEAU.RESEAU_ID,
|
|
STATION.NAME
|
|
--STATION.Z
|
|
FROM RESEAU
|
|
JOIN STATION ON RESEAU.ENT_2 = STATION.ID
|
|
WHERE RESEAU_ID = {row[0]}
|
|
GROUP BY STATION.NAME
|
|
ORDER BY STATION.NAME
|
|
--ORDER BY STATION.Z DESC
|
|
""")
|
|
liste_entree = cursor.fetchall()
|
|
|
|
|
|
_liste_ent = liste_entree[0][0]
|
|
|
|
index = 1
|
|
while index < len(liste_entree):
|
|
_liste_ent += ", " + liste_entree[index][0]
|
|
index += 1
|
|
|
|
if len(_liste_ent) > _largeurColTete :
|
|
_largeurColTete = len(_liste_ent) + 2
|
|
|
|
|
|
ligne[0] =_liste_ent.ljust(_largeurColTete) # Liste Entrées
|
|
ligne[1] = str(len(liste_entree)) # Nre Ent.
|
|
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
|
|
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
|
|
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
|
|
ligne[6] = str(result_shot) # Visées
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point bas d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Min
|
|
from STATION
|
|
join (
|
|
select Min(STATION.Z) as Val_Min
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.RESEAU_ID = {row[0]}
|
|
) min on STATION.Z = min.Val_Min
|
|
LIMIT 1
|
|
""")
|
|
altitude_min = cursor.fetchall()
|
|
|
|
ligne[7] = altitude_min[0][0]
|
|
ligne[8] = str("{:.2f}".format(altitude_min[0][1]))
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point haut d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Max
|
|
from STATION
|
|
join (
|
|
select Max(STATION.Z) as Val_Max
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.RESEAU_ID = {row[0]}
|
|
) max on STATION.Z = max.Val_Max
|
|
LIMIT 1
|
|
""")
|
|
altitude_max = cursor.fetchall()
|
|
|
|
ligne[9] = altitude_max[0][0]
|
|
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
|
|
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
|
|
|
|
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
|
|
# print(f"Reseau num {row[0]}, {len(liste_entree)} entrée(s): {_liste_ent} ")
|
|
|
|
|
|
###############################################################################################################
|
|
# Liste des visées non raccordées
|
|
###############################################################################################################
|
|
cursor.execute(f"""
|
|
SELECT
|
|
sum (SHOT.LENGTH) as Long
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
|
|
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
|
|
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG is NULL
|
|
""")
|
|
result_long = cursor.fetchall()
|
|
|
|
if result_long[0][0] is None :
|
|
_result_long = 0.0
|
|
else :
|
|
_result_long = result_long[0][0]
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
sum (SHOT.LENGTH) as Long
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
|
|
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
|
|
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='dpl'
|
|
""")
|
|
result_dpl = cursor.fetchall()
|
|
|
|
if result_dpl[0][0] is None :
|
|
_result_dpl = 0.0
|
|
else :
|
|
_result_dpl = result_dpl[0][0]
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
sum (SHOT.LENGTH) as Long
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
|
|
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
|
|
WHERE VISEE_FLAG.SERIE_ID is NULL and SHOT_FLAG.FLAG ='srf'
|
|
""")
|
|
result_srf = cursor.fetchall()
|
|
if result_srf[0][0] is None :
|
|
_result_srf = 0.0
|
|
else :
|
|
_result_srf = result_srf[0][0]
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
count (SHOT.LENGTH) as Long
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_FROM ON SHOT.FROM_ID = STATION_FROM.ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
JOIN JONCTION AS JONCTION_FROM ON SHOT.FROM_ID = JONCTION_FROM.ID
|
|
JOIN JONCTION AS JONCTION_TO ON SHOT.TO_ID = JONCTION_TO.ID
|
|
WHERE VISEE_FLAG.SERIE_ID is NULL --and SHOT_FLAG.FLAG ='srf'
|
|
""")
|
|
result_count = cursor.fetchall()
|
|
|
|
if result_count[0][0] is None :
|
|
_result_count = 0.0
|
|
else :
|
|
_result_count = result_count[0][0]
|
|
|
|
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
_liste_ent = "Visée(s) non raccordées"
|
|
_liste_ent = _liste_ent.ljust(_largeurColTete)
|
|
ligne[0] = _liste_ent
|
|
ligne[1] = str("0")
|
|
ligne[2] = str("{:.2f}".format(_result_long))
|
|
ligne[4] = str("{:.2f}".format(_result_dpl))
|
|
ligne[5] = str("{:.2f}".format(_result_srf))
|
|
ligne[6] = str(_result_count)
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point bas d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Min
|
|
from STATION
|
|
join (
|
|
select Min(STATION.Z) as Val_Min
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.SERIE_ID is null
|
|
) min on STATION.Z = min.Val_Min
|
|
LIMIT 1
|
|
""")
|
|
altitude_min = cursor.fetchall()
|
|
|
|
if len(altitude_min) == 0 :
|
|
_altitude_min = 0.0
|
|
_altitude_min_name = 'None'
|
|
else :
|
|
_altitude_min = altitude_min[0][1]
|
|
_altitude_min_name = str(altitude_min[0][0])
|
|
|
|
ligne[7] = str(_altitude_min_name)
|
|
ligne[8] = str("{:.2f}".format(_altitude_min))
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point haut d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Max
|
|
from STATION
|
|
join (
|
|
select Max(STATION.Z) as Val_Max
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.SERIE_ID is null
|
|
) max on STATION.Z = max.Val_Max
|
|
LIMIT 1
|
|
""")
|
|
altitude_max = cursor.fetchall()
|
|
|
|
if len(altitude_max) == 0 :
|
|
_altitude_max = 0.0
|
|
_altitude_max_name = 'None'
|
|
else :
|
|
_altitude_max = altitude_max[0][1]
|
|
_altitude_max_name = str(altitude_max[0][0])
|
|
|
|
ligne[7] = str(_altitude_max_name)
|
|
ligne[8] = str("{:.2f}".format(_altitude_max))
|
|
ligne[3] = "-" #"{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
|
|
|
|
|
|
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
|
|
if _result_long !=0 or _result_dpl != 0 or _result_srf !=0 or _result_count !=0:
|
|
retour.append(ligne)
|
|
|
|
###############################################################################################################
|
|
# Totaux
|
|
###############################################################################################################
|
|
|
|
result_shot = 0
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null
|
|
""")
|
|
_result_length = cursor.fetchall()
|
|
|
|
result_length = float(_result_length[0][0])
|
|
result_shot = int(_result_length[0][1])
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl'
|
|
""")
|
|
_result_length_dpl = cursor.fetchall()
|
|
|
|
result_length_dpl = float(_result_length_dpl[0][0])
|
|
result_shot += int(_result_length_dpl[0][1])
|
|
|
|
cursor.execute(f"""
|
|
SELECT
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
FROM VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf'
|
|
""")
|
|
_result_length_srf = cursor.fetchall()
|
|
|
|
result_length_srf = float(_result_length_srf[0][0])
|
|
result_shot += int(_result_length_srf[0][1])
|
|
|
|
cursor.execute(f"""
|
|
-- Bilan table VISEE_FLAG By entrées
|
|
select
|
|
ENTREE_ID
|
|
--STATION.NAME
|
|
FROM VISEE_FLAG
|
|
JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID
|
|
WHERE SERIE_ID >0
|
|
GROUP BY VISEE_FLAG.ENTREE_ID
|
|
""")
|
|
_result_entrees = cursor.fetchall()
|
|
|
|
_total_entrees_topo = len(_result_entrees)
|
|
|
|
result2 = sql_liste_entree()
|
|
_total_entrees_non_topo = len(result2) - _total_entrees_topo # type: ignore
|
|
|
|
|
|
if _result_length[0][1] != None :
|
|
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
|
|
ligne[0] = "Totaux (entrées et points fixes)".ljust(_largeurColTete) # Liste Entrées
|
|
ligne[1] = str("{:.0f}".format(len(result2))) # type: ignore # Nre Ent.
|
|
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
|
|
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
|
|
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
|
|
ligne[6] = str(result_shot) # Visées
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point bas d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Min
|
|
from STATION
|
|
join (
|
|
select Min(STATION.Z) as Val_Min
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
) min on STATION.Z = min.Val_Min
|
|
LIMIT 1
|
|
""")
|
|
altitude_min = cursor.fetchall()
|
|
|
|
ligne[7] = altitude_min[0][0]
|
|
ligne[8] = str(altitude_min[0][1])
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point haut d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Max
|
|
from STATION
|
|
join (
|
|
select Max(STATION.Z) as Val_Max
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
) max on STATION.Z = max.Val_Max
|
|
LIMIT 1
|
|
""")
|
|
altitude_max = cursor.fetchall()
|
|
|
|
ligne[9] = altitude_max[0][0]
|
|
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
|
|
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
|
|
|
|
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
else :
|
|
_total_entrees_non_topo = 0
|
|
_total_entrees_topo = 0
|
|
|
|
|
|
###############################################################################################################
|
|
# Liste des entrées uniques
|
|
###############################################################################################################
|
|
cursor.execute(f"""
|
|
-- Bilan table VISEE_FLAG By entrées
|
|
select
|
|
ENTREE_ID,
|
|
STATION.NAME
|
|
FROM VISEE_FLAG
|
|
JOIN STATION ON VISEE_FLAG.ENTREE_ID = STATION.ID
|
|
WHERE RESEAU_ID ==0 or RESEAU_ID is null and SERIE_ID >0
|
|
GROUP BY VISEE_FLAG.ENTREE_ID
|
|
""")
|
|
result = cursor.fetchall()
|
|
|
|
for row in result :
|
|
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
|
|
result_shot = 0
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG is null and VISEE_FLAG.ENTREE_ID ={row[0]}
|
|
""")
|
|
_result_length = cursor.fetchall()
|
|
|
|
result_length = float(_result_length[0][0])
|
|
result_shot = int(_result_length[0][1])
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='dpl' and VISEE_FLAG.ENTREE_ID ={row[0]}
|
|
""")
|
|
_result_length_dpl = cursor.fetchall()
|
|
|
|
result_length_dpl = float(_result_length_dpl[0][0])
|
|
result_shot += int(_result_length_dpl[0][1])
|
|
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE(round(sum(SHOT.length), 2), 0) as ttl,
|
|
count(VISEE_FLAG.ID) as count,
|
|
STATION_TO.NAME as To_Name
|
|
from VISEE_FLAG
|
|
JOIN SHOT ON SHOT.ID = VISEE_FLAG.SHOT_ID
|
|
JOIN STATION AS STATION_TO ON SHOT.TO_ID = STATION_TO.ID
|
|
LEFT JOIN SHOT_FLAG ON SHOT.ID = SHOT_FLAG.SHOT_ID
|
|
WHERE To_Name!='.' AND To_Name!='-' AND SHOT_FLAG.FLAG ='srf' and VISEE_FLAG.ENTREE_ID ={row[0]}
|
|
""")
|
|
_result_length_srf = cursor.fetchall()
|
|
|
|
result_length_srf = float(_result_length_srf[0][0])
|
|
result_shot += int(_result_length_srf[0][1])
|
|
|
|
if result_length_srf == 0.0 and result_length == 0.0 and result_length_dpl == 0.0 :
|
|
_total_entrees_non_topo+=1
|
|
else :
|
|
ligne[0] = str((row[1])).ljust(_largeurColTete)
|
|
ligne[1] = str("1")
|
|
ligne[2] = str("{:.2f}".format(result_length)) # Dev.
|
|
ligne[4] = str("{:.2f}".format(result_length_dpl)) # Dupl.
|
|
ligne[5] = str("{:.2f}".format(result_length_srf)) # Surf.
|
|
ligne[6] = str(result_shot) # Visées
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point bas d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Min
|
|
from STATION
|
|
join (
|
|
select Min(STATION.Z) as Val_Min
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.ENTREE_ID = {row[0]}
|
|
) min on STATION.Z = min.Val_Min
|
|
LIMIT 1
|
|
""")
|
|
altitude_min = cursor.fetchall()
|
|
|
|
ligne[7] = altitude_min[0][0]
|
|
ligne[8] = str("{:.2f}".format(altitude_min[0][1]))
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point haut d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Max
|
|
from STATION
|
|
join (
|
|
select Max(STATION.Z) as Val_Max
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.ENTREE_ID = {row[0]}
|
|
) max on STATION.Z = max.Val_Max
|
|
LIMIT 1
|
|
""")
|
|
altitude_max = cursor.fetchall()
|
|
|
|
ligne[9] = altitude_max[0][0]
|
|
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
|
|
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
|
|
|
|
|
|
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
|
|
###############################################################################################################
|
|
# Entrées sans topo
|
|
###############################################################################################################
|
|
|
|
ligne = [ ' - ', " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - ", " - "]
|
|
|
|
if _total_entrees_non_topo >=1 :
|
|
ligne[0] = "Entrée(s) sans topographie".ljust(_largeurColTete)
|
|
ligne[1] = str(_total_entrees_non_topo)
|
|
ligne[2] = "0.00"
|
|
ligne[3] = "0.00"
|
|
ligne[4] = "0.00"
|
|
ligne[5] = "0.00"
|
|
ligne[6] = "0"
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le points bas d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Min
|
|
from STATION
|
|
join (
|
|
select Min(STATION.Z) as Val_Min
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent'
|
|
) min on STATION.Z = min.Val_Min
|
|
LIMIT 1
|
|
""")
|
|
altitude_min = cursor.fetchall()
|
|
|
|
ligne[7] = altitude_min[0][0]
|
|
ligne[8] = str(altitude_min[0][1])
|
|
|
|
cursor.execute(f"""
|
|
-- Requête pour rechercher le point haut d'un réseau / entrée
|
|
SELECT
|
|
STATION.name,
|
|
STATION.Z as Max
|
|
from STATION
|
|
join (
|
|
select Max(STATION.Z) as Val_Max
|
|
from STATION
|
|
join JONCTION on STATION.ID = JONCTION.STATION_ID
|
|
WHERE JONCTION.SERIE_ENT = -1 AND JONCTION.STATION_TYPE = 'ent'
|
|
) max on STATION.Z = max.Val_Max
|
|
LIMIT 1
|
|
""")
|
|
altitude_max = cursor.fetchall()
|
|
|
|
ligne[9] = altitude_max[0][0]
|
|
ligne[10] = str("{:.2f}".format(altitude_max[0][1]))
|
|
ligne[3] = "{:.2f}".format(altitude_max[0][1] - altitude_min[0][1])
|
|
|
|
for i in range(9): ligne[i+1] = ligne[i+1].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
|
|
|
|
|
|
###############################################################################################################
|
|
# Tri et résultats
|
|
###############################################################################################################
|
|
|
|
entetes = [ 'Entrée(s)', "Nbre", "Dev.(m)", "Prof.(m)", "Dupl.(m)", "Surf.(m)", "Visées", "ID Sta.", "Alt. min(m)", "ID Sta.", "Alt. max(m)" ]
|
|
entetes[0] = entetes[0].ljust(_largeurColTete)
|
|
for i in range(9): entetes[i+1] = entetes[i+1].ljust(_largeurCol)
|
|
|
|
_corps_retour = sorted(retour, key=cle_tri, reverse=True)
|
|
|
|
_retour = [entetes] + _corps_retour
|
|
|
|
return _retour # type: ignore
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (sql_bilan_reseaux):\033[0m {e}")
|
|
error_count += 1
|
|
return retour
|
|
|
|
return
|
|
|
|
|
|
#####################################################################################################################################
|
|
# # Clé de tri #
|
|
#####################################################################################################################################
|
|
def cle_tri(element):
|
|
return float(element[2])
|
|
|
|
|
|
|
|
#####################################################################################################################################
|
|
# #-- Bilan topo par années #
|
|
#####################################################################################################################################
|
|
def sql_bilan_annee():
|
|
global error_count
|
|
global _largeurCol
|
|
global _largeurColTete
|
|
|
|
try:
|
|
retour = []
|
|
|
|
cursor.execute(f"""
|
|
select strftime('%Y', TOPO_DATE) as annee
|
|
from CENTRELINE
|
|
Where TOPO_DATE is not NULL
|
|
order by TOPO_DATE
|
|
""")
|
|
result = cursor.fetchall()
|
|
|
|
entetes = [ "Année" , "Dev.(m)", "Cumul (m)", "Dupl.(m)", "Cumul (m)", "Surf.(m)", "Cumul (m)" ]
|
|
cumul = 0.0
|
|
cumul_dpl = 0.0
|
|
cumul_srf = 0.0
|
|
|
|
for i in range(6): entetes[i] = entetes[i].ljust(_largeurCol)
|
|
retour.append(entetes)
|
|
|
|
# topo sans année
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE( sum(LENGTH), 0),
|
|
COALESCE (sum(DUPLICATE_LENGTH), 0),
|
|
COALESCE( sum(SURFACE_LENGTH), 0)
|
|
from CENTRELINE
|
|
where TOPO_DATE IS NULL
|
|
""")
|
|
bilan_annee = cursor.fetchall()
|
|
|
|
if ( len(bilan_annee) >= 1 ) and (( float(bilan_annee[0][0]) > 0.0) or (float(bilan_annee[0][1]) > 0.0 ) or (float(bilan_annee[0][2]) > 0.0 )):
|
|
ligne = [ " - ", " - ", " - ", " - ", " - ", " - ", " - " ]
|
|
|
|
|
|
cumul += bilan_annee[0][0]
|
|
cumul_dpl += bilan_annee[0][1]
|
|
cumul_srf += bilan_annee[0][2]
|
|
ligne[0] = "-"
|
|
ligne[1] = str("{:.2f}".format(bilan_annee[0][0]))
|
|
ligne[2] = str("{:.2f}".format(cumul))
|
|
ligne[3] = str("{:.2f}".format(bilan_annee[0][1]))
|
|
ligne[4] = str("{:.2f}".format(cumul_dpl))
|
|
ligne[5] = str("{:.2f}".format(bilan_annee[0][2]))
|
|
ligne[6] = str("{:.2f}".format(cumul_srf))
|
|
|
|
for i in range(6): ligne[i] = ligne[i].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
|
|
# années standard
|
|
debut = int(result[0][0])
|
|
fin = int(result[len(result)-1][0])
|
|
|
|
PlotExploYears(output_file_name_year, [debut, fin])
|
|
|
|
for annee in range(debut, fin + 1, 1 ):
|
|
cursor.execute(f"""
|
|
select
|
|
COALESCE (sum(LENGTH), 0),
|
|
COALESCE (sum(DUPLICATE_LENGTH), 0),
|
|
COALESCE (sum(SURFACE_LENGTH), 0)
|
|
from CENTRELINE
|
|
where TOPO_DATE between '{annee}-01-01' and '{annee}-12-31';
|
|
""")
|
|
bilan_annee = cursor.fetchall()
|
|
|
|
ligne = [ " - ", " - ", " - ", " - ", " - ", " - ", " - " ]
|
|
|
|
cumul += bilan_annee[0][0]
|
|
cumul_dpl += bilan_annee[0][1]
|
|
cumul_srf += bilan_annee[0][2]
|
|
ligne[0] = str("{:.0f}".format(annee))
|
|
ligne[1] = str("{:.2f}".format(bilan_annee[0][0]))
|
|
ligne[2] = str("{:.2f}".format(cumul))
|
|
ligne[3] = str("{:.2f}".format(bilan_annee[0][1]))
|
|
ligne[4] = str("{:.2f}".format(cumul_dpl))
|
|
ligne[5] = str("{:.2f}".format(bilan_annee[0][2]))
|
|
ligne[6] = str("{:.2f}".format(cumul_srf))
|
|
|
|
for i in range(6): ligne[i] = ligne[i].ljust(_largeurCol)
|
|
retour.append(ligne)
|
|
|
|
return retour # type: ignore
|
|
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"\033[91mErreur lors de l'exécution de la requête (sql_bilan_annee):\033[0m {e}")
|
|
error_count += 1
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"\033[91mErreur lors de l'exécution de sql_bilan_annee:\033[0m {e}")
|
|
error_count += 1
|
|
|
|
return None
|
|
|
|
#####################################################################################################################################
|
|
# diagramme de "rose" #
|
|
#####################################################################################################################################
|
|
def Rose(graph_name, bins = 72):
|
|
"""
|
|
Plot a Rose diagram of the entire database
|
|
|
|
Args:
|
|
conn (sqlite_db): database sqlite
|
|
graph_name (str): path and name of the graph to save
|
|
bins (int, optional): bins for the plot. Defaults to 72.
|
|
"""
|
|
|
|
# Extract the right data
|
|
df = pd.read_sql_query("select * from SHOT;", conn)
|
|
|
|
# Built the histogram
|
|
#h, e = np.histogram(df["BEARING"] * np.pi/180., weights = df["LENGTH"], bins = bins)
|
|
# Pour enlever les visées verticales (et donc de bearing systématiquement à 0°...)
|
|
h, e = np.histogram(df["BEARING"] * np.pi/180., weights = df["LENGTH"]* (90-np.abs(df["GRADIENT"]))/100, bins = bins)
|
|
|
|
# Plot the rose diagram
|
|
ax = plt.subplot(111, projection = "polar")
|
|
ax.set_theta_zero_location("N") # type: ignore
|
|
ax.set_theta_direction(-1) # type: ignore
|
|
ax.bar(e[:-1], h, align = "edge", width = e[1]-e[0])
|
|
|
|
# Save the rose diagram
|
|
plt.savefig(graph_name)
|
|
# Close the graph
|
|
plt.close(plt.figure(1))
|
|
|
|
return
|
|
|
|
|
|
#####################################################################################################################################
|
|
# diagramme de longueurs de visées #
|
|
#####################################################################################################################################
|
|
def Shot_lengths_histogram(graph_name, bins = 72, log = None):
|
|
"""
|
|
Plot the histogram of the lengths of the shots for the entire database
|
|
|
|
Args:
|
|
conn (sqlite_db): database sqlite
|
|
graph_name (str): path and name of the graph to save
|
|
bins (int, optional): bins for the plot. Defaults to 72.
|
|
log (str, optional): set it to 'log' to use a y-logscale. Defaults to None.
|
|
"""
|
|
|
|
# Extract the right data
|
|
df = pd.read_sql_query("select * from SHOT;", conn)
|
|
|
|
# plot the histogram
|
|
plt.hist(df["LENGTH"], bins = bins)
|
|
plt.xlabel("Longueur de visée (m)")
|
|
plt.ylabel("Nombre")
|
|
plt.xlim(0,50)
|
|
|
|
# If log y-scale, set it
|
|
if log:
|
|
plt.yscale("log")
|
|
|
|
# save
|
|
plt.savefig(graph_name)
|
|
plt.close(plt.figure(1))
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# diagrammes par années #
|
|
#####################################################################################################################################
|
|
def PlotExploYears(graph_name, rangeyear = [1959, datetime.now().year], systems = None):
|
|
"""
|
|
Args:
|
|
conn (sqlite_db): database sqlite
|
|
graph_name (str): path and name of the graph to save
|
|
rangeyear (np.array of integers, optional): 2 elements numpy array that gives the range of the years to analyse. Defaults to [1959, datetime.date.today().year].
|
|
systems (list of str, optional): list of specific systems to plot if needed. Defaults to None.
|
|
"""
|
|
|
|
# define colors to use; You may add colors if needed
|
|
colores = ['tab:blue', 'tab:red', 'tab:green', 'tab:orange', 'tab:purple',
|
|
'tab:marron', 'tab:olive', 'tab:pink', 'tab:cyan']
|
|
|
|
if systems:
|
|
# Initiate variables
|
|
#somme = pd.DataFrame(columns = ['System', 'Year', 'Longueur'])
|
|
Sy = []
|
|
Yr = []
|
|
Lg = []
|
|
# Loop on the systems and the years
|
|
for system in systems:
|
|
for date in range(rangeyear[0], rangeyear[1]+1):
|
|
# Define SQL query
|
|
lquery = "select sum(LENGTH) from CENTRELINE where SURVEY_ID in (select ID from SURVEY where FULL_NAME LIKE '%s%s%s') and TOPO_DATE between '%s-01-01' and '%s-12-31';" %(chr(37), str(system),chr(37), str(date), str(date))
|
|
junk = pd.read_sql_query(lquery, conn)
|
|
# Update the DataFrame line to line; DEPRECIATED since pandas 2.0
|
|
#somme = somme.append({'System' : system,
|
|
# 'Year' : int(date),
|
|
# 'Longueur' : junk.to_numpy()[0][0]}, ignore_index = True)
|
|
Sy.append(system)
|
|
Yr.append(int(date))
|
|
Lg.append(junk.to_numpy()[0][0])
|
|
#print(junk)
|
|
somme = pd.DataFrame(list(zip(Sy, Yr, Lg)), columns = ['System', 'Year', 'Longueur'])
|
|
print(max(somme['Longueur']))
|
|
|
|
# plot the histogram since the first survey
|
|
fig = plt.figure(1)
|
|
ax1 = fig.add_subplot(111)
|
|
|
|
fig2 = plt.figure(2)
|
|
ax2 = fig2.add_subplot(111)
|
|
|
|
# Extract the values for the first system
|
|
sommesys = somme[somme['System'] == systems[0]]
|
|
# Change None values to 0
|
|
sommeplot = sommesys.fillna(0)
|
|
# Remove the column with the names of the systems
|
|
del sommeplot["System"]
|
|
print(sommeplot)
|
|
|
|
ax1.bar(sommeplot["Year"],
|
|
sommeplot["Longueur"],
|
|
width = 0.5,
|
|
color = colores[0],
|
|
label = systems[0])
|
|
ax2.bar(sommeplot["Year"],
|
|
np.cumsum(sommeplot["Longueur"])/1000,
|
|
width = 0.5,
|
|
color = colores[0],
|
|
label = systems[0])
|
|
|
|
# Skip the loop on systems if there is only one system requested --> stacked barplot not needed
|
|
if len(systems) > 1:
|
|
# Check if the number of colors is enough for the number of systems
|
|
if len(systems)>len(colores):
|
|
raise NameError('\033[91mERROR:\033[00m Number of colors lower than the number of systems!\n\tedit the code to add colors in the list, or lower the number of systems to plot')
|
|
# Copy the length column in an other column to trace of it
|
|
sommeplot[systems[0]] = sommeplot["Longueur"]
|
|
|
|
for system in systems[1:]:
|
|
# Extract the length for the system
|
|
temp = somme[somme['System'] == system]
|
|
# Replace NaN values by 0 to avoid None values in the sums
|
|
tempplot = temp.fillna(0)
|
|
# Reset the indexes to permit the sum of the length per year
|
|
tempplot.reset_index(inplace = True)
|
|
|
|
del tempplot["System"]
|
|
# Update the barplot
|
|
ax1.bar(tempplot["Year"],
|
|
tempplot["Longueur"],
|
|
bottom = sommeplot["Longueur"],
|
|
width = 0.5,
|
|
color = colores[systems.index(system)],
|
|
label = system)
|
|
|
|
# Print the cumulative barplot
|
|
ax2.bar(tempplot["Year"],
|
|
np.cumsum(tempplot["Longueur"])/1000,
|
|
bottom = np.cumsum(sommeplot["Longueur"])/1000,
|
|
width = 0.5,
|
|
color = colores[systems.index(system)],
|
|
label = system)
|
|
|
|
# Do the sum of the length, and write it in the length column
|
|
sommeplot["Longueur"] = sommeplot["Longueur"] + tempplot["Longueur"]
|
|
# Copy the length of the system in a new column
|
|
sommeplot[systems[systems.index(system)]] = tempplot["Longueur"]
|
|
|
|
|
|
# Plot mean line
|
|
ax1.axhline(y = somme["Longueur"].mean(), color='red', linestyle='--', label = 'Moy. annuelle')
|
|
ax1.set_xlabel("Année")
|
|
ax1.set_ylabel("Longueur topographiée (m)")
|
|
ax1.legend(loc = 'best')
|
|
# Save the histogram
|
|
fig.savefig(graph_name + "_Reseau.pdf")
|
|
plt.close(plt.figure(1))
|
|
|
|
# plot the cumulative histogram since the first survey
|
|
ax2.set_xlabel("Année")
|
|
ax2.set_ylabel("Longueur topographiée cumulée (km)")
|
|
ax2.legend(loc = 'best')
|
|
# Save the cumulative histogram
|
|
fig2.savefig(graph_name + "Cum_Reseau.pdf")
|
|
plt.close(plt.figure(1))
|
|
|
|
else:
|
|
#somme = pd.DataFrame(columns = ['Year', 'Longueur'])
|
|
Yr = []
|
|
Lg = []
|
|
for date in range(rangeyear[0], rangeyear[1]):
|
|
lquery = "select sum(LENGTH) from CENTRELINE where TOPO_DATE between '%s-01-01' and '%s-12-31';" %(str(date), str(date))
|
|
junk = pd.read_sql_query(lquery, conn)
|
|
## Depreciated depuis Pandas 2.0
|
|
#somme = somme.append({'Year' : int(date),
|
|
# 'Longueur' : junk.to_numpy()[0][0]}, ignore_index = True)
|
|
Yr.append(int(date))
|
|
Lg.append(junk.to_numpy()[0][0])
|
|
|
|
somme = pd.DataFrame(list(zip(Yr, Lg)), columns = ['Year', 'Longueur'])
|
|
|
|
|
|
# plot the histogram since the first survey
|
|
plt.bar(somme["Year"], somme["Longueur"], width = 0.5)
|
|
# plot mean
|
|
plt.axhline(y = somme["Longueur"].mean(), color='red', linestyle='--', label = 'Moy. annuelle')
|
|
plt.xlabel("Année")
|
|
plt.ylabel("Longueur topographiée (m)")
|
|
# Save the histogram
|
|
plt.savefig(graph_name + ".pdf")
|
|
plt.close(plt.figure(1))
|
|
|
|
# plot the cumulative histogram since the first survey
|
|
plt.bar(somme["Year"], np.cumsum(somme["Longueur"].fillna(0))/1000, width = 0.5)
|
|
plt.xlabel("Année")
|
|
plt.ylabel("Longueur topographiée cumulée (km)")
|
|
# Save the cumulative histogram
|
|
plt.savefig(graph_name + "Cum.pdf")
|
|
plt.close(plt.figure(1))
|
|
|
|
return
|
|
|
|
#####################################################################################################################################
|
|
# Main #
|
|
# #
|
|
#####################################################################################################################################
|
|
|
|
if __name__ == '__main__':
|
|
_largeurColTete = 30
|
|
_largeurCol = 10
|
|
avt_compteur = 0
|
|
error_count=0
|
|
visee_suprimmees= [ 0.0, 0.0, 0.0, 0] # Lg, Lg dpl, Lg surf
|
|
input_file_name = ""
|
|
outputs_path = "./Outputs/"
|
|
inputs_path = "./Inputs/"
|
|
if not os.path.exists(outputs_path): os.makedirs(outputs_path)
|
|
|
|
maintenant = datetime.now()
|
|
|
|
if len(sys.argv) < 2:
|
|
# input_file = "databaseBB26.sql" # OK
|
|
# input_file = "cave.sql" # Erreur car pas de point fix ou d'entrée
|
|
# input_file = "padavka.sql" # Erreur car pas de point fix ou d'entrée
|
|
# input_file = "rabbit.sql" # OK
|
|
# input_file = "databaseCriou-2023.sql" # OK
|
|
#input_file = "databaseCriou-2024_09_18.sql" # OK
|
|
input_file = "databaseCriou-2023_12_26.sql" # OK
|
|
# input_file = "databaseKoytendag-2024.sql" #
|
|
# input_file = "databaseJB-2023.sql" # PB avec 4 visées en doubles, à voir si c'est une erreur de la centerline
|
|
# input_file = "CheddarCatchment.sql" # PB avec 4 visées en doubles, beaucoup d'entrées, à voir si c'est une erreur de la centerline
|
|
# input_file = "databaseMoilda-2021.sql" # A priori ça marche
|
|
# input_file = "databaseSornin-2023.sql" # A priori ça marche
|
|
input_file_name = inputs_path + input_file
|
|
if os.name == 'posix': os.system('clear') # Linux, MacOS
|
|
elif os.name == 'nt': os.system('cls')# Windows
|
|
else: print("\n" * 100)
|
|
else :
|
|
input_file_name = sys.argv[1]
|
|
# print("Le paramètre fourni est:", input_file_name)
|
|
|
|
if os.path.isfile(input_file_name) is False :
|
|
print(f"Erreur, fichier {input_file_name} inexistant")
|
|
print("Commande : python pythStat.py votre_fichier_therion.sql")
|
|
sys.exit()
|
|
|
|
|
|
output_file_name = outputs_path + input_file[:-4]+"_stats.csv"
|
|
output_file_name_rose = outputs_path + input_file[:-4]+"_rose.pdf"
|
|
output_file_name_histo = outputs_path + input_file[:-4]+"_histo.pdf"
|
|
output_file_name_year = outputs_path + input_file[:-4]+"_year"
|
|
imported_database = outputs_path + input_file[:-4]+"_stats.db"
|
|
|
|
titre = ['******************************************************************************************',
|
|
'* Calcul des statistiques par entrées d\'une BD Therion',
|
|
'* Script pythStat par alexandre.pont@yahoo.fr',
|
|
'* Version Février 2024',
|
|
'* Fichier source: ' + input_file_name,
|
|
'* Fichier destination: ' + output_file_name,
|
|
'* Date: ' + maintenant.strftime("%Y-%m-%d %H:%M:%S"),
|
|
'* ',
|
|
'******************************************************************************************']
|
|
|
|
|
|
for i in range(9): print(titre[i].ljust(90)+"*")
|
|
|
|
importation_sql_data(input_file_name)
|
|
|
|
conn = sqlite3.connect(imported_database) # Connexion à la base de données SQLite
|
|
cursor = conn.cursor()
|
|
|
|
construction_tables()
|
|
calcul_stats(output_file_name)
|
|
|
|
conn.close() |