agent-module-architecture

安装量: 36
排名: #19467

安装

npx skills add https://github.com/tencentblueking/bk-ci --skill agent-module-architecture

Agent 构建机模块架构指南

模块定位: Agent 是 BK-CI 的构建机核心组件,由 Go 语言编写,负责与后端服务通信、接收构建任务、拉起 Worker 进程执行构建。

一、模块概述 1.1 核心职责 职责 说明 进程管理 Daemon 守护 Agent 进程,确保持续运行 任务调度 从 Dispatch 服务拉取构建任务并执行 Worker 管理 拉起 Worker(Kotlin JAR)执行实际构建逻辑 心跳上报 定期向后端上报 Agent 状态和环境信息 自动升级 检测并自动升级 Agent、Worker、JDK 数据采集 通过 Telegraf 采集构建机指标数据 Docker 构建 支持 Docker 容器化构建(Linux) 1.2 与 Worker 的关系 ┌─────────────────────────────────────────────────────────────┐ │ 构建机 (Build Machine) │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────┐ 守护 ┌─────────┐ │ │ │ Daemon │ ───────────▶ │ Agent │ │ │ │ (Go) │ │ (Go) │ │ │ └─────────┘ └────┬────┘ │ │ │ 拉起 │ │ ▼ │ │ ┌─────────┐ │ │ │ Worker │ │ │ │(Kotlin) │ │ │ └────┬────┘ │ │ │ 执行 │ │ ▼ │ │ ┌──────────────────────┐ │ │ │ 插件任务 / 脚本任务 │ │ │ └──────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

Agent (Go): 负责进程调度、与后端通信、环境管理 Worker (Kotlin): 负责具体构建任务执行、插件运行、日志上报 二、目录结构 src/agent/ ├── agent/ # 主 Agent 模块 │ ├── src/ │ │ ├── cmd/ # 入口程序 │ │ │ ├── agent/main.go # Agent 主程序入口 │ │ │ ├── daemon/main.go # Daemon 守护进程入口 │ │ │ ├── installer/main.go # 安装程序入口 │ │ │ └── upgrader/main.go # 升级程序入口 │ │ ├── pkg/ # 核心包 │ │ │ ├── agent/ # Agent 核心逻辑 │ │ │ ├── api/ # API 客户端 │ │ │ ├── collector/ # 数据采集 │ │ │ ├── config/ # 配置管理 │ │ │ ├── cron/ # 定时任务 │ │ │ ├── i18n/ # 国际化 │ │ │ ├── imagedebug/ # Docker 镜像调试 │ │ │ ├── job/ # 构建任务管理 │ │ │ ├── job_docker/ # Docker 构建 │ │ │ ├── pipeline/ # Pipeline 任务 │ │ │ ├── upgrade/ # 升级逻辑 │ │ │ ├── upgrader/ # 升级器实现 │ │ │ └── util/ # 工具函数 │ │ └── third_components/ # 第三方组件管理 │ ├── go.mod │ ├── Makefile │ └── README.md ├── agent-slim/ # 轻量版 Agent │ └── cmd/slim.go └── common/ # 公共工具库 └── utils/ ├── fileutil/ └── slice.go

三、核心组件详解 3.1 Daemon 守护进程

文件: src/cmd/daemon/main.go

Daemon 负责守护 Agent 进程,确保其持续运行:

// Unix 实现:通过文件锁检测 Agent 是否存活 func watch(isDebug bool) { totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))

// 首次立即检查
totalLock.Lock()
doCheckAndLaunchAgent(isDebug)
totalLock.Unlock()

// 定时检查(5秒间隔)
checkTimeTicker := time.NewTicker(agentCheckGap)
for ; ; totalLock.Unlock() {
    select {
    case <-checkTimeTicker.C:
        if err := totalLock.Lock(); err != nil {
            continue
        }
        doCheckAndLaunchAgent(isDebug)
    }
}

}

// 检查并拉起 Agent func doCheckAndLaunchAgent(isDebug bool) { agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir())) locked, err := agentLock.TryLock() if err == nil && locked { // 能获取锁说明 Agent 未运行,需要拉起 logs.Warn("agent is not available, will launch it") process, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug) if err != nil { logs.WithError(err).Error("launch agent failed") } } }

Windows 实现: 使用 github.com/kardianos/service 库实现 Windows Service

