jy-aiot-collector-developer

基于矿山鸿雁(jy-aiot)Plugin框架,根据Markdown表格格式的数据采集协议文档,生成混合模式的采集模块代码(配置+代码片段)。使用此技能可快速实现HTTP服务端/客户端、FTP、数据库、MQTT、Kafka等通信协议的采集模块,简化定制协议采集组件的开发复杂度。

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "jy-aiot-collector-developer" with this command: npx skills add feelingsray/ray-aillm-skills/feelingsray-ray-aillm-skills-jy-aiot-collector-developer

JY-AIOT Collector Developer

Overview

本技能用于为矿山鸿雁(jy-aiot)产品的Plugin框架快速生成数据采集模块代码。根据用户提供的Markdown表格格式的协议定义文档,自动生成符合Plugin框架规范的采集模块,包括:

  • 通信层代码:HTTP服务端/客户端、FTP文件读取、数据库客户端、MQTT订阅、Kafka订阅
  • 数据解析器:将原始数据转换为框架抽象的标准格式
  • 配置模板:符合Plugin框架规范的YAML/JSON配置文件
  • 集成入口:可直接集成到jy-aiot产品的采集框架中

Protocol Document Format

Required Fields

协议文档必须包含以下表格结构:

## 通信配置

| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| protocol | string | 是 | 通信协议:http_server/http_client/ftp/database/mqtt/kafka |
| host | string | 是 | 服务地址 |
| port | int | 是 | 端口号 |
| path | string | 否 | 路径(HTTP/FTP) |
| username | string | 否 | 用户名 |
| password | string | 否 | 密码 |
| interval | int | 否 | 采集间隔(秒),默认60 |

## 数据点定义

| 字段名称 | 字段标识 | 数据类型 | 字节偏移 | 字节长度 | 缩放因子 | 起始位 | 位长度 | 备注 |
|----------|----------|----------|----------|----------|----------|--------|--------|------|
| 温度 | temperature | float | 0 | 4 | 0.1 | - | - | 设备温度值 |
| 压力 | pressure | int | 4 | 2 | 1 | - | - | 管道压力 |

## 数据解析规则

| 规则ID | 目标字段 | 源数据表达式 | 转换公式 | 数据单位 | 备注 |
|--------|----------|--------------|----------|----------|------|
| rule_01 | temperature | raw[0:4] | bytes_to_float_le(value) | ℃ | 小端解析 |
| rule_02 | pressure | raw[4:6] | bytes_to_int_le(value) / 100 | MPa | 缩放转换 |

Supported Communication Protocols

协议说明适用场景
http_serverHTTP服务端被动接收设备推送数据
http_clientHTTP客户端主动轮询RESTful API
ftpFTP文件读取定时拉取设备导出文件
database数据库客户端直接读取数据库表
mqttMQTT订阅订阅MQTT主题接收数据
kafkaKafka订阅订阅Kafka topic接收数据

Supported Data Types

类型标识Go类型说明示例
intint32/int64有符号整数传感器整数值
uintuint32/uint64无符号整数计数器值
floatfloat32/float64浮点数温度、压力
stringstring字符串设备状态文本
boolbool布尔值开关状态
bytes[]byte字节数组原始二进制数据

Workflow

1. Protocol Analysis

分析用户提供的协议文档,提取:

  • 通信方式(protocol字段)
  • 连接参数(host、port、credentials)
  • 数据点列表(字段名称、标识、类型、位置)
  • 解析规则(转换公式、单位)

2. Code Generation

根据协议类型生成对应的采集模块代码:

collector_{protocol}/
├── config.yaml          # 通信和采集配置
├── collector.go         # 采集器主逻辑
├── parser.go            # 数据解析器
├── metrics.go           # 数据模型定义
└── README.md            # 集成说明

3. Core Components

Collector Interface

所有采集模块必须实现Plugin框架的核心接口:

