mirror of
https://github.com/Alex38Lyon/Synthese-PSM_LARRA.git
synced 2026-06-01 13:59:13 +00:00
2486 lines
116 KiB
Python
2486 lines
116 KiB
Python
|
|
"""
|
|
!############################################################################################
|
|
# #
|
|
# Script pour convertir des données topographiques des formats #
|
|
# .th de Therion (brut, sans les dossiers) #
|
|
# .mak ou .dat de compass #
|
|
# .tro de visual topo #
|
|
# #
|
|
# au format th et th2 de Therion #
|
|
# by Alexandre PONT (alexandre_pont@yahoo.fr) #
|
|
# #
|
|
# Définir les différentes variables dans fichier config.ini #
|
|
# #
|
|
# Usage : python pyCreateTh.py #
|
|
# Commandes : pyCreateTh.py --help #
|
|
# #
|
|
!############################################################################################
|
|
|
|
Merci à :
|
|
- Tanguy Racine pour les scripts https://github.com/tr1813
|
|
- Xavier Robert pour les principes de base https://github.com/robertxa
|
|
- Xavier Robert pour les scripts de conversion .tro https://github.com/robertxa/pytherion
|
|
- Benoit Urruty https://github.com/BenoitURRUTY
|
|
|
|
Sources documentaires :
|
|
- Format des fichiers compass : https://fountainware.com/compass/Documents/FileFormats/FileFormats.htm
|
|
|
|
|
|
Création Alex le 2025 06 09
|
|
|
|
En cours :
|
|
- Exports Tro :
|
|
- pas possible de gérer les fichiers tro avec plusieurs entrées / points fixes car pas sauvegardé dans le format tro
|
|
- gérer pour ne pas avoir de copie de config.ini
|
|
- modifier les coordonnées de km vers m, ajouter les points fixes dans read-me
|
|
- Exports TroX
|
|
- A créer pour avoir notamment les réseaux à plusieurs entrées
|
|
- Exports DAT/MAK
|
|
- Attention les Flags '#|L#' posent problèmes (A voir convertisseur PdB vers compass...)
|
|
- tester avec les dernières option de la version de DAT (CORRECTION2 et suivants)
|
|
- améliorer fonction wall shot pour faire habillage des th2 files, les jointures...
|
|
- traiter les series avec 1 ou 2 stations
|
|
- PB des cartouches et des échelles pour faire des pdf automatiquement
|
|
|
|
"""
|
|
|
|
#################################################################################################
|
|
#################################################################################################
|
|
import os, re, argparse, shutil, sys, time, math
|
|
from os.path import isfile, join, abspath, splitext
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import networkx as nx
|
|
import pandas as pd
|
|
pd.set_option('future.no_silent_downcasting', True)
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
from copy import deepcopy
|
|
from alive_progress import alive_bar # https://github.com/rsalmei/alive-progress
|
|
from contextlib import redirect_stdout
|
|
|
|
from Lib.survey import SurveyLoader, NoSurveysFoundException
|
|
from Lib.therion import compile_template, compile_file, get_stats_from_log
|
|
from Lib.general_fonctions import setup_logger, Colors, safe_relpath, colored_help
|
|
from Lib.general_fonctions import load_config, select_file_tk_window, release_log_file, sanitize_filename, load_text_file_utf8
|
|
from Lib.general_fonctions import copy_template_if_not_exists, add_copyright_header, copy_file_with_copyright, update_template_files
|
|
import Lib.global_data as globalData
|
|
from Lib.pytro2th.tro2th import convert_tro #Version local modifiée
|
|
from Lib.trox2th import analyse_xml_balises
|
|
from Lib.th2th import create_th_folders
|
|
|
|
|
|
#################################################################################################
|
|
debug_log = False  # Debug mode for log messages
|
|
|
|
|
|
#################################################################################################
|
|
# Renommage des tableau pdFrame de station #
|
|
#################################################################################################
|
|
@pd.api.extensions.register_series_accessor("stationName")
class StationNameAccessor:
    """Series accessor ``.stationName()`` that rewrites station names.

    Calling ``series.stationName()`` returns a new Series in which each of
    the characters listed in ``_SUBSTITUTIONS`` has been replaced, literally
    (no regex), by its token. The substitutions run in the listed order.
    """

    # Ordered (character, replacement token) pairs.
    # NOTE(review): the last rule rewrites every lowercase 'p' into '_p_',
    # which looks unusual next to the punctuation rules - confirm the
    # intended source character.
    _SUBSTITUTIONS = (
        ('[', '_d_'),
        (']', '_f_'),
        ('@', '_a_'),
        (' ', '_e_'),
        ('p', '_p_'),
    )

    def __init__(self, pandas_obj):
        # Series the accessor is attached to.
        self._obj = pandas_obj

    def __call__(self):
        """Return the Series with every substitution applied in order."""
        renamed = self._obj
        for char, token in self._SUBSTITUTIONS:
            renamed = renamed.str.replace(char, token, regex=False)
        return renamed
|
|
|
|
|
|
#################################################################################################
|
|
def parse_therion_centerline(file_data):
    """Split Therion text into its centerline blocks.

    Both spellings accepted by Therion are recognised: ``centerline`` /
    ``endcenterline`` and ``centreline`` / ``endcentreline``.

    Args:
        file_data (str): Full text of a Therion file.

    Returns:
        list: One dict per closed centerline block, with the keys
            - "lines": every line of the block, opening keyword line included
            - "DATA":  lines whose first token is not a Therion keyword and
                       that hold at least two tokens (the shot lines)
            - "date":  text following the last ``date`` keyword, or None
            - "type":  sorted list of station names seen in the first two
                       tokens of the shot lines
        A block that is never closed is not returned. On any exception the
        error is logged, ``globalData.error_count`` is incremented and the
        blocks collected so far are returned.
    """
    centerline_list = []
    opening_keywords = ("centerline", "centreline")
    closing_keywords = ("endcenterline", "endcentreline")

    try:
        current_block = []
        current_data = []
        current_date = None
        current_stations = set()
        in_centerline = False

        for line in file_data.splitlines():
            stripped = line.strip()
            low = stripped.lower()

            # Start of a centerline: reset all per-block accumulators
            if low.startswith(opening_keywords):
                in_centerline = True
                current_block = [line]
                current_data = []
                current_date = None
                current_stations = set()
                continue

            if not in_centerline:
                continue

            current_block.append(line)

            # Blank line or comment: kept in the block, nothing to extract
            if not stripped or stripped.startswith("#"):
                continue

            # Survey date
            m = re.match(r"^[ \t]*date\s+(.+)$", line, re.IGNORECASE)
            if m:
                current_date = m.group(1).strip()
                continue

            parts = stripped.split()

            # Shot line: first token is not a Therion keyword
            if len(parts) >= 2 and parts[0].lower() not in globalData.THERION_KEYWORDS:
                current_data.append(line)

                # The first two tokens are station names unless they are
                # keywords or purely numeric
                for p in parts[:2]:
                    if (
                        p.lower() not in globalData.THERION_KEYWORDS
                        and not re.match(r"^[0-9.+-]+$", p)
                    ):
                        current_stations.add(p)

            # End of the centerline: store the block and reset
            if low.startswith(closing_keywords):
                centerline_list.append({
                    "lines": current_block,
                    "DATA": current_data,
                    "date": current_date,
                    "type": sorted(current_stations)
                })

                in_centerline = False
                current_block = []
                current_data = []
                current_date = None
                current_stations = set()

    except Exception as e:
        log.error(f"An error occurred (parse_therion_centerline): {Colors.ENDC}{e}")
        globalData.error_count += 1

    return centerline_list
|
|
|
|
|
|
#################################################################################################
|
|
def regroupe_date(centerline_list):
    """Merge centerline dicts that share the same date.

    Args:
        centerline_list (list): dicts holding
            - lines (list)
            - DATA (list)
            - date (str|None)
            - type (list or set)
            Entries that are not dicts are skipped with a warning.

    Returns:
        list: one dict per distinct date, in order of first appearance, with
        "lines" and "DATA" concatenated and "type" as a sorted list of the
        merged station names. An empty list is returned if an exception
        occurs (the error is logged and counted).
    """
    by_date = {}

    try:
        for position, centerline in enumerate(centerline_list):

            # Guard: only dict entries can be merged
            if not isinstance(centerline, dict):
                log.warning(f"regroupe_date: entrée ignorée (index {position}, type invalide)")
                continue

            day = centerline.get("date")
            bucket = by_date.setdefault(
                day,
                {"date": day, "lines": [], "DATA": [], "type": set()},
            )

            # Fields of an unexpected type are ignored instead of raising
            for key in ("lines", "DATA"):
                if isinstance(centerline.get(key), list):
                    bucket[key].extend(centerline[key])

            if isinstance(centerline.get("type"), (list, set)):
                bucket["type"].update(centerline["type"])

        # Finalisation: station sets become sorted lists
        merged = []
        for bucket in by_date.values():
            bucket["type"] = sorted(bucket["type"])
            merged.append(bucket)

        return merged

    except Exception as e:
        log.error(f"An error occurred (regroupe_date): {Colors.ENDC}{e}")
        globalData.error_count += 1
        return []
