"""
Operations on GeoDataFrame.
"""
import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import Polygon, MultiPoint
import shapely
import os
[docs]def clip(gdf: GeoDataFrame, shape: Polygon) -> GeoDataFrame:
"""
Clip a GeoDataFrame to the specified area.
Same as (but somewhat faster than) :func:`geopandas.clip` or
:func:`geopandas.overlay` with ``how='intersection'``.
"""
ix = gdf.sindex.query(shape, predicate="intersects")
ix.sort()
gdf = gdf.iloc[ix]
gdf = gdf.set_geometry(gdf.geometry.intersection(shape))
return gdf[~gdf.is_empty]
[docs]def repair(gdf: GeoDataFrame) -> GeoDataFrame:
"""
Attempt to fix broken geometries in the given GeoDataFrame, and returns
a new GeoDataFrame.
This uses the ``unary_union()`` as well as the ``buffer(0)`` trick. Your
mileage may vary.
"""
return gdf.set_geometry([
shapely.ops.unary_union([shape])
for shape in gdf.geometry.buffer(0.0)
])
[docs]def buffer(
gdf: GeoDataFrame,
distance: float = 0.0,
*args, **kwargs
) -> GeoDataFrame:
"""
Applies ``shape.buffer(...)`` to each shape in a GeoDataFrame and returns
a new GeoDataFrame with replaced geometry.
This can be used as a trick to repair some types of broken geometries in
shapefiles.
"""
return gdf.set_geometry(
gdf.geometry.buffer(distance))
[docs]def read_shape(spec: str) -> Polygon:
"""
Parse shape from WKT, or bounds, or .wkt or .wkb file.
:param spec: a string specifying either 'minx,miny,maxx,maxy', or a WKT
text, or a filename pointing to a .wkt or .wkb file.
"""
if os.path.exists(spec):
ext = os.path.splitext(spec)[1].lower()
if ext == '.wkt':
return read_wkt(spec)
elif ext == '.wkb':
return read_wkb(spec)
try:
return read_wkt(spec)
except shapely.errors.ReadingError:
pass
try:
return read_wkb(spec)
except shapely.errors.ReadingError:
pass
raise ValueError("Unknown shape file format: {!r}".format(spec))
if isinstance(spec, str):
try:
minx, miny, maxx, maxy = map(float, spec.split(','))
return shapely.geometry.box(minx, miny, maxx, maxy)
except ValueError:
pass
try:
return shapely.wkt.loads(spec)
except shapely.errors.ReadingError:
pass
raise ValueError("Unknown shape definition: {!r}".format(spec))
[docs]def read_wkt(filename: str) -> Polygon:
"""Read .wkt file."""
with open(filename, 'rt') as f:
return shapely.wkt.load(f)
[docs]def read_wkb(filename: str) -> Polygon:
"""Read .wkb file."""
with open(filename, 'rb') as f:
return shapely.wkb.load(f)
[docs]def write_wkt(filename: str, shape: Polygon):
"""Read .wkt file."""
with open(filename, 'wt') as f:
shapely.wkt.dump(shape, f)
[docs]def write_wkb(filename: str, shape: Polygon):
"""Read .wkb file."""
with open(filename, 'wb') as f:
shapely.wkb.dump(shape, f)
[docs]def is_point_in_polygon(polygon: Polygon, points: np.ndarray) -> np.ndarray:
"""
Check which points are contained in a general polygon.
This may be more efficient than
``polygon.intersection(MultiPoint(points))`` if the polygon's Delaunay
triangulation consists of only few partitions.
:param polygon: a triangle
:param points: an ``(N, 2)`` coordinate array.
:return: a boolean ``(N,)`` array
"""
# In general, we could also use any other convex partitioning algorithm
# combined with is_point_in_convex_polygon, but I couldn't find any such
# implementations in shapely.
triangles = shapely.ops.triangulate(polygon)
return np.any([
is_point_in_triangle(triangle, points)
for triangle in triangles
], axis=0)
[docs]def is_point_in_convex_polygon(
polygon: Polygon,
points: np.ndarray
) -> np.ndarray:
"""
Check which points are contained in a convex polygon.
This may be more efficient than
``polygon.intersection(MultiPoint(points))`` if the polygon has a low
number of edges.
:param polygon: a triangle
:param points: an ``(N, 2)`` coordinate array.
:return: a boolean ``(N,)`` array
"""
mask = is_point_in_bounds(polygon.bounds, points)
points = points[mask]
centroid = np.array(polygon.centroid)
xmin, ymin, xmax, ymax = polygon.bounds
scale = np.array([xmax - xmin, ymax - ymin])
points = (np.array(points)[None, :, :] - centroid) / scale
coords = (np.array(polygon.boundary)[:, None, :] - centroid) / scale
cross = np.cross(
coords[1:] - coords[:-1],
points - coords[:-1])
result = (
np.all(cross <= 0, axis=0) |
np.all(cross >= 0, axis=0))
mask[mask] = result
return mask
[docs]def is_point_in_triangle(polygon: Polygon, points: np.ndarray) -> np.ndarray:
"""
Check which points are contained in a triangle.
:param polygon: a triangle
:param points: an ``(N, 2)`` coordinate array.
:return: a boolean ``(N,)`` array
"""
corners = np.array(polygon.boundary)[:-1]
if corners.shape[0] != 3:
raise ValueError(
"Expected triangle, got polygon with {} boundary points"
.format(corners.shape[0]))
A, B, C = corners
AP = np.array(points) - A
AB = B - A
AC = C - A
v_x = np.cross(AP, AC)
v_y = np.cross(AB, AP)
v_z = np.cross(AB, AC)
return (
((v_x >= 0) & (v_y >= 0) & (v_x + v_y <= v_z)) |
((v_x <= 0) & (v_y <= 0) & (v_x + v_y >= v_z))
)
[docs]def is_point_in_bounds(bounds: tuple, points: np.ndarray) -> np.ndarray:
"""
Check which points are contained in the given bounds.
:param bounds: a tuple ``(minx, miny, maxx, maxy)``
:param points: an ``(N, 2)`` coordinate array.
:return: a boolean ``(N,)`` array
"""
return np.all(
(points >= bounds[:2]) &
(points <= bounds[2:]),
axis=-1)
[docs]def find_polygon_at_point(gdf: GeoDataFrame, points: np.ndarray) -> np.ndarray:
"""
For each point in ``points`` look up the index of a containing polygon in
a GeoDataFrame, or ``-1`` if none is found.
:param gdf: a GeoDataFrame whose geometry is queried against
:param points: an ``(N, 2)`` coordinate array.
:return: an integer ``(N,)`` array
"""
# This seems to be slightly slower than using shapely.strtree.STRtree,
# but it is a bit more natural and simpler.
index = gdf.sindex
shapes = gdf.geometry.iloc
return np.array([
next((
i for i in index.query(p)
if shapes[i].intersects(p)), -1)
for p in MultiPoint(points[:, :2])
])