type Collector interface {
    // Init 初始化采集器
    Init(config Config) error
    
    // Start 启动采集
    Start(ctx context.Context) error
    
    // Stop 停止采集
    Stop() error
    
    // GetMetrics 获取采集到的数据
    GetMetrics() []Metric
    
    // Health 健康检查
    Health() HealthStatus
}

Metric Data Model

type Metric struct {
    DeviceID    string            `json:"device_id"`    // 设备标识
    MetricID    string            `json:"metric_id"`    // 指标标识
    Timestamp   time.Time         `json:"timestamp"`    // 采集时间
    Value       interface{}       `json:"value"`        // 值
    Quality     DataQuality       `json:"quality"`      // 数据质量
    Tags        map[string]string `json:"tags"`         // 标签
    Fields      map[string]string `json:"fields"`       // 扩展字段
}

type DataQuality int

const (
    QualityGood     DataQuality = 0  // 好
    QualityBad      DataQuality = 1  // 坏数据
    QualityUncertain DataQuality = 2  // 不确定
)

4. Protocol-Specific Implementations

HTTP Server Collector

用于被动接收设备推送的数据:

type HTTPServerCollector struct {
    config     HTTPServerConfig
    handler    *http.Server
    parser     DataParser
    buffer     sync.Map
}

func (c *HTTPServerCollector) Start(ctx context.Context) error {
    c.handler = &http.Server{
        Addr:    fmt.Sprintf(":%d", c.config.Port),
        Handler: c.createRouter(),
    }
    go c.handler.ListenAndServe()
    return nil
}

MQTT Collector

用于订阅MQTT主题接收实时数据:

type MQTTCollector struct {
    config     MQTTConfig
    client     mqtt.Client
    parser     DataParser
    buffer     sync.Map
}

func (c *MQTTCollector) Start(ctx context.Context) error {
    opts := mqtt.NewClientOptions().
        AddBroker(fmt.Sprintf("tcp://%s:%d", c.config.Host, c.config.Port)).
        SetClientID(c.config.ClientID)
    
    c.client = mqtt.NewClient(opts)
    if token := c.client.Connect(); token.Wait() && token.Error() != nil {
        return token.Error()
    }
    
    c.client.Subscribe(c.config.Topic, 0, c.messageHandler)
    return nil
}

Database Collector

用于直接查询数据库:

type DatabaseCollector struct {
    config     DatabaseConfig
    db         *sql.DB
    parser     DataParser
    interval   time.Duration
}

func (c *DatabaseCollector) Start(ctx context.Context) error {
    connStr := fmt.Sprintf("user=%s password=%s host=%s port=%d dbname=%s",
        c.config.Username, c.config.Password, 
        c.config.Host, c.config.Port, c.config.Database)
    
    var err error
    c.db, err = sql.Open("postgres", connStr)
    if err != nil {
        return err
    }
    
    ticker := time.NewTicker(c.interval)
    go c.pollLoop(ctx, ticker)
    return nil
}

Generated Code Structure

Directory Layout

jy-aiot-collector-{protocol}/
├── config/
│   ├── collector.yaml          # 主配置
│   └── metrics.yaml            # 指标定义
├── internal/
│   ├── collector.go            # 采集器实现
│   ├── parser.go               # 数据解析器
│   ├── client.go               # 协议客户端
│   └── model.go                # 数据模型
├── scripts/
│   ├── test_connection.sh      # 连接测试脚本
│   └── mock_server.py          # 模拟服务器(测试用)
├── deploy/
│   ├── docker-compose.yml      # 部署配置
│   └── k8s-deployment.yaml     # K8s部署
├── go.mod
├── main.go
└── README.md

Configuration Template

# collector.yaml
collector:
  name: "temperature_sensor"
  protocol: "mqtt"
  enabled: true
  interval: 10  # 秒

mqtt:
  broker: "tcp://192.168.1.100:1883"
  topic: "sensors/+/temperature"
  client_id: "jy-aiot-collector-001"
  username: "collector"
  password: "${MQTT_PASSWORD}"
  qos: 1
  clean_session: true

metrics:
  - id: "temperature"
    name: "设备温度"
    source: "payload.temperature"
    datatype: "float"
    unit: "℃"
    tags:
      device_type: "sensor"

