Source code for orangecontrib.text.twitter

import logging
from functools import partial
from typing import List, Optional, Callable

import numpy as np
import tweepy
from Orange.data import (
    ContinuousVariable,
    DiscreteVariable,
    Domain,
    StringVariable,
    TimeVariable,
)
from Orange.util import dummy_callback, wrap_callback
from tweepy import TooManyRequests

from orangecontrib.text import Corpus
from orangecontrib.text.language_codes import code2lang


# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Language codes this module declares as supported.
# NOTE(review): presumably the values accepted for the `lang` filter of
# `TwitterAPI.search_content`; the list is not referenced in this section,
# so confirm against the callers.
# fmt: off
SUPPORTED_LANGUAGES = [
    "am", "ar", "bg", "bn", "bo", "ca", "ckb", "cs", "cy", "da", "de", "dv",
    "el", "en", "es", "et", "eu", "fa", "fi", "fr", "gu", "he", "hi", "hi-Latn",
    "ht", "hu", "hy", "id", "is", "it", "ja", "ka", "km", "kn", "ko", "lo",
    "lt", "lv", "ml", "mr", "my", "ne", "nl", "no", "or", "pa", "pl", "ps",
    "pt", "ro", "ru", "sd", "si", "sl", "sr", "sv", "ta", "te", "th", "tl",
    "tr", "ug", "uk", "ur", "vi", "zh"
]
# fmt: on


class NoAuthorError(ValueError):
    """Raised when a requested author cannot be resolved to a Twitter user."""


def coordinates(tweet, _, __, dim):
    """Return one component of the tweet's point coordinates.

    Parameters
    ----------
    tweet
        Object with a ``geo`` attribute that is either falsy or a mapping
        which may hold a ``"coordinates"`` entry.
    _, __
        Unused; present so the function matches the extractor call
        signature ``f(tweet, users, places)``.
    dim
        Index into the innermost ``"coordinates"`` sequence.

    Returns
    -------
    The selected coordinate, or None when the tweet has no geo data or
    no coordinates entry.
    """
    geo = tweet.geo
    if not geo:
        return None
    point = geo.get("coordinates")
    if not point:
        return None
    return point["coordinates"][dim]


def country_code(tweet, _, places):
    """Return the country code of the place attached to a tweet.

    Parameters
    ----------
    tweet
        Object with a ``geo`` attribute that is either falsy or a mapping
        which may hold a ``"place_id"`` entry.
    _
        Unused; present so the function matches the extractor call
        signature ``f(tweet, users, places)``.
    places
        Mapping from place id to an object exposing ``country_code``.

    Returns
    -------
    The place's ``country_code``, or an empty string when the tweet has no
    place id or the place is not present in ``places``.
    """
    geo = tweet.geo
    place_id = geo.get("place_id") if geo else None
    # A tweet can reference a place that is missing from `places`; treat it
    # like "no location" instead of raising KeyError, mirroring how the
    # "In Reply To" extractor guards its user lookup.
    place = places.get(place_id) if place_id else None
    return place.country_code if place is not None else ""


