"""Twitter API access for Orange text mining (orangecontrib.text.twitter)."""
from collections import OrderedDict, Iterable
import tweepy
from Orange import data
from orangecontrib.text import Corpus
from orangecontrib.text.language_codes import code2lang
__all__ = ['Credentials', 'TwitterAPI']
def coordinates_geoJSON(json):
    """Return the ``coordinates`` pair of a geoJSON point.

    Falls back to ``[None, None]`` when the mapping is empty/None or has
    no ``coordinates`` entry.
    """
    if not json:
        return [None, None]
    return json.get('coordinates', [None, None])
class Credentials:
    """ Twitter API credentials. """

    def __init__(self, consumer_key, consumer_secret):
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        # None means "not checked yet"; check() caches True/False here.
        self._valid = None

    @property
    def valid(self):
        """Whether the credentials are valid; checked lazily and cached."""
        return self._valid if self._valid is not None else self.check()

    def check(self):
        """Contact Twitter to verify the credentials and cache the result."""
        try:
            self.auth.get_authorization_url()
        except tweepy.TweepError:
            self._valid = False
        else:
            self._valid = True
        return self._valid

    def __getstate__(self):
        # The OAuth handler is not picklable; drop it and reset the cached
        # validity so it is re-checked after unpickling.
        state = {key: value for key, value in self.__dict__.items()
                 if key != 'auth'}
        state['_valid'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Rebuild the handler that __getstate__ stripped out.
        self.auth = tweepy.OAuthHandler(self.consumer_key,
                                        self.consumer_secret)

    def __eq__(self, other):
        if not isinstance(other, Credentials):
            return False
        return (self.consumer_key == other.consumer_key
                and self.consumer_secret == other.consumer_secret)
class TwitterAPI:
    """ Fetch tweets from the Twitter API.

    Notes:
        Results across multiple searches are aggregated. To remove tweets
        from previous searches and only return results from the last search
        either call the `reset` method before searching or provide
        `collecting=False` argument to the search method.
    """
    attributes = []
    class_vars = [
        (data.DiscreteVariable('Author'),
         lambda doc: '@' + doc.author.screen_name),
    ]

    tv = data.TimeVariable('Date')

    # (variable, extractor) pairs mapping tweet objects to corpus columns.
    metas = [
        (data.StringVariable('Content'), lambda doc: doc.text),
        (tv, lambda doc: TwitterAPI.tv.parse(doc.created_at.isoformat())),
        (data.DiscreteVariable('Language'), lambda doc: doc.lang),
        (data.DiscreteVariable('Location'),
         lambda doc: getattr(doc.place, 'country_code', None)),
        (data.ContinuousVariable('Number of Likes'),
         lambda doc: doc.favorite_count),
        (data.ContinuousVariable('Number of Retweets'),
         lambda doc: doc.retweet_count),
        (data.DiscreteVariable('In Reply To'),
         lambda doc: '@' + doc.in_reply_to_screen_name
         if doc.in_reply_to_screen_name else ''),
        (data.DiscreteVariable('Author Name'), lambda doc: doc.author.name),
        (data.StringVariable('Author Description'),
         lambda doc: doc.author.description),
        (data.ContinuousVariable('Author Statuses Count'),
         lambda doc: doc.author.statuses_count),
        (data.ContinuousVariable('Author Favourites Count'),
         lambda doc: doc.author.favourites_count),
        (data.ContinuousVariable('Author Friends Count'),
         lambda doc: doc.author.friends_count),
        (data.ContinuousVariable('Author Followers Count'),
         lambda doc: doc.author.followers_count),
        (data.ContinuousVariable('Author Listed Count'),
         lambda doc: doc.author.listed_count),
        (data.DiscreteVariable('Author Verified'),
         lambda doc: str(doc.author.verified)),
        (data.ContinuousVariable('Longitude'),
         lambda doc: coordinates_geoJSON(doc.coordinates)[0]),
        (data.ContinuousVariable('Latitude'),
         lambda doc: coordinates_geoJSON(doc.coordinates)[1]),
    ]

    text_features = [metas[0][0]]    # Content
    string_attributes = [m for m, _ in metas
                         if isinstance(m, data.StringVariable)]

    def __init__(self, credentials,
                 on_progress=None, should_break=None,
                 on_error=None, on_rate_limit=None):
        """
        Args:
            credentials (Credentials): Valid Twitter API credentials.
            on_progress (callable): Called with (n_tweets, fraction_done)
                every 20 downloaded tweets.
            should_break (callable): Polled during a fetch; return True to
                abort the download.
            on_error (callable): Called with an error message string.
            on_rate_limit (callable): Called when the API rate limit (HTTP
                429) is hit.
        """
        self.key = credentials
        self.api = tweepy.API(credentials.auth)
        # Tweets keyed by id; an OrderedDict both deduplicates across
        # searches and preserves download order.
        self.container = OrderedDict()
        self.search_history = []

        # Callbacks:
        self.on_error = on_error
        self.on_rate_limit = on_rate_limit
        self.on_progress = on_progress or (lambda *args: args)
        self.should_break = should_break or (lambda *args: False)

    @property
    def tweets(self):
        """All tweets downloaded so far (deduplicated by tweet id)."""
        return self.container.values()

    def search_content(self, content, *, max_tweets=0,
                       lang=None, allow_retweets=True,
                       collecting=False):
        """ Search by content.

        Args:
            content (list of str): A list of key words to search for.
            max_tweets (int): If greater than zero limits the number of
                downloaded tweets.
            lang (str): A language's code (either ISO 639-1 or ISO 639-3
                formats).
            allow_retweets(bool): Whether to download retweets.
            collecting (bool): Whether to collect results across multiple
                search calls.

        Returns:
            Corpus
        """
        if not collecting:
            self.reset()

        if max_tweets == 0:
            # Unlimited; the cursor stops when the API runs out of results.
            max_tweets = float('Inf')

        def build_query():
            nonlocal content
            if not content:
                q = 'from: '
            else:
                if not isinstance(content, list):
                    content = [content]
                # Quote each phrase; do not shadow the accumulator with the
                # loop variable.
                q = ' OR '.join('"{}"'.format(word) for word in content)
            if not allow_retweets:
                q += ' -filter:retweets'
            return q

        query = build_query()
        cursor = tweepy.Cursor(self.api.search, q=query, lang=lang)

        corpus, count = self.fetch(cursor, max_tweets)
        self.append_history('Content', content, lang if lang else 'Any',
                            str(allow_retweets), count)
        return corpus

    def search_authors(self, authors, *, max_tweets=0, collecting=False):
        """ Search by authors.

        Args:
            authors (list of str): A list of authors to search for.
            max_tweets (int): If greater than zero limits the number of
                downloaded tweets.
            collecting (bool): Whether to collect results across multiple
                search calls.

        Returns:
            Corpus
        """
        if not collecting:
            self.reset()

        if max_tweets == 0:    # set to max allowed for progress
            max_tweets = 3200

        if not isinstance(authors, list):
            authors = [authors]

        cursors = [tweepy.Cursor(self.api.user_timeline, screen_name=a)
                   for a in authors]
        corpus, count = self.fetch(cursors, max_tweets)
        self.append_history('Author', authors, None, None, count)
        return corpus

    def fetch(self, cursors, max_tweets):
        """Drain the given cursor(s) into the container.

        Returns:
            (Corpus or None, int): the aggregated corpus (None on error)
            and the number of newly collected tweets.
        """
        if not isinstance(cursors, list):
            cursors = [cursors]

        count = 0
        try:
            for i, cursor in enumerate(cursors):
                for j, tweet in enumerate(cursor.items(max_tweets), start=1):
                    if self.should_break():
                        break
                    if tweet.id not in self.container:
                        count += 1
                        self.container[tweet.id] = tweet
                    if j % 20 == 0:
                        # NOTE: with max_tweets == inf the fraction is NaN;
                        # callbacks only ever see a finite value for author
                        # searches or capped content searches.
                        self.on_progress(len(self.container),
                                         (i * max_tweets + j) /
                                         (len(cursors) * max_tweets))
                if self.should_break():
                    break
        except tweepy.TweepError as e:
            # e.response is None for connection-level failures; guard the
            # attribute access so we report the original error, not an
            # AttributeError.
            status = getattr(e.response, 'status_code', None)
            if status == 429 and self.on_rate_limit:
                self.on_rate_limit()
            elif self.on_error:
                self.on_error(str(e))
            return None, 0

        return self.create_corpus(), count

    def create_corpus(self):
        """Build a Corpus from every tweet collected so far."""
        return Corpus.from_documents(self.tweets, 'Twitter', self.attributes,
                                     self.class_vars, self.metas,
                                     title_indices=[-1])

    def reset(self):
        """ Removes all downloaded tweets. """
        self.search_history = []
        self.container = OrderedDict()

    def append_history(self, mode, query, lang, allow_retweets, n_tweets):
        """Record one search's parameters for reporting."""
        # Join list-like queries for display; a plain string must be kept
        # whole, not exploded into comma-separated characters (str is
        # Iterable too).
        if isinstance(query, Iterable) and not isinstance(query, str):
            query = ', '.join(query)
        # Map a language code to its human-readable name when known.
        if lang in code2lang:
            lang = code2lang[lang]
        self.search_history.append((
            ('Query', query),
            ('Search by', mode),
            ('Language', lang),
            ('Allow retweets', allow_retweets),
            ('Tweets count', n_tweets),
        ))

    def report(self):
        return self.search_history