From 7a639ee01f7cc503f20d24673166749c1bc51963 Mon Sep 17 00:00:00 2001 From: mianbin Date: Wed, 28 May 2025 12:10:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=80=E4=B8=AA=E9=95=9C?= =?UTF-8?q?=E5=83=8F=E5=86=85=E9=83=A8=E7=B1=BB=EF=BC=8C=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E9=95=9C=E5=83=8F=E7=BB=9F=E8=AE=A1=EF=BC=8C=E7=9B=AE=E5=89=8D?= =?UTF-8?q?=E9=95=9C=E5=83=8F=E8=83=BD=E5=A4=9F=E6=8C=89=E9=9C=80=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E4=BA=86=EF=BC=8C3=E4=B8=AA=E6=96=B9=E6=B3=95?= =?UTF-8?q?=EF=BC=8C=E8=8E=B7=E5=8F=96=E4=B8=80=E6=AC=A1=EF=BC=8C=E4=B8=80?= =?UTF-8?q?=E7=9B=B4=E8=8E=B7=E5=8F=96=EF=BC=8C=E6=8C=89=E9=9C=80=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=85=B3=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../docker/service/IMonitorService.java | 2 + .../docker/service/impl/MonitorService.java | 51 +++++ .../MonitorServiceTest.java | 212 ++++++++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java diff --git a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/IMonitorService.java b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/IMonitorService.java index 6e383dba..fa637e37 100644 --- a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/IMonitorService.java +++ b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/IMonitorService.java @@ -15,4 +15,6 @@ public interface IMonitorService { Statistics monitorDocker(String clientId, String containerId); BlockingQueue monitorsDocker(String clientId, String containerId); + + public BlockingQueue monitorsDockerEnableClose(String clientId, String containerId); } diff --git a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java index 88e7e6be..2aa7c721 100644 --- a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java +++ b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java @@ -5,6 +5,7 @@ import cd.casic.module.execute.docker.service.IMonitorService; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.model.Statistics; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -63,11 +64,15 @@ public class MonitorService implements IMonitorService { } } + /* + * 镜像里面的监控获取,一直获取 + * */ @Override public BlockingQueue monitorsDocker(String clientId, String containerId) { // 创建一个阻塞队列用于存储持续获取的统计信息 BlockingQueue statsQueue = new LinkedBlockingQueue<>(); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); + dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter() { @Override public void onNext(Statistics statistics) { @@ -97,4 +102,50 @@ public class MonitorService implements IMonitorService { }); return statsQueue; } + + @Override + public BlockingQueue monitorsDockerEnableClose(String clientId, String containerId) { + DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); + MonitorTask monitorTask = new MonitorTask(dockerClient, containerId); + return monitorTask.getStatsQueue(); + } + + + @Getter + public static class MonitorTask { + private final BlockingQueue statsQueue = new LinkedBlockingQueue<>(); + private final ResultCallback.Adapter callback; + private final DockerClient dockerClient; + private boolean isClosed = false; + + public MonitorTask(DockerClient dockerClient, String containerId) { + this.dockerClient = dockerClient; + this.callback = new ResultCallback.Adapter() { + @Override + public void onNext(Statistics statistics) { + if (!isClosed) { + try { + statsQueue.put(statistics); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("向队列添加统计信息时被中断", e); + } + } + } + }; + dockerClient.statsCmd(containerId).exec(callback); + } + + public void close() { + if (!isClosed) { + isClosed = true; + try { + callback.close(); + } catch (Exception e) { + log.error("关闭监控回调时出错", e); + } + } + } + } + } diff --git a/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java b/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java new file mode 100644 index 00000000..eb090a13 --- /dev/null +++ b/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java @@ -0,0 +1,212 @@ +package cd.casic.module.execute; + +import cd.casic.module.execute.docker.dataobject.model.DockerEndpoint; +import cd.casic.module.execute.docker.service.impl.MonitorService; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.async.ResultCallback; +import com.github.dockerjava.api.model.Statistics; +import com.github.dockerjava.core.DockerClientBuilder; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.*; + +import static java.lang.String.format; + +/** + * @description: TODO + * @author: mianbin + * @date: 2025/5/26 20:11 + * @version: 1.0 + */ +@Slf4j +public class MonitorServiceTest { + + private final Map clientGroup = new ConcurrentHashMap<>(); + + private ApacheDockerHttpClient createHttpClient(DockerEndpoint endpoint) { + try { + URI dockerHost; + if (endpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.LOCAL) { + // 使用本地挂载 + dockerHost = new URI("unix:///var/run/docker.sock"); + } else if (endpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.REMOTE) { + // 远程挂载 + dockerHost = new URI(format("tcp://%s:%s", endpoint.getHost(), endpoint.getPort())); + } else { + log.error("Unsupported Docker endpoint type: {}", endpoint.getType()); + return null; + } + return new ApacheDockerHttpClient.Builder() + .dockerHost(dockerHost) + .build(); + } catch (URISyntaxException e) { + log.error("Failed to create URI for Docker endpoint {}: {}", endpoint.getId(), e.getMessage(), e); + return null; + } + } + + @BeforeEach + public void setUp() { + DockerEndpoint endpoint = new DockerEndpoint(); + endpoint.setId("158"); + endpoint.setType(DockerEndpoint.DockerEndpointTypeEnum.REMOTE); + endpoint.setHost("175.6.27.158"); + endpoint.setPort(22375); + endpoint.setName("test"); + ApacheDockerHttpClient httpClient = createHttpClient(endpoint); + DockerClient dockerClient = DockerClientBuilder.getInstance().withDockerHttpClient(httpClient).build(); + clientGroup.put(endpoint.getId(), dockerClient); + } + + + @Test + public void statistics() { + CompletableFuture future = new CompletableFuture<>(); + String containerId = "c1ffcaf2ff8d"; + DockerClient dockerClient = clientGroup.get("158"); + + dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter() { + @SneakyThrows(RuntimeException.class) + @Override + public void onNext(Statistics statistics) { + future.complete(statistics); +// throw new RuntimeException("镜像{" + containerId + "},采集状态失败,停止运行"); + } + + @Override + public void onError(Throwable throwable) { + if (throwable instanceof RuntimeException) { + return; + } + log.error("docker 拉取状态异常,{}", throwable.getMessage()); + future.completeExceptionally(throwable); + } + }); + try { + // 最多等待 3 秒 + Statistics statistics = future.get(3, TimeUnit.SECONDS); + System.out.println(statistics); + } catch (TimeoutException e) { + log.warn("获取 Docker 统计信息超时"); + new Statistics(); + } catch (InterruptedException | ExecutionException e) { + log.error("获取 Docker 统计信息出错", e); + new Statistics(); + } + + } + + + @Test + public void statisticsMore() { + String containerId = "c1ffcaf2ff8d"; + BlockingQueue statsQueue = monitorDocker("158", containerId); + try { + while (true) { + // 从队列中取出统计信息,若队列为空则阻塞 + Statistics statistics = statsQueue.take(); + System.out.println(statistics); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("获取统计信息时被中断"); + } + } + + public BlockingQueue monitorDocker(String clientId, String containerId) { + // 创建一个阻塞队列用于存储持续获取的统计信息 + BlockingQueue statsQueue = new LinkedBlockingQueue<>(); + DockerClient dockerClient = clientGroup.get(clientId); + dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter() { + @Override + public void onNext(Statistics statistics) { + try { + // 将获取到的统计信息添加到队列中 + statsQueue.put(statistics); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("向队列添加统计信息时被中断", e); + } + } + + @Override + public void onError(Throwable throwable) { + log.error("docker 拉取状态异常,{}", throwable.getMessage()); + } + + @Override + public void onComplete() { + log.info("Docker 状态监控完成"); + } + }); + return statsQueue; + } + + @Test + public void statisticsEnableClose() { + String containerId = "c1ffcaf2ff8d"; + DockerClient dockerClient = clientGroup.get("158"); + MonitorService.MonitorTask monitorTask = new MonitorService.MonitorTask(dockerClient, containerId); + BlockingQueue statsQueue = monitorTask.getStatsQueue(); + + try { + // 模拟收集一段时间的状态信息 + for (int i = 0; i < 10; i++) { + Statistics statistics = statsQueue.take(); + System.out.println(statistics); + } + // 关闭监控 + monitorTask.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("获取统计信息时被中断"); + } + } + + + @Getter + public static class MonitorTask { + private final BlockingQueue statsQueue = new LinkedBlockingQueue<>(); + private final ResultCallback.Adapter callback; + private final DockerClient dockerClient; + private boolean isClosed = false; + + public MonitorTask(DockerClient dockerClient, String containerId) { + this.dockerClient = dockerClient; + this.callback = new ResultCallback.Adapter() { + @Override + public void onNext(Statistics statistics) { + if (!isClosed) { + try { + statsQueue.put(statistics); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("向队列添加统计信息时被中断", e); + } + } + } + }; + dockerClient.statsCmd(containerId).exec(callback); + } + + public void close() { + if (!isClosed) { + isClosed = true; + try { + callback.close(); + } catch (Exception e) { + log.error("关闭监控回调时出错", e); + } + } + } + } + +}