Compare commits

...

6 Commits

Author SHA1 Message Date
even
41a0e2cf9f Merge branch 'master' of http://1.14.125.6:3000/mianbin/ops-pro
# Conflicts:
#	modules/module-ci-process-biz/pom.xml
2025-06-01 17:29:26 +08:00
even
ba5a5ed972 注释补充 2025-06-01 17:24:46 +08:00
even
952e1220ef docker 初试,以及测试类 2025-06-01 17:20:52 +08:00
even
e8a2d21f7b Merge remote-tracking branch 'origin/temp' 2025-05-31 14:48:54 +08:00
even
d4f4167fba Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp 2025-05-31 00:30:01 +08:00
even
c1ee42791f 日志修改 ssh执行逻辑修改 2025-05-31 00:29:44 +08:00
12 changed files with 763 additions and 16 deletions

View File

@ -52,6 +52,16 @@
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
<version>2.17.0</version> <version>2.17.0</version>
</dependency> </dependency>
<dependency>
<groupId>cd.casic.boot</groupId>
<artifactId>module-ci-commons</artifactId>
</dependency>
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -29,7 +29,7 @@ public class DockerClientController {
private final DockerEndpointDao dockerEndpointDao; private final DockerEndpointDao dockerEndpointDao;
private final DockerClientFactory dockerClientFactory; private final DockerClientFactory dockerClientFactory;
@GetMapping // @GetMapping
public CommonResult<List<DockerEndpoint>> list() { public CommonResult<List<DockerEndpoint>> list() {
List<DockerEndpoint> collect = dockerEndpointDao List<DockerEndpoint> collect = dockerEndpointDao
.selectList() .selectList()
@ -39,7 +39,7 @@ public class DockerClientController {
return CommonResult.success(collect); return CommonResult.success(collect);
} }
@GetMapping // @GetMapping
public CommonResult<Boolean> testDockerClient(String dockerClientId) { public CommonResult<Boolean> testDockerClient(String dockerClientId) {
DockerClient dockerClient = dockerClientFactory.getdockerClient(dockerClientId); DockerClient dockerClient = dockerClientFactory.getdockerClient(dockerClientId);
dockerClient.pingCmd().exec(); dockerClient.pingCmd().exec();

View File

@ -94,6 +94,10 @@
<groupId>cd.casic.boot</groupId> <groupId>cd.casic.boot</groupId>
<artifactId>module-infra-biz</artifactId> <artifactId>module-infra-biz</artifactId>
</dependency> </dependency>
<dependency>
<groupId>cd.casic.boot</groupId>
<artifactId>module-ci-execute</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -3,6 +3,8 @@ package cd.casic.ci.process.engine.manager;
import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletRequest;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
/** /**
* TODO 用来管理日志连接 * TODO 用来管理日志连接
* TODO 用来写入日志 * TODO 用来写入日志
@ -11,4 +13,5 @@ public interface LoggerManager {
SseEmitter subscribe(String taskId,HttpServletRequest request); SseEmitter subscribe(String taskId,HttpServletRequest request);
void append(String taskId,String logContent); void append(String taskId,String logContent);
String getLogContent(String taskId); String getLogContent(String taskId);
void flushMemory(List<String> taskIdList);
} }

View File

@ -1,23 +1,32 @@
package cd.casic.ci.process.engine.manager.impl; package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dal.pipeline.PipTaskDao;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class DefaultRunContextManager implements RunContextManager { public class DefaultRunContextManager implements RunContextManager {
private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap(); private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap();
@Resource
private LoggerManager loggerManager;
@Resource
private PipTaskDao taskDao;
@Override @Override
public Boolean stopPipeline(String pipelineId) { public Boolean stopPipeline(String pipelineId) {
return null; return null;
@ -56,6 +65,10 @@ public class DefaultRunContextManager implements RunContextManager {
if (contextMap.containsKey(id)) { if (contextMap.containsKey(id)) {
PipelineRunContext oldPipeline = contextMap.get(id); PipelineRunContext oldPipeline = contextMap.get(id);
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING); oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
List<PipTask> taskList = taskDao.selectList("pipelineId", id);
List<String> taskIdList = taskList.stream().map(PipTask::getId).toList();
// 清空上一次的日志
loggerManager.flushMemory(taskIdList);
} }
contextMap.put(id,pipelineRunContext); contextMap.put(id,pipelineRunContext);
} else { } else {

View File

@ -104,6 +104,41 @@ public class MemoryLogManager implements LoggerManager {
} }
} }
} }
public void flushMemory(List<String> taskIdList){
List<PipTaskLog> insertList = new ArrayList<>();
List<PipTaskLog> updateList = new ArrayList<>();
for (String taskId : taskIdList) {
StringBuffer logCache = taskIdMemoryLogMap.get(taskId);
if (logCache!=null) {
if (taskIdDbMap.containsKey(taskId)) {
// 之前已经入库过
// 存在则更新
String id = taskIdDbMap.get(taskId);
PipTaskLog pipTaskLog = logDao.selectById(id);
// TODO 之后优化
pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString());
updateList.add(pipTaskLog);
taskIdDbMap.remove(taskId);
// logDao.updateById(pipTaskLog);
} else {
// 不存在就新增
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
pipTaskLog.setContent(logCache.toString());
// logDao.insert(pipTaskLog);
// taskIdDbMap.put(taskId,pipTaskLog.getId());
insertList.add(pipTaskLog);
}
}
}
if (!CollectionUtils.isEmpty(insertList)) {
logDao.insertBatch(insertList);
}
if (!CollectionUtils.isEmpty(updateList)) {
logDao.updateBatch(updateList);
}
}
@Override @Override
public String getLogContent(String taskId) { public String getLogContent(String taskId) {

View File

@ -4,10 +4,12 @@ import cd.casic.ci.process.common.WorkAtom;
import cd.casic.ci.process.engine.constant.AFLConstant; import cd.casic.ci.process.engine.constant.AFLConstant;
import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant; import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.DockerWorker;
import cd.casic.ci.process.engine.worker.base.SshWorker; import cd.casic.ci.process.engine.worker.base.SshWorker;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo; import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.module.execute.docker.dataobject.model.DockerEndpoint;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -15,7 +17,7 @@ import java.util.Map;
@WorkAtom(taskType = "AFL") @WorkAtom(taskType = "AFL")
@Slf4j @Slf4j
public class AFLWorker extends SshWorker { public class AFLWorker extends DockerWorker {
@Override @Override
public void execute(TaskRunContext context) { public void execute(TaskRunContext context) {
@ -42,24 +44,20 @@ public class AFLWorker extends SshWorker {
//如果machineId为0则说明该节点没有配置机器则使用开始节点的机器 //如果machineId为0则说明该节点没有配置机器则使用开始节点的机器
//获取机器 //获取机器
MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId); // MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId);
statusCode = shell(machineInfoDO, null, context, // 获取docker 暂时先写固定值
"echo \"自定义镜像执行命令\"", DockerEndpoint dockerEndpoint = new DockerEndpoint();
commandScript dockerEndpoint.setHost("175.6.27.228");
); dockerEndpoint.setPort(22375);
dockerEndpoint.setType(DockerEndpoint.DockerEndpointTypeEnum.REMOTE);
// TODO dockerEndpoint替换为查询
dockerRun(commandScript,dockerEndpoint,context);
} catch (Exception e) { } catch (Exception e) {
String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n"; String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n";
log.error("执行ssh失败:", e); log.error("执行ssh失败:", e);
append(context, errorMessage); append(context, errorMessage);
toBadEnding(); toBadEnding();
} }
if (statusCode == 0) {
log.info("节点执行完成");
append(context,"节点执行完成");
} else {
log.error("节点执行失败");
append(context,"节点执行失败");
}
localVariables.put(DIYImageExecuteCommandConstant.STATUS_CODE, statusCode); localVariables.put(DIYImageExecuteCommandConstant.STATUS_CODE, statusCode);
} }
} }

View File

@ -0,0 +1,238 @@
package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.module.execute.docker.dataobject.model.DockerEndpoint;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallbackTemplate;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.util.Lists;
import org.springframework.util.CollectionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
@Slf4j
public abstract class DockerWorker extends BaseWorker{
public void dockerRun(String command, DockerEndpoint dockerEndpoint, BaseRunContext context){
// 第一行必须是docker run 命令 option 支持 -it -v
String[] split = command.split("\n");
List<String> commandLine = new ArrayList<>(Arrays.stream(split).filter(StringUtils::isNotBlank).toList());
Optional<String> first = commandLine.stream().filter(StringUtils::isNotBlank).findFirst();
String dockerRunCommand="";
// 找到第一个不为空的判断是否为docker run
if (first.isPresent()) {
dockerRunCommand = first.get();
}
if (StringUtils.isEmpty(dockerRunCommand)) {
log.error("未找到有效指令");
append(context,"未找到有效指令");
return;
}
DockerExecHandler handler = null;
try {
handler = loadRunCommand(dockerRunCommand, dockerEndpoint);
// 删除第一个 dockerRun 其他都要在容器内部执行
commandLine.remove(0);
if (handler == null) {
log.error("容器创建失败");
append(context,"容器创建失败");
}
if (!CollectionUtils.isEmpty(commandLine)) {
StringBuffer stringBuffer = new StringBuffer();
// docker连接
DockerClient client = handler.getClient();
for (String cmd : commandLine) {
stringBuffer.append(cmd);
stringBuffer.append("&&");
}
stringBuffer.setLength(stringBuffer.length()-2);
// 启动的容器id
String containerId = handler.getContainerId();
String allCommand = stringBuffer.toString();
String[] commandList = new String[3];
commandList[0]="sh";
commandList[1]="-c";
commandList[2]=allCommand;
log.info("容器内执行命令:{}",commandList);
append(context,"容器创建失败");
// 创建命令
ExecCreateCmdResponse exec = client.execCreateCmd(containerId)
.withAttachStdin(true)
.withAttachStdout(true)
.withAttachStderr(true)
.withCmd(commandList).exec();
try {
client.execStartCmd(exec.getId()).exec(new ResultCallbackTemplate<>() {
@Override
public void onStart(Closeable stream) {
super.onStart(stream);
log.info("命令执行开始,容器id{},执行id{}",containerId,exec.getId());
append(context,"命令执行开始,容器id"+containerId);
}
@Override
public void onError(Throwable throwable) {
super.onError(throwable);
log.error("命令执行出现错误,容器id{},执行id{},错误原因",containerId,exec.getId(),throwable);
append(context,"命令执行出现错误,容器id"+containerId);
}
@Override
public void onComplete() {
super.onComplete();
log.info("命令执行完毕,容器id{},执行id{}",containerId,exec.getId());
append(context,"命令执行完毕,容器id"+containerId);
}
@Override
public void onNext(Frame frame) {
String output = new String(frame.getPayload(), StandardCharsets.UTF_8);
switch (frame.getStreamType()) {
case STDOUT:
System.out.print(output);
log.info("标准输出: {}", output.trim());
append(context,"标准输出: ,容器id"+containerId+" content"+output);
break;
case STDERR:
System.err.print(output);
log.error("错误输出: {}", output.trim());
append(context,"错误输出: ,容器id"+containerId+" content"+output);
break;
default:
log.warn("未知流类型: {}", frame.getStreamType());
}
}
}).awaitCompletion(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("执行异常",e);
append(context,"执行异常,容器id"+containerId);
} finally {
log.info("停止容器{}",containerId);
append(context,"停止容器: ,容器id"+containerId);
handler.getClient().stopContainerCmd(handler.containerId).exec();
log.info("删除容器{}",containerId);
append(context,"删除容器: ,容器id"+containerId);
handler.getClient().removeContainerCmd(handler.containerId).exec();
}
}
} finally {
if (handler!=null) {
try {
append(context,"断开链接: ,容器id"+handler.getContainerId());
handler.getClient().close();
} catch (IOException e) {
log.error("关闭容器失败",e);
}
}
}
}
public DockerExecHandler loadRunCommand(String runCommand, DockerEndpoint dockerEndpoint){
String[] split = runCommand.split("\\s+");
List<String> keywords = new ArrayList<>(split.length);
keywords.addAll(Lists.list(split));
keywords.removeIf(next -> next.equals("docker") || next.equals("run"));
// 容器路径映射
String volume ="";
// 镜像名
String image = "";
// 容器命令 通常为bash或者 /bin/bash
String initCommand = "";
for (int i = 0; i < keywords.size(); i++) {
String keyword = keywords.get(i);
switch (keyword){
case "-v":{
volume = keywords.get(++i);
break;
}
case "-p":{
log.warn("忽略暂时不支持的-p {}",keywords.get(++i));
break;
}
case "--name":{
log.warn("忽略暂时不支持的--name {}",keywords.get(++i));
break;
}
default:{
if(!keyword.startsWith("-")){
if (StringUtils.isEmpty(image)){
image = keyword;
log.info("识别到镜像 {}",image);
} else if (StringUtils.isEmpty(initCommand)) {
initCommand = keyword;
log.info("识别到容器运行命令 {}",initCommand);
}
}
else {
log.warn("未识别部分:{}",keyword);
}
}
}
}
URI uri = null;
try {
if (dockerEndpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.LOCAL) {
// 使用本地挂载
uri = new URI("unix:///var/run/docker.sock");
} else if (dockerEndpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.REMOTE) {
// 远程挂载
uri = new URI(format("tcp://%s:%s", dockerEndpoint.getHost(), dockerEndpoint.getPort()));
} else {
log.error("Unsupported Docker endpoint type: {}", dockerEndpoint.getType());
return null;
}
} catch (URISyntaxException e) {
log.error("docker运行时创建uri失败",e);
}
if (uri == null) {
return null;
}
ApacheDockerHttpClient httpClient = new ApacheDockerHttpClient.Builder().dockerHost(uri).build();
DockerClient build = DockerClientBuilder.getInstance().withDockerHttpClient(httpClient).build();
HostConfig hostConfig = HostConfig
.newHostConfig();
if (StringUtils.isNotEmpty(volume)) {
hostConfig.withBinds(Bind.parse(volume));
}
// .withPortBindings(PortBinding.parse()) // 暂时先不支持端口绑定
if (StringUtils.isEmpty(image)||StringUtils.isEmpty(initCommand)) {
log.error("缺少参数image或者initCommand");
return null;
}
// 创建容器
CreateContainerResponse exec = build.createContainerCmd(image).withHostConfig(hostConfig)
.withTty(true).withCmd(initCommand).exec();
String containerId = exec.getId();
// 启动容器
build.startContainerCmd(containerId).exec();
log.info("容器创建并启动完成");
return new DockerExecHandler(build, containerId);
}
@Data
@AllArgsConstructor
private static class DockerExecHandler{
private DockerClient client;
private String containerId;
}
}

View File

@ -198,7 +198,6 @@ public class SshCommand implements SshClient {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
commander.append("exit").append(CommandConstant.ENTER); commander.append("exit").append(CommandConstant.ENTER);

View File

@ -0,0 +1,240 @@
package cd.casic.server;
import cd.casic.module.execute.docker.callback.CommandExecCallback;
import cd.casic.module.execute.docker.dataobject.model.DockerEndpoint;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.async.ResultCallbackTemplate;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import jodd.util.StringUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.util.CollectionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
@SpringBootTest(classes = {OpsServerApplication.class})
@ActiveProfiles("local")
@Slf4j
public class DockerRunTest {
@Test
public void test01(){
DockerEndpoint dockerEndpoint = new DockerEndpoint();
dockerEndpoint.setHost("175.6.27.228");
dockerEndpoint.setPort(22375);
dockerEndpoint.setType(DockerEndpoint.DockerEndpointTypeEnum.REMOTE);
String command = "docker run -v /home/casic/706/yunqi/:/test -it aflplusplus/aflplusplus bash\n" +
"cd /test\n" +
"afl-fuzz -i case -o ai_afl -t 3000 -Q ./CaseGenerator/testdata/libpng/libpng/pngfix @@";
dockerRun(command,dockerEndpoint);
}
public void dockerRun(String command, DockerEndpoint dockerEndpoint){
// 第一行必须是docker run 命令 option 支持 -it -v
String[] split = command.split("\n");
List<String> commandLine = new ArrayList<>(Arrays.stream(split).filter(StringUtils::isNotBlank).toList());
Optional<String> first = commandLine.stream().filter(StringUtils::isNotBlank).findFirst();
String dockerRunCommand="";
// 找到第一个不为空的判断是否为docker run
if (first.isPresent()) {
dockerRunCommand = first.get();
}
if (StringUtils.isEmpty(dockerRunCommand)) {
log.error("未找到有效指令");
return;
}
DockerExecHandler handler = null;
try {
handler = loadRunCommand(dockerRunCommand, dockerEndpoint);
// 删除第一个 dockerRun 其他都要在容器内部执行
commandLine.remove(0);
if (handler == null) {
log.error("容器创建失败");
}
if (!CollectionUtils.isEmpty(commandLine)) {
StringBuffer stringBuffer = new StringBuffer();
// docker连接
DockerClient client = handler.getClient();
for (String cmd : commandLine) {
stringBuffer.append(cmd);
stringBuffer.append("&&");
}
stringBuffer.setLength(stringBuffer.length()-2);
// 启动的容器id
String containerId = handler.getContainerId();
String allCommand = stringBuffer.toString();
String[] commandList = new String[3];
commandList[0]="sh";
commandList[1]="-c";
commandList[2]=allCommand;
log.info("容器内执行命令:{}",commandList);
// 创建命令
ExecCreateCmdResponse exec = client.execCreateCmd(containerId)
.withAttachStdin(true)
.withAttachStdout(true)
.withAttachStderr(true)
.withCmd(commandList).exec();
try {
client.execStartCmd(exec.getId()).exec(new ResultCallbackTemplate<>() {
@Override
public void onStart(Closeable stream) {
super.onStart(stream);
log.info("命令执行开始,容器id{},执行id{}",containerId,exec.getId());
}
@Override
public void onError(Throwable throwable) {
super.onError(throwable);
log.error("命令执行出现错误,容器id{},执行id{},错误原因",containerId,exec.getId(),throwable);
}
@Override
public void onComplete() {
super.onComplete();
log.info("命令执行完毕,容器id{},执行id{}",containerId,exec.getId());
}
@Override
public void onNext(Frame frame) {
String output = new String(frame.getPayload(), StandardCharsets.UTF_8);
switch (frame.getStreamType()) {
case STDOUT:
System.out.print(output);
log.info("标准输出: {}", output.trim());
break;
case STDERR:
System.err.print(output);
log.error("错误输出: {}", output.trim());
break;
default:
log.warn("未知流类型: {}", frame.getStreamType());
}
}
}).awaitCompletion(60,TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("执行异常",e);
} finally {
log.info("停止容器{}",containerId);
handler.getClient().stopContainerCmd(handler.containerId).exec();
log.info("删除容器{}",containerId);
handler.getClient().removeContainerCmd(handler.containerId).exec();
}
}
} finally {
if (handler!=null) {
try {
handler.getClient().close();
} catch (IOException e) {
log.error("关闭容器失败",e);
}
}
}
}
public DockerExecHandler loadRunCommand(String runCommand, DockerEndpoint dockerEndpoint){
String[] split = runCommand.split("\\s+");
List<String> keywords = new ArrayList<>(split.length);
keywords.addAll(Lists.list(split));
keywords.removeIf(next -> next.equals("docker") || next.equals("run"));
// 容器路径映射
String volume ="";
// 镜像名
String image = "";
// 容器命令 通常为bash或者 /bin/bash
String initCommand = "";
for (int i = 0; i < keywords.size(); i++) {
String keyword = keywords.get(i);
switch (keyword){
case "-v":{
volume = keywords.get(++i);
break;
}
case "-p":{
log.warn("忽略暂时不支持的-p {}",keywords.get(++i));
break;
}
case "--name":{
log.warn("忽略暂时不支持的--name {}",keywords.get(++i));
break;
}
default:{
if(!keyword.startsWith("-")){
if (StringUtils.isEmpty(image)){
image = keyword;
log.info("识别到镜像 {}",image);
} else if (StringUtils.isEmpty(initCommand)) {
initCommand = keyword;
log.info("识别到容器运行命令 {}",initCommand);
}
}
else {
log.warn("未识别部分:{}",keyword);
}
}
}
}
URI uri = null;
try {
if (dockerEndpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.LOCAL) {
// 使用本地挂载
uri = new URI("unix:///var/run/docker.sock");
} else if (dockerEndpoint.getType() == DockerEndpoint.DockerEndpointTypeEnum.REMOTE) {
// 远程挂载
uri = new URI(format("tcp://%s:%s", dockerEndpoint.getHost(), dockerEndpoint.getPort()));
} else {
log.error("Unsupported Docker endpoint type: {}", dockerEndpoint.getType());
return null;
}
} catch (URISyntaxException e) {
log.error("docker运行时创建uri失败",e);
}
if (uri == null) {
return null;
}
ApacheDockerHttpClient httpClient = new ApacheDockerHttpClient.Builder().dockerHost(uri).build();
DockerClient build = DockerClientBuilder.getInstance().withDockerHttpClient(httpClient).build();
HostConfig hostConfig = HostConfig
.newHostConfig();
if (StringUtils.isNotEmpty(volume)) {
hostConfig.withBinds(Bind.parse(volume));
}
// .withPortBindings(PortBinding.parse()) // 暂时先不支持端口绑定
if (StringUtils.isEmpty(image)||StringUtils.isEmpty(initCommand)) {
log.error("缺少参数image或者initCommand");
return null;
}
// 创建容器
CreateContainerResponse exec = build.createContainerCmd(image).withHostConfig(hostConfig)
.withTty(true).withCmd(initCommand).exec();
String containerId = exec.getId();
// 启动容器
build.startContainerCmd(containerId).exec();
log.info("容器创建并启动完成");
return new DockerExecHandler(build, containerId);
}
@Data
@AllArgsConstructor
private static class DockerExecHandler{
private DockerClient client;
private String containerId;
}
}

View File

@ -0,0 +1,203 @@
package cd.casic.server;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.StreamType;
import com.github.dockerjava.api.model.Volume;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
@SpringBootTest(classes = {OpsServerApplication.class})
@ActiveProfiles("local")
public class DockerTest {
// TODO 测试通过明天继续整通用的
public static String executeParsedDockerCommand(
String dockerHostUri,
String dockerRunCommand,
String[] commandInContainer,
int execTimeoutMinutes) {
// 1. 解析docker run命令
String[] parts = dockerRunCommand.split("\\s+");
if (parts.length < 6 || !"docker".equals(parts[0]) || !"run".equals(parts[1])) {
throw new RuntimeException("Invalid docker run command format");
}
// 2. 提取参数
String volumeArg = null;
String imageName = null;
String initialCommand = null;
for (int i = 2; i < parts.length; i++) {
if ("-v".equals(parts[i]) && i + 1 < parts.length) {
volumeArg = parts[++i];
} else if ("-it".equals(parts[i])) {
// 跳过交互式参数
} else if (imageName == null) {
imageName = parts[i];
} else if (initialCommand == null) {
initialCommand = parts[i];
}
}
if (volumeArg == null || imageName == null) {
throw new RuntimeException("Missing required parameters in docker run command");
}
// 3. 解析卷挂载参数
String[] volumeParts = volumeArg.split(":");
if (volumeParts.length != 2) {
throw new RuntimeException("Invalid volume format, expected host_path:container_path");
}
String hostPath = volumeParts[0];
String containerPath = volumeParts[1];
ApacheDockerHttpClient httpClient;
try {
httpClient = new ApacheDockerHttpClient.Builder().dockerHost(new URI(dockerHostUri)).build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
DockerClient dockerClient = DockerClientBuilder.getInstance().withDockerHttpClient(httpClient).build();
try {
// 5. 创建容器
HostConfig hostConfig = HostConfig.newHostConfig()
.withBinds(Bind.parse(volumeArg));
CreateContainerResponse container = dockerClient.createContainerCmd(imageName)
.withHostConfig(hostConfig)
.withCmd(initialCommand)
.withTty(true)
.exec();
String containerId = container.getId();
// 6. 启动容器
dockerClient.startContainerCmd(containerId).exec();
try {
// 7. 构建要在容器内执行的完整命令
String[] fullCommand = new String[commandInContainer.length + 2];
fullCommand[0] = "sh";
fullCommand[1] = "-c";
fullCommand[2] = "cd " + containerPath + " && " + StringUtils.join(commandInContainer, " ");
// System.arraycopy(commandInContainer, 0, fullCommand, 3, commandInContainer.length - 1);
// 8. 创建exec实例
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId)
.withAttachStdout(true)
.withAttachStderr(true)
.withCmd(fullCommand)
.exec();
// 9. 执行命令并获取输出
String output = dockerClient.execStartCmd(execCreateCmdResponse.getId())
.exec(new ExecStartResultCallback())
.awaitOutput(execTimeoutMinutes, TimeUnit.MINUTES);
return output;
} finally {
// 10. 清理容器
dockerClient.stopContainerCmd(containerId).exec();
dockerClient.removeContainerCmd(containerId).exec();
}
} finally {
try {
dockerClient.close();
} catch (IOException e) {
}
}
}
private static class ExecStartResultCallback extends com.github.dockerjava.api.async.ResultCallback.Adapter<com.github.dockerjava.api.model.Frame> {
private final StringBuilder output = new StringBuilder();
@Override
public void onNext(com.github.dockerjava.api.model.Frame frame) {
output.append(new String(frame.getPayload()));
}
public String awaitOutput(long timeout, TimeUnit timeUnit) {
try {
super.awaitCompletion(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Command execution interrupted", e);
}
return output.toString();
}
}
// 示例用法
@Test
public void test() {
// *** 请将此处的远程 Docker TCP 地址替换为您实际的值 ***
String remoteDockerHostUri = "tcp://175.6.27.228:22375"; // <-- !!! REPLACE THIS !!!
// *** 提供您希望解析的 docker run 命令字符串 ***
// 注意此解析器仅支持简化的格式
String dockerRunCmdString = "docker run -v /home/yunqi/:/test -it aflplusplus/aflplusplus bash";
System.out.println("待解析并执行的 Docker run 命令: " + dockerRunCmdString);
// 要在容器内执行的 afl-fuzz 命令及其参数
// 这部分仍然需要单独提供因为解析器不负责从 docker run 命令中提取后续要 exec 的命令
String[] aflFuzzCommand = {
"afl-fuzz",
"-i", "case",
"-o", "ai_afl",
"-t", "3000",
"-Q",
"./CaseGenerator/testdata/libpng/libpng/pngfix",
"@@"
};
System.out.print("将在容器内执行的命令: ");
for (String arg : aflFuzzCommand) {
System.out.print(arg + " ");
}
System.out.println();
// 命令执行超时时间分钟
int commandTimeoutMinutes = 60; // 例如 afl-fuzz 60 分钟运行时间
try {
// 调用解析并执行的方法
System.out.println("\n开始执行解析后的 Docker 命令序列...");
String aflOutput = executeParsedDockerCommand(
remoteDockerHostUri,
dockerRunCmdString,
aflFuzzCommand,
commandTimeoutMinutes
);
System.out.println("\n--- AFL-fuzz 命令的标准输出 ---");
System.out.println(aflOutput);
System.out.println("------------------------------");
} catch (RuntimeException e) {
System.err.println("\n--- 执行过程中发生错误 ---");
System.err.println(e.getMessage());
e.printStackTrace();
System.err.println("---------------------------");
}
}
}

File diff suppressed because one or more lines are too long