优化了下代码

This commit is contained in:
mianbin 2025-05-28 12:35:53 +08:00
parent 7a639ee01f
commit 113f657d76
2 changed files with 33 additions and 26 deletions

View File

@ -11,8 +11,8 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Consumer;
/** /**
* @description: * @description:
@ -34,21 +34,27 @@ public class MonitorService implements IMonitorService {
public Statistics monitorDocker(String clientId, String containerId) { public Statistics monitorDocker(String clientId, String containerId) {
CompletableFuture<Statistics> future = new CompletableFuture<>(); CompletableFuture<Statistics> future = new CompletableFuture<>();
DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId);
Consumer<Statistics> onNextConsumer = future::complete;
Consumer<Throwable> onErrorConsumer = throwable -> {
if (throwable instanceof RuntimeException) {
return;
}
log.error("docker 拉取状态异常,{}", throwable.getMessage());
future.completeExceptionally(throwable);
};
dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() { dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() {
@SneakyThrows(RuntimeException.class) @SneakyThrows(RuntimeException.class)
@Override @Override
public void onNext(Statistics statistics) { public void onNext(Statistics statistics) {
future.complete(statistics); onNextConsumer.accept(statistics);
// throw new RuntimeException("镜像{" + containerId + "},采集状态失败,停止运行");
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
if (throwable instanceof RuntimeException) { onErrorConsumer.accept(throwable);
return;
}
log.error("docker 拉取状态异常,{}", throwable.getMessage());
future.completeExceptionally(throwable);
} }
}); });
try { try {
@ -73,32 +79,25 @@ public class MonitorService implements IMonitorService {
BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>(); BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>();
DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId);
Consumer<Statistics> onNextConsumer = statistics -> {
try {
statsQueue.put(statistics);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e);
}
};
dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() { dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() {
@Override @Override
public void onNext(Statistics statistics) { public void onNext(Statistics statistics) {
try { onNextConsumer.accept(statistics);
// 将获取到的统计信息添加到队列中
statsQueue.put(statistics);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e);
}
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
log.error("docker 拉取状态异常,{}", throwable.getMessage()); log.error("docker 拉取状态异常,{}", throwable.getMessage());
} }
@Override
public void onComplete() {
log.info("Docker 状态监控完成");
try {
super.close();
} catch (IOException e) {
// igonre it
}
}
}); });
return statsQueue; return statsQueue;
} }
@ -128,8 +127,14 @@ public class MonitorService implements IMonitorService {
statsQueue.put(statistics); statsQueue.put(statistics);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e); log.error("向队列添加统计信息时被中断", e.getMessage());
} }
} else {
/*
* 这个显得多余但是测试时候确实遇到callback.close()无效然后线程一直挂起的情况
* 这个加个保险
* */
Thread.currentThread().interrupt();
} }
} }
}; };

View File

@ -165,6 +165,8 @@ public class MonitorServiceTest {
} }
// 关闭监控 // 关闭监控
monitorTask.close(); monitorTask.close();
// Thread.currentThread().interrupt();
// 都可以 throw new InterruptedException();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
System.err.println("获取统计信息时被中断"); System.err.println("获取统计信息时被中断");