pyCreateTh

This commit is contained in:
Alex38Lyon
2025-06-16 07:37:02 +02:00
parent 231a5a22f3
commit 6882d52675
77 changed files with 43343 additions and 35074 deletions
+67
View File
@@ -0,0 +1,67 @@
import logging
import sys
import re
# Couleurs ANSI par niveau de log
COLOR_CODES = {
logging.DEBUG: "\033[94m", # Bleu
logging.INFO: "\033[92m", # Vert
logging.WARNING: "\033[95m",
logging.ERROR: "\033[91m", # Rouge
logging.CRITICAL: "\033[1;91m", # Rouge vif
}
RESET = "\033[0m"
# Supprime les codes ANSI (pour l'écriture dans les fichiers)
def strip_ansi_codes(text):
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
# Formatter pour la console avec couleurs
class ConsoleFormatter(logging.Formatter):
def format(self, record):
color = COLOR_CODES.get(record.levelno, "")
message = super().format(record)
return f"{color}{message}{RESET}"
# Formatter pour le fichier avec "!!!" sur les erreurs
class FileFormatter(logging.Formatter):
def format(self, record):
clean_msg = strip_ansi_codes(record.getMessage())
prefix = "!!! " if record.levelno >= logging.ERROR else ""
record_copy = logging.LogRecord(
name=record.name,
level=record.levelno,
pathname=record.pathname,
lineno=record.lineno,
msg=f"{prefix}{clean_msg}",
args=(),
exc_info=record.exc_info,
func=record.funcName,
sinfo=record.stack_info
)
return super().format(record_copy)
# Fonction de configuration du logger
def setup_logger(logfile="app.log", debug_log=False):
logger = logging.getLogger("Logger")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
min_level = logging.DEBUG if debug_log else logging.INFO
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(min_level)
console_formatter = ConsoleFormatter("%(levelname)s: %(message)s")
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# File handler
file_handler = logging.FileHandler(logfile, encoding="utf-8")
file_handler.setLevel(min_level)
file_formatter = FileFormatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
+243
View File
@@ -0,0 +1,243 @@
import re
from os.path import dirname, abspath, join
import argparse
file_path_reg = r"(?:\n|^)\s*###filepath:(.*)"
input_reg = r"(?:\n|^)\s*(?:input|source)\s+\"?([^\s\"]+)?"
survey_reg = r"(?:\n|^)\s*survey\s+(\S+)"
end_survey_reg = r"(?:\n|^)\s*endsurvey"
scrap_reg = r"(?:\n|^)\s*scrap\s+(\S+)"
end_scrap_reg = r"(?:\n|^)\s*endscrap"
projection_reg = r"(?:\n|^).*-projection\s+(\S+)"
drawnre = re.compile(r".*line wall")
drawnexemptre = re.compile(r".*NODRAW")
drawnexemptplanre = re.compile(r".*NODRAW PLAN")
drawnexemptextendedre = re.compile(r".*NODRAW EE")
class NoSurveysFoundException(Exception):
pass
class MultipleSurveyFoundException(Exception):
pass
class Scrap:
id = None
projection = None
data = None
parent = None
def __init__(self, id, parent, projection):
self.id = id
self.projection = projection
self.parent = parent
def is_drawn(self):
if not self.data:
return False
for line in self.data:
match = drawnre.match(line)
if match:
return True
return False
class Survey:
parent = None
file_path = None
id = None
children = []
data = None
scraps = []
plan_drawn_override = False
extended_drawn_override = False
def __init__(self, id, parent, file_path):
self.id = id
self.parent = parent
self.file_path = file_path
@property
def therion_id(self):
if len(self.id) == 1:
return self.id[0]
return "{}@{}".format(self.name, self.namespace)
@property
def name(self):
return self.id[-1]
@property
def namespace(self):
return ".".join(list(reversed(self.id[0:-1])))
def data(self, data):
self._data = data
self.scraps = Survey.parse(self)
def parse(self):
scraps = []
scrap = None
data = []
for index, line in enumerate(self.data):
match = re.match(scrap_reg, line)
if match:
id = self.id + [match.group(1)]
projection = "plan"
match = re.match(projection_reg, line)
if match:
projection = match.group(1)
scrap = Scrap(id[:], self, projection)
scraps = scraps + [scrap]
data = [line]
continue
match = re.match(end_scrap_reg, line)
if match:
id = self.id
data = data + [line]
scrap.data = data[:]
data = []
continue
# Exempt drawing
match = drawnexemptplanre.match(line)
if match:
self.plan_drawn_override = True
match = drawnexemptextendedre.match(line)
if match:
self.extended_drawn_override = True
match = drawnexemptre.match(line)
if match:
self.plan_drawn_override = True
self.extended_drawn_override = True
data = data + [line]
self.scraps = scraps
class SurveyLoader:
_data = None
survey = None
surveys = {}
@property
def surveys_list(self):
return list(self.surveys.values())
@property
def base_surveys(self):
return [s for s in self.surveys_list if len(s.children) == 0]
@staticmethod
def load(file_path):
with open(file_path, "r", encoding="utf-8") as f:
data = f.read()
lines = []
for line in data.splitlines():
if not line.strip():
continue
# if line.lstrip().startswith("#"):
# continue
match = re.match(input_reg, line)
if match:
new_file_path = abspath(join(dirname(file_path), match.group(1)))
lines = lines + ["###filepath:{}".format(new_file_path)]
lines = lines + ["\t{}".format(l) for l in SurveyLoader.load(new_file_path)]
lines = lines + ["###filepath:{}".format(file_path)]
else:
lines.append(line.strip())
return lines
@staticmethod
def parse(lines, orig_file_path=None):
surveys = {}
id = []
file_path = orig_file_path
parent = None
survey = None
data = []
for index, line in enumerate(lines):
match = re.match(file_path_reg, line)
if match:
file_path = match.group(1)
continue
match = re.match(survey_reg, line)
if match:
id = id + [match.group(1)]
parent = survey
survey = Survey(id[:], parent, file_path)
surveys[".".join(id[:])] = survey
if parent:
parent.data = data[:]
parent.children = parent.children + [survey]
data = [line]
continue
match = re.match(end_survey_reg, line)
if match:
popped = id.pop()
data = data + [line]
survey.data = data[:]
if len(survey.children) == 0:
survey.parse()
if not survey.parent:
return survey, surveys
parent.data = parent.data + data
data = parent.data[:]
parent = survey.parent
survey = parent
continue
data = data + [line]
return parent, surveys
def __init__(self, file_path):
# print(f"\033[32mDebug SurveyLoader.load : \033[0m {file_path}")
self._data = SurveyLoader.load(file_path)
survey, surveys = SurveyLoader.parse(self._data, file_path)
self.survey = survey
self.surveys = surveys
def get_survey_by_id(self, therion_id):
id = []
if "@" in therion_id:
parts = therion_id.split("@")
id = list(reversed(parts[1].split("."))) + [parts[0]]
else:
id = list(reversed(therion_id.split(".")))
key = ".".join(id)
if key in self.surveys:
return self.surveys[key]
else:
potential_key = [k for k in self.surveys.keys() if k.endswith(".{}".format(key))]
if len(potential_key) == 1:
return self.surveys[potential_key[0]]
potential_keys = [k for k in self.surveys.keys() if key in k]
if len(potential_keys) == 1:
return self.surveys[potential_keys[0]]
elif len(potential_keys) > 1:
raise MultipleSurveyFoundException("Multiple surveys were found with that key:\n\t{}".format("\n\t".join(potential_keys)))
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse a survey")
parser.add_argument(
"survey_file",
help='The survey file (*.th) to work from. e.g. "data/system_migovec.th"',
)
parser.add_argument(
"survey_selector",
help='The selector for the survey to produce a scrap for. e.g. "roundpond@vrtnarija.vrtnarija_vilinska.system_migovec"',
)
args = parser.parse_args()
entrypoint = abspath(args.survey_file)
loader = SurveyLoader(entrypoint)
print(loader.get_survey_by_id(args.survey_selector))
+279
View File
@@ -0,0 +1,279 @@
import tempfile
import shutil
import os
from os.path import join
import subprocess
import re
import logging
import threading
log = logging.getLogger("Logger")
#################################################################################################
# Codes de couleur ANSI
class Colors:
BLACK = '\033[90m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
ERROR = '\033[91m'
WARNING = '\033[95m'
HEADER = '\033[96m'
DEBUG = '\033[94m' # Bleu
INFO = '\033[92m' # Vert
CRITICAL = '\033[1;91m', # Rouge vif
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
#################################################################################################
def compile_template(template, template_args, **kwargs):
global error_count
try :
logfile = ""
tmpdir = tempfile.mkdtemp()
config = template.format(**template_args, tmpdir=tmpdir.replace("\\", "/"))
log.debug(f"{config}\n")
config_file = join(tmpdir, "config.thconfig")
log_file = join(tmpdir, "log.log")
therion_path = kwargs["therion_path"] if "therion_path" in kwargs else "therion"
with open(config_file, mode="w+", encoding="utf-8") as tmp:
with open(log_file, mode="w+") as tmp2:
tmp.write(config)
tmp.flush()
subprocess.check_output('''"{}" "{}" -l "{}"'''.format(therion_path, config_file, log_file), shell=True, )
tmp2.flush()
logfile = tmp2.read()
if kwargs["cleanup"]:
shutil.rmtree(tmpdir)
log.debug("\n" )
return logfile, tmpdir
except Exception as e:
log.error(f"Therion template compilation error: {Colors.ENDC}{e}")
error_count += 1
def compile_template2(template, template_args, **kwargs):
global error_count
logfile = ""
tmpdir = None
try:
tmpdir = tempfile.mkdtemp()
config = template.format(**template_args, tmpdir=tmpdir.replace("\\", "/"))
log.debug(f"{config}\n")
config_file = join(tmpdir, "config.thconfig")
log_file = join(tmpdir, "log.log")
therion_path = kwargs.get("therion_path", "therion")
# Écriture des fichiers config + log
with open(config_file, "w", encoding="utf-8") as tmp:
tmp.write(config)
tmp.flush()
# Exécution de Therion
result = subprocess.run(
[therion_path, config_file, "-l", log_file],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True, # Décode automatiquement en UTF-8 (avec fallback ci-dessous)
timeout=kwargs.get("timeout", 60),
errors="replace" # Remplace caractères invalides (évite UnicodeDecodeError)
)
# Lecture du log (en mode tolérant)
try:
with open(log_file, "r", encoding="cp1252", errors="replace") as f:
logfile = f.read()
except Exception as log_err:
log.warning(f"Could not read Therion log: {Colors.ENDC}{log_err}")
# Analyse du code retour
if result.returncode != 0:
log.error(f"Therion compilation failed with return code {Colors.ENDC}{result.returncode} {Colors.ERROR}{result.stdout}")
error_count += 1
else:
log.info(f"Therion compilation successful")
return logfile, tmpdir
except subprocess.TimeoutExpired:
log.error(f"Therion process timed out and was terminated")
error_count += 1
return "Therion timeout", tmpdir
except Exception as e:
log.error(f"Therion template compilation error: {Colors.ENDC}{e}")
error_count += 1
return str(e), tmpdir
finally:
if kwargs.get("cleanup", True) and tmpdir:
try:
shutil.rmtree(tmpdir)
except Exception as cleanup_err:
log.warning(f"Could not delete temp directory: {Colors.ENDC}{cleanup_err}")
#################################################################################################
def compile_file(filename, **kwargs):
global error_count
try:
tmpdir = os.path.dirname(filename)
log_file = join(tmpdir, "therion.log").replace("\\", "/")
therion_path = kwargs["therion_path"] if "therion_path" in kwargs else "therion"
process = subprocess.Popen(
[therion_path, filename, "-l", log_file],
cwd=tmpdir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # fusion stdout + stderr
universal_newlines=True, # décodage automatique en texte
bufsize=1 # ligne par ligne
)
log.info(f"Start therion compilation file : {Colors.ENDC}~\\{os.path.relpath(filename)}")
# Lecture en temps réel
for line in process.stdout:
line = line.rstrip()
lower_line = line.lower()
if "error" in lower_line:
log.error(f" [Therion_Compile] {Colors.ENDC}{line}")
elif "warning" in lower_line:
log.warning(f" [Therion_Compile] {Colors.ENDC}{line}")
else:
log.debug(f" [Therion_Compile] {Colors.ENDC}{line}")
process.wait()
# Si la commande échoue, result.returncode sera non nul
if process.returncode != 0:
# Affichage des erreurs et de la sortie standard
log.error(f"Error during Therion compilation, stderr : \n{Colors.ENDC}{process.stderr.decode()}")
error_count += 1
log.info(f"Therion file : {Colors.ENDC}~\\{os.path.relpath(filename)}{Colors.GREEN} succeeded")
except Exception as e:
log.error(f"Therion file {Colors.ENDC}~\\{os.path.relpath(filename, os.path.expanduser('~'))}{Colors.ERROR} compilation error: {Colors.ENDC}{e}")
error_count += 1
def compile_file2(filename, **kwargs):
global error_count
tmpdir = os.path.dirname(filename)
log_file = join(tmpdir, "therion.log").replace("\\", "/")
therion_path = kwargs.get("therion_path", "therion")
timeout = kwargs.get("timeout", 60) # seconds
log.info(f"Start therion compilation file : {Colors.WHITE}{filename}")
try:
# Lancement du processus Therion
process = subprocess.Popen(
[therion_path, filename, "-l", log_file],
cwd=tmpdir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Fonction de lecture en temps réel (dans un thread séparé)
def read_output(proc):
try:
for line in proc.stdout:
line = line.rstrip()
lower_line = line.lower()
if "error" in lower_line:
log.error(f"[Therion_Compile] {Colors.ENDC}{line}")
elif "warning" in lower_line:
log.warning(f" [Therion_Compile] {Colors.ENDC}{line}")
else:
log.debug(f" [Therion_Compile] {Colors.ENDC}{line}")
except Exception as e:
log.warning(f"Reading Therion output: {Colors.ENDC}{e}")
# Démarrage du thread de lecture
output_thread = threading.Thread(target=read_output, args=(process,))
output_thread.start()
# Attente avec timeout
output_thread.join(timeout)
if output_thread.is_alive():
log.error(f"Therion compilation timed out after {Colors.ENDC}{timeout}{Colors.ERROR} seconds. Killing process...")
error_count += 1
process.kill()
output_thread.join()
process.wait()
# Vérification du code de retour
if process.returncode != 0:
log.error(f"Therion returned error code {Colors.ENDC}{process.returncode}")
error_count += 1
else:
log.info(f"Therion file : {Colors.ENDC}~\\{os.path.relpath(filename)}{Colors.GREEN} compilation succeeded")
except Exception as e:
log.error(f"Therion file {Colors.ENDC}~\\{os.path.relpath(filename)}{Colors.ERROR} compilation error: {Colors.ENDC}{e}")
error_count += 1
#################################################################################################
def compile_file_th(filepath, **kwargs):
template = """source {filepath}
layout test
scale 1 500
endlayout
"""
template_args = {"filepath": filepath}
logs, _ = compile_template(template, template_args, cleanup=True, **kwargs)
return logs
#################################################################################################
# Attention fonctionne pour la version therion en français ! à voir pour les autres langues
lengthre = re.compile(r".*Longueur totale de la topographie = \s*(\S+)m")
depthre = re.compile(r".*Longueur totale verticale =\s*(\S+)m")
#################################################################################################
def get_stats_from_log(log):
lenmatch = lengthre.findall(log)
depmatch = depthre.findall(log)
if len(lenmatch) == 1 and len(depmatch) == 1:
return {"length": lenmatch[0], "depth": depmatch[0]}
return {"length": 0, "depth": 0}
#################################################################################################
syscoord = re.compile(r".*output coordinate system: \s*(\S+)")
#################################################################################################
def get_syscoord_from_log(log):
lenmatch = syscoord.findall(log)
if len(lenmatch) == 1:
return {"syscoord": lenmatch[0]}
return {"syscoord": 0}