|
|
|
|
|
|
#################################################################################################
|
|
# lecture d'un fichier .mak #
|
|
#################################################################################################
|
|
def mak_to_th_file(ENTRY_FILE) :
    """Convert a Compass .mak project file into a Therion project folder.

    The .mak file is scanned for the referenced .dat files, the fixed
    stations, the UTM zone and the datum. Each .dat file is then converted
    through dat_to_th_files, stations shared between surveys are linked with
    'equate' lines, and the project templates (.thconfig, -tot.th, -maps.th,
    -readme.md) are filled in. When globalData.finalTherionExe is set, the
    whole project is compiled and its length/depth are read from therion.log.

    Besides ENTRY_FILE, the function reads the module-level ``args``
    (``args.file``, ``args.proj``, ``args.scale``), ``globalData`` and ``log``.

    Args:
        ENTRY_FILE (str): Path to the input .mak file.

    Returns:
        tuple: (SurveyTitleMak, threads) - the sanitized project name and the
        list of threads collected during the conversion.

    The process is terminated through exit(0) when the .mak content cannot be
    handled (no or several datums, no .dat file, no fixed point, unknown
    datum, UTM zone outside 1-60).
    """

    # Threads collected from the per-file conversions and the final compilation
    threads = []

    _ConfigPath = "./../../"
    shortCurentFile = safe_relpath(ENTRY_FILE)

    totReadMeList = ""
    totReadMeError = ""
    totReadMeFixPoint = ""

    datFiles = []
    patternDat = re.compile(r'^#.*?\.dat[,;]$', re.IGNORECASE)   # Case-insensitive pattern: "#<name>.dat," or "#<name>.dat;"

    fixPoints = []
    # "<name>[m|f, x, y, z]" with an optional trailing ',' or ';' and an optional "/..." remainder
    patternFixPoints = re.compile(r'^([\w-]+)\[(m|f)\s*[, ]\s*(-?\d+\.?\d*)\s*[, ]\s*(-?\d+\.?\d*)\s*[, ]\s*(-?\d+\.?\d*)\]\s*[,;]?\s*(?:/.*)?$',re.IGNORECASE)

    UTM = []

    Datums = set()   # Distinct datum values found in the file

    try:
        # NOTE(review): no explicit encoding, the platform default is used - confirm
        with open(ENTRY_FILE, 'r') as file:
            for line in file:
                line = line.strip()   # Drop surrounding whitespace and the newline
                if patternDat.match(line):
                    # Remove the leading '#' and the trailing '.dat,' / '.dat;' (case-insensitive)
                    cleaned_entry = re.sub(r'^#|\.dat[,;]$', '', line, flags=re.IGNORECASE)
                    datFiles.append(cleaned_entry + ".DAT")

                match = patternFixPoints.match(line)

                if match:
                    name_point, mf, x, y, z = match.groups()
                    fixPoints.append([name_point, mf.lower(), float(x), float(y), float(z)])

                if line.startswith('@') and line.endswith(';'):
                    parts = line[1:-1].split(',')   # Drop "@" and ";", then split
                    if len(parts) >= 4:
                        # The 4th field is kept as the UTM zone (int when made of digits only)
                        UTM.append(int(parts[3]) if parts[3].isdigit() else parts[3])

                if line.startswith('&') and line.endswith(';'):
                    # Value located between '&' and ';'
                    Datum = line[1:-1].strip()
                    Datums.add(Datum)

    except FileNotFoundError:
        log.error(f"The mak file {Colors.ENDC}{ENTRY_FILE}{Colors.ERROR} dit not exist")
        globalData.error_count += 1

    except Exception as e:
        log.error(f"An error occurred (readMakFile): {Colors.ENDC}{e}")
        globalData.error_count += 1

    # Validation of the values read from the .mak file
    # NOTE(review): these fatal paths terminate with exit status 0 - confirm
    # that callers do not rely on a non-zero status to detect failure.
    if len(Datums) > 1:
        log.critical(f"Several different Datums found in {Colors.ENDC}{shortCurentFile}{Colors.CRITICAL}, case not handled! : {Colors.ENDC}{Datums}")
        exit(0)
    elif not Datums :
        log.critical(f"no datum found in mak file : {Colors.ENDC}{shortCurentFile}")
        exit(0)
    elif not datFiles :
        log.critical(f"No dat file found in mak file : {Colors.ENDC}{shortCurentFile}")
        exit(0)
    elif not fixPoints :
        log.critical(f"No fix points found in mak file : {Colors.ENDC}{shortCurentFile}")
        exit(0)

    datum_lower = next(iter(Datums)).strip().lower().replace(" ","")

    if datum_lower not in globalData.datumToEPSG:
        log.critical(f"Unknown Datum : {datum_lower}")
        exit(0)

    # UTM zone number and hemisphere (N/S): a negative zone means south
    # NOTE(review): UTM is empty when no '@...;' line with at least 4 fields
    # was found, in which case UTM[0] raises IndexError - confirm.
    if int(UTM[0]) >= 0 :
        zone_num = int(UTM[0])
        hemisphere = "N"
    else :
        zone_num = -int(UTM[0])
        hemisphere = "S"

    # The UTM zone must lie in 1-60
    if not 1 <= zone_num <= 60:
        log.critical("The UTM zone must be between 1 and 60")
        exit(0)

    # EPSG code: datum prefix followed by the zone number (zone + 100 for the south)
    # NOTE(review): zone_num is not zero-padded and the southern code is built
    # by adding 100 to the zone - verify against the prefixes stored in
    # globalData.datumToEPSG.
    epsg_prefix = globalData.datumToEPSG[datum_lower]
    epsg_code = f"{epsg_prefix}{zone_num}" if hemisphere == "N" else f"{epsg_prefix}{zone_num + 100}"

    # Coordinate system identifier written to the Therion files
    crs_wkt = f'EPSG:{epsg_code}'

    log.info(f"Reading mak file: {Colors.ENDC}{shortCurentFile}{Colors.GREEN}, fixed station: {Colors.ENDC}{len(fixPoints)}{Colors.GREEN}, files : {Colors.ENDC}{len(datFiles)}{Colors.GREEN}, UTM Zone : {Colors.ENDC}{UTM[0]}{Colors.GREEN}, Datum : {Colors.ENDC}{next(iter(Datums))}{Colors.GREEN}, SCR : {Colors.ENDC}{crs_wkt}")
    totReadMeFixPoint = f"\t* Source mak file : {os.path.basename(ENTRY_FILE)}, fixed station: {len(fixPoints)}, files : {len(datFiles)}, UTM Zone : {UTM[0]}, Datum : {next(iter(Datums))}, SCR : {crs_wkt}\n"

    # First pass over the .dat files: count the form-feed separated sections
    # to size the progress bar
    QtySections = 0

    for file in datFiles :
        # NOTE(review): paths are built with a hard-coded backslash (Windows only)
        ABS_file = os.path.dirname(abspath(args.file)) + "\\"+ file
        content, val, encodage = load_text_file_utf8(ABS_file, os.path.basename(ABS_file))
        section = content.split('\x0c')
        QtySections += len(section)

    # Project name: file name of args.file without its last 4 characters
    SurveyTitleMak = sanitize_filename(os.path.basename(abspath(args.file))[:-4])

    folderDest = os.path.dirname(abspath(args.file)) + "/" + SurveyTitleMak

    copy_template_if_not_exists(globalData.templatePath,folderDest)

    ##############################################################################################
    # Loop over the .dat files                                                                   #
    ##############################################################################################

    stationList = pd.DataFrame(columns=['StationName', 'Survey_Name_01', 'Survey_Name_02'])
    totdata = f"\t## Input list:\n"
    totMapsPlan = ""
    totMapsExtended = ""

    # Comment prefixes ("# " disables a line) for the maps / plan / extended
    # template lines, selected by the requested projection
    proj = args.proj.lower()
    values = {
        "none": ("# ", "# ", "# "),
        "plan": ("", "", "# "),
        "extended": ("", "# ", ""),
    }

    maps, plan, extended = values.get(proj, ("", "", ""))

    # alive_progress bar sized with the number of sections counted above
    with alive_bar(QtySections,
                   title=f"{Colors.GREEN}Surveys progress: {Colors.BLUE}",
                   length = 20,
                   enrich_print=False,
                   stats=True,
                   elapsed=True,
                   monitor=True,
                   bar="smooth"
                   ) as bar:

        with redirect_stdout(sys.__stdout__):
            for file in datFiles:

                if globalData.error_count > 0:
                    bar.text(f"{Colors.INFO}file: {Colors.ENDC}{file[:-4]}{Colors.ERROR}, error: {Colors.ENDC}{globalData.error_count}")
                else :
                    bar.text(f"{Colors.INFO}file: {Colors.ENDC}{file[:-4]}")

                # Work on a copy of the .dat file placed in <project>/Data
                _file = os.path.dirname(abspath(args.file)) + "\\" + file
                shutil.copy(_file, folderDest + "\\Data\\")
                ABS_file = folderDest + "\\Data\\" + file

                totReadMeError += f"\t* file: {file}\n"
                totReadMeList += f"\tfile: {file}\n"

                Station, SurveyTitle, totReadMeError, thread2 = dat_to_th_files(ABS_file, fixPoints, crs_wkt, _ConfigPath, totReadMeError, bar)

                threads += thread2

                totdata += f"\tinput Data/{SurveyTitle}/{SurveyTitle}-tot.th\n"
                totMapsPlan += f"\t{plan}MP-{SurveyTitle}-Plan-tot@{SurveyTitle}\n\t{plan}break\n"
                totMapsExtended += f"\t{extended}MC-{SurveyTitle}-Extended-tot@{SurveyTitle}\n\t{extended}break\n"

                # Accumulate the stations returned for this file
                if not Station.empty:
                    __stationList = pd.concat([stationList, Station], ignore_index=True)
                    __stationList.sort_values(by='Survey_Name_02', inplace=True, ignore_index=True)
                    stationList = __stationList.copy()

                # Move the working copy to <project>/Sources, replacing any previous one
                destination = os.path.join(folderDest, "Sources", os.path.basename(ABS_file))
                if os.path.exists(destination):
                    os.remove(destination)

                shutil.move(ABS_file, destination)

                bar()

    #################################################################################################
    # Equates handling
    #################################################################################################

    totdata +=f"\n"

    _stationList = stationList.copy()

    # Survey path used on the right of '@': <Survey_Name_01>.<Survey_Name_01>.<Survey_Name_02>
    _stationList["Survey_Name_01"] = _stationList["Survey_Name_01"] + "."+ _stationList["Survey_Name_01"]+ "." + _stationList["Survey_Name_02"]
    # Number the occurrences of each StationName (1, 2, ...)
    _stationList['Survey_Number'] = _stationList.groupby('StationName').cumcount() + 1

    # Pivot so that each occurrence becomes its own column
    tableau_pivot = _stationList.pivot(index='StationName', columns='Survey_Number', values='Survey_Name_01')

    tableau_pivot.columns = [f'Survey_Name_{i}' for i in tableau_pivot.columns]

    totdata +=f"\n\t## Equates list:\n"

    if 'Survey_Name_2' in tableau_pivot.columns:
        # Turn the StationName index back into a regular column
        tableau_pivot = tableau_pivot.reset_index()
        # Keep only the stations that appear at least twice
        tableau_equate = tableau_pivot[tableau_pivot['Survey_Name_2'].notna()]

        log.info(f"Total des '{Colors.ENDC}equates{Colors.INFO}' in mak file: {Colors.ENDC}{len(tableau_equate)}{Colors.INFO} in {Colors.ENDC}{safe_relpath(args.file)}")

        # For each shared station
        for _, row in tableau_equate.iterrows():
            station = row['StationName']

            # Every non-empty Survey_Name value (NaN excluded)
            surveys = [row[col] for col in tableau_equate.columns if col.startswith('Survey_Name') and pd.notna(row[col])]

            # One 'equate' line per unique pair (i < j) whose third dotted
            # component differs
            for i in range(len(surveys)):
                for j in range(i + 1, len(surveys)):
                    if surveys[i].split('.')[2] != surveys[j].split('.')[2]:
                        totdata +=f"\tequate {station}@{surveys[i]} {station}@{surveys[j]}\n"
    else:
        log.info(f"No 'equats' found in {Colors.ENDC}{args.file}")

    totdata +=f"\n\t## Maps list:\n\t{maps}input {SurveyTitleMak}-maps.th\n"

    # Values substituted into the template files
    config_vars = {
        'fileName': SurveyTitleMak,
        'caveName': SurveyTitleMak.replace("_", " "),
        'Author': globalData.Author,
        'Copyright': globalData.Copyright,
        'Scale' : args.scale,
        'Target' : "TARGET",
        'mapComment' : globalData.mapComment,
        'club' : globalData.club,
        'thanksto' : globalData.thanksto,
        'datat' : globalData.datat,
        'wpage' : globalData.wpage,
        'cs' : crs_wkt,
        'configPath' : " ",
        'totData' : totdata,
        'maps' : maps,
        'plan': plan,
        'extended': extended,
        'XVIscale':globalData.XVIScale,
        'other_scraps_plan' : totMapsPlan,
        'other_scraps_extended' : totMapsExtended,
        'readMeList' : totReadMeList,
        'errorList' : totReadMeError,
        'fixPointList' : totReadMeFixPoint,
        'file_info' : f"# File generated by pyCreateTh.py version: {globalData.Version} date: {datetime.now().strftime("%Y.%m.%d-%H:%M:%S")}",
    }

    # NOTE(review): DEST_PATH is built from args.file as given, whereas
    # folderDest above uses abspath(args.file) - both presumably designate
    # the same folder; confirm.
    DEST_PATH = os.path.dirname(args.file) + '/' + SurveyTitleMak

    update_template_files(DEST_PATH + '/template.thconfig', config_vars, DEST_PATH + '/' + SurveyTitleMak + '.thconfig')
    update_template_files(DEST_PATH + '/template-tot.th', config_vars, DEST_PATH + '/' + SurveyTitleMak + '-tot.th')
    update_template_files(DEST_PATH + '/template-maps.th', config_vars, DEST_PATH + '/' + SurveyTitleMak + '-maps.th')

    #################################################################################################
    # Final therion compilation                                                                     #
    #################################################################################################

    if globalData.finalTherionExe == True:
        FILE = DEST_PATH + '/' + SurveyTitleMak + '.thconfig'
        t = compile_file(FILE, therion_path=globalData.therionPath)
        threads.append(t)

        for thread in threads:   # Wait for every thread to finish
            thread.join()

        logfile = (DEST_PATH + '/therion.log').replace("\\", "/")

        with open(logfile, 'r') as f:
            content = f.read()

        stat = get_stats_from_log(content)

        # A zero length or depth is treated as a failed compilation
        if stat["length"] != 0.0 and stat["depth"] != 0.0 :
            totReadMeList += f"\tFinal compilation successful length: {stat["length"]} m, depth: {stat["depth"]} m\n"
            log.info(f"Final compilation successful length: {Colors.ENDC}{stat["length"]}{Colors.INFO} m, depth: {Colors.ENDC}{stat["depth"]}{Colors.INFO} m")
        else :
            totReadMeList += f"\tFinal compilation error, check log file\n"
            log.error(f"Final compilation error, check log file")

    config_vars['readMeList'] = totReadMeList

    # NOTE(review): the readme is named after SurveyTitle (title of the last
    # .dat file converted in the loop) while every other output uses
    # SurveyTitleMak - confirm this is intended.
    update_template_files(DEST_PATH + '/template-readme.md', config_vars, DEST_PATH +'/' + SurveyTitle + '-readme.md')

    return SurveyTitleMak, threads
|
|
|
|
|
|
#################################################################################################
|
|
def station_list_dat(data, list, list_fixed,fixPoints, currentSurveyName) :
    """Append the stations of one .dat survey to the running station tables.

    Args:
        data (dict): Survey data; data['DATA'] is an iterable of text lines,
            each split on whitespace to build the shot table.
        list (DataFrame): Running table of non-fixed stations.
        list_fixed (DataFrame): Running table of fixed stations.
        fixPoints (list): Fixed points; the first item of each is its name.
        currentSurveyName (str): Name of the survey being processed.

    Returns:
        tuple: (list_fixed, list, dfDATA) - both updated tables followed by
        the DataFrame built from the split lines.
    """

    # One row per line, one column per whitespace-separated token
    dfDATA = pd.DataFrame([line.split() for line in data['DATA']])

    # Distinct names from the first two columns; the first row is skipped.
    # Names are rewritten through the .stationName() accessor.
    shot_ends = pd.concat([dfDATA.iloc[1:, 0], dfDATA.iloc[1:, 1]])
    all_stations = shot_ends.drop_duplicates().stationName()

    # Split the names between fixed points and ordinary stations
    fixed_names = {point[0] for point in fixPoints}
    is_fixed = all_stations.isin(fixed_names)

    new_entries = pd.DataFrame({
        'StationName': all_stations[~is_fixed],
        'Survey_Name_01': currentSurveyName
    })

    new_fixed = pd.DataFrame({
        'Station_Fix': all_stations[is_fixed],
        'Survey_Name': currentSurveyName
    })

    list = pd.concat([list, new_entries], ignore_index=True)
    list_fixed = pd.concat([list_fixed, new_fixed], ignore_index=True)

    return list_fixed, list, dfDATA
|
|
|
|
|
|
#################################################################################################
|
|
def station_list_th(data, list, list_fixed, fixPoints, currentSurveyName) :
    """Append the stations of one Therion centerline to the running station tables.

    Args:
        data (dict): Survey data; data['DATA'] is an iterable of text lines,
            each split on whitespace to build the shot table.
        list (DataFrame): Running table of non-fixed stations.
        list_fixed (DataFrame): Running table of fixed stations.
        fixPoints (list): Fixed points; the first item of each is its name.
        currentSurveyName (str): Name of the survey being processed.

    Returns:
        tuple: (list_fixed, list, dfDATA) - both updated tables followed by
        the DataFrame built from the split lines.
    """

    # One row per line, one column per whitespace-separated token
    dfDATA = pd.DataFrame([line.split() for line in data['DATA']])

    # Distinct names from the first two columns of every row; missing cells
    # and the "-" / "*" placeholders are not stations
    shot_ends = pd.concat([dfDATA.iloc[:, 0], dfDATA.iloc[:, 1]]).dropna().astype(str)
    named = shot_ends.loc[lambda s: ~s.isin(["-", "*"])]
    all_stations = named.drop_duplicates().reset_index(drop=True)

    # Split the names between fixed points and ordinary stations
    fixed_names = {point[0] for point in fixPoints}
    is_fixed = all_stations.isin(fixed_names)

    new_entries = pd.DataFrame({
        'StationName': all_stations[~is_fixed],
        'Survey_Name_01': currentSurveyName
    })

    new_fixed = pd.DataFrame({
        'Station_Fix': all_stations[is_fixed],
        'Survey_Name': currentSurveyName
    })

    list = pd.concat([list, new_entries], ignore_index=True)
    list_fixed = pd.concat([list_fixed, new_fixed], ignore_index=True)

    return list_fixed, list, dfDATA