3.2 Agent 核心流程

文件: src/pkg/agent/agent.go

func Run(isDebug bool) { // 1. 初始化配置 config.Init(isDebug) third_components.Init()

// 2. 初始化国际化
i18n.InitAgentI18n()

// 3. 上报启动(重试直到成功)
_, err := job.AgentStartup()
if err != nil {
    for {
        _, err = job.AgentStartup()
        if err == nil {
            break
        }
        time.Sleep(5 * time.Second)
    }
}

// 4. 启动后台任务
go collector.Collect()      // 数据采集
go cron.CleanJob()          // 定期清理
go cron.CleanDebugContainer() // 清理调试容器

// 5. 主循环:Ask 请求
for {
    doAsk()
    config.LoadAgentIp()
    time.Sleep(5 * time.Second)
}

}

3.3 Ask 统一请求模式

Agent 使用 Ask 模式统一处理多种任务:

func doAsk() { // 构建 Ask 请求 enable := genAskEnable() heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)

result, err := api.Ask(&api.AskInfo{
    Enable:  enable,      // 启用的功能
    Heart:   heart,       // 心跳信息
    Upgrade: upgrad,      // 升级信息
})

// 处理响应
resp := new(api.AskResp)
util.ParseJsonToData(result.Data, &resp)

// 执行各类任务
doAgentJob(enable, resp)

}

func doAgentJob(enable api.AskEnable, resp *api.AskResp) { // 心跳响应处理 if resp.Heart != nil { go agentHeartbeat(resp.Heart) }

// 构建任务
hasBuild := (enable.Build != api.NoneBuildType) && (resp.Build != nil)
if hasBuild {
    go job.DoBuild(resp.Build)
}

// 升级任务
if enable.Upgrade && resp.Upgrade != nil {
    go upgrade.AgentUpgrade(resp.Upgrade, hasBuild)
}

// Pipeline 任务
if enable.Pipeline && resp.Pipeline != nil {
    go pipeline.RunPipeline(resp.Pipeline)
}

// Docker 调试
if enable.DockerDebug && resp.Debug != nil {
    go imagedebug.DoImageDebug(resp.Debug)
}

}

3.4 构建任务执行

文件: src/pkg/job/build.go

// DoBuild 执行构建任务 func DoBuild(buildInfo *api.ThirdPartyBuildInfo) { // 获取任务锁 BuildTotalManager.Lock.Lock()

// 检查并发数
dockerCanRun, normalCanRun := CheckParallelTaskCount()

if buildInfo.DockerBuildInfo != nil && dockerCanRun {
    // Docker 构建
    GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
    BuildTotalManager.Lock.Unlock()
    runDockerBuild(buildInfo)
    return
}

if normalCanRun {
    // 普通构建
    GBuildManager.AddPreInstance(buildInfo.BuildId)
    BuildTotalManager.Lock.Unlock()
    runBuild(buildInfo)
}

}

// runBuild 启动 Worker 进程 func runBuild(buildInfo *api.ThirdPartyBuildInfo) error { // 检查 worker.jar 是否存在 agentJarPath := config.BuildAgentJarPath() if !fileutil.Exists(agentJarPath) { // 尝试自愈 upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile if fileutil.Exists(upgradeWorkerFile) { fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true) } }

// 设置环境变量
goEnv := map[string]string{
    "DEVOPS_AGENT_VERSION":     config.AgentVersion,
    "DEVOPS_WORKER_VERSION":    third_components.Worker.GetVersion(),
    "DEVOPS_PROJECT_ID":        buildInfo.ProjectId,
    "DEVOPS_BUILD_ID":          buildInfo.BuildId,
    "DEVOPS_VM_SEQ_ID":         buildInfo.VmSeqId,
    "DEVOPS_FILE_GATEWAY":      config.GAgentConfig.FileGateway,
    "DEVOPS_GATEWAY":           config.GetGateWay(),
    "BK_CI_LOCALE_LANGUAGE":    config.GAgentConfig.Language,
    "DEVOPS_AGENT_JDK_8_PATH":  third_components.Jdk.Jdk8.GetJavaOrNull(),
    "DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
}

// 创建临时目录并启动构建
tmpDir, _ := systemutil.MkBuildTmpDir()
doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)

}

3.5 配置管理

文件: src/pkg/config/config.go

