CryptoKlineVisionData Usage
Fetch market data for: $ARGUMENTS
Use automatic failover between data sources.
Quick Start
from ckvd import CryptoKlineVisionData, DataProvider, MarketType, Interval from datetime import datetime, timedelta, timezone
Create manager for USDT-margined futures
manager = CryptoKlineVisionData.create(DataProvider.BINANCE, MarketType.FUTURES_USDT)
Fetch data with automatic failover (cache → Vision → REST)
IMPORTANT: Always use UTC timezone-aware datetimes
end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=7)
df = manager.get_data( symbol="BTCUSDT", start_time=start_time, end_time=end_time, interval=Interval.HOUR_1 )
print(f"Loaded {len(df)} bars") manager.close()
Key Concepts
Concept Quick Reference
Market Types SPOT, FUTURES_USDT, FUTURES_COIN
Intervals MINUTE_1, MINUTE_5, HOUR_1, HOUR_4, DAY_1
FCP Priority Cache (~1ms) → Vision (~1-5s) → REST (~100-500ms)
Symbol Format BTCUSDT (spot/futures), BTCUSD_PERP (coin-margined)
Streaming Real-time klines (sync/async), KlineUpdate objects
High-Level API
For simpler use cases, use fetch_market_data :
from ckvd import fetch_market_data, DataProvider, MarketType, Interval, ChartType from datetime import datetime, timedelta, timezone
df, elapsed_time, records_count = fetch_market_data( provider=DataProvider.BINANCE, market_type=MarketType.FUTURES_USDT, chart_type=ChartType.KLINES, symbol="BTCUSDT", interval=Interval.HOUR_1, start_time=datetime.now(timezone.utc) - timedelta(days=30), end_time=datetime.now(timezone.utc) ) print(f"Loaded {records_count} bars in {elapsed_time:.2f}s")
Streaming Real-Time Klines
For real-time market data, use the streaming API (requires pip install crypto-kline-vision-data[streaming] ):
Synchronous Streaming
from ckvd import CryptoKlineVisionData, DataProvider, MarketType
manager = CryptoKlineVisionData.create(DataProvider.BINANCE, MarketType.FUTURES_USDT)
Blocking iterator - receives KlineUpdate objects
for update in manager.stream_data_sync("BTCUSDT", "1h"): if update.is_closed: print(f"Closed candle: open={update.open}, close={update.close}, volume={update.volume}") else: print(f"Open candle (time: {update.open_time})")
manager.close()
Asynchronous Streaming
import asyncio from ckvd import CryptoKlineVisionData, DataProvider, MarketType, StreamConfig
async def main(): manager = CryptoKlineVisionData.create(DataProvider.BINANCE, MarketType.FUTURES_USDT)
# Create stream with optional config
config = StreamConfig(
market_type=MarketType.FUTURES_USDT,
confirmed_only=True, # Only emit when candle closes
queue_maxsize=1000
)
stream = manager.create_stream(config)
async with stream:
await stream.subscribe("BTCUSDT", "1h")
async for update in stream:
print(f"{update.symbol} {update.interval}: close={update.close}")
asyncio.run(main())
KlineUpdate fields: open_time (UTC datetime), open , high , low , close , volume (all float), symbol , interval , is_closed (bool)
Examples
Practical code examples:
-
@examples/basic-fetch.md - Single/multiple symbols, market types
-
@examples/error-handling.md - Exception patterns, validation, retries
Helper Scripts
Utility scripts for common operations:
Validate symbol format
uv run -p 3.13 python docs/skills/ckvd-usage/scripts/validate_symbol.py BTCUSDT FUTURES_COIN
Check cache status
uv run -p 3.13 python docs/skills/ckvd-usage/scripts/check_cache.py BTCUSDT futures_usdt 1h
Diagnose FCP behavior (with debug logging)
uv run -p 3.13 python docs/skills/ckvd-usage/scripts/diagnose_fcp.py BTCUSDT futures_usdt 1h --days 3
TodoWrite Task Templates
Template A: Fetch Data for Symbol
- Determine market type from symbol format (BTCUSDT vs BTCUSD_PERP)
- Create CryptoKlineVisionData manager with correct MarketType
- Define UTC time range with datetime.now(timezone.utc)
- Call get_data() with symbol, interval, and time range
- Validate returned DataFrame (not empty, monotonic index)
- Close manager when done
Template B: Debug Empty Results
- Validate symbol format for market type (validate_symbol_for_market_type)
- Verify time range is in the past and UTC-aware
- Enable debug logging (CKVD_LOG_LEVEL=DEBUG)
- Test each FCP source individually with enforce_source
- Check cache directory exists and has data
- Document root cause and resolution
Template C: Switch Market Type
- Identify current market type and symbol format
- Map symbol to new market type format (e.g., BTCUSDT -> BTCUSD_PERP)
- Create new manager with target MarketType
- Verify data fetches correctly for new market type
- Update any dependent code with new symbol format
Template D: Stream Real-Time Data
- Determine if sync (blocking) or async streaming is needed
- Sync: Simple use case, single symbol, main thread OK
- Async: Multiple symbols, integration with other async code
- Create CryptoKlineVisionData manager
- For sync: Use manager.stream_data_sync("SYMBOL", "INTERVAL")
- For async: Create StreamConfig, create_stream(), subscribe to symbols
- Process KlineUpdate objects in loop
- Check is_closed flag to detect candle completion
- Close manager when done (use context manager for async)
Post-Change Checklist
After modifying this skill:
-
Quick Start code example still matches actual API
-
Helper script commands are correct and scripts exist
-
@-links to references/ and examples/ resolve
-
Append changes to evolution-log.md
Detailed References
For deeper information, see:
-
@references/market-types.md - Detailed market type documentation
-
@references/intervals.md - Complete interval reference
-
@references/fcp-protocol.md - FCP architecture and debugging
-
@references/debugging.md - Debugging techniques and troubleshooting
-
@references/evolution-log.md - Skill change history