|
|
|
|
|
|
#################################################################################################
|
|
# Ordered Compass shot-flag rules: (markers, opening label, notice kind).
# The first rule having one of its markers contained in the comment cell
# wins, so the order below is significant. The notice kind selects the
# message logged for the shot (None: nothing is logged).
_DAT_FLAG_RULES = (
    (("#|L#",), "flags duplicate", "generic"),
    (("#|S#",), "flags splay", None),
    (("#|X#", "#|XL#"), "flags duplicate", "generic"),
    (("#|P#",), "# flags exclude from plot no implemented", "plot"),
    (("#|C#",), "# flags exclude from closure no implemented", "closure"),
    (("#|PL#", "#|LP#"), "flags splay", None),
    (("#|LC#", "#|CL#"), "flags duplicate", "generic"),
    (("#|PLC#",), "flags duplicate", None),
    (("#|",), "# flags unknown no implemented", "unknown"),
)

# Opening label -> closing label. A closing row immediately followed by its
# own opening row is redundant and both rows are dropped from the output.
_DAT_FLAG_CLOSERS = {
    "flags surface": "flags not surface",
    "flags splay": "flags not splay",
    "flags duplicate": "flags not duplicate",
    "# flags exclude from closure no implemented": "# flags not exclude from closure no implemented",
    "# flags exclude from plot no implemented": "# flags not exclude from plot no implemented",
    "# flags unknown no implemented": "# flags not unknown no implemented",
}


def _format_lrud(value):
    """Format one passage dimension: '' if missing, '-' if negative, else 2 decimals."""
    if pd.isna(value):
        return ""
    return "-" if value < 0 else f"{value:.2f}"


def formated_station_list(df, dataFormat, unit = "meter", shortCurentFile ="None") :
    """Format a table of shots as tab-indented Therion data lines.

    Args:
        df (DataFrame): Shot table. The first row is a header row that is
            emitted unchanged; the remaining rows are shots. Column 9 holds
            the comment / flags cell, so at least 10 columns are required.
        dataFormat (str): Therion data declaration (for example
            "data normal from to length ..."). A field at position p in this
            string is read from column p - 2 of the table.
        unit (str): Lengths are multiplied by 0.3048 when this is exactly
            "length meter", and left unchanged otherwise.
            NOTE(review): presumably the source lengths are in feet - confirm.
        shortCurentFile (str): File name used in log messages.

    Returns:
        str: The formatted lines joined with newlines. Shots carrying a
        Compass flag marker ("#|...#") are wrapped between an opening and a
        closing "flags" line.
    """

    # Missing cells become single spaces (skipped when the lines are written)
    df = df.fillna(" ")

    # The first row (headers) is kept apart and written unchanged
    header_row = df.iloc[0]
    df_data = df.iloc[1:].copy()

    columns = dataFormat.split()

    Koef = 0.3048 if unit == "length meter" else 1.0

    if "length" in columns:
        col_name = df_data.columns[columns.index("length") - 2]
        df_data.iloc[:, col_name] = (df_data.iloc[:, col_name].astype(float) * Koef).apply(lambda x: f"{x:.2f}")

    # Passage dimensions: non-numeric cells become empty, negative ones "-"
    for field in ("up", "down", "right", "left"):
        if field in columns:
            col_name = df_data.columns[columns.index(field) - 2]
            df_data[col_name] = pd.to_numeric(df_data[col_name], errors='coerce') * Koef
            df_data[col_name] = df_data[col_name].apply(_format_lrud)

    # Angles are written with one decimal
    for field in ("compass", "clino"):
        if field in columns:
            position = columns.index(field) - 2
            df_data.iloc[:, position] = (df_data.iloc[:, position].astype(float)).apply(lambda x: f"{x:.1f}")

    # Station names are rewritten through the .stationName() accessor
    for field in ("from", "to"):
        if field in columns:
            position = columns.index(field) - 2
            df_data.iloc[:, position] = (df_data.iloc[:, position].astype(str).stationName())

    # NaN introduced by the conversions above become spaces again
    df_data = df_data.fillna(" ")

    # Prefix a non-empty comment cell (column 9) with '# '
    df_data.iloc[:, 9] = df_data.iloc[:, 9].apply(lambda x: f"# {x}" if str(x).strip() and str(x) != " " else x)

    # A shot whose two stations are identical gets "_hab" appended to the second one
    df_data.iloc[:, 1] = df_data.apply(
        lambda row: f"{row.iloc[1]}_hab" if str(row.iloc[0]).strip() == str(row.iloc[1]).strip() else row.iloc[1],
        axis=1
    )

    # Flag handling: wrap every flagged shot between an opening and a closing row
    # NOTE(review): splay-flagged shots keep their second station name -
    # confirm whether it should be replaced by "-".
    new_rows = []

    for idx, row in df_data.iterrows():
        col10 = str(row.iloc[9])

        rule = next(
            (candidate for candidate in _DAT_FLAG_RULES
             if any(marker in col10 for marker in candidate[0])),
            None,
        )

        if rule is None:
            new_rows.append(row.tolist())
            continue

        _, opening, notice = rule

        opening_row = [" "] * len(row)
        opening_row[0] = opening
        closing_row = [" "] * len(row)
        closing_row[0] = _DAT_FLAG_CLOSERS[opening]

        new_rows.append(opening_row)
        new_rows.append(row.tolist())
        new_rows.append(closing_row)

        if notice == "generic":
            log.warning(f"Flags '{Colors.ENDC}{col10}{Colors.WARNING}' not implemented in therion, line {Colors.ENDC}{idx+1}{Colors.WARNING} in {Colors.ENDC}{shortCurentFile}")
        elif notice == "plot":
            log.warning(f"Flags exclude from plot #|P# not implemented in therion, line {Colors.ENDC}{idx+1}{Colors.WARNING} in {Colors.ENDC}{shortCurentFile}")
        elif notice == "closure":
            log.warning(f"Flags #|C# exclude from closure not implemented in therion, line {Colors.ENDC}{idx+1}{Colors.WARNING} in {Colors.ENDC}{shortCurentFile}")
        elif notice == "unknown":
            log.error(f"Flags unknown '{Colors.ENDC}{col10}{Colors.WARNING}' not implemented, line {Colors.ENDC}{idx+1}{Colors.WARNING} in {Colors.ENDC}{shortCurentFile}")
            globalData.error_count += 1

    # Merge consecutive shots sharing a flag: a closing row directly followed
    # by the matching opening row is removed together with that opening row
    cleaned_rows = []
    i = 0
    while i < len(new_rows):
        current = new_rows[i]
        if i + 1 < len(new_rows):
            following_label = str(new_rows[i + 1][0]).strip()
            if _DAT_FLAG_CLOSERS.get(following_label) == str(current[0]).strip():
                i += 2
                continue
        cleaned_rows.append(current)
        i += 1

    # Header row first, written as it is
    output = ["\t\t" + "\t".join(map(str, header_row))]

    # Cells are tab-separated until the first cell starting with '#'; from
    # there on they are space-separated so the comment stays on one line
    for row in cleaned_rows:
        row_str = "\t\t"
        in_comment = False
        for cell in row:
            text = str(cell)
            if text == " ":
                continue
            if in_comment or text.startswith("#"):
                row_str += f" {text}"
                in_comment = True
            else:
                row_str += f"\t{text}"
        output.append(row_str)

    return "\n".join(output)
|
|
|
|
|
|
#################################################################################################
|
|
def find_duplicates_by_date_and_team(data):
    """Find surveys sharing date and team that are linked by common stations.

    Entries are first bucketed by ``(SURVEY_DATE, SURVEY_TEAM)``. Inside a
    bucket, two entries are linked when the first column of their ``STATION``
    tables shares at least one value. Every connected group of two or more
    entries is reported as one duplicate set.

    Args:
        data (list): Survey dictionaries providing the keys ``ID``,
            ``SURVEY_DATE``, ``SURVEY_TEAM`` and ``STATION``. ``STATION`` is
            read as a pandas DataFrame; any other value (for instance an
            empty list) or a frame without columns counts as "no stations".

    Returns:
        list: One dictionary per duplicate group with the keys
        ``SURVEY_DATE``, ``SURVEY_TEAM``, ``IDS`` (sorted) and
        ``COMMON_STATIONS`` (sorted union of the pairwise shared stations).
    """

    def station_names(entry):
        # Only a DataFrame with at least one column has a first column to read;
        # anything else is treated as an empty station set instead of crashing.
        stations = entry['STATION']
        if isinstance(stations, pd.DataFrame) and stations.shape[1] > 0:
            return set(stations.iloc[:, 0])
        return set()

    # Step 1: bucket the entries by (SURVEY_DATE, SURVEY_TEAM)
    grouped = defaultdict(list)
    for entry in data:
        grouped[(entry['SURVEY_DATE'], entry['SURVEY_TEAM'])].append(entry)

    duplicates = []

    for (survey_date, survey_team), entries in grouped.items():
        if len(entries) < 2:
            continue

        # Map every survey ID of the bucket to its set of station names
        id_to_stations = {entry['ID']: station_names(entry) for entry in entries}
        ids = list(id_to_stations)

        # Step 2: implicit graph, an edge links two surveys sharing a station
        adjacency = defaultdict(set)
        for pos, id_i in enumerate(ids):
            for id_j in ids[pos + 1:]:
                if id_to_stations[id_i] & id_to_stations[id_j]:
                    adjacency[id_i].add(id_j)
                    adjacency[id_j].add(id_i)

        # Step 3: connected components, walked with an explicit stack so that
        # a long chain of surveys cannot exhaust the recursion limit.
        visited = set()
        for start in ids:
            if start in visited:
                continue

            component = []
            stack = [start]
            visited.add(start)
            while stack:
                node = stack.pop()
                component.append(node)
                for neighbor in adjacency.get(node, ()):
                    if neighbor not in visited:
                        visited.add(neighbor)
                        stack.append(neighbor)

            if len(component) < 2:
                continue

            # Stations shared by at least one pair of surveys of the component
            shared_stations = set()
            for pos, id_i in enumerate(component):
                for id_j in component[pos + 1:]:
                    shared_stations |= id_to_stations[id_i] & id_to_stations[id_j]

            duplicates.append({
                'SURVEY_DATE': survey_date,
                'SURVEY_TEAM': survey_team,
                'IDS': sorted(component),
                'COMMON_STATIONS': sorted(shared_stations)
            })

    return duplicates
|
|
|
|
|
|
#################################################################################################
|
|
def find_duplicates_by_date(data):
    """Find surveys sharing a date that are linked by common stations.

    Entries are first bucketed by ``SURVEY_DATE`` only. Inside a bucket, two
    entries are linked when the first column of their ``STATION`` tables
    shares at least one value. Every connected group of two or more entries
    is reported as one duplicate set.

    Args:
        data (list): Survey dictionaries providing the keys ``ID``,
            ``SURVEY_DATE``, ``SURVEY_TEAM`` and ``STATION``. ``STATION`` is
            read as a pandas DataFrame; any other value (for instance an
            empty list) or a frame without columns counts as "no stations".

    Returns:
        list: One dictionary per duplicate group with the keys
        ``SURVEY_DATE``, ``SURVEY_TEAM`` (team of the first entry reached in
        the group), ``IDS`` (sorted) and ``COMMON_STATIONS`` (sorted union of
        the pairwise shared stations).
    """

    def station_names(entry):
        # Only a DataFrame with at least one column has a first column to read;
        # anything else is treated as an empty station set instead of crashing.
        stations = entry['STATION']
        if isinstance(stations, pd.DataFrame) and stations.shape[1] > 0:
            return set(stations.iloc[:, 0])
        return set()

    # Step 1: bucket the entries by SURVEY_DATE only
    grouped = defaultdict(list)
    for entry in data:
        grouped[entry['SURVEY_DATE']].append(entry)

    duplicates = []

    for survey_date, entries in grouped.items():
        if len(entries) < 2:
            continue

        # Lookup tables keyed by survey ID
        id_to_entry = {entry['ID']: entry for entry in entries}
        id_to_stations = {entry['ID']: station_names(entry) for entry in entries}
        ids = list(id_to_entry)

        # Step 2: implicit graph, an edge links two surveys sharing a station
        adjacency = defaultdict(set)
        for pos, id_i in enumerate(ids):
            for id_j in ids[pos + 1:]:
                if id_to_stations[id_i] & id_to_stations[id_j]:
                    adjacency[id_i].add(id_j)
                    adjacency[id_j].add(id_i)

        # Step 3: connected components, walked with an explicit stack so that
        # a long chain of surveys cannot exhaust the recursion limit.
        visited = set()
        for start in ids:
            if start in visited:
                continue

            component = []
            stack = [start]
            visited.add(start)
            while stack:
                node = stack.pop()
                component.append(node)
                for neighbor in adjacency.get(node, ()):
                    if neighbor not in visited:
                        visited.add(neighbor)
                        stack.append(neighbor)

            if len(component) < 2:
                continue

            # Stations shared by at least one pair of surveys of the component
            shared_stations = set()
            for pos, id_i in enumerate(component):
                for id_j in component[pos + 1:]:
                    shared_stations |= id_to_stations[id_i] & id_to_stations[id_j]

            # The team is taken from the entry the traversal started from,
            # i.e. the first entry of the group in input order.
            first_entry = id_to_entry[component[0]]

            duplicates.append({
                'SURVEY_DATE': survey_date,
                'SURVEY_TEAM': first_entry['SURVEY_TEAM'],
                'IDS': sorted(component),
                'COMMON_STATIONS': sorted(shared_stations)
            })

    return duplicates
