trader-analysis

Trader Analysis Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "trader-analysis" with this command: npx skills add agentmc15/polymarket-trader/agentmc15-polymarket-trader-trader-analysis

Trader Analysis Skill

Tracking Trader Activity

On-Chain Data

from web3 import Web3 import httpx from typing import AsyncIterator

CTF_EXCHANGE = "0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E"

class TraderTracker: def init(self, polygon_rpc: str): self.w3 = Web3(Web3.HTTPProvider(polygon_rpc)) self.exchange = self.w3.eth.contract( address=CTF_EXCHANGE, abi=CTF_EXCHANGE_ABI )

async def get_trader_trades(
    self,
    address: str,
    from_block: int = None
) -> list[dict]:
    """Fetch all trades for an address."""
    events = self.exchange.events.OrderFilled.get_logs(
        fromBlock=from_block or "earliest",
        argument_filters={"maker": address}
    )
    
    return [self._parse_trade_event(e) for e in events]

def _parse_trade_event(self, event: dict) -> dict:
    """Parse OrderFilled event into trade dict."""
    return {
        "tx_hash": event.transactionHash.hex(),
        "block_number": event.blockNumber,
        "maker": event.args.maker,
        "taker": event.args.taker,
        "token_id": str(event.args.tokenId),
        "amount": event.args.amount / 1e6,  # Assuming 6 decimals
        "price": event.args.price / 1e18,
        "side": "BUY" if event.args.side == 0 else "SELL",
        "timestamp": self._get_block_timestamp(event.blockNumber)
    }

Polymarket Data API

class PolymarketDataClient: BASE_URL = "https://data-api.polymarket.com"

def __init__(self):
    self.client = httpx.AsyncClient(
        base_url=self.BASE_URL,
        timeout=30.0
    )

async def get_trader_profile(self, address: str) -> dict:
    """Fetch trader profile and stats."""
    response = await self.client.get(f"/users/{address}")
    response.raise_for_status()
    return response.json()

async def get_trader_positions(self, address: str) -> list[dict]:
    """Get all positions for a trader."""
    response = await self.client.get(
        "/positions",
        params={"user": address}
    )
    response.raise_for_status()
    return response.json()

async def get_trader_activity(
    self,
    address: str,
    limit: int = 100,
    offset: int = 0
) -> list[dict]:
    """Get recent trading activity."""
    response = await self.client.get(
        "/activity",
        params={
            "user": address,
            "limit": limit,
            "offset": offset
        }
    )
    response.raise_for_status()
    return response.json()

async def get_leaderboard(
    self,
    period: str = "all",
    limit: int = 100
) -> list[dict]:
    """Get top traders by P&L."""
    response = await self.client.get(
        "/leaderboard",
        params={"period": period, "limit": limit}
    )
    response.raise_for_status()
    return response.json()

Trader Scoring System

from dataclasses import dataclass from datetime import datetime, timedelta import numpy as np from typing import Optional

@dataclass class TraderMetrics: address: str total_pnl: float realized_pnl: float unrealized_pnl: float win_rate: float avg_return_per_trade: float sharpe_ratio: float total_trades: int unique_markets: int avg_position_size: float avg_hold_time: timedelta consistency_score: float recency_score: float largest_win: float largest_loss: float profit_factor: float # gross profit / gross loss

class TraderAnalyzer: def init(self, data_client: PolymarketDataClient): self.client = data_client

async def analyze_trader(
    self,
    address: str,
    days: int = 90
) -> TraderMetrics:
    """Comprehensive trader analysis."""
    activity = await self.client.get_trader_activity(
        address, limit=1000
    )
    positions = await self.client.get_trader_positions(address)
    
    # Filter to time period
    cutoff = datetime.utcnow() - timedelta(days=days)
    recent_trades = [
        t for t in activity
        if datetime.fromisoformat(t["timestamp"]) > cutoff
    ]
    
    return TraderMetrics(
        address=address,
        total_pnl=self._calculate_total_pnl(positions, recent_trades),
        realized_pnl=self._calculate_realized_pnl(recent_trades),
        unrealized_pnl=self._calculate_unrealized_pnl(positions),
        win_rate=self._calculate_win_rate(recent_trades),
        avg_return_per_trade=self._calculate_avg_return(recent_trades),
        sharpe_ratio=self._calculate_sharpe(recent_trades),
        total_trades=len(recent_trades),
        unique_markets=len(set(t["market_id"] for t in recent_trades)),
        avg_position_size=self._calculate_avg_size(recent_trades),
        avg_hold_time=self._calculate_avg_hold_time(recent_trades),
        consistency_score=self._calculate_consistency(recent_trades),
        recency_score=self._calculate_recency_score(recent_trades),
        largest_win=max((t.get("pnl", 0) for t in recent_trades), default=0),
        largest_loss=min((t.get("pnl", 0) for t in recent_trades), default=0),
        profit_factor=self._calculate_profit_factor(recent_trades)
    )

def _calculate_win_rate(self, trades: list[dict]) -> float:
    """Calculate percentage of profitable trades."""
    if not trades:
        return 0
    
    winning = sum(1 for t in trades if t.get("pnl", 0) > 0)
    return winning / len(trades)

def _calculate_sharpe(self, trades: list[dict]) -> float:
    """Calculate Sharpe ratio of returns."""
    returns = [t.get("return_pct", 0) for t in trades if "return_pct" in t]
    
    if len(returns) < 2:
        return 0
    
    mean_return = np.mean(returns)
    std_return = np.std(returns)
    
    if std_return == 0:
        return 0
    
    # Annualize assuming daily trades
    return (mean_return * 365**0.5) / std_return

def _calculate_consistency(self, trades: list[dict]) -> float:
    """Score how consistent the trader's performance is."""
    if len(trades) < 10:
        return 0
    
    # Group by week
    weekly_pnl = {}
    for trade in trades:
        week = datetime.fromisoformat(trade["timestamp"]).isocalendar()[:2]
        weekly_pnl[week] = weekly_pnl.get(week, 0) + trade.get("pnl", 0)
    
    if len(weekly_pnl) < 4:
        return 0
    
    # Calculate consistency as % of profitable weeks
    profitable_weeks = sum(1 for pnl in weekly_pnl.values() if pnl > 0)
    return profitable_weeks / len(weekly_pnl)

def _calculate_recency_score(self, trades: list[dict]) -> float:
    """Score based on recent activity (more recent = higher)."""
    if not trades:
        return 0
    
    latest = max(
        datetime.fromisoformat(t["timestamp"]) for t in trades
    )
    days_since = (datetime.utcnow() - latest).days
    
    # Decay score over 30 days
    return max(0, 1 - (days_since / 30))

def _calculate_profit_factor(self, trades: list[dict]) -> float:
    """Gross profit / gross loss."""
    gross_profit = sum(t.get("pnl", 0) for t in trades if t.get("pnl", 0) > 0)
    gross_loss = abs(sum(t.get("pnl", 0) for t in trades if t.get("pnl", 0) < 0))
    
    if gross_loss == 0:
        return float('inf') if gross_profit > 0 else 0
    
    return gross_profit / gross_loss

class TraderScorer: def init(self, weights: dict = None): self.weights = weights or { "pnl": 0.20, "win_rate": 0.15, "sharpe": 0.15, "consistency": 0.15, "recency": 0.10, "profit_factor": 0.10, "experience": 0.10, "diversity": 0.05 }

def calculate_score(self, metrics: TraderMetrics) -> float:
    """Calculate overall trader score (0-100)."""
    scores = {
        "pnl": self._normalize_pnl(metrics.total_pnl),
        "win_rate": metrics.win_rate * 100,
        "sharpe": self._normalize_sharpe(metrics.sharpe_ratio),
        "consistency": metrics.consistency_score * 100,
        "recency": metrics.recency_score * 100,
        "profit_factor": self._normalize_profit_factor(metrics.profit_factor),
        "experience": self._normalize_trades(metrics.total_trades),
        "diversity": self._normalize_markets(metrics.unique_markets)
    }
    
    return sum(scores[k] * self.weights[k] for k in self.weights)

def _normalize_pnl(self, pnl: float) -> float:
    """Normalize P&L to 0-100 scale."""
    if pnl <= 0:
        return max(0, 50 + pnl / 1000)
    return min(100, 50 + np.log1p(pnl) * 8)

def _normalize_sharpe(self, sharpe: float) -> float:
    """Normalize Sharpe ratio to 0-100."""
    # Sharpe of 2+ is excellent
    return min(100, max(0, sharpe * 33))

def _normalize_profit_factor(self, pf: float) -> float:
    """Normalize profit factor to 0-100."""
    if pf == float('inf'):
        return 100
    # PF of 2+ is good
    return min(100, pf * 40)

def _normalize_trades(self, trades: int) -> float:
    """Normalize trade count to 0-100."""
    # 100+ trades shows experience
    return min(100, trades)

def _normalize_markets(self, markets: int) -> float:
    """Normalize unique markets to 0-100."""
    # Trading 10+ markets shows diversity
    return min(100, markets * 10)

Finding Traders to Follow

class TraderDiscovery: def init( self, data_client: PolymarketDataClient, analyzer: TraderAnalyzer ): self.client = data_client self.analyzer = analyzer self.scorer = TraderScorer()

async def find_top_traders(
    self,
    min_trades: int = 50,
    min_pnl: float = 1000,
    min_win_rate: float = 0.5,
    days: int = 30
) -> list[tuple[str, float, TraderMetrics]]:
    """Discover top performing traders."""
    leaderboard = await self.client.get_leaderboard(limit=500)
    
    candidates = []
    for trader in leaderboard:
        try:
            metrics = await self.analyzer.analyze_trader(
                trader["address"],
                days=days
            )
            
            # Apply filters
            if (metrics.total_trades >= min_trades and
                metrics.total_pnl >= min_pnl and
                metrics.win_rate >= min_win_rate):
                
                score = self.scorer.calculate_score(metrics)
                candidates.append((trader["address"], score, metrics))
        except Exception as e:
            # Skip traders with errors
            continue
    
    return sorted(candidates, key=lambda x: x[1], reverse=True)

async def find_market_specialists(
    self,
    market_category: str,
    min_trades_in_category: int = 20
) -> list[str]:
    """Find traders who specialize in specific market categories."""
    # Implementation would query by category
    pass

async def find_original_traders(
    self,
    min_originality_score: float = 0.7
) -> list[str]:
    """
    Find traders who make original trades (not copy trading).
    
    Originality is measured by:
    - Trade timing (not consistently after other traders)
    - Position uniqueness (not mirroring others)
    - Contrarian indicators
    """
    leaderboard = await self.client.get_leaderboard(limit=200)
    original_traders = []
    
    for trader in leaderboard:
        activity = await self.client.get_trader_activity(
            trader["address"],
            limit=100
        )
        
        originality = await self._calculate_originality(activity)
        
        if originality >= min_originality_score:
            original_traders.append(trader["address"])
    
    return original_traders

async def _calculate_originality(
    self,
    trades: list[dict]
) -> float:
    """Calculate how original a trader's trades are."""
    # Compare trade timing with market average
    # Check for unique position entries
    # Measure contrarian behavior
    return 0.5  # Placeholder

class CopyTradingManager: def init( self, data_client: PolymarketDataClient, trading_service, # Your trading service config: dict ): self.client = data_client self.trading = trading_service self.tracked_traders: dict[str, dict] = {} self.copy_delay = config.get("copy_delay_seconds", 30) self.size_multiplier = config.get("size_multiplier", 0.25) self.max_position_pct = config.get("max_position_pct", 0.1)

def add_trader(
    self,
    address: str,
    multiplier: float = None,
    markets: list[str] = None
):
    """Add a trader to copy."""
    self.tracked_traders[address] = {
        "multiplier": multiplier or self.size_multiplier,
        "markets": markets,  # None = all markets
        "last_trade": None
    }

def remove_trader(self, address: str):
    """Stop copying a trader."""
    self.tracked_traders.pop(address, None)

async def process_trade(self, trade: dict):
    """Process a trade from tracked trader."""
    address = trade["trader_address"]
    
    if address not in self.tracked_traders:
        return
    
    config = self.tracked_traders[address]
    
    # Check market filter
    if config["markets"] and trade["market_id"] not in config["markets"]:
        return
    
    # Wait for delay
    await asyncio.sleep(self.copy_delay)
    
    # Calculate size
    size = trade["size"] * config["multiplier"]
    
    # Apply max position limit
    portfolio = await self.trading.get_portfolio()
    max_size = portfolio["value"] * self.max_position_pct / trade["price"]
    size = min(size, max_size)
    
    # Execute copy trade
    await self.trading.place_order(
        token_id=trade["token_id"],
        side=trade["side"],
        price=trade["price"],
        size=size,
        metadata={"copy_source": address}
    )

Real-Time Monitoring

import asyncio from collections import defaultdict from typing import Callable, Awaitable

class LiveTraderMonitor: def init(self, tracked_addresses: list[str]): self.tracked = set(tracked_addresses) self.callbacks: dict[str, list[Callable]] = defaultdict(list) self._running = False

def on_trade(self, callback: Callable[[dict], Awaitable[None]]):
    """Register callback for trade events."""
    self.callbacks["trade"].append(callback)
    return callback

def on_position_change(self, callback: Callable[[dict], Awaitable[None]]):
    """Register callback for position changes."""
    self.callbacks["position"].append(callback)
    return callback

async def start(self):
    """Start monitoring tracked traders."""
    self._running = True
    
    async for event in self._watch_events():
        if not self._running:
            break
        
        if event.get("trader") in self.tracked:
            event_type = event.get("type", "trade")
            
            for callback in self.callbacks[event_type]:
                try:
                    await callback(event)
                except Exception as e:
                    print(f"Callback error: {e}")

def stop(self):
    """Stop monitoring."""
    self._running = False

def add_trader(self, address: str):
    """Add trader to watch list."""
    self.tracked.add(address)

def remove_trader(self, address: str):
    """Remove trader from watch list."""
    self.tracked.discard(address)

async def _watch_events(self):
    """Watch for on-chain events."""
    # Implementation would use WebSocket or polling
    while self._running:
        # Poll for new events
        await asyncio.sleep(5)
        yield {}  # Placeholder

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

trading-strategies

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

polymarket-api

No summary provided by upstream source.

Repository SourceNeeds Review
Research

Ai Task Hub

AI task hub for image analysis, background removal, speech-to-text, text-to-speech, markdown conversion, points balance/ledger lookup, and async execute/poll...

Registry SourceRecently Updated
Research

paper-review

Review academic papers for correctness, quality, and novelty using OpenJudge's multi-stage pipeline. Supports PDF files and LaTeX source packages (.tar.gz/.z...

Registry SourceRecently Updated