# The Date variable is bound to a name because its extractor below needs to
# call `tv.parse` on the tweet's creation time.
tv = TimeVariable("Date")
# Column definitions for the resulting corpus: each entry is a pair
# (Orange variable, extractor). `TwitterAPI._fetch` calls every extractor as
# `f(tweet, users, places)`, where `users` maps user id -> user object and
# `places` maps place id -> place object taken from the same API response.
# The order of entries is significant: stored tweet rows are positional and
# are zipped back with this list when the corpus is built.
METAS = [
    (StringVariable("Content"), lambda doc, _, __: doc.text),
    (
        DiscreteVariable("Author"),
        lambda doc, users, _: "@" + users[doc.author_id].username,
    ),
    # created_at is converted through its ISO-format string representation.
    (tv, lambda doc, _, __: tv.parse(doc.created_at.isoformat())),
    (DiscreteVariable("Language"), lambda doc, _, __: doc.lang),
    (DiscreteVariable("Location"), country_code),
    (
        ContinuousVariable("Number of Likes", number_of_decimals=0),
        lambda doc, _, __: doc.public_metrics["like_count"],
    ),
    (
        ContinuousVariable("Number of Retweets", number_of_decimals=0),
        lambda doc, _, __: doc.public_metrics["retweet_count"],
    ),
    # Empty string when the tweet is not a reply or the replied-to user is
    # not among the users delivered with the response.
    (
        DiscreteVariable("In Reply To"),
        lambda doc, users, _: "@" + users[doc.in_reply_to_user_id].username
        if doc.in_reply_to_user_id and doc.in_reply_to_user_id in users
        else "",
    ),
    (DiscreteVariable("Author Name"), lambda doc, users, __: users[doc.author_id].name),
    (
        StringVariable("Author Description"),
        lambda doc, users, _: users[doc.author_id].description,
    ),
    (
        ContinuousVariable("Author Tweets Count", number_of_decimals=0),
        lambda doc, users, _: users[doc.author_id].public_metrics["tweet_count"],
    ),
    (
        ContinuousVariable("Author Following Count", number_of_decimals=0),
        lambda doc, users, _: users[doc.author_id].public_metrics["following_count"],
    ),
    (
        ContinuousVariable("Author Followers Count", number_of_decimals=0),
        lambda doc, users, _: users[doc.author_id].public_metrics["followers_count"],
    ),
    (
        ContinuousVariable("Author Listed Count", number_of_decimals=0),
        lambda doc, users, _: users[doc.author_id].public_metrics["listed_count"],
    ),
    # Stored as the string form of the boolean so it fits a discrete variable.
    (
        DiscreteVariable("Author Verified"),
        lambda doc, users, _: str(users[doc.author_id].verified),
    ),
    # `coordinates` takes the component index as `dim`: 0 feeds Longitude,
    # 1 feeds Latitude.
    (ContinuousVariable("Longitude"), partial(coordinates, dim=0)),
    (ContinuousVariable("Latitude"), partial(coordinates, dim=1)),
]
# Maximum number of tweets that can be downloaded in one set of requests:
# at most 450 requests per 15 minutes, each request returning at most
# 100 tweets.
MAX_TWEETS = 450 * 100


# Keyword arguments passed to every `tweepy.Paginator` created by
# `TwitterAPI`. They request the tweet, user and place fields (and the
# expansions delivering user/place objects) that the METAS extractors read,
# and set the page size to 100 results per request.
request_settings = {
    "tweet_fields": [
        "lang",
        "public_metrics",
        "in_reply_to_user_id",
        "author_id",
        "geo",
        "created_at",
    ],
    "user_fields": ["description", "public_metrics", "verified"],
    "place_fields": ["country_code"],
    "expansions": ["author_id", "in_reply_to_user_id", "geo.place_id"],
    "max_results": 100,
}


