Crawl millions of pages every day with Python

Reacher

Posted on April 5, 2024

Crawling is familiar to most developers, and getting started with data collection is easy. But when you have to crawl millions of pages per day with limited resources, write scripts that run fast, or deal with reCAPTCHA popping up in the middle of an otherwise smooth crawl, not everyone knows how to cope. In this article I want to share some optimization techniques distilled from my experience using Python for more efficient crawling, and I hope it becomes a place for discussion where everyone can learn more.

Try to find and use APIs/HTTP requests instead of Selenium and Puppeteer

Try to find the HTTP endpoints that a website uses instead of driving a browser with Selenium to collect data. Rebuilding the requests (so that you don't get blocked) may take longer up front, but the gain in efficiency is well worth it: you can cut server/worker costs by up to 100x compared with collecting through a browser. To make building the requests easier in Python, I recommend capturing traffic with Fiddler or Charles, then copying the request as cURL into Postman to generate the Python code. The point is to capture all of the request's headers so you don't get blocked.
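For example, a rebuilt request might look like this hypothetical sketch (the URL and header values are placeholders, not from any real capture; paste in whatever Fiddler/Charles recorded and Postman generated):

import requests

# Headers copied from the captured request (placeholder values - use the
# real ones exported from Postman's code generator).
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Accept": "application/json",
    "Referer": "https://example.com/products",
    "X-Requested-With": "XMLHttpRequest",
}

resp = requests.get("https://example.com/api/product/123", headers=headers, timeout=10)
print(resp.json())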
Some of my tricks to avoid being blocked: rotate the User-Agent header, rotate proxies, set the user agent to Googlebot (some sites don't rate limit Googlebot), and capture requests from the mobile app (many sites rate limit their website but not their mobile API). In practice I've run into many cases where the rate limits for the mobile app are much higher, or there is no blocking at all, so it's worth bypassing SSL pinning to capture the mobile app's traffic.
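As a minimal sketch of the rotation idea (the proxy addresses and most user-agent strings below are made-up placeholders), each request can pick a random proxy and user agent, with Googlebot's UA included in the pool:

import random

import requests

# Placeholder pools - fill in your own proxies and user-agent strings.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15",
    # Googlebot UA - some sites skip rate limiting for it.
    "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
]
PROXIES = ["http://user:pass@proxy1:8080", "http://user:pass@proxy2:8080"]


def fetch(url):
    proxy = random.choice(PROXIES)
    return requests.get(
        url,
        headers={"User-Agent": random.choice(USER_AGENTS)},
        proxies={"http": proxy, "https": proxy},
        timeout=10,
    )


print(fetch("https://example.com/api/product/123").status_code)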

Use asynchronous libraries in Python

I find that most Python users rarely reach for asynchronous libraries, even though libraries such as aiohttp, httpx, and aiosonic are much faster and use far less CPU than the requests library.

» python tests/performance.py
doing tests...
{
 "aiosonic": "1000 requests in 182.03 ms",
 "aiosonic cyclic": "1000 requests in 370.55 ms",
 "aiohttp": "1000 requests in 367.66 ms",
 "requests": "1000 requests in 4613.77 ms",
 "httpx": "1000 requests in 812.41 ms"
}
aiosonic is 101.97% faster than aiohttp
aiosonic is 2434.55% faster than requests
aiosonic is 103.56% faster than aiosonic cyclic
aiosonic is 346.29% faster than httpx

(Performance of aiosonic and other HTTP request libraries in Python)
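If you want to reproduce a rough comparison yourself, a quick and unscientific sketch is to time the same batch of requests with aiohttp (concurrent) against requests (sequential). The URL is a placeholder; point it at something you are allowed to load-test, and expect the numbers to vary with your network and the server:

import asyncio
import time

import aiohttp
import requests

URL = "https://example.com/"  # placeholder - use a server you are allowed to hit
N = 100


async def bench_aiohttp():
    async with aiohttp.ClientSession() as session:
        async def fetch():
            async with session.get(URL) as resp:
                await resp.read()

        start = time.perf_counter()
        # Fire all N requests concurrently on one connection pool.
        await asyncio.gather(*(fetch() for _ in range(N)))
        print(f"aiohttp: {N} requests in {time.perf_counter() - start:.2f}s")


def bench_requests():
    start = time.perf_counter()
    # Same N requests, but one at a time.
    for _ in range(N):
        requests.get(URL, timeout=10)
    print(f"requests: {N} requests in {time.perf_counter() - start:.2f}s")


asyncio.run(bench_aiohttp())
bench_requests()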

Two ways I often implement crawlers:
Way 1: Apply a concurrency limit to a fixed set of tasks

import asyncio

import aiohttp


async def run_limit_worker(tasks, limit: int = 100):
    semaphore = asyncio.Semaphore(limit)

    async def sem_task(task):
        async with semaphore:
            return await task

    await asyncio.gather(*(sem_task(task) for task in tasks))


async def crawl_product(product):
    async with aiohttp.ClientSession() as client:
        async with client.get(f"https://example.com/api/product/{product['id']}") as resp:
            data = await resp.json()
            # todo: process your output here


async def run_all_workers():
    tasks = []

    products = [{"id": 1}, {"id": 2}, {"id": 3}]  # etc. - the full list of products to crawl
    for product in products:
        tasks.append(crawl_product(product))

    await run_limit_worker(tasks, limit=100)


if __name__ == '__main__':
    asyncio.run(run_all_workers())


Way 2: A fixed number of workers that pull tasks from your system.

import asyncio

import aiohttp


class CrawlWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.client = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.close()

    async def get_input(self):
        async with self.client.get(f"https://yourserver.com/get_input?worker_id={self.worker_id}") as resp:
            return await resp.json()

    async def crawl(self, product):
        async with self.client.get(f"https://example.com/api/product/{product['id']}") as resp:
            data = await resp.json()
            return data

    async def process_output(self, output):
        async with self.client.post(f"https://yourserver.com/process_output?worker_id={self.worker_id}", json={
            "product": output
        }) as resp:
            return await resp.json()

    async def run(self):
        while True:
            try:
                product = await self.get_input()
                output = await self.crawl(product)
                await self.process_output(output)
            except Exception as e:
                print(f"Worker {self.worker_id} error: {e}")
                await asyncio.sleep(1)  # brief back-off so a failing worker doesn't spin in a hot loop


async def run_worker(worker_id):
    async with CrawlWorker(worker_id) as worker:
        await worker.run()


async def run_all_workers():
    tasks = []
    max_workers = 100
    for i in range(max_workers):
        tasks.append(asyncio.create_task(run_worker(i)))
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(run_all_workers())


Automatically solve captchas such as reCAPTCHA v2, reCAPTCHA v3, FunCaptcha, ...

Some sites require you to log in before you can view content, and the login flow requires solving reCAPTCHA v2, reCAPTCHA v3, or FunCaptcha. You need to create accounts and log in regularly to refresh tokens, so you want to automate this process. Again, the best approach is to build the HTTP requests directly, but that means you also need to solve the captchas automatically to obtain the tokens to send along with those requests.

Once the captcha is solved, the solving service returns a token tied to the target website (identified by its sitekey and siteurl). When you log in, register, or perform any other protected action, you must send this token to the system along with the username/password and other fields. Each token can be used only once, so you need to generate tokens automatically to automate registration, login, and so on. Below is how to use Rockcaptcha to generate captcha tokens automatically for reCAPTCHA v2, reCAPTCHA v3, and FunCaptcha.

import asyncio
from typing import Optional

import httpx

BASE_URL = "https://api.rockcaptcha.com"


async def get_fun_token(api_key, sitekey) -> Optional[dict]:
    async with httpx.AsyncClient() as client:
        create_resp = await client.get(
            f"{BASE_URL}/FunCaptchaTokenTask", params={
                "apikey": api_key,
                "sitekey": sitekey,
                "siteurl": "https://client-api.arkoselabs.com"
            }
        )
        if create_resp.json()["Code"] == 0:
            task_id = create_resp.json()["TaskId"]
            while True:
                try:
                    solve_resp = await client.get(f"{BASE_URL}/getresult", params={
                        "apikey": api_key,
                        "taskId": task_id
                    })
                    if solve_resp.status_code == 200:
                        if solve_resp.json()['Status'] == "ERROR":
                            raise RuntimeError(solve_resp.text)
                        if solve_resp.json()['Status'] == "SUCCESS":
                            return {
                                "task_id": task_id,
                                "token": solve_resp.json()["Data"]["Token"]
                            }
                except Exception:
                    raise RuntimeError("Get captcha error")
                await asyncio.sleep(0.2)
    raise RuntimeError("Get captcha error")


async def get_recaptcha_token(api_key, sitekey, siteurl) -> Optional[str]:
    async with httpx.AsyncClient() as client:
        create_resp = await client.get(
            # NOTE: swap in the reCAPTCHA task endpoint from the Rockcaptcha docs here;
            # this snippet reuses the FunCaptcha endpoint path as a placeholder.
            f"{BASE_URL}/FunCaptchaTokenTask", params={
                "apikey": api_key,
                "sitekey": sitekey,
                "siteurl": siteurl
            }
        )
        if create_resp.json()["Code"] == 0:
            task_id = create_resp.json()["TaskId"]
            while True:
                try:
                    solve_resp = await client.get(f"{BASE_URL}/getresult", params={
                        "apikey": api_key,
                        "taskId": task_id
                    })
                    if solve_resp.status_code == 200:
                        if solve_resp.json()['Status'] == "ERROR":
                            raise RuntimeError(solve_resp.text)
                        if solve_resp.json()['Status'] == "SUCCESS":
                            return solve_resp.json()["Data"]["Token"]
                except Exception:
                    raise RuntimeError("Get captcha error")
                await asyncio.sleep(0.2)
    raise RuntimeError("Get captcha error")


async def login(username, password):
    token = await get_fun_token("<YOUR_API_KEY>", "<SITEKEY>")
    async with httpx.AsyncClient() as client:
        await client.post("https://example.com/login", json={
            "username": username,
            "password": password,
            "captcha_token": token
        })


if __name__ == '__main__':
    asyncio.run(login("<username>", "<password>"))