Agent 配置从 .agent.properties 文件加载:

// 配置键定义 const ( KeyProjectId = "devops.project.id" KeyAgentId = "devops.agent.id" KeySecretKey = "devops.agent.secret.key" KeyDevopsGateway = "landun.gateway" KeyDevopsFileGateway = "landun.fileGateway" KeyTaskCount = "devops.parallel.task.count" KeyEnvType = "landun.env" KeySlaveUser = "devops.slave.user" KeyDockerTaskCount = "devops.docker.parallel.task.count" KeyLanguage = "devops.language" // ... )

// AgentConfig 配置结构 type AgentConfig struct { Gateway string FileGateway string BuildType string ProjectId string AgentId string SecretKey string ParallelTaskCount int DockerParallelTaskCount int EnableDockerBuild bool Language string // ... }

// AgentEnv 环境信息 type AgentEnv struct { OsName string agentIp string HostName string AgentVersion string AgentInstallPath string OsVersion string CPUProductInfo string GPUProductInfo string }

3.6 API 客户端

文件: src/pkg/api/api.go

// 构建 URL func buildUrl(url string) string { return config.GetGateWay() + url }

// Agent 启动上报 func AgentStartup() (*httputil.DevopsResult, error) { url := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup") startInfo := &ThirdPartyAgentStartInfo{ HostName: config.GAgentEnv.HostName, HostIp: config.GAgentEnv.GetAgentIp(), DetectOs: config.GAgentEnv.OsName, MasterVersion: config.AgentVersion, SlaveVersion: third_components.Worker.GetVersion(), } return httputil.NewHttpClient().Post(url).Body(startInfo, false). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult() }

// 构建完成上报 func WorkerBuildFinish(buildInfo ThirdPartyBuildWithStatus) (httputil.DevopsResult, error) { url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish") return httputil.NewHttpClient().Post(url).Body(buildInfo, false). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult() }

// Ask 统一请求 func Ask(info AskInfo) (httputil.AgentResult, error) { url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask") return httputil.NewHttpClient().Post(url).Body(info, bodyEq). SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(askRequest.Resp).IntoAgentResult() }

3.7 升级机制

文件: src/pkg/upgrade/upgrade.go

// AgentUpgrade 升级主逻辑 func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) { upItems := &upgradeItems{ Agent: upgradeItem.Agent, Worker: upgradeItem.Worker, Jdk: upgradeItem.Jdk, DockerInitFile: upgradeItem.DockerInitFile, }

if upItems.NoChange() {
    return
}

// 有构建任务时跳过升级
if hasBuild {
    return
}

// 获取任务锁,确保无任务运行
if !job.BuildTotalManager.Lock.TryLock() {
    return
}
defer job.BuildTotalManager.Lock.Unlock()

if job.CheckRunningJob() {
    return
}

// 下载升级文件
downloadUpgradeFiles(upItems)

// 执行升级
DoUpgradeOperation(upItems)

}

3.8 数据采集

文件: src/pkg/collector/collector.go

使用 Telegraf 进行数据采集:

func Collect() { if config.GAgentConfig.CollectorOn == false { logs.Info("agent collector off") return }

for {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        // 监听 IP 变化事件
        ipData := <-ipChan.DChan
        cancel()
    }()
    doAgentCollect(ctx)
}

}

func doAgentCollect(ctx context.Context) { // 生成 Telegraf 配置 configContent, _ := genTelegrafConfig()

// 初始化 Telegraf Agent
tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)

// 运行采集
for {
    tAgent.Run(ctx)
    time.Sleep(telegrafRelaunchTime)
}

}

四、数据类型定义 4.1 构建信息

文件: src/pkg/api/type.go

// 构建任务类型 type BuildJobType string

const ( AllBuildType BuildJobType = "ALL" DockerBuildType BuildJobType = "DOCKER" BinaryBuildType BuildJobType = "BINARY" NoneBuildType BuildJobType = "NONE" )

// 第三方构建信息 type ThirdPartyBuildInfo struct { ProjectId string json:"projectId" BuildId string json:"buildId" VmSeqId string json:"vmSeqId" Workspace string json:"workspace" PipelineId string json:"pipelineId" DockerBuildInfo ThirdPartyDockerBuildInfo json:"dockerBuildInfo" ExecuteCount int json:"executeCount" ContainerHashId string json:"containerHashId" }