|
|
|
|
|
|
#################################################################################################
|
|
def points_uniques(data, crs_wkt=None):
    """Return the names found in column 0 of the survey data but never in column 1.

    Each line of ``data['DATA']`` is split on whitespace; the first line is
    skipped (it is treated as a header). Names of column 0 that also appear
    in column 1 are dropped, duplicates are removed, and names listed in
    ``crs_wkt`` are excluded when it is a list or a set.

    Args:
        data (dict): Survey section whose ``'DATA'`` value is an iterable of
            text lines.
        crs_wkt (list | set, optional): Names to exclude from the result.
            Any other type (including None) disables the exclusion.

    Returns:
        list: The remaining names, in order of first appearance. An empty
        list is returned when there is no line after the header.
    """

    # Build a table from the whitespace-separated data lines
    rows = [line.split() for line in data['DATA']]

    # Without at least one line after the header there is nothing to extract
    # (an empty table has no column 0 and would raise IndexError below).
    if len(rows) < 2:
        return []

    dfDATA = pd.DataFrame(rows)

    # Columns 0 and 1 without the first (header) line
    col0 = dfDATA.iloc[1:, 0]
    col1 = dfDATA.iloc[1:, 1]

    # Name clean-up (the original comment mentions replacing square brackets).
    # NOTE(review): `stationName` is not a built-in pandas Series method; it
    # presumably has to be attached to Series elsewhere in this file - verify,
    # otherwise these two calls raise AttributeError.
    col0_clean = col0.stationName()
    col1_clean = col1.stationName()

    # Keep the names of column 0 that never appear in column 1, once each
    uniques_col0 = col0_clean[~col0_clean.isin(col1_clean)]
    uniques_col0 = uniques_col0.drop_duplicates()

    # Drop the names explicitly excluded by the caller
    if isinstance(crs_wkt, (set, list)):
        uniques_col0 = uniques_col0[~uniques_col0.isin(crs_wkt)]

    return uniques_col0.reset_index(drop=True).tolist()
|
|
|
|
|
|
#################################################################################################
|
|
def merge_duplicate_surveys(data, duplicates, id_offset=10000):
    """Merge each group of duplicate surveys into a single survey entry.

    Args:
        data (list): Survey dictionaries. Each one provides ``ID``,
            ``SURVEY_TITLE``, ``COMMENT``, ``DECLINATION``, ``FORMAT``,
            ``CORRECTIONS``, ``CORRECTIONS2``, ``DISCOVERY``, ``PREFIX`` and
            ``STATION``; ``SURVEY_NAME``, ``DATA`` and ``SOURCE`` are optional.
        duplicates (list): Groups to merge, each a dictionary with the keys
            ``IDS``, ``SURVEY_DATE`` and ``SURVEY_TEAM``.
        id_offset (int, optional): ID given to the first merged entry; the
            following merged entries get consecutive IDs. Defaults to 10000.

    Returns:
        list: The merged entries (one per group, in group order) followed by
        deep copies of the entries of ``data`` that belong to no group.
    """

    id_to_entry = {entry['ID']: entry for entry in data}
    merged_data = []
    used_ids = set()

    # Fields whose value is replaced when the group agrees on exactly one non-empty value
    text_fields = ['SURVEY_TITLE', 'COMMENT', 'DECLINATION', 'FORMAT', 'CORRECTIONS']

    for i, group in enumerate(duplicates):
        ids = group['IDS']

        # The first survey of the group supplies the default values. It is
        # resolved through its ID (not used as a list position), so the lookup
        # stays correct whatever the numbering of the IDs.
        first = id_to_entry[ids[0]]

        merged_entry = {
            'ID': id_offset + i,
            'SURVEY_TITLE': first['SURVEY_TITLE'],
            'SURVEY_NAME': None,
            'SURVEY_DATE': group['SURVEY_DATE'],
            'COMMENT': first['COMMENT'],
            'SURVEY_TEAM': group['SURVEY_TEAM'],
            'DECLINATION': first['DECLINATION'],
            'FORMAT': first['FORMAT'],
            'CORRECTIONS': first['CORRECTIONS'],
            "CORRECTIONS2": first['CORRECTIONS2'],
            "DISCOVERY": first['DISCOVERY'],
            "PREFIX": first['PREFIX'],
            'DATA': [],
            'STATION': [],
            'SOURCE': []
        }

        # Values collected over the whole group
        text_values = {field: set() for field in text_fields}
        survey_name_list = set()
        source_set = set()
        station_frames = []

        first_data_line = True

        for id_ in ids:
            entry = id_to_entry[id_]
            used_ids.add(id_)

            for field in text_fields:
                value = entry.get(field)
                if value not in [None, '']:
                    text_values[field].add(value)

            name = entry.get('SURVEY_NAME')
            if name not in [None, '']:
                survey_name_list.add(name)

            data_lines = entry.get('DATA', [])
            if data_lines:
                if first_data_line:
                    merged_entry['DATA'].extend(data_lines)
                    first_data_line = False
                else:
                    # The header line is kept only once, from the first non-empty block
                    merged_entry['DATA'].extend(data_lines[1:])

            sources = entry.get('SOURCE', [])
            if isinstance(sources, str):
                source_set.add(sources)
            elif isinstance(sources, list):
                source_set.update(sources)

            if isinstance(entry['STATION'], pd.DataFrame):
                station_frames.append(entry['STATION'])

        # When the group agrees on a single value, use it; otherwise the value
        # of the first survey (set above) is left untouched.
        for field in text_fields:
            if len(text_values[field]) == 1:
                merged_entry[field] = next(iter(text_values[field]))

        # New name: the sorted survey names joined with "_"
        if survey_name_list:
            sorted_names = sorted(survey_name_list)
            full_name = "_".join(sorted_names)
            if len(full_name) <= 40:
                merged_entry['SURVEY_NAME'] = full_name
            else:
                # Too long: keep the start of the first name and the end of
                # the last one, for at most 50 characters with the connector.
                prefix = sorted_names[0]
                suffix = sorted_names[-1]
                connector = "_-_"
                max_prefix_suffix_len = 50 - len(connector)
                # Share the available room between both ends (when possible)
                half_len = max_prefix_suffix_len // 2
                prefix = prefix[:half_len]
                suffix = suffix[-(max_prefix_suffix_len - len(prefix)):]
                merged_entry['SURVEY_NAME'] = prefix + connector + suffix

        # Stack the STATION tables of the group
        if station_frames:
            merged_entry['STATION'] = pd.concat(station_frames, ignore_index=True)

        merged_entry['SOURCE'] = "\n".join(sorted(source_set))
        merged_data.append(merged_entry)

    # Append the entries that were not part of any duplicate group
    for entry in data:
        if entry['ID'] not in used_ids:
            merged_data.append(deepcopy(entry))

    return merged_data
|
|
|
|
|
|
#################################################################################################
|
|
def dat_survey_format_extract(section_data, headerData, currentSurveyName, fichier, totReadMeError):
    """Translate the FORMAT code of a survey section into a Therion data line and units.

    The first four characters of the code give the units (positions 0, 1 and
    3 are read here), positions 8 and following give the shot item order and,
    for the longer codes, the backsight and LRUD-association flags. The
    dimension names are taken from ``headerData[5:9]`` (lower-cased).

    Every problem found is logged, counted in ``globalData.error_count`` and
    appended to the error text; the function then carries on with an
    ``*_error`` placeholder value.

    Args:
        section_data (dict): Survey section; ``'FORMAT'`` holds the code. The
            other keys are only read for the debug dump of an invalid code.
        headerData (list): Column titles of the data table; items 5 to 8 are
            used as dimension names.
        currentSurveyName (str): Survey name used in the messages.
        fichier (str): File name used in the messages.
        totReadMeError (str): Error text accumulated so far.

    Returns:
        tuple: ``(dataFormat, length, compass, clino, totReadMeError)`` where
        ``dataFormat`` is the ``data normal from to ...`` line, ``length``,
        ``compass`` and ``clino`` are unit strings (or ``*_error`` markers)
        and ``totReadMeError`` is the input text plus the new messages.
    """

    raw_format = section_data['FORMAT']
    # A missing code is handled as an empty one, so that the unit look-ups
    # below report their errors instead of raising TypeError/IndexError.
    fmt = raw_format if raw_format is not None else ""

    if raw_format is None or len(raw_format) < 11 or len(raw_format) > 15 :
        log.error(f"Error in format code {Colors.ENDC}{raw_format}{Colors.ERROR} in {Colors.ENDC}{currentSurveyName}")
        log.debug(f"Error in format code SURVEY_NAME {Colors.ENDC}{section_data['SURVEY_NAME']}")
        log.debug(f"Error in format code SURVEY_DATE {Colors.ENDC}{section_data['SURVEY_DATE']}")
        log.debug(f"SURVEY TITLE: {Colors.ENDC}{section_data['SURVEY_TITLE']}")
        log.debug(f"COMMENT: {Colors.ENDC}{section_data['COMMENT']}")
        log.debug(f"SURVEY TEAM: {Colors.ENDC}{section_data['SURVEY_TEAM']}")
        log.debug(f"DECLINATION: {Colors.ENDC}{section_data['DECLINATION']}")
        log.debug(f"FORMAT: {Colors.ENDC}{raw_format}")
        log.debug(f"CORRECTIONS: {Colors.ENDC}{section_data['CORRECTIONS']}")
        log.debug(f"DATA: {Colors.ENDC}{(section_data['DATA'])}")
        log.debug(f"DATA Qté: {Colors.ENDC}{len(section_data['DATA'])}")
        log.debug(f"STATION: {Colors.ENDC}{(section_data['STATION'])}")
        log.debug(f"SOURCE: {Colors.ENDC}{section_data['SOURCE']}\n")
        globalData.error_count += 1
        totReadMeError += f"\tError in format code {raw_format} in {currentSurveyName}\n"

    def code_at(position) :
        # Character of the code at `position`, or "" when the code is too short
        return fmt[position] if position < len(fmt) else ""

    def report_bad_code(string) :
        # Shared error path of the helpers below. `nonlocal` is required:
        # without it the `+=` makes the name local and raises UnboundLocalError.
        nonlocal totReadMeError
        log.error(f"Error in format str {Colors.ENDC}{string}{Colors.ERROR} code {Colors.ENDC}{raw_format}{Colors.ERROR} in {Colors.ENDC}{fichier}{Colors.ERROR} in {Colors.ENDC}{currentSurveyName}")
        totReadMeError += f"\tError in format str {string} code {raw_format} in {fichier} in {currentSurveyName}\n"
        globalData.error_count += 1
        return ""

    def Dimension(string="") :
        # Dimension letter -> Therion keyword (not called in this function:
        # the dimension names come from headerData).
        directions = {'U': " up", 'D': " down", 'R': " right", 'L': " left"}
        if string in directions:
            return directions[string]
        return report_bad_code(string)

    def LRUD_association(string="") :
        # In Therion the standard LRUD association is the shot and not the station
        # LRUD Association: F=From Station, T=To Station
        if string == 'F' or string == 'T' :
            return ""
        return report_bad_code(string)

    def Backsight(string="") :
        # Backsight: B=Redundant, N or empty=No Redundant Backsights.
        nonlocal totReadMeError
        if string == 'B' :
            log.error(f"Backsight unit not yet implemented {Colors.ENDC}{raw_format}{Colors.ERROR} in {Colors.ENDC}{currentSurveyName}")
            # The README text is plain text: no terminal colour codes here
            totReadMeError += f"\tBacksight unit not yet implemented {raw_format} in {currentSurveyName}\n"
            globalData.error_count += 1
            return ""
        if string == 'N' :
            return ""
        return report_bad_code(string)

    def ShotOrder(string="") :
        # Shot item letter -> Therion keyword; reads `clino`, which is set
        # in the units section before the first call.
        if string == 'L' : return " length"
        elif string == 'A' : return " compass"
        elif string == 'D' :
            if clino == 'depth feet' : return " depthchange"
            else : return " clino"
        elif string == 'a' : return " backcompass"
        elif string == 'd' : return " backclino"
        return report_bad_code(string)

    type_Data = "normal"

    ################################################ Section Units 0-3 ###############################################
    compass_code = code_at(0)
    if compass_code == 'D' : compass = 'compass degree'
    elif compass_code == 'R' : compass = 'compass grads'
    else :
        compass = 'Compass_error'
        log.error(f"Compass bearing unit 'quads' not yet implemented in {Colors.ENDC}{currentSurveyName}")
        globalData.error_count += 1
        totReadMeError += f"\tCompass bearing unit 'quads' not yet implemented in survey {currentSurveyName}\n"

    length_code = code_at(1)
    if length_code == 'D' : length = 'length feet'
    elif length_code == 'M' : length = 'length meter'
    else :
        length = 'Length_error'
        log.error(f"Length unit 'Feet and Inches' not yet implemented in {Colors.ENDC}{currentSurveyName}")
        globalData.error_count += 1
        totReadMeError += f"\tLength unit 'Feet and Inches' not yet implemented in {currentSurveyName}\n"

    clino_code = code_at(3)
    if clino_code == 'D' : clino = 'clino degree'
    elif clino_code == 'R' : clino = 'clino grads'
    # elif clino_code == 'G' : clino = 'percent'     # %Grades, to be checked?
    # elif clino_code == 'M' : clino = 'grads'       # Degrees and Minutes
    elif clino_code == 'W' :
        clino = 'clino degree'      # Depth Gauge
        type_Data = "normal"        # Depth Gauge
    else :
        clino = 'Inclination_error'
        log.error(f"Inclination unit not yet implemented in {Colors.ENDC}{currentSurveyName}")
        globalData.error_count += 1
        totReadMeError += f"\tInclination unit not yet implemented in {currentSurveyName}\n"

    ################################################ Section dimensions 4-7 ###############################################
    dataFormat = " " + headerData[5].lower()
    dataFormat += " " + headerData[6].lower()
    dataFormat += " " + headerData[7].lower()
    dataFormat += " " + headerData[8].lower()

    ################################################ Section Shot 8-11 or 13 ###############################################
    # Items are prepended, so they are processed from the last to the first.
    if len(fmt) in (11, 12, 13) :
        if len(fmt) == 13 :                                         # UUUUDDDDSSSBL
            dataFormat = LRUD_association(fmt[12]) + dataFormat
            dataFormat = Backsight(fmt[11]) + dataFormat            # UUUUDDDDSSSB
        elif len(fmt) == 12 :
            dataFormat = Backsight(fmt[11]) + dataFormat
        dataFormat = ShotOrder(fmt[10]) + dataFormat
        dataFormat = ShotOrder(fmt[9]) + dataFormat
        dataFormat = ShotOrder(fmt[8]) + dataFormat

    elif len(fmt) == 15 :                                           # UUUUDDDDSSSSSBL
        # NOTE(review): positions 10 and 12 of the five shot items are not
        # read here - confirm this is intended.
        dataFormat = LRUD_association(fmt[14]) + dataFormat
        dataFormat = Backsight(fmt[13]) + dataFormat
        dataFormat = ShotOrder(fmt[11]) + dataFormat
        dataFormat = ShotOrder(fmt[9]) + dataFormat
        dataFormat = ShotOrder(fmt[8]) + dataFormat

    dataFormat = "data " + type_Data + " from to" + dataFormat + " # comment"

    return dataFormat, length, compass, clino, totReadMeError
