Back to Blog
Price MonitoringJanuary 17, 202515 min read

How to Monitor Shopify Competitor Prices: Complete Guide with /products.json

Build a complete Shopify price monitoring system from scratch

Monitoring competitor prices is crucial for any e-commerce business. Whether you're dropshipping, running a DTC brand, or managing a marketplace, knowing when competitors change their prices can mean the difference between winning and losing sales. In this comprehensive guide, we'll show you exactly how to build your own Shopify price monitoring system from scratch.

Want to skip the technical setup? DBShopi\'s Price Monitoring tool handles all of this automatically for 500M+ products.

1. What is Shopify Price Monitoring?

Shopify price monitoring is the automated process of tracking product prices across Shopify stores. This involves:

  • Fetching product data from competitor Shopify stores
  • Storing historical price data for trend analysis
  • Detecting price changes automatically
  • Alerting when significant changes occur

2. Understanding the /products.json Endpoint

Every Shopify store exposes a public API endpoint at /products.json that returns product data in JSON format. This is Shopify's built-in way to allow product feeds and integrations.

Basic products.json URL structurebash
# Basic endpoint
https://store-name.myshopify.com/products.json

# Custom domain
https://www.aloyoga.com/products.json

# With pagination (250 products max per page)
https://www.aloyoga.com/products.json?limit=250&page=1

# Filter by collection
https://www.aloyoga.com/collections/womens-leggings/products.json

The endpoint returns a JSON object containing an array of products, each with variants that include pricing information:

Response structurejson
{
  "products": [
    {
      "id": 1234567890,
      "title": "High-Waist Airlift Legging",
      "handle": "high-waist-airlift-legging",
      "vendor": "Alo Yoga",
      "product_type": "Leggings",
      "created_at": "2024-01-15T10:30:00-05:00",
      "updated_at": "2025-01-17T08:15:00-05:00",
      "tags": ["womens", "leggings", "bestseller"],
      "variants": [
        {
          "id": 98765432,
          "title": "Black / XS",
          "price": "128.00",
          "compare_at_price": "148.00",
          "sku": "ALO-HW-BLK-XS",
          "inventory_quantity": 45,
          "available": true
        }
      ],
      "images": [
        {
          "src": "https://cdn.shopify.com/..."
        }
      ]
    }
  ]
}

3. Real Example: Fetching Data from Alo Yoga

Let's build a practical example using Alo Yoga (aloyoga.com), a popular athletic wear brand on Shopify. Here's how to fetch their products:

fetch_shopify_products.pypython
import requests
import json
from typing import List, Dict, Any

def fetch_shopify_products(store_url: str, limit: int = 250) -> List[Dict[str, Any]]:
    """
    Fetch all products from a Shopify store using /products.json

    Args:
        store_url: Base URL of the store (e.g., 'https://www.aloyoga.com')
        limit: Products per page (max 250)

    Returns:
        List of all products
    """
    all_products = []
    page = 1

    while True:
        url = f"{store_url}/products.json?limit={limit}&page={page}"

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
            'Accept': 'application/json',
        }

        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            data = response.json()
            products = data.get('products', [])

            if not products:
                break  # No more products

            all_products.extend(products)
            print(f"Page {page}: fetched {len(products)} products")

            if len(products) < limit:
                break  # Last page

            page += 1

        except requests.exceptions.RequestException as e:
            print(f"Error fetching page {page}: {e}")
            break

    return all_products


# Example usage
if __name__ == "__main__":
    store_url = "https://www.aloyoga.com"
    products = fetch_shopify_products(store_url)

    print(f"\nTotal products fetched: {len(products)}")

    # Display first product details
    if products:
        first = products[0]
        print(f"\nFirst product: {first['title']}")
        print(f"Vendor: {first['vendor']}")

        if first.get('variants'):
            variant = first['variants'][0]
            print(f"Price: ${variant['price']}")
            if variant.get('compare_at_price'):
                print(f"Compare at: ${variant['compare_at_price']}")
When running this against aloyoga.com, you'll typically get 1000+ products across multiple pages. The store has leggings, sports bras, hoodies, accessories, and more.

4. Handling Pagination Efficiently

Shopify limits /products.json to 250 products per page. For stores with thousands of products, you need to handle pagination properly. Here's an improved version with better error handling and rate limiting:

shopify_scraper_advanced.pypython
import requests
import time
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ProductPrice:
    """Structured price data for a product variant"""
    product_id: int
    product_title: str
    variant_id: int
    variant_title: str
    sku: str
    price: float
    compare_at_price: Optional[float]
    available: bool
    scraped_at: datetime


class ShopifyPriceMonitor:
    """Monitor prices from Shopify stores"""

    def __init__(self, rate_limit_delay: float = 1.0):
        self.rate_limit_delay = rate_limit_delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
            'Accept': 'application/json',
        })

    def fetch_all_products(self, store_url: str) -> List[Dict[str, Any]]:
        """Fetch all products with pagination handling"""
        all_products = []
        page = 1
        max_retries = 3

        while True:
            url = f"{store_url}/products.json?limit=250&page={page}"

            for attempt in range(max_retries):
                try:
                    response = self.session.get(url, timeout=30)

                    if response.status_code == 429:
                        # Rate limited - wait and retry
                        wait_time = int(response.headers.get('Retry-After', 60))
                        print(f"Rate limited. Waiting {wait_time}s...")
                        time.sleep(wait_time)
                        continue

                    response.raise_for_status()
                    data = response.json()
                    products = data.get('products', [])

                    if not products:
                        return all_products

                    all_products.extend(products)
                    print(f"Page {page}: {len(products)} products (total: {len(all_products)})")

                    if len(products) < 250:
                        return all_products

                    page += 1
                    time.sleep(self.rate_limit_delay)
                    break

                except requests.exceptions.RequestException as e:
                    print(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == max_retries - 1:
                        return all_products
                    time.sleep(2 ** attempt)

        return all_products

    def extract_prices(self, products: List[Dict]) -> List[ProductPrice]:
        """Extract structured price data from products"""
        prices = []
        scraped_at = datetime.now()

        for product in products:
            for variant in product.get('variants', []):
                price = ProductPrice(
                    product_id=product['id'],
                    product_title=product['title'],
                    variant_id=variant['id'],
                    variant_title=variant.get('title', 'Default'),
                    sku=variant.get('sku', ''),
                    price=float(variant['price']),
                    compare_at_price=(
                        float(variant['compare_at_price'])
                        if variant.get('compare_at_price')
                        else None
                    ),
                    available=variant.get('available', True),
                    scraped_at=scraped_at
                )
                prices.append(price)

        return prices


# Example: Monitor Alo Yoga prices
if __name__ == "__main__":
    monitor = ShopifyPriceMonitor(rate_limit_delay=0.5)

    products = monitor.fetch_all_products("https://www.aloyoga.com")
    prices = monitor.extract_prices(products)

    print(f"\nExtracted {len(prices)} price records")

    # Show sample price data
    for price in prices[:5]:
        discount = ""
        if price.compare_at_price:
            savings = price.compare_at_price - price.price
            discount = f" (Save ${savings:.2f})"
        print(f"{price.product_title} [{price.variant_title}]: ${price.price}{discount}")

5. Setting Up Proxy Rotation

When monitoring multiple stores or fetching data frequently, you'll need proxy rotation to avoid IP bans. Here's how to integrate proxies:

proxy_rotation.pypython
import requests
import random
from itertools import cycle
from typing import List, Optional

class ProxyRotator:
    """Rotate through a list of proxies for web scraping"""

    def __init__(self, proxies: List[str]):
        """
        Initialize with a list of proxy URLs.

        Format: 'http://user:pass@host:port' or 'http://host:port'

        Popular proxy providers:
        - Bright Data (formerly Luminati)
        - Oxylabs
        - Smartproxy
        - ScraperAPI
        """
        self.proxies = proxies
        self.proxy_cycle = cycle(proxies)
        self.current_proxy = None

    def get_next(self) -> dict:
        """Get next proxy in rotation"""
        proxy = next(self.proxy_cycle)
        self.current_proxy = proxy
        return {
            'http': proxy,
            'https': proxy
        }

    def get_random(self) -> dict:
        """Get a random proxy"""
        proxy = random.choice(self.proxies)
        self.current_proxy = proxy
        return {
            'http': proxy,
            'https': proxy
        }


class ShopifyScraperWithProxy:
    """Shopify scraper with proxy rotation support"""

    def __init__(self, proxies: Optional[List[str]] = None):
        self.proxy_rotator = ProxyRotator(proxies) if proxies else None
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Accept-Language': 'en-US,en;q=0.9',
        })

    def fetch_products(self, store_url: str, page: int = 1) -> dict:
        """Fetch products with optional proxy rotation"""
        url = f"{store_url}/products.json?limit=250&page={page}"

        proxies = None
        if self.proxy_rotator:
            proxies = self.proxy_rotator.get_next()

        try:
            response = self.session.get(
                url,
                proxies=proxies,
                timeout=30,
                verify=True
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.ProxyError as e:
            print(f"Proxy error with {self.proxy_rotator.current_proxy}: {e}")
            # Try next proxy
            if self.proxy_rotator:
                return self.fetch_products(store_url, page)
            raise


# Example usage with residential proxies
if __name__ == "__main__":
    # Example proxy list (replace with real proxies)
    proxy_list = [
        "http://user:pass@proxy1.example.com:8080",
        "http://user:pass@proxy2.example.com:8080",
        "http://user:pass@proxy3.example.com:8080",
    ]

    scraper = ShopifyScraperWithProxy(proxies=proxy_list)

    # Or without proxies for testing
    # scraper = ShopifyScraperWithProxy()

    data = scraper.fetch_products("https://www.aloyoga.com")
    print(f"Fetched {len(data.get('products', []))} products")
Always respect robots.txt and rate limits. Aggressive scraping can get your IP banned and may violate terms of service. For production use, consider using a service like DBShopi that handles this responsibly.

6. Saving Data to CSV

For simple analysis and quick exports, CSV is perfect. Here's how to save your price data:

save_to_csv.pypython
import csv
from datetime import datetime
from typing import List, Dict, Any
import os

def save_products_to_csv(
    products: List[Dict[str, Any]],
    store_name: str,
    output_dir: str = "./data"
) -> str:
    """
    Save product data to CSV with timestamp

    Returns:
        Path to the saved file
    """
    # Create output directory if needed
    os.makedirs(output_dir, exist_ok=True)

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{store_name}_prices_{timestamp}.csv"
    filepath = os.path.join(output_dir, filename)

    # Flatten products with variants
    rows = []
    for product in products:
        for variant in product.get('variants', []):
            rows.append({
                'scraped_at': datetime.now().isoformat(),
                'store': store_name,
                'product_id': product['id'],
                'product_title': product['title'],
                'product_type': product.get('product_type', ''),
                'vendor': product.get('vendor', ''),
                'handle': product.get('handle', ''),
                'variant_id': variant['id'],
                'variant_title': variant.get('title', 'Default'),
                'sku': variant.get('sku', ''),
                'price': variant.get('price', ''),
                'compare_at_price': variant.get('compare_at_price', ''),
                'available': variant.get('available', True),
                'inventory_quantity': variant.get('inventory_quantity', ''),
                'tags': ', '.join(product.get('tags', [])),
            })

    # Write to CSV
    if rows:
        fieldnames = rows[0].keys()
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    print(f"Saved {len(rows)} records to {filepath}")
    return filepath


def compare_price_changes(old_file: str, new_file: str) -> List[Dict]:
    """
    Compare two CSV files to detect price changes
    """
    def load_csv(filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            return {row['variant_id']: row for row in csv.DictReader(f)}

    old_data = load_csv(old_file)
    new_data = load_csv(new_file)

    changes = []
    for variant_id, new_row in new_data.items():
        if variant_id in old_data:
            old_price = float(old_data[variant_id]['price'])
            new_price = float(new_row['price'])

            if old_price != new_price:
                changes.append({
                    'product': new_row['product_title'],
                    'variant': new_row['variant_title'],
                    'old_price': old_price,
                    'new_price': new_price,
                    'change': new_price - old_price,
                    'change_pct': ((new_price - old_price) / old_price) * 100
                })

    return changes


# Example usage
if __name__ == "__main__":
    from shopify_scraper_advanced import ShopifyPriceMonitor

    monitor = ShopifyPriceMonitor()
    products = monitor.fetch_all_products("https://www.aloyoga.com")

    filepath = save_products_to_csv(products, "aloyoga")
    print(f"Data saved to: {filepath}")

    # Example output:
    # Saved 3456 records to ./data/aloyoga_prices_20250117_143022.csv

7. Saving Data to PostgreSQL

For production systems with historical tracking and analytics, PostgreSQL is the way to go. Here's a complete setup:

database_schema.sqlsql
-- Create database schema for Shopify price monitoring

-- Stores table
CREATE TABLE stores (
    id SERIAL PRIMARY KEY,
    domain VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW(),
    last_scraped_at TIMESTAMP
);

-- Products table
CREATE TABLE products (
    id BIGINT PRIMARY KEY,  -- Shopify product ID
    store_id INTEGER REFERENCES stores(id),
    title VARCHAR(500) NOT NULL,
    handle VARCHAR(255),
    vendor VARCHAR(255),
    product_type VARCHAR(255),
    tags TEXT[],
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Variants table
CREATE TABLE variants (
    id BIGINT PRIMARY KEY,  -- Shopify variant ID
    product_id BIGINT REFERENCES products(id),
    title VARCHAR(255),
    sku VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Price history table (the main tracking table)
CREATE TABLE price_history (
    id SERIAL PRIMARY KEY,
    variant_id BIGINT REFERENCES variants(id),
    price DECIMAL(10, 2) NOT NULL,
    compare_at_price DECIMAL(10, 2),
    available BOOLEAN DEFAULT TRUE,
    inventory_quantity INTEGER,
    scraped_at TIMESTAMP DEFAULT NOW()
);

-- Create indexes for fast queries
CREATE INDEX idx_price_history_variant_id ON price_history(variant_id);
CREATE INDEX idx_price_history_scraped_at ON price_history(scraped_at);
CREATE INDEX idx_products_store_id ON products(store_id);

-- View for latest prices
CREATE VIEW latest_prices AS
SELECT DISTINCT ON (ph.variant_id)
    s.domain AS store,
    p.title AS product,
    v.title AS variant,
    v.sku,
    ph.price,
    ph.compare_at_price,
    ph.available,
    ph.scraped_at
FROM price_history ph
JOIN variants v ON v.id = ph.variant_id
JOIN products p ON p.id = v.product_id
JOIN stores s ON s.id = p.store_id
ORDER BY ph.variant_id, ph.scraped_at DESC;

-- View for price changes
CREATE VIEW price_changes AS
WITH ranked_prices AS (
    SELECT
        variant_id,
        price,
        scraped_at,
        LAG(price) OVER (PARTITION BY variant_id ORDER BY scraped_at) AS prev_price
    FROM price_history
)
SELECT
    v.id AS variant_id,
    p.title AS product,
    v.title AS variant,
    rp.prev_price AS old_price,
    rp.price AS new_price,
    (rp.price - rp.prev_price) AS price_change,
    ROUND(((rp.price - rp.prev_price) / rp.prev_price * 100)::numeric, 2) AS change_pct,
    rp.scraped_at
FROM ranked_prices rp
JOIN variants v ON v.id = rp.variant_id
JOIN products p ON p.id = v.product_id
WHERE rp.prev_price IS NOT NULL
AND rp.price != rp.prev_price
ORDER BY rp.scraped_at DESC;
save_to_postgresql.pypython
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
from typing import List, Dict, Any
from contextlib import contextmanager

class PriceDatabase:
    """PostgreSQL database handler for price monitoring"""

    def __init__(self, connection_string: str):
        """
        Initialize with PostgreSQL connection string.

        Example: 'postgresql://user:password@localhost:5432/price_monitor'
        """
        self.connection_string = connection_string

    @contextmanager
    def get_connection(self):
        """Context manager for database connections"""
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def ensure_store(self, domain: str, name: str = None) -> int:
        """Get or create store record"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO stores (domain, name, last_scraped_at)
                    VALUES (%s, %s, NOW())
                    ON CONFLICT (domain) DO UPDATE
                    SET last_scraped_at = NOW()
                    RETURNING id
                """, (domain, name or domain))
                return cur.fetchone()[0]

    def save_products(self, store_id: int, products: List[Dict[str, Any]]):
        """Save products and variants to database"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                for product in products:
                    # Upsert product
                    cur.execute("""
                        INSERT INTO products (id, store_id, title, handle, vendor, product_type, tags)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (id) DO UPDATE SET
                            title = EXCLUDED.title,
                            handle = EXCLUDED.handle,
                            vendor = EXCLUDED.vendor,
                            product_type = EXCLUDED.product_type,
                            tags = EXCLUDED.tags,
                            updated_at = NOW()
                    """, (
                        product['id'],
                        store_id,
                        product['title'],
                        product.get('handle'),
                        product.get('vendor'),
                        product.get('product_type'),
                        product.get('tags', [])
                    ))

                    # Upsert variants and record prices
                    for variant in product.get('variants', []):
                        cur.execute("""
                            INSERT INTO variants (id, product_id, title, sku)
                            VALUES (%s, %s, %s, %s)
                            ON CONFLICT (id) DO UPDATE SET
                                title = EXCLUDED.title,
                                sku = EXCLUDED.sku
                        """, (
                            variant['id'],
                            product['id'],
                            variant.get('title'),
                            variant.get('sku')
                        ))

                        # Insert price record
                        cur.execute("""
                            INSERT INTO price_history
                            (variant_id, price, compare_at_price, available, inventory_quantity)
                            VALUES (%s, %s, %s, %s, %s)
                        """, (
                            variant['id'],
                            float(variant.get('price', 0)),
                            float(variant['compare_at_price']) if variant.get('compare_at_price') else None,
                            variant.get('available', True),
                            variant.get('inventory_quantity')
                        ))

    def get_price_changes(self, hours: int = 24) -> List[Dict]:
        """Get recent price changes"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT * FROM price_changes
                    WHERE scraped_at > NOW() - INTERVAL '%s hours'
                    ORDER BY ABS(change_pct) DESC
                """, (hours,))

                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]


# Example usage
if __name__ == "__main__":
    from shopify_scraper_advanced import ShopifyPriceMonitor

    # Initialize database
    db = PriceDatabase("postgresql://user:password@localhost:5432/price_monitor")

    # Scrape store
    monitor = ShopifyPriceMonitor()
    products = monitor.fetch_all_products("https://www.aloyoga.com")

    # Save to database
    store_id = db.ensure_store("aloyoga.com", "Alo Yoga")
    db.save_products(store_id, products)

    print(f"Saved {len(products)} products to database")

    # Check for price changes
    changes = db.get_price_changes(hours=24)
    print(f"\nPrice changes in last 24 hours: {len(changes)}")

    for change in changes[:10]:
        print(f"{change['product']}: ${change['old_price']} -> ${change['new_price']} ({change['change_pct']}%)")

8. Deploying with Coolify

Coolify is an excellent self-hosted alternative to Heroku. Here's how to deploy your price monitor as a scheduled job:

Dockerfiledockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1

# Default command (can be overridden)
CMD ["python", "main.py"]
requirements.txttext
requests==2.31.0
psycopg2-binary==2.9.9
python-dotenv==1.0.0
schedule==1.2.1
main.py - Coolify scheduled jobpython
#!/usr/bin/env python3
"""
Shopify Price Monitor - Coolify Deployment
Run as a scheduled task to monitor competitor prices
"""

import os
import schedule
import time
from datetime import datetime
from dotenv import load_dotenv

from shopify_scraper_advanced import ShopifyPriceMonitor
from save_to_postgresql import PriceDatabase

load_dotenv()

# Configuration from environment variables
DATABASE_URL = os.getenv('DATABASE_URL')
STORES_TO_MONITOR = os.getenv('STORES_TO_MONITOR', '').split(',')
SCRAPE_INTERVAL_HOURS = int(os.getenv('SCRAPE_INTERVAL_HOURS', '6'))


def run_price_check():
    """Main price monitoring function"""
    print(f"\n{'='*50}")
    print(f"Starting price check at {datetime.now().isoformat()}")
    print(f"{'='*50}")

    db = PriceDatabase(DATABASE_URL)
    monitor = ShopifyPriceMonitor(rate_limit_delay=1.0)

    for store_url in STORES_TO_MONITOR:
        store_url = store_url.strip()
        if not store_url:
            continue

        try:
            print(f"\nScraping: {store_url}")

            # Extract domain for store name
            domain = store_url.replace('https://', '').replace('http://', '').rstrip('/')

            # Fetch products
            products = monitor.fetch_all_products(store_url)
            print(f"  Fetched {len(products)} products")

            # Save to database
            store_id = db.ensure_store(domain)
            db.save_products(store_id, products)
            print(f"  Saved to database")

        except Exception as e:
            print(f"  Error scraping {store_url}: {e}")

    # Report price changes
    changes = db.get_price_changes(hours=SCRAPE_INTERVAL_HOURS)
    if changes:
        print(f"\nPrice changes detected: {len(changes)}")
        for change in changes[:20]:
            direction = "โ†‘" if change['price_change'] > 0 else "โ†“"
            print(f"  {direction} {change['product']}: ${change['old_price']} -> ${change['new_price']}")

    print(f"\nPrice check completed at {datetime.now().isoformat()}")


def main():
    """Entry point for Coolify deployment"""
    print("Shopify Price Monitor Starting...")
    print(f"Monitoring {len(STORES_TO_MONITOR)} stores")
    print(f"Check interval: {SCRAPE_INTERVAL_HOURS} hours")

    # Run immediately on startup
    run_price_check()

    # Schedule periodic runs
    schedule.every(SCRAPE_INTERVAL_HOURS).hours.do(run_price_check)

    # Keep running
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    main()
.env.examplebash
# Database connection
DATABASE_URL=postgresql://user:password@db:5432/price_monitor

# Stores to monitor (comma-separated)
STORES_TO_MONITOR=https://www.aloyoga.com,https://www.gymshark.com,https://www.fabletics.com

# How often to check prices (in hours)
SCRAPE_INTERVAL_HOURS=6

# Optional: Proxy configuration
PROXY_LIST=http://user:pass@proxy1:8080,http://user:pass@proxy2:8080
In Coolify, create a new service, connect your GitHub repo, and set the environment variables. Coolify will automatically build and deploy your container. Set it as a "Worker" service type since it runs continuously.

9. Automating with N8N

N8N is a powerful workflow automation tool. Here's how to set up a price monitoring workflow:

N8N Workflow Steps:

  1. Schedule Trigger: Run every 6 hours
  2. HTTP Request: Fetch /products.json from target stores
  3. Function: Extract and transform price data
  4. PostgreSQL: Save to price_history table
  5. PostgreSQL: Query for price changes
  6. IF: Check if changes exist
  7. Slack/Email: Send alert notification
N8N Function Node - Extract Pricesjavascript
// N8N Function node to extract price data from products.json response

const products = $input.item.json.products;
const storeDomain = $input.item.json.storeDomain;
const scrapedAt = new Date().toISOString();

const priceRecords = [];

for (const product of products) {
  for (const variant of product.variants || []) {
    priceRecords.push({
      store_domain: storeDomain,
      product_id: product.id,
      product_title: product.title,
      product_type: product.product_type || '',
      variant_id: variant.id,
      variant_title: variant.title || 'Default',
      sku: variant.sku || '',
      price: parseFloat(variant.price),
      compare_at_price: variant.compare_at_price ? parseFloat(variant.compare_at_price) : null,
      available: variant.available !== false,
      scraped_at: scrapedAt
    });
  }
}

return priceRecords.map(record => ({ json: record }));
N8N Slack Alert Messagejavascript
// N8N Function node to format Slack alert

const changes = $input.all().map(item => item.json);

if (changes.length === 0) {
  return [{ json: { skip: true } }];
}

let message = `๐Ÿ”” *Price Changes Detected*\n\n`;

for (const change of changes.slice(0, 10)) {
  const direction = change.price_change > 0 ? '๐Ÿ“ˆ' : '๐Ÿ“‰';
  const pct = Math.abs(change.change_pct).toFixed(1);

  message += `${direction} *${change.product}*\n`;
  message += `   ${change.variant}: $${change.old_price} โ†’ $${change.new_price} (${change.price_change > 0 ? '+' : ''}${pct}%)\n\n`;
}

if (changes.length > 10) {
  message += `_...and ${changes.length - 10} more changes_`;
}

return [{
  json: {
    text: message,
    channel: '#price-alerts'
  }
}];

10. Conclusion

You now have all the tools needed to build a complete Shopify price monitoring system:

  • Fetch product data using /products.json with pagination
  • Rotate proxies for reliable large-scale scraping
  • Store data in CSV for simple analysis or PostgreSQL for production
  • Deploy with Coolify for self-hosted automation
  • Create workflows with N8N for visual automation
Building and maintaining this infrastructure takes significant time and resources. For a production-ready solution that monitors 500M+ products across millions of Shopify stores, check out DBShopi's Shopify Price Monitoring tool. We handle all the infrastructure, proxy rotation, and data processing for you.
D

DBShopi Team

E-commerce Intelligence

Ready to Start Monitoring Prices?

Skip the technical setup and get instant access to price monitoring for millions of Shopify stores.

Try DBShopi Price Monitoring