Code Snippets - twitfetcher - a simple command-line tool for searching for and fetching tweets

A command-line tool for searching for and fetching tweets

This snippet, twitfetcher, is a simple command-line tool, written in Python 3, for retrieving tweets via the twitter search API v1.1. The retrieved tweets can be stored in CSV- or JSON-formatted files.

Twitter only makes a sample of the tweets sent over the previous week searchable, but the search API is still a very useful free source of data for data science experiments.

BTW, for downloading tweets using the twitter streaming APIs see the twitstreamer snippet.

To use twitfetcher, start by downloading the twitfetcher code.

To use twitfetcher, you will need Python 3.2 or later installed. You'll also need to log into your twitter account to obtain a consumer key and secret and an access token and secret, and edit the twitfetcher.py script to include them. These keys authorize twitfetcher.py to fetch data from twitter. Be careful not to disclose your secret keys by giving your modified copy of twitfetcher.py to others. Obtain the keys and add them to the twitfetcher script as follows:

  1. Open the twitfetcher.py script in your favourite text editor
  2. Log into your twitter account and go to your apps page. Here you will need to create a new application. You can enter a name, description and your website address (and read and agree to the terms and conditions). Once you have created the application, you can copy the generated consumer key and consumer secret to set the consumer_key and consumer_secret variables at the start of twitfetcher.py
  3. Now create an access token as explained on that page, and copy the values for the access token and access token secret to set the access_token and access_secret variables at the start of twitfetcher.py (an example of the edited variables is shown below)
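When you have finished, the variables at the top of twitfetcher.py should look something like this (the values below are just placeholders, not real keys):

# you must fill in the following values before you can run this script!
consumer_key = "YOUR-CONSUMER-KEY"
consumer_secret = "YOUR-CONSUMER-SECRET"

access_token = "YOUR-ACCESS-TOKEN"
access_secret = "YOUR-ACCESS-TOKEN-SECRET"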

Invoke it as follows:

python3 twitfetcher.py [options] <search-string>  [<output-path>] 
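Run python3 twitfetcher.py -h for a description of the options. For example, to fetch at most 500 English-language tweets in JSON format (the query and output file name here are only illustrations):

python3 twitfetcher.py -f json -l en -m 500 "data science" data_science.json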
		

The twitfetcher program will output some status messages as it runs, for example:

python3 twitfetcher.py "irish AND whisky" irish_and_whisky.csv
2012-11-13 21:38:35,477 - twitfetch - INFO - no previous search results found, starting new search
2012-11-13 21:38:35,478 - twitfetch - INFO - search range approximately 7.20 days
2012-11-13 21:38:40,398 - twitfetch - INFO - fetched 100 tweets, estimated progress : 56%
2012-11-13 21:38:42,956 - twitfetch - INFO - fetched 200 tweets, estimated progress : 97%
2012-11-13 21:38:44,864 - twitfetch - INFO - fetched 224 tweets, estimated progress : 100%
2012-11-13 21:38:46,575 - twitfetch - INFO - terminating search
2012-11-13 21:38:46,576 - twitfetch - INFO - saved 224 new tweets
2012-11-13 21:38:46,577 - twitfetch - INFO - written output to irish_and_whisky.csv with total 224 tweets
        

The main code in the program is a class called twitfetch. This class handles querying the twitter API, analyzing the responses, and merging the retrieved results with any existing saved data. twitfetcher will avoid retrieving tweets already saved to the output file in an earlier invocation. Because twitter only allows searching over the last 7 days, run this script repeatedly to accumulate data over a longer period (but be sure to run it at least every 7 days).
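The script is designed to be run from the command line, but as a rough sketch of how the pieces fit together, the twitfetch class can also be driven directly by passing it an options object carrying the fields the class reads (format, lang, geocode and maxtweets); the query and output file below are just examples:

from argparse import Namespace

# minimal options object with the fields the twitfetch class reads
options = Namespace(format="csv", lang="", geocode="", maxtweets=None)

tw = twitfetch("irish AND whisky", "irish_and_whisky.csv", options)
tw.search()   # fetches new tweets and merges them with any previously saved ones

(Set up a handler for the "twitfetch" logger first if you want to see the progress messages.)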

One interesting challenge was using OAuth authorization, which is required by v1.1 of the twitter API. The earlier version (v1.0) did not require any authentication but is being phased out. Luckily there is a way to bypass the full "3-legged" OAuth authorization protocol, as explained in the twitter API docs. Encoding the consumer and access keys into an OAuth header is still quite a fiddly process; see the generate_authorization_header method of twitfetch for the details.
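As a simplified sketch of what that method computes (the query string and secrets below are illustrative placeholders, and the real code also sorts and percent-encodes all of the OAuth parameters):

import base64
import hmac
from hashlib import sha1
from urllib.parse import quote

consumer_secret = "YOUR-CONSUMER-SECRET"       # placeholder values
access_secret = "YOUR-ACCESS-TOKEN-SECRET"

# the signature base string is the percent-encoded method, URL and
# sorted, percent-encoded query string, joined with '&'
base_string = "&".join(quote(part, "") for part in (
    "GET",
    "https://api.twitter.com/1.1/search/tweets.json",
    "count=100&oauth_consumer_key=...&q=irish%20AND%20whisky"))

# the signing key is the consumer secret and access token secret joined with '&'
signing_key = consumer_secret + "&" + access_secret
oauth_signature = base64.standard_b64encode(
    hmac.new(signing_key.encode(), base_string.encode(), sha1).digest()).decode("ascii")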

The format used to store tweets is pluggable, and JSON and CSV formatters are provided. The jsonformatter class writes each tweet as a JSON object on a separate line of the output file. This format is convenient if you plan to read the tweets using a custom application.
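Reading such a file back is just a matter of parsing one JSON object per line; for example (the file name here is illustrative):

import json

tweets = []
with open("data_science.json", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if line:
            tweets.append(json.loads(line))
print("loaded %d tweets" % len(tweets))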

The csvformatter class is more interesting; it uses the handy built-in csv module in python. Use the CSV format if you want to consume the output in a pre-packaged application. I used OpenOffice to test importing saved tweets from CSV into a spreadsheet.
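If you want to process the CSV output in python instead, the same fully-quoted dialect can be used to read it back; the column names match those defined in twitfetch, and the file name here is illustrative:

import csv

with open("irish_and_whisky.csv", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter=",", quoting=csv.QUOTE_ALL)
    header = next(reader)              # id, created_at, geo_lat, geo_lon, ...
    for row in reader:
        tweet = dict(zip(header, row))
        print(tweet["created_at"], tweet["text"])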

twitfetcher.py
#!/usr/bin/python3

# Copyright (C) 2012 Niall McCarroll
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, 
# distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
# following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE 
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# twitfetcher.py
# a very simple tool for searching for and fetching tweets
# run python3 twitfetcher.py -h for help

from urllib.request import urlopen, HTTPError, Request
from urllib.parse import quote
import time
import json
import datetime
import os
from os.path import exists
import sys
import logging
import random
import hashlib
import hmac
from hashlib import sha1
import base64
import argparse
import csv

# you must fill in the following values before you can run this script!
consumer_key = ""
consumer_secret = ""

access_token = ""
access_secret = ""


# formatter class for storing tweets as CSV rows
class csvformatter(object):

    csv.register_dialect('quotedcsv', delimiter=',', quoting=csv.QUOTE_ALL)

    def __init__(self,filename,columns):
        self.filename = filename
        self.columns = columns
        self.writer = None
        self.reader = None
        self.file = None

    def openForWrite(self):
        # newline="" lets the csv module control line endings itself
        self.file = open(self.filename,"w",encoding="utf-8",newline="")
        self.writer = csv.writer(self.file,dialect="quotedcsv")
        self.writer.writerow([col[0] for col in self.columns])

    def openForAppend(self):
        self.file = open(self.filename,"a",encoding="utf-8",newline="")
        self.writer = csv.writer(self.file,dialect="quotedcsv")

    def openForRead(self):
        self.file = open(self.filename,"r",encoding="utf-8",newline="")
        self.reader = csv.reader(self.file,dialect="quotedcsv")
        next(self.reader) # skip headers

    def write(self,obj):
        row = []
        for (col,decoder) in self.columns:
            if col in obj:
                row.append(obj[col])
            else:
                row.append("")
        self.writer.writerow(row)
        
    def read(self):
        try:
            obj = {}
            readrow = next(self.reader)
            index = 0
            for (col,decoder) in self.columns:
                obj[col] = readrow[index]
                index += 1
            return obj
        except Exception as ex:
            logging.getLogger("twitfetch").error(str(ex))
            return None

    def close(self):
        if self.file:
            self.file.close()
            self.file = None
  
# formatter class for storing tweets as JSON objects
class jsonformatter(object):

    def __init__(self,filename,columns):
        self.filename = filename
        self.file = None

    def openForWrite(self):
        self.file = open(self.filename,"w",encoding="utf-8")
        
    def openForAppend(self):
        self.file = open(self.filename,"a",encoding="utf-8")
        
    def openForRead(self):
        self.file = open(self.filename,"r",encoding="utf-8")
       
    def write(self,obj):
        s = json.dumps(obj)
        self.file.write(s+"\n")
        
    def read(self):
        try:
            line = self.file.readline().strip()
            return json.loads(line)
        except:
            return None
    
    def close(self):
        if self.file:
            self.file.close()
            self.file = None

# utility class for fetching tweets given a search term
class twitfetch(object):

    # twitter search API returns tweets from slightly longer than the last 7 days
    MAX_SEARCH_RANGE_DAYS = 7.2

    def __init__(self,search_term,filename,options):
        self.options = options
        self.search_term = search_term
        self.filename = filename
        self.created_at_lower = datetime.datetime.utcnow() - datetime.timedelta(int(twitfetch.MAX_SEARCH_RANGE_DAYS),int(86400*twitfetch.MAX_SEARCH_RANGE_DAYS)%86400,0)
        self.created_at_upper = None
        self.search_range = twitfetch.MAX_SEARCH_RANGE_DAYS

        def date_decoder(s):
            return time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(s,'%a %b %d %H:%M:%S +0000 %Y'))

        def geo_decoder(g,index):
            try:
                return str(g["geo"]["coordinates"][index])
            except:
                return ""

        # define which columns to create from each tweet object 
        # as a list of column name, extractor-function pairs
        # an extractor function extracts the value of the column from
        # the tweet object
        # if extractor-function is set to None, the column name 
        # is used as the lookup key in the tweet object
        self.columns = [("id",None), 
                ("created_at",lambda x: date_decoder(x["created_at"])), 
                ("geo_lat",lambda x: geo_decoder(x,0)), 
                ("geo_lon",lambda x: geo_decoder(x,1)), 
                ("from_user_name",lambda x:x["user"]["name"]), 
                ("from_user_screen_name",lambda x:x["user"]["screen_name"]), 
                ("iso_language_code",lambda x: x["metadata"]["iso_language_code"]), 
                ("text",None)]

    # create and return a formatter object
    def createFormatter(self,filename,columns):
        if self.options.format == "json":
            return jsonformatter(filename,columns)
        else:
            return csvformatter(filename,columns)

    # generate a nonce used in the OAuth process
    # see http://blog.andydenmark.com/2009/03/how-to-build-oauth-consumer.html
    def generate_nonce(self):
        random_number = ''.join(str(random.randint(0, 9)) for i in range(40))
        m = hashlib.md5((str(time.time()) + str(random_number)).encode())
        return m.hexdigest()

    # generate an OAuth Authorization header to add to each request
    # see https://dev.twitter.com/docs/auth/authorizing-request
    def generate_authorization_header(self,method,url,query_parameters):
        nonce = self.generate_nonce()
        s = ""
        params = {}
        for key in query_parameters.keys():
            params[key] = query_parameters[key]
        
        params["oauth_nonce"] = nonce
        params["oauth_consumer_key"] = consumer_key
        params["oauth_token"] = access_token
        params["oauth_signature_method"] = "HMAC-SHA1"
        params["oauth_version"] = "1.0"
        params["oauth_timestamp"] = str(int(time.time()))

        sortkeys = [k for k in params.keys()]
        sortkeys.sort()
        for k in sortkeys:
            if s != "":
                s += "&"
            s += quote(k,'')
            s += '='
            s += quote(params[k],'')

        base_string = quote(method,'')+"&"+quote(url,'')+"&"+quote(s,'')
     
        signing_key = consumer_secret+"&"+access_secret

        tok = base64.standard_b64encode(hmac.new(signing_key.encode(),base_string.encode(),sha1).digest()).decode('ascii')
       
        params["oauth_signature"] = tok

        auth_header = "OAuth "
        auth_keys = [k for k in params.keys()]
        auth_keys.sort()
        first = True
        for k in auth_keys:            
            if k.startswith("oauth"):
                if not first:
                    auth_header += ", "
                auth_header += k
                auth_header += '="'
                auth_header += quote(params[k],'') # percent-encode everything, including '/' characters in the signature
                auth_header += '"'
                first = False
        return auth_header

    # report progress with estimate of % completion
    def reportProgress(self,count):
        # estimate completion based on the timestamp of the most recently collected tweet
        delta = self.created_at_upper - self.created_at_lower
        delta_days = delta.days + delta.seconds / 86400
        est_pct_complete = 100-int(100*delta_days/self.search_range)
        if self.options.maxtweets:
            # if a maximum tweet count was requested, also track progress towards that limit
            alt_pct_complete = 100.0 * count / self.options.maxtweets
            if alt_pct_complete > est_pct_complete:
                est_pct_complete = alt_pct_complete
        if est_pct_complete > 100.0:
            est_pct_complete = 100.0
        logging.getLogger("twitfetch").info("fetched %d tweets, estimated progress : %02d%%"%(count,est_pct_complete))

    # fetch a url using the specified authorization header
    def fetchurl(self,url,auth_header):
        req = Request(url,None,{'User-agent':'Mozilla/5.0','Authorization':auth_header})
        f = urlopen(req)
        return f.read()

    # call the twitter search API to fetch tweets matching search term
    # if minid is specified, search only for tweets newer than minid
    # if maxid is specified, search only for tweets older than maxid
    # return the JSON object obtained from the twitter API response
    def fetch(self,term,minid,maxid):
        time.sleep(1)
        url = "https://api.twitter.com/1.1/search/tweets.json"
        query = {}
    
        query["q"] = term
        query["count"] ="100"
        query["result_type"]="recent"
        if maxid:
            query["max_id"]=str(maxid)
        if minid:
            query["since_id"]=str(minid)
        if self.options.geocode:
            query["geocode"]=str(self.options.geocode)
        if self.options.lang:
            query["lang"]=str(self.options.lang)

        url_query = url
        sep = "?"
        for key in query.keys():
            url_query+=sep
            sep="&"
            url_query+=key
            url_query+="="
            url_query+=quote(query[key])

        logging.getLogger("twitfetch").debug("fetching:"+url_query)
        results = {"statuses":[]} # returned if all retries fail, so the caller terminates cleanly
        backoff = 16
        while backoff < 3600:
            p1 = None
            try:
                auth_header = self.generate_authorization_header("GET",url,query)
                p1 = self.fetchurl(url_query,auth_header).decode("utf-8")
                return json.loads(p1)
            except HTTPError as error:
                contents = error.read()
                logging.getLogger("twitfetch").error(str(error.code)+","+str(contents))
            except Exception as error:
                logging.getLogger("twitfetch").error(str(error))
                
            logging.getLogger("twitfetch").info("re-trying after "+str(backoff)+" seconds")            
            time.sleep(backoff)
            backoff = int(backoff*1.2)

        logging.getLogger("twitfetch").error("giving up on fetching url:"+url)

        return results
    
    # find all tweets newer than min_id (if specified) and write them to filename
    # mode is the file open mode for the output file (currently unused - the formatter always opens for write)
    def find(self,min_id,mode,filename):
        count = 0
        formatter = self.createFormatter(filename,self.columns)
        formatter.openForWrite()
        try:    
            max_id = None
            while True:
                obj = self.fetch(self.search_term,min_id,max_id)
                terminate = True
               
                for r in obj["statuses"]:
                    # results should be ordered by id, descending
                    if max_id == None or r["id"] < max_id:
                        terminate = False
                        count += 1
                        max_id = r["id"]-1
                        # build the output row from the tweet using the column extractors
                        row = {}
                        for (col,decoder) in self.columns:
                            try:
                                if decoder:
                                    row[col] = decoder(r)
                                elif col in r:
                                    row[col] = str(r[col])
                            except:
                                row[col] = None
                        formatter.write(row)

                        # record the creation date of the oldest tweet retrieved so far
                        self.created_at_upper = datetime.datetime.strptime(row["created_at"],"%Y-%m-%d %H:%M:%S")
                    
                    if self.options.maxtweets and count >= self.options.maxtweets:
                        terminate = True
                        break

                if terminate:
                    logging.getLogger("twitfetch").info("terminating search")
                    formatter.close()
                    return count
                else:
                    self.reportProgress(count)
        except KeyboardInterrupt:
            formatter.close()
            logging.getLogger("twitfetch").info("interrupting search")
            return count
            
    # search for tweets using the search term 
    def search(self):

        filename_bak = self.filename+"_bak"   # backup file for previous collected data
        filename_new = self.filename+"_new"   # file to write newly collected tweets

        min_id = None  # set the cut-off point, when set, fetch only tweets newer than this

        # if older tweets have already been collected, get the id of the most recent
        if exists(self.filename):
            formatter = self.createFormatter(self.filename,self.columns)
            formatter.openForRead()
            obj = formatter.read() # get the row corresponding to the most recent tweet collected
            if obj:
                # get the id of the most recent tweet already collected
                min_id = obj["id"]
                created_at = obj["created_at"]
                # get the creation date of the most recent tweet already collected
                self.created_at_lower = datetime.datetime.strptime(created_at,"%Y-%m-%d %H:%M:%S")
                # work out the search range (in days) over which new tweets will be collected
                delta = datetime.datetime.utcnow() - self.created_at_lower
                self.search_range = delta.days + delta.seconds / 86400
                
                logging.getLogger("twitfetch").info("found previous search results, continue from id > "+ str(min_id) + " (newer than: "+ created_at + ")" ) 
                if self.search_range > twitfetch.MAX_SEARCH_RANGE_DAYS:
                    logging.getLogger("twitfetch").warn("there may be a gap between previously collected tweets and newly collected tweets")
            formatter.close()
            
        if not min_id:
            logging.getLogger("twitfetch").info("no previous search results found, starting new search") 
                      
        logging.getLogger("twitfetch").info("search range approximately %2.2f days"%(self.search_range))

        # find tweets and store them in the new file
        count = self.find(min_id,"w",filename_new)
        logging.getLogger("twitfetch").info("saved "+str(count)+" new tweets")

        # if older tweets have already been collected, append them to the new file
        if exists(self.filename):   
            wformatter = self.createFormatter(filename_new,self.columns)
            wformatter.openForAppend()
            rformatter = self.createFormatter(self.filename,self.columns)
            rformatter.openForRead()
            while True:
                obj = rformatter.read()
                if obj == None:
                    break
                wformatter.write(obj)
                count += 1
            wformatter.close()
            rformatter.close()
            os.rename(self.filename,filename_bak)

        # rename the new file to the result file
        os.rename(filename_new,self.filename)   
        logging.getLogger("twitfetch").info("written output to "+self.filename+" with total "+str(count)+" tweets")


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description="search for and download tweets", usage="python3 twitfetcher.py [options] search_query [filename]")
    parser.add_argument('query_string', metavar='query_string', type=str, help='the query string, see https://dev.twitter.com/docs/using-search')
    parser.add_argument('filename', metavar='filename', type=str, nargs='?', help='the name of the output file (if omitted, uses the query_string + .csv or .json as the file name)')
    parser.add_argument("-v","--verbose",dest="verbose",action="store_true",help="display verbose messages")
    parser.add_argument("-l","--lang",dest="lang",type=str,help="supply language filter as ISO 639-1 code",default="")
    parser.add_argument("-g","--geocode",dest="geocode",type=str,help="supply geocode filter in form lat,lon,dist (example for 10 mile radius of Moscow: -g=55.7517,37.6178,10mi)",default="")
    parser.add_argument("-f","--format",dest="format",type=str,help="supply format to save tweets as json or csv",choices=["csv","json"], default="csv")
    parser.add_argument("-m","--max",dest="maxtweets",type=int,help="limit the number of tweets retrieved to the specified number")
    
    options = parser.parse_args()
    
    if consumer_key == "" or consumer_secret == "" or access_token == "" or access_secret == "":
        print("Error - please define non-empty strings the variables consumer_key,consumer_secret,access_token and access_secret at the start of this program")
        sys.exit(-1)

    search_term = options.query_string
    if options.filename:
        filename = options.filename
    else:
        filename = search_term + "." + options.format
    if options.verbose:
        logging.getLogger("twitfetch").setLevel(level=logging.DEBUG)
    else:
        logging.getLogger("twitfetch").setLevel(level=logging.INFO)
    
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    logging.getLogger("twitfetch").addHandler(handler)

    tw = twitfetch(search_term,filename,options)
    tw.search()

 
