Table des matieres
Surveiller les prix des concurrents est crucial pour toute entreprise e-commerce. Que vous fassiez du dropshipping, geriez une marque DTC ou un marketplace, savoir quand les concurrents changent leurs prix peut faire la difference entre gagner et perdre des ventes. Dans ce guide complet, nous vous montrons exactement comment construire votre propre systeme de surveillance des prix Shopify.
1. Qu'est-ce que la Surveillance Prix Shopify?
La surveillance des prix Shopify est le processus automatise de suivi des prix des produits sur les boutiques Shopify. Cela implique:
- Recuperer les donnees produits des boutiques Shopify concurrentes
- Stocker les donnees de prix historiques pour l'analyse de tendances
- Detecter les changements de prix automatiquement
- Alerter quand des changements significatifs surviennent
2. Comprendre l'Endpoint /products.json
Chaque boutique Shopify expose un endpoint API public a /products.json qui retourne les donnees produits en format JSON. C'est la methode integree de Shopify pour permettre les flux produits et integrations.
# Basic endpoint
https://store-name.myshopify.com/products.json
# Custom domain
https://www.aloyoga.com/products.json
# With pagination (250 products max per page)
https://www.aloyoga.com/products.json?limit=250&page=1
# Filter by collection
https://www.aloyoga.com/collections/womens-leggings/products.json
L'endpoint retourne un objet JSON contenant un tableau de produits, chacun avec des variantes incluant les informations de prix:
{
"products": [
{
"id": 1234567890,
"title": "High-Waist Airlift Legging",
"handle": "high-waist-airlift-legging",
"vendor": "Alo Yoga",
"product_type": "Leggings",
"created_at": "2024-01-15T10:30:00-05:00",
"updated_at": "2025-01-17T08:15:00-05:00",
"tags": ["womens", "leggings", "bestseller"],
"variants": [
{
"id": 98765432,
"title": "Black / XS",
"price": "128.00",
"compare_at_price": "148.00",
"sku": "ALO-HW-BLK-XS",
"inventory_quantity": 45,
"available": true
}
],
"images": [
{
"src": "https://cdn.shopify.com/..."
}
]
}
]
}
3. Exemple Reel: Recuperer les Donnees d'Alo Yoga
Construisons un exemple pratique avec Alo Yoga (aloyoga.com), une marque populaire de vetements de sport sur Shopify. Voici comment recuperer leurs produits:
import requests
import json
from typing import List, Dict, Any
def fetch_shopify_products(store_url: str, limit: int = 250) -> List[Dict[str, Any]]:
    """
    Fetch all products from a Shopify store using /products.json

    Pages are requested one after another until a page comes back empty or
    shorter than the page size. If a request fails or the body is not JSON,
    the error is printed and the products collected so far are returned.

    Args:
        store_url: Base URL of the store (e.g., 'https://www.aloyoga.com').
            A trailing slash is tolerated.
        limit: Products per page (max 250). Values outside 1..250 are clamped.

    Returns:
        List of all products
    """
    # The endpoint serves at most 250 products per page. With a larger
    # limit every full page would look "short" and pagination would stop
    # after page 1, silently dropping the rest of the catalogue.
    limit = max(1, min(int(limit), 250))
    base_url = store_url.rstrip('/')
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
        'Accept': 'application/json',
    }

    all_products = []
    page = 1
    while True:
        url = f"{base_url}/products.json?limit={limit}&page={page}"
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            products = response.json().get('products', [])
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON error is not a RequestException.
            print(f"Error fetching page {page}: {e}")
            break

        if not products:
            break  # No more products
        all_products.extend(products)
        print(f"Page {page}: fetched {len(products)} products")
        if len(products) < limit:
            break  # Last page
        page += 1

    return all_products
