增加一个镜像内部类,控制镜像统计,目前镜像能够按需获取了,3个方法,获取一次,一直获取,按需获取关闭

This commit is contained in:
mianbin 2025-05-28 12:10:25 +08:00
parent 45c717924c
commit 7a639ee01f
3 changed files with 265 additions and 0 deletions

View File

@ -15,4 +15,6 @@ public interface IMonitorService {
Statistics monitorDocker(String clientId, String containerId); Statistics monitorDocker(String clientId, String containerId);
BlockingQueue<Statistics> monitorsDocker(String clientId, String containerId); BlockingQueue<Statistics> monitorsDocker(String clientId, String containerId);
public BlockingQueue<Statistics> monitorsDockerEnableClose(String clientId, String containerId);
} }

View File

@ -5,6 +5,7 @@ import cd.casic.module.execute.docker.service.IMonitorService;
import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.model.Statistics; import com.github.dockerjava.api.model.Statistics;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -63,11 +64,15 @@ public class MonitorService implements IMonitorService {
} }
} }
/*
* 镜像里面的监控获取一直获取
* */
@Override @Override
public BlockingQueue<Statistics> monitorsDocker(String clientId, String containerId) { public BlockingQueue<Statistics> monitorsDocker(String clientId, String containerId) {
// 创建一个阻塞队列用于存储持续获取的统计信息 // 创建一个阻塞队列用于存储持续获取的统计信息
BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>(); BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>();
DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId); DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId);
dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() { dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() {
@Override @Override
public void onNext(Statistics statistics) { public void onNext(Statistics statistics) {
@ -97,4 +102,50 @@ public class MonitorService implements IMonitorService {
}); });
return statsQueue; return statsQueue;
} }
@Override
public BlockingQueue<Statistics> monitorsDockerEnableClose(String clientId, String containerId) {
DockerClient dockerClient = dockerClientFactory.getdockerClient(clientId);
MonitorTask monitorTask = new MonitorTask(dockerClient, containerId);
return monitorTask.getStatsQueue();
}
@Getter
public static class MonitorTask {
private final BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>();
private final ResultCallback.Adapter<Statistics> callback;
private final DockerClient dockerClient;
private boolean isClosed = false;
public MonitorTask(DockerClient dockerClient, String containerId) {
this.dockerClient = dockerClient;
this.callback = new ResultCallback.Adapter<Statistics>() {
@Override
public void onNext(Statistics statistics) {
if (!isClosed) {
try {
statsQueue.put(statistics);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e);
}
}
}
};
dockerClient.statsCmd(containerId).exec(callback);
}
public void close() {
if (!isClosed) {
isClosed = true;
try {
callback.close();
} catch (Exception e) {
log.error("关闭监控回调时出错", e);
}
}
}
}
} }

View File