// Docker 构建信息 type ThirdPartyDockerBuildInfo struct { AgentId string json:"agentId" SecretKey string json:"secretKey" Image string json:"image" Credential Credential json:"credential" Options DockerOptions json:"options" ImagePullPolicy string json:"imagePullPolicy" }

4.2 心跳信息 // Agent 心跳信息 type AgentHeartbeatInfo struct { MasterVersion string json:"masterVersion" SlaveVersion string json:"slaveVersion" HostName string json:"hostName" AgentIp string json:"agentIp" ParallelTaskCount int json:"parallelTaskCount" AgentInstallPath string json:"agentInstallPath" StartedUser string json:"startedUser" TaskList []ThirdPartyTaskInfo json:"taskList" DockerParallelTaskCount int json:"dockerParallelTaskCount" DockerTaskList []ThirdPartyDockerTaskInfo json:"dockerTaskList" }

// 心跳响应 type AgentHeartbeatResponse struct { MasterVersion string json:"masterVersion" SlaveVersion string json:"slaveVersion" AgentStatus string json:"agentStatus" ParallelTaskCount int json:"parallelTaskCount" Envs map[string]string json:"envs" Gateway string json:"gateway" FileGateway string json:"fileGateway" DockerParallelTaskCount int json:"dockerParallelTaskCount" Language string json:"language" }

五、跨平台支持 5.1 平台特定代码

Agent 通过 Go 的构建标签支持多平台:

src/pkg/config/ ├── config.go # 通用配置 ├── config_darwin.go # macOS 特定 ├── config_linux.go # Linux 特定 └── config_win.go # Windows 特定

src/pkg/upgrader/ ├── upgrader_darwin.go # macOS 升级器 ├── upgrader_unix.go # Unix 升级器 └── upgrader_win.go # Windows 升级器

5.2 构建命令

Linux

make clean build_linux

macOS

make clean build_macos

Windows

build_windows.bat

生成的二进制文件:

devopsDaemon_linux / devopsDaemon_macos / devopsDaemon.exe devopsAgent_linux / devopsAgent_macos / devopsAgent.exe upgrader_linux / upgrader_macos / upgrader.exe 六、与后端服务交互 6.1 API 端点 服务 端点 用途 Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/startup Agent 启动上报 Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask 统一 Ask 请求 Dispatch /ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish 构建完成上报 Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines Pipeline 任务 Environment /ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download 下载升级文件 6.2 认证头 func (a *AgentConfig) GetAuthHeaderMap() map[string]string { return map[string]string{ "X-DEVOPS-BUILD-TYPE": a.BuildType, "X-DEVOPS-PROJECT-ID": a.ProjectId, "X-DEVOPS-AGENT-ID": a.AgentId, "X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey, } }

七、开发规范 7.1 错误处理 // 标准错误检查 if err != nil { logs.WithError(err).Error("operation failed") return errors.Wrap(err, "context message") }

// Panic 恢复 defer func() { if err := recover(); err != nil { logs.Error("panic: ", err) } }()

7.2 日志规范 // 日志级别 logs.Debug("debug message") logs.Info("info message") logs.Infof("formatted: %s", value) logs.Warn("warning message") logs.Error("error message") logs.WithError(err).Error("error with context")

7.3 并发模式 // 启动 goroutine go collector.Collect() go cron.CleanJob()

// 使用锁保护共享资源 BuildTotalManager.Lock.Lock() defer BuildTotalManager.Lock.Unlock()

// 使用文件锁进行进程间同步 agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir)) locked, err := agentLock.TryLock()

7.4 新增功能开发 新增 API 调用:在 src/pkg/api/api.go 添加函数 新增数据类型:在 src/pkg/api/type.go 定义结构体 新增配置项:在 src/pkg/config/config.go 添加常量和字段 新增后台任务:在 doAgentJob() 中添加处理逻辑 八、控制脚本

Linux 示例

scripts/linux/install.sh # 安装 scripts/linux/start.sh # 启动 scripts/linux/stop.sh # 停止 scripts/linux/uninstall.sh # 卸载

九、相关模块 模块 关系 说明 Worker 下游 Agent 拉起 Worker 执行构建 Environment 上游 Agent 状态管理、心跳上报 Dispatch 上游 构建任务分发 Log 下游 构建日志上报(通过 Worker)

返回排行榜