# Example usage
if __name__ == "__main__":
store_url = "https://www.aloyoga.com"
products = fetch_shopify_products(store_url)
print(f"\nTotal products fetched: {len(products)}")
# Display first product details
if products:
first = products[0]
print(f"\nFirst product: {first['title']}")
print(f"Vendor: {first['vendor']}")
if first.get('variants'):
variant = first['variants'][0]
print(f"Price: ${variant['price']}")
if variant.get('compare_at_price'):
print(f"Compare at: ${variant['compare_at_price']}")
4. Gerer la Pagination Efficacement
Shopify limite /products.json a 250 produits par page. Pour les boutiques avec des milliers de produits, vous devez gerer la pagination correctement. Voici une version amelioree avec meilleure gestion des erreurs et limitation de taux:
import requests
import time
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ProductPrice:
    """Structured price data for a product variant"""
    # Identity of the parent product
    product_id: int
    product_title: str
    # Identity of the variant; the title falls back to 'Default'
    variant_id: int
    variant_title: str
    sku: str
    price: float
    # None when the variant carries no compare-at price
    compare_at_price: Optional[float]
    available: bool
    # Timestamp shared by every record produced in one extract_prices() call
    scraped_at: datetime


class ShopifyPriceMonitor:
    """Monitor prices from Shopify stores"""

    def __init__(self, rate_limit_delay: float = 1.0):
        """
        Args:
            rate_limit_delay: Seconds to sleep between successful page fetches.
        """
        self.rate_limit_delay = rate_limit_delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
            'Accept': 'application/json',
        })

    def fetch_all_products(self, store_url: str) -> List[Dict[str, Any]]:
        """Fetch all products with pagination handling

        Walks the pages of /products.json (250 products per page) until an
        empty or short page is returned. Each page is attempted a bounded
        number of times; when a page cannot be fetched, the products
        collected so far are returned.

        Args:
            store_url: Base URL of the store; a trailing slash is tolerated.

        Returns:
            List of product dicts, possibly partial if a page kept failing.
        """
        all_products = []
        page = 1
        max_retries = 3
        base_url = store_url.rstrip('/')

        while True:
            url = f"{base_url}/products.json?limit=250&page={page}"
            products = self._fetch_page(url, max_retries)
            if products is None:
                print(f"Giving up on page {page} after {max_retries} attempts")
                return all_products
            if not products:
                return all_products

            all_products.extend(products)
            print(f"Page {page}: {len(products)} products (total: {len(all_products)})")
            if len(products) < 250:
                return all_products

            page += 1
            time.sleep(self.rate_limit_delay)

    def _fetch_page(self, url: str, max_retries: int) -> Optional[List[Dict[str, Any]]]:
        """Return the products of one page, or None once all attempts failed."""
        for attempt in range(max_retries):
            try:
                response = self.session.get(url, timeout=30)
                if response.status_code == 429:
                    # Rate limited - wait and retry. This consumes an attempt,
                    # so a store that keeps answering 429 cannot make the
                    # caller re-request the same page forever.
                    wait_time = self._retry_after_seconds(response)
                    print(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                return response.json().get('products', [])
            except (requests.exceptions.RequestException, ValueError) as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
        return None

    @staticmethod
    def _retry_after_seconds(response, default: int = 60) -> int:
        """Read Retry-After as whole seconds; fall back to ``default``."""
        raw = response.headers.get('Retry-After')
        try:
            return max(0, int(raw))
        except (TypeError, ValueError):
            # Header missing, or not a plain number of seconds.
            return default

    def extract_prices(self, products: List[Dict]) -> List[ProductPrice]:
        """Extract structured price data from products

        Produces one ProductPrice per variant. All records of one call share
        the same ``scraped_at`` timestamp.

        Args:
            products: Product dicts as returned by fetch_all_products().

        Returns:
            Flat list of ProductPrice records (empty for no products).
        """
        prices = []
        scraped_at = datetime.now()

        for product in products:
            for variant in product.get('variants', []):
                compare_at = variant.get('compare_at_price')
                prices.append(ProductPrice(
                    product_id=product['id'],
                    product_title=product['title'],
                    variant_id=variant['id'],
                    # `or` also replaces an explicit null, which .get()'s
                    # default would let through as None.
                    variant_title=variant.get('title') or 'Default',
                    sku=variant.get('sku') or '',
                    price=float(variant['price']),
                    compare_at_price=float(compare_at) if compare_at else None,
                    available=variant.get('available', True),
                    scraped_at=scraped_at,
                ))

        return prices
# Example: Monitor Alo Yoga prices
if __name__ == "__main__":
monitor = ShopifyPriceMonitor(rate_limit_delay=0.5)
products = monitor.fetch_all_products("https://www.aloyoga.com")
prices = monitor.extract_prices(products)
print(f"\nExtracted {len(prices)} price records")
# Show sample price data
for price in prices[:5]:
discount = ""
if price.compare_at_price:
savings = price.compare_at_price - price.price
discount = f" (Save ${savings:.2f})"
print(f"{price.product_title} [{price.variant_title}]: ${price.price}{discount}")
5. Configurer la Rotation de Proxy
Lors de la surveillance de plusieurs boutiques ou de la recuperation frequente de donnees, vous aurez besoin de rotation de proxy pour eviter les bannissements IP. Voici comment integrer les proxies:
import requests
import random
from itertools import cycle
from typing import List, Optional
class ProxyRotator:
    """Rotate through a list of proxies for web scraping"""

    def __init__(self, proxies: List[str]):
        """
        Initialize with a list of proxy URLs.
        Format: 'http://user:pass@host:port' or 'http://host:port'

        Popular proxy providers:
        - Bright Data (formerly Luminati)
        - Oxylabs
        - Smartproxy
        - ScraperAPI

        Raises:
            ValueError: If ``proxies`` is empty. Without this check the
                failure would only surface later, as StopIteration from
                get_next() or IndexError from get_random().
        """
        if not proxies:
            raise ValueError("ProxyRotator requires at least one proxy URL")
        # Copy so later changes to the caller's list cannot alter the rotation.
        self.proxies = list(proxies)
        self.proxy_cycle = cycle(self.proxies)
        self.current_proxy = None

    def get_next(self) -> dict:
        """Get next proxy in rotation

        Returns:
            Mapping with the same proxy URL under 'http' and 'https'.
        """
        return self._select(next(self.proxy_cycle))

    def get_random(self) -> dict:
        """Get a random proxy

        Does not advance the round-robin position used by get_next().
        """
        return self._select(random.choice(self.proxies))

    def _select(self, proxy: str) -> dict:
        """Remember ``proxy`` as current and return it as a scheme mapping."""
        self.current_proxy = proxy
        return {
            'http': proxy,
            'https': proxy,
        }
class ShopifyScraperWithProxy:
    """Shopify scraper with proxy rotation support"""

    def __init__(self, proxies: Optional[List[str]] = None):
        """
        Args:
            proxies: Proxy URLs to rotate through. None or an empty list
                disables rotation.
        """
        self.proxy_rotator = ProxyRotator(proxies) if proxies else None
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Accept-Language': 'en-US,en;q=0.9',
        })

    def fetch_products(self, store_url: str, page: int = 1) -> dict:
        """Fetch products with optional proxy rotation

        With a rotator configured, a proxy failure moves on to the next
        proxy; each configured proxy is tried at most once per call.

        Args:
            store_url: Base URL of the store; a trailing slash is tolerated.
            page: 1-based page number.

        Returns:
            Decoded JSON body of the page.

        Raises:
            requests.exceptions.ProxyError: When no rotator is configured,
                or once every configured proxy has failed.
            requests.exceptions.HTTPError: On a 4xx/5xx response.
        """
        url = f"{store_url.rstrip('/')}/products.json?limit=250&page={page}"

        # Bounded loop instead of recursion: a pool of dead proxies used to
        # recurse until RecursionError.
        attempts = max(1, len(self.proxy_rotator.proxies)) if self.proxy_rotator else 1
        last_error = None
        for _ in range(attempts):
            proxies = self.proxy_rotator.get_next() if self.proxy_rotator else None
            try:
                response = self.session.get(
                    url,
                    proxies=proxies,
                    timeout=30,
                    verify=True,
                )
            except requests.exceptions.ProxyError as e:
                if not self.proxy_rotator:
                    # Nothing to rotate to. The original read
                    # self.proxy_rotator.current_proxy here and masked the
                    # real error with an AttributeError.
                    raise
                print(f"Proxy error with {self.proxy_rotator.current_proxy}: {e}")
                last_error = e
                continue  # Try next proxy
            response.raise_for_status()
            return response.json()

        raise last_error
