retail-expert

Expert guidance for retail systems, point-of-sale solutions, inventory management, e-commerce platforms, customer analytics, and omnichannel retail strategies.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "retail-expert" with this command: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-retail-expert

Retail Expert

Expert guidance for retail systems, point-of-sale solutions, inventory management, e-commerce platforms, customer analytics, and omnichannel retail strategies.

Core Concepts

Retail Systems

  • Point of Sale (POS) systems

  • Inventory Management Systems (IMS)

  • Customer Relationship Management (CRM)

  • Order Management Systems (OMS)

  • Warehouse Management Systems (WMS)

  • E-commerce platforms

  • Payment processing

Omnichannel Retail

  • Online-to-offline (O2O) integration

  • Buy online, pick up in store (BOPIS)

  • Ship from store

  • Unified customer profiles

  • Cross-channel inventory visibility

  • Consistent pricing across channels

  • Integrated loyalty programs

Technologies

  • Mobile POS (mPOS)

  • Self-checkout systems

  • Electronic shelf labels (ESL)

  • RFID for inventory tracking

  • Computer vision for analytics

  • AI-powered recommendations

  • Contactless payments

Point of Sale System

from dataclasses import dataclass from datetime import datetime from decimal import Decimal from typing import List, Optional from enum import Enum

class PaymentMethod(Enum): CASH = "cash" CREDIT_CARD = "credit_card" DEBIT_CARD = "debit_card" MOBILE_PAYMENT = "mobile_payment" GIFT_CARD = "gift_card"

class TransactionStatus(Enum): PENDING = "pending" COMPLETED = "completed" VOIDED = "voided" REFUNDED = "refunded"

@dataclass class Product: """Product/SKU information""" sku: str name: str description: str price: Decimal cost: Decimal barcode: str category: str department: str tax_rate: Decimal is_taxable: bool stock_quantity: int reorder_point: int

@dataclass class LineItem: """Transaction line item""" sku: str product_name: str quantity: int unit_price: Decimal discount_amount: Decimal tax_amount: Decimal line_total: Decimal

@dataclass class Transaction: """POS transaction""" transaction_id: str store_id: str register_id: str cashier_id: str timestamp: datetime items: List[LineItem] subtotal: Decimal tax_total: Decimal discount_total: Decimal grand_total: Decimal payment_method: PaymentMethod status: TransactionStatus customer_id: Optional[str]

class POSSystem: """Point of Sale system"""

def __init__(self, store_id: str, register_id: str):
    self.store_id = store_id
    self.register_id = register_id
    self.current_transaction = None
    self.products = {}

def start_transaction(self, cashier_id: str) -> str:
    """Start new transaction"""
    transaction_id = self._generate_transaction_id()

    self.current_transaction = Transaction(
        transaction_id=transaction_id,
        store_id=self.store_id,
        register_id=self.register_id,
        cashier_id=cashier_id,
        timestamp=datetime.now(),
        items=[],
        subtotal=Decimal('0'),
        tax_total=Decimal('0'),
        discount_total=Decimal('0'),
        grand_total=Decimal('0'),
        payment_method=None,
        status=TransactionStatus.PENDING,
        customer_id=None
    )

    return transaction_id

def scan_item(self, barcode: str, quantity: int = 1) -> dict:
    """Scan and add item to transaction"""
    if not self.current_transaction:
        return {'error': 'No active transaction'}

    # Lookup product
    product = self._lookup_product(barcode)
    if not product:
        return {'error': 'Product not found', 'barcode': barcode}

    # Check inventory
    if product.stock_quantity < quantity:
        return {
            'error': 'Insufficient inventory',
            'available': product.stock_quantity
        }

    # Calculate line item totals
    unit_price = product.price
    line_subtotal = unit_price * quantity
    discount_amount = Decimal('0')  # Apply promotions here

    # Calculate tax
    tax_amount = Decimal('0')
    if product.is_taxable:
        tax_amount = (line_subtotal - discount_amount) * product.tax_rate

    line_total = line_subtotal - discount_amount + tax_amount

    # Create line item
    line_item = LineItem(
        sku=product.sku,
        product_name=product.name,
        quantity=quantity,
        unit_price=unit_price,
        discount_amount=discount_amount,
        tax_amount=tax_amount,
        line_total=line_total
    )

    # Add to transaction
    self.current_transaction.items.append(line_item)

    # Update transaction totals
    self._recalculate_totals()

    return {
        'success': True,
        'item': {
            'name': product.name,
            'quantity': quantity,
            'price': float(unit_price),
            'line_total': float(line_total)
        },
        'transaction_total': float(self.current_transaction.grand_total)
    }

def apply_discount(self, discount_code: str) -> dict:
    """Apply discount/promotion to transaction"""
    if not self.current_transaction:
        return {'error': 'No active transaction'}

    discount = self._validate_discount(discount_code)
    if not discount:
        return {'error': 'Invalid discount code'}

    # Apply discount based on type
    if discount['type'] == 'percentage':
        discount_amount = self.current_transaction.subtotal * (discount['value'] / 100)
    elif discount['type'] == 'fixed':
        discount_amount = Decimal(str(discount['value']))
    else:
        return {'error': 'Unknown discount type'}

    self.current_transaction.discount_total += discount_amount
    self._recalculate_totals()

    return {
        'success': True,
        'discount_applied': float(discount_amount),
        'new_total': float(self.current_transaction.grand_total)
    }

def process_payment(self,
                   payment_method: PaymentMethod,
                   amount: Decimal,
                   payment_details: dict = None) -> dict:
    """Process payment for transaction"""
    if not self.current_transaction:
        return {'error': 'No active transaction'}

    if amount < self.current_transaction.grand_total:
        return {'error': 'Insufficient payment amount'}

    # Process payment through payment gateway
    payment_result = self._process_payment_gateway(
        payment_method,
        amount,
        payment_details
    )

    if not payment_result['success']:
        return payment_result

    # Complete transaction
    self.current_transaction.payment_method = payment_method
    self.current_transaction.status = TransactionStatus.COMPLETED

    # Update inventory
    self._update_inventory()

    # Calculate change
    change = amount - self.current_transaction.grand_total

    # Generate receipt
    receipt = self._generate_receipt()

    transaction_id = self.current_transaction.transaction_id
    self.current_transaction = None  # Clear current transaction

    return {
        'success': True,
        'transaction_id': transaction_id,
        'amount_paid': float(amount),
        'change': float(change),
        'receipt': receipt
    }

def void_transaction(self, reason: str) -> dict:
    """Void current transaction"""
    if not self.current_transaction:
        return {'error': 'No active transaction'}

    self.current_transaction.status = TransactionStatus.VOIDED
    transaction_id = self.current_transaction.transaction_id
    self.current_transaction = None

    return {
        'success': True,
        'transaction_id': transaction_id,
        'reason': reason
    }

def _recalculate_totals(self):
    """Recalculate transaction totals"""
    self.current_transaction.subtotal = sum(
        item.unit_price * item.quantity for item in self.current_transaction.items
    )

    self.current_transaction.tax_total = sum(
        item.tax_amount for item in self.current_transaction.items
    )

    self.current_transaction.grand_total = (
        self.current_transaction.subtotal +
        self.current_transaction.tax_total -
        self.current_transaction.discount_total
    )

def _lookup_product(self, barcode: str) -> Optional[Product]:
    """Lookup product by barcode"""
    return self.products.get(barcode)

def _validate_discount(self, discount_code: str) -> Optional[dict]:
    """Validate and retrieve discount details"""
    # Implementation would check against promotion database
    return None

def _process_payment_gateway(self,
                            payment_method: PaymentMethod,
                            amount: Decimal,
                            details: dict) -> dict:
    """Process payment through gateway"""
    # Integration with payment processor (Stripe, Square, etc.)
    return {'success': True, 'transaction_id': 'pay_123456'}

def _update_inventory(self):
    """Update inventory after sale"""
    for item in self.current_transaction.items:
        product = self.products.get(item.sku)
        if product:
            product.stock_quantity -= item.quantity

def _generate_receipt(self) -> dict:
    """Generate transaction receipt"""
    return {
        'transaction_id': self.current_transaction.transaction_id,
        'timestamp': self.current_transaction.timestamp.isoformat(),
        'items': [
            {
                'name': item.product_name,
                'qty': item.quantity,
                'price': float(item.unit_price),
                'total': float(item.line_total)
            }
            for item in self.current_transaction.items
        ],
        'subtotal': float(self.current_transaction.subtotal),
        'tax': float(self.current_transaction.tax_total),
        'discount': float(self.current_transaction.discount_total),
        'total': float(self.current_transaction.grand_total)
    }

def _generate_transaction_id(self) -> str:
    """Generate unique transaction ID"""
    import uuid
    return f"TXN-{uuid.uuid4().hex[:12].upper()}"

Inventory Management

import numpy as np from datetime import datetime, timedelta

class InventoryManagementSystem: """Inventory management and optimization"""

def __init__(self):
    self.products = {}
    self.warehouses = {}
    self.transfer_orders = []

def calculate_reorder_point(self,
                           average_daily_demand: float,
                           lead_time_days: int,
                           service_level: float = 0.95) -> dict:
    """Calculate optimal reorder point"""
    # Safety stock calculation
    demand_std_dev = average_daily_demand * 0.2  # Assume 20% variation

    # Z-score for service level
    from scipy import stats
    z_score = stats.norm.ppf(service_level)

    safety_stock = z_score * demand_std_dev * np.sqrt(lead_time_days)
    reorder_point = (average_daily_demand * lead_time_days) + safety_stock

    return {
        'reorder_point': int(np.ceil(reorder_point)),
        'safety_stock': int(np.ceil(safety_stock)),
        'average_daily_demand': average_daily_demand,
        'lead_time_days': lead_time_days,
        'service_level': service_level
    }

def calculate_economic_order_quantity(self,
                                     annual_demand: float,
                                     ordering_cost: Decimal,
                                     holding_cost_per_unit: Decimal) -> dict:
    """Calculate Economic Order Quantity (EOQ)"""
    eoq = np.sqrt(
        (2 * annual_demand * float(ordering_cost)) /
        float(holding_cost_per_unit)
    )

    # Calculate total annual cost
    number_of_orders = annual_demand / eoq
    ordering_cost_total = number_of_orders * float(ordering_cost)
    holding_cost_total = (eoq / 2) * float(holding_cost_per_unit)
    total_cost = ordering_cost_total + holding_cost_total

    return {
        'eoq': int(np.ceil(eoq)),
        'orders_per_year': number_of_orders,
        'order_frequency_days': int(365 / number_of_orders),
        'total_annual_cost': total_cost,
        'ordering_cost': ordering_cost_total,
        'holding_cost': holding_cost_total
    }

def analyze_abc(self, products: List[dict]) -> dict:
    """ABC analysis for inventory classification"""
    # Calculate annual value for each product
    for product in products:
        product['annual_value'] = (
            product['unit_cost'] * product['annual_demand']
        )

    # Sort by annual value
    sorted_products = sorted(
        products,
        key=lambda x: x['annual_value'],
        reverse=True
    )

    total_value = sum(p['annual_value'] for p in sorted_products)
    cumulative_value = 0
    results = {'A': [], 'B': [], 'C': []}

    for product in sorted_products:
        cumulative_value += product['annual_value']
        percentage = (cumulative_value / total_value) * 100

        if percentage <= 80:
            category = 'A'  # Top 20% items, 80% value
        elif percentage <= 95:
            category = 'B'  # Next 30% items, 15% value
        else:
            category = 'C'  # Bottom 50% items, 5% value

        product['abc_category'] = category
        results[category].append(product)

    return {
        'classification': results,
        'summary': {
            'A_items': len(results['A']),
            'B_items': len(results['B']),
            'C_items': len(results['C']),
            'total_value': total_value
        }
    }

def forecast_demand(self,
                   historical_sales: List[float],
                   periods_ahead: int = 12) -> dict:
    """Forecast future demand using exponential smoothing"""
    # Triple exponential smoothing (Holt-Winters)
    alpha = 0.3  # Level smoothing
    beta = 0.1   # Trend smoothing
    gamma = 0.2  # Seasonality smoothing
    season_length = 12  # Monthly seasonality

    n = len(historical_sales)
    forecast = []

    # Initialize level and trend
    level = np.mean(historical_sales[:season_length])
    trend = (np.mean(historical_sales[season_length:2*season_length]) -
            np.mean(historical_sales[:season_length])) / season_length

    # Initialize seasonal indices
    seasonal = np.array(historical_sales[:season_length]) / level

    # Generate forecasts
    for i in range(periods_ahead):
        season_idx = i % season_length
        forecast_value = (level + trend * (i + 1)) * seasonal[season_idx]
        forecast.append(max(0, forecast_value))

    return {
        'forecast': forecast,
        'periods_ahead': periods_ahead,
        'method': 'holt_winters',
        'confidence_interval_95': self._calculate_confidence_interval(
            historical_sales,
            forecast
        )
    }

def check_stock_levels(self) -> List[dict]:
    """Check stock levels and generate alerts"""
    alerts = []

    for sku, product in self.products.items():
        # Check for low stock
        if product.stock_quantity <= product.reorder_point:
            alerts.append({
                'type': 'reorder',
                'severity': 'high',
                'sku': sku,
                'product_name': product.name,
                'current_stock': product.stock_quantity,
                'reorder_point': product.reorder_point,
                'action': 'Place purchase order'
            })

        # Check for overstock
        max_stock = product.reorder_point * 3
        if product.stock_quantity > max_stock:
            alerts.append({
                'type': 'overstock',
                'severity': 'medium',
                'sku': sku,
                'product_name': product.name,
                'current_stock': product.stock_quantity,
                'max_stock': max_stock,
                'action': 'Review purchasing strategy'
            })

        # Check for no sales (dead stock)
        # Implementation would check sales history

    return alerts

def _calculate_confidence_interval(self,
                                  historical: List[float],
                                  forecast: List[float]) -> dict:
    """Calculate 95% confidence interval for forecast"""
    # Simplified confidence interval
    std_error = np.std(historical) * 1.5
    return {
        'lower': [max(0, f - 1.96 * std_error) for f in forecast],
        'upper': [f + 1.96 * std_error for f in forecast]
    }

Customer Analytics

from sklearn.cluster import KMeans import pandas as pd

class CustomerAnalytics: """Customer segmentation and analytics"""

def __init__(self):
    self.customers = {}
    self.transactions = []

def calculate_rfm(self, customer_transactions: pd.DataFrame) -> pd.DataFrame:
    """Calculate RFM (Recency, Frequency, Monetary) scores"""
    current_date = datetime.now()

    rfm = customer_transactions.groupby('customer_id').agg({
        'transaction_date': lambda x: (current_date - x.max()).days,  # Recency
        'transaction_id': 'count',  # Frequency
        'amount': 'sum'  # Monetary
    })

    rfm.columns = ['recency', 'frequency', 'monetary']

    # Calculate RFM scores (1-5 scale)
    rfm['r_score'] = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1])
    rfm['f_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
    rfm['m_score'] = pd.qcut(rfm['monetary'], 5, labels=[1, 2, 3, 4, 5])

    # Combined RFM score
    rfm['rfm_score'] = (
        rfm['r_score'].astype(int) +
        rfm['f_score'].astype(int) +
        rfm['m_score'].astype(int)
    )

    return rfm

def segment_customers(self, rfm_data: pd.DataFrame) -> dict:
    """Segment customers based on RFM scores"""
    segments = {}

    for customer_id, row in rfm_data.iterrows():
        r, f, m = int(row['r_score']), int(row['f_score']), int(row['m_score'])

        if r >= 4 and f >= 4 and m >= 4:
            segment = 'Champions'
        elif r >= 3 and f >= 3 and m >= 3:
            segment = 'Loyal Customers'
        elif r >= 4 and f <= 2:
            segment = 'New Customers'
        elif r <= 2 and f >= 3:
            segment = 'At Risk'
        elif r <= 2 and f <= 2:
            segment = 'Lost Customers'
        elif m >= 4:
            segment = 'Big Spenders'
        else:
            segment = 'Regular Customers'

        segments[customer_id] = {
            'segment': segment,
            'rfm_scores': {'r': r, 'f': f, 'm': m}
        }

    return segments

def calculate_customer_lifetime_value(self,
                                     average_purchase_value: Decimal,
                                     purchase_frequency: float,
                                     customer_lifespan_years: float) -> Decimal:
    """Calculate Customer Lifetime Value (CLV)"""
    clv = (
        float(average_purchase_value) *
        purchase_frequency *
        customer_lifespan_years
    )

    return Decimal(str(clv)).quantize(Decimal('0.01'))

def predict_churn(self, customer_features: dict) -> dict:
    """Predict customer churn probability"""
    # Features: recency, frequency, monetary, days_since_last_purchase, etc.
    # This would use a trained ML model

    churn_score = 0.35  # Placeholder

    if churn_score > 0.7:
        risk = 'high'
        action = 'Send personalized offer immediately'
    elif churn_score > 0.4:
        risk = 'medium'
        action = 'Include in next marketing campaign'
    else:
        risk = 'low'
        action = 'Continue regular engagement'

    return {
        'churn_probability': churn_score,
        'risk_level': risk,
        'recommended_action': action
    }

def recommend_products(self,
                      customer_id: str,
                      top_n: int = 5) -> List[dict]:
    """Generate product recommendations"""
    # Collaborative filtering or content-based recommendations
    # This would use recommendation algorithms

    recommendations = [
        {
            'sku': 'PROD001',
            'name': 'Recommended Product 1',
            'score': 0.95,
            'reason': 'Frequently bought together'
        }
    ]

    return recommendations[:top_n]

Best Practices

POS Operations

  • Ensure POS system uptime (99.9%+)

  • Implement offline mode for network outages

  • Use barcode scanning for accuracy

  • Support multiple payment methods

  • Enable quick item lookup

  • Implement receipt management (print/email)

  • Track cashier performance metrics

Inventory Management

  • Implement cycle counting programs

  • Use ABC analysis for prioritization

  • Maintain accurate stock records

  • Set appropriate reorder points

  • Use RFID for high-value items

  • Implement first-in-first-out (FIFO)

  • Track inventory turnover ratios

E-commerce

  • Optimize for mobile shopping

  • Implement abandoned cart recovery

  • Use high-quality product images

  • Enable customer reviews

  • Provide multiple shipping options

  • Implement real-time inventory updates

  • Support guest checkout

Customer Experience

  • Personalize marketing communications

  • Implement loyalty programs

  • Provide omnichannel support

  • Enable easy returns and exchanges

  • Use customer feedback

  • Implement chatbots for support

  • Track Net Promoter Score (NPS)

Anti-Patterns

❌ No inventory tracking or inaccurate counts ❌ Single payment method only ❌ Poor checkout experience (slow/complex) ❌ No customer data collection ❌ Siloed online and offline systems ❌ Manual price updates across locations ❌ No backup for POS systems ❌ Ignoring cart abandonment ❌ No product recommendations

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

finance-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

trading-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

dart-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

postgresql-expert

No summary provided by upstream source.

Repository SourceNeeds Review