diff --git a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java index 2aa7c721..f94d1bf3 100644 --- a/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java +++ b/modules/module-ci-execute/src/main/java/cd/casic/module/execute/docker/service/impl/MonitorService.java @@ -11,8 +11,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import java.io.IOException; import java.util.concurrent.*; +import java.util.function.Consumer; /** * @description: @@ -34,21 +34,27 @@ public class MonitorService implements IMonitorService { public Statistics monitorDocker(String clientId, String containerId) { CompletableFuture future = new CompletableFuture<>(); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); + + Consumer onNextConsumer = future::complete; + + Consumer onErrorConsumer = throwable -> { + if (throwable instanceof RuntimeException) { + return; + } + log.error("docker 拉取状态异常,{}", throwable.getMessage()); + future.completeExceptionally(throwable); + }; + dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter() { @SneakyThrows(RuntimeException.class) @Override public void onNext(Statistics statistics) { - future.complete(statistics); -// throw new RuntimeException("镜像{" + containerId + "},采集状态失败,停止运行"); + onNextConsumer.accept(statistics); } @Override public void onError(Throwable throwable) { - if (throwable instanceof RuntimeException) { - return; - } - log.error("docker 拉取状态异常,{}", throwable.getMessage()); - future.completeExceptionally(throwable); + onErrorConsumer.accept(throwable); } }); try { @@ -73,32 +79,25 @@ public class MonitorService implements IMonitorService { BlockingQueue statsQueue = new LinkedBlockingQueue<>(); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); + Consumer onNextConsumer = statistics -> { + try { + statsQueue.put(statistics); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("向队列添加统计信息时被中断", e); + } + }; + dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter() { @Override public void onNext(Statistics statistics) { - try { - // 将获取到的统计信息添加到队列中 - statsQueue.put(statistics); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("向队列添加统计信息时被中断", e); - } + onNextConsumer.accept(statistics); } @Override public void onError(Throwable throwable) { log.error("docker 拉取状态异常,{}", throwable.getMessage()); } - - @Override - public void onComplete() { - log.info("Docker 状态监控完成"); - try { - super.close(); - } catch (IOException e) { - // igonre it - } - } }); return statsQueue; } @@ -128,8 +127,14 @@ public class MonitorService implements IMonitorService { statsQueue.put(statistics); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.error("向队列添加统计信息时被中断", e); + log.error("向队列添加统计信息时被中断", e.getMessage()); } + } else { + /* + * 这个显得多余,但是测试时候确实遇到callback.close()无效,然后线程一直挂起的情况 + * 这个加个保险 + * */ + Thread.currentThread().interrupt(); } } }; diff --git a/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java b/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java index eb090a13..359f540c 100644 --- a/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java +++ b/modules/module-ci-execute/src/test/java/cd.casic.module.execute/MonitorServiceTest.java @@ -165,6 +165,8 @@ public class MonitorServiceTest { } // 关闭监控 monitorTask.close(); +// Thread.currentThread().interrupt(); +// 都可以 ,throw new InterruptedException(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("获取统计信息时被中断");