Source code for abydos.distance._token_distance

# -*- coding: utf-8 -*-

# Copyright 2018-2019 by Christopher C. Little.
# This file is part of Abydos.
#
# Abydos is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Abydos is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Abydos. If not, see <http://www.gnu.org/licenses/>.

"""abydos.distance._token_distance.

The distance._token_distance._TokenDistance module implements abstract class
_TokenDistance.
"""

from __future__ import (
    absolute_import,
    division,
    print_function,
    unicode_literals,
)

from collections import Counter, OrderedDict
from itertools import product
from math import exp, log1p

from numpy import copy as np_copy
from numpy import zeros as np_zeros

try:
    from scipy.optimize import linear_sum_assignment
except ImportError:  # pragma: no cover
    # If the system lacks the scipy library, we'll fall back to our
    # Python+Numpy implementation of the Hungarian algorithm
    linear_sum_assignment = None

from ._damerau_levenshtein import DamerauLevenshtein
from ._distance import _Distance
from ._lcprefix import LCPrefix
from ._levenshtein import Levenshtein
from ..stats import ConfusionTable
from ..tokenizer import QGrams, QSkipgrams, WhitespaceTokenizer

__all__ = ['_TokenDistance']


class _TokenDistance(_Distance):
    r"""Abstract Token Distance class.

    .. _confusion_table:

    +----------------+--------------+-----------------+-------+
    |                | |in| ``tar`` | |notin| ``tar`` |       |
    +----------------+--------------+-----------------+-------+
    | |in| ``src``   | |a|          | |b|             | |a+b| |
    +----------------+--------------+-----------------+-------+
    | |notin| ``src``| |c|          | |d|             | |c+d| |
    +----------------+--------------+-----------------+-------+
    |                | |a+c|        | |b+d|           | |n|   |
    +----------------+--------------+-----------------+-------+

    .. |in| replace:: :math:`x \in`
    .. |notin| replace:: :math:`x \notin`

    .. |a| replace:: :math:`a = |X \cap Y|`
    .. |b| replace:: :math:`b = |X \setminus Y|`
    .. |c| replace:: :math:`c = |Y \setminus X|`
    .. |d| replace:: :math:`d = |(N \setminus X) \setminus Y|`
    .. |n| replace:: :math:`n = |N|`
    .. |a+b| replace:: :math:`p_1 = a+b = |X|`
    .. |a+c| replace:: :math:`p_2 = a+c = |Y|`
    .. |c+d| replace:: :math:`q_1 = c+d = |N \setminus X|`
    .. |b+d| replace:: :math:`q_2 = b+d = |N \setminus Y|`

    .. versionadded:: 0.3.6
    """

    def __init__(self, tokenizer=None, intersection_type='crisp', **kwargs):
        r"""Initialize _TokenDistance instance.

        .. _intersection_type:

        Parameters
        ----------
        tokenizer : _Tokenizer
            A tokenizer instance from the :py:mod:`abydos.tokenizer` package
        intersection_type : str
            Specifies the intersection type, and set type as a result:

                - ``crisp``: Ordinary intersection, wherein items are
                  entirely members or non-members of the intersection.
                  (Default)
                - ``fuzzy``: Fuzzy intersection, defined by :cite:`Wang:2014`,
                  wherein items can be partially members of the intersection
                  if their similarity meets or exceeds a threshold value. This
                  also takes `metric` (by default :class:`Levenshtein()`) and
                  `threshold` (by default 0.8) parameters.
                - ``soft``: Soft intersection, defined by :cite:`Russ:2014`,
                  wherein items can be partially members of the intersection
                  depending on their similarity. This also takes a `metric`
                  (by default :class:`DamerauLevenshtein()`) parameter.
                - ``linkage``: Group linkage, defined by :cite:`On:2007`. Like
                  the soft intersection, items can be partially members of the
                  intersection, but the method of pairing similar members is
                  somewhat more complex. See the cited paper for details. This
                  also takes `metric` (by default
                  :class:`DamerauLevenshtein()`) and `threshold` (by default
                  0.1) parameters.
        **kwargs
            Arbitrary keyword arguments

        .. _alphabet:

        Other Parameters
        ----------------
        qval : int
            The length of each q-gram. Using this parameter and
            tokenizer=None will cause the instance to use the QGrams
            tokenizer with this q value.
        metric : _Distance
            A string distance measure class for use in the ``soft`` and
            ``fuzzy`` variants.
        threshold : float
            A threshold value, similarities above which are counted as
            members of the intersection for the ``fuzzy`` variant.
        alphabet : Counter, collection, int, or None
            This represents the alphabet of possible tokens.

                - If a Counter is supplied, it is used directly in computing
                  the complement of the tokens in both sets.
                - If a collection is supplied, it is converted to a Counter
                  and used directly. In the case of a single string being
                  supplied and the QGrams tokenizer being used, the full
                  alphabet is inferred (i.e.
                  :math:`len(set(alphabet+QGrams.start\_stop))^{QGrams.qval}`
                  is used as the cardinality of the full alphabet).
                - If an int is supplied, it is used as the cardinality of the
                  full alphabet.
                - If None is supplied, the cardinality of the full alphabet
                  is inferred if QGrams or QSkipgrams tokenization is used
                  (i.e. :math:`28^{QGrams.qval}` is used as the cardinality
                  of the full alphabet, or :math:`26` if QGrams.qval is 1,
                  which assumes the strings are English language strings and
                  only contain letters of a single case). Otherwise, the
                  cardinality of the complement of the total will be 0.
        normalizer : str
            This represents the normalization applied to the values in the
            2x2 contingency table prior to any of the cardinality (\*_card)
            methods returning a value. By default, no normalization is
            applied, but the following values are supported:

                - ``proportional`` : :math:`\frac{x}{n}`, where n is the
                  total population
                - ``log`` : :math:`log(1+x)`
                - ``exp`` : :math:`e^x`
                - ``laplace`` : :math:`x+1`
                - ``inverse`` : :math:`\frac{1}{x}`
                - ``complement`` : :math:`n-x`, where n is the total
                  population
        internal_assignment_problem : bool
            When using ``linkage`` as the intersection type (i.e. group
            linkage), this forces use of the internal implementation to
            solve the assignment problem, rather than scipy's
            linear_sum_assignment.

        .. versionadded:: 0.4.0

        """
        super(_TokenDistance, self).__init__(
            intersection_type=intersection_type, **kwargs
        )

        qval = 2 if 'qval' not in self.params else self.params['qval']
        self.params['tokenizer'] = (
            tokenizer
            if tokenizer is not None
            else WhitespaceTokenizer()
            if qval == 0
            else QGrams(qval=qval, start_stop='$#', skip=0, scaler=None)
        )

        if hasattr(self.params['tokenizer'], 'qval'):
            if isinstance(self.params['tokenizer'].qval, int):
                qvals = [self.params['tokenizer'].qval]
            else:
                qvals = list(self.params['tokenizer'].qval)
        else:
            qvals = []

        if 'alphabet' in self.params:
            if isinstance(self.params['alphabet'], str):
                self.params['alphabet'] = set(self.params['alphabet'])
                if isinstance(self.params['tokenizer'], (QGrams, QSkipgrams)):
                    self.params['alphabet'] |= set(
                        self.params['tokenizer'].start_stop
                    )
                self.params['alphabet'] = sum(
                    len(self.params['alphabet']) ** qval for qval in qvals
                )
            if hasattr(self.params['alphabet'], '__len__') and not isinstance(
                self.params['alphabet'], Counter
            ):
                self.params['alphabet'] = len(self.params['alphabet'])
            elif self.params['alphabet'] is None and isinstance(
                self.params['tokenizer'], (QGrams, QSkipgrams)
            ):
                self.params['alphabet'] = sum(
                    28 ** qval if qval > 1 else 26 for qval in qvals
                )
        else:
            if isinstance(self.params['tokenizer'], (QGrams, QSkipgrams)):
                self.params['alphabet'] = sum(
                    28 ** qval if qval > 1 else 26 for qval in qvals
                )
            else:
                self.params['alphabet'] = None

        if intersection_type == 'soft':
            if 'metric' not in self.params or self.params['metric'] is None:
                self.params['metric'] = DamerauLevenshtein()
            self._lcprefix = LCPrefix()
            self._intersection = self._soft_intersection
        elif intersection_type == 'fuzzy':
            if 'metric' not in self.params or self.params['metric'] is None:
                self.params['metric'] = Levenshtein()
            if 'threshold' not in self.params:
                self.params['threshold'] = 0.8
            self._intersection = self._fuzzy_intersection
        elif intersection_type == 'linkage':
            if 'metric' not in self.params or self.params['metric'] is None:
                self.params['metric'] = DamerauLevenshtein()
            if 'threshold' not in self.params:
                self.params['threshold'] = 0.1
            self._intersection = self._group_linkage_intersection
        else:
            self._intersection = self._crisp_intersection

        self._src_tokens = Counter()
        self._tar_tokens = Counter()
        self._population_card_value = 0

        # initialize normalizer
        self.normalizer = self._norm_none

        self._norm_dict = {
            'proportional': self._norm_proportional,
            'log': self._norm_log,
            'exp': self._norm_exp,
            'laplace': self._norm_laplace,
            'inverse': self._norm_inverse,
            'complement': self._norm_complement,
        }

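    # A minimal sketch of how a concrete measure builds on this class
    # (hypothetical subclass for illustration only; the real measures are
    # defined in their own modules):
    #
    #     >>> class _ExampleJaccard(_TokenDistance):
    #     ...     def sim(self, src, tar):
    #     ...         self._tokenize(src, tar)
    #     ...         return self._intersection_card() / self._union_card()
    #     >>> _ExampleJaccard().sim('nelson', 'neilsen')  # 4 of 11 bigrams
    0.36363636363636365
    #
    # Here 'nelson' and 'neilsen' share the bigrams $n, ne, ls, and n# out
    # of 11 distinct bigrams in their union.
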
    def _norm_none(self, x, _squares, _pop):
        return x

    def _norm_proportional(self, x, _squares, pop):
        return x / max(1, pop)

    def _norm_log(self, x, _squares, _pop):
        return log1p(x)

    def _norm_exp(self, x, _squares, _pop):
        return exp(x)

    def _norm_laplace(self, x, squares, _pop):
        return x + squares

    def _norm_inverse(self, x, _squares, pop):
        return 1 / x if x else pop

    def _norm_complement(self, x, _squares, pop):
        return pop - x

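    # For example, a raw value of x = 3 in a total population of pop = 10
    # would normalize as follows (hand-computed illustration):
    #
    #   _norm_proportional: 3 / 10   = 0.3
    #   _norm_log:          log1p(3) ~= 1.386
    #   _norm_exp:          exp(3)   ~= 20.09
    #   _norm_laplace:      3 + 1    = 4   (squares = 1, a single table cell)
    #   _norm_inverse:      1 / 3    ~= 0.333
    #   _norm_complement:   10 - 3   = 7
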
    def _tokenize(self, src, tar):
        """Return the Q-Grams in src & tar.

        Parameters
        ----------
        src : str
            Source string (or QGrams/Counter objects) for comparison
        tar : str
            Target string (or QGrams/Counter objects) for comparison

        Returns
        -------
        tuple of Counters
            Q-Grams

        Examples
        --------
        >>> pe = _TokenDistance()
        >>> pe._tokenize('AT', 'TT')._get_tokens()
        (Counter({'$A': 1, 'AT': 1, 'T#': 1}),
         Counter({'$T': 1, 'TT': 1, 'T#': 1}))


        .. versionadded:: 0.1.0
        .. versionchanged:: 0.3.6
            Encapsulated in class

        """
        self._src_orig = src
        self._tar_orig = tar

        if isinstance(src, Counter):
            self._src_tokens = src
        else:
            self._src_tokens = (
                self.params['tokenizer'].tokenize(src).get_counter()
            )
        if isinstance(tar, Counter):
            self._tar_tokens = tar
        else:
            self._tar_tokens = (
                self.params['tokenizer'].tokenize(tar).get_counter()
            )

        self._population_card_value = self._calc_population_card()

        # Set up the normalizer, a function of three variables:
        # x is the value in the contingency table square(s),
        # squares is the number of squares that x represents,
        # and pop is the total population
        if (
            'normalizer' in self.params
            and self.params['normalizer'] in self._norm_dict
        ):
            self.normalizer = self._norm_dict[self.params['normalizer']]

        return self

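    # Since ``_tokenize`` accepts pre-computed Counters as well as strings,
    # a token multiset can be built once and reused across comparisons (a
    # usage sketch, assuming the explicit '$#' start/stop markers used
    # elsewhere in this module):
    #
    #     >>> pe = _TokenDistance()
    #     >>> src_counter = QGrams(qval=2, start_stop='$#').tokenize(
    #     ...     'AT').get_counter()
    #     >>> pe._tokenize(src_counter, 'TT')._get_tokens()
    #     (Counter({'$A': 1, 'AT': 1, 'T#': 1}),
    #      Counter({'$T': 1, 'TT': 1, 'T#': 1}))
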
    def _get_tokens(self):
        """Return the src and tar tokens as a tuple."""
        return self._src_tokens, self._tar_tokens

    def _src_card(self):
        r"""Return the cardinality of the tokens in the source set."""
        return self.normalizer(
            sum(abs(val) for val in self._src_tokens.values()),
            2,
            self._population_card_value,
        )

    def _src_only(self):
        r"""Return the src tokens minus the tar tokens.

        For (multi-)sets S and T, this is :math:`S \setminus T`.
        """
        src_only = self._src_tokens - self._intersection()
        if self.params['intersection_type'] != 'crisp':
            src_only -= self._intersection() - self._crisp_intersection()
        return src_only

    def _src_only_card(self):
        """Return the cardinality of the tokens only in the source set."""
        return self.normalizer(
            sum(abs(val) for val in self._src_only().values()),
            1,
            self._population_card_value,
        )

    def _tar_card(self):
        r"""Return the cardinality of the tokens in the target set."""
        return self.normalizer(
            sum(abs(val) for val in self._tar_tokens.values()),
            2,
            self._population_card_value,
        )

    def _tar_only(self):
        r"""Return the tar tokens minus the src tokens.

        For (multi-)sets S and T, this is :math:`T \setminus S`.
        """
        tar_only = self._tar_tokens - self._intersection()
        if self.params['intersection_type'] != 'crisp':
            tar_only -= self._intersection() - self._crisp_intersection()
        return tar_only

    def _tar_only_card(self):
        """Return the cardinality of the tokens only in the target set."""
        return self.normalizer(
            sum(abs(val) for val in self._tar_only().values()),
            1,
            self._population_card_value,
        )

    def _symmetric_difference(self):
        r"""Return the symmetric difference of tokens from src and tar.

        For (multi-)sets S and T, this is :math:`S \triangle T`.
        """
        return self._src_only() + self._tar_only()

    def _symmetric_difference_card(self):
        """Return the cardinality of the symmetric difference."""
        return self.normalizer(
            sum(abs(val) for val in self._symmetric_difference().values()),
            2,
            self._population_card_value,
        )

    def _total(self):
        """Return the sum of the sets.

        For (multi-)sets S and T, this is :math:`S + T`.

        In the case of multisets, this counts values in the intersection
        twice. In the case of sets, this is identical to the union.
        """
        return self._src_tokens + self._tar_tokens

    def _total_card(self):
        """Return the cardinality of the total."""
        return self.normalizer(
            sum(abs(val) for val in self._total().values()),
            3,
            self._population_card_value,
        )

    def _total_complement_card(self):
        """Return the cardinality of the complement of the total."""
        if self.params['alphabet'] is None:
            return self.normalizer(0, 1, self._population_card_value)
        elif isinstance(self.params['alphabet'], Counter):
            return self.normalizer(
                max(
                    0,
                    sum(
                        abs(val)
                        for val in (
                            self.params['alphabet'] - self._total()
                        ).values()
                    ),
                ),
                1,
                self._population_card_value,
            )
        return self.normalizer(
            max(0, self.params['alphabet'] - len(self._total().values())),
            1,
            self._population_card_value,
        )

    def _calc_population_card(self):
        """Return the cardinality of the population."""
        save_normalizer = self.normalizer
        self.normalizer = self._norm_none
        pop = self._total_card() + self._total_complement_card()
        self.normalizer = save_normalizer
        return pop

    def _population_card(self):
        """Return the cardinality of the population."""
        return self.normalizer(
            self._population_card_value, 4, self._population_card_value
        )

    def _population_unique_card(self):
        """Return the cardinality of the population minus the intersection."""
        return self.normalizer(
            self._population_card_value - self._intersection_card(),
            4,
            self._population_card_value,
        )

    def _union(self):
        r"""Return the union of tokens from src and tar.

        For (multi-)sets S and T, this is :math:`S \cup T`.
        """
        union = self._total() - self._intersection()
        if self.params['intersection_type'] != 'crisp':
            union -= self._intersection() - self._crisp_intersection()
        return union

    def _union_card(self):
        """Return the cardinality of the union."""
        return self.normalizer(
            sum(abs(val) for val in self._union().values()),
            3,
            self._population_card_value,
        )

    def _difference(self):
        """Return the difference of the tokens, supporting negative values."""
        _src_copy = Counter(self._src_tokens)
        _src_copy.subtract(self._tar_tokens)
        return _src_copy

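    # Unlike Counter's ``-`` operator, which drops non-positive counts,
    # ``subtract`` retains them (standard library behavior):
    #
    #     >>> c = Counter({'a': 1})
    #     >>> c.subtract(Counter({'a': 2, 'b': 1}))
    #     >>> c
    #     Counter({'a': -1, 'b': -1})
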
    def _crisp_intersection(self):
        r"""Return the intersection of tokens from src and tar.

        For (multi-)sets S and T, this is :math:`S \cap T`.
        """
        return self._src_tokens & self._tar_tokens

    def _soft_intersection(self):
        """Return the soft intersection of the tokens in src and tar.

        This implements the soft intersection defined by :cite:`Russ:2014`.
        """
        intersection = self._crisp_intersection()

        src_only = self._src_tokens - self._tar_tokens
        tar_only = self._tar_tokens - self._src_tokens

        def _membership(src, tar):
            greater_length = max(len(src), len(tar))
            return (
                max(
                    greater_length - self.params['metric'].dist_abs(src, tar),
                    self._lcprefix.dist_abs(src, tar),
                )
                / greater_length
            )

        # Dictionary ordering is important for reproducibility, so insertion
        # order needs to be controlled and retained.
        memberships = OrderedDict(
            ((src, tar), _membership(src, tar))
            for src, tar in sorted(product(src_only, tar_only))
        )

        while memberships:
            src_tok, tar_tok = max(memberships, key=memberships.get)
            if memberships[src_tok, tar_tok] > 0.0:
                pairings = min(src_only[src_tok], tar_only[tar_tok])
                if pairings:
                    intersection[src_tok] += (
                        memberships[src_tok, tar_tok] * pairings / 2
                    )
                    intersection[tar_tok] += (
                        memberships[src_tok, tar_tok] * pairings / 2
                    )

                    src_only[src_tok] -= pairings
                    tar_only[tar_tok] -= pairings
            del memberships[src_tok, tar_tok]

        return intersection

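    # A worked example of the membership function above, assuming the
    # default DamerauLevenshtein metric: for the tokens 'cat' and 'hat',
    # greater_length = 3, dist_abs('cat', 'hat') = 1, and the longest
    # common prefix has length 0, so membership = max(3 - 1, 0) / 3 = 2/3.
    # Half of that weight is then credited to each token per pairing.
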
    def _fuzzy_intersection(self):
        r"""Return the fuzzy intersection of the tokens in src and tar.

        This implements the fuzzy intersection defined by :cite:`Wang:2014`.

        For two sets X and Y, the intersection :cite:`Wang:2014` is the sum
        of the similarities of all token pairs from the two sets whose
        similarity is greater than or equal to some threshold value
        (:math:`\delta`).

        The lower bound on this intersection, and its value when
        :math:`\delta = 1.0`, is the crisp intersection. Tokens shorter than
        :math:`\frac{\delta}{1-\delta}`, 4 in the case of the default
        threshold :math:`\delta = 0.8`, must match exactly to be included in
        the intersection.


        .. versionadded:: 0.4.0

        """
        intersection = self._crisp_intersection()

        src_only = self._src_tokens - self._tar_tokens
        tar_only = self._tar_tokens - self._src_tokens

        pair = {}
        for src_tok in sorted(src_only):
            for tar_tok in sorted(tar_only):
                sim = self.params['metric'].sim(src_tok, tar_tok)
                if sim >= self.params['threshold']:
                    pairings = min(src_only[src_tok], tar_only[tar_tok])
                    pair[(src_tok, tar_tok)] = (sim, pairings)

        for src_tok, tar_tok in sorted(pair, key=pair.get, reverse=True):
            pairings = pair[(src_tok, tar_tok)][1]
            if pairings:
                sim = pair[(src_tok, tar_tok)][0]

                intersection[src_tok] += sim / 2 * pairings
                intersection[tar_tok] += sim / 2 * pairings

                src_only[src_tok] -= pairings
                tar_only[tar_tok] -= pairings

        """
        # Here is a slightly different optimization method, which is even
        # greedier than the above.

        # ordered by sim*pairings rather than just sim
        pair = {}
        for src_tok in sorted(src_only):
            for tar_tok in sorted(tar_only):
                sim = self.params['metric'].sim(src_tok, tar_tok)
                if sim >= self.params['threshold']:
                    pairings = min(src_only[src_tok], tar_only[tar_tok])
                    pair[(src_tok, tar_tok)] = sim * pairings

        for src_tok, tar_tok in sorted(pair, key=pair.get, reverse=True):
            pairings = min(src_only[src_tok], tar_only[tar_tok])
            if pairings:
                sim = pair[(src_tok, tar_tok)]

                intersection[src_tok] += sim / 2
                intersection[tar_tok] += sim / 2

                src_only[src_tok] -= pairings
                tar_only[tar_tok] -= pairings
        """

        return intersection

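    # A hand-computed illustration of the threshold bound above: with the
    # default Levenshtein metric (sim = 1 - dist/max_length) and threshold
    # 0.8, the 5-gram pair 'abcde'/'abcdX' has sim = 1 - 1/5 = 0.8 and
    # enters the intersection, crediting 0.8/2 = 0.4 to each token, while
    # the 2-gram pair 'ab'/'aX' has sim = 1 - 1/2 = 0.5 and is excluded.
    # Hence tokens shorter than delta/(1 - delta) = 4 can only contribute
    # via exact matches.
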
    def _group_linkage_intersection(self):
        r"""Return the group linkage intersection of the tokens in src and tar.

        This is based on group linkage, as defined by :cite:`On:2007`.

        Most of this method is concerned with solving the assignment
        problem, in order to find the weight of the maximum weight bipartite
        matching. If the system has SciPy installed, we use its
        linear_sum_assignment function to get the assignments. Otherwise, we
        use the Hungarian algorithm of Munkres :cite:`Munkres:1957`,
        implemented in Python & Numpy.


        .. versionadded:: 0.4.0

        """
        intersection = self._crisp_intersection()

        src_only = sorted(self._src_tokens - self._tar_tokens)
        tar_only = sorted(self._tar_tokens - self._src_tokens)

        if linear_sum_assignment and not (
            'internal_assignment_problem' in self.params
            and self.params['internal_assignment_problem']
        ):
            arr = np_zeros((len(tar_only), len(src_only)))

            for col in range(len(src_only)):
                for row in range(len(tar_only)):
                    arr[row, col] = self.params['metric'].dist(
                        src_only[col], tar_only[row]
                    )

            for row, col in zip(*linear_sum_assignment(arr)):
                sim = 1.0 - arr[row, col]
                if sim >= self.params['threshold']:
                    intersection[src_only[col]] += (sim / 2) * (
                        self._src_tokens - self._tar_tokens
                    )[src_only[col]]
                    intersection[tar_only[row]] += (sim / 2) * (
                        self._tar_tokens - self._src_tokens
                    )[tar_only[row]]
        else:
            n = max(len(tar_only), len(src_only))
            arr = np_zeros((n, n), dtype=float)

            for col in range(len(src_only)):
                for row in range(len(tar_only)):
                    arr[row, col] = self.params['metric'].dist(
                        src_only[col], tar_only[row]
                    )

            src_only += [''] * (n - len(src_only))
            tar_only += [''] * (n - len(tar_only))

            orig_sim = 1 - np_copy(arr)

            # Step 1
            for row in range(n):
                arr[row, :] -= arr[row, :].min()
            # Step 2
            for col in range(n):
                arr[:, col] -= arr[:, col].min()

            while True:
                # Step 3
                assignments = {}

                allocated_cols = set()
                allocated_rows = set()
                assigned_rows = set()
                assigned_cols = set()

                for row in range(n):
                    if (arr[row, :] == 0.0).sum() == 1:
                        col = arr[row, :].argmin()
                        if col not in allocated_cols:
                            assignments[row, col] = orig_sim[row, col]
                            allocated_cols.add(col)
                            assigned_rows.add(row)
                            assigned_cols.add(col)

                for col in range(n):
                    if (arr[:, col] == 0.0).sum() == 1:
                        row = arr[:, col].argmin()
                        if row not in allocated_rows:
                            assignments[row, col] = orig_sim[row, col]
                            allocated_rows.add(row)
                            assigned_rows.add(row)
                            assigned_cols.add(col)

                if len(assignments) == n:
                    break

                marked_rows = {_ for _ in range(n) if _ not in assigned_rows}
                marked_cols = set()
                for row in sorted(set(marked_rows)):
                    for col, mark in enumerate(arr[row, :] == 0.0):
                        if mark:
                            marked_cols.add(col)
                            for row2 in range(n):
                                if (row2, col) in assignments:
                                    marked_rows.add(row2)

                if n - len(marked_rows) + len(marked_cols) == n:
                    # We have sufficient lines
                    for col in range(n):
                        row = arr[:, col].argmin()
                        assignments[row, col] = orig_sim[row, col]
                    break

                # Step 4
                min_val = arr[tuple(marked_rows), :][
                    :, sorted(set(range(n)) - marked_cols)
                ].min()
                for row in range(n):
                    for col in range(n):
                        if row in marked_rows and col not in marked_cols:
                            arr[row, col] -= min_val
                        elif row not in marked_rows and col in marked_cols:
                            arr[row, col] += min_val

            for row, col in assignments.keys():
                sim = orig_sim[row, col]
                if sim >= self.params['threshold']:
                    intersection[src_only[col]] += (sim / 2) * (
                        self._src_tokens - self._tar_tokens
                    )[src_only[col]]
                    intersection[tar_only[row]] += (sim / 2) * (
                        self._tar_tokens - self._src_tokens
                    )[tar_only[row]]

        return intersection

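    # The SciPy branch above relies on linear_sum_assignment minimizing
    # total cost; because the matrix holds distances (1 - sim), this yields
    # the maximum-similarity matching. A standalone sketch of the call,
    # with a made-up 2x2 distance matrix:
    #
    #     >>> from numpy import array
    #     >>> from scipy.optimize import linear_sum_assignment
    #     >>> dists = array([[0.1, 0.9], [0.8, 0.2]])
    #     >>> rows, cols = linear_sum_assignment(dists)
    #     >>> # rows, cols pair row 0 with col 0 and row 1 with col 1,
    #     >>> # for a minimal total cost of 0.1 + 0.2 = 0.3
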
    def _intersection_card(self):
        """Return the cardinality of the intersection."""
        return self.normalizer(
            sum(abs(val) for val in self._intersection().values()),
            1,
            self._population_card_value,
        )

    def _intersection(self):
        """Return the intersection.

        This function may be overridden by setting the intersection_type
        during initialization.
        """
        return self._crisp_intersection()  # pragma: no cover

    def _get_confusion_table(self):
        """Return the token counts as a ConfusionTable object."""
        return ConfusionTable(
            self._intersection_card(),
            self._total_complement_card(),
            self._src_only_card(),
            self._tar_only_card(),
        )

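    # In terms of the contingency table in the class docstring, this maps
    # a (the intersection) to true positives, d (the complement of the
    # total) to true negatives, b (src only) to false positives, and c
    # (tar only) to false negatives, so statistics defined on a
    # ConfusionTable can be reused as token-based similarity measures.
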

if __name__ == '__main__':
    import doctest

    doctest.testmod()