@ -0,0 +1,212 @@
package cd.casic.module.execute;
import cd.casic.module.execute.docker.dataobject.model.DockerEndpoint;
import cd.casic.module.execute.docker.service.impl.MonitorService;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.model.Statistics;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.*;
import static java.lang.String.format;
/**
* @description: TODO
* @author: mianbin
* @date: 2025/5/26 20:11
* @version: 1.0
*/
@Slf4j
public class MonitorServiceTest {
private final Map<String, DockerClient> clientGroup = new ConcurrentHashMap<>();
private ApacheDockerHttpClient createHttpClient(DockerEndpoint endpoint) {
try {
URI dockerHost;
if (endpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.LOCAL) {
// 使用本地挂载
dockerHost = new URI("unix:///var/run/docker.sock");
} else if (endpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.REMOTE) {
// 远程挂载
dockerHost = new URI(format("tcp://%s:%s", endpoint.getHost(), endpoint.getPort()));
} else {
log.error("Unsupported Docker endpoint type: {}", endpoint.getType());
return null;
}
return new ApacheDockerHttpClient.Builder()
.dockerHost(dockerHost)
.build();
} catch (URISyntaxException e) {
log.error("Failed to create URI for Docker endpoint {}: {}", endpoint.getId(), e.getMessage(), e);
return null;
}
}
@BeforeEach
public void setUp() {
DockerEndpoint endpoint = new DockerEndpoint();
endpoint.setId("158");
endpoint.setType(DockerEndpoint.DockerEndpointTypeEnum.REMOTE);
endpoint.setHost("175.6.27.158");
endpoint.setPort(22375);
endpoint.setName("test");
ApacheDockerHttpClient httpClient = createHttpClient(endpoint);
DockerClient dockerClient = DockerClientBuilder.getInstance().withDockerHttpClient(httpClient).build();
clientGroup.put(endpoint.getId(), dockerClient);
}
@Test
public void statistics() {
CompletableFuture<Statistics> future = new CompletableFuture<>();
String containerId = "c1ffcaf2ff8d";
DockerClient dockerClient = clientGroup.get("158");
dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() {
@SneakyThrows(RuntimeException.class)
@Override
public void onNext(Statistics statistics) {
future.complete(statistics);
// throw new RuntimeException("镜像{" + containerId + "},采集状态失败,停止运行");
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return;
}
log.error("docker 拉取状态异常,{}", throwable.getMessage());
future.completeExceptionally(throwable);
}
});
try {
// 最多等待 3
Statistics statistics = future.get(3, TimeUnit.SECONDS);
System.out.println(statistics);
} catch (TimeoutException e) {
log.warn("获取 Docker 统计信息超时");
new Statistics();
} catch (InterruptedException | ExecutionException e) {
log.error("获取 Docker 统计信息出错", e);
new Statistics();
}
}
@Test
public void statisticsMore() {
String containerId = "c1ffcaf2ff8d";
BlockingQueue<Statistics> statsQueue = monitorDocker("158", containerId);
try {
while (true) {
// 从队列中取出统计信息若队列为空则阻塞
Statistics statistics = statsQueue.take();
System.out.println(statistics);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("获取统计信息时被中断");
}
}
public BlockingQueue<Statistics> monitorDocker(String clientId, String containerId) {
// 创建一个阻塞队列用于存储持续获取的统计信息
BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>();
DockerClient dockerClient = clientGroup.get(clientId);
dockerClient.statsCmd(containerId).exec(new ResultCallback.Adapter<Statistics>() {
@Override
public void onNext(Statistics statistics) {
try {
// 将获取到的统计信息添加到队列中
statsQueue.put(statistics);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e);
}
}
@Override
public void onError(Throwable throwable) {
log.error("docker 拉取状态异常,{}", throwable.getMessage());
}
@Override
public void onComplete() {
log.info("Docker 状态监控完成");
}
});
return statsQueue;
}
@Test
public void statisticsEnableClose() {
String containerId = "c1ffcaf2ff8d";
DockerClient dockerClient = clientGroup.get("158");
MonitorService.MonitorTask monitorTask = new MonitorService.MonitorTask(dockerClient, containerId);
BlockingQueue<Statistics> statsQueue = monitorTask.getStatsQueue();
try {
// 模拟收集一段时间的状态信息
for (int i = 0; i < 10; i++) {
Statistics statistics = statsQueue.take();
System.out.println(statistics);
}
// 关闭监控
monitorTask.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("获取统计信息时被中断");
}
}
@Getter
public static class MonitorTask {
private final BlockingQueue<Statistics> statsQueue = new LinkedBlockingQueue<>();
private final ResultCallback.Adapter<Statistics> callback;
private final DockerClient dockerClient;
private boolean isClosed = false;
public MonitorTask(DockerClient dockerClient, String containerId) {
this.dockerClient = dockerClient;
this.callback = new ResultCallback.Adapter<Statistics>() {
@Override
public void onNext(Statistics statistics) {
if (!isClosed) {
try {
statsQueue.put(statistics);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("向队列添加统计信息时被中断", e);
}
}
}
};
dockerClient.statsCmd(containerId).exec(callback);
}
public void close() {
if (!isClosed) {
isClosed = true;
try {
callback.close();
} catch (Exception e) {
log.error("关闭监控回调时出错", e);
}
}
}
}
}