parser:
  format: "json"
  timestamp_field: "timestamp"
  timestamp_format: "2006-01-02T15:04:05Z07:00"

output:
  buffer_size: 1000
  batch_size: 100
  flush_interval: 5

Parser Implementation

// internal/parser.go
type DataParser interface {
    Parse(raw []byte) ([]Metric, error)
    Validate(data interface{}) bool
    Transform(value interface{}, rule string) interface{}
}

type JSONParser struct {
    timestampField string
    timestampFormat string
}

func (p *JSONParser) Parse(raw []byte) ([]Metric, error) {
    var data map[string]interface{}
    if err := json.Unmarshal(raw, &data); err != nil {
        return nil, err
    }
    
    metrics := make([]Metric, 0)
    // 解析逻辑
    return metrics, nil
}

Integration Guide

1. Module Registration

在Plugin框架中注册采集模块:

// plugin.go
func init() {
    collector.Register("mqtt", NewMQTTCollector)
    collector.Register("http_server", NewHTTPServerCollector)
    collector.Register("database", NewDatabaseCollector)
}

2. Configuration Loading

加载并验证配置:

// config/config.go
type Config struct {
    Collector CollectorConfig `yaml:"collector"`
    MQTT      MQTTConfig      `yaml:"mqtt"`
    Metrics   []MetricConfig  `yaml:"metrics"`
    Parser    ParserConfig    `yaml:"parser"`
    Output    OutputConfig    `yaml:"output"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    
    var config Config
    if err := yaml.Unmarshal(data, &config); err != nil {
        return nil, err
    }
    
    // 验证配置
    if err := config.Validate(); err != nil {
        return nil, err
    }
    
    return &config, nil
}

3. Health Check

实现健康检查接口:

func (c *MQTTCollector) Health() HealthStatus {
    status := HealthStatus{
        Status: "healthy",
        Checks: make(map[string]string),
    }
    
    if !c.client.IsConnected() {
        status.Status = "unhealthy"
        status.Checks["connection"] = "disconnected"
    }
    
    return status
}

Example Usage

User Request Example

用户请求:

请基于以下协议文档生成MQTT温度传感器采集模块:

## 通信配置
| 字段 | 值 |
|------|-----|
| protocol | mqtt |
| host | 192.168.1.100 |
| port | 1883 |
| topic | sensors/+/temperature |

## 数据点定义
| 字段名称 | 字段标识 | 数据类型 |
|----------|----------|----------|
| 温度值 | temperature | float |
| 设备ID | device_id | string |

Generated Response

根据协议文档生成完整的采集模块代码,包括:

  • collector.yaml 配置文件
  • internal/collector.go 采集器实现
  • internal/parser.go JSON解析器
  • main.go 程序入口
  • README.md 集成说明

Resources

scripts/

  • generate_collector.py - 采集模块代码生成脚本
  • test_protocol.py - 协议解析测试工具
  • mock_device.py - 模拟设备数据源

references/

assets/

  • templates/collector_template.go - 采集器代码模板
  • templates/config_template.yaml - 配置文件模板
  • schemas/metric.schema.json - 指标定义JSON Schema

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

Bitpanda

Query a Bitpanda account via the Bitpanda API using a bundled bash CLI. Covers all read-only endpoints: balances, trades, transactions, asset info, and live...

Registry SourceRecently Updated
Coding

Bark Push

Send push notifications to iOS devices via Bark. Use when you need to send a push notification to user's iPhone. Triggered by phrases like "send a notificati...

Registry SourceRecently Updated
Coding

Sslgen

Self-signed SSL certificate generator. Create SSL certificates for development, generate CA certificates, create certificate signing requests, and manage dev...

Registry SourceRecently Updated
850Profile unavailable
Coding

Snippet

Code snippet manager for your terminal. Save, organize, search, and recall frequently used code snippets, shell commands, and text templates. Tag and categor...

Registry SourceRecently Updated
830Profile unavailable