Authentication
Many websites require authentication to access content. qCrawl supports various authentication methods, including session-based logins (cookies), API tokens, HTTP Basic auth, OAuth 2.0, and custom multi-step flows.
Session-based authentication
Use cookies to maintain login sessions:
from qcrawl.core.spider import Spider
from qcrawl.core.request import Request

class AuthSpider(Spider):
    name = "auth_spider"
    start_urls = ["https://example.com/login"]
    custom_settings = {
        "COOKIES_ENABLED": True,  # Enable cookie middleware
    }

    async def parse(self, response):
        rv = self.response_view(response)
        # Extract the CSRF token from the login form
        csrf_token = rv.doc.cssselect("input[name=csrf_token]")[0].get("value")
        # Submit the login form and route the response to parse_logged_in
        yield Request(
            url="https://example.com/login",
            method="POST",
            body={
                "username": "user@example.com",
                "password": "password123",
                "csrf_token": csrf_token
            },
            callback=self.parse_logged_in,
            meta={"next_action": "start_crawl"}
        )

    async def parse_logged_in(self, response):
        if response.request.meta.get("next_action") == "start_crawl":
            # Start crawling protected pages
            yield Request(url="https://example.com/dashboard")
Verify login success:
async def parse(self, response):
    rv = self.response_view(response)
    # Check if the login form is still present
    login_form = rv.doc.cssselect("form#login")
    if login_form:
        self.logger.error("Login failed - still seeing login form")
        return
    # Check for a logged-in indicator
    user_menu = rv.doc.cssselect(".user-menu")
    if not user_menu:
        self.logger.error("Login may have failed - no user menu found")
        return
    # Login succeeded, proceed with crawling
    for link in rv.doc.cssselect("a.protected-content"):
        yield rv.follow(link.get("href"))
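The same checks can be factored into a small helper so every callback can validate the session before parsing. A sketch built from the selectors above; is_logged_in is a hypothetical method, not a qCrawl API:

def is_logged_in(self, rv) -> bool:
    """Return True when the page shows a logged-in indicator."""
    # Still seeing the login form means authentication failed
    if rv.doc.cssselect("form#login"):
        return False
    # The user menu only renders for authenticated sessions
    return bool(rv.doc.cssselect(".user-menu"))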
API token authentication
Use bearer tokens or API keys in headers:
class ApiSpider(Spider):
    name = "api_spider"
    start_urls = []
    custom_settings = {
        "DEFAULT_REQUEST_HEADERS": {
            "Authorization": "Bearer YOUR_API_TOKEN",
            "Accept": "application/json"
        }
    }

    async def start_requests(self):
        yield Request(url="https://api.example.com/data")

    async def parse(self, response):
        data = response.json()
        for item in data.get("items", []):
            yield {
                "id": item["id"],
                "name": item["name"]
            }
        # Follow API pagination
        next_page = data.get("next_page_url")
        if next_page:
            yield Request(url=next_page)
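DEFAULT_REQUEST_HEADERS attaches the token to every request. If only some endpoints need authentication, setting the header per request keeps the token off public URLs:

async def start_requests(self):
    # Authenticated API endpoint
    yield Request(
        url="https://api.example.com/data",
        headers={"Authorization": "Bearer YOUR_API_TOKEN"}
    )
    # Public endpoint: no Authorization header
    yield Request(url="https://example.com/public")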
Token from environment variables:
import os

from qcrawl.core.request import Request

class ApiSpider(Spider):
    name = "api_spider"
    start_urls = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Load the token from the environment and fail fast if it is missing
        api_token = os.getenv("API_TOKEN")
        if not api_token:
            raise ValueError("API_TOKEN environment variable not set")
        self.auth_headers = {
            "Authorization": f"Bearer {api_token}",
            "Accept": "application/json"
        }

    async def start_requests(self):
        # Attach the headers per request: assigning custom_settings in
        # __init__ is typically too late for the settings to take effect
        yield Request(url="https://api.example.com/data", headers=self.auth_headers)
Basic HTTP authentication
Send the username and password in the URL or in an Authorization header:
import base64

from qcrawl.core.request import Request

class BasicAuthSpider(Spider):
    name = "basic_auth"
    start_urls = []

    async def start_requests(self):
        # Method 1: include credentials in the URL
        # (avoid where possible: URLs tend to end up in logs and caches)
        yield Request(url="https://username:password@example.com/protected")
        # Method 2: send an Authorization header
        credentials = base64.b64encode(b"username:password").decode("utf-8")
        yield Request(
            url="https://example.com/protected",
            headers={"Authorization": f"Basic {credentials}"}
        )
OAuth 2.0 authentication
Handle OAuth token refresh:
import time

from qcrawl.core.request import Request

class OAuthSpider(Spider):
    name = "oauth_spider"
    start_urls = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.access_token = None
        self.token_expires_at = 0

    async def get_access_token(self):
        """Fetch or refresh the OAuth access token."""
        # make_token_request is a helper you must supply
        # (a sketch follows this example)
        token_url = "https://api.example.com/oauth/token"
        response = await self.make_token_request(token_url)
        data = response.json()
        self.access_token = data["access_token"]
        # Refresh slightly early rather than right at the expiry boundary
        self.token_expires_at = time.time() + data["expires_in"] - 30
        return self.access_token

    async def start_requests(self):
        # Get the initial token
        token = await self.get_access_token()
        yield Request(
            url="https://api.example.com/data",
            headers={"Authorization": f"Bearer {token}"}
        )

    async def parse(self, response):
        # If the token expired mid-crawl, refresh it and retry this URL
        if time.time() >= self.token_expires_at:
            token = await self.get_access_token()
            # Note: if qCrawl deduplicates requests, the retry may need
            # to bypass the duplicate filter
            yield Request(
                url=response.url,
                headers={"Authorization": f"Bearer {token}"}
            )
            return
        # Process the response
        data = response.json()
        for item in data.get("items", []):
            yield item
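The example above leaves make_token_request unimplemented. A minimal sketch of that helper method for OAuthSpider, using the OAuth 2.0 client_credentials grant; it assumes httpx is installed and that the client ID and secret come from the environment (the endpoint and variable names are assumptions, not qCrawl APIs):

import os

import httpx

async def make_token_request(self, token_url: str) -> httpx.Response:
    """Fetch a token via the OAuth 2.0 client_credentials grant.

    Hypothetical helper; the field names follow the standard grant,
    not any qCrawl convention.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": os.environ["OAUTH_CLIENT_ID"],
                "client_secret": os.environ["OAUTH_CLIENT_SECRET"],
            },
        )
        # Surface HTTP errors instead of parsing an error body as a token
        response.raise_for_status()
        return response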
Custom authentication flows
Handle multi-step authentication:
class CustomAuthSpider(Spider):
    name = "custom_auth"
    start_urls = ["https://example.com/step1"]
    custom_settings = {
        "COOKIES_ENABLED": True,
    }

    async def parse(self, response):
        """Step 1: get the initial token."""
        rv = self.response_view(response)
        initial_token = rv.doc.cssselect("input[name=token]")[0].get("value")
        # Step 2: submit the token, continuing the flow in parse_step2
        yield Request(
            url="https://example.com/step2",
            method="POST",
            body={"token": initial_token},
            callback=self.parse_step2,
            meta={"step": 2}
        )

    async def parse_step2(self, response):
        """Steps 2 and 3: submit credentials, then start crawling."""
        step = response.request.meta.get("step")
        if step == 2:
            rv = self.response_view(response)
            csrf = rv.doc.cssselect("input[name=csrf]")[0].get("value")
            yield Request(
                url="https://example.com/login",
                method="POST",
                body={
                    "username": "user",
                    "password": "pass",
                    "csrf": csrf
                },
                callback=self.parse_step2,
                meta={"step": 3}
            )
        elif step == 3:
            # Authentication complete, start crawling
            yield Request(url="https://example.com/protected")
Handling authentication errors
Detect and handle auth failures:
async def parse(self, response):
    # Check for authentication errors
    if response.status_code == 401:
        self.logger.error("Unauthorized - authentication failed")
        return
    if response.status_code == 403:
        self.logger.error("Forbidden - insufficient permissions")
        return
    # Check for a redirect to the login page
    if "login" in response.url.lower():
        self.logger.warning("Redirected to login - session may have expired")
        return
    # Process the authenticated response
    rv = self.response_view(response)
    for item in rv.doc.cssselect(".item"):
        yield self.extract_item(item)
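Rather than just returning, a spider can re-authenticate and retry the page it was denied. A minimal sketch reusing the login flow from the session-based example above; the form fields and the parse_logged_in callback are assumptions carried over from that example:

async def parse(self, response):
    if "login" in response.url.lower():
        self.logger.warning("Session expired - re-authenticating")
        yield Request(
            url="https://example.com/login",
            method="POST",
            body={
                "username": "user@example.com",
                "password": "password123"
            },
            callback=self.parse_logged_in,
            # Remember which page to retry once logged in again
            meta={"retry_url": response.url}
        )
        return
    # Otherwise process the page normally
    rv = self.response_view(response)
    for item in rv.doc.cssselect(".item"):
        yield self.extract_item(item)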
Best practices
- Handle authentication properly: Use the cookie middleware for sessions and headers for API tokens
- Verify auth success: Check for logged-in indicators (a user menu, a dashboard link), not just HTTP 200, before crawling protected pages
- Clean up sensitive data: Never log passwords or tokens (see the masking sketch below)
- Use environment variables: Store credentials outside the code
- Handle token expiration: Implement refresh logic for OAuth/JWT tokens
- Respect auth rate limits: API tokens often come with usage quotas
- Test the authentication flow: Confirm that login works before running a full crawl
- Handle session expiration: Detect logout (for example, a redirect to the login page) and re-authenticate
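A small helper for redacting credentials in log output; mask_secret is a hypothetical utility, not part of qCrawl:

def mask_secret(value: str, visible: int = 4) -> str:
    """Redact a credential for logging, keeping only a short suffix."""
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * (len(value) - visible) + value[-visible:]

# Usage: self.logger.info("Using token %s", mask_secret(api_token))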
See also: Error Recovery, Rate Limiting