|
|
|
|
|
|
#################################################################################################
|
|
# Convertit un fichier .tro en fichiers .th #
|
|
#################################################################################################
|
|
def tro_to_th_files(ENTRY_FILE, centerlines = None,
                    entrance = "",
                    fileTitle = "",
                    coordinates = None,
                    coordsyst = "",
                    fle_th_fnme = "",
                    CONFIG_PATH = "",
                    totReadMeError = "",
                    bar=None) :
    """
    Write one Therion .th survey per centerline of a .tro file and build the project files.

    For every centerline (sorted by date, undated ones last) a ``.th`` file
    is written in ``<folder of ENTRY_FILE>\\<SurveyTitle>\\Data``, handed to
    ``create_th_folders`` and then moved to the ``Sources`` sub-folder of its
    survey. The ``equate`` list, the ``.thconfig``, ``-tot.th``, ``-maps.th``
    and ``-readme.md`` files are then produced from the templates.

    NOTE(review): the nesting of this function was reconstructed from a
    flattened listing; the places where it is ambiguous are marked below.
    NOTE(review): ``args`` and ``source_content`` are not defined in this
    function; they are presumably module-level globals - verify.

    Args:
        ENTRY_FILE (str): Path of the input file; its base name without the
            last four characters gives the survey title.
        centerlines (list, optional): Dictionaries with the keys ``date``,
            ``DATA`` and ``lines``. ``lines`` is extended in place with the
            survey header and ``endsurvey``. Defaults to no centerline.
        entrance (str, optional): Name of the fixed (entrance) station.
        fileTitle (str, optional): Title used in each ``survey ... -title``.
        coordinates (list, optional): Three coordinates of the fixed station.
            They are read unconditionally, so fewer than three items raise
            IndexError.
        coordsyst (str, optional): Coordinate system; ``globalData.cs`` is
            used in the templates when empty.
        fle_th_fnme (str, optional): Not used by this function.
        CONFIG_PATH (str, optional): Not used by this function.
        totReadMeError (str, optional): Returned unchanged.
        bar (callable, optional): Progress callback called with ``1`` after
            each centerline; skipped when None.

    Returns:
        tuple: ``(_stationList, SurveyTitle, totReadMeError, threads)``: the
        station table with its ``Survey_Number`` column, the survey title, the
        error text received and the list of started threads.
    """

    #################################################################################################
    # 1 : Initialisations                                                                           #
    #################################################################################################
    centerlines = [] if centerlines is None else centerlines
    coordinates = [] if coordinates is None else coordinates

    totdata = "\t## Input list:\n"
    totMapsPlan = ""
    totMapsExtended = ""
    totReadMeErrorDat = ""
    maps = ""
    plan = ""
    extended = ""
    totReadMe = ""
    totReadMeFixPoint = f"\tcs {coordsyst}\n"
    totReadMeFixPoint += f"\tFix point, station : {entrance}, coordinates: [{coordinates[0]} m, {coordinates[1]} m, {coordinates[2]} m]\n"
    listStationSection = pd.DataFrame(columns=['StationName', 'Survey_Name'])
    listStationSectionFixed = pd.DataFrame(columns=['Station_Fix', 'Survey_Name'])
    threads = []
    fixPoints = [[entrance, " ", coordinates[0], coordinates[1], coordinates[2]]]

    log.debug(f"{Colors.INFO}------------------------------------------------------------------------------------------------------------------{Colors.ENDC}")

    SurveyTitle = sanitize_filename(os.path.basename(ENTRY_FILE)[:-4])
    # NOTE(review): paths are built with Windows separators, as in the rest of this function
    folderDest = os.path.dirname(ENTRY_FILE) + "\\" + SurveyTitle

    copy_template_if_not_exists(globalData.templatePath,folderDest)

    #################################################################################################
    # 2 : Loop converting the centerlines                                                           #
    #################################################################################################

    # Dated centerlines first (in date order), undated ones last
    ordered_centerlines = sorted(centerlines, key=lambda x: (x['date'] is None, x['date']))

    for i, cl in enumerate(ordered_centerlines, start=1):

        currentSurveyName = f"{globalData.SurveyPrefixName}{i:02d}_{sanitize_filename(cl['date'])}"
        fileName = folderDest + "\\Data\\" + currentSurveyName + ".th"

        log.debug(f"{Colors.INFO}Centerline # {Colors.ENDC}{i}")
        log.debug(f"{Colors.INFO}Date : {Colors.ENDC}{cl['date']}")
        log.debug(f"{Colors.INFO}Stations: {Colors.ENDC}{cl['DATA']}")
        log.debug(f"{Colors.INFO}Lignes :{Colors.ENDC}")

        generated_on = datetime.now().strftime("%Y.%m.%d %H:%M:%S")
        add_lines = "\nencoding utf-8\n"
        add_lines+= f"# File generated by pyCreateTh.py version: {globalData.Version} date: {generated_on}\n"
        add_lines+= f'\nsurvey {currentSurveyName} -title "{fileTitle} Explo num {i:02d}"'

        cl['lines'] = [add_lines] + cl['lines'] + ["endsurvey"]

        with open(str(fileName), "w+", encoding="utf-8") as f:
            for line in cl['lines']:
                log.debug(line)
                f.write(f"{line}\n")

            # Trailer: conversion summary followed by the commented original data
            converted_on = datetime.now().strftime("%Y.%m.%d %H:%M:%S")
            f.write("\n\n#############################################################################################")
            f.write(f"\n# Originals data file : {args.file}")
            if globalData.error_count == 0 :
                f.write(f"\n# Conversion with pyCreateTh version {globalData.Version}, the {converted_on}, without error")
            else :
                f.write(f"\n# Conversion with pyCreateTh version {globalData.Version}, the {converted_on}, with {globalData.error_count} error(s)")

            f.write("\n#############################################################################################\n\n")
            for line in source_content.splitlines():
                f.write(f"# {line}\n")

        log.debug(f"{Colors.INFO}------------------------------------------------------------------------------------------------------------------{Colors.ENDC}")

        # Add the stations of this centerline to the station lists
        # NOTE(review): only this call is taken to depend on the test below - confirm
        if len(cl['DATA']) > 0 :
            listStationSectionFixed, listStationSection, _ = station_list_th(cl, listStationSection, listStationSectionFixed, fixPoints, currentSurveyName)

        StatCreateFolder, stat, totReadMeErrorDat, thread2 = create_th_folders(
                                                                fileName,
                                                                TARGET = None,
                                                                PROJECTION= args.proj,
                                                                SCALE = args.scale,
                                                                UPDATE = args.update,
                                                                CONFIG_PATH = "",
                                                                totReadMeError = totReadMeErrorDat,
                                                                args_file = args.file,
                                                                proj = args.proj.lower())
        threads += thread2

        log.info(f"File: {Colors.ENDC}{currentSurveyName}{Colors.INFO}, compilation successful, length: {Colors.ENDC}{stat['length']}m{Colors.INFO}, depth: {Colors.ENDC}{stat['depth']}m")
        totReadMe += f"\t{currentSurveyName} compilation successful length: {stat['length']} m, depth: {stat['depth']} m\n"

        # NOTE(review): both map lines are taken to depend on this test - confirm
        if not StatCreateFolder :
            totMapsPlan += f"\t{plan}MP-{currentSurveyName}-Plan-tot@{currentSurveyName}\n\t{plan}break\n"
            totMapsExtended += f"\t{extended}MC-{currentSurveyName}-Extended-tot@{currentSurveyName}\n\t{extended}break\n"

        totdata +=f"\tinput Data/{currentSurveyName}/{currentSurveyName}-tot.th\n"

        # Move the generated .th file into the "Sources" folder of its survey
        _destination = fileName[:-3] + "\\Sources"
        destination_path = os.path.join(_destination, os.path.basename(fileName))
        shutil.move(fileName, destination_path)

        # `bar` defaults to None: only report progress when a callback was given
        if bar is not None :
            bar(1)

    #################################################################################################
    # Equates handling                                                                              #
    #################################################################################################

    totdata +="\n"

    _stationList = listStationSection.copy()

    # Number the successive surveys in which each StationName appears
    _stationList['Survey_Number'] = _stationList.groupby('StationName').cumcount() + 1

    # Pivot the table so that each survey occurrence becomes a column
    # NOTE(review): the value column is 'Survey_Name_01' although the table is
    # created above with a 'Survey_Name' column; presumably station_list_th
    # returns a table holding 'Survey_Name_01' - verify.
    tableau_pivot = _stationList.pivot(index='StationName', columns='Survey_Number', values='Survey_Name_01')

    tableau_pivot.columns = [f'Survey_Name_{number}' for number in tableau_pivot.columns]

    totdata +="\n\t## equates list:\n"

    if 'Survey_Name_2' in tableau_pivot.columns:
        # Reset the index so that StationName becomes a regular column
        tableau_pivot = tableau_pivot.reset_index()
        tableau_equate = tableau_pivot[tableau_pivot['Survey_Name_2'].notna()]

        log.info(f"Total 'equates' founds: {Colors.ENDC}{len(tableau_equate)}{Colors.INFO} in {Colors.ENDC}{currentSurveyName}")

        survey_columns = [col for col in tableau_equate.columns if col.startswith('Survey_Name')]

        for _, row in tableau_equate.iterrows():
            station = row['StationName']

            # All the surveys holding this station (NaN excluded)
            surveys = [row[col] for col in survey_columns if pd.notna(row[col])]

            # One 'equate' line per unordered pair of surveys
            for pos, first_survey in enumerate(surveys):
                for second_survey in surveys[pos + 1:]:
                    totdata +=f"\tequate {station}@{first_survey}.{first_survey} {station}@{second_survey}.{second_survey}\n"
    else:
        log.info(f"No 'equates' found in {Colors.ENDC}{currentSurveyName}")

    totdata +=f"\n\t## Maps list:\n\t{maps}input {SurveyTitle}-maps.th\n"

    if totReadMeErrorDat == "" : totReadMeErrorDat += "\tThis file has no errors, perfect!\n"

    for _, row in listStationSectionFixed.iterrows():
        totReadMeFixPoint += f"\tFixed station : {row['Station_Fix']}, Survey : {row['Survey_Name']}\n"

    file_info_date = datetime.now().strftime("%Y.%m.%d-%H:%M:%S")

    # Values substituted in the template files
    config_vars = {
                    'fileName': SurveyTitle,
                    'caveName': SurveyTitle.replace("_", " "),
                    'Author': globalData.Author,
                    'Copyright': globalData.Copyright,
                    'Scale' : args.scale,
                    'Target' : "TARGET",
                    'mapComment' : globalData.mapComment,
                    'club' : globalData.club,
                    'thanksto' : globalData.thanksto,
                    'datat' : globalData.datat,
                    'wpage' : globalData.wpage,
                    'cs' : coordsyst if coordsyst != "" else globalData.cs,
                    'totData' : totdata,
                    'maps' :maps,
                    'plan': plan,
                    'XVIscale': globalData.XVIScale,
                    'extended': extended,
                    'configPath' : "",
                    'other_scraps_plan' : totMapsPlan,
                    'readMeList' : totReadMe,
                    'errorList' : totReadMeErrorDat,
                    'fixPointList' : totReadMeFixPoint,
                    'other_scraps_extended' : totMapsExtended,
                    'file_info' : f"# File generated by pyCreateTh.py version: {globalData.Version} date: {file_info_date}",
            }

    DEST_PATH = os.path.dirname(ENTRY_FILE) + '/' + SurveyTitle

    update_template_files(DEST_PATH + '/template.thconfig', config_vars, DEST_PATH + '/' + SurveyTitle + '.thconfig')
    update_template_files(DEST_PATH + '/template-tot.th', config_vars, DEST_PATH + '/' + SurveyTitle + '-tot.th')
    update_template_files(DEST_PATH + '/template-maps.th', config_vars, DEST_PATH + '/' + SurveyTitle + '-maps.th')

    #################################################################################################
    # Final therion compilation                                                                     #
    #################################################################################################

    final_compilation = globalData.finalTherionExe == True

    if final_compilation :
        FILE = DEST_PATH + '/' + SurveyTitle + '.thconfig'
        t = compile_file(FILE, therion_path=globalData.therionPath)
        threads.append(t)

    # Wait for all the threads to finish
    for thread in threads:
        thread.join()

    # NOTE(review): the final log is only read when the final compilation was
    # requested; the flattened listing does not show whether the original
    # nested this part - confirm.
    if final_compilation :
        logfile = (DEST_PATH + '/therion.log').replace("\\", "/")

        with open(logfile, 'r') as f:
            content = f.read()

        stat = get_stats_from_log(content)

        if stat["length"] != 0.0 and stat["depth"] != 0.0 :
            totReadMe += f"\tFinal compilation successful length: {stat['length']} m, depth: {stat['depth']} m\n"
            log.info(f"Final compilation successful length: {Colors.ENDC}{stat['length']}{Colors.INFO} m, depth: {Colors.ENDC}{stat['depth']}{Colors.INFO} m")
        else :
            totReadMe += "\tFinal compilation error, check log file\n"
            log.error("Final compilation error, check log file")

    config_vars['readMeList'] = totReadMe

    update_template_files(DEST_PATH + '/template-readme.md', config_vars, DEST_PATH +'/' + SurveyTitle + '-readme.md')

    return _stationList, SurveyTitle, totReadMeError, threads
