Files
Alex38Lyon db516b2cc0 pyThtoQgis
2026-01-12 19:58:16 +01:00

1018 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
######!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Xavier Robert <xavier.robert@ird.fr>
# SPDX-License-Identifier: GPL-3.0-or-later
"""
#############################################################
# #
# Script to automatize data extraction of Therion databases #
# #
# By Xavier Robert #
# Grenoble, October 2022 #
# #
#############################################################
Written by Xavier Robert, October 2022
Modifié Alex 2025 01 31
Inputs files (16): (.dbf, .prj, .shp, .shx)
- points2d
- lines2d
- areas2d
- outlines
En cas d'erreur corriger manuellement (QGis) les topologies des fichiers :
- areas2d.shp
- lines2d.shp
xavier.robert@ird.fr
"""
# Do divisions with Reals, not with integers
# Must be at the beginning of the file
from __future__ import division
# Import Python modules
#import numpy as np
import sys, os, copy, shutil
import fiona
import shapely
import geopandas as gpd
import pandas as pd
from fiona import Env
from shapely.geometry import Polygon, LineString, MultiPolygon
from shapely.geometry import shape, mapping, GeometryCollection
from shapely.ops import transform, unary_union, polygonize
from shapely.errors import TopologicalError
from shapely.validation import make_valid, explain_validity
from collections import Counter
#from functools import wraps
from alive_progress import alive_bar # https://github.com/rsalmei/alive-progress
###### TO DO #####
# -
##### End TO DO #####
#################################################################################################
#################################################################################################
#def validate(func):
# """
# Function to validate areas topology.
# From https://shapely.readthedocs.io/en/latest/manual.html
# Args:
# func (_type_): _description_
# Raises:
# TopologicalError: Error of topology
# - area does not close
# - inner ring
# - boundaries intersects
# Returns:
# _type_: _description_
# """
# @wraps(func)
# def wrapper(*args, **kwargs):
# ob = func(*args, **kwargs)
# if not ob.is_valid:
# raise TopologicalError(
# "Given arguments do not determine a valid geometric object")
# return ob
# return wrapper
def validate(inputfile, rec):
rec2 = rec
#print(rec['geometry']['coordinates'][0]) # il y a visiblement un soucis avec le nombre de []
if not Polygon(rec['geometry']['coordinates'][0]).is_valid:
print('Problem in %s geometry' %(inputfile))
print('%s is not a valid geometric object' %(rec['properties']['_ID']))
raise TopologicalError('\033[91mERROR:\033[00m Correction does not work...\n%s is not a valid geometric object\n\t The error is: %s' %(str(rec['properties']['_ID']), shapely.validation.explain_validity(rec)))
#print('We try to correct it')
#rec2b = shapely.validation.make_valid(Polygon(rec['geometry']['coordinates'][0]))
# Check à améliorer, il faut que ce soit un Polygon, et non un MultiPolygon...
#if not rec2b.is_valid:
# raise TopologicalError('ERROR: Correction failed...\n%s is not a valid geometric object\n\t The error is: %s' %(str(rec['properties']['_ID']), shapely.validation.explain_validity(rec)))
#else:
# rec2['geometry']['coordinates'][0] = list(rec2b.exterior.coords)
# Find where there is the error if possible
#Diagnostics
#validation.explain_validity(ob):
#Returns a string explaining the validity or invalidity of the object.
#The messages may or may not have a representation of a problem point that can be parsed out.
#coords = [(0, 0), (0, 2), (1, 1), (2, 2), (2, 0), (1, 1), (0, 0)]
#p = Polygon(coords)
#from shapely.validation import explain_validity
#shapely.validation.explain_validity(p)
#'Ring Self-intersection[1 1]'
#shapely.validation.make_valid(ob)
#Returns a valid representation of the geometry, if it is invalid. If it is valid, the input geometry will be returned.
#In many cases, in order to create a valid geometry, the input geometry must be split into multiple parts or multiple geometries. If the geometry must be split into multiple parts of the same geometry type, then a multi-part geometry (e.g. a MultiPolygon) will be returned. if the geometry must be split into multiple parts of different types, then a GeometryCollection will be returned.
#For example, this operation on a geometry with a bow-tie structure:
#from shapely.validation import make_valid
#coords = [(0, 0), (0, 2), (1, 1), (2, 2), (2, 0), (1, 1), (0, 0)]
#p = Polygon(coords)
#make_valid(p)
#<MULTIPOLYGON (((1 1, 0 0, 0 2, 1 1)), ((2 0, 1 1, 2 2, 2 0)))>
#Yields a MultiPolygon with two parts, and sometimes area + line:
return rec2
#################################################################################################
def cutareas(pathshp, outlines, outputspath):
"""
Function to cut shapefiles areas with the outline to only keep the lines inside the outline
Args:
pathshp (str) : path where are stored output shp from Therion
outlines (geopandas obj): the outline shapefile
outputspath (str) : path where to copy the gpkg files
"""
print('\033[1;32mWorking with areas...\033[0m')
# 2- Validate the outline and Areas shapefile
#for rec in outlines:
# rec2 = validate('outline2d.shp', rec)
# # update correction --> To do ?
# #if rec2 != rec:
#for rec in areas:
# rec2 = validate('areas2d.shp', rec)
# # update correction
# #if rec2 != rec:
# Read the Line Shapefile
# areas = gpd.read_file(pathshp + 'areas2d.shp', driver = 'ESRI shapefile')
areas = gpd.read_file(pathshp + 'areas2d.gpkg')
# Corriger les erreurs de topologie dans les lignes avant traitement
# areas = fix_topology(areas)
# Extract the intersections between outlines and lines
# be careful, for this operation, geopandas needs to work with rtree and not pygeos
# --> uninstall pygeos and install rtree
try:
areasIN = areas.overlay(outlines, how = 'intersection')
except:
print('ERROR: 1) uninstall pygeos and install rtree\n\t2) check your polygons validity')
import rtree
print ('\tYou may check the validity of your polygons with the verify function in QGIS')
areasIN = areas.overlay(outlines, how = 'intersection')
# Removes inner lines that have different id and scrap_id
areasIN = areasIN[areasIN['_SCRAP_ID'] == areasIN ['_ID']]
# Save output
#areasIN.to_file("areas2dMasekd.gpkg", driver = "GPKG", encoding = 'utf8')
areasIN.to_file(outputspath + "areas2dMasekd.gpkg", driver = "GPKG")
return
#################################################################################################
def repair_geometry(geom):
"""Répare une géométrie en appliquant un buffer de zéro si elle est invalide."""
if geom is None:
return None # Si la géométrie est déjà None, on ne fait rien.
try:
# Vérifier si la géométrie est valide
if not geom.is_valid:
# Appliquer un buffer de zéro pour corriger la géométrie
return geom.buffer(0)
return geom
except TopologicalError:
# Gérer les erreurs topologiques si une géométrie est impossible à réparer
print(f"Erreur topologique pour la géométrie: {geom}")
return None # Renvoie None pour les géométries non réparables
#################################################################################################
def fix_topology(geodf):
"""Fonction pour corriger les erreurs de topologie dans un GeoDataFrame"""
# Compteur pour les géométries corrigées
corrected_count = 0
def count_and_repair(geom):
"""Compter et réparer les géométries invalides"""
nonlocal corrected_count
if geom and not geom.is_valid:
corrected_count += 1
return repair_geometry(geom)
# Appliquer la réparation sur toutes les géométries du GeoDataFrame
geodf['geometry'] = geodf['geometry'].apply(count_and_repair)
# Filtrer les géométries invalides restantes
geodf = geodf[geodf['geometry'].notnull()] # Exclure les géométries None
geodf = geodf[geodf.is_valid] # Garder seulement les géométries valides
# Afficher le nombre d'erreurs corrigées
if corrected_count > 0 : print(f"Nombre d'erreurs topologiques corrigées: {corrected_count}")
else : print(f"Aucune erreur de topologiques corrigée")
return geodf
#################################################################################################
def cutLines(pathshp, outlines, outputspath):
"""
Function to cut shapefiles lines with the outline to only keep the lines inside the outline
Args:
pathshp (str) : path where are stored output shp from Therion
outlines (geopandas obj): the outline shapefile
outputspath (str) : path where to copy the gpkg files
"""
print('\033[1;32mWorking with lines...\033[0m')
# Read the Line Shapefile
# lines = gpd.read_file(pathshp + 'lines2d.shp', driver = 'ESRI shapefile') # [Note Alex]
lines = gpd.read_file(pathshp + 'lines2d.gpkg') # [Note Alex]
# lines = fix_topology(lines)
# Vérifier si outlines est un GeoDataFrame
if not isinstance(outlines, gpd.GeoDataFrame):
print("outlines n'est pas un GeoDataFrame. Tentative de conversion...")
outlines = gpd.read_file(outlines) # Lire un fichier shapefile si outlines est une chaine de caractères
# Corriger les erreurs de topologie dans outlines avant traitement
# outlines = fix_topology(outlines)
# lines = gpd.read_file(pathshp + 'lines2d.shp') # [Note Alex]
# Extract lines that are not masked by the outline
linesOUT = pd.concat((lines[lines['_TYPE'] == 'centerline'],
lines[lines['_TYPE'] == 'water_flow'],
lines[lines['_TYPE'] == 'label'],
lines[lines['_CLIP'] == 'off']),
ignore_index=True)
# Extract lines will be masked by the outline
linesIN = lines[lines['_CLIP'] != 'off']
linesIN = linesIN[linesIN['_TYPE'] != 'centerline']
linesIN = linesIN[linesIN['_TYPE'] != 'water_flow']
linesIN = linesIN[linesIN['_TYPE'] != 'label']
# Extract the intersections between outlines and lines
# be careful, for this operation, geopandas needs to work with rtree and not pygeos
# --> uninstall pygeos and install rtree
try:
# outlines = outlines.buffer(0) # [Note Alex] Réparer les géométries invalides
linesIN = linesIN.overlay(outlines, how = 'intersection', keep_geom_type=True)
except:
print('\033[91mERROR: 1\033[00m) uninstall pygeos and install rtree\n\t2) check your polygons validity')
import rtree
print ('\tYou may check the validity of your polygons with the verify function in QGIS')
linesIN = linesIN.overlay(outlines, how = 'intersection', keep_geom_type=True)
print('TEST')
# Removes inner lines that have different id and scrap_id
linesIN = linesIN[linesIN['_SCRAP_ID'] == linesIN ['_ID']]
# Merge the IN and OUT database
linesTOT = pd.concat((linesOUT, linesIN),
ignore_index=True)
# Save output
#linesTOT.to_file("lines2dMasekd.gpkg", driver="GPKG", encoding = 'utf8')
linesTOT.to_file(outputspath + "lines2dMasekd.gpkg", driver="GPKG")
return
#################################################################################################
def AddAltPoint(pathshp, outputspath):
"""
Function to add the altitude of the stations and entrances in the attribut table
Args:
pathshp (str) : path where are stored output shp from Therion
outputspath (str): path where to copy the gpkg files
"""
print('\033[1;32mWorking with points...\033[1;32m')
# Definition des altitudes des entrées supérieures des réseaux à plusieurs entrées
EntreeSupp = {'JB' : 2333, # Entrée C37
'CP' : 2136, # Entrée CP16
'LP9' : 2299, # Entrée LP9
'CP6' : 2182, # Entrée CP53
'CP62' : 1960, # Entrée CP62
'A21' : 1797, # Entrée A21
'Mirolda': 2330 # Entrée Jockers
}
# Définition des noms de réseau
RNames = {'JB' : 'Gouffre Jean Bernard',
'CP' : 'Réseau de la Combe aux Puaires',
'LP9' : 'LP9 - CP39',
'CP6' : 'CP6 - CP53',
'CP62' : 'CP62 - CP63',
'A21' : 'A21 -A24',
'Mirolda': 'Réseau Lucien-Bouclier - Mirolda'
}
# Définition des noms de systèmes
SNames = {'SynclinalJB' : 'Système du Jean-Bernard',
'SystemeCP' : 'Système de la Combe aux Puaires',
'SystemeAV' : 'Système des Avoudrues',
'SystemeA21' : 'Système du A21',
'SystemMirolda' : 'Système du Criou - Mirolda',
'SystemeBossetan': 'Système de Bossetan',
'sources' : 'Résurgences',
'tuet' : 'Système du Tuet',
'eauxfroides' : 'Système des Eaux Froides'
}
# Open the text file with the coordinates of the caves
# This text file (Caves.txt) should be build with Therion compilation
# and stored in the output's shapefiles folder
# export cave-list -location on -o Outputs/SHP/Caves.txt
f = open(pathshp + 'Caves.txt', 'r').readlines()
# Make a new shapefile instance
with fiona.open(pathshp + 'points2d.shp', 'r') as inputshp:
# Créer le nouveau schéma des shapefiles
newschema = inputshp.schema
newschema['properties']['_CAVE'] = 'str'
newschema['properties']['_SYSTEM'] = 'str'
newschema['properties']['_ALT'] = 'str:4'
newschema['properties']['_DEPTH'] = 'float'
newschema['properties']['_EASTING'] = 'float'
newschema['properties']['_NORTHING'] = 'float'
# Open the output shapefile
#with fiona.open(inputfile[:-4] + 'Alt.shp', 'w', crs=inputshp.crs, driver='ESRI Shapefile', schema=newschema) as ouput:
#with fiona.open('points2dAlt.gpkg', 'w', crs=inputshp.crs, driver='GPKG', schema=newschema, encoding = 'utf8') as ouput:
with fiona.open(outputspath + 'points2dAlt.gpkg', 'w', crs=inputshp.crs, driver='GPKG', schema=newschema) as ouput:
with alive_bar(len(inputshp), title = "\x1b[32;1m- Processing stations...\x1b[0m", length = 20) as bar:
# do a loop on the stations
for rec in inputshp:
# Copy the schema from the input data
g = rec
g['properties']['_CAVE'] = ''
g['properties']['_SYSTEM'] = ''
g['properties']['_DEPTH'] = ''
# Add Alt, Easting, Northing
g['properties']['_ALT'] = str(round(float(rec['geometry']['coordinates'][2])))
g['properties']['_EASTING'] = float(rec['geometry']['coordinates'][0])
g['properties']['_NORTHING'] = float(rec['geometry']['coordinates'][1])
if rec['properties']['_TYPE'] == 'station' and rec['properties']['_STSURVEY'] != None:
# Find system
system = rec['properties']['_STSURVEY'].split('.')[-2]
g['properties']['_SYSTEM'] = SNames[system]
# Find Cave
xxx = rec['properties']['_STSURVEY'].split('.')
while len(xxx) < 4:
xxx.append('junk')
if 'trous' in xxx[0] or SNames[system] == 'Résurgences' or 'sources' in xxx[0]:
g['properties']['_CAVE'] = rec['properties']['_STNAME']
g['properties']['_DEPTH'] = 0
elif 'eauxfroides' in xxx[-3]:
g['properties']['_CAVE'] = 'Résurgence des Eaux Froides'
g['properties']['_DEPTH'] = 0
elif 'tuet' in xxx[-4]:
g['properties']['_CAVE'] = 'Tuet'
g['properties']['_DEPTH'] = 0
elif 'ReseauCP' in xxx[-4]:
g['properties']['_CAVE'] = RNames['CP']
g['properties']['_DEPTH'] = EntreeSupp['CP'] - float(rec['geometry']['coordinates'][2])
elif 'LP9' in xxx[-4]:
g['properties']['_CAVE'] = RNames['LP9']
g['properties']['_DEPTH'] = EntreeSupp['LP9'] - float(rec['geometry']['coordinates'][2])
elif 'CP6' in xxx[-4]:
g['properties']['_CAVE'] = RNames['CP6']
g['properties']['_DEPTH'] = EntreeSupp['CP6'] - float(rec['geometry']['coordinates'][2])
elif 'CP62' in xxx[-4]:
g['properties']['_CAVE'] = RNames['CP62']
g['properties']['_DEPTH'] = EntreeSupp['CP62'] - float(rec['geometry']['coordinates'][2])
elif xxx[-3] == 'Jean-Bernard':
#g['pfileroperties']['_CAVE'] = rec['properties']['_STSURVEY'].split('.')[-3]
g['properties']['_CAVE'] = RNames['JB']
g['properties']['_DEPTH'] = EntreeSupp['JB'] - float(rec['geometry']['coordinates'][2])
elif 'A21' in xxx[-4]:
g['properties']['_CAVE'] = RNames['A21']
g['properties']['_DEPTH'] = EntreeSupp['A21'] - float(rec['geometry']['coordinates'][2])
elif 'Mirolda' in xxx[-3]:
g['properties']['_CAVE'] = RNames['Mirolda']
g['properties']['_DEPTH'] = EntreeSupp['Mirolda'] - float(rec['geometry']['coordinates'][2])
else:
g['properties']['_CAVE'] = xxx[-4]
if g['properties']['_CAVE'] == 'A22':
g['properties']['_CAVE'] = 'A(V)22'
#g['properties']['_DEPTH'] = 0
# Trouver l'altitude de l'entrée !!!!
for line in f:
if g['properties']['_CAVE'] in line and line.split('\t')[6] != '\n':
altmax = float(line.split('\t')[6])
g['properties']['_DEPTH'] = altmax - float(rec['geometry']['coordinates'][2])
# Write record
ouput.write (g)
# Update progress bar
bar()
return
#################################################################################################
def shp2gpkg(pathshp, outputspath):
"""
function to convert shp files into gpkg files
Args:
pathshp (str) : path where are stored output shp from Therion
outputspath (str): path where to copy the gpkg files
"""
# files to be converted
# files = ['outline2d', 'shots3d', 'walls3d']
files = ['outline2d']
print('shp2gpkg : ', files)
with alive_bar(len(files), title = "\x1b[32;1m- Processing shp2gpkg...\x1b[0m", length = 20) as bar:
for fname in files :
if fname == 'walls3d':
print('shp2gpkg does not support walls3d files...\n\t I am only copying the shp file into the right folder')
for ftype in ['.shp', '.dbf', '.prj', '.shx']:
shutil.copy2(pathshp + fname + ftype, outputspath + fname + ftype)
#pass
#input = gpd.read_file(fname + '.shp', layer = 'walls3d', driver = 'ESRI shapefile')
#input.to_file(fname + ".gpkg", driver="GPKG", encoding = 'utf8')
#with fiona.open(fname + '.shp', 'r') as inputshp:
# with fiona.open(fname + '.gpkg', 'w', crs=inputshp.crs, driver='GPKG', schema=inputshp.schema, encoding = 'utf8') as ouput:
# for rec in inputshp:
# # Write record
# ouput.write (g)
else:
# input = gpd.read_file(pathshp + fname + '.shp', driver = 'ESRI shapefile')
input = gpd.read_file(pathshp + fname + '.shp', encoding = 'utf8')
#input.to_file(fname + ".gpkg", driver="GPKG", encoding = 'utf8')
# input.to_file(outputspath + fname + ".gpkg", driver="GPKG")
input.to_file(outputspath + fname + ".gpkg", encoding = 'utf8')
#input.to_file(fname + ".gpkg", driver="GPKG")
#update bar
bar()
return
#################################################################################################
def file_shp2gpkg(pathshp, infile, outputspath, outfile ):
"""
Function to convert shp files into gpkg files using Fiona.
Args:
pathshp (str): Path where the input shp files are stored.
file (str): Name of the file to be converted (without extension).
outputspath (str): Path where the output gpkg files will be saved.
"""
try:
# Configuration de l'environnement Fiona pour accepter les géométries non fermées
with Env(OGR_GEOMETRY_ACCEPT_UNCLOSED_RING="YES"):
input_shp = os.path.join(pathshp, infile + '.shp')
output_gpkg = os.path.join(outputspath, outfile + '.gpkg')
# Vérification que le fichier source existe
if not os.path.exists(input_shp):
raise FileNotFoundError(f"\t\033[91mError: the file \033[0m{input_shp}\033[91m did not exist.")
# Lecture du fichier Shapefile
with fiona.open(input_shp, 'r') as source:
# Affichage du nombre d'objets et de leur type
num_features = len(source)
geometry_type = source.schema['geometry']
# Vérification que le driver GPKG est disponible
if 'GPKG' not in fiona.supported_drivers:
raise RuntimeError("\t\033[91mError: The GPKG dirver is not supported by Fiona.\033[0m")
# Création du fichier GeoPackage
with fiona.open(
output_gpkg,
'w',
driver='GPKG',
schema=source.schema,
crs=source.crs
) as destination:
for feature in source:
destination.write(feature)
print(f'\033[1;32mConversion OK, \033[32m file : \033[0m{pathshp}{infile}.shp\033[32m to : \033[0m{outputspath}{outfile}.gpkg, type {geometry_type} : {num_features}')
except FileNotFoundError as e:
print(f"\t\033[91mError : \033[0m{e}", file=sys.stderr)
except RuntimeError as e:
print(f"\t\033[91mError : \033[0m{e}", file=sys.stderr)
except fiona.errors.FionaError as e:
print(f"\t\033[91mError read/write file : \033[0m{e}", file=sys.stderr)
except Exception as e:
print(f"\t\033[91mError unknow : \033[0m{e}", file=sys.stderr)
#################################################################################################
def count_topology_errors(file_path):
"""
Analyzes a shapefile for topology errors and counts them by type,
while listing the record numbers associated with each error.
Also returns the total number of errors.
Args:
file_path (str): Path to the shapefile to analyze.
Returns:
tuple:
- dict: A dictionary with the error types as keys and a list of record indices as values.
- int: The total number of errors found.
"""
error_details = {}
record_types = {'Point': 0, 'LineString': 0, 'Polygon': 0, 'MultiPoint': 0, 'MultiLineString': 0, 'MultiPolygon': 0}
total_records = 0
total_errors = 0
try:
# Open the shapefile in read mode
with fiona.open(file_path, "r") as src:
for i, record in enumerate(src):
total_records += 1
geometry = shape(record['geometry'])
# Count the type of geometry
geom_type = geometry.geom_type
if geom_type in record_types:
record_types[geom_type] += 1
else:
record_types[geom_type] = 1 # Add new geometry type if not found
try:
# Validate the geometry and explain any issues
validity_explanation = explain_validity(geometry)
if validity_explanation != "Valid Geometry":
# Extract the type of error
error_type = validity_explanation.split(" ")[0] # First word of the explanation
total_errors += 1 # Increment total errors
# Add the record index to the error details
if error_type in error_details:
error_details[error_type].append(i)
else:
error_details[error_type] = [i]
except Exception as e:
print(f"\033[91mError processing record \033[0m{i}: {e}")
# Display the total number of records and their types
if total_errors == 0 : print(f"\033[1;32mFile: \033[0m{file_path}\033[32m, records : \033[0m{total_records}\033[32m, no errors found: \033[0m")
else : print(f"\n\033[1;32mFile: \033[0m{file_path}\033[32m, records : \033[0m{total_records},\033[91m errors found: \033[0m{total_errors}")
for geom_type, count in record_types.items():
if count != 0 : print(f"\tType {geom_type} : {count} records")
# Print the results
if error_details:
print("\t\033[91mTopology errors found:\033[0m")
for error_type, indices in error_details.items():
print(f"\t\033[91mRecords : \033[0m{', '.join(map(str, indices))}\033[91m Erreur type : \033[0m{error_type}\033[91m : \033[0m{len(indices)}\033[91m occurrences\033[0m")
# else:
# print("\t\033[1;32mNo topology errors found.\t\033[0m")
return error_details, total_errors
except Exception as e:
print(f"\033[91mError analyzing shapefile: \033[0m{file_path}\033[91m Code : \033[0m{e}")
return {}, 0
#################################################################################################
def fix_geometries_fiona(input_shp, output_shp):
"""
Fixes geometries in a shapefile using Fiona and Shapely.
Parameters:
- input_shp (str): Path to the input shapefile.
- output_shp (str): Path to the output corrected shapefile.
Returns:
- None (saves the corrected file).
"""
try:
with fiona.open(input_shp, "r") as src:
schema = src.schema.copy()
crs = src.crs
records = list(src)
fixed_records = []
invalid_count = 0
fixed_count = 0
removed_count = 0
error_details = []
for record in records:
geom = shape(record["geometry"])
if geom is None or geom.is_empty:
removed_count += 1
continue # Remove empty geometries
try:
if not geom.is_valid:
invalid_count += 1
repaired_geom = unary_union([geom])
# Convert GeometryCollection into a valid geometry if possible
if isinstance(repaired_geom, GeometryCollection):
repaired_geom = list(polygonize(repaired_geom))
repaired_geom = repaired_geom[0] if repaired_geom else None
if repaired_geom and repaired_geom.is_valid:
geom = repaired_geom
fixed_count += 1
else:
removed_count += 1
continue # If still invalid, remove it
new_record = record.copy()
new_record["geometry"] = mapping(geom)
fixed_records.append(new_record)
except Exception as e:
removed_count += 1
error_details.append(str(e))
continue # Skip problematic geometries
if error_details:
print("\t⚠️ Error details:")
for error in set(error_details):
print(f"\t\t- {error}")
with fiona.open(output_shp, "w", driver="ESRI Shapefile", schema=schema, crs=crs) as dst:
dst.writerecords(fixed_records)
print(f'\033[1;32mCorrection, \033[32m file : \033[0m{input_shp}.shp\033[32m to : \033[0m{output_shp}')
print(f"\t🔍 Invalid geometries before correction: {invalid_count}")
print(f"\t🔧 Fixed geometries: {fixed_count}")
print(f"\t⚠️ \033[91m Removed geometries, check it ! : ⚠️ \033[0m {removed_count}")
print(f"\t📊 Total objects after correction: {len(fixed_records)}")
except Exception as e:
print(f"\033[91mError processing the shapefile: \033[0m{e}")
def fix_geometries_pure_python(input_shp, output_shp):
"""
Repairs geometries in a shapefile using Fiona and Shapely.
Parameters:
- input_shp (str): Path to the input shapefile.
- output_shp (str): Path to the output shapefile after correction.
Returns:
- None (saves the corrected file).
"""
try:
# Configuration de l'environnement Fiona pour accepter les géométries non fermées
with Env(OGR_GEOMETRY_ACCEPT_UNCLOSED_RING="YES"):
print(f"✅ Loading file: {input_shp}")
# Ouvrir le fichier Shapefile en lecture
with fiona.open(input_shp, 'r') as source:
# Vérifier le nombre d'objets et le type de géométrie
num_features = len(source)
geometry_type = source.schema['geometry']
print(f"️ Number of objects in the file: {num_features}")
print(f"️ Geometry type: {geometry_type}")
# Initialisation des compteurs d'erreurs
invalid_before = 0
invalid_after = 0
fixed_geometries = 0
removed_geometries = 0
# Préparer le schéma de sortie
output_schema = source.schema
# Ouvrir le fichier Shapefile en écriture
with fiona.open(
output_shp,
'w',
driver=source.driver,
schema=output_schema,
crs=source.crs
) as output:
# Parcourir chaque objet dans le fichier source
for feature in source:
geom = shape(feature['geometry'])
# Vérifier si la géométrie est invalide avant correction
if not geom.is_valid:
invalid_before += 1
# Fonction pour corriger la géométrie
def fix_geometry(geom):
if geom is None or geom.is_empty:
return None # Supprimer les géométries vides
try:
if not geom.is_valid:
geom = unary_union([geom]) # Corriger les chevauchements
geom = next(polygonize(geom)) # Reformuler en polygones si nécessaire
return geom
except Exception:
return None # Si impossible à réparer, retourner None
# Appliquer la correction
fixed_geom = fix_geometry(geom)
# Vérifier si la géométrie est invalide après correction
if fixed_geom is None or fixed_geom.is_empty:
removed_geometries += 1
continue # Ignorer les géométries irrécupérables
if not fixed_geom.is_valid:
invalid_after += 1
else:
fixed_geometries += 1
# Mettre à jour la géométrie dans l'objet
feature['geometry'] = mapping(fixed_geom)
output.write(feature)
# Afficher les résultats de la correction
print(f"🔍 Invalid geometries before correction: {invalid_before}")
print(f"🔍 Fixed geometries: {fixed_geometries}")
print(f"🔍 Invalid geometries after correction: {invalid_after}")
print(f"🔍 Removed geometries (irreparable): {removed_geometries}")
print(f"📊 Total objects after correction: {num_features - removed_geometries}")
print(f"✅ Corrected file saved to: {output_shp}")
except FileNotFoundError:
print(f"❌ Error: Input file not found: {input_shp}", file=sys.stderr)
except fiona.errors.FionaError as e:
print(f"❌ Error reading/writing the file: {e}", file=sys.stderr)
except Exception as e:
print(f"❌ Unexpected error during processing: {e}", file=sys.stderr)
def OLD2fix_geometries_pure_python(input_shp, output_shp):
"""
Repairs geometries in a shapefile using Fiona and Shapely.
Parameters:
- input_shp (str): Path to the input shapefile.
- output_shp (str): Path to the output shapefile after correction.
Returns:
- None (saves the corrected file).
"""
try:
# Configuration de l'environnement Fiona pour accepter les géométries non fermées
with Env(OGR_GEOMETRY_ACCEPT_UNCLOSED_RING="YES"):
print(f"✅ Loading file: {input_shp}")
# Ouvrir le fichier Shapefile en lecture
with fiona.open(input_shp, 'r') as source:
# Vérifier le nombre d'objets et le type de géométrie
num_features = len(source)
geometry_type = source.schema['geometry']
print(f"️ Number of objects in the file: {num_features}")
print(f"️ Geometry type: {geometry_type}")
# Initialisation des compteurs d'erreurs
invalid_before = 0
invalid_after = 0
fixed_geometries = 0
removed_geometries = 0
# Préparer le schéma de sortie
output_schema = source.schema
# Ouvrir le fichier Shapefile en écriture
with fiona.open(
output_shp,
'w',
driver=source.driver,
schema=output_schema,
crs=source.crs
) as output:
# Parcourir chaque objet dans le fichier source
for feature in source:
geom = shape(feature['geometry'])
# Vérifier si la géométrie est invalide avant correction
if not geom.is_valid:
invalid_before += 1
# Fonction pour corriger la géométrie
def fix_geometry(geom):
if geom is None or geom.is_empty:
return None # Supprimer les géométries vides
try:
if not geom.is_valid:
geom = unary_union([geom]) # Corriger les chevauchements
geom = next(polygonize(geom)) # Reformuler en polygones si nécessaire
return geom
except Exception:
return None # Si impossible à réparer, retourner None
# Appliquer la correction
fixed_geom = fix_geometry(geom)
# Vérifier si la géométrie est invalide après correction
if fixed_geom is None or fixed_geom.is_empty:
removed_geometries += 1
continue # Ignorer les géométries irrécupérables
if not fixed_geom.is_valid:
invalid_after += 1
else:
fixed_geometries += 1
# Mettre à jour la géométrie dans l'objet
feature['geometry'] = mapping(fixed_geom)
output.write(feature)
# Afficher les résultats de la correction
print(f"🔍 Invalid geometries before correction: {invalid_before}")
print(f"🔍 Fixed geometries: {fixed_geometries}")
print(f"🔍 Invalid geometries after correction: {invalid_after}")
print(f"🔍 Removed geometries (irreparable): {removed_geometries}")
print(f"📊 Total objects after correction: {num_features - removed_geometries}")
print(f"✅ Corrected file saved to: {output_shp}")
except FileNotFoundError:
print(f"❌ Error: Input file not found: {input_shp}", file=sys.stderr)
except fiona.errors.FionaError as e:
print(f"❌ Error reading/writing the file: {e}", file=sys.stderr)
except Exception as e:
print(f"❌ Unexpected error during processing: {e}", file=sys.stderr)
def fix_geometries_pure_python(input_shp, output_shp):
"""
Corrige les erreurs de géométrie d'un fichier Shapefile et enregistre uniquement
les objets du même type que le fichier source.
Affiche une synthèse des modifications et un bilan des géométries par type avant et après le traitement.
:param input_shp: Chemin du fichier Shapefile d'entrée
:param output_shp: Chemin du fichier Shapefile de sortie
"""
with fiona.open(input_shp, 'r') as src:
meta = src.meta # Métadonnées du fichier
original_geom_type = meta['schema']['geometry'].upper() # Type de géométrie attendu (en majuscules pour éviter les écarts de format)
original_geom_type_simple = original_geom_type.replace('3D ', '') # Suppression du préfixe '3D '
fixed_features = []
geom_counts_before = Counter()
geom_counts_after = Counter()
modifications = 0
for feature in src:
fixed_feature = dict(feature) # Copie de l'entité
geom = shape(feature['geometry'])
geom_counts_before[geom.geom_type] += 1
# Correction de la géométrie
valid_geom = make_valid(geom)
geom_type_fixed = valid_geom.geom_type.upper()
geom_counts_after[geom_type_fixed] += 1
# Vérification si la géométrie corrigée est du même type que l'original
if geom_type_fixed == original_geom_type or geom_type_fixed == original_geom_type_simple:
fixed_feature['geometry'] = mapping(valid_geom)
fixed_features.append(fixed_feature)
else:
modifications += 1
# Écriture du fichier de sortie avec uniquement les géométries du type d'origine
if fixed_features:
with fiona.open(output_shp, 'w', **meta) as dst:
dst.writerecords(fixed_features)
print(f"Correction terminée. Fichier sauvegardé sous: {output_shp}")
else:
print("Aucune entité valide trouvée correspondant au type original. Aucun fichier généré.")
# Affichage de la synthèse
print("\t--- Synthèse des corrections ---")
print(f"\tNombre total de modifications ignorées : {modifications}")
print("\tAvant correction :", dict(geom_counts_before))
print("\tAprès correction (uniquement type original) :", dict(geom_counts_after))
#################################################################################################
def ThCutAreas(pathshp, outputspath):
print(' ')
print('\033[1;32m****************************************************************')
print('Program to cut areas and lines that are intersecting the outline')
print(' Original written by X. Robert, ISTerre')
print(' October 2022')
print(' Updated by A. Pont')
print(' Febuary 2025')
print('****************************************************************')
print('\033[0m ')
# Check if areas, lines, points2d and outline shapefiles exists...
areaOK = True
for fname in ['outline2d', 'lines2d', 'areas2d', 'points2d']:
if not os.path.isfile(pathshp + fname + '.shp'):
if fname == 'areas2d':
areaOK = False
else:
print(f"\033[91mERROR:\033[00m File {(str(pathshp + fname + '.shp'))}\033[91m does not exist'\033[00m")
return False
err = count_topology_errors(pathshp + fname + '.shp')
if err[1] != 0 :
print (f'\033[91mERROR: in file \033[0m{(str(pathshp + fname + '.shp'))} \033[91m, try too fix it... \033[0m')
fix_geometries_pure_python(pathshp + fname + '.shp', pathshp + fname + '_fixed.shp')
err2 = count_topology_errors(pathshp + fname + '_fixed.shp')
if err2[1] == 0 :
# for ftype in ['.shp', '.dbf', '.prj', '.shx']:
# os.rename(pathshp + fname + ftype , pathshp + fname + '_Old' + ftype)
# os.rename(pathshp + fname + '_fixed' + ftype, pathshp + fname + ftype)
# count_topology_errors(pathshp + fname + '.shp')
file_shp2gpkg(pathshp, fname + "_fixed", outputspath, fname)
else :
print (f'\033[91mERROR: in file \033[0m{(str(pathshp + fname + '.shp'))} \033[91m please fix it manualy with QGis... \033[0m')
return False
else :
file_shp2gpkg(pathshp, fname, outputspath, fname)
# print (f'\033[32mOK :\033[00m No erreur... {err[1]}, code {err[0]}')
# Check if Outputs path exists
if not os.path.exists(outputspath):
print ('\033[91mWARNING:\033[00m ' + outputspath + ' does not exist, I am creating it...')
os.mkdir(outputspath)
#1- Read the outline shapefile
# outlines = gpd.read_file(pathshp + 'outline2d.shp', driver = 'ESRI shapefile')
outlines = gpd.read_file(outputspath + 'outline2d.gpkg')
print('\033[1;32mCheck\033[0m')
## Change SHP to gpkg
# shp2gpkg(pathshp, outputspath)
## Work with points
#AddAltPoint(pathshp, outputspath)
## Work with lines
cutLines(outputspath, outlines, outputspath)
## Work with Areas
if areaOK:
print ('\033[1;32mCuting areas...\033[0m')
cutareas(outputspath, outlines, outputspath)
else:
print ("No areas to process...")
#5- End ?
print('')
print('\033[1;32mUpdate point, areas and lines done.\033[0m')
print('')
######################################################################################################
if __name__ == u'__main__':
###################################################
# initiate variables
#inputfile = 'stations3d.shp'
pathshp = './Inputs/'
outputspath = './Outputs/'
###################################################
# Run the transformation
ThCutAreas(pathshp, outputspath)
# End...