# Example usage with residential proxies
if __name__ == "__main__":
# Example proxy list (replace with real proxies)
proxy_list = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
"http://user:pass@proxy3.example.com:8080",
]
scraper = ShopifyScraperWithProxy(proxies=proxy_list)
# Or without proxies for testing
# scraper = ShopifyScraperWithProxy()
data = scraper.fetch_products("https://www.aloyoga.com")
print(f"Fetched {len(data.get('products', []))} products")
6. Sauvegarder les Donnees en CSV
Pour des analyses simples et des exports rapides, le CSV est parfait. Voici comment sauvegarder vos donnees de prix:
import csv
from datetime import datetime
from typing import List, Dict, Any
import os
def save_products_to_csv(
    products: List[Dict[str, Any]],
    store_name: str,
    output_dir: str = "./data"
) -> str:
    """
    Save product data to CSV with timestamp

    One row is written per variant. The file is always created, with a
    header only when there are no variants, so the returned path always
    points at a real file.

    Args:
        products: Product dicts, each with an optional 'variants' list.
        store_name: Used for the 'store' column and the file name.
        output_dir: Directory for the file; created if missing.

    Returns:
        Path to the saved file
    """
    fieldnames = [
        'scraped_at', 'store', 'product_id', 'product_title', 'product_type',
        'vendor', 'handle', 'variant_id', 'variant_title', 'sku', 'price',
        'compare_at_price', 'available', 'inventory_quantity', 'tags',
    ]

    # Create output directory if needed
    os.makedirs(output_dir, exist_ok=True)

    # One timestamp for the file name and for every row of this snapshot
    now = datetime.now()
    filename = f"{store_name}_prices_{now.strftime('%Y%m%d_%H%M%S')}.csv"
    filepath = os.path.join(output_dir, filename)
    scraped_at = now.isoformat()

    # Flatten products with variants
    rows = []
    for product in products:
        tags = product.get('tags') or []
        # A plain string must not be passed to join(): it would be split
        # into single characters.
        tags_text = tags if isinstance(tags, str) else ', '.join(tags)
        for variant in product.get('variants', []):
            rows.append({
                'scraped_at': scraped_at,
                'store': store_name,
                'product_id': product['id'],
                'product_title': product['title'],
                'product_type': product.get('product_type', ''),
                'vendor': product.get('vendor', ''),
                'handle': product.get('handle', ''),
                'variant_id': variant['id'],
                'variant_title': variant.get('title', 'Default'),
                'sku': variant.get('sku', ''),
                'price': variant.get('price', ''),
                'compare_at_price': variant.get('compare_at_price', ''),
                'available': variant.get('available', True),
                'inventory_quantity': variant.get('inventory_quantity', ''),
                'tags': tags_text,
            })

    # Write to CSV (header is written even when there are no rows)
    with open(filepath, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print(f"Saved {len(rows)} records to {filepath}")
    return filepath
def compare_price_changes(old_file: str, new_file: str) -> List[Dict]:
    """
    Compare two CSV files to detect price changes

    Rows are matched on 'variant_id'. Variants present in only one file, or
    whose price is blank or non-numeric in either file, are skipped.

    Args:
        old_file: Path to the earlier CSV snapshot.
        new_file: Path to the later CSV snapshot.

    Returns:
        One dict per changed variant with keys 'product', 'variant',
        'old_price', 'new_price', 'change' and 'change_pct'. 'change_pct'
        is None when the old price was 0, since the percentage is undefined.
    """
    def load_csv(filepath):
        with open(filepath, 'r', newline='', encoding='utf-8') as f:
            return {row['variant_id']: row for row in csv.DictReader(f)}

    def parse_price(value):
        # The price column can be blank for variants saved without a price.
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    old_data = load_csv(old_file)
    new_data = load_csv(new_file)

    changes = []
    for variant_id, new_row in new_data.items():
        old_row = old_data.get(variant_id)
        if old_row is None:
            continue

        old_price = parse_price(old_row.get('price'))
        new_price = parse_price(new_row.get('price'))
        if old_price is None or new_price is None or old_price == new_price:
            continue

        change = new_price - old_price
        changes.append({
            'product': new_row['product_title'],
            'variant': new_row['variant_title'],
            'old_price': old_price,
            'new_price': new_price,
            'change': change,
            # Guard the division: a previously free item has no percentage.
            'change_pct': (change / old_price) * 100 if old_price else None,
        })

    return changes
# Example usage
if __name__ == "__main__":
from shopify_scraper_advanced import ShopifyPriceMonitor
monitor = ShopifyPriceMonitor()
products = monitor.fetch_all_products("https://www.aloyoga.com")
filepath = save_products_to_csv(products, "aloyoga")
print(f"Data saved to: {filepath}")
# Example output:
# Saved 3456 records to ./data/aloyoga_prices_20250117_143022.csv
7. Sauvegarder les Donnees dans PostgreSQL
Pour les systemes de production avec suivi historique et analytics, PostgreSQL est la solution. Voici une configuration complete:
-- Create database schema for Shopify price monitoring
-- Stores table: one row per monitored shop.
-- domain is UNIQUE, so it can be used as the conflict target of an upsert.
-- last_scraped_at has no default and stays NULL until a writer sets it.
CREATE TABLE stores (
id SERIAL PRIMARY KEY,
domain VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW(),
last_scraped_at TIMESTAMP
);
-- Products table: one row per product.
-- The primary key is the Shopify product ID itself, not a generated key.
-- store_id points at stores(id); tags is stored as a text array.
CREATE TABLE products (
id BIGINT PRIMARY KEY, -- Shopify product ID
store_id INTEGER REFERENCES stores(id),
title VARCHAR(500) NOT NULL,
handle VARCHAR(255),
vendor VARCHAR(255),
product_type VARCHAR(255),
tags TEXT[],
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Variants table: one row per product variant.
-- The primary key is the Shopify variant ID; product_id points at products(id).
-- Prices are not stored here; they live in price_history.
CREATE TABLE variants (
id BIGINT PRIMARY KEY, -- Shopify variant ID
product_id BIGINT REFERENCES products(id),
title VARCHAR(255),
sku VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW()
);
-- Price history table (the main tracking table)
-- Each row is one observation of a variant's price, linked to variants(id).
-- price is required; compare_at_price and inventory_quantity may be NULL.
-- scraped_at defaults to the insert time and is what the views order by.
CREATE TABLE price_history (
id SERIAL PRIMARY KEY,
variant_id BIGINT REFERENCES variants(id),
price DECIMAL(10, 2) NOT NULL,
compare_at_price DECIMAL(10, 2),
available BOOLEAN DEFAULT TRUE,
inventory_quantity INTEGER,
scraped_at TIMESTAMP DEFAULT NOW()
);
-- Create indexes for fast queries
CREATE INDEX idx_price_history_variant_id ON price_history(variant_id);
CREATE INDEX idx_price_history_scraped_at ON price_history(scraped_at);
CREATE INDEX idx_products_store_id ON products(store_id);
-- View for latest prices
-- Returns one row per variant: DISTINCT ON (ph.variant_id) combined with
-- ORDER BY ph.variant_id, ph.scraped_at DESC keeps the most recent
-- price_history row of each variant.
-- The inner joins mean a variant only appears if its product and store rows exist.
CREATE VIEW latest_prices AS
SELECT DISTINCT ON (ph.variant_id)
s.domain AS store,
p.title AS product,
v.title AS variant,
v.sku,
ph.price,
ph.compare_at_price,
ph.available,
ph.scraped_at
FROM price_history ph
JOIN variants v ON v.id = ph.variant_id
JOIN products p ON p.id = v.product_id
JOIN stores s ON s.id = p.store_id
ORDER BY ph.variant_id, ph.scraped_at DESC;
-- View for price changes
-- Lists every observation whose price differs from the previous observation
-- of the same variant (previous = preceding row by scraped_at).
CREATE VIEW price_changes AS
WITH ranked_prices AS (
    SELECT
        variant_id,
        price,
        scraped_at,
        LAG(price) OVER (PARTITION BY variant_id ORDER BY scraped_at) AS prev_price
    FROM price_history
)
SELECT
    v.id AS variant_id,
    p.title AS product,
    v.title AS variant,
    rp.prev_price AS old_price,
    rp.price AS new_price,
    (rp.price - rp.prev_price) AS price_change,
    -- NULLIF makes the percentage NULL when the previous price was 0;
    -- without it a single such row makes any query on this view fail
    -- with a division-by-zero error.
    ROUND(((rp.price - rp.prev_price) / NULLIF(rp.prev_price, 0) * 100)::numeric, 2) AS change_pct,
    rp.scraped_at
FROM ranked_prices rp
JOIN variants v ON v.id = rp.variant_id
JOIN products p ON p.id = v.product_id
WHERE rp.prev_price IS NOT NULL
  AND rp.price != rp.prev_price
ORDER BY rp.scraped_at DESC;
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
from typing import List, Dict, Any
from contextlib import contextmanager
class PriceDatabase:
    """PostgreSQL database handler for price monitoring"""

    def __init__(self, connection_string: str):
        """
        Initialize with PostgreSQL connection string.
        Example: 'postgresql://user:password@localhost:5432/price_monitor'

        No connection is opened here; each operation opens its own.
        """
        self.connection_string = connection_string

    @contextmanager
    def get_connection(self):
        """Context manager for database connections

        Commits when the block exits normally, rolls back and re-raises on
        any exception, and always closes the connection.
        """
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def ensure_store(self, domain: str, name: str = None) -> int:
        """Get or create store record

        Inserts the store, or refreshes ``last_scraped_at`` when the domain
        already exists (the stored name is left unchanged in that case).

        Args:
            domain: Unique store domain.
            name: Display name; defaults to ``domain``.

        Returns:
            The store's id.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO stores (domain, name, last_scraped_at)
                    VALUES (%s, %s, NOW())
                    ON CONFLICT (domain) DO UPDATE
                    SET last_scraped_at = NOW()
                    RETURNING id
                """, (domain, name or domain))
                return cur.fetchone()[0]

    def save_products(self, store_id: int, products: List[Dict[str, Any]]):
        """Save products and variants to database

        Upserts each product and variant and appends one price_history row
        per priced variant. Everything runs in a single transaction, so a
        failure rolls back the whole batch.

        Args:
            store_id: Id returned by ensure_store().
            products: Product dicts with an optional 'variants' list.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                for product in products:
                    tags = product.get('tags') or []
                    if isinstance(tags, str):
                        # The column is TEXT[]; split a comma-separated
                        # string into a list.
                        tags = [t.strip() for t in tags.split(',') if t.strip()]

                    # Upsert product
                    cur.execute("""
                        INSERT INTO products (id, store_id, title, handle, vendor, product_type, tags)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (id) DO UPDATE SET
                            title = EXCLUDED.title,
                            handle = EXCLUDED.handle,
                            vendor = EXCLUDED.vendor,
                            product_type = EXCLUDED.product_type,
                            tags = EXCLUDED.tags,
                            updated_at = NOW()
                    """, (
                        product['id'],
                        store_id,
                        product['title'],
                        product.get('handle'),
                        product.get('vendor'),
                        product.get('product_type'),
                        tags,
                    ))

                    # Upsert variants and record prices
                    for variant in product.get('variants', []):
                        cur.execute("""
                            INSERT INTO variants (id, product_id, title, sku)
                            VALUES (%s, %s, %s, %s)
                            ON CONFLICT (id) DO UPDATE SET
                                title = EXCLUDED.title,
                                sku = EXCLUDED.sku
                        """, (
                            variant['id'],
                            product['id'],
                            variant.get('title'),
                            variant.get('sku'),
                        ))

                        price = variant.get('price')
                        if price is None or price == '':
                            # No price observed: skip the history row rather
                            # than record a made-up 0, which would later show
                            # up as a price change.
                            continue

                        # Insert price record
                        compare_at = variant.get('compare_at_price')
                        cur.execute("""
                            INSERT INTO price_history
                            (variant_id, price, compare_at_price, available, inventory_quantity)
                            VALUES (%s, %s, %s, %s, %s)
                        """, (
                            variant['id'],
                            float(price),
                            float(compare_at) if compare_at else None,
                            variant.get('available', True),
                            variant.get('inventory_quantity'),
                        ))

    def get_price_changes(self, hours: int = 24) -> List[Dict]:
        """Get recent price changes

        Args:
            hours: Size of the look-back window, in hours.

        Returns:
            Rows of the price_changes view as dicts keyed by column name,
            largest absolute percentage change first.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                # The placeholder must stay outside the quoted literal:
                # inside '...' it only works by accident for ints. Multiply a
                # one-hour interval by the bound value instead.
                cur.execute("""
                    SELECT * FROM price_changes
                    WHERE scraped_at > NOW() - %s * INTERVAL '1 hour'
                    ORDER BY ABS(change_pct) DESC NULLS LAST
                """, (hours,))
                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]