|
|
|
|
|
|
#################################################################################################
|
|
# Convertit un fichier .dat en fichiers .th #
|
|
#################################################################################################
|
|
def _compass_date_to_therion(date_text):
    """Convert a Compass 'month day year' date into 'YYYY MM DD'; return None if it cannot be parsed."""
    try:
        month, day, year = (int(part) for part in date_text.split())
    except ValueError:
        # Raised both for non-numeric parts and for a wrong number of parts.
        return None
    return f"{year:04d} {month:02d} {day:02d}"


def dat_to_th_files(ENTRY_FILE, fixPoints=None, crs_wkt="", CONFIG_PATH="", totReadMeError="", bar=None):
    """
    Convert a Compass .dat file into Therion .th files.

    The file is split into survey sections (form-feed separated), each section
    header is parsed, sections sharing a date are merged, one .th file is
    written per remaining survey, a Therion folder is created for each of them
    and finally the global -tot.th / -maps.th / .thconfig / readme files are
    generated from the templates.

    Args:
        ENTRY_FILE (str): Path to the input .dat file.
        fixPoints (list, optional): Fixed points. Each item is indexed as
            [0] station name, [1] unit ('m' or 'f'), [2], [3], [4] coordinates.
            Defaults to an empty list.
        crs_wkt (str, optional): Coordinate system written after 'cs'. Defaults to "".
        CONFIG_PATH (str, optional): Path to the configuration file. Defaults to "".
        totReadMeError (str, optional): Error text accumulated so far; the errors
            found here are appended to it. Defaults to "".
        bar (optional): Progress bar object, called as bar(), bar(n) and
            bar.text(msg). When None, progress reporting is skipped.

    Returns:
        tuple: (stationList, SurveyTitle, totReadMeError, threads) where
            stationList is the DataFrame of stations, SurveyTitle the sanitized
            survey name, totReadMeError the accumulated error text and threads
            the list of compilation threads started.
    """

    # None replaces the former mutable default argument.
    fixPoints = [] if fixPoints is None else fixPoints

    # A section ends with FF CR LF (\x0c\r\n); splitting on FF is enough.
    section_separator = '\x0c'
    shortCurentFile = os.path.basename(ENTRY_FILE)

    #################################################################################################
    # 1 : Read the dat file                                                                         #
    #################################################################################################
    content, totReadMe, _enc = load_text_file_utf8(ENTRY_FILE, shortCurentFile)

    #################################################################################################
    # Split the sections                                                                            #
    #################################################################################################
    sections = content.split(section_separator)

    data = []
    unique_id = 1
    totdata = "\t## Input list:\n"
    totMapsPlan = ""
    totMapsExtended = ""
    totReadMeErrorDat = ""
    totReadMeFixPoint = f"cs {crs_wkt}\n"
    threads = []

    # Global tables gathering every station of the file
    stationList = pd.DataFrame(columns=['StationName', 'Survey_Name_01', 'Survey_Name_02'])
    stationList_Fixed = pd.DataFrame(columns=['StationName', 'Survey_Name'])

    section0 = True

    # Fields that can appear on the 'DECLINATION:' header line (loop invariant)
    regex_patterns = {
        "DECLINATION": r"DECLINATION:\s*([\d\.\-]+)",
        "FORMAT": r"FORMAT:\s*([A-Za-z]+)",
        "CORRECTIONS": r"CORRECTIONS:\s*([\d\.\-]+\s+[\d\.\-]+\s+[\d\.\-]+)",
        "CORRECTIONS2": r"CORRECTIONS2:\s*([\d\.\-]+\s+[\d\.\-]+)",
        "DISCOVERY": r"DISCOVERY:\s*(\d+\s+\d+\s+\d+)",
        "PREFIX": r"PREFIX:\s*(\S+)"
    }

    #################################################################################################
    # 2 : Loop reading the surveys in dat format                                                    #
    #################################################################################################
    for section in sections:

        listStationSection = pd.DataFrame(columns=['StationName', 'Survey_Name'])
        listStationSection_Fixed = pd.DataFrame(columns=['StationName', 'Survey_Name'])

        if not section.strip():
            continue  # skip empty sections

        # Information collected for the current section
        section_data = {
            'ID': unique_id,
            'SURVEY_TITLE': None,
            'SURVEY_NAME': None,
            'SURVEY_DATE': None,
            'COMMENT': None,
            'SURVEY_TEAM': None,
            'DECLINATION': None,
            'FORMAT': None,
            'CORRECTIONS': None,
            "CORRECTIONS2": None,
            "DISCOVERY": None,
            "PREFIX": None,
            'DATA': [],
            'STATION': [],
            'STATION_FIXED': [],
            'SOURCE': []
        }

        lines = section.split('\n')
        section_data['SOURCE'] = section
        NextLineSurveyTeam = False

        if section0:
            section0 = False
        else:
            # Every section but the first starts with the remainder of the
            # separator line, which is dropped before reading the title.
            lines = lines[1:]

        # Guard against a section reduced to a single line (no title line left).
        section_data['SURVEY_TITLE'] = lines[0].strip() if lines else ""
        lines = lines[1:]

        jumpLine = False

        for line in lines:
            line = line.strip()

            if jumpLine:
                # Line following the DECLINATION header: ignored.
                jumpLine = False
                continue

            if line.startswith('SURVEY NAME:'):
                section_data['SURVEY_NAME'] = sanitize_filename(line.split(':', 1)[1].strip())

            elif line.startswith('SURVEY DATE:'):
                # The date and the optional comment share the same line.
                date_parts = line.split(':', 1)[1].strip().split('COMMENT:', 1)
                date_convertie = _compass_date_to_therion(date_parts[0])
                if date_convertie is None:
                    # Missing or malformed date: fall back to the default date
                    # instead of failing on the tuple unpacking.
                    date_convertie = "2000 01 01"
                    log.warning(f"Survey {Colors.ENDC}{section_data['SURVEY_NAME']}{Colors.WARNING} with no date, add default date 2000 01 01 ")
                section_data['SURVEY_DATE'] = date_convertie
                if len(date_parts) > 1:
                    section_data['COMMENT'] = date_parts[1].strip()

            elif line.startswith('SURVEY TEAM:'):
                # The team itself is on the next line.
                NextLineSurveyTeam = True

            elif NextLineSurveyTeam:
                NextLineSurveyTeam = False
                val = line
                if val.count(' ') >= 2:
                    val = val.replace(' ', '/', 1)
                section_data['SURVEY_TEAM'] = val

            elif line.startswith('DECLINATION:'):
                for champ, pattern in regex_patterns.items():
                    match = re.search(pattern, line)
                    if match:
                        section_data[champ] = match.group(1).strip()
                jumpLine = True  # skip the line after the DECLINATION line

            elif line != '':
                section_data['DATA'].append(line)

        # Keep only the sections that actually hold data
        if len(section_data['DATA']) > 0:
            listStationSection_Fixed, listStationSection, _ = station_list_dat(section_data, listStationSection, listStationSection_Fixed, fixPoints, section_data['SURVEY_NAME'])
            section_data['STATION'] = listStationSection
            section_data['STATION_FIXED'] = listStationSection_Fixed
            data.append(section_data)
            unique_id += 1

    #################################################################################################
    # Group the sections sharing the same date (and a common point)                                 #
    #################################################################################################
    val1 = len(data)

    # Alternative grouping kept for reference: find_duplicates_by_date_and_team(data)
    duplicates = find_duplicates_by_date(data)
    data = merge_duplicate_surveys(data, duplicates)

    val2 = val1 - len(data)

    if val2 != 0:
        log.info(f"Read dat file: {Colors.ENDC}{shortCurentFile}{Colors.INFO} with {Colors.ENDC}{len(data)}{Colors.GREEN}{Colors.INFO} survey(s) and merged {Colors.ENDC}{val2}")
        if bar is not None:
            bar(val2)
    else:
        log.info(f"Read dat file: {Colors.ENDC}{shortCurentFile}{Colors.INFO} with {Colors.ENDC}{len(data)}{Colors.INFO} survey(s)")

    #################################################################################################
    # Create the folder receiving the converted files                                               #
    #################################################################################################

    # 'data' may be empty when no section holds any shot: fall back to the file name.
    if data and data[0]['SURVEY_TITLE'] != "":
        SurveyTitle = sanitize_filename(data[0]['SURVEY_TITLE'])
        folderDest = os.path.dirname(ENTRY_FILE) + "\\" + SurveyTitle
        if os.path.isdir(folderDest):
            # A folder with the survey title already exists: use the file name instead.
            SurveyTitle = sanitize_filename(os.path.basename(ENTRY_FILE[:-4]))
    else:
        SurveyTitle = sanitize_filename(os.path.basename(ENTRY_FILE[:-4]))

    folderDest = os.path.dirname(ENTRY_FILE) + "\\" + SurveyTitle

    copy_template_if_not_exists(globalData.templatePath, folderDest)

    if args.file[-3:].lower() != "dat":
        _destination = folderDest + "\\config.thc"
        os.remove(_destination)

    # Sort the surveys by date
    data = sorted(data, key=lambda x: x['SURVEY_DATE'] or "")

    #################################################################################################
    # 3 : Loop creating the surveys in th format                                                    #
    #################################################################################################

    surveyCount = 1

    proj = args.proj.lower()
    # Comment prefixes ("# " disables the line) for the maps / plan / extended entries
    values = {
        "none": ("# ", "# ", "# "),
        "plan": ("", "", "# "),
        "extended": ("", "# ", ""),
    }

    maps, plan, extended = values.get(proj, ("", "", ""))

    for _line in data:

        currentSurveyName = f"{globalData.SurveyPrefixName}{surveyCount:02d}_{sanitize_filename(_line['SURVEY_DATE'])}"

        output_file = f"{folderDest}\\Data\\{currentSurveyName}.th"

        #################################################################################################
        # CORRECTIONS handling                                                                          #
        #################################################################################################

        # The header may carry no CORRECTIONS field at all (value left to None).
        raw_corrections = _line['CORRECTIONS']
        _CorrectionValues = [float(val) for val in raw_corrections.strip().split()] if raw_corrections else []

        if all(val == 0.0 for val in _CorrectionValues):
            _corrections = ""
        else:
            corrections_text = " ".join(str(val) for val in _CorrectionValues)
            _corrections = f"\t\t# Corrections: {corrections_text}, not yet implemented\n"
            log.error(f"Corrections: {Colors.ENDC}{corrections_text}{Colors.ERROR}, not yet implemented in {Colors.ENDC}{currentSurveyName}")
            totReadMeError += f"\tCorrections: {corrections_text}, not yet implemented in {currentSurveyName}\n"
            globalData.error_count += 1

        if _line['CORRECTIONS2'] is not None:
            # Test and report the CORRECTIONS2 values themselves (the previous
            # code looked at the CORRECTIONS values here).
            _CorrectionValues3 = [float(val) for val in _line['CORRECTIONS2'].strip().split()]
            if not all(val == 0.0 for val in _CorrectionValues3):
                corrections2_text = " ".join(str(val) for val in _CorrectionValues3)
                log.error(f"Corrections2: {Colors.ENDC}{corrections2_text}{Colors.ERROR}, not yet implemented in {Colors.ENDC}{currentSurveyName}")
                totReadMeError += f"\tCorrections2: {corrections2_text}, not yet implemented in {currentSurveyName}\n"
                globalData.error_count += 1

        discovery = None
        if _line['DISCOVERY'] is not None:
            discovery = _compass_date_to_therion(_line['DISCOVERY'].strip())
        if discovery is None:
            # No discovery date: reuse the survey date, flagged as uncertain.
            discovery = f"{_line['SURVEY_DATE']} # '????'"

        if _line['PREFIX'] is not None:
            log.error(f"PREFIX: {Colors.ENDC}{_line['PREFIX']}, not yet implemented in {Colors.ENDC}{currentSurveyName}")
            totReadMeError += f"\tPREFIX: {_line['PREFIX']}, not yet implemented in {currentSurveyName}\n"
            globalData.error_count += 1

        #################################################################################################
        # DATA handling                                                                                 #
        #################################################################################################

        stationList_Fixed, stationList, dfDATA = station_list_dat(_line, stationList, stationList_Fixed, fixPoints, currentSurveyName)

        headerData = dfDATA.iloc[0].tolist()

        #################################################################################################
        # Search for the fixed points (entrances)                                                       #
        #################################################################################################

        fixPoint = ""

        # Station names found in the first two columns of dfDATA (FROM / TO)
        stations_from = set(dfDATA.iloc[:, 0])
        stations_to = set(dfDATA.iloc[:, 1])
        all_stations = stations_from.union(stations_to)

        # Keep only the fixed points that belong to this survey
        list_common_points = [point for point in fixPoints if point[0] in all_stations]

        if len(list_common_points) >= 1:
            fixPoint += f"\t\tcs {crs_wkt}\n"
            for point in list_common_points:
                totReadMeFixPoint += f"\tFix point: {point[0]} [{point[2]:.2f} m, {point[3]:.2f} m, {point[4]:.2f} m], in {currentSurveyName}\n"
                if point[1] == 'm':
                    fixPoint += f"\t\tfix {point[0]} {point[2]:.2f} {point[3]:.2f} {point[4]:.3f}\n"
                elif point[1] == 'f':
                    fixPoint += f"\t\tfix {point[0]} {point[2]*0.3048:.2f} {point[3]*0.3048:.2f} {point[4]*0.3048:.2f} # Conversion feet - meter\n"
                fixPoint += f'\t\tstation {point[0]} "{point[0]}" entrance\n'

        #################################################################################################
        # Formats handling                                                                              #
        #################################################################################################

        dataFormat, length, compass, clino, totReadMeErrorDat = dat_survey_format_extract(_line, headerData, currentSurveyName, shortCurentFile, totReadMeErrorDat)

        if "grads" in compass:
            _compass = "grads"
        else:
            _compass = "degree"

        # DECLINATION is the string captured by the regex: compare it numerically
        # so that a null (or unusable) declination is really left out.
        try:
            has_declination = float(_line['DECLINATION']) != 0.0
        except (TypeError, ValueError):
            has_declination = False

        #################################################################################################
        # Write the th file                                                                             #
        #################################################################################################

        with open(str(output_file), "w+", encoding="utf-8") as f:
            f.write(globalData.thFileDat.format(
                VERSION=globalData.Version,
                DATE=datetime.now().strftime("%Y.%m.%d-%H:%M:%S"),
                SURVEY_NAME=f"{currentSurveyName}",
                SURVEY_TITLE=_line['SURVEY_NAME'].replace("_", " "),
                SURVEY_DATE=_line['SURVEY_DATE'],
                SURVEY_TEAM=_line['SURVEY_TEAM'],
                FORMAT=_line['FORMAT'],
                COMPASS=compass,
                LENGTH=length,
                CLINO=clino,
                DATA_FORMAT=dataFormat,
                CORRECTIONS=_corrections,
                DECLINATION=f"\t\tdeclination {_line['DECLINATION']} {_compass}\n" if (crs_wkt == "" and has_declination) else "",
                DATA=formated_station_list(dfDATA, dataFormat, length, shortCurentFile),
                # COMMENT stays None when the date line carries no 'COMMENT:' part.
                COMMENT=sanitize_filename(_line['SURVEY_NAME'] + " " + (_line['COMMENT'] or "")).replace('"', "'").replace('_', " "),
                FIX_POINTS=fixPoint,
                EXPLO_DATE=discovery,
                EXPLO_TEAM=f"{_line['SURVEY_TEAM']} # '????'",
                SOURCE='\n'.join('# ' + line for line in _line['SOURCE'].splitlines()),
                )
            )

        totdata += f"\tinput Data/{currentSurveyName}/{currentSurveyName}-tot.th\n"

        log.info(f"Therion file : {Colors.ENDC}{safe_relpath(output_file)}{Colors.GREEN} created from {Colors.ENDC}{os.path.basename(ENTRY_FILE)}")

        #################################################################################################
        # Folders creation                                                                              #
        #################################################################################################

        _Config_PATH = CONFIG_PATH + "../../"

        StatCreateFolder, stat, totReadMeErrorDat, thread2 = create_th_folders(
            ENTRY_FILE=output_file,
            PROJECTION=args.proj,
            SCALE=args.scale,
            UPDATE=args.update,
            CONFIG_PATH=_Config_PATH,
            totReadMeError=totReadMeErrorDat,
            args_file=args.file,
            proj=args.proj.lower()
            )
        threads += thread2

        log.info(f"File: {Colors.ENDC}{currentSurveyName}{Colors.INFO}, compilation successful, length: {Colors.ENDC}{stat['length']}m{Colors.INFO}, depth: {Colors.ENDC}{stat['depth']}m")
        totReadMe += f"\t{currentSurveyName} compilation successful length: {stat['length']} m, depth: {stat['depth']} m\n"

        # Move the generated .th file into the 'Sources' folder of its survey
        _destination = output_file[:-3] + "\\Sources"
        destination_path = os.path.join(_destination, os.path.basename(output_file))
        shutil.move(output_file, destination_path)

        if args.file[-3:].lower() != "dat":
            _destination = output_file[:-3] + "\\config.thc"
            os.remove(_destination)

        if not StatCreateFolder:
            totMapsPlan += f"\t{plan}MP-{currentSurveyName}-Plan-tot@{currentSurveyName}\n\t{plan}break\n"
            totMapsExtended += f"\t{extended}MC-{currentSurveyName}-Extended-tot@{currentSurveyName}\n\t{extended}break\n"

        surveyCount += 1

        if bar is not None:
            if globalData.error_count > 0:
                bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(ENTRY_FILE)[:-4]}{Colors.INFO}, survey: {Colors.ENDC}{currentSurveyName}{Colors.ERROR}, error: {Colors.ENDC}{globalData.error_count}")
            else:
                bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(ENTRY_FILE)[:-4]}{Colors.INFO}, survey: {Colors.ENDC}{currentSurveyName}")
            bar()

    #################################################################################################
    # 4 : Finalisation (filling -tot.th and -maps.th)                                               #
    #################################################################################################

    #################################################################################################
    # Equates handling                                                                              #
    #################################################################################################

    totdata += "\n"

    _stationList = stationList.copy()

    # Number the successive surveys in which each station name appears
    _stationList['Survey_Number'] = _stationList.groupby('StationName').cumcount() + 1

    # Pivot so that each occurrence becomes a Survey_Name_<n> column
    tableau_pivot = _stationList.pivot(index='StationName', columns='Survey_Number', values='Survey_Name_01')
    tableau_pivot.columns = [f'Survey_Name_{i}' for i in tableau_pivot.columns]

    totdata += "\n\t## equates list:\n"

    if 'Survey_Name_2' in tableau_pivot.columns:
        # Bring StationName back as a regular column
        tableau_pivot = tableau_pivot.reset_index()
        tableau_equate = tableau_pivot[tableau_pivot['Survey_Name_2'].notna()]

        log.info(f"Total '{Colors.ENDC}equates{Colors.INFO}' founds : {Colors.ENDC}{len(tableau_equate)}{Colors.INFO} in {Colors.ENDC}{shortCurentFile}")

        for _, row in tableau_equate.iterrows():
            station = row['StationName']

            # Every non-empty Survey_Name_<n> value of the row
            surveys = [row[col] for col in tableau_equate.columns if col.startswith('Survey_Name') and pd.notna(row[col])]

            # One 'equate' line per unordered pair of surveys
            for i, first in enumerate(surveys):
                for second in surveys[i + 1:]:
                    totdata += f"\tequate {station}@{first}.{first} {station}@{second}.{second}\n"
    else:
        log.info(f"No '{Colors.ENDC}equates{Colors.INFO}' found in {Colors.ENDC}{ENTRY_FILE}")

    totdata += f"\n\t## Maps list:\n\t{maps}input {SurveyTitle}-maps.th\n"

    if totReadMeErrorDat == "":
        totReadMeErrorDat += "\tNo errors in the file, that's excellent !\n"

    stationList_Fixed = stationList_Fixed.drop_duplicates()

    # NOTE(review): the 'Station_Fix' column is not part of the frame created at
    # the top of this function; presumably station_list_dat provides it - confirm.
    for index, row in stationList_Fixed.iterrows():
        totReadMeFixPoint += f"\tFixed station: {row['Station_Fix']}, Survey: {row['Survey_Name']}\n"

    generation_stamp = datetime.now().strftime("%Y.%m.%d-%H:%M:%S")

    config_vars = {
        'fileName': SurveyTitle,
        'caveName': SurveyTitle.replace("_", " "),
        'Author': globalData.Author,
        'Copyright': globalData.Copyright,
        'Scale': args.scale,
        'Target': "TARGET",
        'mapComment': globalData.mapComment,
        'club': globalData.club,
        'thanksto': globalData.thanksto,
        'datat': globalData.datat,
        'wpage': globalData.wpage,
        'cs': crs_wkt if crs_wkt != "" else globalData.cs,
        'totData': totdata,
        'maps': maps,
        'plan': plan,
        'XVIscale': globalData.XVIScale,
        'extended': extended,
        'configPath': CONFIG_PATH,
        'other_scraps_plan': totMapsPlan,
        'readMeList': totReadMe,
        'errorList': totReadMeErrorDat,
        'fixPointList': totReadMeFixPoint,
        'other_scraps_extended': totMapsExtended,
        'file_info': f"# File generated by pyCreateTh.py version: {globalData.Version} date: {generation_stamp}",
    }

    DEST_PATH = os.path.dirname(ENTRY_FILE) + '/' + SurveyTitle

    update_template_files(DEST_PATH + '/template.thconfig', config_vars, DEST_PATH + '/' + SurveyTitle + '.thconfig')
    update_template_files(DEST_PATH + '/template-tot.th', config_vars, DEST_PATH + '/' + SurveyTitle + '-tot.th')
    update_template_files(DEST_PATH + '/template-maps.th', config_vars, DEST_PATH + '/' + SurveyTitle + '-maps.th')

    #################################################################################################
    # Final therion compilation                                                                     #
    #################################################################################################

    # Same test as before (== True), evaluated once.
    run_final_compilation = globalData.finalTherionExe == True

    if run_final_compilation:
        FILE = DEST_PATH + '/' + SurveyTitle + '.thconfig'
        t = compile_file(FILE, therion_path=globalData.therionPath)
        threads.append(t)

    for thread in threads:  # wait for every thread to finish
        thread.join()

    # The log is only meaningful when the final compilation was started.
    if run_final_compilation:
        logfile = (DEST_PATH + '/therion.log').replace("\\", "/")

        with open(logfile, 'r') as f:
            content = f.read()

        stat = get_stats_from_log(content)

        if stat["length"] != 0.0 and stat["depth"] != 0.0:
            totReadMe += f"\tFinal compilation successful length: {stat['length']} m, depth: {stat['depth']} m\n"
            log.info(f"Final compilation successful length: {Colors.ENDC}{stat['length']}{Colors.INFO} m, depth: {Colors.ENDC}{stat['depth']}{Colors.INFO} m")
        else:
            totReadMe += "\tFinal compilation error, check log file\n"
            log.error("Final compilation error, check log file")

    config_vars['readMeList'] = totReadMe

    update_template_files(DEST_PATH + '/template-readme.md', config_vars, DEST_PATH + '/' + SurveyTitle + '-readme.md')

    stationList["Survey_Name_02"] = SurveyTitle

    totReadMeError += totReadMeErrorDat

    return stationList, SurveyTitle, totReadMeError, threads
