Agent 构建机模块架构指南
模块定位: Agent 是 BK-CI 的构建机核心组件,由 Go 语言编写,负责与后端服务通信、接收构建任务、拉起 Worker 进程执行构建。
一、模块概述
1.1 核心职责
职责 说明
进程管理 Daemon 守护 Agent 进程,确保持续运行
任务调度 从 Dispatch 服务拉取构建任务并执行
Worker 管理 拉起 Worker(Kotlin JAR)执行实际构建逻辑
心跳上报 定期向后端上报 Agent 状态和环境信息
自动升级 检测并自动升级 Agent、Worker、JDK
数据采集 通过 Telegraf 采集构建机指标数据
Docker 构建 支持 Docker 容器化构建(Linux)
1.2 与 Worker 的关系
┌─────────────────────────────────────────────────────────────┐ │ 构建机 (Build Machine) │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────┐ 守护 ┌─────────┐ │ │ │ Daemon │ ───────────▶ │ Agent │ │ │ │ (Go) │ │ (Go) │ │ │ └─────────┘ └────┬────┘ │ │ │ 拉起 │ │ ▼ │ │ ┌─────────┐ │ │ │ Worker │ │ │ │(Kotlin) │ │ │ └────┬────┘ │ │ │ 执行 │ │ ▼ │ │ ┌──────────────────────┐ │ │ │ 插件任务 / 脚本任务 │ │ │ └──────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
-
Agent (Go): 负责进程调度、与后端通信、环境管理
-
Worker (Kotlin): 负责具体构建任务执行、插件运行、日志上报
二、目录结构
src/agent/ ├── agent/ # 主 Agent 模块 │ ├── src/ │ │ ├── cmd/ # 入口程序 │ │ │ ├── agent/main.go # Agent 主程序入口 │ │ │ ├── daemon/main.go # Daemon 守护进程入口 │ │ │ ├── installer/main.go # 安装程序入口 │ │ │ └── upgrader/main.go # 升级程序入口 │ │ ├── pkg/ # 核心包 │ │ │ ├── agent/ # Agent 核心逻辑 │ │ │ ├── api/ # API 客户端 │ │ │ ├── collector/ # 数据采集 │ │ │ ├── config/ # 配置管理 │ │ │ ├── cron/ # 定时任务 │ │ │ ├── i18n/ # 国际化 │ │ │ ├── imagedebug/ # Docker 镜像调试 │ │ │ ├── job/ # 构建任务管理 │ │ │ ├── job_docker/ # Docker 构建 │ │ │ ├── pipeline/ # Pipeline 任务 │ │ │ ├── upgrade/ # 升级逻辑 │ │ │ ├── upgrader/ # 升级器实现 │ │ │ └── util/ # 工具函数 │ │ └── third_components/ # 第三方组件管理 │ ├── go.mod │ ├── Makefile │ └── README.md ├── agent-slim/ # 轻量版 Agent │ └── cmd/slim.go └── common/ # 公共工具库 └── utils/ ├── fileutil/ └── slice.go
三、核心组件详解
3.1 Daemon 守护进程
文件: src/cmd/daemon/main.go
Daemon 负责守护 Agent 进程,确保其持续运行:
// Unix 实现:通过文件锁检测 Agent 是否存活 func watch(isDebug bool) { totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))
// 首次立即检查
totalLock.Lock()
doCheckAndLaunchAgent(isDebug)
totalLock.Unlock()
// 定时检查(5秒间隔)
checkTimeTicker := time.NewTicker(agentCheckGap)
for ; ; totalLock.Unlock() {
select {
case <-checkTimeTicker.C:
if err := totalLock.Lock(); err != nil {
continue
}
doCheckAndLaunchAgent(isDebug)
}
}
}
// 检查并拉起 Agent func doCheckAndLaunchAgent(isDebug bool) { agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir())) locked, err := agentLock.TryLock() if err == nil && locked { // 能获取锁说明 Agent 未运行,需要拉起 logs.Warn("agent is not available, will launch it") process, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug) if err != nil { logs.WithError(err).Error("launch agent failed") } } }
Windows 实现: 使用 github.com/kardianos/service 库实现 Windows Service
3.2 Agent 核心流程
文件: src/pkg/agent/agent.go
func Run(isDebug bool) { // 1. 初始化配置 config.Init(isDebug) third_components.Init()
// 2. 初始化国际化
i18n.InitAgentI18n()
// 3. 上报启动(重试直到成功)
_, err := job.AgentStartup()
if err != nil {
for {
_, err = job.AgentStartup()
if err == nil {
break
}
time.Sleep(5 * time.Second)
}
}
// 4. 启动后台任务
go collector.Collect() // 数据采集
go cron.CleanJob() // 定期清理
go cron.CleanDebugContainer() // 清理调试容器
// 5. 主循环:Ask 请求
for {
doAsk()
config.LoadAgentIp()
time.Sleep(5 * time.Second)
}
}
3.3 Ask 统一请求模式
Agent 使用 Ask 模式统一处理多种任务:
func doAsk() { // 构建 Ask 请求 enable := genAskEnable() heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)
result, err := api.Ask(&api.AskInfo{
Enable: enable, // 启用的功能
Heart: heart, // 心跳信息
Upgrade: upgrad, // 升级信息
})
// 处理响应
resp := new(api.AskResp)
util.ParseJsonToData(result.Data, &resp)
// 执行各类任务
doAgentJob(enable, resp)
}
func doAgentJob(enable api.AskEnable, resp *api.AskResp) { // 心跳响应处理 if resp.Heart != nil { go agentHeartbeat(resp.Heart) }
// 构建任务
hasBuild := (enable.Build != api.NoneBuildType) && (resp.Build != nil)
if hasBuild {
go job.DoBuild(resp.Build)
}
// 升级任务
if enable.Upgrade && resp.Upgrade != nil {
go upgrade.AgentUpgrade(resp.Upgrade, hasBuild)
}
// Pipeline 任务
if enable.Pipeline && resp.Pipeline != nil {
go pipeline.RunPipeline(resp.Pipeline)
}
// Docker 调试
if enable.DockerDebug && resp.Debug != nil {
go imagedebug.DoImageDebug(resp.Debug)
}
}
3.4 构建任务执行
文件: src/pkg/job/build.go
// DoBuild 执行构建任务 func DoBuild(buildInfo *api.ThirdPartyBuildInfo) { // 获取任务锁 BuildTotalManager.Lock.Lock()
// 检查并发数
dockerCanRun, normalCanRun := CheckParallelTaskCount()
if buildInfo.DockerBuildInfo != nil && dockerCanRun {
// Docker 构建
GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
BuildTotalManager.Lock.Unlock()
runDockerBuild(buildInfo)
return
}
if normalCanRun {
// 普通构建
GBuildManager.AddPreInstance(buildInfo.BuildId)
BuildTotalManager.Lock.Unlock()
runBuild(buildInfo)
}
}
// runBuild 启动 Worker 进程 func runBuild(buildInfo *api.ThirdPartyBuildInfo) error { // 检查 worker.jar 是否存在 agentJarPath := config.BuildAgentJarPath() if !fileutil.Exists(agentJarPath) { // 尝试自愈 upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile if fileutil.Exists(upgradeWorkerFile) { fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true) } }
// 设置环境变量
goEnv := map[string]string{
"DEVOPS_AGENT_VERSION": config.AgentVersion,
"DEVOPS_WORKER_VERSION": third_components.Worker.GetVersion(),
"DEVOPS_PROJECT_ID": buildInfo.ProjectId,
"DEVOPS_BUILD_ID": buildInfo.BuildId,
"DEVOPS_VM_SEQ_ID": buildInfo.VmSeqId,
"DEVOPS_FILE_GATEWAY": config.GAgentConfig.FileGateway,
"DEVOPS_GATEWAY": config.GetGateWay(),
"BK_CI_LOCALE_LANGUAGE": config.GAgentConfig.Language,
"DEVOPS_AGENT_JDK_8_PATH": third_components.Jdk.Jdk8.GetJavaOrNull(),
"DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
}
// 创建临时目录并启动构建
tmpDir, _ := systemutil.MkBuildTmpDir()
doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)
}
3.5 配置管理
文件: src/pkg/config/config.go
Agent 配置从 .agent.properties 文件加载:
// 配置键定义 const ( KeyProjectId = "devops.project.id" KeyAgentId = "devops.agent.id" KeySecretKey = "devops.agent.secret.key" KeyDevopsGateway = "landun.gateway" KeyDevopsFileGateway = "landun.fileGateway" KeyTaskCount = "devops.parallel.task.count" KeyEnvType = "landun.env" KeySlaveUser = "devops.slave.user" KeyDockerTaskCount = "devops.docker.parallel.task.count" KeyLanguage = "devops.language" // ... )
// AgentConfig 配置结构 type AgentConfig struct { Gateway string FileGateway string BuildType string ProjectId string AgentId string SecretKey string ParallelTaskCount int DockerParallelTaskCount int EnableDockerBuild bool Language string // ... }
// AgentEnv 环境信息 type AgentEnv struct { OsName string agentIp string HostName string AgentVersion string AgentInstallPath string OsVersion string CPUProductInfo string GPUProductInfo string }
3.6 API 客户端
文件: src/pkg/api/api.go
// 构建 URL func buildUrl(url string) string { return config.GetGateWay() + url }
// Agent 启动上报 func AgentStartup() (*httputil.DevopsResult, error) { url := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup") startInfo := &ThirdPartyAgentStartInfo{ HostName: config.GAgentEnv.HostName, HostIp: config.GAgentEnv.GetAgentIp(), DetectOs: config.GAgentEnv.OsName, MasterVersion: config.AgentVersion, SlaveVersion: third_components.Worker.GetVersion(), } return httputil.NewHttpClient().Post(url).Body(startInfo, false). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult() }
// 构建完成上报 func WorkerBuildFinish(buildInfo *ThirdPartyBuildWithStatus) (*httputil.DevopsResult, error) { url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish") return httputil.NewHttpClient().Post(url).Body(buildInfo, false). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult() }
// Ask 统一请求 func Ask(info *AskInfo) (*httputil.AgentResult, error) { url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask") return httputil.NewHttpClient().Post(url).Body(info, bodyEq). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(askRequest.Resp).IntoAgentResult() }
3.7 升级机制
文件: src/pkg/upgrade/upgrade.go
// AgentUpgrade 升级主逻辑 func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) { upItems := &upgradeItems{ Agent: upgradeItem.Agent, Worker: upgradeItem.Worker, Jdk: upgradeItem.Jdk, DockerInitFile: upgradeItem.DockerInitFile, }
if upItems.NoChange() {
return
}
// 有构建任务时跳过升级
if hasBuild {
return
}
// 获取任务锁,确保无任务运行
if !job.BuildTotalManager.Lock.TryLock() {
return
}
defer job.BuildTotalManager.Lock.Unlock()
if job.CheckRunningJob() {
return
}
// 下载升级文件
downloadUpgradeFiles(upItems)
// 执行升级
DoUpgradeOperation(upItems)
}
3.8 数据采集
文件: src/pkg/collector/collector.go
使用 Telegraf 进行数据采集:
func Collect() { if config.GAgentConfig.CollectorOn == false { logs.Info("agent collector off") return }
for {
ctx, cancel := context.WithCancel(context.Background())
go func() {
// 监听 IP 变化事件
ipData := <-ipChan.DChan
cancel()
}()
doAgentCollect(ctx)
}
}
func doAgentCollect(ctx context.Context) { // 生成 Telegraf 配置 configContent, _ := genTelegrafConfig()
// 初始化 Telegraf Agent
tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)
// 运行采集
for {
tAgent.Run(ctx)
time.Sleep(telegrafRelaunchTime)
}
}
四、数据类型定义
4.1 构建信息
文件: src/pkg/api/type.go
// 构建任务类型 type BuildJobType string
const ( AllBuildType BuildJobType = "ALL" DockerBuildType BuildJobType = "DOCKER" BinaryBuildType BuildJobType = "BINARY" NoneBuildType BuildJobType = "NONE" )
// 第三方构建信息
type ThirdPartyBuildInfo struct {
ProjectId string json:"projectId"
BuildId string json:"buildId"
VmSeqId string json:"vmSeqId"
Workspace string json:"workspace"
PipelineId string json:"pipelineId"
DockerBuildInfo *ThirdPartyDockerBuildInfo json:"dockerBuildInfo"
ExecuteCount *int json:"executeCount"
ContainerHashId string json:"containerHashId"
}
// Docker 构建信息
type ThirdPartyDockerBuildInfo struct {
AgentId string json:"agentId"
SecretKey string json:"secretKey"
Image string json:"image"
Credential Credential json:"credential"
Options DockerOptions json:"options"
ImagePullPolicy string json:"imagePullPolicy"
}
4.2 心跳信息
// Agent 心跳信息
type AgentHeartbeatInfo struct {
MasterVersion string json:"masterVersion"
SlaveVersion string json:"slaveVersion"
HostName string json:"hostName"
AgentIp string json:"agentIp"
ParallelTaskCount int json:"parallelTaskCount"
AgentInstallPath string json:"agentInstallPath"
StartedUser string json:"startedUser"
TaskList []ThirdPartyTaskInfo json:"taskList"
DockerParallelTaskCount int json:"dockerParallelTaskCount"
DockerTaskList []ThirdPartyDockerTaskInfo json:"dockerTaskList"
}
// 心跳响应
type AgentHeartbeatResponse struct {
MasterVersion string json:"masterVersion"
SlaveVersion string json:"slaveVersion"
AgentStatus string json:"agentStatus"
ParallelTaskCount int json:"parallelTaskCount"
Envs map[string]string json:"envs"
Gateway string json:"gateway"
FileGateway string json:"fileGateway"
DockerParallelTaskCount int json:"dockerParallelTaskCount"
Language string json:"language"
}
五、跨平台支持
5.1 平台特定代码
Agent 通过 Go 的构建标签支持多平台:
src/pkg/config/ ├── config.go # 通用配置 ├── config_darwin.go # macOS 特定 ├── config_linux.go # Linux 特定 └── config_win.go # Windows 特定
src/pkg/upgrader/ ├── upgrader_darwin.go # macOS 升级器 ├── upgrader_unix.go # Unix 升级器 └── upgrader_win.go # Windows 升级器
5.2 构建命令
Linux
make clean build_linux
macOS
make clean build_macos
Windows
build_windows.bat
生成的二进制文件:
-
devopsDaemon_linux / devopsDaemon_macos / devopsDaemon.exe
-
devopsAgent_linux / devopsAgent_macos / devopsAgent.exe
-
upgrader_linux / upgrader_macos / upgrader.exe
六、与后端服务交互
6.1 API 端点
服务 端点 用途
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/startup
Agent 启动上报
Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask
统一 Ask 请求
Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish
构建完成上报
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines
Pipeline 任务
Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download
下载升级文件
6.2 认证头
func (a *AgentConfig) GetAuthHeaderMap() map[string]string { return map[string]string{ "X-DEVOPS-BUILD-TYPE": a.BuildType, "X-DEVOPS-PROJECT-ID": a.ProjectId, "X-DEVOPS-AGENT-ID": a.AgentId, "X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey, } }
七、开发规范
7.1 错误处理
// 标准错误检查 if err != nil { logs.WithError(err).Error("operation failed") return errors.Wrap(err, "context message") }
// Panic 恢复 defer func() { if err := recover(); err != nil { logs.Error("panic: ", err) } }()
7.2 日志规范
// 日志级别 logs.Debug("debug message") logs.Info("info message") logs.Infof("formatted: %s", value) logs.Warn("warning message") logs.Error("error message") logs.WithError(err).Error("error with context")
7.3 并发模式
// 启动 goroutine go collector.Collect() go cron.CleanJob()
// 使用锁保护共享资源 BuildTotalManager.Lock.Lock() defer BuildTotalManager.Lock.Unlock()
// 使用文件锁进行进程间同步 agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir)) locked, err := agentLock.TryLock()
7.4 新增功能开发
-
新增 API 调用:在 src/pkg/api/api.go 添加函数
-
新增数据类型:在 src/pkg/api/type.go 定义结构体
-
新增配置项:在 src/pkg/config/config.go 添加常量和字段
-
新增后台任务:在 doAgentJob() 中添加处理逻辑
八、控制脚本
Linux 示例
scripts/linux/install.sh # 安装 scripts/linux/start.sh # 启动 scripts/linux/stop.sh # 停止 scripts/linux/uninstall.sh # 卸载
九、相关模块
模块 关系 说明
Worker 下游 Agent 拉起 Worker 执行构建
Environment 上游 Agent 状态管理、心跳上报
Dispatch 上游 构建任务分发
Log 下游 构建日志上报(通过 Worker)