Commit 5a3e139e by 梁家彪

完善

parent b167f269
...@@ -43,8 +43,8 @@ public class DataCollectApiController { ...@@ -43,8 +43,8 @@ public class DataCollectApiController {
@ApiOperation("执行接口汇聚") @ApiOperation("执行接口汇聚")
@PostMapping(value = "/execute") @PostMapping(value = "/execute")
public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId, @RequestBody DataCollectApiExecuteReq dataCollectApiExecuteReq) { public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId, @RequestParam("collectType") Integer collectType, @RequestBody DataCollectApiExecuteReq dataCollectApiExecuteReq) {
dataCollectApiService.execute(dataCollectSettingId, dataCollectApiExecuteReq); dataCollectApiService.execute(dataCollectSettingId, collectType, dataCollectApiExecuteReq);
return ResultVo.success("汇聚执行成功"); return ResultVo.success("汇聚执行成功");
} }
} }
\ No newline at end of file
...@@ -41,8 +41,8 @@ public class DataCollectDbController { ...@@ -41,8 +41,8 @@ public class DataCollectDbController {
@ApiOperation("执行数据库汇聚") @ApiOperation("执行数据库汇聚")
@PostMapping(value = "/execute") @PostMapping(value = "/execute")
public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId) { public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId, @RequestParam("collectType") Integer collectType) {
dataCollectDbService.execute(dataCollectSettingId); dataCollectDbService.execute(dataCollectSettingId, collectType);
return ResultVo.success("汇聚执行成功"); return ResultVo.success("汇聚执行成功");
} }
} }
...@@ -48,8 +48,8 @@ public class DataCollectFileController { ...@@ -48,8 +48,8 @@ public class DataCollectFileController {
@ApiOperation("执行文件汇聚") @ApiOperation("执行文件汇聚")
@PostMapping(value = "/execute") @PostMapping(value = "/execute")
public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId, MultipartFile file) { public ResultVo executeDb(@RequestParam("dataCollectSettingId") Integer dataCollectSettingId, @RequestParam("collectType") Integer collectType, MultipartFile file) {
dataCollectFileService.execute(dataCollectSettingId, file); dataCollectFileService.execute(dataCollectSettingId, collectType, file);
return ResultVo.success("汇聚执行成功"); return ResultVo.success("汇聚执行成功");
} }
} }
\ No newline at end of file
...@@ -33,6 +33,12 @@ public class DataCollectLogController { ...@@ -33,6 +33,12 @@ public class DataCollectLogController {
return ResultVo.success(dataCollectLogService.getList(req)); return ResultVo.success(dataCollectLogService.getList(req));
} }
@ApiOperation("分页获取指定汇聚的日志")
@PostMapping(value = "/list/{id}")
public ResultVo getListById(@RequestBody DataCollectLogReq req, @PathVariable Integer id) {
return ResultVo.success(dataCollectLogService.getList(req, id));
}
@ApiOperation("删除数据汇聚日志") @ApiOperation("删除数据汇聚日志")
@DeleteMapping(value = "/delete/{id}") @DeleteMapping(value = "/delete/{id}")
public ResultVo delete(@PathVariable Integer id) { public ResultVo delete(@PathVariable Integer id) {
......
...@@ -91,14 +91,14 @@ public class DataCollectSetting { ...@@ -91,14 +91,14 @@ public class DataCollectSetting {
private Long deptId; private Long deptId;
/** /**
* createTime * createTime
*/ */
@TableField(fill = FieldFill.INSERT) @TableField(fill = FieldFill.INSERT)
private Date createTime; private Date createTime;
/** /**
* updateTime * updateTime
*/ */
@TableField(fill = FieldFill.INSERT_UPDATE) @TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime; private Date updateTime;
} }
\ No newline at end of file
...@@ -53,5 +53,5 @@ public class DataCollectSettingFile { ...@@ -53,5 +53,5 @@ public class DataCollectSettingFile {
* updateTime * updateTime
*/ */
@TableField(fill = FieldFill.INSERT_UPDATE) @TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime;; private Date updateTime;
} }
\ No newline at end of file
...@@ -38,7 +38,7 @@ public class DatabaseRunner implements ApplicationRunner { ...@@ -38,7 +38,7 @@ public class DatabaseRunner implements ApplicationRunner {
switch (ds.getDbType()) { switch (ds.getDbType()) {
case 1: case 1:
className = "com.mysql.cj.jdbc.Driver"; className = "com.mysql.cj.jdbc.Driver";
url = "jdbc:mysql://" + ds.getDbIp() + ":" + ds.getDbPort() + "/" + ds.getDbName() + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true"; url = "jdbc:mysql://" + ds.getDbIp() + ":" + ds.getDbPort() + "/" + ds.getDbName() + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&autoReconnect=true";
break; break;
case 2: case 2:
className = "oracle.jdbc.driver.OracleDriver"; className = "oracle.jdbc.driver.OracleDriver";
......
...@@ -12,5 +12,5 @@ public interface DataCollectApiService { ...@@ -12,5 +12,5 @@ public interface DataCollectApiService {
void update(DataCollectApiAddReq dataCollectApiAddReq); void update(DataCollectApiAddReq dataCollectApiAddReq);
void execute(Integer dataCollectSettingId, DataCollectApiExecuteReq dataCollectApiExecuteReq); void execute(Integer dataCollectSettingId, Integer collectType, DataCollectApiExecuteReq dataCollectApiExecuteReq);
} }
\ No newline at end of file
...@@ -11,5 +11,5 @@ public interface DataCollectDbService { ...@@ -11,5 +11,5 @@ public interface DataCollectDbService {
void update(DataCollectDbAddReq dataCollectDbAddReq); void update(DataCollectDbAddReq dataCollectDbAddReq);
void execute(Integer dataCollectSettingId); void execute(Integer dataCollectSettingId, Integer collectType);
} }
...@@ -17,5 +17,5 @@ public interface DataCollectFileService { ...@@ -17,5 +17,5 @@ public interface DataCollectFileService {
Map<String, Object> analysisExcel(MultipartFile multipartFile); Map<String, Object> analysisExcel(MultipartFile multipartFile);
void execute(Integer dataCollectSettingId, MultipartFile file); void execute(Integer dataCollectSettingId, Integer collectType, MultipartFile file);
} }
\ No newline at end of file
...@@ -12,6 +12,8 @@ public interface DataCollectLogService { ...@@ -12,6 +12,8 @@ public interface DataCollectLogService {
PageVo getList(DataCollectLogReq req); PageVo getList(DataCollectLogReq req);
PageVo getList(DataCollectLogReq req, Integer id);
List<DataCollectFileLog> file(Integer id); List<DataCollectFileLog> file(Integer id);
boolean removeById(Integer id); boolean removeById(Integer id);
......
...@@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil; ...@@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
...@@ -86,7 +87,7 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi ...@@ -86,7 +87,7 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi
@Override @Override
@Transactional @Transactional
public void execute(Integer dataCollectSettingId, DataCollectApiExecuteReq dataCollectApiExecuteReq) { public void execute(Integer dataCollectSettingId, Integer collectType, DataCollectApiExecuteReq dataCollectApiExecuteReq) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
DataCollectSettingApi settingApi = lambdaQuery().eq(DataCollectSettingApi::getDataCollectSettingId, dataCollectSettingId).one(); DataCollectSettingApi settingApi = lambdaQuery().eq(DataCollectSettingApi::getDataCollectSettingId, dataCollectSettingId).one();
List<DataCollectSettingApiPara> apiParas; List<DataCollectSettingApiPara> apiParas;
...@@ -106,12 +107,12 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi ...@@ -106,12 +107,12 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi
throw new BusinessException("不支持的请求类型"); throw new BusinessException("不支持的请求类型");
} }
if(StrUtil.isNotBlank(dataCollectApiExecuteReq.getHeaders())){ if(StrUtil.isNotBlank(dataCollectApiExecuteReq.getHeaders()) && JSONUtil.isJsonObj(dataCollectApiExecuteReq.getHeaders())){
for (Map.Entry<String, Object> entry : JSONObject.parseObject(dataCollectApiExecuteReq.getHeaders()).entrySet()) { for (Map.Entry<String, Object> entry : JSONObject.parseObject(dataCollectApiExecuteReq.getHeaders()).entrySet()) {
request.header(entry.getKey(), entry.getValue().toString()); request.header(entry.getKey(), entry.getValue().toString());
} }
} }
else if(StrUtil.isNotBlank(settingApi.getHeaders())){ else if(StrUtil.isNotBlank(settingApi.getHeaders()) && JSONUtil.isJsonObj(settingApi.getHeaders())){
for (Map.Entry<String, Object> entry : JSONObject.parseObject(settingApi.getHeaders()).entrySet()) { for (Map.Entry<String, Object> entry : JSONObject.parseObject(settingApi.getHeaders()).entrySet()) {
request.header(entry.getKey(), entry.getValue().toString()); request.header(entry.getKey(), entry.getValue().toString());
} }
...@@ -130,6 +131,9 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi ...@@ -130,6 +131,9 @@ public class DataCollectApiServiceImpl extends ServiceImpl<DataCollectSettingApi
JdbcTemplate toDbJdbcTemplate = DatabaseHolder.getJdbcTemplate(dataCollectSetting.getToDbId()); JdbcTemplate toDbJdbcTemplate = DatabaseHolder.getJdbcTemplate(dataCollectSetting.getToDbId());
Set<String> set = list.get(0).keySet(); Set<String> set = list.get(0).keySet();
set.remove("id"); set.remove("id");
if(collectType == 0){
toDbJdbcTemplate.execute("TRUNCATE TABLE " + dataCollectSetting.getToTable() + ";");
}
if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){ if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){
toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set)); toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set));
} }
......
...@@ -60,7 +60,7 @@ public class DataCollectDbServiceImpl extends ServiceImpl<DataCollectSettingDbMa ...@@ -60,7 +60,7 @@ public class DataCollectDbServiceImpl extends ServiceImpl<DataCollectSettingDbMa
@Override @Override
@Transactional @Transactional
public void execute(Integer dataCollectSettingId) { public void execute(Integer dataCollectSettingId, Integer collectType) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
DataCollectSetting dataCollectSetting = dataCollectMapper.selectById(dataCollectSettingId); DataCollectSetting dataCollectSetting = dataCollectMapper.selectById(dataCollectSettingId);
DataCollectSettingDb settingDb = getOne(new LambdaQueryWrapper<DataCollectSettingDb>().eq(DataCollectSettingDb::getDataCollectSettingId, dataCollectSettingId)); DataCollectSettingDb settingDb = getOne(new LambdaQueryWrapper<DataCollectSettingDb>().eq(DataCollectSettingDb::getDataCollectSettingId, dataCollectSettingId));
...@@ -69,6 +69,9 @@ public class DataCollectDbServiceImpl extends ServiceImpl<DataCollectSettingDbMa ...@@ -69,6 +69,9 @@ public class DataCollectDbServiceImpl extends ServiceImpl<DataCollectSettingDbMa
List<Map<String, Object>> list = fromDbJdbcTemplate.queryForList("SELECT * FROM " + settingDb.getFromTable()); List<Map<String, Object>> list = fromDbJdbcTemplate.queryForList("SELECT * FROM " + settingDb.getFromTable());
Set<String> set = list.get(0).keySet(); Set<String> set = list.get(0).keySet();
set.remove("id"); set.remove("id");
if(collectType == 0){
toDbJdbcTemplate.execute("TRUNCATE TABLE " + dataCollectSetting.getToTable() + ";");
}
if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){ if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){
toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set)); toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set));
} }
......
...@@ -67,7 +67,8 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi ...@@ -67,7 +67,8 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi
String suffix = FileUtil.extName(file.getOriginalFilename()); String suffix = FileUtil.extName(file.getOriginalFilename());
AssertUtils.isTrue(suffix.equals("xls") || suffix.equals("xlsx") || suffix.equals("et"), "不支持的文件类型"); AssertUtils.isTrue(suffix.equals("xls") || suffix.equals("xlsx") || suffix.equals("et"), "不支持的文件类型");
try { try {
return EasyExcelUtil.preview100(file.getInputStream()); Map<String, Object> map = EasyExcelUtil.preview100(file.getInputStream());
return map;
} catch (Exception e) { } catch (Exception e) {
throw new BusinessException("文件读取失败,请检查后重试"); throw new BusinessException("文件读取失败,请检查后重试");
} }
...@@ -75,7 +76,7 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi ...@@ -75,7 +76,7 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi
@Override @Override
@Transactional @Transactional
public void execute(Integer dataCollectSettingId, MultipartFile file) { public void execute(Integer dataCollectSettingId, Integer collectType, MultipartFile file) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
String suffix = FileUtil.extName(file.getOriginalFilename()); String suffix = FileUtil.extName(file.getOriginalFilename());
AssertUtils.isTrue(suffix.equals("xls") || suffix.equals("xlsx") || suffix.equals("et"), "不支持的文件类型"); AssertUtils.isTrue(suffix.equals("xls") || suffix.equals("xlsx") || suffix.equals("et"), "不支持的文件类型");
...@@ -85,6 +86,9 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi ...@@ -85,6 +86,9 @@ public class DataCollectFileServiceImpl extends ServiceImpl<DataCollectSettingFi
JdbcTemplate toDbJdbcTemplate = DatabaseHolder.getJdbcTemplate(dataCollectSetting.getToDbId()); JdbcTemplate toDbJdbcTemplate = DatabaseHolder.getJdbcTemplate(dataCollectSetting.getToDbId());
Set<String> set = list.get(0).keySet(); Set<String> set = list.get(0).keySet();
set.remove("id"); set.remove("id");
if(collectType == 0){
toDbJdbcTemplate.execute("TRUNCATE TABLE " + dataCollectSetting.getToTable() + ";");
}
if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){ if(dataCollectSetting.getCreateTable()==1 && toDbJdbcTemplate.queryForList("SHOW TABLES LIKE '"+ dataCollectSetting.getToTable() + "'").size()==0){
toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set)); toDbJdbcTemplate.execute(SqlUtil.joinCreateTableSql(dataCollectSetting.getToTable(), set));
} }
......
...@@ -52,6 +52,27 @@ public class DataCollectLogServiceImpl extends ServiceImpl<DataCollectTaskLogMap ...@@ -52,6 +52,27 @@ public class DataCollectLogServiceImpl extends ServiceImpl<DataCollectTaskLogMap
} }
@Override @Override
public PageVo getList(DataCollectLogReq req, Integer id) {
LambdaQueryWrapper<DataCollectTaskLog> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(DataCollectTaskLog::getDataCollectSettingId, id);
PageVo pageVo = PagingUtils.paging(req, dataCollectTaskLogMapper, wrapper,DataCollectTaskLog.class);
List<DataCollectTaskLogResp> taskLogRespList = new ArrayList<>(pageVo.getRows().size());
for (Object row : pageVo.getRows()) {
DataCollectTaskLog dataCollectTaskLog = (DataCollectTaskLog) row;
DataCollectSetting dataCollectSetting = dataCollectServiceImpl.getById(dataCollectTaskLog.getDataCollectSettingId());
if(null == dataCollectSetting){
continue;
}
DataCollectTaskLogResp resp = new DataCollectTaskLogResp();
resp.setTaskName(dataCollectSetting.getTaskName());
BeanUtil.copyProperties(dataCollectTaskLog, resp);
taskLogRespList.add(resp);
}
pageVo.setRows(taskLogRespList);
return pageVo;
}
@Override
public List<DataCollectFileLog> file(Integer id){ public List<DataCollectFileLog> file(Integer id){
return dataCollectFileLogMapper.selectList(new LambdaQueryWrapper<DataCollectFileLog>().eq(DataCollectFileLog::getDataCollectSettingFileId, id)); return dataCollectFileLogMapper.selectList(new LambdaQueryWrapper<DataCollectFileLog>().eq(DataCollectFileLog::getDataCollectSettingFileId, id));
} }
......
...@@ -43,6 +43,9 @@ public class DataCollectServiceImpl extends ServiceImpl<DataCollectMapper, DataC ...@@ -43,6 +43,9 @@ public class DataCollectServiceImpl extends ServiceImpl<DataCollectMapper, DataC
@Resource @Resource
private DataCollectSettingApiParaMapper dataCollectSettingApiParaMapper; private DataCollectSettingApiParaMapper dataCollectSettingApiParaMapper;
@Resource
private DataCollectTaskLogMapper dataCollectTaskLogMapper;
@Override @Override
public PageVo<DataCollectSetting> list(DataCollectReq req) { public PageVo<DataCollectSetting> list(DataCollectReq req) {
return PagingUtils.paging(req, dataCollectMapper, new LambdaQueryWrapper<DataCollectSetting>().orderByDesc(DataCollectSetting::getId), DataCollectSetting.class); return PagingUtils.paging(req, dataCollectMapper, new LambdaQueryWrapper<DataCollectSetting>().orderByDesc(DataCollectSetting::getId), DataCollectSetting.class);
...@@ -64,26 +67,29 @@ public class DataCollectServiceImpl extends ServiceImpl<DataCollectMapper, DataC ...@@ -64,26 +67,29 @@ public class DataCollectServiceImpl extends ServiceImpl<DataCollectMapper, DataC
@Override @Override
@Transactional @Transactional
public void delete(Integer id){ public void delete(Integer id) {
DataCollectSetting dataCollectSetting = getById(id); DataCollectSetting dataCollectSetting = getById(id);
UpdateWrapper wrapper = new UpdateWrapper(); UpdateWrapper wrapper = new UpdateWrapper();
wrapper.eq("data_collect_setting_id", dataCollectSetting.getId()); wrapper.eq("data_collect_setting_id", dataCollectSetting.getId());
getDao(dataCollectSetting.getDataType()).delete(wrapper); getDao(dataCollectSetting.getDataType()).delete(wrapper);
if(dataCollectSetting.getDataType()==3){ if (dataCollectSetting.getDataType() == 2) {
dataCollectSettingFileMapper.delete(wrapper);
} else if (dataCollectSetting.getDataType() == 3) {
dataCollectSettingApiParaMapper.delete(wrapper); dataCollectSettingApiParaMapper.delete(wrapper);
} }
dataCollectTaskLogMapper.delete(wrapper);
AssertUtils.isTrue(removeById(id), "删除失败"); AssertUtils.isTrue(removeById(id), "删除失败");
} }
@Override @Override
@Transactional @Transactional
public void batchDelete(List<Integer> ids){ public void batchDelete(List<Integer> ids) {
for (Integer id : ids) { for (Integer id : ids) {
delete(id); delete(id);
} }
} }
public BaseMapper getDao(int dataType){ public BaseMapper getDao(int dataType) {
switch (dataType) { switch (dataType) {
case 1: case 1:
return dataCollectSettingDbMapper; return dataCollectSettingDbMapper;
......
...@@ -7,7 +7,7 @@ public class SqlUtil { ...@@ -7,7 +7,7 @@ public class SqlUtil {
public static String jointInsertSql(String tableName, Set<String> cloums){ public static String jointInsertSql(String tableName, Set<String> cloums){
StringBuilder fileds = new StringBuilder(); StringBuilder fileds = new StringBuilder();
for (String cloum : cloums) { for (String cloum : cloums) {
fileds.append(cloum.replaceAll("\\p{Punct}", "")).append(","); fileds.append(cloum.replaceAll("\\p{Punct}", "").replaceAll("(", "").replaceAll(")", "").replaceAll("\r", "").replaceAll("\n", "")).append(",");
} }
fileds.deleteCharAt(fileds.length()-1); fileds.deleteCharAt(fileds.length()-1);
StringBuilder sql = new StringBuilder("INSERT INTO ").append(tableName).append("(").append(fileds).append(") VALUES ("); StringBuilder sql = new StringBuilder("INSERT INTO ").append(tableName).append("(").append(fileds).append(") VALUES (");
...@@ -25,7 +25,7 @@ public class SqlUtil { ...@@ -25,7 +25,7 @@ public class SqlUtil {
public static String joinCreateTableSql(String tableName, Set<String> cloums){ public static String joinCreateTableSql(String tableName, Set<String> cloums){
StringBuilder sql = new StringBuilder("CREATE TABLE ").append(tableName).append("("); StringBuilder sql = new StringBuilder("CREATE TABLE ").append(tableName).append("(");
for (String cloum : cloums) { for (String cloum : cloums) {
sql.append(cloum.replaceAll("\\p{Punct}", "")).append(" VARCHAR(255),"); sql.append(cloum.replaceAll("\\p{Punct}", "").replaceAll("(", "").replaceAll(")", "").replaceAll("\r", "").replaceAll("\n", "")).append(" TEXT,");
} }
sql.deleteCharAt(sql.length()-1).append(")"); sql.deleteCharAt(sql.length()-1).append(")");
return sql.toString(); return sql.toString();
......
...@@ -22,7 +22,7 @@ public class DatabaseHolder { ...@@ -22,7 +22,7 @@ public class DatabaseHolder {
switch (dbType) { switch (dbType) {
case 1: case 1:
className = "com.mysql.cj.jdbc.Driver"; className = "com.mysql.cj.jdbc.Driver";
url = "jdbc:mysql://" + dbIp + ":" + dbPort + "/" + dbName + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true"; url = "jdbc:mysql://" + dbIp + ":" + dbPort + "/" + dbName + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&autoReconnect=true";
break; break;
case 2: case 2:
className = "oracle.jdbc.driver.OracleDriver"; className = "oracle.jdbc.driver.OracleDriver";
......
...@@ -6,6 +6,7 @@ import cn.hutool.json.JSONUtil; ...@@ -6,6 +6,7 @@ import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zq.common.context.ThreadContext; import com.zq.common.context.ThreadContext;
import com.zq.common.exception.BusinessException;
import com.zq.common.utils.AssertUtils;; import com.zq.common.utils.AssertUtils;;
import com.zq.common.vo.PageVo; import com.zq.common.vo.PageVo;
import com.zq.common.vo.ResultVo; import com.zq.common.vo.ResultVo;
...@@ -49,6 +50,7 @@ public class CommonQueryService extends ServiceImpl<CommonQuerySettingDao, Commo ...@@ -49,6 +50,7 @@ public class CommonQueryService extends ServiceImpl<CommonQuerySettingDao, Commo
} }
public ResultVo addDatasource(QueryDb queryDb) { public ResultVo addDatasource(QueryDb queryDb) {
queryDb.setCreateTime(DateUtil.date());
AssertUtils.isTrue(queryDbDao.insert(queryDb) == 1, "添加失败,请检查后重试"); AssertUtils.isTrue(queryDbDao.insert(queryDb) == 1, "添加失败,请检查后重试");
DatabaseHolder.add(queryDb.getDbType(), queryDb.getDbIp(), queryDb.getDbPort(), queryDb.getDbName(), queryDb.getUsername(), queryDb.getPassword(), queryDb.getId()); DatabaseHolder.add(queryDb.getDbType(), queryDb.getDbIp(), queryDb.getDbPort(), queryDb.getDbName(), queryDb.getUsername(), queryDb.getPassword(), queryDb.getId());
return ResultVo.success(); return ResultVo.success();
...@@ -93,14 +95,22 @@ public class CommonQueryService extends ServiceImpl<CommonQuerySettingDao, Commo ...@@ -93,14 +95,22 @@ public class CommonQueryService extends ServiceImpl<CommonQuerySettingDao, Commo
} }
public Object checkConnect(QueryDb queryDb) { public Object checkConnect(QueryDb queryDb) {
QueryDb queryDb1 = queryDbDao.selectById(queryDb.getId()); String url = null;
String jdbcUrl = queryDb1.getDbIp(); switch (queryDb.getDbType()) {
try { case 1:
return SqlUtils.testConnection(jdbcUrl, queryDb1.getUsername(), queryDb1.getPassword()); url = "jdbc:mysql://" + queryDb.getDbIp() + ":" + queryDb.getDbPort() + "/" + queryDb.getDbName() + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true";
} catch (Exception e) { break;
log.error(e.getMessage()); case 2:
return false; url = "jdbc:oracle:thin:@//" + queryDb.getDbIp() + ":" + queryDb.getDbPort() + "/" + queryDb.getDbName();
break;
case 3:
url = "jdbc:sqlserver://" + queryDb.getDbIp() + ":" + queryDb.getDbPort() + "/" + queryDb.getDbName();
break;
default:
throw new BusinessException("不支持的数据库");
} }
AssertUtils.isTrue(SqlUtils.testConnection(url, queryDb.getUsername(), queryDb.getPassword()), "连接失败");
return true;
} }
public List<Map<String, Object>> runSelect(Map<String, Object> body) throws Exception { public List<Map<String, Object>> runSelect(Map<String, Object> body) throws Exception {
......
package com.zq.spiderflow.datacollision.executor; package com.zq.spiderflow.datacollision.executor;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.zq.common.exception.BusinessException; import com.zq.common.exception.BusinessException;
import com.zq.spiderflow.context.SpiderContext; import com.zq.spiderflow.context.SpiderContext;
import com.zq.spiderflow.executor.ShapeExecutor; import com.zq.spiderflow.executor.ShapeExecutor;
...@@ -10,17 +14,13 @@ import com.zq.spiderflow.model.SpiderNode; ...@@ -10,17 +14,13 @@ import com.zq.spiderflow.model.SpiderNode;
import com.zq.spiderflow.model.SpiderOutput; import com.zq.spiderflow.model.SpiderOutput;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
@Component @Component
public class DataCollisionExecutor implements ShapeExecutor { public class DataCollisionExecutor implements ShapeExecutor {
public static final String TABLE1 = "table1";
public static final String FIELD1 = "field1"; public static final String COLLISION = "collision";
public static final String TABLE2 = "table2";
public static final String FIELD2 = "field2";
public static final String RESERVE = "reserve"; public static final String RESERVE = "reserve";
@Override @Override
...@@ -40,53 +40,60 @@ public class DataCollisionExecutor implements ShapeExecutor { ...@@ -40,53 +40,60 @@ public class DataCollisionExecutor implements ShapeExecutor {
@Override @Override
public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception { public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception {
String table1 = null; JSONArray collisionArray = JSONUtil.parseArray(node.getStringJsonValue(COLLISION));
String table2 = null; LinkedHashSet<Map<String, Object>> results = new LinkedHashSet<>();
String field1 = null;
String field2 = null;
try {
table1 = node.getStringJsonValue(TABLE1);
table2 = node.getStringJsonValue(TABLE2);
field1 = node.getStringJsonValue(FIELD1);
field2 = node.getStringJsonValue(FIELD2);
}catch (Exception e){
throw new BusinessException("不是字符串");
}
if(StrUtil.isBlank(table1)){ for (Object o : collisionArray) {
throw new BusinessException("没有表1"); JSONObject jsonObject = JSONUtil.parseObj(o);
} String table1 = jsonObject.getStr("table1");
if(StrUtil.isBlank(table2)){ String field1 = jsonObject.getStr("field1");
throw new BusinessException("没有表2"); String condition = jsonObject.getStr("condition");
} String table2 = jsonObject.getStr("table2");
if(StrUtil.isBlank(field1)){ String field2 = jsonObject.getStr("field2");
throw new BusinessException("没有字段1"); String logic = jsonObject.getStr("logic", "2");
}
if(StrUtil.isBlank(field2)){
throw new BusinessException("没有字段2");
}
List<Map<String, Object>> list1 = (List<Map<String, Object>>) variables.get(table1); List<Map<String, Object>> list1 = (List<Map<String, Object>>) variables.get(table1);
List<Map<String, Object>> list2 = (List<Map<String, Object>>) variables.get(table2); List<Map<String, Object>> list2 = (List<Map<String, Object>>) variables.get(table2);
Set<Map<String, Object>> temp = new HashSet<>();
if(list1==null){ for (Map<String, Object> map1 : list1) {
throw new BusinessException("表1没有数据"); String field1Str = map1.get(field1).toString();
} for (Map<String, Object> map2 : list2) {
if(list2==null){ String field2Str = map2.get(field2).toString();
throw new BusinessException("表2没有数据"); switch (condition) {
} case "1":
if(field1Str.equals(field2Str)){
List<Map<String, Object>> results = new ArrayList<>(); map1.putAll(map2);
for (Map<String, Object> map1 : list1) { temp.add(map1);
String field1Str = map1.get(field1).toString(); }
for (Map<String, Object> map2 : list2) { break;
String field2Str = map2.get(field2).toString(); case "2":
if(field1Str.equals(field2Str)){ if(!field1Str.equals(field2Str)){
map1.putAll(map2); map1.putAll(map2);
results.add(map1); temp.add(map1);
break; }
break;
case "3":
if(field1Str.contains(field2Str)){
map1.putAll(map2);
temp.add(map1);
}
break;
case "4":
if(!field1Str.contains(field2Str)){
map1.putAll(map2);
temp.add(map1);
}
break;
}
} }
} }
if(logic.equals("1")){
results.retainAll(temp);
}else {
results.addAll(temp);
}
} }
variables.put(node.getNodeId() + "_" + node.getNodeName(), results); variables.put(node.getNodeId() + "_" + node.getNodeName(), results);
......
package com.zq.spiderflow.excelremoveduplicate.executor; //package com.zq.spiderflow.excelremoveduplicate.executor;
//
import com.zq.spiderflow.context.SpiderContext; //import com.zq.spiderflow.context.SpiderContext;
import com.zq.spiderflow.executor.ShapeExecutor; //import com.zq.spiderflow.executor.ShapeExecutor;
import com.zq.spiderflow.model.Shape; //import com.zq.spiderflow.model.Shape;
import com.zq.spiderflow.model.SpiderNode; //import com.zq.spiderflow.model.SpiderNode;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.Map; //import java.util.Map;
//
@Component //@Component
public class ExcelRemoveDuplicateExecutor implements ShapeExecutor { //public class ExcelRemoveDuplicateExecutor implements ShapeExecutor {
//
public static final String TABLE1 = "table1"; // public static final String TABLE1 = "table1";
public static final String FIELD1 = "field1"; // public static final String FIELD1 = "field1";
public static final String TABLE2 = "table2"; // public static final String TABLE2 = "table2";
public static final String FIELD2 = "field2"; // public static final String FIELD2 = "field2";
public static final String RESERVE = "reserve"; // public static final String RESERVE = "reserve";
//
@Override // @Override
public Shape shape() { // public Shape shape() {
Shape shape = new Shape(); // Shape shape = new Shape();
shape.setImage(""); // shape.setImage("");
shape.setLabel("表格去重"); // shape.setLabel("表格去重");
shape.setName("ExcelRemoveDuplicate"); // shape.setName("ExcelRemoveDuplicate");
shape.setTitle("表格去重"); // shape.setTitle("表格去重");
return shape; // return shape;
} // }
//
@Override // @Override
public String supportShape() { // public String supportShape() {
return "ExcelRemoveDuplicate"; // return "ExcelRemoveDuplicate";
} // }
//
@Override // @Override
public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception { // public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception {
variables.put("test", "test"); // variables.put("test", "test");
} // }
} //}
\ No newline at end of file \ No newline at end of file
package com.zq.spiderflow.flowcall.executor; //package com.zq.spiderflow.flowcall.executor;
//
import com.zq.spiderflow.context.SpiderContext; //import com.zq.spiderflow.context.SpiderContext;
import com.zq.spiderflow.executor.ShapeExecutor; //import com.zq.spiderflow.executor.ShapeExecutor;
import com.zq.spiderflow.model.Shape; //import com.zq.spiderflow.model.Shape;
import com.zq.spiderflow.model.SpiderNode; //import com.zq.spiderflow.model.SpiderNode;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.Map; //import java.util.Map;
//
@Component //@Component
public class FlowCallExecutor implements ShapeExecutor { //public class FlowCallExecutor implements ShapeExecutor {
//
public static final String FLOW_ID = "flowId"; // public static final String FLOW_ID = "flowId";
//
@Override // @Override
public Shape shape() { // public Shape shape() {
Shape shape = new Shape(); // Shape shape = new Shape();
shape.setImage(""); // shape.setImage("");
shape.setLabel("流程调用"); // shape.setLabel("流程调用");
shape.setName("FlowCall"); // shape.setName("FlowCall");
shape.setTitle("流程调用"); // shape.setTitle("流程调用");
return shape; // return shape;
} // }
//
@Override // @Override
public String supportShape() { // public String supportShape() {
return "FlowCall"; // return "FlowCall";
} // }
//
@Override // @Override
public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception { // public void execute(SpiderNode node, SpiderContext context, Map<String, Object> variables) throws Exception {
String flowId = node.getStringJsonValue(FLOW_ID); // String flowId = node.getStringJsonValue(FLOW_ID);
} // }
} //}
\ No newline at end of file \ No newline at end of file
...@@ -86,6 +86,10 @@ public class SpiderNode { ...@@ -86,6 +86,10 @@ public class SpiderNode {
return value; return value;
} }
public Object getObjectJsonValue(String key){
return this.jsonProperty.get(key);
}
public String getStringJsonValue(String key,String defaultValue){ public String getStringJsonValue(String key,String defaultValue){
String value = getStringJsonValue(key); String value = getStringJsonValue(key);
return StringUtils.isNotBlank(value) ? value : defaultValue; return StringUtils.isNotBlank(value) ? value : defaultValue;
......
...@@ -5,6 +5,9 @@ import com.alibaba.fastjson.JSONObject; ...@@ -5,6 +5,9 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.ttl.TtlRunnable; import com.alibaba.ttl.TtlRunnable;
import com.zq.spiderflow.concurrent.*; import com.zq.spiderflow.concurrent.*;
import com.zq.spiderflow.core.exception.RpaException; import com.zq.spiderflow.core.exception.RpaException;
import com.zq.spiderflow.core.model.Task;
import com.zq.spiderflow.core.service.SpiderFlowService;
import com.zq.spiderflow.core.service.TaskService;
import com.zq.spiderflow.core.utils.ExecutorsUtils; import com.zq.spiderflow.core.utils.ExecutorsUtils;
import com.zq.spiderflow.core.utils.ExpressionUtils; import com.zq.spiderflow.core.utils.ExpressionUtils;
import com.zq.spiderflow.core.utils.SpiderFlowUtils; import com.zq.spiderflow.core.utils.SpiderFlowUtils;
...@@ -16,7 +19,6 @@ import org.dom4j.DocumentHelper; ...@@ -16,7 +19,6 @@ import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.zq.spiderflow.concurrent.*;
import com.zq.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor; import com.zq.spiderflow.concurrent.SpiderFlowThreadPoolExecutor.SubThreadPoolExecutor;
import com.zq.spiderflow.context.SpiderContext; import com.zq.spiderflow.context.SpiderContext;
import com.zq.spiderflow.context.SpiderContextHolder; import com.zq.spiderflow.context.SpiderContextHolder;
...@@ -49,399 +51,412 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -49,399 +51,412 @@ import java.util.concurrent.atomic.AtomicInteger;
@Component @Component
public class Spider { public class Spider {
@Autowired(required = false) @Autowired(required = false)
private List<SpiderListener> listeners; private List<SpiderListener> listeners;
@Value("${spider.thread.max}") @Value("${spider.thread.max}")
private Integer totalThreads; private Integer totalThreads;
@Value("${spider.thread.default}") @Value("${spider.thread.default}")
private Integer defaultThreads; private Integer defaultThreads;
@Value("${spider.detect.dead-cycle:5000}") @Value("${spider.detect.dead-cycle:5000}")
private Integer deadCycle; private Integer deadCycle;
@Autowired @Autowired
private FlowNoticeService flowNoticeService; private FlowNoticeService flowNoticeService;
public static SpiderFlowThreadPoolExecutor executorInstance; @Autowired
private SpiderFlowService spiderFlowService;
private static final String ATOMIC_DEAD_CYCLE = "__atomic_dead_cycle";
@Autowired
private static Logger logger = LoggerFactory.getLogger(Spider.class); private TaskService taskService;
@PostConstruct public static SpiderFlowThreadPoolExecutor executorInstance;
private void init() {
executorInstance = new SpiderFlowThreadPoolExecutor(totalThreads); private static final String ATOMIC_DEAD_CYCLE = "__atomic_dead_cycle";
}
private static Logger logger = LoggerFactory.getLogger(Spider.class);
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables) {
if (variables == null) { @PostConstruct
variables = new HashMap<>(); private void init() {
} executorInstance = new SpiderFlowThreadPoolExecutor(totalThreads);
SpiderNode root = SpiderFlowUtils.loadXMLFromString(spiderFlow.getXml()); }
// 流程开始通知
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.startNotice); public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables) {
executeRoot(root, context, variables); if (variables == null) {
// 流程结束通知 variables = new HashMap<>();
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.endNotice); }
return context.getOutputs(); SpiderNode root = SpiderFlowUtils.loadXMLFromString(spiderFlow.getXml());
} // 流程开始通知
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.startNotice);
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables, String body) { executeRoot(root, context, variables);
if (variables == null) { // 流程结束通知
variables = new HashMap<>(); flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.endNotice);
} return context.getOutputs();
SpiderNode root = SpiderFlowUtils.loadXMLFromString(spiderFlow.getXml()); }
replaceParam(root.getNextNodes(), JSONObject.parseObject(body));
// 流程开始通知 public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables, String body) {
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.startNotice); if (variables == null) {
executeRoot(root, context, variables); variables = new HashMap<>();
// 流程结束通知 }
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.endNotice); SpiderNode root = SpiderFlowUtils.loadXMLFromString(spiderFlow.getXml());
return context.getOutputs(); replaceParam(root.getNextNodes(), JSONObject.parseObject(body));
} // 流程开始通知
flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.startNotice);
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context) { executeRoot(root, context, variables);
return run(spiderFlow, context, new HashMap<>()); // 流程结束通知
} flowNoticeService.sendFlowNotice(spiderFlow, FlowNoticeType.endNotice);
return context.getOutputs();
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, String body) { }
return run(spiderFlow, context, new HashMap<>(), body);
} public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context) {
return run(spiderFlow, context, new HashMap<>());
public void runWithTest(SpiderNode root, SpiderContext context) { }
//将上下文存到ThreadLocal里,以便后续使用
SpiderContextHolder.set(context); public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, String body) {
//死循环检测的计数器(死循环检测只在测试时有效) return run(spiderFlow, context, new HashMap<>(), body);
AtomicInteger executeCount = new AtomicInteger(0); }
//存入到上下文中,以供后续检测
context.put(ATOMIC_DEAD_CYCLE, executeCount); public void runWithTest(SpiderNode root, SpiderContext context) {
//执行根节点 //将上下文存到ThreadLocal里,以便后续使用
executeRoot(root, context, new HashMap<>()); SpiderContextHolder.set(context);
//当爬虫任务执行完毕时,判断是否超过预期 //死循环检测的计数器(死循环检测只在测试时有效)
if (executeCount.get() > deadCycle) { AtomicInteger executeCount = new AtomicInteger(0);
logger.error("检测到可能出现死循环,测试终止"); //存入到上下文中,以供后续检测
} else { context.put(ATOMIC_DEAD_CYCLE, executeCount);
logger.info("测试完毕!"); Task task = new Task();
} task.setFlowId(context.getFlowId());
//将上下文从ThreadLocal移除,防止内存泄漏 task.setBeginTime(new Date());
SpiderContextHolder.remove(); //执行根节点
} executeRoot(root, context, new HashMap<>());
//当爬虫任务执行完毕时,判断是否超过预期
/** if (executeCount.get() > deadCycle) {
* 执行根节点 logger.error("检测到可能出现死循环,测试终止");
*/ } else {
private void executeRoot(SpiderNode root, SpiderContext context, Map<String, Object> variables) { logger.info("测试完毕!");
//获取当前流程执行线程数 }
int nThreads = NumberUtils.toInt(root.getStringJsonValue(ShapeExecutor.THREAD_COUNT), defaultThreads); Date date = new Date();
String strategy = root.getStringJsonValue("submit-strategy"); task.setEndTime(date);
ThreadSubmitStrategy submitStrategy; taskService.save(task);
//选择提交策略,这里一定要使用new,不能与其他实例共享 spiderFlowService.executeCountIncrement(context.getFlowId(), date, null);
if("linked".equalsIgnoreCase(strategy)){ //将上下文从ThreadLocal移除,防止内存泄漏
submitStrategy = new LinkedThreadSubmitStrategy(); SpiderContextHolder.remove();
}else if("child".equalsIgnoreCase(strategy)){ }
submitStrategy = new ChildPriorThreadSubmitStrategy();
}else if("parent".equalsIgnoreCase(strategy)){ /**
submitStrategy = new ParentPriorThreadSubmitStrategy(); * 执行根节点
}else{ */
submitStrategy = new RandomThreadSubmitStrategy(); private void executeRoot(SpiderNode root, SpiderContext context, Map<String, Object> variables) {
} //获取当前流程执行线程数
//创建子线程池,采用一父多子的线程池,子线程数不能超过总线程数(超过时进入队列等待),+1是因为会占用一个线程用来调度执行下一级 int nThreads = NumberUtils.toInt(root.getStringJsonValue(ShapeExecutor.THREAD_COUNT), defaultThreads);
SubThreadPoolExecutor pool = executorInstance.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy); String strategy = root.getStringJsonValue("submit-strategy");
context.setRootNode(root); ThreadSubmitStrategy submitStrategy;
context.setThreadPool(pool); //选择提交策略,这里一定要使用new,不能与其他实例共享
//触发监听器 if ("linked".equalsIgnoreCase(strategy)) {
if (listeners != null) { submitStrategy = new LinkedThreadSubmitStrategy();
listeners.forEach(listener -> listener.beforeStart(context)); } else if ("child".equalsIgnoreCase(strategy)) {
} submitStrategy = new ChildPriorThreadSubmitStrategy();
Comparator<SpiderNode> comparator = submitStrategy.comparator(); } else if ("parent".equalsIgnoreCase(strategy)) {
//启动一个线程开始执行任务,并监听其结束并执行下一级 submitStrategy = new ParentPriorThreadSubmitStrategy();
Future<?> f = pool.submitAsync(TtlRunnable.get(() -> { } else {
try { submitStrategy = new RandomThreadSubmitStrategy();
//执行具体节点 }
Spider.this.executeNode(null, root, context, variables); //创建子线程池,采用一父多子的线程池,子线程数不能超过总线程数(超过时进入队列等待),+1是因为会占用一个线程用来调度执行下一级
Queue<Future<?>> queue = context.getFutureQueue(); SubThreadPoolExecutor pool = executorInstance.createSubThreadPoolExecutor(Math.max(nThreads, 1) + 1, submitStrategy);
//循环从队列中获取Future,直到队列为空结束,当任务完成时,则执行下一级 context.setRootNode(root);
while (!queue.isEmpty()) { context.setThreadPool(pool);
try { //触发监听器
//TODO 这里应该是取出最先执行完毕的任务 if (listeners != null) {
Optional<Future<?>> max = queue.stream().filter(Future::isDone).max((o1, o2) -> { listeners.forEach(listener -> listener.beforeStart(context));
try { }
return comparator.compare(((SpiderTask) o1.get()).node, ((SpiderTask) o2.get()).node); Comparator<SpiderNode> comparator = submitStrategy.comparator();
} catch (InterruptedException | ExecutionException e) { //启动一个线程开始执行任务,并监听其结束并执行下一级
} Future<?> f = pool.submitAsync(TtlRunnable.get(() -> {
return 0; try {
//执行具体节点
}); Spider.this.executeNode(null, root, context, variables);
if (max.isPresent()) { //判断任务是否完成 Queue<Future<?>> queue = context.getFutureQueue();
queue.remove(max.get()); //循环从队列中获取Future,直到队列为空结束,当任务完成时,则执行下一级
if (context.isRunning()) { //检测是否运行中(当在页面中点击"停止"时,此值为false,其余为true) while (!queue.isEmpty()) {
SpiderTask task = (SpiderTask) max.get().get(); try {
task.node.decrement(); //任务执行完毕,计数器减一(该计数器是给Join节点使用) //TODO 这里应该是取出最先执行完毕的任务
if (task.executor.allowExecuteNext(task.node, context, task.variables)) { //判断是否允许执行下一级 Optional<Future<?>> max = queue.stream().filter(Future::isDone).max((o1, o2) -> {
logger.debug("执行节点[{}:{}]完毕", task.node.getNodeName(), task.node.getNodeId()); try {
//执行下一级 return comparator.compare(((SpiderTask) o1.get()).node, ((SpiderTask) o2.get()).node);
Spider.this.executeNextNodes(task.node, context, task.variables); } catch (InterruptedException | ExecutionException e) {
} else { }
logger.debug("执行节点[{}:{}]完毕,忽略执行下一节点", task.node.getNodeName(), task.node.getNodeId()); return 0;
}
} });
} if (max.isPresent()) { //判断任务是否完成
//睡眠1ms,让出cpu queue.remove(max.get());
Thread.sleep(1); if (context.isRunning()) { //检测是否运行中(当在页面中点击"停止"时,此值为false,其余为true)
} catch (InterruptedException ignored) { SpiderTask task = (SpiderTask) max.get().get();
} catch (Throwable t){ task.node.decrement(); //任务执行完毕,计数器减一(该计数器是给Join节点使用)
if (t.getCause() instanceof RpaException) { if (task.executor.allowExecuteNext(task.node, context, task.variables)) { //判断是否允许执行下一级
throw (RpaException)t.getCause(); logger.debug("执行节点[{}:{}]完毕", task.node.getNodeName(), task.node.getNodeId());
}else { //执行下一级
logger.error("程序发生异常",t); Spider.this.executeNextNodes(task.node, context, task.variables);
throw new RuntimeException(t); } else {
} logger.debug("执行节点[{}:{}]完毕,忽略执行下一节点", task.node.getNodeName(), task.node.getNodeId());
} }
} }
//等待线程池结束 }
pool.awaitTermination(); //睡眠1ms,让出cpu
} finally { Thread.sleep(1);
//触发监听器 } catch (InterruptedException ignored) {
if (listeners != null) { } catch (Throwable t) {
listeners.forEach(listener -> listener.afterEnd(context)); if (t.getCause() instanceof RpaException) {
} throw (RpaException) t.getCause();
} } else {
}), null, root); logger.error("程序发生异常", t);
try { throw new RuntimeException(t);
f.get(); //阻塞等待所有任务执行完毕 }
} catch (Exception e) { }
throw new RuntimeException(); }
} //等待线程池结束
} pool.awaitTermination();
} finally {
/** //触发监听器
* 执行下一级节点 if (listeners != null) {
*/ listeners.forEach(listener -> listener.afterEnd(context));
private void executeNextNodes(SpiderNode node, SpiderContext context, Map<String, Object> variables) { }
List<SpiderNode> nextNodes = node.getNextNodes(); }
if (nextNodes != null) { }), null, root);
for (SpiderNode nextNode : nextNodes) { try {
executeNode(node, nextNode, context, variables); f.get(); //阻塞等待所有任务执行完毕
} } catch (Exception e) {
} throw new RuntimeException();
} }
}
/**
* 执行节点 /**
*/ * 执行下一级节点
public void executeNode(SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) { */
String shape = node.getStringJsonValue("shape"); private void executeNextNodes(SpiderNode node, SpiderContext context, Map<String, Object> variables) {
if (StringUtils.isBlank(shape)) { List<SpiderNode> nextNodes = node.getNextNodes();
executeNextNodes(node, context, variables); if (nextNodes != null) {
return; for (SpiderNode nextNode : nextNodes) {
} executeNode(node, nextNode, context, variables);
//判断箭头上的条件,如果不成立则不执行 }
if (!executeCondition(fromNode, node, variables, context)) { }
return; }
}
logger.debug("执行节点[{}:{}]", node.getNodeName(), node.getNodeId()); /**
//找到对应的执行器 * 执行节点
ShapeExecutor executor = ExecutorsUtils.get(shape); */
if (executor == null) { public void executeNode(SpiderNode fromNode, SpiderNode node, SpiderContext context, Map<String, Object> variables) {
logger.error("执行失败,找不到对应的执行器:{}", shape); String shape = node.getStringJsonValue("shape");
context.setRunning(false); if (StringUtils.isBlank(shape)) {
} executeNextNodes(node, context, variables);
int loopCount = 1; //循环次数默认为1,如果节点有循环属性且填了循环次数/集合,则取出循环次数 return;
int loopStart = 0; //循环起始位置 }
int loopEnd = 1; //循环结束位置 //判断箭头上的条件,如果不成立则不执行
String loopCountStr = node.getStringJsonValue(ShapeExecutor.LOOP_COUNT); if (!executeCondition(fromNode, node, variables, context)) {
Object loopArray = null; return;
boolean isLoop = false; }
if (isLoop = StringUtils.isNotBlank(loopCountStr)) { logger.debug("执行节点[{}:{}]", node.getNodeName(), node.getNodeId());
try { //找到对应的执行器
loopArray = ExpressionUtils.execute(loopCountStr, variables); ShapeExecutor executor = ExecutorsUtils.get(shape);
if(loopArray == null){ if (executor == null) {
loopCount = 0; logger.error("执行失败,找不到对应的执行器:{}", shape);
}else if(loopArray instanceof Collection){ context.setRunning(false);
loopCount = ((Collection)loopArray).size(); }
loopArray = ((Collection)loopArray).toArray(); int loopCount = 1; //循环次数默认为1,如果节点有循环属性且填了循环次数/集合,则取出循环次数
}else if(loopArray.getClass().isArray()){ int loopStart = 0; //循环起始位置
loopCount = Array.getLength(loopArray); int loopEnd = 1; //循环结束位置
}else{ String loopCountStr = node.getStringJsonValue(ShapeExecutor.LOOP_COUNT);
loopCount = NumberUtils.toInt(loopArray.toString(),0); Object loopArray = null;
loopArray = null; boolean isLoop = false;
} if (isLoop = StringUtils.isNotBlank(loopCountStr)) {
loopEnd = loopCount; try {
if(loopCount > 0){ loopArray = ExpressionUtils.execute(loopCountStr, variables);
loopStart = Math.max(NumberUtils.toInt(node.getStringJsonValue(LoopExecutor.LOOP_START), 0),0); if (loopArray == null) {
int end = NumberUtils.toInt(node.getStringJsonValue(LoopExecutor.LOOP_END), -1); loopCount = 0;
if(end >=0){ } else if (loopArray instanceof Collection) {
loopEnd = Math.min(end,loopEnd); loopCount = ((Collection) loopArray).size();
}else{ loopArray = ((Collection) loopArray).toArray();
loopEnd = Math.max(loopEnd + end + 1,0); } else if (loopArray.getClass().isArray()) {
} loopCount = Array.getLength(loopArray);
} } else {
logger.info("获取循环次数{}={}", loopCountStr, loopCount); loopCount = NumberUtils.toInt(loopArray.toString(), 0);
} catch (Throwable t) { loopArray = null;
loopCount = 0; }
logger.error("获取循环次数失败,异常信息:{}", t); loopEnd = loopCount;
} if (loopCount > 0) {
} loopStart = Math.max(NumberUtils.toInt(node.getStringJsonValue(LoopExecutor.LOOP_START), 0), 0);
if (loopCount > 0) { int end = NumberUtils.toInt(node.getStringJsonValue(LoopExecutor.LOOP_END), -1);
//获取循环下标的变量名称 if (end >= 0) {
String loopVariableName = node.getStringJsonValue(ShapeExecutor.LOOP_VARIABLE_NAME); loopEnd = Math.min(end, loopEnd);
String loopItem = node.getStringJsonValue(LoopExecutor.LOOP_ITEM,"item"); } else {
List<SpiderTask> tasks = new ArrayList<>(); loopEnd = Math.max(loopEnd + end + 1, 0);
for (int i = loopStart; i < loopEnd; i++) { }
node.increment(); //节点执行次数+1(后续Join节点使用) }
if (context.isRunning()) { logger.info("获取循环次数{}={}", loopCountStr, loopCount);
Map<String, Object> nVariables = new HashMap<>(); } catch (Throwable t) {
// 判断是否需要传递变量 loopCount = 0;
if(fromNode == null || node.isTransmitVariable(fromNode.getNodeId())){ logger.error("获取循环次数失败,异常信息:{}", t);
nVariables.putAll(variables); }
} }
if(isLoop){ if (loopCount > 0) {
// 存入下标变量 //获取循环下标的变量名称
if (!StringUtils.isBlank(loopVariableName)) { String loopVariableName = node.getStringJsonValue(ShapeExecutor.LOOP_VARIABLE_NAME);
nVariables.put(loopVariableName, i); String loopItem = node.getStringJsonValue(LoopExecutor.LOOP_ITEM, "item");
} List<SpiderTask> tasks = new ArrayList<>();
// 存入item for (int i = loopStart; i < loopEnd; i++) {
nVariables.put(loopItem,loopArray == null ? i : Array.get(loopArray, i)); node.increment(); //节点执行次数+1(后续Join节点使用)
} if (context.isRunning()) {
tasks.add(new SpiderTask(TtlRunnable.get(() -> { Map<String, Object> nVariables = new HashMap<>();
if (context.isRunning()) { // 判断是否需要传递变量
try { if (fromNode == null || node.isTransmitVariable(fromNode.getNodeId())) {
//死循环检测,当执行节点次数大于阈值时,结束本次测试 nVariables.putAll(variables);
AtomicInteger executeCount = context.get(ATOMIC_DEAD_CYCLE); }
if (executeCount != null && executeCount.incrementAndGet() > deadCycle) { if (isLoop) {
context.setRunning(false); // 存入下标变量
return; if (!StringUtils.isBlank(loopVariableName)) {
} nVariables.put(loopVariableName, i);
//执行节点具体逻辑 }
executor.execute(node, context, nVariables); // 存入item
//当未发生异常时,移除ex变量 nVariables.put(loopItem, loopArray == null ? i : Array.get(loopArray, i));
nVariables.remove("ex"); }
} catch (RpaException e){ tasks.add(new SpiderTask(TtlRunnable.get(() -> {
nVariables.put("ex", e); if (context.isRunning()) {
throw e; try {
}catch (Throwable t) { //死循环检测,当执行节点次数大于阈值时,结束本次测试
nVariables.put("ex", t); AtomicInteger executeCount = context.get(ATOMIC_DEAD_CYCLE);
logger.error("执行节点[{}:{}]出错,异常信息:{}", node.getNodeName(), node.getNodeId(), t); if (executeCount != null && executeCount.incrementAndGet() > deadCycle) {
throw new RuntimeException(t.getMessage()); context.setRunning(false);
} return;
} }
}), node, nVariables, executor)); //执行节点具体逻辑
} executor.execute(node, context, nVariables);
} //当未发生异常时,移除ex变量
LinkedBlockingQueue<Future<?>> futureQueue = context.getFutureQueue(); nVariables.remove("ex");
for (SpiderTask task : tasks) { } catch (RpaException e) {
if(executor.isThread()){ //判断节点是否是异步运行 nVariables.put("ex", e);
//提交任务至线程池中,并将Future添加到队列末尾 throw e;
futureQueue.add(context.getThreadPool().submitAsync(task.runnable, task, node)); } catch (Throwable t) {
}else{ nVariables.put("ex", t);
FutureTask<SpiderTask> futureTask = new FutureTask<>(task.runnable, task); logger.error("执行节点[{}:{}]出错,异常信息:{}", node.getNodeName(), node.getNodeId(), t);
futureTask.run(); throw new RuntimeException(t.getMessage());
futureQueue.add(futureTask); }
} }
} }), node, nVariables, executor));
} }
} }
LinkedBlockingQueue<Future<?>> futureQueue = context.getFutureQueue();
/** for (SpiderTask task : tasks) {
* 判断箭头上的表达式是否成立 if (executor.isThread()) { //判断节点是否是异步运行
*/ //提交任务至线程池中,并将Future添加到队列末尾
private boolean executeCondition(SpiderNode fromNode, SpiderNode node, Map<String, Object> variables, SpiderContext context) { futureQueue.add(context.getThreadPool().submitAsync(task.runnable, task, node));
if (fromNode != null) { } else {
boolean hasException = variables.get("ex") != null; FutureTask<SpiderTask> futureTask = new FutureTask<>(task.runnable, task);
String exceptionFlow = node.getExceptionFlow(fromNode.getNodeId()); futureTask.run();
//当出现异常流转 : 1 futureQueue.add(futureTask);
//未出现异常流转 : 2 }
if(("1".equalsIgnoreCase(exceptionFlow) && !hasException) || ("2".equalsIgnoreCase(exceptionFlow) && hasException)){ }
return false; }
} }
String condition = node.getCondition(fromNode.getNodeId());
if (StringUtils.isNotBlank(condition)) { // 判断是否有条件 /**
Object result = null; * 判断箭头上的表达式是否成立
try { */
result = ExpressionUtils.execute(condition, variables); private boolean executeCondition(SpiderNode fromNode, SpiderNode node, Map<String, Object> variables, SpiderContext context) {
} catch (Exception e) { if (fromNode != null) {
logger.error("判断{}出错,异常信息:{}", condition, e); boolean hasException = variables.get("ex") != null;
} String exceptionFlow = node.getExceptionFlow(fromNode.getNodeId());
if (result != null) { //当出现异常流转 : 1
boolean isContinue = "true".equals(result) || Objects.equals(result, true); //未出现异常流转 : 2
logger.debug("判断{}={}", condition, isContinue); if (("1".equalsIgnoreCase(exceptionFlow) && !hasException) || ("2".equalsIgnoreCase(exceptionFlow) && hasException)) {
return isContinue; return false;
} }
return false; String condition = node.getCondition(fromNode.getNodeId());
} if (StringUtils.isNotBlank(condition)) { // 判断是否有条件
} Object result = null;
return true; try {
} result = ExpressionUtils.execute(condition, variables);
} catch (Exception e) {
public void replaceParam(List<SpiderNode> nodes, JSONObject paramJson){ logger.error("判断{}出错,异常信息:{}", condition, e);
for (SpiderNode node : nodes) { }
JSONObject jsonObject = new JSONObject(node.getJsonProperty()); if (result != null) {
if (jsonObject.containsKey("variable-name")) { boolean isContinue = "true".equals(result) || Objects.equals(result, true);
JSONArray jsonArray = jsonObject.getJSONArray("variable-name"); logger.debug("判断{}={}", condition, isContinue);
JSONArray valueArray = jsonObject.getJSONArray("variable-value"); return isContinue;
for (int i = 0; i < jsonArray.size(); i++) { }
String name = (String) jsonArray.get(i); return false;
if (paramJson.containsKey(name)) { }
valueArray.set(i, paramJson.get(name)); }
} return true;
} }
System.out.println(jsonObject);
} public void replaceParam(List<SpiderNode> nodes, JSONObject paramJson) {
replaceParam(node.getNextNodes(), paramJson); for (SpiderNode node : nodes) {
} JSONObject jsonObject = new JSONObject(node.getJsonProperty());
} if (jsonObject.containsKey("variable-name")) {
JSONArray jsonArray = jsonObject.getJSONArray("variable-name");
public String updateXml(String xml, String json){ JSONArray valueArray = jsonObject.getJSONArray("variable-value");
JSONObject jsonObject = JSONObject.parseObject(json); for (int i = 0; i < jsonArray.size(); i++) {
String name = (String) jsonArray.get(i);
Document document; if (paramJson.containsKey(name)) {
try { valueArray.set(i, paramJson.get(name));
document = DocumentHelper.parseText(xml); }
} catch (DocumentException e) { }
throw new RpaException("流程错误,请检查流程"); System.out.println(jsonObject);
} }
Element rootElement = document.getRootElement(); replaceParam(node.getNextNodes(), paramJson);
List<Element> elements = rootElement.elements().get(0).elements(); }
for (Element element : elements) { }
List<Element> elems = element.elements();
for (Element elem : elems) { public String updateXml(String xml, String json) {
if("JsonProperty".equals(elem.getName().trim())){ JSONObject jsonObject = JSONObject.parseObject(json);
JSONObject object = JSONObject.parseObject(elem.getText().trim());
if (object.containsKey("variable-name")) { Document document;
JSONArray jsonArray = object.getJSONArray("variable-name"); try {
JSONArray valueArray = object.getJSONArray("variable-value"); document = DocumentHelper.parseText(xml);
for (int i = 0; i < jsonArray.size(); i++) { } catch (DocumentException e) {
String name = (String) jsonArray.get(i); throw new RpaException("流程错误,请检查流程");
if (jsonObject.containsKey(name)) { }
valueArray.set(i, jsonObject.get(name)); Element rootElement = document.getRootElement();
} List<Element> elements = rootElement.elements().get(0).elements();
} for (Element element : elements) {
elem.setText(JSONObject.toJSONString(object)); List<Element> elems = element.elements();
} for (Element elem : elems) {
} if ("JsonProperty".equals(elem.getName().trim())) {
} JSONObject object = JSONObject.parseObject(elem.getText().trim());
} if (object.containsKey("variable-name")) {
JSONArray jsonArray = object.getJSONArray("variable-name");
return document.asXML(); JSONArray valueArray = object.getJSONArray("variable-value");
} for (int i = 0; i < jsonArray.size(); i++) {
String name = (String) jsonArray.get(i);
class SpiderTask{ if (jsonObject.containsKey(name)) {
valueArray.set(i, jsonObject.get(name));
Runnable runnable; }
}
SpiderNode node; elem.setText(JSONObject.toJSONString(object));
}
Map<String,Object> variables; }
}
ShapeExecutor executor; }
public SpiderTask(Runnable runnable, SpiderNode node, Map<String, Object> variables,ShapeExecutor executor) { return document.asXML();
this.runnable = runnable; }
this.node = node;
this.variables = variables; class SpiderTask {
this.executor = executor;
} Runnable runnable;
}
SpiderNode node;
Map<String, Object> variables;
ShapeExecutor executor;
public SpiderTask(Runnable runnable, SpiderNode node, Map<String, Object> variables, ShapeExecutor executor) {
this.runnable = runnable;
this.node = node;
this.variables = variables;
this.executor = executor;
}
}
} }
...@@ -38,24 +38,24 @@ public class WebSocketEditorServer { ...@@ -38,24 +38,24 @@ public class WebSocketEditorServer {
context.setDebug(isDebug); context.setDebug(isDebug);
context.setRunning(true); context.setRunning(true);
String flowId = event.getString("flowId"); String flowId = event.getString("flowId");
if(StrUtil.isBlank(flowId)){ if (StrUtil.isBlank(flowId)) {
context.write(new WebSocketEvent<>("error", "flowId不能为空!")); context.write(new WebSocketEvent<>("error", "flowId不能为空!"));
} }
context.setFlowId(flowId); context.setFlowId(flowId);
// new Thread(() -> { // new Thread(() -> {
String xml = event.getString("message"); String xml = event.getString("message");
if (xml != null) { if (xml != null) {
spider. runWithTest(SpiderFlowUtils.loadXMLFromString(xml), context); spider.runWithTest(SpiderFlowUtils.loadXMLFromString(xml), context);
context.write(new WebSocketEvent<>("finish", null)); context.write(new WebSocketEvent<>("finish", null));
} else { } else {
context.write(new WebSocketEvent<>("error", "xml不正确!")); context.write(new WebSocketEvent<>("error", "xml不正确!"));
} }
context.setRunning(false); context.setRunning(false);
// }).start(); // }).start();
} else if ("stop".equals(eventType) && context != null) { } else if ("stop".equals(eventType) && context != null) {
context.setRunning(false); context.setRunning(false);
context.stop(); context.stop();
} else if("resume".equalsIgnoreCase(eventType) && context != null){ } else if ("resume".equalsIgnoreCase(eventType) && context != null) {
context.resume(); context.resume();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment