Price Monitoring · January 17, 2025 · 15 min read

How to Monitor Shopify Competitor Prices: A Complete Guide with /products.json

Build a complete Shopify price monitoring system

Monitoring competitor prices is crucial for any e-commerce business. Whether you run a dropshipping operation, a DTC brand, or a marketplace, knowing when competitors change their prices can make the difference between winning and losing sales. In this complete guide, we show you exactly how to build your own Shopify price monitoring system.

Want to skip the technical setup? DBShopi's Price Monitoring tool handles all of this automatically for 500M+ products.

1. What Is Shopify Price Monitoring?

Shopify price monitoring is the automated process of tracking product prices across Shopify stores. It involves the following steps, sketched in code just after this list:

  • Fetching product data from competitor Shopify stores
  • Storing historical price data for trend analysis
  • Detecting price changes automatically
  • Alerting when significant changes occur
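
Before diving into the details, here is the whole cycle in miniature. This is a minimal sketch, not production code: the store URL and the 5% alert threshold are illustrative choices, and the in-memory dict stands in for the real storage covered in sections 6 and 7.

minimal_monitor.py (Python)
import time

import requests

STORE_URL = "https://www.aloyoga.com"  # example store used throughout this guide
last_prices = {}  # variant_id -> last seen price (in-memory stand-in for a database)


def check_once():
    """Fetch one page of products and flag significant price moves."""
    response = requests.get(f"{STORE_URL}/products.json?limit=250", timeout=30)
    response.raise_for_status()

    for product in response.json().get("products", []):
        for variant in product.get("variants", []):
            price = float(variant["price"])
            old = last_prices.get(variant["id"])
            if old is not None and old != price:
                pct = (price - old) / old * 100
                if abs(pct) >= 5.0:  # illustrative alert threshold
                    print(f"ALERT {product['title']}: ${old} -> ${price} ({pct:+.1f}%)")
            last_prices[variant["id"]] = price


if __name__ == "__main__":
    while True:
        check_once()
        time.sleep(6 * 3600)  # re-check every 6 hours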

2. Understanding the /products.json Endpoint

Every Shopify store exposes a public API endpoint at /products.json that returns product data in JSON format. It is Shopify's built-in mechanism for product feeds and integrations.

Basic products.json URL structure (bash)
# Basic endpoint
https://store-name.myshopify.com/products.json

# Custom domain
https://www.aloyoga.com/products.json

# With pagination (250 products max per page)
https://www.aloyoga.com/products.json?limit=250&page=1

# Filter by collection
https://www.aloyoga.com/collections/womens-leggings/products.json

The endpoint returns a JSON object containing an array of products, each with variants that include pricing information:

Response structure (JSON)
{
  "products": [
    {
      "id": 1234567890,
      "title": "High-Waist Airlift Legging",
      "handle": "high-waist-airlift-legging",
      "vendor": "Alo Yoga",
      "product_type": "Leggings",
      "created_at": "2024-01-15T10:30:00-05:00",
      "updated_at": "2025-01-17T08:15:00-05:00",
      "tags": ["womens", "leggings", "bestseller"],
      "variants": [
        {
          "id": 98765432,
          "title": "Black / XS",
          "price": "128.00",
          "compare_at_price": "148.00",
          "sku": "ALO-HW-BLK-XS",
          "inventory_quantity": 45,
          "available": true
        }
      ],
      "images": [
        {
          "src": "https://cdn.shopify.com/..."
        }
      ]
    }
  ]
}

3. Real-World Example: Fetching Alo Yoga's Data

Let's build a practical example with Alo Yoga (aloyoga.com), a popular activewear brand on Shopify. Here's how to fetch their products:

fetch_shopify_products.py (Python)
import requests
import json
from typing import List, Dict, Any

def fetch_shopify_products(store_url: str, limit: int = 250) -> List[Dict[str, Any]]:
    """
    Fetch all products from a Shopify store using /products.json

    Args:
        store_url: Base URL of the store (e.g., 'https://www.aloyoga.com')
        limit: Products per page (max 250)

    Returns:
        List of all products
    """
    all_products = []
    page = 1

    while True:
        url = f"{store_url}/products.json?limit={limit}&page={page}"

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
            'Accept': 'application/json',
        }

        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            data = response.json()
            products = data.get('products', [])

            if not products:
                break  # No more products

            all_products.extend(products)
            print(f"Page {page}: fetched {len(products)} products")

            if len(products) < limit:
                break  # Last page

            page += 1

        except requests.exceptions.RequestException as e:
            print(f"Error fetching page {page}: {e}")
            break

    return all_products


# Example usage
if __name__ == "__main__":
    store_url = "https://www.aloyoga.com"
    products = fetch_shopify_products(store_url)

    print(f"\nTotal products fetched: {len(products)}")

    # Display first product details
    if products:
        first = products[0]
        print(f"\nFirst product: {first['title']}")
        print(f"Vendor: {first['vendor']}")

        if first.get('variants'):
            variant = first['variants'][0]
            print(f"Price: ${variant['price']}")
            if variant.get('compare_at_price'):
                print(f"Compare at: ${variant['compare_at_price']}")
Running this against aloyoga.com typically returns 1,000+ products across multiple pages. The store carries leggings, sports bras, sweatshirts, accessories, and more.
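
A quick sanity check on the result helps before building anything on top. This sketch reuses fetch_shopify_products from the script above to summarize the catalog by product_type and price range:

catalog_summary.py (Python)
from collections import Counter

from fetch_shopify_products import fetch_shopify_products

products = fetch_shopify_products("https://www.aloyoga.com")

# How many products per type: confirms coverage of leggings, bras, etc.
by_type = Counter(p.get("product_type") or "Unknown" for p in products)
print(by_type.most_common(5))

# Overall price range across all variants
prices = [float(v["price"]) for p in products for v in p.get("variants", [])]
if prices:
    print(f"Price range: ${min(prices):.2f} - ${max(prices):.2f}")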

4. Handling Pagination Efficiently

Shopify caps /products.json at 250 products per page. For stores with thousands of products, you need to handle pagination correctly. Here's an improved version with better error handling and rate limiting:

shopify_scraper_advanced.py (Python)
import requests
import time
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ProductPrice:
    """Structured price data for a product variant"""
    product_id: int
    product_title: str
    variant_id: int
    variant_title: str
    sku: str
    price: float
    compare_at_price: Optional[float]
    available: bool
    scraped_at: datetime


class ShopifyPriceMonitor:
    """Monitor prices from Shopify stores"""

    def __init__(self, rate_limit_delay: float = 1.0):
        self.rate_limit_delay = rate_limit_delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
            'Accept': 'application/json',
        })

    def fetch_all_products(self, store_url: str) -> List[Dict[str, Any]]:
        """Fetch all products with pagination handling"""
        all_products = []
        page = 1
        max_retries = 3

        while True:
            url = f"{store_url}/products.json?limit=250&page={page}"

            for attempt in range(max_retries):
                try:
                    response = self.session.get(url, timeout=30)

                    if response.status_code == 429:
                        # Rate limited - wait and retry
                        wait_time = int(response.headers.get('Retry-After', 60))
                        print(f"Rate limited. Waiting {wait_time}s...")
                        time.sleep(wait_time)
                        continue

                    response.raise_for_status()
                    data = response.json()
                    products = data.get('products', [])

                    if not products:
                        return all_products

                    all_products.extend(products)
                    print(f"Page {page}: {len(products)} products (total: {len(all_products)})")

                    if len(products) < 250:
                        return all_products

                    page += 1
                    time.sleep(self.rate_limit_delay)
                    break

                except requests.exceptions.RequestException as e:
                    print(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == max_retries - 1:
                        return all_products
                    time.sleep(2 ** attempt)

        return all_products

    def extract_prices(self, products: List[Dict]) -> List[ProductPrice]:
        """Extract structured price data from products"""
        prices = []
        scraped_at = datetime.now()

        for product in products:
            for variant in product.get('variants', []):
                price = ProductPrice(
                    product_id=product['id'],
                    product_title=product['title'],
                    variant_id=variant['id'],
                    variant_title=variant.get('title', 'Default'),
                    sku=variant.get('sku', ''),
                    price=float(variant['price']),
                    compare_at_price=(
                        float(variant['compare_at_price'])
                        if variant.get('compare_at_price')
                        else None
                    ),
                    available=variant.get('available', True),
                    scraped_at=scraped_at
                )
                prices.append(price)

        return prices


# Example: Monitor Alo Yoga prices
if __name__ == "__main__":
    monitor = ShopifyPriceMonitor(rate_limit_delay=0.5)

    products = monitor.fetch_all_products("https://www.aloyoga.com")
    prices = monitor.extract_prices(products)

    print(f"\nExtracted {len(prices)} price records")

    # Show sample price data
    for price in prices[:5]:
        discount = ""
        if price.compare_at_price:
            savings = price.compare_at_price - price.price
            discount = f" (Save ${savings:.2f})"
        print(f"{price.product_title} [{price.variant_title}]: ${price.price}{discount}")

5. Setting Up Proxy Rotation

When monitoring multiple stores or fetching data frequently, you'll need proxy rotation to avoid IP bans. Here's how to integrate proxies:

proxy_rotation.py (Python)
import requests
import random
from itertools import cycle
from typing import List, Optional

class ProxyRotator:
    """Rotate through a list of proxies for web scraping"""

    def __init__(self, proxies: List[str]):
        """
        Initialize with a list of proxy URLs.

        Format: 'http://user:pass@host:port' or 'http://host:port'

        Popular proxy providers:
        - Bright Data (formerly Luminati)
        - Oxylabs
        - Smartproxy
        - ScraperAPI
        """
        self.proxies = proxies
        self.proxy_cycle = cycle(proxies)
        self.current_proxy = None

    def get_next(self) -> dict:
        """Get next proxy in rotation"""
        proxy = next(self.proxy_cycle)
        self.current_proxy = proxy
        return {
            'http': proxy,
            'https': proxy
        }

    def get_random(self) -> dict:
        """Get a random proxy"""
        proxy = random.choice(self.proxies)
        self.current_proxy = proxy
        return {
            'http': proxy,
            'https': proxy
        }


class ShopifyScraperWithProxy:
    """Shopify scraper with proxy rotation support"""

    def __init__(self, proxies: Optional[List[str]] = None):
        self.proxy_rotator = ProxyRotator(proxies) if proxies else None
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Accept-Language': 'en-US,en;q=0.9',
        })

    def fetch_products(self, store_url: str, page: int = 1) -> dict:
        """Fetch products with optional proxy rotation"""
        url = f"{store_url}/products.json?limit=250&page={page}"

        proxies = None
        if self.proxy_rotator:
            proxies = self.proxy_rotator.get_next()

        try:
            response = self.session.get(
                url,
                proxies=proxies,
                timeout=30,
                verify=True
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.ProxyError as e:
            print(f"Proxy error with {self.proxy_rotator.current_proxy}: {e}")
            # Try next proxy
            if self.proxy_rotator:
                return self.fetch_products(store_url, page)
            raise


# Example usage with residential proxies
if __name__ == "__main__":
    # Example proxy list (replace with real proxies)
    proxy_list = [
        "http://user:pass@proxy1.example.com:8080",
        "http://user:pass@proxy2.example.com:8080",
        "http://user:pass@proxy3.example.com:8080",
    ]

    scraper = ShopifyScraperWithProxy(proxies=proxy_list)

    # Or without proxies for testing
    # scraper = ShopifyScraperWithProxy()

    data = scraper.fetch_products("https://www.aloyoga.com")
    print(f"Fetched {len(data.get('products', []))} products")
Always respect robots.txt and rate limits. Aggressive scraping can get your IP banned and may violate a store's terms of service. For production use, consider a service like DBShopi that handles this responsibly.
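
The robots.txt check itself is easy to automate with Python's standard library. A small sketch using urllib.robotparser (the user agent name is an illustrative placeholder):

check_robots.py (Python)
from urllib.robotparser import RobotFileParser


def is_allowed(store_url: str, path: str = "/products.json",
               user_agent: str = "PriceMonitorBot") -> bool:
    """Check a store's robots.txt before scraping a path."""
    parser = RobotFileParser()
    parser.set_url(f"{store_url}/robots.txt")
    parser.read()  # fetch and parse the store's robots.txt
    return parser.can_fetch(user_agent, f"{store_url}{path}")


if __name__ == "__main__":
    print(is_allowed("https://www.aloyoga.com"))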

6. Saving Data to CSV

For simple analysis and quick exports, CSV is perfect. Here's how to save your price data:

save_to_csv.py (Python)
import csv
from datetime import datetime
from typing import List, Dict, Any
import os

def save_products_to_csv(
    products: List[Dict[str, Any]],
    store_name: str,
    output_dir: str = "./data"
) -> str:
    """
    Save product data to CSV with timestamp

    Returns:
        Path to the saved file
    """
    # Create output directory if needed
    os.makedirs(output_dir, exist_ok=True)

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{store_name}_prices_{timestamp}.csv"
    filepath = os.path.join(output_dir, filename)

    # Flatten products with variants
    rows = []
    for product in products:
        for variant in product.get('variants', []):
            rows.append({
                'scraped_at': datetime.now().isoformat(),
                'store': store_name,
                'product_id': product['id'],
                'product_title': product['title'],
                'product_type': product.get('product_type', ''),
                'vendor': product.get('vendor', ''),
                'handle': product.get('handle', ''),
                'variant_id': variant['id'],
                'variant_title': variant.get('title', 'Default'),
                'sku': variant.get('sku', ''),
                'price': variant.get('price', ''),
                'compare_at_price': variant.get('compare_at_price', ''),
                'available': variant.get('available', True),
                'inventory_quantity': variant.get('inventory_quantity', ''),
                'tags': ', '.join(product.get('tags', [])),
            })

    # Write to CSV
    if rows:
        fieldnames = rows[0].keys()
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    print(f"Saved {len(rows)} records to {filepath}")
    return filepath


def compare_price_changes(old_file: str, new_file: str) -> List[Dict]:
    """
    Compare two CSV files to detect price changes
    """
    def load_csv(filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            return {row['variant_id']: row for row in csv.DictReader(f)}

    old_data = load_csv(old_file)
    new_data = load_csv(new_file)

    changes = []
    for variant_id, new_row in new_data.items():
        if variant_id in old_data:
            old_price = float(old_data[variant_id]['price'])
            new_price = float(new_row['price'])

            if old_price != new_price:
                changes.append({
                    'product': new_row['product_title'],
                    'variant': new_row['variant_title'],
                    'old_price': old_price,
                    'new_price': new_price,
                    'change': new_price - old_price,
                    'change_pct': ((new_price - old_price) / old_price) * 100
                })

    return changes


# Example usage
if __name__ == "__main__":
    from shopify_scraper_advanced import ShopifyPriceMonitor

    monitor = ShopifyPriceMonitor()
    products = monitor.fetch_all_products("https://www.aloyoga.com")

    filepath = save_products_to_csv(products, "aloyoga")
    print(f"Data saved to: {filepath}")

    # Example output:
    # Saved 3456 records to ./data/aloyoga_prices_20250117_143022.csv
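
Once you have two snapshots on disk, compare_price_changes turns them into a diff. A usage sketch (both file paths are illustrative):

compare_snapshots.py (Python)
from save_to_csv import compare_price_changes

# Compare yesterday's snapshot against today's (illustrative paths)
changes = compare_price_changes(
    "./data/aloyoga_prices_20250116_143022.csv",
    "./data/aloyoga_prices_20250117_143022.csv",
)

for c in changes:
    print(f"{c['product']} [{c['variant']}]: "
          f"${c['old_price']:.2f} -> ${c['new_price']:.2f} ({c['change_pct']:+.1f}%)")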

7. Saving Data to PostgreSQL

For production systems with historical tracking and analytics, PostgreSQL is the way to go. Here's a complete setup:

database_schema.sql (SQL)
-- Create database schema for Shopify price monitoring

-- Stores table
CREATE TABLE stores (
    id SERIAL PRIMARY KEY,
    domain VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW(),
    last_scraped_at TIMESTAMP
);

-- Products table
CREATE TABLE products (
    id BIGINT PRIMARY KEY,  -- Shopify product ID
    store_id INTEGER REFERENCES stores(id),
    title VARCHAR(500) NOT NULL,
    handle VARCHAR(255),
    vendor VARCHAR(255),
    product_type VARCHAR(255),
    tags TEXT[],
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Variants table
CREATE TABLE variants (
    id BIGINT PRIMARY KEY,  -- Shopify variant ID
    product_id BIGINT REFERENCES products(id),
    title VARCHAR(255),
    sku VARCHAR(255),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Price history table (the main tracking table)
CREATE TABLE price_history (
    id SERIAL PRIMARY KEY,
    variant_id BIGINT REFERENCES variants(id),
    price DECIMAL(10, 2) NOT NULL,
    compare_at_price DECIMAL(10, 2),
    available BOOLEAN DEFAULT TRUE,
    inventory_quantity INTEGER,
    scraped_at TIMESTAMP DEFAULT NOW()
);

-- Create indexes for fast queries
CREATE INDEX idx_price_history_variant_id ON price_history(variant_id);
CREATE INDEX idx_price_history_scraped_at ON price_history(scraped_at);
CREATE INDEX idx_products_store_id ON products(store_id);

-- View for latest prices
CREATE VIEW latest_prices AS
SELECT DISTINCT ON (ph.variant_id)
    s.domain AS store,
    p.title AS product,
    v.title AS variant,
    v.sku,
    ph.price,
    ph.compare_at_price,
    ph.available,
    ph.scraped_at
FROM price_history ph
JOIN variants v ON v.id = ph.variant_id
JOIN products p ON p.id = v.product_id
JOIN stores s ON s.id = p.store_id
ORDER BY ph.variant_id, ph.scraped_at DESC;

-- View for price changes
CREATE VIEW price_changes AS
WITH ranked_prices AS (
    SELECT
        variant_id,
        price,
        scraped_at,
        LAG(price) OVER (PARTITION BY variant_id ORDER BY scraped_at) AS prev_price
    FROM price_history
)
SELECT
    v.id AS variant_id,
    p.title AS product,
    v.title AS variant,
    rp.prev_price AS old_price,
    rp.price AS new_price,
    (rp.price - rp.prev_price) AS price_change,
    ROUND(((rp.price - rp.prev_price) / rp.prev_price * 100)::numeric, 2) AS change_pct,
    rp.scraped_at
FROM ranked_prices rp
JOIN variants v ON v.id = rp.variant_id
JOIN products p ON p.id = v.product_id
WHERE rp.prev_price IS NOT NULL
AND rp.price != rp.prev_price
ORDER BY rp.scraped_at DESC;
save_to_postgresql.py (Python)
import psycopg2
from datetime import datetime
from typing import List, Dict, Any
from contextlib import contextmanager

class PriceDatabase:
    """PostgreSQL database handler for price monitoring"""

    def __init__(self, connection_string: str):
        """
        Initialize with PostgreSQL connection string.

        Example: 'postgresql://user:password@localhost:5432/price_monitor'
        """
        self.connection_string = connection_string

    @contextmanager
    def get_connection(self):
        """Context manager for database connections"""
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def ensure_store(self, domain: str, name: str = None) -> int:
        """Get or create store record"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO stores (domain, name, last_scraped_at)
                    VALUES (%s, %s, NOW())
                    ON CONFLICT (domain) DO UPDATE
                    SET last_scraped_at = NOW()
                    RETURNING id
                """, (domain, name or domain))
                return cur.fetchone()[0]

    def save_products(self, store_id: int, products: List[Dict[str, Any]]):
        """Save products and variants to database"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                for product in products:
                    # Upsert product
                    cur.execute("""
                        INSERT INTO products (id, store_id, title, handle, vendor, product_type, tags)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (id) DO UPDATE SET
                            title = EXCLUDED.title,
                            handle = EXCLUDED.handle,
                            vendor = EXCLUDED.vendor,
                            product_type = EXCLUDED.product_type,
                            tags = EXCLUDED.tags,
                            updated_at = NOW()
                    """, (
                        product['id'],
                        store_id,
                        product['title'],
                        product.get('handle'),
                        product.get('vendor'),
                        product.get('product_type'),
                        product.get('tags', [])
                    ))

                    # Upsert variants and record prices
                    for variant in product.get('variants', []):
                        cur.execute("""
                            INSERT INTO variants (id, product_id, title, sku)
                            VALUES (%s, %s, %s, %s)
                            ON CONFLICT (id) DO UPDATE SET
                                title = EXCLUDED.title,
                                sku = EXCLUDED.sku
                        """, (
                            variant['id'],
                            product['id'],
                            variant.get('title'),
                            variant.get('sku')
                        ))

                        # Insert price record
                        cur.execute("""
                            INSERT INTO price_history
                            (variant_id, price, compare_at_price, available, inventory_quantity)
                            VALUES (%s, %s, %s, %s, %s)
                        """, (
                            variant['id'],
                            float(variant.get('price', 0)),
                            float(variant['compare_at_price']) if variant.get('compare_at_price') else None,
                            variant.get('available', True),
                            variant.get('inventory_quantity')
                        ))

    def get_price_changes(self, hours: int = 24) -> List[Dict]:
        """Get recent price changes"""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT * FROM price_changes
                    WHERE scraped_at > NOW() - INTERVAL '%s hours'
                    ORDER BY ABS(change_pct) DESC
                """, (hours,))

                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]


# Example usage
if __name__ == "__main__":
    from shopify_scraper_advanced import ShopifyPriceMonitor

    # Initialize database
    db = PriceDatabase("postgresql://user:password@localhost:5432/price_monitor")

    # Scrape store
    monitor = ShopifyPriceMonitor()
    products = monitor.fetch_all_products("https://www.aloyoga.com")

    # Save to database
    store_id = db.ensure_store("aloyoga.com", "Alo Yoga")
    db.save_products(store_id, products)

    print(f"Saved {len(products)} products to database")

    # Check for price changes
    changes = db.get_price_changes(hours=24)
    print(f"\nPrice changes in last 24 hours: {len(changes)}")

    for change in changes[:10]:
        print(f"{change['product']}: ${change['old_price']} -> ${change['new_price']} ({change['change_pct']}%)")

8. Deploying with Coolify

Coolify is an excellent self-hosted alternative to Heroku. Here's how to deploy your price monitor as a scheduled task:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONUNBUFFERED=1

# Default command (can be overridden)
CMD ["python", "main.py"]
requirements.txt
requests==2.31.0
psycopg2-binary==2.9.9
python-dotenv==1.0.0
schedule==1.2.1
main.py - Coolify scheduled job (Python)
#!/usr/bin/env python3
"""
Shopify Price Monitor - Coolify Deployment
Run as a scheduled task to monitor competitor prices
"""

import os
import schedule
import time
from datetime import datetime
from dotenv import load_dotenv

from shopify_scraper_advanced import ShopifyPriceMonitor
from save_to_postgresql import PriceDatabase

load_dotenv()

# Configuration from environment variables
DATABASE_URL = os.getenv('DATABASE_URL')
STORES_TO_MONITOR = os.getenv('STORES_TO_MONITOR', '').split(',')
SCRAPE_INTERVAL_HOURS = int(os.getenv('SCRAPE_INTERVAL_HOURS', '6'))


def run_price_check():
    """Main price monitoring function"""
    print(f"\n{'='*50}")
    print(f"Starting price check at {datetime.now().isoformat()}")
    print(f"{'='*50}")

    db = PriceDatabase(DATABASE_URL)
    monitor = ShopifyPriceMonitor(rate_limit_delay=1.0)

    for store_url in STORES_TO_MONITOR:
        store_url = store_url.strip()
        if not store_url:
            continue

        try:
            print(f"\nScraping: {store_url}")

            # Extract domain for store name
            domain = store_url.replace('https://', '').replace('http://', '').rstrip('/')

            # Fetch products
            products = monitor.fetch_all_products(store_url)
            print(f"  Fetched {len(products)} products")

            # Save to database
            store_id = db.ensure_store(domain)
            db.save_products(store_id, products)
            print(f"  Saved to database")

        except Exception as e:
            print(f"  Error scraping {store_url}: {e}")

    # Report price changes
    changes = db.get_price_changes(hours=SCRAPE_INTERVAL_HOURS)
    if changes:
        print(f"\nPrice changes detected: {len(changes)}")
        for change in changes[:20]:
            direction = "↑" if change['price_change'] > 0 else "↓"
            print(f"  {direction} {change['product']}: ${change['old_price']} -> ${change['new_price']}")

    print(f"\nPrice check completed at {datetime.now().isoformat()}")


def main():
    """Entry point for Coolify deployment"""
    print("Shopify Price Monitor Starting...")
    print(f"Monitoring {len(STORES_TO_MONITOR)} stores")
    print(f"Check interval: {SCRAPE_INTERVAL_HOURS} hours")

    # Run immediately on startup
    run_price_check()

    # Schedule periodic runs
    schedule.every(SCRAPE_INTERVAL_HOURS).hours.do(run_price_check)

    # Keep running
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    main()
.env.example
# Database connection
DATABASE_URL=postgresql://user:password@db:5432/price_monitor

# Stores to monitor (comma-separated)
STORES_TO_MONITOR=https://www.aloyoga.com,https://www.gymshark.com,https://www.fabletics.com

# How often to check prices (in hours)
SCRAPE_INTERVAL_HOURS=6

# Optional: Proxy configuration
PROXY_LIST=http://user:pass@proxy1:8080,http://user:pass@proxy2:8080
In Coolify, create a new service, connect your GitHub repo, and set the environment variables. Coolify will automatically build and deploy your container. Set it as a "Worker" service type since it runs continuously.

9. Automating with N8N

N8N is a powerful workflow automation tool. Here's how to set up a price monitoring workflow:

N8N Workflow Steps:

  1. Schedule Trigger: run every 6 hours
  2. HTTP Request: fetch /products.json from the target stores
  3. Function: extract and transform the price data
  4. PostgreSQL: save to the price_history table
  5. PostgreSQL: query for price changes
  6. IF: check whether any changes exist
  7. Slack/Email: send an alert notification
N8N Function Node - Extract Prices (JavaScript)
// N8N Function node to extract price data from products.json response

const products = $input.item.json.products;
const storeDomain = $input.item.json.storeDomain;
const scrapedAt = new Date().toISOString();

const priceRecords = [];

for (const product of products) {
  for (const variant of product.variants || []) {
    priceRecords.push({
      store_domain: storeDomain,
      product_id: product.id,
      product_title: product.title,
      product_type: product.product_type || '',
      variant_id: variant.id,
      variant_title: variant.title || 'Default',
      sku: variant.sku || '',
      price: parseFloat(variant.price),
      compare_at_price: variant.compare_at_price ? parseFloat(variant.compare_at_price) : null,
      available: variant.available !== false,
      scraped_at: scrapedAt
    });
  }
}

return priceRecords.map(record => ({ json: record }));
N8N Slack Alert Message (JavaScript)
// N8N Function node to format Slack alert

const changes = $input.all().map(item => item.json);

if (changes.length === 0) {
  return [{ json: { skip: true } }];
}

let message = `🔔 *Price Changes Detected*\n\n`;

for (const change of changes.slice(0, 10)) {
  const direction = change.price_change > 0 ? '📈' : '📉';
  const pct = Math.abs(change.change_pct).toFixed(1);

  message += `${direction} *${change.product}*\n`;
  message += `   ${change.variant}: $${change.old_price} → $${change.new_price} (${change.price_change > 0 ? '+' : ''}${pct}%)\n\n`;
}

if (changes.length > 10) {
  message += `_...and ${changes.length - 10} more changes_`;
}

return [{
  json: {
    text: message,
    channel: '#price-alerts'
  }
}];

10. Conclusion

You now have all the tools you need to build a complete Shopify price monitoring system:

  • Fetch product data with /products.json and pagination
  • Rotate proxies for reliable, large-scale scraping
  • Store data in CSV for simple analysis, or PostgreSQL for production
  • Deploy with Coolify for self-hosted automation
  • Build workflows with N8N for visual automation

Building and maintaining this infrastructure takes significant time and resources. For a production-ready solution that monitors 500M+ products across millions of Shopify stores, check out DBShopi's Shopify Price Monitoring tool. We handle all the infrastructure, proxy rotation, and data processing for you.

DBShopi Team

E-commerce Intelligence

Ready to Start Price Monitoring?

Skip the technical setup and get instant access to price monitoring for millions of Shopify stores.

Try DBShopi Price Monitoring