import os
from copy import copy
from numbers import Integral
from itertools import chain
import nltk
import numpy as np
import scipy.sparse as sp
from gensim import corpora
from Orange.data import Table, Domain, ContinuousVariable, DiscreteVariable
from orangecontrib.text.vectorization import BowVectorizer
def get_sample_corpora_dir():
path = os.path.dirname(__file__)
directory = os.path.join(path, 'datasets')
return os.path.abspath(directory)
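# Illustrative usage sketch: the returned directory is where the sample .tab
# corpora bundled with the add-on are stored; Corpus.from_file() below falls
# back to it when a given file name is not found elsewhere.
#
#     samples = os.listdir(get_sample_corpora_dir())   # names of bundled corpora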
def _check_arrays(*arrays):
for a in arrays:
if not (a is None or isinstance(a, np.ndarray) or sp.issparse(a)):
raise TypeError('Argument {} should be of type np.array, sparse or None.'.format(a))
lengths = set(a.shape[0] for a in arrays if a is not None)
if len(lengths) > 1:
raise ValueError('Leading dimension mismatch')
return lengths.pop() if len(lengths) else 0
class Corpus(Table):
"""Internal class for storing a corpus."""
def __new__(cls, *args, **kwargs):
"""Bypass Table.__new__."""
return object.__new__(cls)
def __init__(self, domain=None, X=None, Y=None, metas=None, W=None, text_features=None, ids=None):
"""
Args:
domain (Orange.data.Domain): the domain for this Corpus
X (numpy.ndarray): attributes
Y (numpy.ndarray): class variables
metas (numpy.ndarray): meta attributes; e.g. text
W (numpy.ndarray): instance weights
text_features (list): meta attributes that are used for
text mining. Infer them if None.
ids (numpy.ndarray): Instance ids
"""
n_doc = _check_arrays(X, Y, metas)
self.X = X if X is not None and X.size else sp.csr_matrix((n_doc, 0)) # prefer sparse (BoW compute values)
self.Y = Y if Y is not None else np.zeros((n_doc, 0))
self.metas = metas if metas is not None else np.zeros((n_doc, 0))
self.W = W if W is not None else np.zeros((n_doc, 0))
self.domain = domain
self.text_features = None # list of text features for mining
self._tokens = None
self._dictionary = None
self._ngrams_corpus = None
self.ngram_range = (1, 1)
self.attributes = {}
self.pos_tags = None
self.used_preprocessor = None # required for compute values
if domain is not None and text_features is None:
self._infer_text_features()
elif domain is not None:
self.set_text_features(text_features)
if ids is not None:
self.ids = ids
else:
Table._init_ids(self)
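# Illustrative usage sketch: wrap an existing Orange Table into a Corpus by
# reusing its arrays, as from_file() below does; the file name is an
# assumption.
#
#     table = Table.from_file('my_documents.tab')
#     corpus = Corpus(table.domain, table.X, table.Y, table.metas, table.W)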
def set_text_features(self, feats):
"""
Select which meta-attributes to include when mining text.
Args:
feats (list or None): List of text features to include. If None, infer them.
"""
if feats is not None:
for f in feats:
if f not in chain(self.domain.variables, self.domain.metas):
raise ValueError('Feature "{}" not found.'.format(f))
if len(set(feats)) != len(feats):
raise ValueError('Text features must be unique.')
self.text_features = feats
else:
self._infer_text_features()
self._tokens = None # invalidate tokens
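# Illustrative usage sketch: restrict text mining to a single string meta
# attribute; the variable name 'Content' is an assumption about the data.
#
#     corpus.set_text_features([corpus.domain['Content']])
#     corpus.set_text_features(None)   # revert to the inferred text features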
def _infer_text_features(self):
"""
Infer which text features to use. If nothing was provided
in the file header, use the first text feature.
"""
include_feats = []
first = None
for attr in self.domain.metas:
if attr.is_string:
if first is None:
first = attr
if attr.attributes.get('include', 'False') == 'True':
include_feats.append(attr)
if len(include_feats) == 0 and first:
include_feats.append(first)
self.set_text_features(include_feats)
def extend_corpus(self, metadata, Y):
"""
Append documents to corpus.
Args:
metadata (numpy.ndarray): Meta data
Y (numpy.ndarray): Class variables
"""
self.metas = np.vstack((self.metas, metadata))
cv = self.domain.class_var
for val in set(Y):
if val not in cv.values:
cv.add_value(val)
new_Y = np.array([cv.to_val(i) for i in Y])[:, None]
self._Y = np.vstack((self._Y, new_Y))
self.X = self.W = np.zeros((len(self), 0))
Table._init_ids(self)
self._tokens = None # invalidate tokens
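# Illustrative usage sketch: append two documents to a corpus with a single
# string meta column and a discrete class variable; the texts and class
# labels are assumptions about the data.
#
#     new_metas = np.array([['first new document'],
#                           ['second new document']], dtype=object)
#     corpus.extend_corpus(new_metas, ['positive', 'negative'])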
def extend_attributes(self, X, feature_names, feature_values=None,
compute_values=None, var_attrs=None):
"""
Append features to corpus. If the `feature_values` argument is provided,
the appended features are discrete, otherwise continuous.
Args:
X (numpy.ndarray or scipy.sparse.csr_matrix): Features values to append
feature_names (list): List of strings containing feature names.
feature_values (list): A list of possible values for Discrete features.
compute_values (list): Compute values for corresponding features.
var_attrs (dict): Additional attributes appended to variable.attributes.
"""
if self.X.size == 0:
self.X = X
elif sp.issparse(self.X) or sp.issparse(X):
self.X = sp.hstack((self.X, X)).tocsr()
else:
self.X = np.hstack((self.X, X))
if compute_values is None:
compute_values = [None] * X.shape[1]
if feature_values is None:
feature_values = [None] * X.shape[1]
new_attr = self.domain.attributes
for f, values, cv in zip(feature_names, feature_values, compute_values):
if values is not None:
var = DiscreteVariable(f, values=values, compute_value=cv)
else:
var = ContinuousVariable(f, compute_value=cv)
if isinstance(var_attrs, dict):
var.attributes.update(var_attrs)
new_attr += (var, )
new_domain = Domain(
attributes=new_attr,
class_vars=self.domain.class_vars,
metas=self.domain.metas
)
self.domain = new_domain
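# Illustrative usage sketch: append one continuous feature per document; the
# feature name 'doc_length' and the way it is computed are assumptions.
#
#     lengths = np.array([[len(d)] for d in corpus.documents], dtype=float)
#     corpus.extend_attributes(lengths, ['doc_length'])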
@property
def documents(self):
"""
Returns: a list of strings representing documents — created by joining
selected text features.
"""
return self.documents_from_features(self.text_features)
@property
def titles(self):
""" Returns a list of titles. """
attrs = [attr for attr in chain(self.domain.variables, self.domain.metas)
if attr.attributes.get('title', False)]
# Alternatively, use heuristics
if not attrs:
for var in sorted(chain(self.domain.metas, self.domain),
key=lambda var: var.name,
reverse=True): # reverse so that title < heading < filename
if var.name.lower() in ('title', 'heading', 'h1', 'filename') \
and not var.attributes.get('hidden', False): # skip BoW features
attrs = [var]
break
if attrs:
return self.documents_from_features(attrs)
else:
return ['Document {}'.format(i+1) for i in range(len(self))]
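# Illustrative usage sketch: mark a meta attribute as the source of document
# titles; the variable name 'Heading' is an assumption about the data.
#
#     corpus.domain['Heading'].attributes['title'] = True
#     corpus.titles   # now built from the marked attribute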
def documents_from_features(self, feats):
"""
Args:
feats (list): A list of features to join.
Returns: a list of strings constructed by joining feats.
"""
# create a Table where feats are in metas
data = Table(Domain([], [], [i.name for i in feats],
source=self.domain), self)
# When only features coming from sparse X are used, data.metas is sparse.
# Transform it to dense.
if sp.issparse(data.metas):
data.metas = data.metas.toarray()
return [' '.join(f.str_val(val) for f, val in zip(data.domain.metas, row))
for row in data.metas]
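# Illustrative usage sketch: join two string meta attributes into one string
# per document; the variable names 'Title' and 'Abstract' are assumptions.
#
#     feats = [corpus.domain['Title'], corpus.domain['Abstract']]
#     joined = corpus.documents_from_features(feats)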
def store_tokens(self, tokens, dictionary=None):
"""
Args:
tokens (list): List of lists containing tokens.
dictionary (corpora.Dictionary, optional): A token-to-id mapper;
built from the given tokens if None.
"""
self._tokens = np.array(tokens)
self._dictionary = dictionary or corpora.Dictionary(self.tokens)
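# Illustrative usage sketch: store externally computed tokens for a corpus
# that is assumed to contain exactly two documents; the gensim dictionary is
# then built automatically.
#
#     corpus.store_tokens([['first', 'document'], ['second', 'document']])
#     corpus.tokens       # the stored token lists
#     corpus.dictionary   # gensim Dictionary built from the tokens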
@property
def tokens(self):
"""
np.ndarray: A list of lists containing tokens. If tokens are not yet
present, run default preprocessor and save tokens.
"""
if self._tokens is None:
self._apply_base_preprocessor()
return self._tokens
def has_tokens(self):
""" Return whether corpus is preprocessed or not. """
return self._tokens is not None
def _apply_base_preprocessor(self):
from orangecontrib.text.preprocess import base_preprocessor
base_preprocessor(self)
@property
def dictionary(self):
"""
corpora.Dictionary: A token to id mapper.
"""
if self._dictionary is None:
self._apply_base_preprocessor()
return self._dictionary
def ngrams_iterator(self, join_with=' ', include_postags=False):
"""Yield per-document n-grams for n in `self.ngram_range`, joined with `join_with` (or as tuples if it is None)."""
if self.pos_tags is None:
include_postags = False
if include_postags:
data = zip(self.tokens, self.pos_tags)
else:
data = self.tokens
if join_with is None:
processor = lambda doc, n: nltk.ngrams(doc, n)
elif include_postags:
processor = lambda doc, n: (join_with.join(token + '_' + tag for token, tag in ngram)
for ngram in nltk.ngrams(zip(*doc), n))
else:
processor = lambda doc, n: (join_with.join(ngram) for ngram in nltk.ngrams(doc, n))
return (list(chain(*(processor(doc, n)
for n in range(self.ngram_range[0], self.ngram_range[1]+1))))
for doc in data)
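# Illustrative usage sketch: iterate over unigrams and bigrams of a tokenized
# corpus; setting ngram_range directly is for illustration only.
#
#     corpus.ngram_range = (1, 2)
#     for doc_ngrams in corpus.ngrams_iterator(join_with=' '):
#         print(doc_ngrams)   # e.g. ['short', 'text', 'short text']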
@property
def ngrams_corpus(self):
if self._ngrams_corpus is None:
return BowVectorizer().transform(self).ngrams_corpus
return self._ngrams_corpus
@ngrams_corpus.setter
def ngrams_corpus(self, value):
self._ngrams_corpus = value
@property
def ngrams(self):
"""generator: Ngram representations of documents."""
return self.ngrams_iterator(join_with=' ')
def copy(self):
"""Return a copy of the table."""
c = self.__class__(self.domain, self.X.copy(), self.Y.copy(), self.metas.copy(),
self.W.copy(), copy(self.text_features))
# since tokens and dictionary are considered immutable, copies are not needed
c._tokens = self._tokens
c._dictionary = self._dictionary
c.ngram_range = self.ngram_range
c.pos_tags = self.pos_tags
c.name = self.name
c.used_preprocessor = self.used_preprocessor
return c
@staticmethod
def from_documents(documents, name, attributes=None, class_vars=None, metas=None,
title_indices=None):
"""
Create corpus from documents.
Args:
documents (list): List of documents.
name (str): Name of the corpus
attributes (list): List of tuples (Variable, getter) for attributes.
class_vars (list): List of tuples (Variable, getter) for class vars.
metas (list): List of tuples (Variable, getter) for metas.
title_indices (list): List of indices into domain corresponding to features which will
be used as titles.
Returns:
Corpus.
"""
attributes = attributes or []
class_vars = class_vars or []
metas = metas or []
title_indices = title_indices or []
domain = Domain(attributes=[attr for attr, _ in attributes],
class_vars=[attr for attr, _ in class_vars],
metas=[attr for attr, _ in metas])
for ind in title_indices:
domain[ind].attributes['title'] = True
for attr in domain.attributes:
if isinstance(attr, DiscreteVariable):
attr.values = []
def to_val(attr, val):
if isinstance(attr, DiscreteVariable):
attr.val_from_str_add(val)
return attr.to_val(val)
if documents:
X = np.array([[to_val(attr, func(doc)) for attr, func in attributes]
for doc in documents])
Y = np.array([[to_val(attr, func(doc)) for attr, func in class_vars]
for doc in documents])
metas = np.array([[to_val(attr, func(doc)) for attr, func in metas]
for doc in documents], dtype=object)
else: # assure shapes match the number of columns
X = np.empty((0, len(attributes)))
Y = np.empty((0, len(class_vars)))
metas = np.empty((0, len(metas)))
corpus = Corpus(X=X, Y=Y, metas=metas, domain=domain, text_features=[])
corpus.name = name
return corpus
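# Illustrative usage sketch: build a small corpus from a list of dicts; the
# variables, getters and field names below are assumptions for illustration.
#
#     from Orange.data import StringVariable
#     docs = [{'title': 'Doc 1', 'text': 'hello world'},
#             {'title': 'Doc 2', 'text': 'another short document'}]
#     metas = [(StringVariable('title'), lambda d: d['title']),
#              (StringVariable('text'), lambda d: d['text'])]
#     corpus = Corpus.from_documents(docs, 'toy corpus', metas=metas)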
def __getitem__(self, key):
c = super().__getitem__(key)
Corpus.retain_preprocessing(self, c, key)
return c
@classmethod
def from_table(cls, domain, source, row_indices=...):
t = super().from_table(domain, source, row_indices)
c = Corpus(t.domain, t.X, t.Y, t.metas, t.W, ids=t.ids)
Corpus.retain_preprocessing(source, c, row_indices)
return c
@classmethod
def from_file(cls, filename):
if not os.path.exists(filename): # check the default location
abs_path = os.path.join(get_sample_corpora_dir(), filename)
if not abs_path.endswith('.tab'):
abs_path += '.tab'
if not os.path.exists(abs_path):
raise FileNotFoundError('File "{}" not found.'.format(filename))
else:
filename = abs_path
table = Table.from_file(filename)
return cls(table.domain, table.X, table.Y, table.metas, table.W)
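# Illustrative usage sketch: load a corpus either from an explicit .tab path
# or, as a fallback, from the bundled sample datasets; 'book-excerpts' is an
# assumption about the available sample files.
#
#     corpus = Corpus.from_file('/path/to/my_corpus.tab')
#     corpus = Corpus.from_file('book-excerpts')   # resolved in the sample dir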
@staticmethod
def retain_preprocessing(orig, new, key=...):
""" Set preprocessing of 'new' object to match the 'orig' object. """
if isinstance(orig, Corpus):
if orig._tokens is not None: # retain preprocessing
if isinstance(key, tuple): # get row selection
key = key[0]
if isinstance(key, Integral):
new._tokens = np.array([orig._tokens[key]])
new.pos_tags = None if orig.pos_tags is None else np.array(
[orig.pos_tags[key]])
elif isinstance(key, (list, np.ndarray, slice)):
new._tokens = orig._tokens[key]
new.pos_tags = None if orig.pos_tags is None else orig.pos_tags[key]
elif key is Ellipsis:
new._tokens = orig._tokens
new.pos_tags = orig.pos_tags
else:
raise TypeError('Indexing by type {} not supported.'.format(type(key)))
new._dictionary = orig._dictionary
new.text_features = orig.text_features
new.ngram_range = orig.ngram_range
new.attributes = orig.attributes
new.used_preprocessor = orig.used_preprocessor
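# Illustrative usage sketch: row selections go through __getitem__ and
# from_table above, so a subset keeps the tokens, dictionary and n-gram
# settings of the original corpus.
#
#     subset = corpus[:10]    # first ten documents
#     subset.has_tokens()     # True whenever `corpus` was already tokenized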
def __len__(self):
return len(self.metas)
def __eq__(self, other):
def arrays_equal(a, b):
if sp.issparse(a) != sp.issparse(b):
return False
elif sp.issparse(a) and sp.issparse(b):
return (a != b).nnz == 0
else:
return np.array_equal(a, b)
return (self.text_features == other.text_features and
self._dictionary == other._dictionary and
np.array_equal(self._tokens, other._tokens) and
arrays_equal(self.X, other.X) and
arrays_equal(self.Y, other.Y) and
arrays_equal(self.metas, other.metas) and
np.array_equal(self.pos_tags, other.pos_tags) and
self.domain == other.domain and
self.ngram_range == other.ngram_range)