crawler-cluster

Distributed, Fault-Tolerant Web Crawling.

Multi-process, multiple workers

  1. Client process queues tasks in Redis.
  2. Worker nodes pull tasks from Redis, execute them, and store the results in Redis.
  3. Client process pulls results from Redis.

Pros:

  • Worker nodes can run on any machine.
  • Add or remove worker nodes at runtime without disrupting the system.
  • Achieves fault-tolerance through process isolation and monitoring.
    • Workers are run as systemd services, where each service is the smallest possible processing unit (either a single browser with a single page, or a single vanilla HTTP client).
      Browsers with only a single page (a single open tab) are less prone to crashes, and there is no resource-usage penalty: running n single-page browsers simultaneously uses almost the same system resources as a single browser with n pages open.
    • Each service is monitored for errors and performance degradation and is automatically restarted by systemd when necessary.

Cons:

  • Requires Redis to be installed and accessible.
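
To make the data flow concrete, here is a minimal sketch of the general task/result queue pattern described above, written with redis-py. The queue key names ('tasks', 'results') and the JSON payload shape are illustrative assumptions only; crawler-cluster manages its own queues internally, and you would normally interact with them through QueueClient and the worker services.

# Conceptual sketch of the Redis list-based task/result flow (not the library's
# internal implementation; key names and payload format are assumptions).
import json
import redis

r = redis.Redis.from_url('redis://localhost/0')

# Client side: queue a task.
r.lpush('tasks', json.dumps({'module': 'myscript', 'function': 'fetch', 'url': 'https://www.sec.gov'}))

# Worker side: block until a task arrives, run it, and push the result.
_, raw_task = r.brpop('tasks')
task = json.loads(raw_task)
result = {'url': task['url'], 'status': 200, 'title': None}  # placeholder for the real result
r.lpush('results', json.dumps(result))

# Client side: pull the result back off the results queue.
_, raw_result = r.brpop('results')
print(json.loads(raw_result))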

Single process, multiple workers

  1. Client process launches multiple browsers.
  2. Client creates tasks, which are executed by the next available browser.

Pros:

  • Easier setup and configuration.

Cons:

  • All browsers must run on the same machine.
  • Single point of failure. All browsers are launched and managed by a single Python process. Browsers are monitored for errors and crashes and are replaced when needed, but there can still be unhandled exceptions in dependent code (e.g. Pyppeteer, WebSockets, Chrome DevTools) that cause the Python interpreter to crash or hang.

Note: The single process, multiple workers model is only useful when scraping with web browsers. A single process running an asynchronous vanilla HTTP client will likely be enough to saturate your network's bandwidth, so there's no point in having multiple processes on the same machine/local network.
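
For illustration, the single-process, multiple-browser pattern described above can be sketched with pyppeteer and an asyncio queue. This is not crawler-cluster's API or implementation, just a minimal example of one Python process driving several single-page browsers.

# Sketch of the single-process, multiple-browser pattern using pyppeteer directly
# (illustrative only; the library's own pool management may differ).
import asyncio
from pyppeteer import launch

async def worker(browser_pool, url_queue, results):
    while True:
        try:
            url = url_queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        browser = await browser_pool.get()    # wait for the next available browser
        page = (await browser.pages())[0]     # each browser drives a single page
        await page.goto(url)
        results.append({'url': url, 'title': await page.title()})
        await browser_pool.put(browser)       # return the browser to the pool

async def main(urls, n_browsers=2):
    browser_pool, url_queue, results = asyncio.Queue(), asyncio.Queue(), []
    browsers = [await launch(headless=True) for _ in range(n_browsers)]
    for b in browsers:
        await browser_pool.put(b)
    for url in urls:
        url_queue.put_nowait(url)
    await asyncio.gather(*(worker(browser_pool, url_queue, results) for _ in range(n_browsers)))
    for b in browsers:
        await b.close()
    return results

print(asyncio.run(main(['https://www.sec.gov', 'https://www.google.com'])))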

Install

pip install pyppeteer_cluster (This must be installed on all machines in a distributed cluster)

Quick Start

Multi-process distributed workers with Redis

On all machines, set the environment variable REDIS_ADDR (the address of your Redis server) and, optionally, REDIS_PASS (the password for your Redis server, if applicable): export REDIS_ADDR="redis://123.45.67/0".
The address format is redis://{server IP}/{database number}. A default address of redis://localhost/0 will be used if REDIS_ADDR is not set.
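
As a quick sanity check (illustrative only, not part of the library), you can verify that the configured address is reachable with redis-py before starting any workers:

# Verify Redis connectivity using the documented REDIS_ADDR default (illustrative).
import os
import redis

addr = os.environ.get('REDIS_ADDR', 'redis://localhost/0')
redis.Redis.from_url(addr, password=os.environ.get('REDIS_PASS')).ping()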

Distributed processing is initiated through the use of systemd service templates. Start an instance of the service: systemctl --user start queue_worker_browser@1

Two services are available: queue_worker_browser@.service -- starts a browser worker process,
and queue_worker_http@.service -- starts a vanilla HTTP client process.
Both services pull tasks from a queue (Redis List) and append task results to a queue (Redis List).

When starting a service, you need to pass an integer argument which will be used to identify the service instance.
Example: systemctl --user start queue_worker_browser@1 starts an instance with ID 1. We can use this ID to later reference the instance in other commands, such as systemctl --user stop queue_worker_browser@1 (stop the instance) and systemctl --user status queue_worker_browser@1 (check the status of the instance).
All shell brace expansions are valid, so it may be convenient to run commands such as systemctl --user start queue_worker_browser@{1..8} (start workers 1-8), or systemctl --user stop queue_worker_browser@{5,6,8} (stop workers 5, 6, and 8).

Worker processes can also be started via the start_queue_worker executable, but it is recommended to let systemd manage the process so it can automatically be restarted in case of failure.

Write a function that will be used by your worker process
For this example we'll create a simple script myscript.py which has one function: fetch.
Functions that have a worker parameter will be passed a reference to a worker instance when the function is executed.

# myscript.py
async def fetch(url, worker):
    """Navigate to the page at 'url' and return its metadata."""
    resp = await worker.try_get(url)
    html = await worker.page.content()  # full page HTML, available if needed
    title = await worker.page.xpath('//title')
    if title:
        title = await worker.page.evaluate('(ele) => ele.innerText', title[0])
    data = {
        'url': worker.page.url,
        'status': resp.status,
        'title': title or None,
    }
    return data

Execute the function
Use an instance of QueueClient to dispatch tasks to your worker node.

import asyncio

from pyppeteer_cluster import QueueClient

async def main():
    client = QueueClient()
    for url in ('https://www.sec.gov', 'https://www.google.com', 'https://www.yahoo.com'):
        # Both module_path and function_name are always required.
        # module_path must be the path to the module on the machine your worker node is running on.
        # If Python can discover your module's location, module_path can be just the module name, e.g. 'myscript'.
        data = await client.execute_function(module_path='/home/user/myscript.py', function_name='fetch', url=url)
        # data will be a dict with keys 'url', 'status', 'title'.

asyncio.run(main())

We can also queue a batch of tasks to be executed asynchronously by worker nodes.

async def main():
    client = QueueClient()
    # Construct a list of task requests.
    module_path = '/home/user/myscript.py'
    function_name = 'fetch'
    tasks = [{'module': module_path, 'function': function_name, 'url': url} for url in ('https://www.sec.gov', 'https://www.google.com', 'https://www.yahoo.com')]
    # Dispatch all requests at once.
    data = await client.execute_functions(tasks)
    # data will be a list of dicts with keys 'url', 'status', 'title'.
    # Order is preserved, so an index in the 'tasks' list has the same corresponding index in the 'data' list.

Worker creation functions

It may be desirable to pass non-default arguments to a Worker's constructor or call certain member functions before any queued functions are executed. When you start the queue worker service (or use the start_queue_worker executable, which the service does internally), it will check a Redis Hash for a module and function name of a worker creation function. Worker creation functions should take no arguments and return a Worker instance (i.e. a BrowserWorker or HTTPWorker).
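
For example, a worker creation function might look like the sketch below. The import path and the constructor argument shown are assumptions made for illustration; check the BrowserWorker/HTTPWorker definitions in the package for the options they actually accept.

# worker_init.py -- example worker creation function (the import path and the
# keyword argument are hypothetical; consult the package for real options).
from pyppeteer_cluster import BrowserWorker

def create_worker():
    """Take no arguments and return a configured Worker instance."""
    return BrowserWorker(headless=False)  # hypothetical non-default argument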

Use the set_worker_init_func command to set the module and function name of a worker creation function.

usage: set_worker_init_func [-h] --module MODULE --function FUNCTION --worker_type {browser,http}

optional arguments:
  -h, --help            show this help message and exit
  --module MODULE, -m MODULE
                        name or path to module
  --function FUNCTION, -f FUNCTION
                        name of the function
  --worker_type {browser,http}, -t {browser,http}
                        Type of worker that this initialization function is for. 'browser': BrowserWorker, 'http': HTTPWorker
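
Example: set_worker_init_func -m /home/user/worker_init.py -f create_worker -t browser registers the create_worker function sketched above for browser workers (the module path and function name are placeholders for your own code).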

Detection Prevention

crawler_cluster uses pyppeteer_stealth to help prevent sites from detecting browser automation.
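
For reference, the sketch below shows how pyppeteer_stealth is typically applied to a pyppeteer page when used on its own; crawler_cluster applies these evasions for you, so this is only to illustrate what the dependency does.

# Applying pyppeteer_stealth to a page directly (illustrative; not required when
# using crawler_cluster).
import asyncio
from pyppeteer import launch
from pyppeteer_stealth import stealth

async def main():
    browser = await launch()
    page = await browser.newPage()
    await stealth(page)  # patch common headless-detection signals
    await page.goto('https://example.com')
    await browser.close()

asyncio.run(main())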

GitHub

https://github.com/djkelleher/crawler-cluster