# Example usage
if __name__ == "__main__":
from shopify_scraper_advanced import ShopifyPriceMonitor
# Initialize database
db = PriceDatabase("postgresql://user:password@localhost:5432/price_monitor")
# Scrape store
monitor = ShopifyPriceMonitor()
products = monitor.fetch_all_products("https://www.aloyoga.com")
# Save to database
store_id = db.ensure_store("aloyoga.com", "Alo Yoga")
db.save_products(store_id, products)
print(f"Saved {len(products)} products to database")
# Check for price changes
changes = db.get_price_changes(hours=24)
print(f"\nPrice changes in last 24 hours: {len(changes)}")
for change in changes[:10]:
print(f"{change['product']}: ${change['old_price']} -> ${change['new_price']} ({change['change_pct']}%)")
8. Deploiement avec Coolify
Coolify est une excellente alternative auto-hebergee a Heroku. Voici comment deployer votre moniteur de prix comme tache planifiee:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV PYTHONUNBUFFERED=1
# Default command (can be overridden)
CMD ["python", "main.py"]
requests==2.31.0
psycopg2-binary==2.9.9
python-dotenv==1.0.0
schedule==1.2.1
#!/usr/bin/env python3
"""
Shopify Price Monitor - Coolify Deployment
Run as a scheduled task to monitor competitor prices
"""
import os
import schedule
import time
from datetime import datetime
from dotenv import load_dotenv
from shopify_scraper_advanced import ShopifyPriceMonitor
from save_to_postgresql import PriceDatabase
load_dotenv()
# Configuration from environment variables
DATABASE_URL = os.getenv('DATABASE_URL')
# Comma-separated store URLs. Blank entries are dropped so that an unset or
# empty variable yields [] rather than [''].
STORES_TO_MONITOR = [
    store.strip()
    for store in os.getenv('STORES_TO_MONITOR', '').split(',')
    if store.strip()
]
SCRAPE_INTERVAL_HOURS = int(os.getenv('SCRAPE_INTERVAL_HOURS', '6'))
def run_price_check():
    """Main price monitoring function

    Scrapes every store in STORES_TO_MONITOR, saves the results through
    PriceDatabase, then prints the price changes of the last
    SCRAPE_INTERVAL_HOURS hours. Errors while scraping a store or while
    reading the changes are printed rather than raised, so a scheduled
    caller keeps running.
    """
    print(f"\n{'='*50}")
    print(f"Starting price check at {datetime.now().isoformat()}")
    print(f"{'='*50}")

    if not DATABASE_URL:
        # Without a connection string every store would fail the same way.
        print("DATABASE_URL is not set; skipping price check")
        return

    db = PriceDatabase(DATABASE_URL)
    monitor = ShopifyPriceMonitor(rate_limit_delay=1.0)

    for store_url in STORES_TO_MONITOR:
        store_url = store_url.strip()
        if not store_url:
            continue
        try:
            print(f"\nScraping: {store_url}")
            # Extract domain for store name
            domain = store_url.replace('https://', '').replace('http://', '').rstrip('/')
            # Fetch products
            products = monitor.fetch_all_products(store_url)
            print(f" Fetched {len(products)} products")
            # Save to database
            store_id = db.ensure_store(domain)
            db.save_products(store_id, products)
            print(f" Saved to database")
        except Exception as e:
            print(f" Error scraping {store_url}: {e}")

    # Report price changes. Guarded like the scraping above: an exception
    # escaping from here would propagate out of the scheduler loop.
    try:
        changes = db.get_price_changes(hours=SCRAPE_INTERVAL_HOURS)
    except Exception as e:
        print(f"\nError reading price changes: {e}")
        changes = []

    if changes:
        print(f"\nPrice changes detected: {len(changes)}")
        for change in changes[:20]:
            direction = "↑" if change['price_change'] > 0 else "↓"
            print(f" {direction} {change['product']}: ${change['old_price']} -> ${change['new_price']}")

    print(f"\nPrice check completed at {datetime.now().isoformat()}")
