# -*- coding: utf-8 -*-
# _____ _ _ _
# | ____|__| | ___| |_ _____(_)___ ___
# | _| / _` |/ _ \ \ \ /\ / / _ \ / __/ __|
# | |__| (_| | __/ |\ V V / __/ \__ \__ \
# |_____\__,_|\___|_| \_/\_/_\___|_|___/___/
# | \/ | ___ ___| |__ / _|_ __ ___ ___
# | |\/| |/ _ \/ __| '_ \| |_| '__/ _ \/ _ \
# | | | | __/\__ \ | | | _| | | __/ __/
# |_| |_|\___||___/_| |_|_| |_| \___|\___|
#
#
# Unit of Strength of Materials and Structural Analysis
# University of Innsbruck,
#
# Research Group for Computational Mechanics of Materials
# Institute of Structural Engineering, BOKU University, Vienna
#
# 2023 - today
#
# Matthias Neuner | matthias.neuner@boku.ac.at
# Thomas Mader | thomas.mader@bokut.ac.at
#
# This file is part of EdelweissMeshfree.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# The full text of the license can be found in the file LICENSE.md at
# the top level directory of EdelweissMeshfree.
# ---------------------------------------------------------------------
import concurrent.futures
import itertools
from typing import Any, List, Set, Tuple, Union
import numpy as np
from edelweissfe.journal.journal import Journal
from edelweissfe.numerics.parallelizationutilities import (
getNumberOfThreads,
isFreeThreadingSupported,
)
from numpy.typing import NDArray
from edelweissmeshfree.meshfree.particlekerneldomain import ParticleKernelDomain
from edelweissmeshfree.particlemanagers.base.baseparticlemanager import (
BaseParticleManager,
)
[docs]
class _FastKDBinOrganizer:
"""
Optimized Bin Organizer using pure NumPy vectorization and integer indexing.
"""
def __init__(self, kernelFunctions: List[Any], dimension: int) -> None:
self._dimension = dimension
# --- 1. Vectorized Bounding Box Extraction ---
bboxes = [sf.getBoundingBox() for sf in kernelFunctions]
if not bboxes:
self._mins = np.empty((0, dimension))
self._maxs = np.empty((0, dimension))
self._bins = []
self._boundingBoxMin = np.zeros(dimension)
self._boundingBoxMax = np.zeros(dimension)
self._nBins = np.zeros(dimension, dtype=int)
self._binSize = np.ones(dimension)
self._strides = np.ones(3, dtype=int)
return
bboxes_arr = np.array(bboxes)
self._mins = bboxes_arr[:, 0, :]
self._maxs = bboxes_arr[:, 1, :]
# --- 2. Grid Setup ---
self._boundingBoxMin = np.min(self._mins, axis=0) - 1e-12
self._boundingBoxMax = np.max(self._maxs, axis=0) + 1e-12
avg_size = np.mean(self._maxs - self._mins, axis=0)
self._binSize = avg_size / 2.0
self._nBins = np.ceil((self._boundingBoxMax - self._boundingBoxMin) / self._binSize).astype(int)
self._strides = np.ones(3, dtype=int)
if dimension >= 2:
self._strides[1] = self._nBins[0]
if dimension == 3:
self._strides[2] = self._nBins[0] * self._nBins[1]
total_bins = int(np.prod(self._nBins))
self._bins = [[] for _ in range(total_bins)]
# --- 3. Vectorized Bin Index Calculation ---
min_indices = ((self._mins - self._boundingBoxMin) / self._binSize).astype(int)
max_indices = ((self._maxs - self._boundingBoxMin) / self._binSize).astype(int)
# --- 4. Fill Bins ---
_, stride_y, stride_z = self._strides[0], self._strides[1], self._strides[2]
bins = self._bins
for k_idx, (min_idx, max_idx) in enumerate(zip(min_indices, max_indices)):
if dimension == 3:
for z in range(min_idx[2], max_idx[2] + 1):
z_offset = z * stride_z
for y in range(min_idx[1], max_idx[1] + 1):
y_offset = z_offset + y * stride_y
start = y_offset + min_idx[0]
end = y_offset + max_idx[0] + 1
for bin_idx in range(start, end):
bins[bin_idx].append(k_idx)
elif dimension == 2:
for y in range(min_idx[1], max_idx[1] + 1):
y_offset = y * stride_y
start = y_offset + min_idx[0]
end = y_offset + max_idx[0] + 1
for bin_idx in range(start, end):
bins[bin_idx].append(k_idx)
else:
for bin_idx in range(min_idx[0], max_idx[0] + 1):
bins[bin_idx].append(k_idx)
def getCandidateIndices(self, query_min: NDArray[np.float64], query_max: NDArray[np.float64]) -> Set[int]:
if not self._bins:
return set()
idx_lower = ((query_min - self._boundingBoxMin) / self._binSize).astype(int)
idx_upper = ((query_max - self._boundingBoxMin) / self._binSize).astype(int)
np.maximum(idx_lower, 0, out=idx_lower)
np.minimum(idx_upper, self._nBins - 1, out=idx_upper)
bins = self._bins
_, stride_y, stride_z = self._strides[0], self._strides[1], self._strides[2]
lists_to_chain = []
if self._dimension == 3:
for z in range(idx_lower[2], idx_upper[2] + 1):
z_offset = z * stride_z
for y in range(idx_lower[1], idx_upper[1] + 1):
y_offset = z_offset + y * stride_y
start = y_offset + idx_lower[0]
end = y_offset + idx_upper[0] + 1
for bin_idx in range(start, end):
if bins[bin_idx]:
lists_to_chain.append(bins[bin_idx])
elif self._dimension == 2:
for y in range(idx_lower[1], idx_upper[1] + 1):
y_offset = y * stride_y
start = y_offset + idx_lower[0]
end = y_offset + idx_upper[0] + 1
for bin_idx in range(start, end):
if bins[bin_idx]:
lists_to_chain.append(bins[bin_idx])
elif self._dimension == 1:
for bin_idx in range(idx_lower[0], idx_upper[0] + 1):
if bins[bin_idx]:
lists_to_chain.append(bins[bin_idx])
if not lists_to_chain:
return set()
return set(itertools.chain.from_iterable(lists_to_chain))
[docs]
class KDBinOrganizedParticleManager(BaseParticleManager):
def __init__(
self,
particleKernelDomain: ParticleKernelDomain,
dimension: int,
journal: Journal,
bondParticlesToKernelFunctions: bool = False,
randomlyShiftPartliceShapeFunctions: Union[bool, float] = False,
):
self._meshfreeKernelFunctions = particleKernelDomain.meshfreeKernelFunctions
self._particles = particleKernelDomain.particles
self._dimension = dimension
self._bondParticlesToKernelFunctions = bondParticlesToKernelFunctions
self._journal = journal
if isFreeThreadingSupported():
self._numThreads = getNumberOfThreads()
else:
self._numThreads = 1
# Pre-fetch labels for integer sorting
self._kernelLabels = np.array([k.node.label for k in self._meshfreeKernelFunctions], dtype=int)
if not isinstance(randomlyShiftPartliceShapeFunctions, (bool, float)):
raise ValueError("randomlyShiftPartliceShapeFunctions must be a boolean or a float.")
self._randomlyShiftPartliceShapeFunctions = randomlyShiftPartliceShapeFunctions
if self._bondParticlesToKernelFunctions:
if len(self._particles) != len(self._meshfreeKernelFunctions):
raise ValueError("The number of particles and kernel functions must be equal.")
for particle, kernelFunction in zip(self._particles, self._meshfreeKernelFunctions):
particleCoordinates = particle.getCenterCoordinates()
kernelFunction.moveTo(particleCoordinates)
self.signalizeKernelFunctionUpdate()
[docs]
def signalizeKernelFunctionUpdate(self) -> None:
self._theBins = _FastKDBinOrganizer(list(self._meshfreeKernelFunctions), self._dimension)
[docs]
def updateConnectivity(self) -> bool:
hasChanged = False
if self._bondParticlesToKernelFunctions:
self._journal.message("Updating kernel function positions...", "ParticleManager")
for particle, kernelFunction in zip(self._particles, self._meshfreeKernelFunctions):
particleCoordinates = particle.getCenterCoordinates()
if self._randomlyShiftPartliceShapeFunctions:
if isinstance(self._randomlyShiftPartliceShapeFunctions, float):
particleVol = particle.getVolumeUndeformed()
particleSize = particleVol ** (1.0 / self._dimension)
randdisp = (
(np.random.rand(self._dimension) - 0.5)
* np.sqrt(particle.getVolumeUndeformed())
* self._randomlyShiftPartliceShapeFunctions
* particleSize
)
particleCoordinates += randdisp
kernelFunction.moveTo(particleCoordinates)
self.signalizeKernelFunctionUpdate()
# Capture variables for closure
all_kernels = self._meshfreeKernelFunctions
kernel_mins = self._theBins._mins
kernel_maxs = self._theBins._maxs
bin_organizer = self._theBins
kernel_labels = self._kernelLabels
dim = self._dimension
def processParticleChunk(particleChunk: List[Any]) -> bool:
particlesInChunkHaveChanged = False
for p in particleChunk:
evaluationCoordinates = p.getEvaluationCoordinates()
# Broad Phase Min/Max Calculation
if len(evaluationCoordinates) == 1:
p_min = evaluationCoordinates[0]
p_max = evaluationCoordinates[0]
else:
p_min = np.min(evaluationCoordinates, axis=0)
p_max = np.max(evaluationCoordinates, axis=0)
# 1. Grid Search
candidate_indices = bin_organizer.getCandidateIndices(p_min, p_max)
# 2. Vectorized AABB Filter
cand_idx_arr = np.array(list(candidate_indices), dtype=int)
c_mins = kernel_mins[cand_idx_arr, :dim]
c_maxs = kernel_maxs[cand_idx_arr, :dim]
p_max_s = p_max[:dim]
p_min_s = p_min[:dim]
overlap_mask = np.all((p_max_s >= c_mins) & (p_min_s <= c_maxs), axis=1)
surviving_indices = cand_idx_arr[overlap_mask]
# 3. Precise Check (Geometric)
valid_indices = []
# Ensure coordinates are 2D for the Cython signature
eval_coords_view = evaluationCoordinates
if eval_coords_view.ndim == 1:
# Reshape (dim,) -> (1, dim)
eval_coords_view = eval_coords_view.reshape(1, -1)
for k_idx in surviving_indices:
sf = all_kernels[k_idx]
if sf.isAnyCoordinateInSupport(eval_coords_view):
valid_indices.append(k_idx)
valid_indices.sort(key=lambda idx: kernel_labels[idx])
validKernels = [all_kernels[i] for i in valid_indices]
if not validKernels:
raise ValueError(
f"Particle at {p.getCenterCoordinates()} has no associated kernel functions after connectivity update."
)
if validKernels != p.kernelFunctions:
particlesInChunkHaveChanged = True
p.assignKernelFunctions(
validKernels
) # assign the kernel functions. This happens even if they are the same, because the overlap with the particle usually changes due to movement.
return particlesInChunkHaveChanged
if self._numThreads <= 1:
if processParticleChunk(self._particles):
hasChanged = True
else:
chunkSize = len(self._particles) // self._numThreads + 1
chunks = [self._particles[i : i + chunkSize] for i in range(0, len(self._particles), chunkSize)]
with concurrent.futures.ThreadPoolExecutor(max_workers=self._numThreads) as executor:
results = executor.map(processParticleChunk, chunks)
if any(results):
hasChanged = True
return hasChanged
def getCoveredDomain(self) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
return self._theBins._boundingBoxMin, self._theBins._boundingBoxMax
def __str__(self) -> str:
return (
f"KDBinOrganizedParticleManager with {len(self._particles)} particles "
f"and {len(self._meshfreeKernelFunctions)} shape functions "
f"in {self._dimension} dimensions. Covered domain: {self.getCoveredDomain()}."
)
def visualize(self) -> None:
if self._dimension != 2:
raise ValueError("Visualization only supported for 2D.")
import matplotlib.pyplot as plt
nBins = self._theBins._nBins
nKernelFunctions = np.zeros(nBins)
for i in range(nBins[0]):
for j in range(nBins[1]):
flat_idx = j * self._theBins._strides[1] + i
nKernelFunctions[i, j] = len(self._theBins._bins[flat_idx])
plt.figure()
plt.imshow(nKernelFunctions.T, origin="lower")
plt.title("Number of kernel functions in the bins")
for i in range(nBins[0] + 1):
plt.plot([i - 0.5, i - 0.5], [0 - 0.5, nBins[1] - 0.5], "k")
for j in range(nBins[1] + 1):
plt.plot([0 - 0.5, nBins[0] - 0.5], [j - 0.5, j - 0.5], "k")
plt.colorbar()
plt.show()