
An extensible Queue-based web-crawler

Web crawlers (sometimes called spiders or robots) are applications which analyze collections of web resources that are linked together from web pages. Resources may be images, stylesheets or web pages.

This snippet, minicrawler, illustrates how to create a simple but extensible web crawler in Python. The crawler is based on Python's thread-safe queue module and uses multiple threads to speed up the crawling process (if only one thread were used, it would spend much of its time blocked, waiting for web resources to be downloaded).

Web crawlers are useful for collecting and analyzing information stored in web pages, searching for missing links, or creating offline copies of web sites. However, when running web crawlers, be careful not to overload the target web site with requests.

To use minicrawler, you will need Python 2.6 or later (including Python 3.x). Invoke it as follows, passing the URL of the web page to start crawling from and the directory in which to store the downloaded files (an optional extra argument specifies the log level):

python|python3 minicrawler.py <url> <download-directory> [debug|info|warning|error] 
		

The minicrawler program will output some summary statistics on completion, for example:

python3 minicrawler.py http://www.mccarroll.net/index.html /tmp/download 
queue consumer completed (stats= {'Succeeded Jobs': 31, 'Failed Jobs': 4})
queue consumer completed (stats= {'Succeeded Jobs': 32, 'Failed Jobs': 6})
queue consumer completed (stats= {'Succeeded Jobs': 32, 'Failed Jobs': 5})
queue completed (stats= {'Maximum Queue Size': 59, '# Consumer Threads': 3, 'Elapsed Time (secs)': 9})
        

The program is based on generic Job and JobQueue classes. Job defines a basic unit of work, and should be sub-classed to implement a run method. Jobs also carry an optional unique key and a list of actions that are invoked after the main part of the job has executed successfully.

job.py
import logging

class Job(object):

    """define a generic job for use in the JobQueue.  A job is characterised by an optional key 
       which must be unique within a JobQueue, and an optional series of actions which must be 
       executed after the job has run successfully

       this class is designed to be subclassed; the subclass should override the run method"""

    def __init__(self,key,actions):
        self.key = key
        self.actions = actions
        
    def getKey(self):
        return self.key

    def execute(self):
        success = False
        try:
            success = self.run()
        except Exception:
            # treat an uncaught exception from run() as a job failure
            pass

        logger = logging.getLogger("job")
        if success:
            logger.info("OK: "+ self.getKey())
        else:
            logger.warning("FAILED: " + self.getKey())

        if success:
            for action in self.actions:
                try:
                    action.run(self)
                except Exception:
                    # one failing action should not prevent the remaining actions from running
                    pass
        return success
   
    def run(self):
        raise Exception("Error - Job.run should be overridden in a subclass and return True iff the job succeeded, False otherwise")

Job is then sub-classed by FetchJob, whose key value is the URL to be fetched, and whose run method attempts to retrieve the content of the URL.

fetchjob.py
try:
    # python 2.X
    import urllib2
    import urlparse
except ImportError:
    # python 3
    import urllib.request as urllib2
    import urllib.parse as urlparse

import sys

from job import Job

class FetchJob(Job):

    """sub-class Job to define a job which download content for a URL"""

    def __init__(self,link_url,parent_url,actions):
        self.parent_url = parent_url
        self.link_url = link_url
        url = urlparse.urljoin(self.parent_url,self.link_url)
        Job.__init__(self,url,actions)
        self.content = ""
        self.error = ""   
     
    def getUrl(self):
        return self.getKey()        

    def run(self):
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', 'Mozilla/5.0')]
            infile = opener.open(self.getUrl())
            # FIXME check about getting the encoding right here
            self.content = str(infile.read())
            return True
        except Exception:
            _, ex, _ = sys.exc_info()
            self.error = str(ex)
            return False          
   
    def getContent(self):
        return self.content

    def getError(self):
        return self.error
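
FetchJob can also be exercised on its own, outside of any queue (a quick illustrative sketch, using the same starting URL as the example run above):

from fetchjob import FetchJob

# fetch a single page with no follow-up actions (illustration only)
job = FetchJob("http://www.mccarroll.net/index.html","",[])
if job.execute():
    print("downloaded "+str(len(job.getContent()))+" characters")
else:
    print("download failed: "+job.getError())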

The generic JobQueue class is based on Python's Queue module (renamed queue in Python 3), which implements a thread-safe queue with support for multiple producer and consumer threads. JobQueue adds some extra functionality: it ensures that a job with a given key can only be submitted once, and it implements the consumer threads which execute the jobs.
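
To illustrate the Queue protocol that JobQueue relies on, the following standalone sketch (not part of minicrawler) shows the put/get/task_done/join handshake and the use of a None marker to ask a consumer thread to stop:

try:
    from Queue import Queue
except ImportError:
    from queue import Queue
import threading

q = Queue()

def consume():
    while True:
        item = q.get()
        if item is None:
            # end-of-queue marker - acknowledge it and stop
            q.task_done()
            break
        print("processing "+str(item))
        q.task_done()   # tell the queue this item has been handled

t = threading.Thread(target=consume)
t.daemon = True
t.start()

for i in range(5):
    q.put(i)
q.put(None)   # ask the consumer to stop
q.join()      # blocks until task_done has been called for every item
t.join()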

jobqueue.py
try:
    from Queue import Queue
except ImportError:
    from queue import Queue
import threading
import time

class JobQueue(object):

    """implement a generic job queue with multiple consumer threads"""

    def __init__(self,num_consumer_threads):
        self.queue = Queue()
        self.keys = set()
        self.lock = threading.Lock()
        self.num_consumer_threads = num_consumer_threads
        self.max_size = 0 # track the maximum number of jobs in the queue
        self.timer = 0

    def add(self,job):
        jobKey = job.getKey()
        if jobKey:        
            with self.lock:
                # first check that a job with the same key has not already been added
                if jobKey not in self.keys:
                    self.keys.add(jobKey)
                    # track the maximum queue size
                    qs = self.queue.qsize()+1
                    if qs > self.max_size:
                        self.max_size = qs
                else:
                    return
        self.queue.put(job)

    def run(self):        
     
        self.timer = time.time()

        class Consumer(threading.Thread):

            """ define a consumer that reads jobs from the queue and executes them"""

            def __init__(self,queue):
                threading.Thread.__init__(self)
                self.queue = queue
                self.daemon = True
                self.successcount = 0
                self.failcount = 0
                
            def run(self):
                running = True
                while running:
                    job = self.queue.get()
                    if job:
                        if job.execute():
                            self.successcount += 1
                        else:
                            self.failcount += 1
                    else:
                        running = False
                    self.queue.task_done()
                
            def getStats(self):
                return { "Succeeded Jobs":self.successcount, "Failed Jobs":self.failcount }
        
        # create and start consumer threads

        consumers = []
        for i in range(self.num_consumer_threads):
            c = Consumer(self.queue)
            c.start()
            consumers.append(c)

        # wait for all jobs to complete

        self.queue.join()

        # to trigger the consumers to quit send an end of queue marker (None) for each to pick up
        # based on example at http://effbot.org/librarybook/queue.htm        

        for _ in consumers:
            self.queue.put(None)

        # wait for the end-of-queue markers to be consumed

        self.queue.join()

        # print stats from the run
        
        for c in consumers:
            c.join()
            print("queue consumer completed (stats= "+str(c.getStats())+")")

        self.timer = time.time() - self.timer   
        
        print("queue completed (stats= "+str(self.getStats())+")")

    def getStats(self):
        return { 
            "Maximum Queue Size":self.max_size, 
            "# Consumer Threads":self.num_consumer_threads, 
            "Elapsed Time (secs)":int(self.timer) }

The MiniCrawler class sub-classes JobQueue to add a schedule method which constructs FetchJob instances given a link URL and (optionally) the URL of the resource which contained the link. The minicrawler module also contains the code which drives the crawling process: it reads the command line arguments, configures logging, and starts a MiniCrawler instance crawling by scheduling the first URL.

minicrawler.py
import logging
from jobqueue import JobQueue
from fetchjob import FetchJob
from htmlparse import HtmlParseAction
from save2disk import Save2DiskAction

class MiniCrawler(JobQueue):

    def __init__(self,download_dir,num_consumer_threads=3):
        JobQueue.__init__(self,num_consumer_threads)
        self.download_dir = download_dir

    def schedule(self,link_url,parent_url=""):
        actions = [Save2DiskAction(self.download_dir)]
        if link_url.endswith(".html"):
            actions.append(HtmlParseAction(self))
        self.add(FetchJob(link_url,parent_url,actions))

if __name__ == '__main__':
    import sys
    download_url = sys.argv[1]
    download_dir = sys.argv[2]
    loglevel = ''
    if len(sys.argv)>3:
        loglevel = sys.argv[3]
    logger = logging.getLogger("job")
    handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(handler)
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    if loglevel == 'debug':
        logger.setLevel(logging.DEBUG)
    elif loglevel == 'info':
        logger.setLevel(logging.INFO)
    elif loglevel == 'warning':
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.ERROR)
    q = MiniCrawler(download_dir)
    q.schedule(download_url)
    q.run()

MiniCrawler uses two different actions when scheduling each new FetchJob. The first, defined by the Save2DiskAction class, is attached to every job and is invoked after a successful download to save the content to the local filesystem.

save2disk.py
try:
    # python 2.X
    import urlparse
except ImportError:
    # python 3
    import urllib.parse as urlparse

import os
import os.path

class Save2DiskAction(object):

    """action to run on a FetchJob after it has successfully 
       downloaded content - save the content to disk"""

    def __init__(self,download_dir):
        self.download_dir = download_dir

    def run(self,job):
        content = job.getContent()
        url = job.getUrl()
        (_,_,fpath,_,_,_) = urlparse.urlparse(url)
        if fpath == "":
            return
        if fpath.startswith("/"):
            fpath = fpath[1:]
        fpath = os.path.join(self.download_dir,fpath)
        # treat a URL path with no file extension as a directory and save to index.html inside it
        if os.path.splitext(fpath)[1] == "":
            fpath = os.path.join(fpath,"index.html")
        # make sure the directory the file will be written into exists
        fdirname = os.path.split(fpath)[0]
        if fdirname:
            try:
                os.makedirs(fdirname)
            except OSError:
                # the directory may already exist
                pass
        f = open(fpath,"w")
        f.write(content)
        f.close()
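
For example, assuming a download directory of /tmp/download, the action maps URL paths onto local files as sketched below (FakeJob is a made-up stub and the URLs are purely illustrative):

from save2disk import Save2DiskAction

class FakeJob(object):

    """made-up stub providing just enough of the FetchJob interface to drive the action"""

    def __init__(self,url,content):
        self.url = url
        self.content = content
    def getUrl(self):
        return self.url
    def getContent(self):
        return self.content

action = Save2DiskAction("/tmp/download")
# writes /tmp/download/index.html
action.run(FakeJob("http://www.mccarroll.net/index.html","<html>...</html>"))
# a URL path with no file extension is saved as <path>/index.html, here /tmp/download/blog/index.html
action.run(FakeJob("http://www.mccarroll.net/blog/","<html>...</html>"))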

The second action is defined by the HtmlParseAction class. It is attached to jobs which fetch HTML pages; when such a page is successfully retrieved, it extracts links from the page and schedules new FetchJob instances.

htmlparse.py
try:
    # python 2.X
    import urlparse
except ImportError:
    # python 3
    import urllib.parse as urlparse

import re
from os.path import splitext

href_pattern = re.compile(r'(?:src|href)="([^"]*)"', re.I | re.S)

class HtmlParseAction(object):

    """define an action to run on a successful job - parse the downloaded content, extract any links and queue
       further downloads"""

    def __init__(self,fetchqueue):
        self.fetchqueue = fetchqueue

    def run(self,job):
        content = job.getContent()
        url = job.getUrl()
        (_,snetloc,_,_,_,_) = urlparse.urlparse(url)
        for link in self.getLinks(content):  
            (uscheme,unetloc,upath,_,_,_) = urlparse.urlparse(link)
            if unetloc == '' or unetloc == snetloc:
                linkurl = urlparse.urlunparse((uscheme,unetloc,upath,"","",""))
                self.fetchqueue.schedule(linkurl,job.getUrl())
        
    def getLinks(self,content):
        # return the URL from each src="..." or href="..." attribute in the page
        return [matched.group(1) for matched in href_pattern.finditer(content)]
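
The href_pattern regular expression picks up both href and src attributes, so stylesheets and images are queued as well as pages. A quick illustration:

from htmlparse import href_pattern

sample = '<a href="/about.html">about</a> <img src="logo.png"/>'
print(href_pattern.findall(sample))   # prints ['/about.html', 'logo.png']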
            

minicrawler is a basic program which is likely to need to be adapted and extended for real-world tasks. Notable current limitations include:

- links are extracted with a simple regular expression rather than a real HTML parser, so links generated by javascript or spread across unusual markup will be missed
- only pages whose URLs end in .html are parsed for further links, and only links on the same host as the page containing them are followed
- character encodings are not handled properly when content is downloaded and saved (see the FIXME in FetchJob.run)
- there is no support for robots.txt, request throttling or retrying failed downloads
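
As one example of such an extension, a crude politeness delay could be added by overriding FetchJob.run (a sketch only; the one second delay is an arbitrary choice, and MiniCrawler.schedule would need to construct PoliteFetchJob instead of FetchJob to use it):

import time
from fetchjob import FetchJob

class PoliteFetchJob(FetchJob):

    """hypothetical FetchJob variant which pauses before each request to
       reduce the load placed on the target web server"""

    def run(self):
        time.sleep(1.0)   # crude fixed delay per request, per consumer thread
        return FetchJob.run(self)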