class TwitterAPI:
    """Fetch tweets from the Twitter API.

    Notes:
        Downloaded tweets are kept on the instance. With ``collecting=True``
        results across multiple searches are aggregated. To remove tweets
        from previous searches and only return results from the last search,
        either call the `reset` method before searching or provide the
        ``collecting=False`` argument (the default) to the search method.
    """

    text_features = [METAS[0][0]]  # Content
    string_attributes = [m for m, _ in METAS if isinstance(m, StringVariable)]

    def __init__(self, bearer_token):
        """Create a client authenticated with ``bearer_token``."""
        self.api = tweepy.Client(bearer_token)
        # tweet id -> list of extracted values, ordered like METAS
        self.tweets = {}
        # one tuple of (label, value) pairs per performed search
        self.search_history = []

    def search_content(
        self,
        content: List[str],
        *,
        max_tweets: Optional[int] = MAX_TWEETS,
        lang: Optional[str] = None,
        allow_retweets: bool = True,
        collecting: bool = False,
        callback: Callable = dummy_callback,
    ) -> Optional[Corpus]:
        """
        Search recent tweets by content (keywords).

        Parameters
        ----------
        content
            A list of key-words to search for.
        max_tweets
            Limits the number of downloaded tweets. If none use APIs maximum.
        lang
            A language's code (either ISO 639-1 or ISO 639-3 formats).
        allow_retweets
            Whether to download retweets.
        collecting
            Whether to collect results across multiple search calls.
        callback
            Function to report the progress

        Returns
        -------
        Corpus with tweets
        """
        if not collecting:
            self.reset()
        max_tweets = max_tweets or MAX_TWEETS

        def build_query():
            assert len(content) > 0, "At leas one keyword required"
            # every keyword is quoted and the keywords are OR-ed together
            q = " OR ".join(['"{}"'.format(kw) for kw in content])
            if not allow_retweets:
                q += " -is:retweet"
            if lang:
                q += f" lang:{lang}"
            return q

        paginator = tweepy.Paginator(
            self.api.search_recent_tweets, build_query(), **request_settings
        )
        count = self._fetch(paginator, max_tweets, callback=callback)
        self.append_history("Content", content, lang or "Any", allow_retweets, count)
        return self._create_corpus()

    def search_authors(
        self,
        authors: List[str],
        *,
        max_tweets: Optional[int] = MAX_TWEETS,
        collecting: bool = False,
        callback: Callable = dummy_callback,
    ) -> Optional[Corpus]:
        """
        Search recent tweets by authors.

        Parameters
        ----------
        authors
            A list of authors to search for.
        max_tweets
            Limits the number of downloaded tweets. If none use APIs maximum.
            The limit is passed unchanged to each author's download.
        collecting
            Whether to collect results across multiple search calls.
        callback
            Function to report the progress

        Returns
        -------
        Corpus with tweets

        Raises
        ------
        NoAuthorError
            When the API returns no user data for one of the authors.
        """
        if not collecting:
            self.reset()
        # Same fallback as in search_content: without it a None limit would
        # reach _fetch and fail on ``count / max_tweets``.
        max_tweets = max_tweets or MAX_TWEETS

        count_sum = 0
        n = len(authors)
        for i, author in enumerate(authors):
            author_ = self.api.get_user(username=author)
            if author_.data is None:
                raise NoAuthorError(author)
            paginator = tweepy.Paginator(
                self.api.get_users_tweets, author_.data.id, **request_settings
            )
            # each author reports progress within its own 1/n slice
            count_sum += self._fetch(
                paginator,
                max_tweets,
                callback=wrap_callback(callback, i / n, (i + 1) / n),
            )
        self.append_history("Author", authors, None, None, count_sum)
        return self._create_corpus()

    def _fetch(
        self, paginator: tweepy.Paginator, max_tweets: int, callback: Callable
    ) -> int:
        """Download tweets from ``paginator`` into ``self.tweets``.

        Stops after ``max_tweets`` previously unseen tweets were stored, when
        the paginator is exhausted, or when the API raises TooManyRequests
        (which is logged and otherwise ignored, keeping what was downloaded).

        Returns
        -------
        Number of tweets that were not in ``self.tweets`` before this call.
        """
        count = 0
        try:
            done = False
            for response in paginator:
                # objects delivered through expansions, indexed for lookup
                users = {u.id: u for u in response.includes.get("users", [])}
                places = {p.id: p for p in response.includes.get("places", [])}
                for tweet in response.data or []:
                    # already known tweets are refreshed but not counted again
                    if tweet.id not in self.tweets:
                        count += 1
                    self.tweets[tweet.id] = [f(tweet, users, places) for _, f in METAS]
                    callback(count / max_tweets)
                    if count >= max_tweets:
                        done = True
                        break
                if done:
                    break
        except TooManyRequests:
            log.debug("TooManyRequests raised")
        return count

    def _create_corpus(self) -> Optional[Corpus]:
        """Build a Corpus from all stored tweets; None when there are none."""
        if not self.tweets:
            return None

        def to_val(attr, val):
            # discrete variables must know the value before it is converted
            if isinstance(attr, DiscreteVariable):
                attr.val_from_str_add(val)
            return attr.to_val(val)

        m = [attr for attr, _ in METAS]
        domain = Domain(attributes=[], class_vars=[], metas=m)

        metas = np.array(
            [
                [to_val(attr, t) for (attr, _), t in zip(METAS, ts)]
                for ts in self.tweets.values()
            ],
            dtype=object,
        )
        # all columns are metas; the attribute matrix has zero columns
        x = np.empty((len(metas), 0))

        return Corpus.from_numpy(domain, x, metas=metas, text_features=self.text_features)

    def append_history(
        self,
        mode: str,
        query: List[str],
        lang: Optional[str],
        allow_retweets: Optional[bool],
        n_tweets: int,
    ):
        """Record the parameters and result size of one search.

        ``lang`` is translated through ``code2lang`` when it is a known code
        and stored unchanged otherwise.
        """
        lang = code2lang.get(lang, lang)
        self.search_history.append(
            (
                ("Query", query),
                ("Search by", mode),
                ("Language", lang),
                ("Allow retweets", str(allow_retweets)),
                ("Tweets count", n_tweets),
            )
        )

    def reset(self):
        """Removes all downloaded tweets and the search history."""
        self.tweets = {}
        self.search_history = []