def main():
    """Entry point for Coolify deployment

    Runs one price check immediately, then repeats it every
    SCRAPE_INTERVAL_HOURS hours. Does not return.
    """
    # Count only real entries: splitting an empty setting yields [''].
    store_count = sum(1 for store in STORES_TO_MONITOR if store.strip())

    print("Shopify Price Monitor Starting...")
    print(f"Monitoring {store_count} stores")
    print(f"Check interval: {SCRAPE_INTERVAL_HOURS} hours")

    # Run immediately on startup
    run_price_check()

    # Schedule periodic runs
    schedule.every(SCRAPE_INTERVAL_HOURS).hours.do(run_price_check)

    # Keep running; pending jobs are polled once a minute
    while True:
        schedule.run_pending()
        time.sleep(60)
if __name__ == "__main__":
main()
# Database connection
DATABASE_URL=postgresql://user:password@db:5432/price_monitor
# Stores to monitor (comma-separated)
STORES_TO_MONITOR=https://www.aloyoga.com,https://www.gymshark.com,https://www.fabletics.com
# How often to check prices (in hours)
SCRAPE_INTERVAL_HOURS=6
# Optional: Proxy configuration
PROXY_LIST=http://user:pass@proxy1:8080,http://user:pass@proxy2:8080
9. Automatiser avec N8N
N8N est un puissant outil d'automatisation de workflows. Voici comment configurer un workflow de surveillance des prix:
Etapes du Workflow N8N:
- Schedule Trigger: Executer toutes les 6 heures
- HTTP Request: Recuperer /products.json des boutiques cibles
- Function: Extraire et transformer les donnees de prix
- PostgreSQL: Sauvegarder dans la table price_history
- PostgreSQL: Requeter les changements de prix
- IF: Verifier si des changements existent
- Slack/Email: Envoyer une notification d'alerte
// N8N Function node to extract price data from products.json response.
// Builds one flat record per variant, all stamped with the same scrape time.
const payload = $input.item.json;
// A response without a products array (e.g. an error page) yields no
// records instead of throwing on `for...of undefined`.
const products = Array.isArray(payload.products) ? payload.products : [];
const storeDomain = payload.storeDomain;
const scrapedAt = new Date().toISOString();
const priceRecords = [];

