vocabulary

Vocabulary normalisation pass for riverbank (v0.15.3 + v0.15.5).

Post-extraction pass that converts the ad-hoc predicate/object vocabulary produced by open-vocabulary extraction into a tighter, semantically consistent schema. The pass is fully domain-agnostic — it operates on the triple buffer and has no knowledge of the subject domain.

Four core normalisations are applied in order (the orchestrating VocabularyNormalisationPass additionally performs unicode cleanup, the optional v0.15.5 embedding-based predicate canonicalisation, and a final deduplication):

  1. Categorical literal promotion — repeated string-valued objects that represent a bounded category ("Director", "Approved", "Mammal") are promoted to vocab:* IRI resources.

  2. Predicate vocabulary collapse — clusters of predicates with a shared semantic root (ex:is_director / ex:is_ceo / ex:is_chair) are collapsed to a single canonical predicate using either an edit-distance (deterministic) or LLM-guided backend.

  3. Fact-stuffed predicate decomposition — predicates whose local name embeds a qualifier (year, date, ordinal) are decomposed into a base predicate triple plus a separate qualifier triple.

  4. Entity URI canonicalisation — after entity resolution writes owl:sameAs links, non-canonical subject URIs are rewritten to the single canonical URI chosen by the resolution pass.

Pipeline position::

extract → entity_resolution → [vocabulary_normalisation] → write

The pass reads from the in-memory triple buffer — no database round-trip is needed.

Profile YAML::

vocabulary_normalisation:
  enabled: true
  categorical_threshold: 2
  collapse_predicates: true
  predicate_collapse_backend: "deterministic"   # deterministic | llm
  decompose_stuffed_predicates: true
  rewrite_canonical_uris: false
  vocabulary_namespace: "http://riverbank.example/vocab/"

Stats emitted::

vocab_literals_promoted      int  Literals replaced by vocab:* IRIs
vocab_predicates_collapsed   int  Predicate rewrites from cluster collapse
vocab_facts_decomposed       int  Predicates whose qualifiers were stripped
vocab_uris_rewritten         int  Subject/object URI rewrites
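
Each stat surfaces as the identically named field on the NormalisationResult returned by VocabularyNormalisationPass.run (both documented below). A minimal sketch, assuming profile and triple_buffer come from the surrounding pipeline::

result = VocabularyNormalisationPass.from_profile(profile).run(triple_buffer)
print(result.vocab_literals_promoted, result.vocab_predicates_collapsed)
print(result.vocab_facts_decomposed, result.vocab_uris_rewritten)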

CategoricalDetector

Detect string literals that represent a bounded category.

A literal is categorical when the same (predicate, object_value) pair appears in ≥ threshold triples. All such literals are promoted to IRI resources in the vocabulary namespace.

Example::

ex:Alice  ex:is  "Director"   ┐
ex:Bob    ex:is  "Director"   ┘ threshold=2 → vocab:Director
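
A minimal usage sketch. The triples are Pydantic models elsewhere in riverbank; here a hypothetical Triple stand-in carries only the fields the detector touches::

from pydantic import BaseModel

class Triple(BaseModel):  # hypothetical stand-in for riverbank.prov.ExtractedTriple
    subject: str
    predicate: str
    object_value: str

triples = [
    Triple(subject="ex:Alice", predicate="ex:is", object_value="Director"),
    Triple(subject="ex:Bob", predicate="ex:is", object_value="Director"),
    Triple(subject="ex:Eve", predicate="ex:is", object_value="Auditor"),
]
detector = CategoricalDetector(threshold=2)
cat_map = detector.detect(triples)
# {("ex:is", "Director"): "http://riverbank.example/vocab/Director"}
promoted, n_promoted = detector.promote(triples, cat_map)
assert n_promoted == 2  # "Auditor" appears once and stays a literal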
Source code in src/riverbank/vocabulary/__init__.py
class CategoricalDetector:
    """Detect string literals that represent a bounded category.

    A literal is **categorical** when the same ``(predicate, object_value)``
    pair appears in ≥ *threshold* triples.  All such literals are promoted to
    IRI resources in the vocabulary namespace.

    Example::

        ex:Alice  ex:is  "Director"   ┐
        ex:Bob    ex:is  "Director"   ┘ threshold=2 → vocab:Director
    """

    def __init__(
        self,
        threshold: int = 2,
        vocab_namespace: str = "http://riverbank.example/vocab/",
    ) -> None:
        self.threshold = threshold
        self.vocab_namespace = vocab_namespace

    def detect(self, triples: list) -> dict[tuple[str, str], str]:
        """Identify categorical literals.

        :returns: Mapping ``{(predicate, literal_value): new_iri}``.
        """
        counts: Counter = Counter()
        for t in triples:
            if not _is_iri(t.object_value):
                counts[(t.predicate, t.object_value)] += 1
        return {
            (pred, val): self.vocab_namespace + _to_camel_case(val)
            for (pred, val), cnt in counts.items()
            if cnt >= self.threshold
        }

    def promote(
        self, triples: list, categorical_map: dict[tuple[str, str], str]
    ) -> tuple[list, int]:
        """Rewrite object literals using *categorical_map*.

        :returns: ``(new_triples, n_promoted)``
        """
        result = []
        n = 0
        for t in triples:
            key = (t.predicate, t.object_value)
            if key in categorical_map:
                result.append(t.model_copy(update={"object_value": categorical_map[key]}))
                n += 1
            else:
                result.append(t)
        return result, n
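
Both methods lean on module-private helpers (_is_iri, _to_camel_case) that this page does not reproduce. A plausible sketch, assuming heuristic IRI detection and CamelCase label conversion::

import re

def _is_iri(value: str) -> bool:
    # Heuristic: treat http(s)/urn IRIs and CURIEs such as ex:Alice as IRIs.
    return bool(re.match(r"^(https?://|urn:|[A-Za-z][\w.-]*:\S+)", value))

def _to_camel_case(value: str) -> str:
    # "senior director" -> "SeniorDirector"
    return "".join(w.capitalize() for w in re.split(r"[\s_-]+", value.strip()) if w)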

detect(triples)

Identify categorical literals.

:returns: Mapping {(predicate, literal_value): new_iri}.

Source code in src/riverbank/vocabulary/__init__.py
def detect(self, triples: list) -> dict[tuple[str, str], str]:
    """Identify categorical literals.

    :returns: Mapping ``{(predicate, literal_value): new_iri}``.
    """
    counts: Counter = Counter()
    for t in triples:
        if not _is_iri(t.object_value):
            counts[(t.predicate, t.object_value)] += 1
    return {
        (pred, val): self.vocab_namespace + _to_camel_case(val)
        for (pred, val), cnt in counts.items()
        if cnt >= self.threshold
    }

promote(triples, categorical_map)

Rewrite object literals using categorical_map.

:returns: (new_triples, n_promoted)

Source code in src/riverbank/vocabulary/__init__.py
def promote(
    self, triples: list, categorical_map: dict[tuple[str, str], str]
) -> tuple[list, int]:
    """Rewrite object literals using *categorical_map*.

    :returns: ``(new_triples, n_promoted)``
    """
    result = []
    n = 0
    for t in triples:
        key = (t.predicate, t.object_value)
        if key in categorical_map:
            result.append(t.model_copy(update={"object_value": categorical_map[key]}))
            n += 1
        else:
            result.append(t)
    return result, n

EmbeddingPredicateCanonicali

Post-extraction pass: embed predicate labels, DBSCAN-cluster, rewrite to canonical.

Uses nomic-embed-text (via Ollama) or falls back to all-MiniLM-L6-v2 (sentence-transformers). Groups predicates by semantic similarity using DBSCAN, then optionally asks the LLM to select the best canonical name for each cluster.

Example::

was_born_in (0.97 similarity) → has_birth_place
born_in     (0.95)            → has_birth_place
birthplace  (0.91)            → has_birth_place

New stats: predicates_canonicalized, predicate_clusters_merged.

Profile YAML::

vocabulary_normalisation:
  enabled: true
  embedding_canonicalization: true
  embedding_canonicalization_threshold: 0.88
  embedding_canonicalization_model: "nomic-embed-text"
  embedding_canonicalization_llm_rename: true

Falls back gracefully when embedding libraries are unavailable or the LLM call fails — returns the original triples unchanged.
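
The DBSCAN helper in the source below calls a module-private _cosine_similarity; a minimal sketch of the assumed implementation::

import math

def _cosine_similarity(a: list[float], b: list[float]) -> float:
    # Cosine similarity of two equal-length vectors; 0.0 for zero vectors.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)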

Source code in src/riverbank/vocabulary/__init__.py
class EmbeddingPredicateCanonicali:
    """Post-extraction pass: embed predicate labels, DBSCAN-cluster, rewrite to canonical.

    Uses ``nomic-embed-text`` (via Ollama) or falls back to ``all-MiniLM-L6-v2``
    (sentence-transformers).  Groups predicates by semantic similarity using
    DBSCAN, then optionally asks the LLM to select the best canonical name for
    each cluster.

    Example::

        was_born_in (0.97 similarity) → has_birth_place
        born_in     (0.95)            → has_birth_place
        birthplace  (0.91)            → has_birth_place

    New stats: ``predicates_canonicalized``, ``predicate_clusters_merged``.

    Profile YAML::

        vocabulary_normalisation:
          enabled: true
          embedding_canonicalization: true
          embedding_canonicalization_threshold: 0.88
          embedding_canonicalization_model: "nomic-embed-text"
          embedding_canonicalization_llm_rename: true

    Falls back gracefully when embedding libraries are unavailable or the LLM
    call fails — returns the original triples unchanged.
    """

    # Well-known ontology namespace prefixes that must never be remapped
    _PROTECTED_NAMESPACES = frozenset(
        {
            "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
            "http://www.w3.org/2000/01/rdf-schema#",
            "http://www.w3.org/2002/07/owl#",
            "http://www.w3.org/2004/02/skos/core#",
            "http://www.w3.org/ns/shacl#",
        }
    )

    def __init__(
        self,
        threshold: float = 0.88,
        model_name: str = "nomic-embed-text",
        llm_rename: bool = True,
        settings: Any = None,
    ) -> None:
        self._threshold = threshold
        self._model_name = model_name
        self._llm_rename = llm_rename
        self._settings = settings
        self._embedder: Any = None  # lazy-loaded

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def canonicalize(
        self,
        triples: list,
        llm_client: Any = None,
    ) -> tuple[list, int, int]:
        """Cluster predicates by embedding similarity and rewrite to canonical forms.

        Args:
            triples: List of :class:`~riverbank.prov.ExtractedTriple` objects.
            llm_client: Optional LLM client (openai.OpenAI compatible) for
                canonical name selection.  When ``None`` and
                ``llm_rename=True``, the pass will attempt to build one from
                ``self._settings``.

        Returns:
            ``(new_triples, predicates_canonicalized, predicate_clusters_merged)``
            where *predicates_canonicalized* is the total number of predicate
            rewrites and *predicate_clusters_merged* is the number of synonym
            clusters that were merged.
        """
        import logging as _log  # noqa: PLC0415
        _logger = _log.getLogger(__name__)

        if not triples:
            return triples, 0, 0

        # Collect predicate IRIs and their frequencies
        from collections import Counter as _Counter  # noqa: PLC0415
        freq: _Counter = _Counter(t.predicate for t in triples)
        predicates = [
            p for p in freq
            if not any(p.startswith(ns) for ns in self._PROTECTED_NAMESPACES)
        ]

        if len(predicates) < 2:
            return triples, 0, 0

        # Embed predicate labels
        embedder = self._get_embedder()
        if embedder is None:
            _logger.debug(
                "EmbeddingPredicateCanonicali: embedding unavailable — skipping"
            )
            return triples, 0, 0

        labels = [_label_from_iri(p) for p in predicates]
        try:
            raw_embeddings = embedder(labels)
        except Exception as exc:  # noqa: BLE001
            _logger.warning(
                "EmbeddingPredicateCanonicali: embedding failed — %s", exc
            )
            return triples, 0, 0

        # DBSCAN clustering
        clusters = self._dbscan_cluster(predicates, raw_embeddings, self._threshold)
        multi_clusters = [c for c in clusters if len(c) >= 2]
        if not multi_clusters:
            return triples, 0, 0

        # Build canonicalization map {non_canonical → canonical}
        canon_map: dict[str, str] = {}
        for cluster in multi_clusters:
            canonical = self._pick_canonical(cluster, freq, llm_client)
            # Preserve namespace of the most-frequent predicate in the cluster
            canon_ns = _predicate_namespace(canonical)
            for pred in cluster:
                if pred != canonical:
                    # If canonical has no namespace but member does, keep member's ns
                    if not canon_ns:
                        member_ns = _predicate_namespace(pred)
                        member_local = _local_name(canonical)
                        canon_iri = member_ns + member_local if member_ns else canonical
                    else:
                        canon_iri = canonical
                    canon_map[pred] = canon_iri

        if not canon_map:
            return triples, 0, 0

        # Rewrite triples
        result = []
        n_rewritten = 0
        for t in triples:
            if t.predicate in canon_map:
                result.append(t.model_copy(update={"predicate": canon_map[t.predicate]}))
                n_rewritten += 1
            else:
                result.append(t)

        n_clusters_merged = len(multi_clusters)
        _logger.info(
            "EmbeddingPredicateCanonicali: %d predicates rewritten across "
            "%d cluster(s) (threshold=%.2f)",
            n_rewritten,
            n_clusters_merged,
            self._threshold,
        )
        return result, n_rewritten, n_clusters_merged

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _get_embedder(self) -> "Any | None":
        """Return a callable ``(labels: list[str]) → list[list[float]]``.

        Tries Ollama first (nomic-embed-text), then sentence-transformers.
        Returns ``None`` when neither is available.
        """
        if self._embedder is not None:
            return self._embedder

        # Try Ollama embed API first (nomic-embed-text is Ollama-specific)
        if self._model_name == "nomic-embed-text" or ":" in self._model_name:
            ollama_fn = self._make_ollama_embedder()
            if ollama_fn is not None:
                self._embedder = ollama_fn
                return self._embedder

        # Fall back to sentence-transformers
        st_fn = self._make_st_embedder()
        if st_fn is not None:
            self._embedder = st_fn
            return self._embedder

        return None

    def _make_ollama_embedder(self) -> "Any | None":
        """Return an Ollama embeddings callable or None on failure."""
        try:
            from openai import OpenAI  # noqa: PLC0415

            settings = self._settings
            if settings is None:
                try:
                    from riverbank.config import get_settings as _gs  # noqa: PLC0415
                    settings = _gs()
                except Exception:  # noqa: BLE001
                    return None

            llm = getattr(settings, "llm", None)
            api_base: str = getattr(llm, "api_base", "http://localhost:11434/v1")
            # Normalize to embeddings endpoint — Ollama uses /api/embed but
            # the OpenAI-compat client uses /v1/embeddings via the base URL.
            client = OpenAI(api_key="ollama", base_url=api_base)
            model_name = self._model_name

            def _embed(labels: list[str]) -> list[list[float]]:
                resp = client.embeddings.create(input=labels, model=model_name)
                return [item.embedding for item in resp.data]

            # Quick connectivity test
            _embed(["test"])

            import logging as _log  # noqa: PLC0415
            _log.getLogger(__name__).debug(
                "EmbeddingPredicateCanonicali: using Ollama %s", model_name
            )
            return _embed
        except Exception:  # noqa: BLE001
            return None

    def _make_st_embedder(self) -> "Any | None":
        """Return a sentence-transformers embeddings callable or None."""
        try:
            from sentence_transformers import SentenceTransformer  # noqa: PLC0415

            # nomic-embed-text is not a ST model; map to the default ST model
            st_model = (
                self._model_name
                if "/" in self._model_name or self._model_name.startswith("all-")
                else "all-MiniLM-L6-v2"
            )
            model = SentenceTransformer(st_model)

            def _embed(labels: list[str]) -> list[list[float]]:
                vecs = model.encode(labels, show_progress_bar=False)
                return [list(v) for v in vecs]

            import logging as _log  # noqa: PLC0415
            _log.getLogger(__name__).debug(
                "EmbeddingPredicateCanonicali: using sentence-transformers %s", st_model
            )
            return _embed
        except Exception:  # noqa: BLE001
            return None

    def _dbscan_cluster(
        self,
        predicates: list[str],
        embeddings: list[list[float]],
        eps_similarity: float,
    ) -> list[list[str]]:
        """DBSCAN-style clustering using cosine similarity.

        *eps_similarity* is the minimum similarity (not distance) for two
        points to be considered neighbours.  Converted to ``eps = 1 - eps_similarity``
        in cosine-distance space.

        Returns a list of clusters (each cluster is a non-empty list of
        predicate IRI strings).  Singleton clusters are included so that
        noise points can be tracked.
        """
        n = len(predicates)
        eps = 1.0 - eps_similarity

        # Build cosine-distance matrix
        dist: list[list[float]] = [[0.0] * n for _ in range(n)]
        for i in range(n):
            for j in range(i + 1, n):
                try:
                    sim = _cosine_similarity(embeddings[i], embeddings[j])
                except Exception:  # noqa: BLE001
                    sim = 0.0
                d = max(0.0, 1.0 - sim)
                dist[i][j] = d
                dist[j][i] = d

        # DBSCAN with min_samples=1 so every point belongs to some cluster
        # (min_samples=1 means no noise — every point is a core point or part
        # of a cluster whose core point reached it within eps).
        min_samples = 1
        visited = [False] * n
        cluster_ids = [-1] * n
        cid = 0

        def _neighbours(idx: int) -> list[int]:
            return [j for j in range(n) if j != idx and dist[idx][j] <= eps]

        for i in range(n):
            if visited[i]:
                continue
            visited[i] = True
            nbrs = _neighbours(i)
            if len(nbrs) < min_samples - 1:
                # Singleton (noise in standard DBSCAN; here assigned to own cluster)
                cluster_ids[i] = cid
                cid += 1
                continue
            cluster_ids[i] = cid
            seed = list(nbrs)
            while seed:
                j = seed.pop(0)
                if not visited[j]:
                    visited[j] = True
                    j_nbrs = _neighbours(j)
                    if len(j_nbrs) >= min_samples - 1:
                        seed.extend(j_nbrs)
                if cluster_ids[j] == -1:
                    cluster_ids[j] = cid
            cid += 1

        # Group predicates by cluster id
        groups: dict[int, list[str]] = {}
        for i, p in enumerate(predicates):
            groups.setdefault(cluster_ids[i], []).append(p)

        return list(groups.values())

    def _pick_canonical(
        self,
        cluster: list[str],
        freq: "Counter",
        llm_client: Any = None,
    ) -> str:
        """Select the canonical predicate for a cluster.

        Strategy:
        1. If ``llm_rename=True``, ask the LLM to choose (or propose) the best
           canonical name.  The LLM result is validated against the cluster
           members; if it proposes a new name, it is accepted when it passes
           a basic sanity check.
        2. Fall back to the most-frequent predicate as canonical.
        """
        import logging as _log  # noqa: PLC0415
        _logger = _log.getLogger(__name__)

        # Frequency-based fallback (used when LLM is disabled or fails)
        freq_canonical = max(cluster, key=lambda p: (freq.get(p, 0), -len(p)))

        if not self._llm_rename:
            return freq_canonical

        # Try LLM
        client = llm_client
        if client is None and self._settings is not None:
            try:
                from openai import OpenAI as _OAI  # noqa: PLC0415
                llm = getattr(self._settings, "llm", None)
                api_base: str = getattr(llm, "api_base", "http://localhost:11434/v1")
                api_key: str = getattr(llm, "api_key", "ollama")
                client = _OAI(api_key=api_key, base_url=api_base)
            except Exception:  # noqa: BLE001
                return freq_canonical

        if client is None:
            return freq_canonical

        freq_lines = "\n".join(
            f"  {p}: {freq.get(p, 0)} triple(s)" for p in cluster
        )
        prompt = _CANONICAL_NAME_PROMPT.format(
            predicates="\n".join(f"  {p}" for p in cluster),
            frequencies=freq_lines,
        )

        try:
            llm_settings = getattr(self._settings, "llm", None) if self._settings else None
            model_name: str = getattr(llm_settings, "model", "llama3.2") if llm_settings else "llama3.2"
            resp = client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                max_tokens=128,
            )
            raw = (resp.choices[0].message.content or "").strip()
            # Strip markdown fences
            raw = re.sub(r"^```[a-z]*\n?", "", raw, flags=re.MULTILINE)
            raw = re.sub(r"```$", "", raw.strip())

            import json as _json  # noqa: PLC0415
            data = _json.loads(raw)
            chosen_local = data.get("canonical", "").strip()

            if not chosen_local:
                return freq_canonical

            # Sanitize: only allow safe identifier characters
            if not re.match(r"^[a-z_][a-z0-9_]*$", chosen_local):
                # Accept if it's a valid local name (may have uppercase)
                if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", chosen_local):
                    _logger.debug(
                        "EmbeddingPredicateCanonicali: LLM proposed unsafe name %r — using frequency fallback",
                        chosen_local,
                    )
                    return freq_canonical

            # Map to the namespace of the most-frequent predicate
            ns = _predicate_namespace(freq_canonical)
            return (ns + chosen_local) if ns else chosen_local

        except Exception as exc:  # noqa: BLE001
            _logger.debug(
                "EmbeddingPredicateCanonicali: LLM canonical name selection failed — %s",
                exc,
            )
            return freq_canonical

canonicalize(triples, llm_client=None)

Cluster predicates by embedding similarity and rewrite to canonical forms.

Parameters:

  • triples (list, required): List of :class:~riverbank.prov.ExtractedTriple objects.
  • llm_client (Any, default None): Optional LLM client (openai.OpenAI compatible) for canonical name selection. When None and llm_rename=True, the pass will attempt to build one from self._settings.

Returns:

  • tuple[list, int, int]: (new_triples, predicates_canonicalized, predicate_clusters_merged), where predicates_canonicalized is the total number of predicate rewrites and predicate_clusters_merged is the number of synonym clusters that were merged.

Source code in src/riverbank/vocabulary/__init__.py
def canonicalize(
    self,
    triples: list,
    llm_client: Any = None,
) -> tuple[list, int, int]:
    """Cluster predicates by embedding similarity and rewrite to canonical forms.

    Args:
        triples: List of :class:`~riverbank.prov.ExtractedTriple` objects.
        llm_client: Optional LLM client (openai.OpenAI compatible) for
            canonical name selection.  When ``None`` and
            ``llm_rename=True``, the pass will attempt to build one from
            ``self._settings``.

    Returns:
        ``(new_triples, predicates_canonicalized, predicate_clusters_merged)``
        where *predicates_canonicalized* is the total number of predicate
        rewrites and *predicate_clusters_merged* is the number of synonym
        clusters that were merged.
    """
    import logging as _log  # noqa: PLC0415
    _logger = _log.getLogger(__name__)

    if not triples:
        return triples, 0, 0

    # Collect predicate IRIs and their frequencies
    from collections import Counter as _Counter  # noqa: PLC0415
    freq: _Counter = _Counter(t.predicate for t in triples)
    predicates = [
        p for p in freq
        if not any(p.startswith(ns) for ns in self._PROTECTED_NAMESPACES)
    ]

    if len(predicates) < 2:
        return triples, 0, 0

    # Embed predicate labels
    embedder = self._get_embedder()
    if embedder is None:
        _logger.debug(
            "EmbeddingPredicateCanonicali: embedding unavailable — skipping"
        )
        return triples, 0, 0

    labels = [_label_from_iri(p) for p in predicates]
    try:
        raw_embeddings = embedder(labels)
    except Exception as exc:  # noqa: BLE001
        _logger.warning(
            "EmbeddingPredicateCanonicali: embedding failed — %s", exc
        )
        return triples, 0, 0

    # DBSCAN clustering
    clusters = self._dbscan_cluster(predicates, raw_embeddings, self._threshold)
    multi_clusters = [c for c in clusters if len(c) >= 2]
    if not multi_clusters:
        return triples, 0, 0

    # Build canonicalization map {non_canonical → canonical}
    canon_map: dict[str, str] = {}
    for cluster in multi_clusters:
        canonical = self._pick_canonical(cluster, freq, llm_client)
        # Preserve namespace of the most-frequent predicate in the cluster
        canon_ns = _predicate_namespace(canonical)
        for pred in cluster:
            if pred != canonical:
                # If canonical has no namespace but member does, keep member's ns
                if not canon_ns:
                    member_ns = _predicate_namespace(pred)
                    member_local = _local_name(canonical)
                    canon_iri = member_ns + member_local if member_ns else canonical
                else:
                    canon_iri = canonical
                canon_map[pred] = canon_iri

    if not canon_map:
        return triples, 0, 0

    # Rewrite triples
    result = []
    n_rewritten = 0
    for t in triples:
        if t.predicate in canon_map:
            result.append(t.model_copy(update={"predicate": canon_map[t.predicate]}))
            n_rewritten += 1
        else:
            result.append(t)

    n_clusters_merged = len(multi_clusters)
    _logger.info(
        "EmbeddingPredicateCanonicali: %d predicates rewritten across "
        "%d cluster(s) (threshold=%.2f)",
        n_rewritten,
        n_clusters_merged,
        self._threshold,
    )
    return result, n_rewritten, n_clusters_merged

FactDecomposer

Detect predicates that encode a qualifier in their local name and decompose them into a base predicate triple plus a separate qualifier triple on the same subject.

Supported qualifier patterns:

  • Year: _in_YYYY or _YYYY → ex:year "YYYY"
  • Date: _on_<date> → ex:date "<date>"
  • Ordinal: _first, _second, _Nth, _3rd → ex:ordinal "…"

Example::

ex:subject  ex:acquired_company_in_2022  ex:Acme
→ ex:subject  ex:acquired_company  ex:Acme
→ ex:subject  ex:year              "2022"
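
decompose() walks a module-level _QUALIFIER_PATTERNS list of (compiled pattern, qualifier predicate) pairs that this page does not reproduce. A plausible sketch covering the patterns above (the exact regexes and predicate spellings are assumptions)::

import re

# (pattern, qualifier_predicate) pairs, tried in order; group(1) is the value.
_QUALIFIER_PATTERNS = [
    (re.compile(r"_in_(\d{4})$"), "ex:year"),   # founded_in_2019
    (re.compile(r"_(\d{4})$"), "ex:year"),      # acquired_2022
    (re.compile(r"_on_(\w+)$"), "ex:date"),     # signed_on_2021_05_01
    (re.compile(r"_(first|second|third|\d+(?:st|nd|rd|th))_?"), "ex:ordinal"),
]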
Source code in src/riverbank/vocabulary/__init__.py
class FactDecomposer:
    """Detect predicates that encode a qualifier in their local name and
    decompose them into a base predicate triple plus a separate qualifier
    triple on the same subject.

    Supported qualifier patterns:

    * Year: ``_in_YYYY`` or ``_YYYY`` → ``ex:year "YYYY"``
    * Date: ``_on_<date>`` → ``ex:date "<date>"``
    * Ordinal: ``_first``, ``_second``, ``_Nth``, ``_3rd`` → ``ex:ordinal "…"``

    Example::

        ex:subject  ex:acquired_company_in_2022  ex:Acme
        → ex:subject  ex:acquired_company  ex:Acme
        → ex:subject  ex:year              "2022"
    """

    def decompose(self, triples: list) -> tuple[list, int]:
        """Expand each fact-stuffed triple into two triples.

        Handles both end-of-string qualifiers (``founded_in_2019``) and
        mid-word qualifiers (``won_first_FA_Cup`` → ``won_FA_Cup``).

        :returns: ``(expanded_triple_list, n_decomposed)``
        """
        result: list = []
        n = 0
        for t in triples:
            local = _local_name(t.predicate)
            ns = _predicate_namespace(t.predicate)
            decomposed = False
            for pattern, qual_pred in _QUALIFIER_PATTERNS:
                m = pattern.search(local)
                if m:
                    before = local[: m.start()]
                    after = local[m.end() :]
                    # Rejoin the fragments around the qualifier, adding a
                    # connecting underscore only when both halves are non-empty.
                    if before and after:
                        base_local = before + "_" + after
                    elif before:
                        base_local = before
                    elif after:
                        base_local = after.lstrip("_")
                    else:
                        base_local = "predicate"
                    qualifier_value = m.group(1).replace("_", " ")
                    base_predicate = (ns + base_local) if ns else base_local
                    result.append(t.model_copy(update={"predicate": base_predicate}))
                    result.append(
                        t.model_copy(update={"predicate": qual_pred, "object_value": qualifier_value})
                    )
                    n += 1
                    decomposed = True
                    break
            if not decomposed:
                result.append(t)
        return result, n

decompose(triples)

Expand each fact-stuffed triple into two triples.

Handles both end-of-string qualifiers (founded_in_2019) and mid-word qualifiers (won_first_FA_Cup → won_FA_Cup).

:returns: (expanded_triple_list, n_decomposed)

Source code in src/riverbank/vocabulary/__init__.py
def decompose(self, triples: list) -> tuple[list, int]:
    """Expand each fact-stuffed triple into two triples.

    Handles both end-of-string qualifiers (``founded_in_2019``) and
    mid-word qualifiers (``won_first_FA_Cup`` → ``won_FA_Cup``).

    :returns: ``(expanded_triple_list, n_decomposed)``
    """
    result: list = []
    n = 0
    for t in triples:
        local = _local_name(t.predicate)
        ns = _predicate_namespace(t.predicate)
        decomposed = False
        for pattern, qual_pred in _QUALIFIER_PATTERNS:
            m = pattern.search(local)
            if m:
                before = local[: m.start()]
                after = local[m.end() :]
                # Rejoin the fragments around the qualifier, adding a
                # connecting underscore only when both halves are non-empty.
                if before and after:
                    base_local = before + "_" + after
                elif before:
                    base_local = before
                elif after:
                    base_local = after.lstrip("_")
                else:
                    base_local = "predicate"
                qualifier_value = m.group(1).replace("_", " ")
                base_predicate = (ns + base_local) if ns else base_local
                result.append(t.model_copy(update={"predicate": base_predicate}))
                result.append(
                    t.model_copy(update={"predicate": qual_pred, "object_value": qualifier_value})
                )
                n += 1
                decomposed = True
                break
        if not decomposed:
            result.append(t)
    return result, n

NormalisationConfig dataclass

Configuration for the vocabulary normalisation pass.

Source code in src/riverbank/vocabulary/__init__.py
@dataclass
class NormalisationConfig:
    """Configuration for the vocabulary normalisation pass."""

    enabled: bool = True
    categorical_threshold: int = 2
    collapse_predicates: bool = True
    predicate_collapse_backend: str = "deterministic"  # deterministic | llm
    decompose_stuffed_predicates: bool = True
    rewrite_canonical_uris: bool = False
    vocabulary_namespace: str = "http://riverbank.example/vocab/"
    # v0.15.5: embedding-based predicate canonicalization
    embedding_canonicalization: bool = False
    embedding_canonicalization_threshold: float = 0.88
    embedding_canonicalization_model: str = "nomic-embed-text"
    embedding_canonicalization_llm_rename: bool = True

NormalisationResult dataclass

Result returned by :meth:VocabularyNormalisationPass.run.

Source code in src/riverbank/vocabulary/__init__.py
@dataclass
class NormalisationResult:
    """Result returned by :meth:`VocabularyNormalisationPass.run`."""

    triples: list
    vocab_literals_promoted: int = 0
    vocab_predicates_collapsed: int = 0
    vocab_facts_decomposed: int = 0
    vocab_uris_rewritten: int = 0
    # v0.15.5: embedding-based predicate canonicalization
    predicates_canonicalized: int = 0
    predicate_clusters_merged: int = 0

PredicateCollapser

Detect clusters of semantically equivalent predicates and collapse them to a single canonical form.

Two backends are supported:

  • deterministic — edit-distance similarity on local predicate names using :class:difflib.SequenceMatcher.
  • llm — single LLM prompt asking for groupings (callable injected at call time so the class remains pure Python with no LLM dependency).

The canonical predicate within each cluster is the most frequently occurring one in the triple buffer.
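
A minimal deterministic-backend sketch, reusing the hypothetical Triple stand-in from the CategoricalDetector example above::

collapser = PredicateCollapser(backend="deterministic", similarity_threshold=0.6)
triples = [
    Triple(subject="ex:Acme", predicate="ex:founded_in", object_value="ex:London"),
    Triple(subject="ex:Beta", predicate="ex:founded_in", object_value="ex:Paris"),
    Triple(subject="ex:Gamma", predicate="ex:was_founded_in", object_value="ex:Rome"),
]
collapse_map = collapser.find_clusters(triples)
# {"ex:was_founded_in": "ex:founded_in"}: SequenceMatcher ratio ~0.83 on the
# local names, and founded_in wins the cluster on frequency (2 triples vs 1)
new_triples, n_collapsed = collapser.collapse(triples, collapse_map)
assert n_collapsed == 1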

Source code in src/riverbank/vocabulary/__init__.py
class PredicateCollapser:
    """Detect clusters of semantically equivalent predicates and collapse them
    to a single canonical form.

    Two backends are supported:

    * **deterministic** — edit-distance similarity on local predicate names
      using :class:`difflib.SequenceMatcher`.
    * **llm** — single LLM prompt asking for groupings (callable injected at
      call time so the class remains pure Python with no LLM dependency).

    The *canonical* predicate within each cluster is the most frequently
    occurring one in the triple buffer.
    """

    def __init__(
        self,
        backend: str = "deterministic",
        similarity_threshold: float = 0.6,
    ) -> None:
        self.backend = backend
        self.similarity_threshold = similarity_threshold

    def find_clusters(
        self,
        triples: list,
        llm_client: Optional[Callable[[list[str]], list[list[str]]]] = None,
    ) -> dict[str, str]:
        """Return ``{non_canonical_predicate: canonical_predicate}``.

        When *backend* is ``"llm"``, *llm_client* must be a callable that
        accepts a list of predicate strings and returns a list of groups
        (each group is a list of semantically equivalent predicate strings).
        """
        preds = list({t.predicate for t in triples})
        if len(preds) < 2:
            return {}
        if self.backend == "llm" and llm_client is not None:
            return self._llm_clusters(preds, triples, llm_client)
        return self._deterministic_clusters(preds, triples)

    def _deterministic_clusters(
        self, preds: list[str], triples: list
    ) -> dict[str, str]:
        local_names = {p: _local_name(p) for p in preds}
        freq: Counter = Counter(t.predicate for t in triples)
        clusters: list[list[str]] = []
        assigned: set[str] = set()

        for p in sorted(preds, key=lambda x: (-freq[x], x)):  # most frequent first
            if p in assigned:
                continue
            cluster = [p]
            assigned.add(p)
            for q in sorted(preds, key=lambda x: (-freq[x], x)):
                if q in assigned:
                    continue
                ratio = SequenceMatcher(
                    None, local_names[p], local_names[q]
                ).ratio()
                if ratio >= self.similarity_threshold:
                    cluster.append(q)
                    assigned.add(q)
            clusters.append(cluster)

        collapse_map: dict[str, str] = {}
        for cluster in clusters:
            if len(cluster) < 2:
                continue
            canonical = max(cluster, key=lambda x: freq[x])
            for p in cluster:
                if p != canonical:
                    collapse_map[p] = canonical
        return collapse_map

    def _llm_clusters(
        self,
        preds: list[str],
        triples: list,
        llm_client: Callable[[list[str]], list[list[str]]],
    ) -> dict[str, str]:
        freq: Counter = Counter(t.predicate for t in triples)
        groups: list[list[str]] = llm_client(preds)
        collapse_map: dict[str, str] = {}
        for group in groups:
            if len(group) < 2:
                continue
            canonical = max(group, key=lambda x: freq.get(x, 0))
            for p in group:
                if p != canonical:
                    collapse_map[p] = canonical
        return collapse_map

    def collapse(
        self, triples: list, collapse_map: dict[str, str]
    ) -> tuple[list, int]:
        """Apply *collapse_map* to every triple's predicate.

        :returns: ``(new_triples, n_collapsed)``
        """
        result = []
        n = 0
        for t in triples:
            if t.predicate in collapse_map:
                result.append(t.model_copy(update={"predicate": collapse_map[t.predicate]}))
                n += 1
            else:
                result.append(t)
        return result, n

collapse(triples, collapse_map)

Apply collapse_map to every triple's predicate.

:returns: (new_triples, n_collapsed)

Source code in src/riverbank/vocabulary/__init__.py
def collapse(
    self, triples: list, collapse_map: dict[str, str]
) -> tuple[list, int]:
    """Apply *collapse_map* to every triple's predicate.

    :returns: ``(new_triples, n_collapsed)``
    """
    result = []
    n = 0
    for t in triples:
        if t.predicate in collapse_map:
            result.append(t.model_copy(update={"predicate": collapse_map[t.predicate]}))
            n += 1
        else:
            result.append(t)
    return result, n

find_clusters(triples, llm_client=None)

Return {non_canonical_predicate: canonical_predicate}.

When backend is "llm", llm_client must be a callable that accepts a list of predicate strings and returns a list of groups (each group is a list of semantically equivalent predicate strings).
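
A toy llm_client satisfying this contract: it groups predicates that share a trailing token (purely illustrative; a real client would prompt an LLM and parse its reply)::

def llm_client(predicates: list[str]) -> list[list[str]]:
    groups: dict[str, list[str]] = {}
    for p in predicates:
        groups.setdefault(p.rsplit("_", 1)[-1], []).append(p)
    return list(groups.values())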

Source code in src/riverbank/vocabulary/__init__.py
def find_clusters(
    self,
    triples: list,
    llm_client: Optional[Callable[[list[str]], list[list[str]]]] = None,
) -> dict[str, str]:
    """Return ``{non_canonical_predicate: canonical_predicate}``.

    When *backend* is ``"llm"``, *llm_client* must be a callable that
    accepts a list of predicate strings and returns a list of groups
    (each group is a list of semantically equivalent predicate strings).
    """
    preds = list({t.predicate for t in triples})
    if len(preds) < 2:
        return {}
    if self.backend == "llm" and llm_client is not None:
        return self._llm_clusters(preds, triples, llm_client)
    return self._deterministic_clusters(preds, triples)

URICanonicaliser

Rewrite non-canonical subject/object URIs using owl:sameAs links present in the triple buffer.

After entity resolution writes owl:sameAs triples, this pass:

  1. Builds equivalence classes from all owl:sameAs pairs.
  2. Chooses the canonical URI for each class — the one that appears most frequently as a subject in non-owl:sameAs triples.
  3. Rewrites every occurrence of a non-canonical URI (as subject or object) to the canonical form.

Example::

# owl:sameAs chain written by entity_resolution
ex:Marie_Curie  owl:sameAs  ex:Maria_Sklodowska_Curie
ex:M_Curie      owl:sameAs  ex:Marie_Curie

# URICanonicaliser rewrites all triples to the canonical URI
ex:M_Curie              ex:discovered  ex:Polonium
ex:Maria_Sklodowska_Curie  ex:born_in  "Warsaw"
→  ex:Marie_Curie  ex:discovered  ex:Polonium
→  ex:Marie_Curie  ex:born_in     "Warsaw"
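
A minimal sketch, again with the hypothetical Triple stand-in; the canonical URI is the one appearing most often as a subject of non-sameAs triples::

OWL_SAME_AS = "http://www.w3.org/2002/07/owl#sameAs"
triples = [
    Triple(subject="ex:M_Curie", predicate=OWL_SAME_AS, object_value="ex:Marie_Curie"),
    Triple(subject="ex:Marie_Curie", predicate="ex:discovered", object_value="ex:Polonium"),
    Triple(subject="ex:Marie_Curie", predicate="ex:born_in", object_value="Warsaw"),
    Triple(subject="ex:M_Curie", predicate="ex:won", object_value="Nobel Prize"),
]
rewritten, n_rewritten = URICanonicaliser().canonicalise(triples)
# ex:Marie_Curie has two non-sameAs subject triples vs one for ex:M_Curie, so
# ex:M_Curie is rewritten wherever it occurs; n_rewritten == 2 here (the won
# triple plus the subject of the sameAs triple itself)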
Source code in src/riverbank/vocabulary/__init__.py
class URICanonicaliser:
    """Rewrite non-canonical subject/object URIs using ``owl:sameAs`` links
    present in the triple buffer.

    After entity resolution writes ``owl:sameAs`` triples, this pass:

    1. Builds equivalence classes from all ``owl:sameAs`` pairs.
    2. Chooses the **canonical** URI for each class — the one that appears
       most frequently as a subject in non-``owl:sameAs`` triples.
    3. Rewrites every occurrence of a non-canonical URI (as subject or
       object) to the canonical form.

    Example::

        # owl:sameAs chain written by entity_resolution
        ex:Marie_Curie  owl:sameAs  ex:Maria_Sklodowska_Curie
        ex:M_Curie      owl:sameAs  ex:Marie_Curie

        # URICanonicaliser rewrites all triples to the canonical URI
        ex:M_Curie              ex:discovered  ex:Polonium
        ex:Maria_Sklodowska_Curie  ex:born_in  "Warsaw"
        →  ex:Marie_Curie  ex:discovered  ex:Polonium
        →  ex:Marie_Curie  ex:born_in     "Warsaw"
    """

    _SAME_AS_PREDICATES = frozenset(
        {
            "owl:sameAs",
            "http://www.w3.org/2002/07/owl#sameAs",
        }
    )

    def _is_same_as(self, predicate: str) -> bool:
        return (
            predicate in self._SAME_AS_PREDICATES
            or predicate.endswith("#sameAs")
            or predicate.endswith("/sameAs")
        )

    def canonicalise(self, triples: list) -> tuple[list, int]:
        """Rewrite non-canonical URIs.

        :returns: ``(rewritten_triples, n_rewritten)``
        """
        # ------------------------------------------------------------------
        # Step 1: build union-find from owl:sameAs edges
        # ------------------------------------------------------------------
        parent: dict[str, str] = {}

        def find(x: str) -> str:
            root = x
            while parent.get(root, root) != root:
                root = parent[root]
            # Path compression
            node = x
            while parent.get(node, node) != node:
                nxt = parent[node]
                parent[node] = root
                node = nxt
            return root

        def union(a: str, b: str) -> None:
            pa, pb = find(a), find(b)
            if pa != pb:
                parent[pb] = pa

        for t in triples:
            if self._is_same_as(t.predicate):
                union(t.subject, t.object_value)

        # ------------------------------------------------------------------
        # Step 2: count subject frequencies per equivalence class
        # ------------------------------------------------------------------
        freq: Counter = Counter()
        for t in triples:
            if not self._is_same_as(t.predicate):
                freq[t.subject] += 1

        # ------------------------------------------------------------------
        # Step 3: choose canonical URI for each class
        # ------------------------------------------------------------------
        class_members: dict[str, set[str]] = defaultdict(set)
        for t in triples:
            for uri in (t.subject, t.object_value if _is_iri(t.object_value) else None):
                if uri:
                    class_members[find(uri)].add(uri)

        canonical: dict[str, str] = {}
        for root, members in class_members.items():
            if len(members) < 2:
                continue
            canon = max(members, key=lambda x: (freq.get(x, 0), x))
            for m in members:
                if m != canon:
                    canonical[m] = canon

        if not canonical:
            return triples, 0

        # ------------------------------------------------------------------
        # Step 4: rewrite
        # ------------------------------------------------------------------
        result = []
        n = 0
        for t in triples:
            new_subj = canonical.get(t.subject, t.subject)
            new_obj = canonical.get(t.object_value, t.object_value)
            if new_subj != t.subject or new_obj != t.object_value:
                updates: dict = {}
                if new_subj != t.subject:
                    updates["subject"] = new_subj
                if new_obj != t.object_value:
                    updates["object_value"] = new_obj
                result.append(t.model_copy(update=updates))
                n += 1
            else:
                result.append(t)
        return result, n

canonicalise(triples)

Rewrite non-canonical URIs.

:returns: (rewritten_triples, n_rewritten)

Source code in src/riverbank/vocabulary/__init__.py
def canonicalise(self, triples: list) -> tuple[list, int]:
    """Rewrite non-canonical URIs.

    :returns: ``(rewritten_triples, n_rewritten)``
    """
    # ------------------------------------------------------------------
    # Step 1: build union-find from owl:sameAs edges
    # ------------------------------------------------------------------
    parent: dict[str, str] = {}

    def find(x: str) -> str:
        root = x
        while parent.get(root, root) != root:
            root = parent[root]
        # Path compression
        node = x
        while parent.get(node, node) != node:
            nxt = parent[node]
            parent[node] = root
            node = nxt
        return root

    def union(a: str, b: str) -> None:
        pa, pb = find(a), find(b)
        if pa != pb:
            parent[pb] = pa

    for t in triples:
        if self._is_same_as(t.predicate):
            union(t.subject, t.object_value)

    # ------------------------------------------------------------------
    # Step 2: count subject frequencies per equivalence class
    # ------------------------------------------------------------------
    freq: Counter = Counter()
    for t in triples:
        if not self._is_same_as(t.predicate):
            freq[t.subject] += 1

    # ------------------------------------------------------------------
    # Step 3: choose canonical URI for each class
    # ------------------------------------------------------------------
    class_members: dict[str, set[str]] = defaultdict(set)
    for t in triples:
        for uri in (t.subject, t.object_value if _is_iri(t.object_value) else None):
            if uri:
                class_members[find(uri)].add(uri)

    canonical: dict[str, str] = {}
    for root, members in class_members.items():
        if len(members) < 2:
            continue
        canon = max(members, key=lambda x: (freq.get(x, 0), x))
        for m in members:
            if m != canon:
                canonical[m] = canon

    if not canonical:
        return triples, 0

    # ------------------------------------------------------------------
    # Step 4: rewrite
    # ------------------------------------------------------------------
    result = []
    n = 0
    for t in triples:
        new_subj = canonical.get(t.subject, t.subject)
        new_obj = canonical.get(t.object_value, t.object_value)
        if new_subj != t.subject or new_obj != t.object_value:
            updates: dict = {}
            if new_subj != t.subject:
                updates["subject"] = new_subj
            if new_obj != t.object_value:
                updates["object_value"] = new_obj
            result.append(t.model_copy(update=updates))
            n += 1
        else:
            result.append(t)
    return result, n

VocabularyNormalisationPass

Orchestrate all vocabulary normalisation sub-passes (v0.15.3 + v0.15.5).

Passes applied in order:

  1. Unicode normalization — decode bare unicode escapes in literals.
  2. Categorical literal promotion — repeated literals → vocab:* IRIs.
  3. Predicate cluster collapse — edit-distance or LLM grouping.
  4. Fact-stuffed predicate decomposition — qualifier stripping.
  5. Entity URI canonicalisation — owl:sameAs rewriting (optional).
  6. Embedding-based predicate canonicalization (v0.15.5, optional) — DBSCAN clusters of semantically equivalent predicates rewritten to a canonical form using nomic-embed-text + optional LLM renaming.
  7. Deduplication — keep highest-confidence triple per (s, p, o) key.

Usage::

pass_ = VocabularyNormalisationPass.from_profile(profile)
result = pass_.run(triple_buffer)
# result.triples  — normalised triple list
# result.vocab_literals_promoted, .vocab_predicates_collapsed, …
# result.predicates_canonicalized, .predicate_clusters_merged  (v0.15.5)

The pass is idempotent: running it twice on the same buffer produces the same result as running it once (assuming no new categorical clusters emerge after the first pass).
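
A minimal end-to-end sketch, assuming profile exposes the YAML above as its vocabulary_normalisation attribute and triple_buffer is the extractor's output::

pass_ = VocabularyNormalisationPass.from_profile(profile)
first = pass_.run(triple_buffer)
second = pass_.run(first.triples)
assert len(second.triples) == len(first.triples)  # idempotence, per the note above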

Source code in src/riverbank/vocabulary/__init__.py
class VocabularyNormalisationPass:
    """Orchestrate all vocabulary normalisation sub-passes (v0.15.3 + v0.15.5).

    Passes applied in order:

    0. Unicode normalization — decode bare unicode escapes in literals.
    1. Categorical literal promotion — repeated literals → vocab:* IRIs.
    2. Predicate cluster collapse — edit-distance or LLM grouping.
    3. Fact-stuffed predicate decomposition — qualifier stripping.
    4. Entity URI canonicalisation — owl:sameAs rewriting (optional).
    5. Embedding-based predicate canonicalization (v0.15.5, optional) —
       DBSCAN clusters of semantically equivalent predicates rewritten to
       a canonical form using nomic-embed-text + optional LLM renaming.
    6. Deduplication — keep highest-confidence triple per (s, p, o) key.

    Usage::

        pass_ = VocabularyNormalisationPass.from_profile(profile)
        result = pass_.run(triple_buffer)
        # result.triples  — normalised triple list
        # result.vocab_literals_promoted, .vocab_predicates_collapsed, …
        # result.predicates_canonicalized, .predicate_clusters_merged  (v0.15.5)

    The pass is idempotent: running it twice on the same buffer produces the
    same result as running it once (assuming no new categorical clusters
    emerge after the first pass).
    """

    def __init__(self, config: NormalisationConfig, settings: Any = None) -> None:
        self.config = config
        self._settings = settings
        self._categorical = CategoricalDetector(
            threshold=config.categorical_threshold,
            vocab_namespace=config.vocabulary_namespace,
        )
        self._collapser = PredicateCollapser(
            backend=config.predicate_collapse_backend,
        )
        self._decomposer = FactDecomposer()
        self._canonicaliser = URICanonicaliser()
        # v0.15.5: embedding-based predicate canonicalization
        self._embedding_canonicali: Optional[EmbeddingPredicateCanonicali] = (
            EmbeddingPredicateCanonicali(
                threshold=config.embedding_canonicalization_threshold,
                model_name=config.embedding_canonicalization_model,
                llm_rename=config.embedding_canonicalization_llm_rename,
                settings=settings,
            )
            if config.embedding_canonicalization
            else None
        )

    @classmethod
    def from_profile(cls, profile: Any, settings: Any = None) -> "VocabularyNormalisationPass":
        """Construct from a :class:`~riverbank.pipeline.CompilerProfile`."""
        cfg: dict = getattr(profile, "vocabulary_normalisation", {})
        config = NormalisationConfig(
            enabled=cfg.get("enabled", True),
            categorical_threshold=cfg.get("categorical_threshold", 2),
            collapse_predicates=cfg.get("collapse_predicates", True),
            predicate_collapse_backend=cfg.get(
                "predicate_collapse_backend", "deterministic"
            ),
            decompose_stuffed_predicates=cfg.get("decompose_stuffed_predicates", True),
            rewrite_canonical_uris=cfg.get("rewrite_canonical_uris", False),
            vocabulary_namespace=cfg.get(
                "vocabulary_namespace", "http://riverbank.example/vocab/"
            ),
            # v0.15.5
            embedding_canonicalization=cfg.get("embedding_canonicalization", False),
            embedding_canonicalization_threshold=cfg.get(
                "embedding_canonicalization_threshold", 0.88
            ),
            embedding_canonicalization_model=cfg.get(
                "embedding_canonicalization_model", "nomic-embed-text"
            ),
            embedding_canonicalization_llm_rename=cfg.get(
                "embedding_canonicalization_llm_rename", True
            ),
        )
        return cls(config, settings=settings)

    def run(
        self,
        triples: list,
        llm_client: Optional[Callable[[list[str]], list[list[str]]]] = None,
    ) -> NormalisationResult:
        """Apply all enabled sub-passes to *triples* and return a result.

        :param triples: List of
            :class:`~riverbank.prov.ExtractedTriple` objects.
        :param llm_client: Optional callable for LLM-guided predicate
            collapsing.  Must accept ``list[str]`` of predicate IRIs and
            return ``list[list[str]]`` of equivalence groups.  Only used
            when ``predicate_collapse_backend: "llm"`` is configured.
        :returns: :class:`NormalisationResult` with normalised triples and
            per-normalisation counts.
        """
        result = list(triples)
        n_promoted = n_collapsed = n_decomposed = n_rewritten = 0
        n_canonicalized = n_clusters_merged = 0

        # 0. Unicode normalization — decode bare unicode escapes in literals
        #    e.g. "1972u201373" → "1972–73"  (leaked \uXXXX without backslash)
        result = [
            t.model_copy(update={"object_value": _normalize_literal_unicode(t.object_value)})
            if not _is_iri(t.object_value) and _UNICODE_ESCAPE_RE.search(t.object_value)
            else t
            for t in result
        ]

        # 1. Categorical literal → IRI
        cat_map = self._categorical.detect(result)
        if cat_map:
            result, n_promoted = self._categorical.promote(result, cat_map)

        # 2. Predicate cluster collapse
        if self.config.collapse_predicates:
            collapse_map = self._collapser.find_clusters(result, llm_client)
            if collapse_map:
                result, n_collapsed = self._collapser.collapse(result, collapse_map)

        # 3. Fact-stuffed predicate decomposition
        if self.config.decompose_stuffed_predicates:
            result, n_decomposed = self._decomposer.decompose(result)

        # 4. Entity URI canonicalisation
        if self.config.rewrite_canonical_uris:
            result, n_rewritten = self._canonicaliser.canonicalise(result)

        # 5. v0.15.5: Embedding-based predicate canonicalization
        #    Runs after the deterministic passes so that it operates on a
        #    cleaner predicate set (collapsed + decomposed).  The LLM client
        #    is NOT shared with the predicate collapser — this pass builds its
        #    own client from settings when llm_rename is enabled.
        if self._embedding_canonicali is not None:
            result, n_canonicalized, n_clusters_merged = (
                self._embedding_canonicali.canonicalize(result)
            )

        # 6. Deduplication — after URI rewriting + canonicalization, previously
        #    distinct triples may now share the same (subject, predicate, object_value)
        #    key.  Keep the one with the highest confidence.
        seen: dict[tuple[str, str, str], Any] = {}
        for t in result:
            key = (t.subject, t.predicate, t.object_value)
            if key not in seen or t.confidence > seen[key].confidence:
                seen[key] = t
        result = list(seen.values())

        return NormalisationResult(
            triples=result,
            vocab_literals_promoted=n_promoted,
            vocab_predicates_collapsed=n_collapsed,
            vocab_facts_decomposed=n_decomposed,
            vocab_uris_rewritten=n_rewritten,
            predicates_canonicalized=n_canonicalized,
            predicate_clusters_merged=n_clusters_merged,
        )
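
Step 0 of run relies on two module-level helpers, _UNICODE_ESCAPE_RE and _normalize_literal_unicode, that are defined elsewhere in the module. A minimal sketch of the idea (deliberately naive; the matched code-point range is an assumption, and the real pattern is presumably more conservative so it does not mangle ordinary hex-looking words)::

import re

# Hypothetical sketch only -- the real helpers live elsewhere in
# src/riverbank/vocabulary/__init__.py and may differ.
# Match a bare uXXXX escape (backslash lost upstream), restricted to the
# General Punctuation block U+2000-U+206F to avoid false positives on
# ordinary words whose letters happen to be hex digits (e.g. "uface").
_UNICODE_ESCAPE_RE = re.compile(r"u(20[0-6][0-9a-fA-F])")

def _normalize_literal_unicode(value: str) -> str:
    # "1972u201373" -> "1972" + chr(0x2013) + "73" == "1972–73"
    return _UNICODE_ESCAPE_RE.sub(lambda m: chr(int(m.group(1), 16)), value)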

from_profile(profile, settings=None) classmethod

Construct from a riverbank.pipeline.CompilerProfile.

Source code in src/riverbank/vocabulary/__init__.py
@classmethod
def from_profile(cls, profile: Any, settings: Any = None) -> "VocabularyNormalisationPass":
    """Construct from a :class:`~riverbank.pipeline.CompilerProfile`."""
    cfg: dict = getattr(profile, "vocabulary_normalisation", {})
    config = NormalisationConfig(
        enabled=cfg.get("enabled", True),
        categorical_threshold=cfg.get("categorical_threshold", 2),
        collapse_predicates=cfg.get("collapse_predicates", True),
        predicate_collapse_backend=cfg.get(
            "predicate_collapse_backend", "deterministic"
        ),
        decompose_stuffed_predicates=cfg.get("decompose_stuffed_predicates", True),
        rewrite_canonical_uris=cfg.get("rewrite_canonical_uris", False),
        vocabulary_namespace=cfg.get(
            "vocabulary_namespace", "http://riverbank.example/vocab/"
        ),
        # v0.15.5
        embedding_canonicalization=cfg.get("embedding_canonicalization", False),
        embedding_canonicalization_threshold=cfg.get(
            "embedding_canonicalization_threshold", 0.88
        ),
        embedding_canonicalization_model=cfg.get(
            "embedding_canonicalization_model", "nomic-embed-text"
        ),
        embedding_canonicalization_llm_rename=cfg.get(
            "embedding_canonicalization_llm_rename", True
        ),
    )
    return cls(config, settings=settings)
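
The v0.15.5 embedding keys extend the profile YAML shown at the top of this page; the defaults below mirror the cfg.get fallbacks above, and the v0.15.3 keys are unchanged::

vocabulary_normalisation:
  # v0.15.3 keys as documented above, plus:
  embedding_canonicalization: false
  embedding_canonicalization_threshold: 0.88
  embedding_canonicalization_model: "nomic-embed-text"
  embedding_canonicalization_llm_rename: true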

run(triples, llm_client=None)

Apply all enabled sub-passes to triples and return a result.

Parameters:

triples: List of riverbank.prov.ExtractedTriple objects.
llm_client: Optional callable for LLM-guided predicate collapsing. Must accept list[str] of predicate IRIs and return list[list[str]] of equivalence groups. Only used when predicate_collapse_backend: "llm" is configured.

Returns:

NormalisationResult with normalised triples and per-normalisation counts.
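
Example (a minimal sketch; the ExtractedTriple constructor arguments are assumptions inferred from the fields accessed in the source below, and the real model may require more)::

from riverbank.prov import ExtractedTriple
from riverbank.vocabulary import VocabularyNormalisationPass

norm = VocabularyNormalisationPass.from_profile(profile)  # profile as above
triples = [
    ExtractedTriple(subject="ex:Alice", predicate="ex:is",
                    object_value="Director", confidence=0.9),
    ExtractedTriple(subject="ex:Bob", predicate="ex:is",
                    object_value="Director", confidence=0.8),
]
result = norm.run(triples)
print(result.vocab_literals_promoted)   # 2 with the default threshold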

Source code in src/riverbank/vocabulary/__init__.py
def run(
    self,
    triples: list,
    llm_client: Optional[Callable[[list[str]], list[list[str]]]] = None,
) -> NormalisationResult:
    """Apply all enabled sub-passes to *triples* and return a result.

    :param triples: List of
        :class:`~riverbank.prov.ExtractedTriple` objects.
    :param llm_client: Optional callable for LLM-guided predicate
        collapsing.  Must accept ``list[str]`` of predicate IRIs and
        return ``list[list[str]]`` of equivalence groups.  Only used
        when ``predicate_collapse_backend: "llm"`` is configured.
    :returns: :class:`NormalisationResult` with normalised triples and
        per-normalisation counts.
    """
    result = list(triples)
    n_promoted = n_collapsed = n_decomposed = n_rewritten = 0
    n_canonicalized = n_clusters_merged = 0

    # 0. Unicode normalization — decode bare unicode escapes in literals
    #    e.g. "1972u201373" → "1972–73"  (leaked \uXXXX without backslash)
    result = [
        t.model_copy(update={"object_value": _normalize_literal_unicode(t.object_value)})
        if not _is_iri(t.object_value) and _UNICODE_ESCAPE_RE.search(t.object_value)
        else t
        for t in result
    ]

    # 1. Categorical literal → IRI
    cat_map = self._categorical.detect(result)
    if cat_map:
        result, n_promoted = self._categorical.promote(result, cat_map)

    # 2. Predicate cluster collapse
    if self.config.collapse_predicates:
        collapse_map = self._collapser.find_clusters(result, llm_client)
        if collapse_map:
            result, n_collapsed = self._collapser.collapse(result, collapse_map)

    # 3. Fact-stuffed predicate decomposition
    if self.config.decompose_stuffed_predicates:
        result, n_decomposed = self._decomposer.decompose(result)

    # 4. Entity URI canonicalisation
    if self.config.rewrite_canonical_uris:
        result, n_rewritten = self._canonicaliser.canonicalise(result)

    # 5. v0.15.5: Embedding-based predicate canonicalization
    #    Runs after the deterministic passes so that it operates on a
    #    cleaner predicate set (collapsed + decomposed).  The LLM client
    #    is NOT shared with the predicate collapser — this pass builds its
    #    own client from settings when llm_rename is enabled.
    if self._embedding_canonicali is not None:
        result, n_canonicalized, n_clusters_merged = (
            self._embedding_canonicali.canonicalize(result)
        )

    # 6. Deduplication — after URI rewriting + canonicalization, previously
    #    distinct triples may now share the same (subject, predicate, object_value)
    #    key.  Keep the one with the highest confidence.
    seen: dict[tuple[str, str, str], Any] = {}
    for t in result:
        key = (t.subject, t.predicate, t.object_value)
        if key not in seen or t.confidence > seen[key].confidence:
            seen[key] = t
    result = list(seen.values())

    return NormalisationResult(
        triples=result,
        vocab_literals_promoted=n_promoted,
        vocab_predicates_collapsed=n_collapsed,
        vocab_facts_decomposed=n_decomposed,
        vocab_uris_rewritten=n_rewritten,
        predicates_canonicalized=n_canonicalized,
        predicate_clusters_merged=n_clusters_merged,
    )

build_llm_predicate_collapser(settings, profile)

Return an LLM callable for predicate grouping, or None on failure.

The callable accepts a list of predicate IRI strings and returns a list of equivalence groups (each group is a list of semantically equivalent predicate strings). Used by PredicateCollapser in llm mode.

Falls back to None (caller uses deterministic mode) when the openai package is unavailable or the LLM call is not configured.
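
The contract is simple enough to stub out by hand, e.g. in tests. A hypothetical stand-in, reusing norm from the run() example above (not part of the library; note the profile must select predicate_collapse_backend: "llm" for the callable to be consulted)::

def fake_grouper(predicates: list[str]) -> list[list[str]]:
    # Cluster the role predicates from the collapse example; a returned
    # group is only useful to the collapser if it contains at least two
    # members of the input list, so smaller groups are dropped here.
    roles = ("is_director", "is_ceo", "is_chair")
    group = [p for p in predicates if any(p.endswith(r) for r in roles)]
    return [group] if len(group) >= 2 else []

result = norm.run(triples, llm_client=fake_grouper)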

Source code in src/riverbank/vocabulary/__init__.py
def build_llm_predicate_collapser(
    settings: Any,
    profile: Any,
) -> "Optional[Callable[[list[str]], list[list[str]]]]":
    """Return an LLM callable for predicate grouping, or ``None`` on failure.

    The callable accepts a list of predicate IRI strings and returns a list
    of equivalence groups (each group is a list of semantically equivalent
    predicate strings).  Used by :class:`PredicateCollapser` in ``llm`` mode.

    Falls back to ``None`` (caller uses deterministic mode) when the openai
    package is unavailable or the LLM call is not configured.
    """
    try:
        import json  # noqa: PLC0415

        from openai import OpenAI  # noqa: PLC0415

        model_provider = getattr(profile, "model_provider", "ollama")
        model_name = getattr(profile, "model_name", "llama3.2")

        api_base = getattr(settings.llm, "api_base", "http://localhost:11434/v1")
        api_key = getattr(settings.llm, "api_key", "ollama" if model_provider == "ollama" else "")
        # api_base is already normalised by LLMSettings._normalise_api_base validator

        client = OpenAI(api_key=api_key, base_url=api_base)

        def _llm_grouper(predicates: list[str]) -> list[list[str]]:
            prompt = _PREDICATE_COLLAPSE_PROMPT.format(
                predicates="\n".join(predicates)
            )
            try:
                resp = client.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.0,
                    max_tokens=1024,
                )
                raw = resp.choices[0].message.content or "{}"
                # Strip markdown code fences if present
                raw = re.sub(r"^```[a-z]*\n?", "", raw.strip(), flags=re.MULTILINE)
                raw = re.sub(r"```$", "", raw.strip())
                data = json.loads(raw)
                groups: list[list[str]] = data.get("groups", [])
                # Validate: only keep groups whose members are in our predicate set
                pred_set = set(predicates)
                return [
                    [p for p in g if p in pred_set]
                    for g in groups
                    if len([p for p in g if p in pred_set]) >= 2
                ]
            except Exception as _e:  # noqa: BLE001
                import logging as _log  # noqa: PLC0415
                _log.getLogger(__name__).warning(
                    "LLM predicate collapse call failed, falling back to deterministic: %s", _e
                )
                return []

        return _llm_grouper
    except Exception:  # noqa: BLE001
        return None
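
For reference, _PREDICATE_COLLAPSE_PROMPT (defined elsewhere in the module) is expected to elicit a JSON object of the shape parsed above, for example::

{"groups": [["ex:is_director", "ex:is_ceo", "ex:is_chair"]]}

Because the validation step keeps only group members that appear in the input predicate set (and drops groups left with fewer than two), a hallucinated or malformed response degrades to no collapsing rather than to bogus rewrites.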