Item Pipeline
Once an item is scraped by a spider, it is passed to the Item Pipeline, where it moves through a series of components executed in sequence.
Each pipeline component is a Python class that defines a method to process incoming items. The component can modify the item, validate it, store it, or drop it entirely, which determines whether the item continues through the rest of the pipeline.
Common uses of item pipelines include:
- Validating items to ensure required fields are present
- Detecting and removing duplicate items
- Cleaning or normalizing HTML data
- Routing items to external sinks (e.g., a database or message queue) independently of file exporters (a sketch follows this list)
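As a sketch of the last point, the following hypothetical JsonLinesSinkPipeline appends each item's data to a JSON Lines file using the same ItemPipeline API described in the next section; the class name, constructor argument, and output path are illustrative, not part of qcrawl.

import json

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class JsonLinesSinkPipeline(ItemPipeline):
    """Append each item's data to a JSON Lines file, separate from any configured exporter."""

    def __init__(self, path="items.jsonl"):
        super().__init__()
        self.path = path  # hypothetical output path, adjust as needed

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        # Plain blocking file I/O keeps the sketch short; a real sink would
        # batch writes or hand off to an async database/queue client.
        with open(self.path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(data, ensure_ascii=False) + "\n")
        return item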
Pipeline API
Item pipelines are async handlers that run when the engine emits the item_scraped event, after the spider has parsed the response and produced an item.
To create a pipeline component, define a class with an async def process_item(self, item: Item, spider) -> Item: method.
The method receives the scraped item and the spider that produced it.
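A minimal pass-through pipeline, as a sketch, looks like this (NoOpPipeline is an illustrative name; ItemPipeline and Item are the base classes used throughout the examples below):

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class NoOpPipeline(ItemPipeline):
    """Smallest possible pipeline: return the item unchanged."""

    async def process_item(self, item: Item, spider) -> Item:
        # Inspect or modify the item here, then return it to pass it on.
        return item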
Example pipeline that validates required fields and drops invalid items by raising DropItem:
import logging

from qcrawl.pipelines.base import ItemPipeline, DropItem
from qcrawl.core.item import Item

logger = logging.getLogger(__name__)


class ValidationPipeline(ItemPipeline):
    """Validate required fields and raise DropItem for invalid items."""

    def __init__(self, required=None):
        super().__init__()
        self.required = set(required or ["title"])

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", None)
        if not isinstance(data, dict):
            logger.error("ValidationPipeline: item.data is not a dict; dropping item")
            raise DropItem("invalid item.data type")

        missing = [k for k in self.required if not data.get(k)]
        if missing:
            logger.debug("ValidationPipeline: missing keys %s; dropping item", missing)
            raise DropItem(f"Missing required fields: {missing}")

        # normalization example
        if "price" in data:
            try:
                data["price"] = float(data["price"])
            except Exception:
                data.pop("price", None)

        return item
Data cleaning pipelines
Pipelines are ideal for cleaning and normalizing extracted data:
Clean text data
import re

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class TextCleaningPipeline(ItemPipeline):
    """Clean and normalize text fields."""

    def clean_text(self, text):
        """Remove extra whitespace and normalize text."""
        if not text:
            return None
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)
        # Strip leading/trailing whitespace
        text = text.strip()
        return text if text else None

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        # Clean text fields
        for field in ["title", "description", "author"]:
            if field in data and isinstance(data[field], str):
                data[field] = self.clean_text(data[field])
        return item
Parse and normalize prices
import re

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class PriceNormalizationPipeline(ItemPipeline):
    """Parse and normalize price fields."""

    def parse_price(self, price_text):
        """Extract numeric price from text."""
        if not price_text:
            return None
        # Remove currency symbols and commas
        cleaned = re.sub(r'[$€£¥,]', '', str(price_text))
        # Extract first number
        match = re.search(r'\d+\.?\d*', cleaned)
        if match:
            try:
                return float(match.group())
            except ValueError:
                return None
        return None

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        # Parse price field
        if "price" in data:
            data["price"] = self.parse_price(data["price"])
        return item
Parse dates
from datetime import datetime

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class DateParsingPipeline(ItemPipeline):
    """Parse various date formats to ISO format."""

    def __init__(self):
        super().__init__()
        self.date_formats = [
            "%Y-%m-%d",
            "%d/%m/%Y",
            "%B %d, %Y",
            "%b %d, %Y",
            "%Y-%m-%dT%H:%M:%S",
        ]

    def parse_date(self, date_text):
        """Parse various date formats."""
        if not date_text:
            return None
        # Try each format
        for fmt in self.date_formats:
            try:
                return datetime.strptime(str(date_text).strip(), fmt).isoformat()
            except ValueError:
                continue
        return None

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        # Parse date fields
        for field in ["published", "updated", "created_at"]:
            if field in data:
                data[field] = self.parse_date(data[field])
        return item
Data transformation pipelines
Transform and enrich item data:
Type conversion
from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class TypeConversionPipeline(ItemPipeline):
    """Convert field types."""

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})

        # Convert numeric fields
        for field in ["quantity", "stock", "views"]:
            if field in data:
                try:
                    data[field] = int(data[field])
                except (ValueError, TypeError):
                    data[field] = None

        # Convert boolean fields
        for field in ["in_stock", "featured"]:
            if field in data:
                data[field] = str(data[field]).lower() in {"true", "1", "yes"}

        return item
Data enrichment
from datetime import datetime, timezone

from qcrawl.pipelines.base import ItemPipeline
from qcrawl.core.item import Item


class EnrichmentPipeline(ItemPipeline):
    """Add computed or metadata fields."""

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        metadata = getattr(item, "metadata", {})

        # Add scrape timestamp
        metadata["scraped_at"] = datetime.now(timezone.utc).isoformat()

        # Add spider name
        metadata["spider_name"] = spider.name

        # Compute derived fields; guard against None values left by earlier pipelines
        if data.get("price") is not None and data.get("quantity") is not None:
            data["total_value"] = data["price"] * data["quantity"]

        return item
Advanced validation pipelines
Complex validation beyond simple field checks:
Schema validation
from qcrawl.pipelines.base import ItemPipeline, DropItem
from qcrawl.core.item import Item


class SchemaValidationPipeline(ItemPipeline):
    """Validate item structure and types."""

    def __init__(self):
        super().__init__()
        self.schema = {
            "title": str,
            "price": (int, float),
            "url": str,
        }

    def validate_item(self, data):
        """Validate data against schema."""
        for field, expected_type in self.schema.items():
            if field not in data:
                return False, f"Missing required field: {field}"
            if data[field] is not None and not isinstance(data[field], expected_type):
                return False, f"Invalid type for {field}: expected {expected_type}"
        return True, None

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        valid, error = self.validate_item(data)
        if not valid:
            raise DropItem(error)
        return item
Business logic validation
from qcrawl.pipelines.base import ItemPipeline, DropItem
from qcrawl.core.item import Item


class BusinessValidationPipeline(ItemPipeline):
    """Validate business rules."""

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})

        # Price must be positive
        if "price" in data and data["price"] is not None:
            if data["price"] <= 0:
                raise DropItem("Invalid price: must be positive")

        # Quantity must be non-negative
        if "quantity" in data and data["quantity"] is not None:
            if data["quantity"] < 0:
                raise DropItem("Invalid quantity: cannot be negative")

        # URL must be a valid http(s) URL (guard against missing/None values)
        if "url" in data:
            url = data["url"]
            if not isinstance(url, str) or not url.startswith(("http://", "https://")):
                raise DropItem("Invalid URL format")

        return item
Deduplication pipeline
Remove duplicate items:
from qcrawl.pipelines.base import ItemPipeline, DropItem
from qcrawl.core.item import Item


class DeduplicationPipeline(ItemPipeline):
    """Drop duplicate items based on unique key."""

    def __init__(self):
        super().__init__()
        self.seen_ids = set()

    async def process_item(self, item: Item, spider) -> Item:
        data = getattr(item, "data", {})
        # Use URL as unique identifier
        item_id = data.get("url")
        if not item_id:
            return item
        if item_id in self.seen_ids:
            raise DropItem(f"Duplicate item: {item_id}")
        self.seen_ids.add(item_id)
        return item
Registering pipelines
Configure pipelines in spider settings:
class MySpider(Spider):
    name = "my_spider"
    start_urls = ["https://example.com"]

    custom_settings = {
        "ITEM_PIPELINES": {
            "myproject.pipelines.TextCleaningPipeline": 100,
            "myproject.pipelines.PriceNormalizationPipeline": 200,
            "myproject.pipelines.DateParsingPipeline": 300,
            "myproject.pipelines.ValidationPipeline": 400,
            "myproject.pipelines.DeduplicationPipeline": 500,
        }
    }
Pipeline order:
- Lower numbers run first (100, 200, 300, ...)
- Clean → Transform → Validate → Deduplicate
Best practices
- Separation of concerns: Extract in spiders, clean and transform in pipelines
- Keep spiders simple: Only basic .strip() in spiders; do complex cleaning in pipelines
- Pipeline order matters: Clean before validate, validate before dedupe
- Raise DropItem: Use the DropItem exception to prevent an item from being exported
- Keep data serializable: Exporters operate on item.data only
- Be defensive: Validate item structure and types to avoid runtime errors
- Log appropriately: Use a logger for debugging pipeline issues
- Test pipelines: Unit test pipeline logic independently from spiders (see the sketch after this list)
- Reuse pipelines: Share common pipelines across multiple spiders
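To unit test a pipeline without running a spider, a minimal sketch with pytest might look like the following. It assumes Item can be constructed with a data keyword argument (check the Items docs) and uses a hypothetical FakeSpider stand-in, since the pipelines above only read spider.name; asyncio.run drives the coroutine so no async test plugin is required.

import asyncio

import pytest

from qcrawl.core.item import Item
from qcrawl.pipelines.base import DropItem
from myproject.pipelines import ValidationPipeline


class FakeSpider:
    """Stand-in for a spider; the pipelines above only read spider.name."""
    name = "test_spider"


def test_missing_title_is_dropped():
    pipeline = ValidationPipeline(required=["title"])
    item = Item(data={"price": "9.99"})  # assumption: Item accepts a data kwarg
    with pytest.raises(DropItem):
        asyncio.run(pipeline.process_item(item, FakeSpider()))


def test_valid_item_passes_through():
    pipeline = ValidationPipeline(required=["title"])
    item = Item(data={"title": "Book", "price": "9.99"})
    result = asyncio.run(pipeline.process_item(item, FakeSpider()))
    assert result.data["price"] == 9.99  # price coerced to float by the pipeline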
See also: Items, Data Extraction, Exporters