|
|
|
|
|
|
#################################################################################################
|
|
def wait_until_file_is_released(filepath, timeout=30):
    """Wait until a file is released (i.e., can be opened for reading).

    The file is opened in binary read mode every 100 ms until the open
    succeeds or the timeout is exceeded. Only PermissionError is treated as
    "still locked"; any other error (e.g. a missing file) is propagated.

    Args:
        filepath (str): The path to the file to check.
        timeout (int, optional): The maximum time to wait in seconds. Defaults to 30.

    Returns:
        bool: True if the file is released, False if the timeout is reached.
    """

    start = time.monotonic()
    while True:
        try:
            with open(filepath, "rb"):
                return True

        except PermissionError:
            if time.monotonic() - start > timeout:
                log.error(f"Timeout: The file remains locked after {Colors.ENDC}{timeout}{Colors.ERROR} secondes: {Colors.ENDC}{filepath}")
                # Give up instead of looping forever, as the docstring promises.
                return False
            time.sleep(0.1)  # wait 100 ms before retrying
|
|
|
|
|
|
#################################################################################################
|
|
# main function #
|
|
#################################################################################################
|
|
if __name__ == u'__main__':
|
|
|
|
start_time = datetime.now()
|
|
threads = []
|
|
fileTitle = ""
|
|
_fileTitle = ""
|
|
|
|
#################################################################################################
|
|
# Parse arguments #
|
|
#################################################################################################
|
|
parser = argparse.ArgumentParser(
|
|
description=f"{Colors.BLUE}Create a skeleton folder and th, th2 files with scraps from *.tro, *.mak, *.dat, *.th Therion files, version: {Colors.ENDC}{globalData.Version}\n",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
parser.print_help = colored_help.__get__(parser)
|
|
parser.add_argument("--file", help="the file (*.th, *.mak, *.dat, *.tro) to perform e.g. './Therion_file.th'", default="")
|
|
parser.add_argument("--proj", choices=['All', 'Plan', 'Extended', 'None'], help="the th2 files scrap projection to produce, default: All", default="All")
|
|
parser.add_argument("--scale", help="scale for the pdf layout exports, default value: 1000 (i.e. xvi files scale is 100)", default="1000")
|
|
parser.add_argument("--update", help="th2 files update mode (only for th input files, no folders created)", action="store_true", default=False)
|
|
|
|
parser.epilog = (
|
|
f"{Colors.GREEN}Please, complete {Colors.BLUE}config.ini{Colors.GREEN} in {Colors.BLUE}FILE{Colors.GREEN} folder or in script folder for personal configuration{Colors.ENDC}\n"
|
|
f"{Colors.GREEN}If no argument: {Colors.BLUE} files selection by a windows\n{Colors.ENDC}\n"
|
|
f"{Colors.BLUE}Examples:{Colors.ENDC}\n"
|
|
f"\t> python pyCreateTh.py ./Tests/Entree.th --scale 1000\n"
|
|
f"\t> python pyCreateTh.py Entree.th\n"
|
|
f"\t> python pyCreateTh.py\n\n")
|
|
args = parser.parse_args()
|
|
|
|
if args.file == "":
|
|
args.file = select_file_tk_window()
|
|
# print(f"Selected file : {args.file}")
|
|
|
|
output_log = splitext(abspath(args.file))[0] + ".log"
|
|
log = setup_logger(output_log, debug_log)
|
|
|
|
# log.debug("Ceci est un message de debug")
|
|
# log.info("Tout va bien")
|
|
# log.warning("Attention, possible souci")
|
|
# log.error("Une erreur est survenue")
|
|
# log.critical("Erreur critique !")
|
|
|
|
if os.name == 'posix': os.system('clear') # Linux, MacOS
|
|
elif os.name == 'nt': os.system('cls')# Windows
|
|
else: print("\n" * 100)
|
|
|
|
#################################################################################################
|
|
# Reading config.ini #
|
|
#################################################################################################
|
|
config_file = load_config(args)
|
|
|
|
#################################################################################################
|
|
# titre #
|
|
#################################################################################################
|
|
titre_largeur = 150                                   # total banner width, in visible columns
ansi_escape = re.compile(r'\x1b\[[0-9;]*m')           # matches ANSI colour sequences
bordure = "#" * titre_largeur + Colors.ENDC           # full-width top / bottom border


def pad_line(texte, center=False):
    """Frame *texte* between two '#' so the line is ``titre_largeur`` columns wide.

    ANSI colour sequences contained in *texte* are ignored when measuring
    its length, so coloured text is padded according to what is actually
    displayed.

    Args:
        texte: Text to frame; may contain ANSI colour codes.
        center: When true the text is centred, otherwise it is left-aligned
            after a single leading space.

    Returns:
        The framed line, closed by ``Colors.INFO`` and a '#'.
    """
    # Columns left once the visible text and the two '#' are accounted for.
    padding = titre_largeur - 2 - len(ansi_escape.sub('', texte))

    if not center:
        # One column is already used by the space that follows the left '#'.
        filler = ' ' * max(0, padding - 1)
        return f"# {texte}{filler}{Colors.INFO}#"

    lead = padding // 2
    trail = padding - lead
    return f"#{' ' * lead}{texte}{' ' * trail}{Colors.ENDC}{Colors.INFO}#"
|
|
|
|
# Start-up banner: title, author, then one "label : value" line per item.
_titre = [
    bordure,
    pad_line(f"{Colors.BOLD}{Colors.YELLOW}Files conversion th, dat, mak, tro and trox to therion files and folders", center=True),
    pad_line(f"Script pyCreateTh by : {Colors.BLUE}alexandre.pont@yahoo.fr"),
]

_details = (
    ("Version : ", globalData.Version),
    ("Input file : ", safe_relpath(args.file)),
    ("Output folder : ", safe_relpath(splitext(abspath(args.file))[0])),
    ("Log file : ", os.path.basename(output_log)),
    ("Config file: ", safe_relpath(config_file)),
)
_titre.extend(pad_line(f"{label}{Colors.ENDC}{value}") for label, value in _details)
_titre += [pad_line(""), bordure]

for banner_line in _titre:
    log.info(banner_line)

# Nothing to convert when no file was given or selected.
if args.file == "":
    log.critical("No valid file selected, try again")
    exit(0)
|
|
|
|
|
|
#################################################################################################
|
|
# Fichier TH #
|
|
#################################################################################################
|
|
# Dispatch on the real file extension.  Comparing the last characters of the
# name (e.g. [-2:] == "th") would also accept any name that merely ends with
# those letters.
_ext = splitext(args.file)[1].lower()

# Folder that holds the input file.  Building output paths with os.path.join
# on the absolute folder avoids a root-anchored path when args.file is a bare
# file name, and avoids hard-coded '/' or '\\' separators.
_src_dir = os.path.dirname(abspath(args.file))

if _ext == ".th":
    #############################################################################################
    #     TH file                                                                               #
    #############################################################################################
    flagErrorCompile, stat, totReadMeError, thread2 = create_th_folders(
        ENTRY_FILE=abspath(args.file),
        TARGET=None,
        PROJECTION=args.proj,
        SCALE=args.scale,
        UPDATE=args.update,
        CONFIG_PATH="",
        args_file=args.file,
        proj=args.proj.lower())
    threads += thread2
    fileTitle = sanitize_filename(os.path.basename(args.file))[:-3]

elif _ext == ".mak":
    #############################################################################################
    #     MAK file                                                                              #
    #############################################################################################
    SurveyTitleMak = sanitize_filename(os.path.basename(abspath(args.file))[:-4])
    DEST_PATH = os.path.join(_src_dir, SurveyTitleMak)

    # Refuse to run when the output folder is already there.
    if os.path.isdir(DEST_PATH):
        log.critical(f"The folder {Colors.ENDC}{SurveyTitleMak}{Colors.ERROR}{Colors.BOLD}, all ready exist : update mode is not possible for mak files")
        exit(0)

    fileTitle, thread2 = mak_to_th_file(abspath(args.file))
    threads += thread2

elif _ext == ".dat":
    #############################################################################################
    #     DAT file                                                                              #
    #############################################################################################
    _ConfigPath = "./"
    ABS_file = abspath(args.file)

    content, val, enc = load_text_file_utf8(ABS_file, os.path.basename(ABS_file))
    section = content.split('\x0c')      # sections are separated by form-feed characters
    QtySections = len(section)
    lines = section[0].split('\n')

    # The survey title is the first line of the file, or the file name when
    # that line is empty.
    if lines[0] != "":
        SurveyTitleDat = sanitize_filename(lines[0])
    else:
        SurveyTitleDat = sanitize_filename(os.path.basename(args.file)[:-4])
    folderDest = os.path.join(_src_dir, SurveyTitleDat)

    # Refuse to run when the output folder is already there.
    if os.path.isdir(folderDest):
        log.critical(f"The folder {Colors.ENDC}{SurveyTitleDat}{Colors.ERROR}{Colors.BOLD}, all ready exist : update mode is not possible for dat files")
        exit(0)

    with alive_bar(QtySections,
                   title=f"{Colors.GREEN}Dat to Th conversion progress: {Colors.BLUE}",
                   length=20,
                   enrich_print=False,
                   stats=True,        # stats widget
                   elapsed=True,      # elapsed-time widget
                   monitor=True,      # monitor widget
                   bar="smooth"       # bar style
                   ) as bar:
        with redirect_stdout(sys.__stdout__):
            if globalData.error_count > 0:
                bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(ABS_file)[:-4]}{Colors.ERROR}, error: {Colors.ENDC}{globalData.error_count}")
            else:
                bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(ABS_file)[:-4]}")

            stationList, fileTitle, totReadMeError, thread2 = dat_to_th_files(
                ABS_file,
                fixPoints=[],
                crs_wkt="",
                CONFIG_PATH=_ConfigPath,
                totReadMeError="",
                bar=bar)
            threads += thread2
            bar()

