This commit is contained in:
Alex38Lyon
2025-01-31 14:15:58 +01:00
parent dcae8e9aa0
commit c2e1457cd5
119 changed files with 315017 additions and 0 deletions
+307
View File
@@ -0,0 +1,307 @@
from dataclasses import dataclass,field
from enum import Enum
import pandas as pd
import numpy as np
from os.path import abspath, exists
from helpers.geo import *
#from geo import *
from subprocess import check_output, CalledProcessError
class Expedition(str, Enum):
"""A class to represent the different expeditions"""
UP2006 = "UP2006"
UP2008 = "UP2008"
UP2010 = "UP2010"
UP2014 = "UP2014"
UP2017 = "UP2017"
UP2019 = "UP2019"
UP2023 = "UP2023"
ENG08 = "ENG08"
ITA08 = "ITA08"
unknown = "unknown"
def assignExpedition(name: str) -> Expedition:
"""Assign the correct expedition given a date"""
target = Expedition.unknown
for expedition in Expedition:
if expedition.name.__contains__(name):
target = expedition
return target
@dataclass
class Cave:
"""A class that contains the information about a specific cavity"""
cadnum : str
exped : Expedition
comment : str
altitude : str
carto : str
explo_status : int
_index : int
coordinates : coordinatePairUTM = coordinatePairUTM(x=-999.,y=-999.)
name : str = "undefined"
length: float = 0
depth : float = 0
complete_name: str = "undefined"
explorers : str = "undefined"
_search_string : str = field(init=False)
_folder_path : str = field(init=False)
_sector_folder_path : str = field(init=False)
def __post_init__(self) -> None:
self._search_string=f"{self.cadnum} {self.name}"
# set the local folder path for the caves
def add_coordinates(self, coords : coordinatePairUTM) -> None:
"""A method for adding coordinates to the Cave entry"""
self.coordinates = coordinatePairUTM(coords.x,coords.y)
self.coordinates.add_lat_long_from_xy()
self.coordinates.add_sector()
self._folder_path = f"../therion/data/{self.cadnum[:-3]}/{self.name}"
self._sector_folder_path = f"../therion/data/{self.cadnum[:-3]}/{self.cadnum[:-3]}.th"
def makeTheriontemplate(self) -> str:
""" Generate an empty therion file using the cave data"""
TEMPLATE = f"""survey {self.name} -title '{self.complete_name}' \\
-attr cadnum {self.cadnum} \\
-attr exped {self.exped}\n
\tcentreline
\t\tcs epsg:32718
\t\t#fix ENT {self.coordinates.x} {self.coordinates.y} {self.altitude}
\t#explo-date {self.exped}
\t#team "{self.explorers}"
\tunits length meters
\t units compass clino degrees
\tdata normal from to tape compass clino
\t#<RENSEIGNER LES DONNEES ICI>
\tendcentreline
endsurvey
"""
return TEMPLATE
def make_folder(self) -> None:
"""A method which creates an empty folder for the cave of interest."""
filepath = abspath(self._folder_path).strip('\n')
print(filepath)
try:
check_output(f'mkdir {filepath}', shell=True)
cavename = self.name.strip("\n").strip(' ')
TH_FILE = f'{filepath}/{cavename}.th'
print("Name of the filepath",TH_FILE)
MD_FILE = f"{filepath}/NOTES.md"
if not exists(TH_FILE):
with open(TH_FILE, 'w+') as th_file:
th_file.write(self.makeTheriontemplate())
with open(MD_FILE, 'w+') as md_file:
md_file.write(self.comment)
except CalledProcessError:
TH_FILE = f"{filepath}/{self.name}.th"
MD_FILE = f"{filepath}/NOTES.md"
if not exists(TH_FILE):
with open(TH_FILE, 'w+') as th_file:
th_file.write(self.makeTheriontemplate())
with open(MD_FILE, 'w+') as md_file:
md_file.write(self.comment)
pass
def make_entry_in_sector_file(self) -> None:
"""adds an entry line to the sector .th file"""
with open(self._sector_folder_path, "r+") as f:
lines = f.readlines()
startindex = [x for x,line in enumerate(lines) if ("centreline" in line) or ("centerline" in line)]
formatted = f"""
#input {self.name}/{self.name}.th
"""
lines.insert(startindex[0]-1,formatted)
f.seek(0)
endindex = [x for x,line in enumerate(lines) if ("endcentreline" in line) or ("endcenterline" in line)]
name_as = f'"{self.complete_name}"'
formatted = f"""
fix ENT_{self.cadnum} {self.coordinates.x} {self.coordinates.y} {self.altitude}
station ENT_{self.cadnum} {name_as}
#equate ENT_{self.cadnum} 0@{self.name}
"""
lines.insert(endindex[0],formatted)
f.seek(0)
f.writelines(lines)
class CaveExistsError(Exception):
pass
class CadasterNotLoadedError(Exception):
pass
class CaveNotFoundError(Exception):
pass
class MoreCavesFoundError(Exception):
pass
@dataclass
class CaveCadaster:
"""A class that expects a list of caves and contains methods for reporting info about these caves"""
caves : list[Cave] = field(default_factory=list)
def add_entry(self, cave: Cave) -> None:
"""Enter an instance of a Cave to the database"""
self.caves.append(cave)
def check_existing(self, cave: Cave) -> None:
"""Check from a cave's coodinates that it does not already exist in the cadaster"""
for existing_cave in self.caves:
if cave.coordinates.x != float('nan'):
dist = np.sqrt((cave.coordinates.x - existing_cave.coordinates.x)**2 + (cave.coordinates.y - existing_cave.coordinates.y)**2)
if dist < 1:
raise CaveExistsError("the cave exists already")
def find_cave(self,search_string: str) -> list[Cave]:
"""Return a Cave instance given a cadastral number"""
targets = []
for cave in self.caves:
if cave._search_string.lower().__contains__(search_string.lower()):
targets.append(cave)
if len(targets)>=1:
return targets
else:
raise CaveNotFoundError("there is no cave with this cadastral number")
def delete_cave(self, search_string: str) -> None:
cave = self.find_cave(search_string)
proceed = input("Are you sure you want to delete this cave entry? Type <y/n> to proceed.")
if proceed == 'y':
self.caves.remove(cave)
print(f"Deleting the cave '{cave.name}' from the database")
else:
print(f"keeping the cave '{cave.name}' in the database")
def generate_dataframe(self) -> pd.DataFrame:
"""A method which generates a pandas.DataFrame out of the list of caves objects"""
lines = []
for cave in self.caves:
line = [cave.cadnum,
cave.coordinates.sector_name,
cave.complete_name,
f'{cave.name}',
cave.comment,
cave.coordinates.x,
cave.coordinates.y,
cave.altitude,
cave.length,
cave.depth,
cave.explorers,
cave.exped,
f"{cave.coordinates._orig_lat:.7f}",
f"{cave.coordinates._orig_long:.7f}",
cave.carto,
cave.explo_status
]
lines.append(line)
cols = ['cadnum',
'secteur',
'complete_name',
'name',
'comment',
'X_UTM18S',
'Y_UTM18S',
'altitude',
'length',
'depth',
'explorers',
'exped',
'latitude',
'longitude',
'carto',
'explo_status'
]
return pd.DataFrame(lines,columns=cols)
def write_to_file(self, output_path: str)-> None:
"""Writing the pandas.DataFrame to a file formatted exactly as expected for rereading into cave cadaster"""
df = self.generate_dataframe()
#df.sort_values(by='cadnum', inplace =True)
df.to_csv(output_path)
def generate_entry_from_file(df: pd.DataFrame, row: int) -> Cave:
"""A function to generate an entry from a specific line of a formatted dataframe"""
line = df.loc[row]
coords = coordinatePairUTM(x=line.X_UTM18S,y=line.Y_UTM18S)
coords.add_lat_long(lat=line.latitude,long=line.longitude)
coords.add_sector()
cave = Cave(
cadnum=line.cadnum,
exped= assignExpedition(str(line.exped)),
comment=line.comment,
altitude= line.altitude,
coordinates=coords,
name= line['name'],
complete_name= line.complete_name,
explorers= line.explorers,
length= line.length,
depth= line.depth,
carto=line.carto,
explo_status=line.explo_status,
_index = row
) # type: ignore
return cave
def initialise_database(filepath : str) -> CaveCadaster:
"""Reads a csv file containing the cave data into a CaveCadaster object"""
df = pd.read_csv(filepath)
cadaster = CaveCadaster()
for row in range(len(df)):
cadaster.add_entry(generate_entry_from_file(df,row))
return cadaster
# play with a subclass for the different sectors of cave exploration.
@dataclass
class CadasterSector(CaveCadaster):
"""A cave cadaster subclass"""
parent : CaveCadaster = CaveCadaster()
name : str = 'undefined'
root_cadnum : int = 999
caves: list[Cave] = field(init = False, default_factory=list)
next_cad_num : int = field(init=False)
def __post_init__(self) -> None:
self.caves = [cave for cave in self.parent.caves if str(cave.cadnum)[:3].__contains__(str(self.root_cadnum))]
self.next_cad_num = self.root_cadnum*1000+len(self.caves)+1
def add_entry(self, cave: Cave) -> None:
self.next_cad_num +=1
return super().add_entry(cave)
## test
+130
View File
@@ -0,0 +1,130 @@
from dataclasses import dataclass, field
from typing import Tuple
from shapely.geometry import shape, Point
import fiona
import pyproj as proj
from os.path import abspath
@dataclass
class coordinatePairUTM:
"""A class that expects two floats"""
x : float
y : float
cadnum_root : str = field(init=False)
sector_name : str = field(init=False)
_orig_lat : float = field(init=False)
_orig_long : float = field(init=False)
def add_lat_long(self,lat,long) -> None:
"""Attributes exploration zone to the cave"""
self._orig_lat,self._orig_long = lat,long
def add_lat_long_from_xy(self) -> None:
self._orig_lat,self._orig_long = transformer(crs_in='epsg:32718',crs_out='epsg:4326').transform(self.x,self.y)
def add_sector(self) -> None:
pt = Point(self._orig_long,self._orig_lat)
fp = abspath("../therion/data/gis/secteurs.shp")
multipolygons = read_multipolygons(fp)
intersects = [(pt.within(poly),properties) for poly,properties in multipolygons]
self.cadnum_root = "undefined"
self.sector_name = "undefined"
for intersect,property in intersects:
if intersect:
self.cadnum_root = property["Cadastre_I"]
self.sector_name = property["Nom"]
@dataclass
class coordinatePairLatLong:
"""A class containing Latitude and Longitude values"""
lat : str
long : str
hemisphere : tuple = field(init=False, default_factory=tuple)
lat_asfloat : float = field(init=False)
long_asfloat : float = field(init=False)
def __post_init__(self) -> None:
"""convert however the latitude and longitude are given to decimal format."""
self.parse_hemisphere()
if (self.lat.__contains__('°')) and (self.lat.__contains__("'")) and (self.lat.__contains__("''")):
self.parse_degree_minutes_seconds()
elif(self.lat.__contains__('°')) and (self.lat.__contains__("'")):
self.parse_degree_decimal_minutes()
else:
self.parse_decimal_degrees()
def parse_hemisphere(self) -> None:
"""Parses the lat/long coordinates given and determines in which hemisphere to go"""
if self.lat.__contains__('N'):
NH = 1
else:
NH = -1
if self.long.__contains__('E'):
EH = 1
else:
EH = -1
self.hemisphere = (NH,EH)
def parse_decimal_degrees(self) -> None:
"""Parses lat/long coordinates to a decimal float"""
self.lat_asfloat = self.hemisphere[0] * float(self.lat.strip('N').strip('S').split('°')[0])
self.long_asfloat = self.hemisphere[1] * float(self.long.strip('E').strip('W').split('°')[0])
def parse_degree_decimal_minutes(self) -> None:
"""Parses lat/long coordinates to a decimal float"""
lat_split = self.lat.strip('N').strip('S').split('°')
long_split = self.long.strip('E').strip('W').split('°')
lat_degree = float(lat_split[0])
long_degree = float(long_split[0])
lat_mins = float(lat_split[1].split("'")[0])
long_mins = float(long_split[1].split("'")[0])
self.lat_asfloat = self.hemisphere[0] * (lat_degree + lat_mins/60)
self.long_asfloat = self.hemisphere[1] * (long_degree + long_mins/60)
def parse_degree_minutes_seconds(self) -> None:
"""Parses lat/long coordinates to a decimal float"""
lat_split = self.lat.strip('N').strip('S').split('°')
long_split = self.long.strip('E').strip('W').split('°')
lat_degree = float(lat_split[0])
long_degree = float(long_split[0])
lat_mins = float(lat_split[1].split("'")[0])
long_mins = float(long_split[1].split("'")[0])
lat_secs = float(lat_split[1].split("'")[1])
long_secs = float(long_split[1].split("'")[1])
self.lat_asfloat = self.hemisphere[0] * (lat_degree + lat_mins/60 + lat_secs/3600)
self.long_asfloat = self.hemisphere[1] * (long_degree + long_mins/60 + long_secs/3600)
def read_multipolygons(filepath: str) -> list:
"""Reads a shapefile of exploration zones and makes a list of polygons"""
dataset = fiona.open(filepath)
multipolygons = [(shape(poly["geometry"]), poly["properties"]) for poly in dataset] # type: ignore
return multipolygons
def transformer(crs_out: str ,crs_in: str) -> proj.Transformer:
"""A function returning a transformer instance based on crs codes"""
return proj.Transformer.from_crs(crs_in, crs_out)
TRANSFORMER_LATLONG = transformer(crs_in="epsg:4326", crs_out="epsg:32718")
def convert_coords(coord : coordinatePairLatLong) -> coordinatePairUTM:
"""Convert from lat-long to UTM18S"""
X,Y = TRANSFORMER_LATLONG.transform(coord.lat_asfloat,coord.long_asfloat)
return coordinatePairUTM(x=X,y=Y)
import profile
import pstats
profile = profile.Profile()
#profile.runcall(convert_coords)
ps = pstats.Stats(profile)
ps.print_stats()
+60
View File
@@ -0,0 +1,60 @@
import pandas as pd
import time
# 2010-01-04T23:37:42Z
timenow = time.localtime()
timestamp = f"{timenow.tm_year}-{timenow.tm_mon}-{timenow.tm_mday}T{timenow.tm_hour}:{timenow.tm_min}:{timenow.tm_sec}Z"
TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<gpx
version="1.0"
creator="Centre Terre"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.topografix.com/GPX/1/0"
xsi:schemaLocation="http://www.topografix.com/GPX/1/0 http://www.topografix.com/GPX/1/0/gpx.xsd">
<time>'{time}'</time>
<bounds minlat="-52" minlon="-72" maxlat="49" maxlon="-70"/>
{data}
</gpx>"""
WPT_TEMPLATE = """
<wpt lat="{latitude}" lon="{longitude}">
<ele>{elevation}</ele>
<name>{name}</name>
<cmt>{comment}</cmt>
<desc>{description}</desc>
<sym>{symbol}</sym>
</wpt>"""
def pyToGPX(fp):
# "../../therion/data/SYNTHESE_POINTAGES.csv"
data = pd.read_csv(fp)
waypoints = ""
for index,line in data.iterrows():
if ("camp" in line.complete_name) or ("Camp" in line.complete_name):
symbol= "Lodging"
else:
symbol = "Waypoint"
formatted = WPT_TEMPLATE.format(
latitude= line.latitude,
longitude= line.longitude,
elevation= line.altitude,
comment= line.cadnum,
name= line.complete_name,
description= line.comment,
symbol= symbol
)
if "inf" not in formatted:
waypoints+=formatted
with open(fp.strip("csv") + "gpx", "w+") as f:
f.write(TEMPLATE.format(data=waypoints,time = timestamp))
+3
View File
@@ -0,0 +1,3 @@
import pandas as pd
data = pd.read_csv("../therion/data/SYNTHESE_POINTAGES.csv")
+19
View File
@@ -0,0 +1,19 @@
LANG = {
"main_menu" : {
"title" : {
"fr" : "Menu Principal",
"en" : "Main Menu",
"es" : "Menu principal"
},
"savebutton" : {
"fr" : "Sauvegarder",
"en" : "Save",
"es" : "Salvar"
},
"selectfile" : {
"fr" : "Selectionner un fichier",
"en" : "Select a file",
"es" : "Selectionar una fila"
}
}
}
@@ -0,0 +1,16 @@
matplotlib
pandas
Shapely
Fiona
pyproj
scipy
netCDF4
xarray
joblib
geopandas
motionless
salem
configparser
@@ -0,0 +1,9 @@
conda create --name ultima2 python==3.9
y
conda activate ultima2
conda install matplotlib==3.5.3 pandas==1.5.2 Shapely==1.8.4
Fiona==1.8.13.post1 pyproj==2.6.1.post1 scipy==1.9.3 netCDF4==1.5.7
xarray==2022.11.0 joblib==1.1.1 geopandas==0.9.0
y
pip install motionless==1.3.3
pip install salem==0.3.8
+63
View File
@@ -0,0 +1,63 @@
from dataclasses import dataclass, field
import matplotlib.pyplot as plt
from salem import GoogleVisibleMap, Map, transform_geopandas
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point, MultiPoint
from helpers.geo import coordinatePairUTM
from helpers.cadaster import CaveCadaster,Cave, Expedition
@dataclass
class SatelliteMapPlot:
# Configure image aspect
size_x : int
size_y : int
dpi : int
scale : float = 0.013988764
points : list[coordinatePairUTM] = field(init = False)
point_names : list[str] = field(init=False)
new_x : float = field(init = False)
new_y : float = field(init=False)
def add_points(self, cadaster : CaveCadaster) -> None:
self.points = [cave.coordinates for cave in cadaster.caves]
self.point_names = [cave.name for cave in cadaster.caves]
def add_point_to_plot(self, x: float, y: float) -> None:
self.new_x = x
self.new_y = y
def plot_map(self):
# Get the Google Static image
g = GoogleVisibleMap(y=[self.new_y-0.64*self.scale, self.new_y+0.64*self.scale], x=[self.new_x-1.5*self.scale, self.new_x+1.5*self.scale],
scale=2, # scale is for more details
maptype='satellite',
size_x=self.size_x, size_y=self.size_y
)
# the google static image is a standard rgb image
ggl_img = g.get_vardata()
sm = Map(g.grid, nx=self.size_x, factor=1)
sm.set_rgb(ggl_img) # add the background rgb image
# prepare the figure
fig, ax = plt.subplots(figsize=(self.size_x/self.dpi,self.size_y/self.dpi), dpi=self.dpi)
# plot 1
x, y = sm.grid.transform([self.new_x],[self.new_y])
xi, yi = sm.grid.transform([p._orig_long for p in self.points],[p._orig_lat for p in self.points])
ax.scatter(x, y, zorder= 100, s=5,color="blue", marker = "d") # type:ignore
ax.scatter(xi, yi, zorder= 100, s=3,color="red", marker = "d") # type:ignore
for name,x,y in zip(self.point_names,xi,yi):
ax.text(x+.0001,y+.0001,name,fontsize=5,color = "red")
sm.plot(ax=ax)
fig.patch.set_facecolor('black') # type:ignore
return fig,ax
@@ -0,0 +1,34 @@
import tkinter as tk
class ScrollbarFrame(tk.Frame):
"""
Extends class tk.Frame to support a scrollable Frame
This class is independent from the widgets to be scrolled and
can be used to replace a standard tk.Frame
"""
def __init__(self, parent, **kwargs):
tk.Frame.__init__(self, parent, **kwargs)
# The Scrollbar, layout to the right
vsb = tk.Scrollbar(self, orient="vertical")
vsb.pack(side="right", fill="y")
# The Canvas which supports the Scrollbar Interface, layout to the left
self.canvas = tk.Canvas(self, borderwidth=0, background="#ffffff")
self.canvas.pack(side="left", fill="both", expand=True)
# Bind the Scrollbar to the self.canvas Scrollbar Interface
self.canvas.configure(yscrollcommand=vsb.set)
vsb.configure(command=self.canvas.yview)
# The Frame to be scrolled, layout into the canvas
# All widgets to be scrolled have to use this Frame as parent
self.scrolled_frame = tk.Frame(self.canvas, background=self.canvas.cget('bg'))
self.canvas.create_window((4, 4), window=self.scrolled_frame, anchor="nw")
# Configures the scrollregion of the Canvas dynamically
self.scrolled_frame.bind("<Configure>", self.on_configure)
def on_configure(self, event):
"""Set the scroll region to encompass the scrolled frame"""
self.canvas.configure(scrollregion=self.canvas.bbox("all"))
+243
View File
@@ -0,0 +1,243 @@
import re
from os.path import dirname, abspath, join, splitext, basename
import argparse
file_path_reg = r"(?:\n|^)\s*###filepath:(.*)"
input_reg = r"(?:\n|^)\s*(?:input|source)\s+\"?([^\s\"]+)?"
survey_reg = r"(?:\n|^)\s*survey\s+(\S+)"
end_survey_reg = r"(?:\n|^)\s*endsurvey"
scrap_reg = r"(?:\n|^)\s*scrap\s+(\S+)"
end_scrap_reg = r"(?:\n|^)\s*endscrap"
projection_reg = r"(?:\n|^).*-projection\s+(\S+)"
drawnre = re.compile(r".*line wall")
drawnexemptre = re.compile(r".*NODRAW")
drawnexemptplanre = re.compile(r".*NODRAW PLAN")
drawnexemptextendedre = re.compile(r".*NODRAW EE")
class NoSurveysFoundException(Exception):
pass
class MultipleSurveyFoundException(Exception):
pass
class Scrap:
id = None
projection = None
data = None
parent = None
def __init__(self, id, parent, projection):
self.id = id
self.projection = projection
self.parent = parent
def is_drawn(self):
if not self.data:
return False
for line in self.data:
match = drawnre.match(line)
if match:
return True
return False
class Survey:
parent = None
file_path = None
id = None
children = []
data = None
scraps = []
plan_drawn_override = False
extended_drawn_override = False
def __init__(self, id, parent, file_path):
self.id = id
self.parent = parent
self.file_path = file_path
@property
def therion_id(self):
if len(self.id) == 1:
return self.id[0]
return "{}@{}".format(self.name, self.namespace)
@property
def name(self):
return self.id[-1]
@property
def namespace(self):
return ".".join(list(reversed(self.id[0:-1])))
def data(self, data):
self._data = data
self.scraps = Survey.parse(self)
def parse(self):
scraps = []
scrap = None
data = []
for index, line in enumerate(self.data):
match = re.match(scrap_reg, line)
if match:
id = self.id + [match.group(1)]
projection = "plan"
match = re.match(projection_reg, line)
if match:
projection = match.group(1)
scrap = Scrap(id[:], self, projection)
scraps = scraps + [scrap]
data = [line]
continue
match = re.match(end_scrap_reg, line)
if match:
id = self.id
data = data + [line]
scrap.data = data[:]
data = []
continue
# Exempt drawing
match = drawnexemptplanre.match(line)
if match:
self.plan_drawn_override = True
match = drawnexemptextendedre.match(line)
if match:
self.extended_drawn_override = True
match = drawnexemptre.match(line)
if match:
self.plan_drawn_override = True
self.extended_drawn_override = True
data = data + [line]
self.scraps = scraps
class SurveyLoader:
_data = None
survey = None
surveys = {}
@property
def surveys_list(self):
return list(self.surveys.values())
@property
def base_surveys(self):
return [s for s in self.surveys_list if len(s.children) == 0]
@staticmethod
def load(file_path):
with open(file_path, "r", encoding="utf-8") as f:
data = f.read()
lines = []
for line in data.splitlines():
if not line.strip():
continue
# if line.lstrip().startswith("#"):
# continue
match = re.match(input_reg, line)
if match:
new_file_path = abspath(join(dirname(file_path), match.group(1)))
lines = lines + ["###filepath:{}".format(new_file_path)]
lines = lines + ["\t{}".format(l) for l in SurveyLoader.load(new_file_path)]
lines = lines + ["###filepath:{}".format(file_path)]
else:
lines.append(line.strip())
return lines
@staticmethod
def parse(lines, orig_file_path=None):
surveys = {}
id = []
file_path = orig_file_path
parent = None
survey = None
data = []
for index, line in enumerate(lines):
match = re.match(file_path_reg, line)
if match:
file_path = match.group(1)
continue
match = re.match(survey_reg, line)
if match:
id = id + [match.group(1)]
parent = survey
survey = Survey(id[:], parent, file_path)
surveys[".".join(id[:])] = survey
if parent:
parent.data = data[:]
parent.children = parent.children + [survey]
data = [line]
continue
match = re.match(end_survey_reg, line)
if match:
popped = id.pop()
data = data + [line]
survey.data = data[:]
if len(survey.children) == 0:
survey.parse()
if not survey.parent:
return survey, surveys
parent.data = parent.data + data
data = parent.data[:]
parent = survey.parent
survey = parent
continue
data = data + [line]
return parent, surveys
def __init__(self, file_path):
# print(f"\033[32mDebug SurveyLoader.load : \033[0m {file_path}")
self._data = SurveyLoader.load(file_path)
survey, surveys = SurveyLoader.parse(self._data, file_path)
self.survey = survey
self.surveys = surveys
def get_survey_by_id(self, therion_id):
id = []
if "@" in therion_id:
parts = therion_id.split("@")
id = list(reversed(parts[1].split("."))) + [parts[0]]
else:
id = list(reversed(therion_id.split(".")))
key = ".".join(id)
if key in self.surveys:
return self.surveys[key]
else:
potential_key = [k for k in self.surveys.keys() if k.endswith(".{}".format(key))]
if len(potential_key) == 1:
return self.surveys[potential_key[0]]
potential_keys = [k for k in self.surveys.keys() if key in k]
if len(potential_keys) == 1:
return self.surveys[potential_keys[0]]
elif len(potential_keys) > 1:
raise MultipleSurveyFoundException("Multiple surveys were found with that key:\n\t{}".format("\n\t".join(potential_keys)))
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse a survey")
parser.add_argument(
"survey_file",
help='The survey file (*.th) to work from. e.g. "data/system_migovec.th"',
)
parser.add_argument(
"survey_selector",
help='The selector for the survey to produce a scrap for. e.g. "roundpond@vrtnarija.vrtnarija_vilinska.system_migovec"',
)
args = parser.parse_args()
entrypoint = abspath(args.survey_file)
loader = SurveyLoader(entrypoint)
print(loader.get_survey_by_id(args.survey_selector))
+5
View File
@@ -0,0 +1,5 @@
import numpy as np
a = np.array([1,2,3,4,5])
print(a)
+114
View File
@@ -0,0 +1,114 @@
import tempfile
import shutil
import os
from os.path import join
import subprocess
import re
#################################################################################################
# Codes de couleur ANSI
class Colors:
BLACK = '\033[90m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
ERROR = '\033[91m'
WARNING = '\033[95m'
HEADER = '\033[96m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
#################################################################################################
def compile_template(template, template_args, **kwargs):
try:
log = ""
tmpdir = tempfile.mkdtemp()
config = template.format(**template_args, tmpdir=tmpdir.replace("\\", "/"))
print(f"{Colors.YELLOW}{config}{Colors.ENDC}\n")
config_file = join(tmpdir, "config.thconfig")
log_file = join(tmpdir, "log.log")
therion_path = kwargs["therion_path"] if "therion_path" in kwargs else "therion"
with open(config_file, mode="w+", encoding="utf-8") as tmp:
with open(log_file, mode="w+") as tmp2:
tmp.write(config)
tmp.flush()
subprocess.check_output('''"{}" "{}" -l "{}"'''.format(therion_path, config_file, log_file), shell=True, )
tmp2.flush()
log = tmp2.read()
if kwargs["cleanup"]:
shutil.rmtree(tmpdir)
print("\n" )
return log, tmpdir
except Exception as e:
print(f"{Colors.ERROR}Error: Therion template compilation error: {Colors.ENDC}{e}")
#################################################################################################
def compile_file(filename, **kwargs):
try:
tmpdir = os.path.dirname(filename)
log_file = join(tmpdir, "log.log").replace("\\", "/")
therion_path = kwargs["therion_path"] if "therion_path" in kwargs else "therion"
# print(f"{Colors.BLUE}therion_path: {Colors.ENDC}{therion_path}")
# print(f"{Colors.BLUE}filename: {Colors.ENDC}{filename}")
# print(f"{Colors.BLUE}log_file: {Colors.ENDC}{log_file}")
# subprocess.check_output('''"{}" "{}" -l "{}"'''.format(therion_path, filename, log_file), shell=True, )
result = subprocess.run(
[therion_path, filename, "-l", log_file],
stdout=subprocess.PIPE, # Capture de la sortie standard
stderr=subprocess.PIPE, # Capture des erreurs
shell=True
)
stdout_with_tabs = "\n".join("\t" + line for line in result.stdout.decode().splitlines())
# Si la commande échoue, result.returncode sera non nul
if result.returncode != 0:
# Affichage des erreurs et de la sortie standard
print(f"{Colors.ERROR}Error during Therion compilation:{Colors.ENDC}")
print(f"{Colors.WARNING}stdout: {Colors.YELLOW}{stdout_with_tabs}{Colors.ENDC}")
print(f"{Colors.ERROR}stderr: \n{result.stderr.decode()}{Colors.ENDC}")
else:
# Si la commande réussit, affichez la sortie standard
print(f"{Colors.GREEN}Therion compilation succeeded.\n{Colors.YELLOW}{stdout_with_tabs}{Colors.ENDC}")
except Exception as e:
print(f"{Colors.ERROR}Error: Therion file {Colors.ENDC}{filename}{Colors.ERROR} compilation error: {Colors.ENDC}{e}")
#################################################################################################
def compile_file_th(filepath, **kwargs):
template = """source {filepath}
layout test
scale 1 500
endlayout
"""
template_args = {"filepath": filepath}
logs, _ = compile_template(template, template_args, cleanup=True, **kwargs)
return logs
lengthre = re.compile(r".*Total length of survey legs =\s*(\S+)m")
depthre = re.compile(r".*Vertical range =\s*(\S+)m")
#################################################################################################
def get_stats_from_log(log):
lenmatch = lengthre.findall(log)
depmatch = depthre.findall(log)
if len(lenmatch) == 1 and len(depmatch) == 1:
return {"length": lenmatch[0], "depth": depmatch[0]}
return {"length": 0, "depth": 0}
@@ -0,0 +1,460 @@
from dataclasses import dataclass, field
import re
@dataclass
class Date:
"""A class which represents a string formatted date"""
year : int = 2000
month : int = 1
day : int = 1
date_string : str = field(init = False)
def __post_init__(self) -> None:
self.date_string = f"{self.year}.{self.month:02d}.{self.day:02d}"
@dataclass
class StationWithComment:
name : str
comment : str
command : str = field(init=False)
def __post_init__(self) -> None:
self.command = f'station {self.name} "{self.comment}"'
@dataclass
class LineLRUD:
from_station : str
left : float
right : float
up : float
down : float
@dataclass
class DataLine:
from_station : str
to_station : str
tape : float
compass : float
@dataclass
class NormalDataLine(DataLine):
clino : float
@dataclass
class DivingDataLine(DataLine):
to_depth : float
from_depth : float
@dataclass
class Centreline:
explo_date : Date = Date()
explorers : list[str] = field(init= False, default_factory= list)
type : str = "normal"
data_header : list[str] = field(init= False, default_factory= list)
units_length : dict[str, str] = field(default_factory=lambda: {"units length" : "meters"})
units_compass_clino : dict[str, str] = field(default_factory=lambda: {"units compass clino": "degrees"})
lrud_reader : list[str] = field(default_factory=lambda: ["data", "dimensions", "station", "left", "right", "up", "down"])
data : list[DataLine] = field(init=False, default_factory= list)
lrud_data : list[LineLRUD] = field(init=False, default_factory= list)
commented_stations : list[StationWithComment] = field(init=False, default_factory=list)
_string_repr : str = field(init=False, default_factory= str)
def __post_init__(self) -> None:
if self.type == 'normal':
self.data_header = ["data", "normal", "from", "to", "tape", "compass", "clino"]
elif self.type == "normal_backclino":
self.data_header = ["data", "normal", "from", "to", "tape", "compass", "backclino"]
elif self.type == "normal_backcompass":
self.data_header = ["data", "normal", "from", "to", "tape", "backcompass", "clino"]
elif self.type == "normal_backcompass_backclino":
self.data_header = ["data", "normal", "from", "to", "tape", "backcompass", "backclino"]
elif self.type == "diving":
self.data_header = ["data", "diving", "from", "fromdepth","to", "todepth", "tape", "compass"]
elif self.type == "diving_backcompass":
self.data_header = ["data", "diving", "from", "fromdepth","to", "todepth", "tape", "backcompass"]
def add_explorers(self, explorers : list[str]) -> None:
self.explorers += explorers
def add_dataline(self, line: DataLine) -> None:
self.data+= [line]
def add_station_line(self, station : StationWithComment) -> None:
self.commented_stations.append(station)
def add_LRUDdataline(self, line: LineLRUD) -> None:
self.lrud_data+= [line]
def add_Date(self, date: str) -> None:
DATE = date.split(".")
self.date = Date(year=int(DATE[0]),month=int(DATE[0]),day=int(DATE[2]))
self.explo_date = Date(year=int(DATE[0]),month=int(DATE[1]),day=int(DATE[2]))
def update_type(self, type: str):
self.type = type
self.__post_init__()
def add_string_repr(self) -> None:
explorers = ""
for explorer in self.explorers:
explorers += f"explo-team {explorer}\n\t"
formatted_data : str = ""
formatted_lrud : str = ""
formatted_comments : str = ""
for line,lrud_line in zip(self.data,self.lrud_data):
if "clino" in self.data_header:
formatted_data += f"""
{line.from_station}\t{line.to_station}\t{line.tape}\t{line.compass}\t{line.clino}\t""" # type: ignore
formatted_lrud += f"""
{lrud_line.from_station}\t{lrud_line.left}\t{lrud_line.right}\t{lrud_line.up}\t{lrud_line.down}\t"""
elif "todepth" in self.data_header:
formatted_data += f"""
{line.from_station}\t{line.from_depth}\t{line.to_station}\t{line.to_depth}\t{line.tape}\t{line.compass}\t""" # type: ignore
formatted_lrud += f"""
{lrud_line.from_station}\t{lrud_line.left}\t{lrud_line.right}\t{lrud_line.up}\t{lrud_line.down}\t"""
for comment in self.commented_stations:
formatted_comments += f"""
{comment.command}"""
self._string_repr = f"""
centreline
explo-date {self.explo_date.date_string}
date {self.explo_date.date_string}
{explorers}
{join(self.data_header)}
{formatted_data}
{join(self.lrud_reader)}
{formatted_lrud}
{formatted_comments}
endcentreline"""
@dataclass
class Survey:
name : str
entrance : str = field(init= False, default_factory= str)
centrelines : list[Centreline] = field(init= False, default_factory= list)
_string_repr : str = field(init=False, default_factory= str)
def add_centrelines(self, centrelines: list[Centreline]) -> None:
self.centrelines += centrelines
def add_entrance(self, entrance : str) -> None:
self.entrance = entrance
def add_string_repr(self) -> None:
centrelines = ""
for centreline in self.centrelines:
centrelines += f"{centreline._string_repr}\n"
self._string_repr = f"""
## a survey compiled from Visual Topo Data using the visual_therion.py script
survey "{self.name}" -entrance {self.entrance}
{centrelines}
endsurvey
"""
@dataclass
class StrategyParser:
input_str : str
compass : str = "normal"
clino : str = "normal"
strategy_name: str = "normal"
def __post_init__(self) -> None:
if "Dir,Dir,Inv" in self.input_str:
self.clino = "back"
self.strategy_name = "normal_backclino"
elif "Inv,Inv,Dir" in self.input_str or "Inv,Dir,Dir" in self.input_str:
self.compass = "back"
if "Prof" in self.input_str:
self.strategy_name = "diving_backcompass"
else:
self.strategy_name = "normal_backcompass"
elif "Inv,Inv,Inv" in self.input_str:
self.compass = "back"
self.clino = "back"
self.strategy_name = "normal_backcompass_backclino"
elif ("Dir Dir Dir" in self.input_str) and ("Prof") in self.input_str:
self.strategy_name = "diving"
def join(l : list[str])-> str:
newstr = ""
for elem in l:
newstr += f"{elem} "
return newstr
def find_entrance_stn(data: list[str], format : str) -> str:
if format == "tro":
"""Search the visual topo file for the entrance station"""
for c,l in enumerate(data):
if 'Entree' in l:
entrance_stations = re.findall(r"(?<=Entree\s).+",l)
else:
for c,l in enumerate(data):
if '<Entree>' in l:
entrance_stations = re.findall(r"(?<=<Entree>)[0-9a-z]+",l)
return entrance_stations[0] # type: ignore
def return_centreline_params(data: list[str], fmt: str):
if fmt == "tro":
return return_centreline_params_tro(data)
else:
return return_centreline_params_trox(data)
def return_centreline_params_trox(data):
start,end = [],[]
survey_dates = []
surveyor_groups = []
for c,l in enumerate(data):
if ('Param' in l) and ('/Param' not in l):
if 'Comment' in data[c+1]:
if len(start) >= 1:
end.append(c-1)
start.append(c+2)
else:
if len(start) >= 1:
end.append(c-1)
start.append(c+1)
reg_explodate = re.findall(r'(?<=Date\=")\d\d\/\d\d\/\d\d\d\d', l)
reg_explodate = [elem for elem in reg_explodate[0].split("/")]
explodate = "{yyyy}.{mm}.{dd}".format(yyyy =reg_explodate[2], mm =reg_explodate[1], dd = reg_explodate[0])
if len(explodate) == 0:
survey_dates.append('')
else:
survey_dates.append(re.sub(r"/",".",explodate))
tp = re.findall(r"(?<=Topo réalisée par )[\w+\s]*",l)
if len(tp) == 0:
surveyor_groups.append('')
else:
surveyor_groups.append(tp[0].split(' '))
elif 'Configuration' in l:
end.append(c-1)
return surveyor_groups,survey_dates,start,end
def return_centreline_params_tro(data):
# find the parameters of the file.
start,end = [],[]
survey_dates = []
surveyor_groups = []
for c,l in enumerate(data):
if 'Param' in l:
if len(start) >= 1:
end.append(c-1)
start.append(c+1)
explodate = re.findall(r"\d\d.\d\d.\d\d", l)
if len(explodate) == 0:
survey_dates.append('')
else:
survey_dates.append(re.sub(r"-",".",explodate[0]))
tp = re.findall(r"(?<=Topo réalisée par )[\w+\s]*",l)
if len(tp) == 0:
surveyor_groups.append('')
else:
surveyor_groups.append(tp[0].split(' '))
elif 'Configuration' in l:
end.append(c-1)
return surveyor_groups,survey_dates,start,end
def parseFloat(x: str) -> float:
try:
X = float(x)
return X
except ValueError:
X = 0.
return X
def parse_CommentedStations(lines : list[str]) -> list[StationWithComment]:
parsed_lines = [[elem for elem in line.split(";") if elem != ""] for line in lines[1:]]
stations_list = []
for line in parsed_lines:
if len(line) > 1:
stn_name = [elem for elem in line[0].split(" ") if elem != ""][0]
comment = line[1]
station = StationWithComment(stn_name,comment)
stations_list.append(station)
return stations_list
def parse_LRUDS(lines: list[str], format : str) -> list[LineLRUD]:
if format == "tro":
LRUDlines = [[elem for elem in line.split(" ") if elem != ""] for line in lines[:]]
elif format == "trox":
print("parsing a trox file")
LRUDlines = []
LRUDlines.append([elem.split("=")[-1].strip('"') for elem in lines[0].split(" ")[1:] if elem != ""])
LRUDlines = LRUDlines + [["*"]+[elem.split("=")[-1].strip('"') for elem in line.split(" ")[1:] if elem != ""] for line in lines[:]]
else:
LRUDlines = [[]]
lrud_lines = []
for c,line in enumerate(LRUDlines):
if "*" in line[0] and len(line) >=9:
LRUDline =LineLRUD(LRUDlines[c][1],parseFloat(line[5]),parseFloat(line[6]),parseFloat(line[7]),parseFloat(line[8]))
elif len(line) >=9:
LRUDline =LineLRUD(line[1],parseFloat(line[5]),parseFloat(line[6]),parseFloat(line[7]),parseFloat(line[8]))
else:
print("skipping empty line {}".format(c))
if len(line) >=9:
if line[0] != line[1]:
#check there is no asterisk
lrud_lines.append(LRUDline) # type:ignore
return lrud_lines
def parse_normal_data(lines: list[str], format: str ) -> list[NormalDataLine]:
if format == "tro":
datalines = [[elem for elem in line.split(" ") if elem != ""] for line in lines[1:]]
elif format == "trox":
print("parsing a trox file")
datalines: list = []
datalines.append([elem.split("=")[-1].strip('"') for elem in lines[0].split(" ")[1:] if elem != ""])
for line in lines[1:]:
if "Dep=" in line:
datalines.append([elem.split("=")[-1].strip('"') for elem in line.split(" ")[1:] if elem != ""])
else:
datalines.append(["*"]+[elem.split("=")[-1].strip('"') for elem in line.split(" ")[1:] if elem != ""])
else:
datalines = [[]]
dataLines = []
for c,line in enumerate(datalines):
if "*" in line[0] and len(line) >=9:
dataLine = NormalDataLine(datalines[c-1][1],line[1],float(line[2]),float(line[3]),float(line[4]))
print(line)
elif len(line) >=9:
dataLine = NormalDataLine(line[0],line[1],float(line[2]),float(line[3]),float(line[4]))
else:
print("skipping empty line {}".format(c))
if dataLine.tape != 0: # type:ignore
dataLines.append(dataLine) # type:ignore
return dataLines
def parse_diving_data(lines: list[str], format: str) -> list[DivingDataLine]:
if format == "tro":
datalines = [[elem for elem in line.split(" ") if elem != ""] for line in lines[1:]]
elif format == "trox":
print("parsing a trox file")
datalines = []
datalines.append([elem.split("=")[-1].strip('"') for elem in lines[0].split(" ")[1:] if elem != ""])
datalines = datalines + [["*"]+[elem.split("=")[-1].strip('"') for elem in line.split(" ")[1:] if elem != ""] for line in lines[1:]]
else:
print("oops empty")
datalines = [[]]
dataLines = []
for c,line in enumerate(datalines[:]): # keep and index and ignore the first one.
if len(line) >=9:
if "*" in line and len(datalines[c-1]) > 9:
dataLine = DivingDataLine(from_depth= float(datalines[c][4]),
from_station= datalines[c-1][1],
to_depth= float(line[4]),
to_station=line[1],
tape= float(line[2]),
compass =float(line[3]))
else:
dataLine = DivingDataLine(from_depth= float(datalines[c][4]),
from_station= line[0],
to_depth= float(line[4]),
to_station=line[1],
tape= float(line[2]),
compass =float(line[3]))
if dataLine.tape != 0:
dataLines.append(dataLine)
return dataLines
def make_centrelines_list(data : list[str], format: str ) -> list[Centreline]:
surveyor_groups,survey_dates,starts,ends = return_centreline_params(data, fmt= format) # type:ignore
centrelines : list[Centreline] = []
for start,end,date in zip(starts,ends,survey_dates):
newCentreline = Centreline()
newCentreline.add_Date(date)
if format == "tro":
header = data[start-1]
else:
if "Comment" in data[start-1]:
header = ""
for elem in data[start-2].split(" ")[1:]:
header += elem.split("=")[-1].strip('"')+" "
else:
header = ""
for elem in data[start-1].split(" ")[1:]:
header += elem.split("=")[-1].strip('"')+" "
print("data header: ", newCentreline.data_header)
strategy = StrategyParser(header)
newCentreline.update_type(strategy.strategy_name)
station_lines = parse_CommentedStations(data[start:end])
if "normal" in strategy.strategy_name:
lrudLines = parse_LRUDS(data[start:end], format = format)
dataLines = parse_normal_data(data[start:end], format= format)
for dataLine,lrudLine in zip(dataLines,lrudLines):
newCentreline.add_dataline(dataLine)
newCentreline.add_LRUDdataline(lrudLine)
elif "diving" in strategy.strategy_name:
lrudLines = parse_LRUDS(data[start:end], format = format)
dataLines = parse_diving_data(data[start:end], format = format)
for dataLine,lrudLine in zip(dataLines,lrudLines):
newCentreline.add_dataline(dataLine)
newCentreline.add_LRUDdataline(lrudLine)
for line in station_lines:
newCentreline.add_station_line(line)
newCentreline.add_string_repr()
centrelines.append(newCentreline)
return centrelines