for (const product of products) {
  for (const variant of product.variants || []) {
    priceRecords.push({
      store_domain: storeDomain,
      product_id: product.id,
      product_title: product.title,
      product_type: product.product_type || '',
      variant_id: variant.id,
      variant_title: variant.title || 'Default',
      sku: variant.sku || '',
      price: parseFloat(variant.price),
      compare_at_price: variant.compare_at_price ? parseFloat(variant.compare_at_price) : null,
      // Treated as available unless explicitly false
      available: variant.available !== false,
      scraped_at: scrapedAt
    });
  }
}
return priceRecords.map(record => ({ json: record }));
// N8N Function node to format Slack alert
const changes = $input.all().map(item => item.json);
if (changes.length === 0) {
return [{ json: { skip: true } }];
}
let message = `🔔 *Price Changes Detected*\n\n`;
for (const change of changes.slice(0, 10)) {
const direction = change.price_change > 0 ? '📈' : '📉';
const pct = Math.abs(change.change_pct).toFixed(1);
message += `${direction} *${change.product}*\n`;
message += ` ${change.variant}: $${change.old_price} → $${change.new_price} (${change.price_change > 0 ? '+' : ''}${pct}%)\n\n`;
}
if (changes.length > 10) {
message += `_...and ${changes.length - 10} more changes_`;
}
return [{
json: {
text: message,
channel: '#price-alerts'
}
}];
10. Conclusion
Vous avez maintenant tous les outils necessaires pour construire un systeme complet de surveillance des prix Shopify:
- Recuperer les donnees produits avec /products.json et pagination
- Rotation de proxies pour scraping fiable a grande echelle
- Stocker les donnees en CSV pour analyse simple ou PostgreSQL pour production
- Deployer avec Coolify pour automatisation auto-hebergee
- Creer des workflows avec N8N pour automatisation visuelle
Equipe DBShopi
Intelligence E-commerce
Pret a Commencer la Surveillance des Prix?
Evitez la configuration technique et obtenez un acces instantane a la surveillance des prix pour des millions de boutiques Shopify.
Essayer la Surveillance Prix DBShopi