elif _ext == ".tro":
    #############################################################################################
    #     TRO file                                                                              #
    #############################################################################################
    SrcFile = abspath(args.file)
    DestFile = SrcFile[:-4] + ".th"

    source_content, val, encodage = load_text_file_utf8(SrcFile, os.path.basename(SrcFile))

    # First convert the whole .tro file to a single intermediate .th file.
    entrance, fileTitle, coordinates, coordsyst, fle_th_fnme = convert_tro(
        fle_tro_fnme=SrcFile,
        fle_tro_encoding=encodage,
        fle_th_fnme=DestFile,
        cavename=None,
        icomments=True,
        icoupe=False,
        istructure=False,
        thlang=None,
        Errorfiles=False)

    if coordsyst is None:
        log.critical(f"The VisualTopo file {Colors.ENDC}{SrcFile}{Colors.ERROR}{Colors.BOLD}, have no coordinate system define. Correct it and try again")
        exit(0)

    content, val, encodage = load_text_file_utf8(fle_th_fnme, os.path.basename(fle_th_fnme))

    if globalData.parse_tro_files_by_explo:
        # Split the converted file into centerlines and regroup them by date.
        _centerlines = parse_therion_centerline(content)
        centerlines = regroupe_date(_centerlines)

        with alive_bar(len(centerlines) + 1,
                       title=f"{Colors.GREEN}Tro to Th conversion progress: {Colors.BLUE}",
                       length=20,
                       enrich_print=False,
                       stats=True,        # stats widget
                       elapsed=True,      # elapsed-time widget
                       monitor=True,      # monitor widget
                       bar="smooth"       # bar style
                       ) as bar:
            with redirect_stdout(sys.__stdout__):
                if globalData.error_count > 0:
                    bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(SrcFile)}{Colors.ERROR}, error: {Colors.ENDC}{globalData.error_count}")
                else:
                    bar.text(f"{Colors.INFO}file: {Colors.ENDC}{os.path.basename(SrcFile)}")

                stationList, fileTitle, totReadMeError, thread2 = tro_to_th_files(
                    ENTRY_FILE=SrcFile,
                    centerlines=centerlines,
                    entrance=entrance,
                    fileTitle=fileTitle,
                    coordinates=coordinates,
                    coordsyst=coordsyst,
                    fle_th_fnme=fle_th_fnme,
                    CONFIG_PATH="",
                    totReadMeError="",
                    bar=bar)
                threads += thread2
                bar()

    else:
        # Rewrite the intermediate file as UTF-8 when it was read with another encoding.
        if encodage != "utf-8":
            with open(str(fle_th_fnme), "w+", encoding="utf-8") as f:
                f.write(content)

        # Append the original .tro data, as comments, to the intermediate .th file.
        # The timestamp is formatted beforehand: reusing double quotes inside a
        # double-quoted f-string is a syntax error before Python 3.12.
        _stamp = datetime.now().strftime("%Y.%m.%d %H:%M:%S")
        with open(fle_th_fnme, 'a', encoding='utf-8') as file:
            file.write("\n\n#############################################################################################")
            file.write(f"\n# Originals data file : {args.file}")
            if globalData.error_count == 0:
                file.write(f"\n# Conversion with pyCreateTh version {globalData.Version}, the {_stamp}, without error")
            else:
                file.write(f"\n# Conversion with pyCreateTh version {globalData.Version}, the {_stamp}, with {globalData.error_count} error(s)")
            file.write("\n#############################################################################################\n\n")
            for line in source_content.splitlines():
                file.write(f"# {line}\n")

        flagErrorCompile, stat, totReadMeError, thread2 = create_th_folders(
            ENTRY_FILE=fle_th_fnme,
            TARGET=None,
            PROJECTION=args.proj,
            SCALE=args.scale,
            UPDATE=args.update,
            CONFIG_PATH="",
            args_file=args.file,
            proj=args.proj.lower())

        threads += thread2
        fileTitle = sanitize_filename(os.path.basename(fle_th_fnme)[:-3])

        # The intermediate .th file is no longer needed.
        # NOTE(review): the nesting of this clean-up could not be recovered from
        # the flattened source; confirm whether it should also run after
        # tro_to_th_files in the split-by-exploration branch.
        if os.path.isfile(fle_th_fnme):
            os.remove(fle_th_fnme)

elif _ext == ".trox":
    #############################################################################################
    #     TROX file                                                                             #
    #############################################################################################
    SrcFile = abspath(args.file)
    analyse_xml_balises(SrcFile)
    # ".trox" is five characters long; slicing only four left a trailing dot.
    fileTitle = sanitize_filename(os.path.basename(SrcFile)[:-5])

else:
    #############################################################################################
    #     Other types                                                                           #
    #############################################################################################
    # NOTE(review): fileTitle is not assigned on this path in the visible code,
    # yet it is read after the threads are joined; confirm it is initialised
    # earlier in the script.
    log.error(f"file {Colors.ENDC}{safe_relpath(args.file)}{Colors.ERROR} not yet supported")
    globalData.error_count += 1
|
|
|
|
# Wait for every background thread started by the conversions.
for t in threads:
    t.join()

# The log file is finally stored in the output folder named after the survey.
# os.path.join is used instead of a hard-coded "\\" so the path is also valid
# on POSIX systems.
destination_path = os.path.join(os.path.dirname(output_log), fileTitle)
file_name = os.path.basename(output_log)
destination_file = os.path.join(destination_path, file_name)

wait_until_file_is_released(output_log)

duration = (datetime.now() - start_time).total_seconds()

# Final status line.
if globalData.error_count == 0:
    log.info(f"All files processed successfully in {Colors.ENDC}{duration:.2f}{Colors.INFO} secondes, without error")
else:
    log.error(f"There were {Colors.ENDC}{globalData.error_count}{Colors.ERROR} errors during {Colors.ENDC}{duration:.2f}{Colors.ERROR} secondes, check the log file: {Colors.ENDC}{os.path.basename(output_log)}")

wait_until_file_is_released(output_log)

release_log_file(log)

# Remove a previous copy of the log in the destination folder, if any.
if os.path.isfile(destination_file):
    os.remove(destination_file)

# Outside update mode, move the log to the destination path.
if not args.update:
    shutil.move(output_log, destination_path)

# Remove a stray file named after the survey in the current directory.
# os.remove raises on a directory, so only a regular file is removed.
# NOTE(review): the nesting of this statement could not be recovered from the
# flattened source; confirm it is not meant to sit under "if not args.update".
if os.path.isfile(fileTitle):
    os.remove(fileTitle)
|
|
|