Monitoring competitor prices is crucial for any e-commerce business. Whether you're dropshipping, running a DTC brand, or managing a marketplace, knowing when competitors change their prices can mean the difference between winning and losing sales. In this comprehensive guide, we'll show you exactly how to build your own Shopify price monitoring system from scratch.
1. What is Shopify Price Monitoring?
Shopify price monitoring is the automated process of tracking product prices across Shopify stores. It involves four steps, sketched in code after this list:
- Fetching product data from competitor Shopify stores
- Storing historical price data for trend analysis
- Detecting price changes automatically
- Alerting when significant changes occur
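Here is what those four steps look like wired together. This is a minimal sketch only: the four callables are placeholders for the concrete pieces built in the rest of this guide.
import time
from typing import Any, Callable, Dict, List, Optional

def monitor(
    store_url: str,
    fetch: Callable[[str], List[Dict[str, Any]]],    # Section 3: pull /products.json
    save: Callable[[List[Dict[str, Any]]], None],    # Sections 6-7: persist snapshots
    diff: Callable[..., List[Dict[str, Any]]],       # Sections 6-7: detect changes
    alert: Callable[[List[Dict[str, Any]]], None],   # Sections 8-9: send notifications
    interval_hours: int = 6,
) -> None:
    """Minimal monitoring loop: fetch -> store -> diff -> alert."""
    previous: Optional[List[Dict[str, Any]]] = None
    while True:
        snapshot = fetch(store_url)
        save(snapshot)
        if previous is not None:
            changes = diff(previous, snapshot)
            if changes:
                alert(changes)
        previous = snapshot
        time.sleep(interval_hours * 3600)  # wait until the next check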
2. Understanding the /products.json Endpoint
Most Shopify stores expose a public API endpoint at /products.json that returns product data in JSON format. This is Shopify's built-in way to allow product feeds and integrations, though individual stores can disable or password-protect it.
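You can verify that a store exposes the endpoint before building anything on top of it. A minimal check (any Shopify storefront URL works in place of the example):
import requests

resp = requests.get(
    "https://www.aloyoga.com/products.json",
    params={"limit": 1},
    headers={"User-Agent": "Mozilla/5.0"},
    timeout=10,
)
print(resp.status_code)  # 200 when the endpoint is open
if resp.ok and "application/json" in resp.headers.get("Content-Type", ""):
    print(len(resp.json().get("products", [])))  # 1 on a live store
A non-200 response, or an HTML page instead of JSON, usually means the endpoint is disabled or the store is password-protected.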
# Basic endpoint
https://store-name.myshopify.com/products.json
# Custom domain
https://www.aloyoga.com/products.json
# With pagination (250 products max per page)
https://www.aloyoga.com/products.json?limit=250&page=1
# Filter by collection
https://www.aloyoga.com/collections/womens-leggings/products.json
The endpoint returns a JSON object containing an array of products, each with variants that include pricing information:
{
"products": [
{
"id": 1234567890,
"title": "High-Waist Airlift Legging",
"handle": "high-waist-airlift-legging",
"vendor": "Alo Yoga",
"product_type": "Leggings",
"created_at": "2024-01-15T10:30:00-05:00",
"updated_at": "2025-01-17T08:15:00-05:00",
"tags": ["womens", "leggings", "bestseller"],
"variants": [
{
"id": 98765432,
"title": "Black / XS",
"price": "128.00",
"compare_at_price": "148.00",
"sku": "ALO-HW-BLK-XS",
"inventory_quantity": 45,
"available": true
}
],
"images": [
{
"src": "https://cdn.shopify.com/..."
}
]
}
]
}
3. Real Example: Fetching Data from Alo Yoga
Let's build a practical example using Alo Yoga (aloyoga.com), a popular athletic wear brand on Shopify. Here's how to fetch their products:
import requests
import json
from typing import List, Dict, Any
def fetch_shopify_products(store_url: str, limit: int = 250) -> List[Dict[str, Any]]:
"""
Fetch all products from a Shopify store using /products.json
Args:
store_url: Base URL of the store (e.g., 'https://www.aloyoga.com')
limit: Products per page (max 250)
Returns:
List of all products
"""
all_products = []
page = 1
while True:
url = f"{store_url}/products.json?limit={limit}&page={page}"
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
'Accept': 'application/json',
}
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
products = data.get('products', [])
if not products:
break # No more products
all_products.extend(products)
print(f"Page {page}: fetched {len(products)} products")
if len(products) < limit:
break # Last page
page += 1
except requests.exceptions.RequestException as e:
print(f"Error fetching page {page}: {e}")
break
return all_products
# Example usage
if __name__ == "__main__":
store_url = "https://www.aloyoga.com"
products = fetch_shopify_products(store_url)
print(f"\nTotal products fetched: {len(products)}")
# Display first product details
if products:
first = products[0]
print(f"\nFirst product: {first['title']}")
print(f"Vendor: {first['vendor']}")
if first.get('variants'):
variant = first['variants'][0]
print(f"Price: ${variant['price']}")
if variant.get('compare_at_price'):
print(f"Compare at: ${variant['compare_at_price']}")4. Handling Pagination Efficiently
Shopify limits /products.json to 250 products per page. For stores with thousands of products, you need to handle pagination properly. Here's an improved version with better error handling and rate limiting:
import requests
import time
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ProductPrice:
"""Structured price data for a product variant"""
product_id: int
product_title: str
variant_id: int
variant_title: str
sku: str
price: float
compare_at_price: Optional[float]
available: bool
scraped_at: datetime
class ShopifyPriceMonitor:
"""Monitor prices from Shopify stores"""
def __init__(self, rate_limit_delay: float = 1.0):
self.rate_limit_delay = rate_limit_delay
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
'Accept': 'application/json',
})
def fetch_all_products(self, store_url: str) -> List[Dict[str, Any]]:
"""Fetch all products with pagination handling"""
all_products = []
page = 1
max_retries = 3
while True:
url = f"{store_url}/products.json?limit=250&page={page}"
for attempt in range(max_retries):
try:
response = self.session.get(url, timeout=30)
if response.status_code == 429:
# Rate limited - wait and retry
wait_time = int(response.headers.get('Retry-After', 60))
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
response.raise_for_status()
data = response.json()
products = data.get('products', [])
if not products:
return all_products
all_products.extend(products)
print(f"Page {page}: {len(products)} products (total: {len(all_products)})")
if len(products) < 250:
return all_products
page += 1
time.sleep(self.rate_limit_delay)
break
except requests.exceptions.RequestException as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
return all_products
time.sleep(2 ** attempt)
return all_products
def extract_prices(self, products: List[Dict]) -> List[ProductPrice]:
"""Extract structured price data from products"""
prices = []
scraped_at = datetime.now()
for product in products:
for variant in product.get('variants', []):
price = ProductPrice(
product_id=product['id'],
product_title=product['title'],
variant_id=variant['id'],
variant_title=variant.get('title', 'Default'),
sku=variant.get('sku', ''),
price=float(variant['price']),
compare_at_price=(
float(variant['compare_at_price'])
if variant.get('compare_at_price')
else None
),
available=variant.get('available', True),
scraped_at=scraped_at
)
prices.append(price)
return prices
# Example: Monitor Alo Yoga prices
if __name__ == "__main__":
monitor = ShopifyPriceMonitor(rate_limit_delay=0.5)
products = monitor.fetch_all_products("https://www.aloyoga.com")
prices = monitor.extract_prices(products)
print(f"\nExtracted {len(prices)} price records")
# Show sample price data
for price in prices[:5]:
discount = ""
if price.compare_at_price:
savings = price.compare_at_price - price.price
discount = f" (Save ${savings:.2f})"
print(f"{price.product_title} [{price.variant_title}]: ${price.price}{discount}")5. Setting Up Proxy Rotation
When monitoring multiple stores or fetching data frequently, you'll need proxy rotation to avoid IP bans. Here's how to integrate proxies:
import requests
import random
from itertools import cycle
from typing import List, Optional
class ProxyRotator:
"""Rotate through a list of proxies for web scraping"""
def __init__(self, proxies: List[str]):
"""
Initialize with a list of proxy URLs.
Format: 'http://user:pass@host:port' or 'http://host:port'
Popular proxy providers:
- Bright Data (formerly Luminati)
- Oxylabs
- Smartproxy
- ScraperAPI
"""
self.proxies = proxies
self.proxy_cycle = cycle(proxies)
self.current_proxy = None
def get_next(self) -> dict:
"""Get next proxy in rotation"""
proxy = next(self.proxy_cycle)
self.current_proxy = proxy
return {
'http': proxy,
'https': proxy
}
def get_random(self) -> dict:
"""Get a random proxy"""
proxy = random.choice(self.proxies)
self.current_proxy = proxy
return {
'http': proxy,
'https': proxy
}
class ShopifyScraperWithProxy:
"""Shopify scraper with proxy rotation support"""
def __init__(self, proxies: Optional[List[str]] = None):
self.proxy_rotator = ProxyRotator(proxies) if proxies else None
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.9',
})
def fetch_products(self, store_url: str, page: int = 1) -> dict:
"""Fetch products with optional proxy rotation"""
url = f"{store_url}/products.json?limit=250&page={page}"
proxies = None
if self.proxy_rotator:
proxies = self.proxy_rotator.get_next()
try:
response = self.session.get(
url,
proxies=proxies,
timeout=30,
verify=True
)
response.raise_for_status()
return response.json()
except requests.exceptions.ProxyError as e:
print(f"Proxy error with {self.proxy_rotator.current_proxy}: {e}")
            # Try next proxy (note: this recurses until a proxy succeeds; cap the retries in production)
if self.proxy_rotator:
return self.fetch_products(store_url, page)
raise
# Example usage with residential proxies
if __name__ == "__main__":
# Example proxy list (replace with real proxies)
proxy_list = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
"http://user:pass@proxy3.example.com:8080",
]
scraper = ShopifyScraperWithProxy(proxies=proxy_list)
# Or without proxies for testing
# scraper = ShopifyScraperWithProxy()
data = scraper.fetch_products("https://www.aloyoga.com")
print(f"Fetched {len(data.get('products', []))} products")6. Saving Data to CSV
For simple analysis and quick exports, CSV is perfect. Here's how to save your price data:
import csv
from datetime import datetime
from typing import List, Dict, Any
import os
def save_products_to_csv(
products: List[Dict[str, Any]],
store_name: str,
output_dir: str = "./data"
) -> str:
"""
Save product data to CSV with timestamp
Returns:
Path to the saved file
"""
# Create output directory if needed
os.makedirs(output_dir, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{store_name}_prices_{timestamp}.csv"
filepath = os.path.join(output_dir, filename)
    # Flatten products into one row per variant
    scraped_at = datetime.now().isoformat()  # one timestamp for the whole snapshot
    rows = []
    for product in products:
        for variant in product.get('variants', []):
            rows.append({
                'scraped_at': scraped_at,
'store': store_name,
'product_id': product['id'],
'product_title': product['title'],
'product_type': product.get('product_type', ''),
'vendor': product.get('vendor', ''),
'handle': product.get('handle', ''),
'variant_id': variant['id'],
'variant_title': variant.get('title', 'Default'),
'sku': variant.get('sku', ''),
'price': variant.get('price', ''),
'compare_at_price': variant.get('compare_at_price', ''),
'available': variant.get('available', True),
'inventory_quantity': variant.get('inventory_quantity', ''),
'tags': ', '.join(product.get('tags', [])),
})
# Write to CSV
if rows:
fieldnames = rows[0].keys()
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"Saved {len(rows)} records to {filepath}")
return filepath
def compare_price_changes(old_file: str, new_file: str) -> List[Dict]:
"""
Compare two CSV files to detect price changes
"""
def load_csv(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
return {row['variant_id']: row for row in csv.DictReader(f)}
old_data = load_csv(old_file)
new_data = load_csv(new_file)
changes = []
for variant_id, new_row in new_data.items():
if variant_id in old_data:
old_price = float(old_data[variant_id]['price'])
new_price = float(new_row['price'])
if old_price != new_price:
changes.append({
'product': new_row['product_title'],
'variant': new_row['variant_title'],
'old_price': old_price,
'new_price': new_price,
'change': new_price - old_price,
'change_pct': ((new_price - old_price) / old_price) * 100
})
return changes
# Example usage
if __name__ == "__main__":
from shopify_scraper_advanced import ShopifyPriceMonitor
monitor = ShopifyPriceMonitor()
products = monitor.fetch_all_products("https://www.aloyoga.com")
filepath = save_products_to_csv(products, "aloyoga")
print(f"Data saved to: {filepath}")
# Example output:
    # Saved 3456 records to ./data/aloyoga_prices_20250117_143022.csv
7. Saving Data to PostgreSQL
For production systems with historical tracking and analytics, PostgreSQL is the way to go. Here's a complete setup:
-- Create database schema for Shopify price monitoring
-- Stores table
CREATE TABLE stores (
id SERIAL PRIMARY KEY,
domain VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW(),
last_scraped_at TIMESTAMP
);
-- Products table
CREATE TABLE products (
id BIGINT PRIMARY KEY, -- Shopify product ID
store_id INTEGER REFERENCES stores(id),
title VARCHAR(500) NOT NULL,
handle VARCHAR(255),
vendor VARCHAR(255),
product_type VARCHAR(255),
tags TEXT[],
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Variants table
CREATE TABLE variants (
id BIGINT PRIMARY KEY, -- Shopify variant ID
product_id BIGINT REFERENCES products(id),
title VARCHAR(255),
sku VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW()
);
-- Price history table (the main tracking table)
CREATE TABLE price_history (
id SERIAL PRIMARY KEY,
variant_id BIGINT REFERENCES variants(id),
price DECIMAL(10, 2) NOT NULL,
compare_at_price DECIMAL(10, 2),
available BOOLEAN DEFAULT TRUE,
inventory_quantity INTEGER,
scraped_at TIMESTAMP DEFAULT NOW()
);
-- Create indexes for fast queries
CREATE INDEX idx_price_history_variant_id ON price_history(variant_id);
CREATE INDEX idx_price_history_scraped_at ON price_history(scraped_at);
CREATE INDEX idx_products_store_id ON products(store_id);
-- View for latest prices
CREATE VIEW latest_prices AS
SELECT DISTINCT ON (ph.variant_id)
s.domain AS store,
p.title AS product,
v.title AS variant,
v.sku,
ph.price,
ph.compare_at_price,
ph.available,
ph.scraped_at
FROM price_history ph
JOIN variants v ON v.id = ph.variant_id
JOIN products p ON p.id = v.product_id
JOIN stores s ON s.id = p.store_id
ORDER BY ph.variant_id, ph.scraped_at DESC;
-- View for price changes
CREATE VIEW price_changes AS
WITH ranked_prices AS (
SELECT
variant_id,
price,
scraped_at,
LAG(price) OVER (PARTITION BY variant_id ORDER BY scraped_at) AS prev_price
FROM price_history
)
SELECT
v.id AS variant_id,
p.title AS product,
v.title AS variant,
rp.prev_price AS old_price,
rp.price AS new_price,
(rp.price - rp.prev_price) AS price_change,
ROUND(((rp.price - rp.prev_price) / rp.prev_price * 100)::numeric, 2) AS change_pct,
rp.scraped_at
FROM ranked_prices rp
JOIN variants v ON v.id = rp.variant_id
JOIN products p ON p.id = v.product_id
WHERE rp.prev_price IS NOT NULL
AND rp.price != rp.prev_price
ORDER BY rp.scraped_at DESC;
With the schema in place, here's the Python layer that writes scraped data into it:
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
from typing import List, Dict, Any, Optional
from contextlib import contextmanager
class PriceDatabase:
"""PostgreSQL database handler for price monitoring"""
def __init__(self, connection_string: str):
"""
Initialize with PostgreSQL connection string.
Example: 'postgresql://user:password@localhost:5432/price_monitor'
"""
self.connection_string = connection_string
@contextmanager
def get_connection(self):
"""Context manager for database connections"""
conn = psycopg2.connect(self.connection_string)
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
    def ensure_store(self, domain: str, name: Optional[str] = None) -> int:
"""Get or create store record"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO stores (domain, name, last_scraped_at)
VALUES (%s, %s, NOW())
ON CONFLICT (domain) DO UPDATE
SET last_scraped_at = NOW()
RETURNING id
""", (domain, name or domain))
return cur.fetchone()[0]
def save_products(self, store_id: int, products: List[Dict[str, Any]]):
"""Save products and variants to database"""
with self.get_connection() as conn:
with conn.cursor() as cur:
for product in products:
# Upsert product
cur.execute("""
INSERT INTO products (id, store_id, title, handle, vendor, product_type, tags)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
handle = EXCLUDED.handle,
vendor = EXCLUDED.vendor,
product_type = EXCLUDED.product_type,
tags = EXCLUDED.tags,
updated_at = NOW()
""", (
product['id'],
store_id,
product['title'],
product.get('handle'),
product.get('vendor'),
product.get('product_type'),
product.get('tags', [])
))
# Upsert variants and record prices
for variant in product.get('variants', []):
cur.execute("""
INSERT INTO variants (id, product_id, title, sku)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
sku = EXCLUDED.sku
""", (
variant['id'],
product['id'],
variant.get('title'),
variant.get('sku')
))
# Insert price record
cur.execute("""
INSERT INTO price_history
(variant_id, price, compare_at_price, available, inventory_quantity)
VALUES (%s, %s, %s, %s, %s)
""", (
variant['id'],
float(variant.get('price', 0)),
float(variant['compare_at_price']) if variant.get('compare_at_price') else None,
variant.get('available', True),
variant.get('inventory_quantity')
))
def get_price_changes(self, hours: int = 24) -> List[Dict]:
"""Get recent price changes"""
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT * FROM price_changes
                    WHERE scraped_at > NOW() - %s * INTERVAL '1 hour'
ORDER BY ABS(change_pct) DESC
""", (hours,))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
# Example usage
if __name__ == "__main__":
from shopify_scraper_advanced import ShopifyPriceMonitor
# Initialize database
db = PriceDatabase("postgresql://user:password@localhost:5432/price_monitor")
# Scrape store
monitor = ShopifyPriceMonitor()
products = monitor.fetch_all_products("https://www.aloyoga.com")
# Save to database
store_id = db.ensure_store("aloyoga.com", "Alo Yoga")
db.save_products(store_id, products)
print(f"Saved {len(products)} products to database")
# Check for price changes
changes = db.get_price_changes(hours=24)
print(f"\nPrice changes in last 24 hours: {len(changes)}")
for change in changes[:10]:
print(f"{change['product']}: ${change['old_price']} -> ${change['new_price']} ({change['change_pct']}%)")8. Deploying with Coolify
Coolify is an excellent self-hosted alternative to Heroku. Here's how to deploy your price monitor as a scheduled job, starting with the Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV PYTHONUNBUFFERED=1
# Default command (can be overridden)
CMD ["python", "main.py"]requests==2.31.0
psycopg2-binary==2.9.9
python-dotenv==1.0.0
schedule==1.2.1
The entrypoint, main.py, runs the check on a schedule:
#!/usr/bin/env python3
"""
Shopify Price Monitor - Coolify Deployment
Run as a scheduled task to monitor competitor prices
"""
import os
import schedule
import time
from datetime import datetime
from dotenv import load_dotenv
from shopify_scraper_advanced import ShopifyPriceMonitor
from save_to_postgresql import PriceDatabase
load_dotenv()
# Configuration from environment variables
DATABASE_URL = os.getenv('DATABASE_URL')
STORES_TO_MONITOR = os.getenv('STORES_TO_MONITOR', '').split(',')
SCRAPE_INTERVAL_HOURS = int(os.getenv('SCRAPE_INTERVAL_HOURS', '6'))
def run_price_check():
"""Main price monitoring function"""
print(f"\n{'='*50}")
print(f"Starting price check at {datetime.now().isoformat()}")
print(f"{'='*50}")
db = PriceDatabase(DATABASE_URL)
monitor = ShopifyPriceMonitor(rate_limit_delay=1.0)
for store_url in STORES_TO_MONITOR:
store_url = store_url.strip()
if not store_url:
continue
try:
print(f"\nScraping: {store_url}")
# Extract domain for store name
domain = store_url.replace('https://', '').replace('http://', '').rstrip('/')
# Fetch products
products = monitor.fetch_all_products(store_url)
print(f" Fetched {len(products)} products")
# Save to database
store_id = db.ensure_store(domain)
db.save_products(store_id, products)
print(f" Saved to database")
except Exception as e:
print(f" Error scraping {store_url}: {e}")
# Report price changes
changes = db.get_price_changes(hours=SCRAPE_INTERVAL_HOURS)
if changes:
print(f"\nPrice changes detected: {len(changes)}")
for change in changes[:20]:
direction = "โ" if change['price_change'] > 0 else "โ"
print(f" {direction} {change['product']}: ${change['old_price']} -> ${change['new_price']}")
print(f"\nPrice check completed at {datetime.now().isoformat()}")
def main():
"""Entry point for Coolify deployment"""
print("Shopify Price Monitor Starting...")
print(f"Monitoring {len(STORES_TO_MONITOR)} stores")
print(f"Check interval: {SCRAPE_INTERVAL_HOURS} hours")
# Run immediately on startup
run_price_check()
# Schedule periodic runs
schedule.every(SCRAPE_INTERVAL_HOURS).hours.do(run_price_check)
# Keep running
while True:
schedule.run_pending()
time.sleep(60)
if __name__ == "__main__":
    main()
Configuration lives in environment variables (.env):
# Database connection
DATABASE_URL=postgresql://user:password@db:5432/price_monitor
# Stores to monitor (comma-separated)
STORES_TO_MONITOR=https://www.aloyoga.com,https://www.gymshark.com,https://www.fabletics.com
# How often to check prices (in hours)
SCRAPE_INTERVAL_HOURS=6
# Optional: Proxy configuration
PROXY_LIST=http://user:pass@proxy1:8080,http://user:pass@proxy2:8080
9. Automating with N8N
N8N is a powerful workflow automation tool. Here's how to set up a price monitoring workflow:
N8N Workflow Steps:
- Schedule Trigger: Run every 6 hours
- HTTP Request: Fetch /products.json from target stores
- Function: Extract and transform price data
- PostgreSQL: Save to price_history table
- PostgreSQL: Query for price changes
- IF: Check if changes exist
- Slack/Email: Send alert notification (prototyped in Python right after this list)
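If you want to test the alert step before wiring up the full workflow, here's a rough Python sketch of step 7 using a Slack incoming webhook. The webhook URL is a placeholder, and `changes` is assumed to carry the same fields as the price_changes view from Section 7:
import requests
from typing import Dict, List

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def send_price_alert(changes: List[Dict]) -> None:
    """Post a short summary of detected price changes to Slack."""
    if not changes:
        return  # nothing to report
    lines = ["*Price Changes Detected*"]
    for c in changes[:10]:
        arrow = "↑" if c["price_change"] > 0 else "↓"
        lines.append(
            f"{arrow} {c['product']} [{c['variant']}]: "
            f"${c['old_price']} -> ${c['new_price']} ({c['change_pct']}%)"
        )
    requests.post(SLACK_WEBHOOK_URL, json={"text": "\n".join(lines)}, timeout=10)
The same JSON payload ({"text": ...}) also works from N8N's HTTP Request node if you'd rather skip a dedicated Slack node.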
// N8N Function node to extract price data from products.json response
const products = $input.item.json.products;
const storeDomain = $input.item.json.storeDomain;
const scrapedAt = new Date().toISOString();
const priceRecords = [];
for (const product of products) {
for (const variant of product.variants || []) {
priceRecords.push({
store_domain: storeDomain,
product_id: product.id,
product_title: product.title,
product_type: product.product_type || '',
variant_id: variant.id,
variant_title: variant.title || 'Default',
sku: variant.sku || '',
price: parseFloat(variant.price),
compare_at_price: variant.compare_at_price ? parseFloat(variant.compare_at_price) : null,
available: variant.available !== false,
scraped_at: scrapedAt
});
}
}
return priceRecords.map(record => ({ json: record }));
// N8N Function node to format Slack alert
const changes = $input.all().map(item => item.json);
if (changes.length === 0) {
return [{ json: { skip: true } }];
}
let message = `*Price Changes Detected*\n\n`;
for (const change of changes.slice(0, 10)) {
  const direction = change.price_change > 0 ? '↑' : '↓';
const pct = Math.abs(change.change_pct).toFixed(1);
message += `${direction} *${change.product}*\n`;
  message += `  ${change.variant}: $${change.old_price} → $${change.new_price} (${change.price_change > 0 ? '+' : ''}${pct}%)\n\n`;
}
if (changes.length > 10) {
message += `_...and ${changes.length - 10} more changes_`;
}
return [{
json: {
text: message,
channel: '#price-alerts'
}
}];
10. Conclusion
You now have all the tools needed to build a complete Shopify price monitoring system:
- Fetch product data using /products.json with pagination
- Rotate proxies for reliable large-scale scraping
- Store data in CSV for simple analysis or PostgreSQL for production
- Deploy with Coolify for self-hosted automation
- Create workflows with N8N for visual automation