Clustering
C++20 header-only: DBSCAN, HDBSCAN, k-means.
Loading...
Searching...
No Matches
prim_mst_backend.h
Go to the documentation of this file.
1#pragma once
2
3#include <algorithm>
4#include <array>
5#include <atomic>
6#include <cstddef>
7#include <cstdint>
8#include <future>
9#include <limits>
10#include <thread>
11#include <type_traits>
12#include <utility>
13#include <vector>
14
18#include "clustering/math/detail/avx2_helpers.h"
19#include "clustering/math/detail/sq_distances_block.h"
21#include "clustering/ndarray.h"
22
23namespace clustering::hdbscan {
24
/// Compute budget that gates the streaming Prim backend, expressed as the maximum
/// point count it will accept; run() asserts `n * n` stays within this window
/// before allocating anything.
inline constexpr std::size_t kPrimMaxN = std::size_t{16384};

/// Equivalent byte-budget phrasing of @c kPrimMaxN, kept so callers that gate on
/// `n * n * sizeof(T) <= kPrimMrdMatrixByteBudget` agree with run()'s own check.
inline constexpr std::size_t kPrimMrdMatrixByteBudget = kPrimMaxN * kPrimMaxN * sizeof(float);

/// Thresholds gating the dense symmetric core-distance pass: the dense scan only
/// amortises at sufficiently large @c n, wide enough @c d, and small enough
/// @c minSamples (see PrimMstBackend::shouldUseDenseCore).
inline constexpr std::size_t kPrimDenseCoreMinN = 1024;
inline constexpr std::size_t kPrimDenseCoreMinD = 17;
inline constexpr std::size_t kPrimDenseCoreMaxMinSamples = 64;
/// Gates for the persistent-worker relax loop: it needs at least this many pool
/// workers, and at least this much per-worker relax work per round, to pay for
/// its spin barrier (see PrimMstBackend::shouldUsePersistentParallelRelax).
inline constexpr std::size_t kPrimPersistentRelaxMinWorkers = 8;
inline constexpr std::size_t kPrimPersistentRelaxMinOpsPerWorker = std::size_t{1} << 14;
56
template <class T> class PrimMstBackend {
  // Float-only by contract: the dot/distance kernels this backend dispatches to
  // (avx2_helpers.h, sq_distances_block.h) are float specialisations.
  static_assert(std::is_same_v<T, float>,
                "PrimMstBackend<T> supports only float; a double specialization is out of scope.");

public:
  PrimMstBackend() = default;

  /// Build the MRD-weighted minimum spanning tree of @c X.
  ///
  /// Phase 1 fills `out.coreDistances` with each point's squared distance to its
  /// @c minSamples -th nearest neighbour (self-excluded), via either a dense
  /// symmetric scan or a KDTree kNN query depending on shape. Phase 2 runs
  /// streaming Prim over the implicit complete graph whose edge weight between
  /// @c a and @c b is the mutual-reachability distance
  /// `max(core[a], core[b], sqDist(a, b))`.
  ///
  /// NOTE(review): distances stay *squared* Euclidean end to end (core distances
  /// and edge weights alike) — confirm downstream consumers expect squared MRD.
  ///
  /// @param X          Row-major `(n, d)` point matrix.
  /// @param minSamples Core-distance neighbour count; requires `1 <= minSamples < n`.
  /// @param pool       Pool wrapper; a null underlying pool forces the serial paths.
  /// @param out        Receives the `n - 1` edges (in insertion order) and the
  ///                   per-point core distances.
  void run(const NDArray<T, 2> &X, std::size_t minSamples, math::Pool pool, MstOutput<T> &out) {
    const std::size_t n = X.dim(0);
    const std::size_t d = X.dim(1);
    CLUSTERING_ALWAYS_ASSERT(minSamples >= 1);
    CLUSTERING_ALWAYS_ASSERT(minSamples < n);

    // Refuse @c n that would push the `O(n^2 * d)` inner work past the dispatcher's intended
    // Prim window. Phrased as `n <= kNsqBudget / n` rather than @c n*n <= kNsqBudget to avoid
    // the intermediate overflowing @c std::size_t at large @c n. Fires before any allocation so
    // out-of-budget callers surface deterministically.
    constexpr std::size_t kNsqBudget = kPrimMrdMatrixByteBudget / sizeof(T);
    CLUSTERING_ALWAYS_ASSERT(n <= kNsqBudget / n);

    out.edges.clear();
    out.edges.reserve(n - 1);
    out.coreDistances = NDArray<T, 1>(std::array<std::size_t, 1>{n});
    T *coreDistData = out.coreDistances.data();
    const T *xData = X.data();
    const bool useDenseCore = shouldUseDenseCore(n, d, minSamples);
    // Row @c i starts at @c xData + i*d, so every row is 32-byte aligned iff the base pointer
    // is aligned and the row stride @c d*sizeof(T) is a multiple of 32. Either condition can
    // fail independently: NumPy buffers only guarantee element alignment, and @c d is caller-
    // driven. When both hold we can use the strict-aligned dot kernel; otherwise the generic
    // kernel's per-operand alignment check is required to stay correct.
    const bool rowsAligned32 =
        X.template isAligned<32>() && (d % (std::size_t{32} / sizeof(T)) == 0);

    // Phase 1: core distances. The dense path caches per-row squared norms so each
    // pairwise squared distance later reduces to one dot product via
    // @c sqEuclideanFromDot; @c rowNorms stays empty on the KDTree path.
    std::vector<T> rowNorms;
    if (useDenseCore) {
      rowNorms.resize(n);
      for (std::size_t i = 0; i < n; ++i) {
        const T *row = xData + (i * d);
        rowNorms[i] = rowsAligned32 ? math::detail::dotRowAligned32Ptr(row, row, d)
                                    : math::detail::dotRowPtr(row, row, d);
      }
      computeDenseCoreDistances(X, rowNorms, minSamples, rowsAligned32, pool, coreDistData);
    } else {
      // Shapes that fail @c shouldUseDenseCore take the KDTree kNN path: the dense symmetric
      // scan does not amortise at small @c n, low @c d, or large @c minSamples (where the per-
      // update top-@c k rescan dominates).
      const KDTree<T> tree(X);
      const auto kSigned = static_cast<std::int32_t>(minSamples);
      auto [knnIdx, knnSqDist] = tree.knnQuery(kSigned, pool);
      (void)knnIdx; // only the distances matter here; neighbour ids are discarded
      for (std::size_t i = 0; i < n; ++i) {
        // knnQuery is self-excluded, so column minSamples-1 is the minSamples-th neighbour.
        coreDistData[i] = knnSqDist(i, minSamples - 1);
      }
    }

    // Phase 2: streaming Prim. Maintain `edgeWeight[v]` = best-known incident MRD weight to
    // the growing tree, `parent[v]` = the in-tree vertex realising that weight, and a visited
    // bitmap. Each iteration picks the smallest-weight unvisited @c target via a linear scan,
    // emits the edge `(parent[target], target, edgeWeight[target])`, then relaxes every other
    // unvisited @c v by recomputing `sqDist(target, v)` and lifting to MRD.
    std::vector<std::uint8_t> visited(n, std::uint8_t{0});
    std::vector<std::int32_t> parent(n, std::int32_t{0});
    std::vector<T> edgeWeight(n, std::numeric_limits<T>::max());

    // Squared Euclidean distance from point @c tIdx (row pointer @c rowT) to point @c v.
    // The dense-core variant reuses the cached row norms; the fallback recomputes row-vs-row.
    auto sqDistance = [&](const T *rowT, std::size_t tIdx, std::size_t v) noexcept {
      if (useDenseCore) {
        const T *rowV = xData + (v * d);
        const T dot = rowsAligned32 ? math::detail::dotRowAligned32Ptr(rowT, rowV, d)
                                    : math::detail::dotRowPtr(rowT, rowV, d);
        return math::detail::sqEuclideanFromDot(rowNorms[tIdx], rowNorms[v], dot);
      }
      return math::detail::sqEuclideanRowPtr(rowT, xData + (v * d), d);
    };

    // Relax every unvisited vertex in [lo, hi) against @c target: lift the squared distance
    // to MRD with the two core distances and keep the improvement, if any.
    auto relaxRange = [&](std::size_t lo, std::size_t hi, std::int32_t target, std::size_t tIdx,
                          T coreT, const T *rowT) noexcept {
      for (std::size_t v = lo; v < hi; ++v) {
        if (visited[v] != 0U) {
          continue;
        }
        const T sq = sqDistance(rowT, tIdx, v);
        // MRD lift: w = max(sq, core[target], core[v]), spelled branch-by-branch.
        T w = sq;
        if (coreT > w) {
          w = coreT;
        }
        const T coreV = coreDistData[v];
        if (coreV > w) {
          w = coreV;
        }
        if (w < edgeWeight[v]) {
          parent[v] = target;
          edgeWeight[v] = w;
        }
      }
    };

    // Fused relax + argmin over [lo, hi): same relaxation as @c relaxRange, but also returns
    // the smallest post-relax @c edgeWeight and its vertex so the serial path avoids a second
    // full scan per iteration.
    auto relaxRangeAndFindNext = [&](std::size_t lo, std::size_t hi, std::int32_t target,
                                     std::size_t tIdx, T coreT,
                                     const T *rowT) noexcept -> std::pair<std::int32_t, T> {
      std::int32_t bestV = -1;
      T bestW = std::numeric_limits<T>::max();
      for (std::size_t v = lo; v < hi; ++v) {
        if (visited[v] != 0U) {
          continue;
        }
        const T sq = sqDistance(rowT, tIdx, v);
        T w = sq;
        if (coreT > w) {
          w = coreT;
        }
        const T coreV = coreDistData[v];
        if (coreV > w) {
          w = coreV;
        }
        if (w < edgeWeight[v]) {
          parent[v] = target;
          edgeWeight[v] = w;
        }
        if (edgeWeight[v] < bestW) {
          bestW = edgeWeight[v];
          bestV = static_cast<std::int32_t>(v);
        }
      }
      return {bestV, bestW};
    };

    // Plain argmin over all unvisited vertices; used after a parallel relaxRange dispatch,
    // where per-block minima were not collected.
    auto findNext = [&]() noexcept -> std::pair<std::int32_t, T> {
      std::int32_t bestV = -1;
      T bestW = std::numeric_limits<T>::max();
      for (std::size_t v = 0; v < n; ++v) {
        if (visited[v] != 0U) {
          continue;
        }
        if (edgeWeight[v] < bestW) {
          bestW = edgeWeight[v];
          bestV = static_cast<std::int32_t>(v);
        }
      }
      return {bestV, bestW};
    };

    // Persistent-worker variant of the whole Prim loop: spin up one long-lived task per pool
    // worker and drive rounds through a phase counter, instead of paying a submit_blocks
    // dispatch per iteration. Returns true when it ran the loop to completion (run() is then
    // done); false when the shape/pool does not qualify and the per-iteration path below runs.
    auto persistentRelaxFrom = [&]() -> bool {
      if (!shouldUsePersistentParallelRelax(n, d, useDenseCore, pool)) {
        return false;
      }

      // Pack the per-worker barrier flag into the same 64 B cache line as its local-best
      // reduction slot. Each line is written only by its owning participant (worker or main),
      // and read by main only when @c done matches the current phase counter, so the release
      // store on @c done synchronises the non-atomic @c vertex and @c weight writes that
      // preceded it. Per-line ownership eliminates the cross-core RMW contention a shared
      // @c completed counter would incur on each phase close.
      struct alignas(64) LocalBest {
        std::atomic<std::uint32_t> done{0};
        std::int32_t vertex = -1;
        T weight = std::numeric_limits<T>::max();
      };

      // Main thread is participant 0; the remaining pool workers are 1..workerTasks.
      const std::size_t workerTasks = pool.workerCount() - 1;
      const std::size_t participantCount = workerTasks + 1;
      std::vector<LocalBest> localBest(participantCount);

      // Contiguous block partition of [0, n) across participants.
      auto blockBegin = [&](std::size_t id) noexcept { return (n * id) / participantCount; };
      auto blockEnd = [&](std::size_t id) noexcept { return (n * (id + 1)) / participantCount; };
      // Relax participant @c id's block against @c target and return the block-local argmin;
      // mirrors @c relaxRangeAndFindNext but derives coreT/rowT itself.
      auto relaxBlock = [&](std::size_t id,
                            std::int32_t target) noexcept -> std::pair<std::int32_t, T> {
        const auto tIdx = static_cast<std::size_t>(target);
        const T coreT = coreDistData[tIdx];
        const T *const rowT = xData + (tIdx * d);
        std::int32_t bestV = -1;
        T bestW = std::numeric_limits<T>::max();
        for (std::size_t v = blockBegin(id); v < blockEnd(id); ++v) {
          if (visited[v] != 0U) {
            continue;
          }
          const T sq = sqDistance(rowT, tIdx, v);
          T w = sq;
          if (coreT > w) {
            w = coreT;
          }
          const T coreV = coreDistData[v];
          if (coreV > w) {
            w = coreV;
          }
          if (w < edgeWeight[v]) {
            parent[v] = target;
            edgeWeight[v] = w;
          }
          if (edgeWeight[v] < bestW) {
            bestW = edgeWeight[v];
            bestV = static_cast<std::int32_t>(v);
          }
        }
        return {bestV, bestW};
      };

      std::atomic<std::uint32_t> phase{0}; // bumped once per round; doubles as the epoch id
      std::atomic<std::uint32_t> ready{0}; // count of workers that captured the initial phase
      std::atomic<bool> stop{false};       // set before the final phase bump to retire workers
      std::int32_t currentTarget = 0;      // published to workers by the release bump of @c phase

      // Worker body: spin until @c phase advances past the last value seen, then relax the
      // owned block against @c currentTarget, publish the block-local best, and release-store
      // the new phase id into the owned @c done flag. The acquire load of @c phase makes the
      // main thread's preceding @c currentTarget / @c visited writes visible.
      auto workerLoop = [&](std::size_t id) {
        std::uint32_t seen = phase.load(std::memory_order_acquire);
        ready.fetch_add(1, std::memory_order_release);
        for (;;) {
          std::uint32_t next = phase.load(std::memory_order_acquire);
          while (next == seen) {
            spinPause();
            next = phase.load(std::memory_order_acquire);
          }
          seen = next;
          if (stop.load(std::memory_order_acquire)) {
            return;
          }
          auto [bv, bw] = relaxBlock(id, currentTarget);
          localBest[id].vertex = bv;
          localBest[id].weight = bw;
          localBest[id].done.store(seen, std::memory_order_release);
        }
      };

      std::vector<std::future<void>> futures;
      futures.reserve(workerTasks);
      for (std::size_t id = 1; id < participantCount; ++id) {
        futures.emplace_back(pool.pool->submit_task([&, id] { workerLoop(id); }));
      }
      // Do not bump @c phase until every worker has captured the initial value, or a late
      // starter could observe the already-bumped counter as its baseline and skip round one.
      while (ready.load(std::memory_order_acquire) != workerTasks) {
        spinPause();
      }

      // Min-reduce the per-participant bests gathered during the just-closed phase.
      auto reduceBest = [&]() noexcept -> std::pair<std::int32_t, T> {
        std::int32_t bestV = -1;
        T bestW = std::numeric_limits<T>::max();
        for (const LocalBest &candidate : localBest) {
          if (candidate.vertex >= 0 && candidate.weight < bestW) {
            bestW = candidate.weight;
            bestV = candidate.vertex;
          }
        }
        return {bestV, bestW};
      };

      // One Prim round: publish @c target, open the phase, relax main's own block, wait for
      // every worker to close the phase, then reduce.
      auto relaxRound = [&](std::int32_t target) noexcept -> std::pair<std::int32_t, T> {
        currentTarget = target;
        const std::uint32_t newPhase =
            phase.fetch_add(1, std::memory_order_acq_rel) + std::uint32_t{1};
        auto [bv, bw] = relaxBlock(0, target);
        localBest[0].vertex = bv;
        localBest[0].weight = bw;
        // Per-worker flag wait: one cache line per worker, written exactly once per phase by
        // its owner and read exactly once per phase by main. Replaces a shared atomic counter
        // whose cross-core RMW serialised the barrier close.
        for (std::size_t id = 1; id < participantCount; ++id) {
          while (localBest[id].done.load(std::memory_order_acquire) != newPhase) {
            spinPause();
          }
        }
        return reduceBest();
      };

      // Seed: vertex 0 enters the tree at weight 0; the first round populates @c edgeWeight.
      visited[0] = 1U;
      edgeWeight[0] = T{0};
      auto [nextV, nextW] = relaxRound(static_cast<std::int32_t>(0));

      while (out.edges.size() + 1 < n) {
        // Complete graph with finite MRD weights: the argmin must find a vertex each round.
        CLUSTERING_ALWAYS_ASSERT(nextV >= 0);

        const auto bIdx = static_cast<std::size_t>(nextV);
        visited[bIdx] = 1U;
        out.edges.push_back(MstEdge<T>{parent[bIdx], nextV, nextW});

        if (out.edges.size() + 1 == n) {
          break;
        }
        auto next = relaxRound(nextV);
        nextV = next.first;
        nextW = next.second;
      }

      // Retire the workers: @c stop is made visible by the release bump of @c phase, which is
      // the same wake-up edge the workers already spin on.
      stop.store(true, std::memory_order_release);
      phase.fetch_add(1, std::memory_order_release);
      for (auto &future : futures) {
        future.get();
      }
      return true;
    };

    if (persistentRelaxFrom()) {
      return;
    }

    // Fallback per-iteration relax: one parallel (or serial) relaxation sweep from @c target,
    // returning the next vertex to add and its weight.
    auto relaxFrom = [&](std::int32_t target) noexcept -> std::pair<std::int32_t, T> {
      const auto tIdx = static_cast<std::size_t>(target);
      const T coreT = coreDistData[tIdx];
      const T *rowT = xData + (tIdx * d);
      // Per-iter parallel dispatch: the gate uses the per-worker op budget `(n*d / nWorkers)`
      // so very small @c n stays serial and avoids submit_blocks overhead.
      if (pool.pool != nullptr && pool.shouldParallelizeWork(n * d)) {
        pool.pool
            ->submit_blocks(std::size_t{0}, n,
                            [&](std::size_t lo, std::size_t hi) {
                              relaxRange(lo, hi, target, tIdx, coreT, rowT);
                            })
            .wait();
        return findNext();
      }
      return relaxRangeAndFindNext(0, n, target, tIdx, coreT, rowT);
    };

    // Seed: vertex 0 is in the tree with weight 0. The first relax populates @c edgeWeight for
    // every other vertex so the first argmin scan has finite values.
    visited[0] = 1U;
    edgeWeight[0] = T{0};
    auto [nextV, nextW] = relaxFrom(static_cast<std::int32_t>(0));

    while (out.edges.size() + 1 < n) {
      // The graph is complete (every pair has a finite MRD), so on a connected workload the
      // argmin always finds a finite entry. Asserting here flags any contract violation that
      // would otherwise leave the spanning tree short of @c n - 1 edges.
      CLUSTERING_ALWAYS_ASSERT(nextV >= 0);

      const auto bIdx = static_cast<std::size_t>(nextV);
      visited[bIdx] = 1U;
      out.edges.push_back(MstEdge<T>{parent[bIdx], nextV, nextW});

      if (out.edges.size() + 1 == n) {
        break;
      }
      auto next = relaxFrom(nextV);
      nextV = next.first;
      nextW = next.second;
    }
  }

private:
  /// Shape gate for the dense symmetric core-distance pass: it only amortises with
  /// enough points, wide enough rows, and a small enough per-row top-k state.
  [[nodiscard]] static constexpr bool shouldUseDenseCore(std::size_t n, std::size_t d,
                                                         std::size_t minSamples) noexcept {
    return n >= kPrimDenseCoreMinN && d >= kPrimDenseCoreMinD &&
           minSamples <= kPrimDenseCoreMaxMinSamples;
  }

  /// Gate for the persistent-worker relax loop: dense-core shapes only, a live pool
  /// with at least @c kPrimPersistentRelaxMinWorkers workers, and enough per-round
  /// work per worker (`n*d` ops against the relaxed budget) to pay for the barrier.
  [[nodiscard]] static bool shouldUsePersistentParallelRelax(std::size_t n, std::size_t d,
                                                             bool useDenseCore,
                                                             math::Pool pool) noexcept {
    return useDenseCore && pool.pool != nullptr &&
           pool.workerCount() >= kPrimPersistentRelaxMinWorkers &&
           pool.shouldParallelizeWork(n * d, kPrimPersistentRelaxMinOpsPerWorker);
  }

  /// Busy-wait hint used by the spin barriers: @c _mm_pause on AVX2 builds,
  /// otherwise a scheduler yield.
  static void spinPause() noexcept {
#ifdef CLUSTERING_USE_AVX2
    _mm_pause();
#else
    std::this_thread::yield();
#endif
  }

  /// Fold candidate squared distance @c sq into row @c row's running set of its
  /// @c minSamples smallest values. @c worstSlot caches the slot index of the row's
  /// current maximum so the common reject case is a single comparison (the negated
  /// form also rejects NaN candidates); on accept, the worst slot is overwritten
  /// and the maximum rescanned in O(minSamples).
  static void updateTopK(T *topK, std::vector<std::size_t> &worstSlot, std::size_t minSamples,
                         std::size_t row, T sq) noexcept {
    T *const rowTopK = topK + (row * minSamples);
    std::size_t worst = worstSlot[row];
    if (!(sq < rowTopK[worst])) {
      return;
    }
    rowTopK[worst] = sq;
    worst = 0;
    T worstValue = rowTopK[0];
    for (std::size_t s = 1; s < minSamples; ++s) {
      if (rowTopK[s] > worstValue) {
        worstValue = rowTopK[s];
        worst = s;
      }
    }
    worstSlot[row] = worst;
  }

  /// Dense-core phase 1: write each point's @c minSamples -th smallest squared
  /// distance to any other point into @c coreDistData. The serial path walks the
  /// symmetric upper triangle once, updating both endpoint rows per pair; the
  /// pooled path switches to row-independent full scans (rationale below).
  static void computeDenseCoreDistances(const NDArray<T, 2> &X, const std::vector<T> &rowNorms,
                                        std::size_t minSamples, bool rowsAligned32, math::Pool pool,
                                        T *coreDistData) {
    const std::size_t n = X.dim(0);
    const std::size_t d = X.dim(1);
    const T *const xData = X.data();
    // Per-row top-k buffers start at +inf; worstSlot[i] indexes row i's current maximum.
    std::vector<T> topK(n * minSamples, std::numeric_limits<T>::max());
    std::vector<std::size_t> worstSlot(n, 0);

    // With enough workers, row-independent scans win despite computing each pair twice: every row
    // owns its top-k state, so the pool path has no cross-row writes and can reuse the batched
    // four-neighbour distance kernel that amortises AVX2 horizontal sums.
    if (pool.pool != nullptr && pool.workerCount() >= 4 && pool.shouldParallelizeWork(n * n * d)) {
      pool.pool
          ->submit_blocks(std::size_t{0}, n,
                          [&](std::size_t lo, std::size_t hi) {
                            computeDenseCoreDistancesRows(X, minSamples, lo, hi, topK.data(),
                                                          worstSlot);
                          })
          .wait();
      // Core distance = the k-th smallest, i.e. the maximum held in each row's top-k.
      for (std::size_t i = 0; i < n; ++i) {
        coreDistData[i] = topK[(i * minSamples) + worstSlot[i]];
      }
      return;
    }

    // Serial symmetric scan: each (i, j) pair is computed once and folded into both rows.
    for (std::size_t i = 0; i < n; ++i) {
      const T *const rowI = xData + (i * d);
      const T normI = rowNorms[i];
      for (std::size_t j = i + 1; j < n; ++j) {
        const T *const rowJ = xData + (j * d);
        const T dot = rowsAligned32 ? math::detail::dotRowAligned32Ptr(rowI, rowJ, d)
                                    : math::detail::dotRowPtr(rowI, rowJ, d);
        const T sq = math::detail::sqEuclideanFromDot(normI, rowNorms[j], dot);
        updateTopK(topK.data(), worstSlot, minSamples, i, sq);
        updateTopK(topK.data(), worstSlot, minSamples, j, sq);
      }
    }

    for (std::size_t i = 0; i < n; ++i) {
      coreDistData[i] = topK[(i * minSamples) + worstSlot[i]];
    }
  }

  /// Row-block worker for the pooled dense-core path: for each owned row @c i in
  /// [lo, hi), stream all @c n points through the batched AoS distance kernel in
  /// @c kBlockRows -sized chunks and fold every non-self distance into row @c i's
  /// top-k. Rows are owned exclusively per block, so the @c topK / @c worstSlot
  /// writes never race across workers.
  static void computeDenseCoreDistancesRows(const NDArray<T, 2> &X, std::size_t minSamples,
                                            std::size_t lo, std::size_t hi, T *topK,
                                            std::vector<std::size_t> &worstSlot) noexcept {
    constexpr std::size_t kBlockRows = 64;
    const std::size_t n = X.dim(0);
    const std::size_t d = X.dim(1);
    const T *const xData = X.data();
    std::array<T, kBlockRows> distances{};

    for (std::size_t i = lo; i < hi; ++i) {
      const T *const rowI = xData + (i * d);
      for (std::size_t base = 0; base < n; base += kBlockRows) {
        const std::size_t count = std::min(kBlockRows, n - base);
        math::detail::sqDistancesAosBlock(rowI, xData + (base * d), count, d, distances.data());
        for (std::size_t offset = 0; offset < count; ++offset) {
          const std::size_t j = base + offset;
          // Skip the zero self-distance: core distances are self-excluded.
          if (j != i) {
            updateTopK(topK, worstSlot, minSamples, i, distances[offset]);
          }
        }
      }
    }
  }
};
554
555} // namespace clustering::hdbscan
#define CLUSTERING_ALWAYS_ASSERT(cond)
Release-active assertion: evaluates cond in every build configuration.
Implements a KDTree data structure.
Definition kdtree.h:92
std::pair< NDArray< std::int32_t, 2 >, NDArray< T, 2 > > knnQuery(std::int32_t k, math::Pool pool) const
Returns the k nearest neighbours of every indexed point, self-excluded.
Definition kdtree.h:241
Represents a multidimensional array (NDArray) of a fixed number of dimensions N and element type T.
Definition ndarray.h:136
size_t dim(std::size_t index) const noexcept
Returns the size of a specific dimension of the NDArray.
Definition ndarray.h:461
const T * data() const noexcept
Provides read-only access to the internal data array.
Definition ndarray.h:503
void run(const NDArray< T, 2 > &X, std::size_t minSamples, math::Pool pool, MstOutput< T > &out)
Build the MRD-weighted minimum spanning tree of X.
constexpr std::size_t kPrimDenseCoreMinD
constexpr std::size_t kPrimMaxN
Compute budget that gates the streaming Prim backend, expressed as the maximum point count it will accept.
constexpr std::size_t kPrimDenseCoreMinN
Thresholds gating the dense symmetric core-distance pass.
constexpr std::size_t kPrimPersistentRelaxMinWorkers
constexpr std::size_t kPrimMrdMatrixByteBudget
Equivalent byte-budget phrasing of kPrimMaxN, kept so callers that gate on n*n*sizeof(T) <= kPrimMrdMatrixByteBudget agree with run()'s own check.
constexpr std::size_t kPrimPersistentRelaxMinOpsPerWorker
constexpr std::size_t kPrimDenseCoreMaxMinSamples
One edge of the minimum spanning tree of mutual-reachability distances.
Definition mst_output.h:22
Frozen output contract of every MST backend.
Definition mst_output.h:41
NDArray< T, 1 > coreDistances
Per-point core distance (length N; self-excluded kNN distance at minSamples).
Definition mst_output.h:45
std::vector< MstEdge< T > > edges
The N - 1 MST edges, in insertion order.
Definition mst_output.h:43
Thin injection wrapper around a BS::light_thread_pool.
Definition thread.h:63
std::size_t workerCount() const noexcept
Number of worker threads available, or 1 in serial mode.
Definition thread.h:72
BS::light_thread_pool * pool
Underlying pool, or nullptr to force serial execution.
Definition thread.h:65
bool shouldParallelizeWork(std::size_t totalOps, std::size_t minOpsPerWorker=std::size_t{1}<< 15) const noexcept
Decide whether totalOps warrants parallel dispatch, based on work volume.
Definition thread.h:118