Commit 0e9d716d by wqc

数据连接调整、添加数据清洗接口

parent 2919eac0
......@@ -12,7 +12,7 @@ import javax.annotation.Resource;
@RestController
@Api(tags = "数据库汇聚")
@RequestMapping(value = "/data/collect/db")
@RequestMapping(value = "/data/collect")
public class DataCollectDbController {
@Resource
......@@ -45,4 +45,4 @@ public class DataCollectDbController {
dataCollectDbService.execute(dataCollectSettingId);
return ResultVo.success("汇聚执行成功");
}
}
\ No newline at end of file
}
......@@ -29,4 +29,4 @@ public class DatabaseHolder {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
}
......@@ -61,4 +61,4 @@ public class DatabaseRunner implements ApplicationRunner {
DatabaseHolder.set(jdbcTemplateMap);
log.info("成功加载数据库:{}个", jdbcTemplateMap.size());
}
}
\ No newline at end of file
}
......@@ -82,4 +82,4 @@ public class DataCollectDbServiceImpl extends ServiceImpl<DataCollectSettingDbMa
}
});
}
}
\ No newline at end of file
}
......@@ -74,7 +74,7 @@
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.0.0</version>
<version>4.1.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.oracle</groupId>-->
......@@ -117,6 +117,12 @@
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.zq</groupId>
<artifactId>data-collect-server</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
......
......@@ -5,9 +5,11 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@EnableScheduling
@MapperScan({"com.zq.dataoperation.dao","com.zq.spiderflow.dao"})
@MapperScan({"com.zq.dataoperation.dao","com.zq.spiderflow.dao","com.zq.datacollect.mapper"})
@EnableDiscoveryClient
@SpringBootApplication//(scanBasePackages = {"com.zq.dataoperation", "com.zq.common.config","com.zq.spiderflow"})
public class DataoperationApplication {
......
......@@ -93,12 +93,12 @@ public class CommonQueryController {
return ResultVo.success();
}
@ApiOperation("运行查询")
@PostMapping("/run")
public ResultVo run(@RequestBody Map<String, Object> body) throws Exception {
System.out.println(body);
return ResultVo.success(commonQueryService.run(body));
}
// @ApiOperation("运行查询")
// @PostMapping("/run")
// public ResultVo run(@RequestBody Map<String, Object> body) throws Exception {
// System.out.println(body);
// return ResultVo.success(commonQueryService.run(body));
// }
@ApiOperation("自定义sql查询")
@PostMapping("/runSelect")
......
......@@ -37,12 +37,12 @@ public class ComponentRunController {
}
@ApiOperation("记录排序")
@PostMapping("/SortRecord")
public ResultVo SortRecord(@RequestBody Map<String, Object> body) throws Exception {
return ResultVo.success(commonQueryService.SortRecord(body));
@PostMapping("/sortRecord")
public ResultVo sortRecord(@RequestBody Map<String, Object> body) throws Exception {
return ResultVo.success(commonQueryService.sortRecord(body));
}
@ApiOperation("记录排序")
@ApiOperation("剪切字符串")
@PostMapping("/cutString")
public ResultVo CutString(@RequestBody Map<String, Object> body) throws Exception {
return ResultVo.success(commonQueryService.CutString(body));
......@@ -54,10 +54,15 @@ public class ComponentRunController {
return ResultVo.success(commonQueryService.validateNumber(body));
}
@ApiOperation("数据输出")
@ApiOperation("excel输出")
@PostMapping("/excelOutput")
public ResultVo excelOutput(HttpServletResponse response, @RequestBody Map<String, Object> body) throws Exception {
return ResultVo.success(commonQueryService.excelOutput(response,body));
}
@ApiOperation("数据库输出")
@PostMapping("/sqlOutput")
public ResultVo sqlOutput(@RequestBody Map<String, Object> body) throws Exception {
return ResultVo.success(commonQueryService.sqlOutput(body));
}
}
......@@ -6,7 +6,9 @@ import com.zq.common.utils.AssertUtils;
import com.zq.common.vo.ResultVo;
import com.zq.dataoperation.dao.MetaDataDao;
import com.zq.dataoperation.entity.DataCleanExpressCategory;
import com.zq.dataoperation.entity.DataCleanRecord;
import com.zq.dataoperation.entity.DataCleanRule;
import com.zq.dataoperation.service.DataCleanRecordService;
import com.zq.dataoperation.service.DataCleanRuleService;
import com.zq.dataoperation.service.DataCleanService;
import com.zq.dataoperation.entity.MetaData;
......@@ -17,6 +19,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Api(tags = "数据清洗")
@RequiredArgsConstructor
......@@ -30,6 +33,8 @@ public class DataCleanController {
private DataCleanRuleService dataCleanRuleService;
@Resource
private MetaDataDao metaDataDao;
@Resource
private DataCleanRecordService dataCleanRecordService;
/**
* 新增规则表达式
......@@ -93,4 +98,13 @@ public class DataCleanController {
return null;
}
@ApiOperation("新增清洗记录")
@PostMapping("/addCleanList")
public ResultVo addCleanList(@RequestBody DataCleanRecord dataCleanRecord){
AssertUtils.notNull(dataCleanRecord.getQueryDbId(),"缺少连接库ID");
AssertUtils.notNull(dataCleanRecord.getDataCollectTaskLogId(),"缺少汇聚任务ID");
return ResultVo.success(dataCleanRecordService.runAndSave(dataCleanRecord));
}
}
package com.zq.dataoperation.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zq.dataoperation.entity.DataCleanRecord;
public interface DataCleanRecordDao extends BaseMapper<DataCleanRecord> {
}
package com.zq.dataoperation.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* (data_clean_record)实体类
*
* @author zq
* @since 2023-07-04 16:12:53
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName(value = "data_clean_record")
public class DataCleanRecord {
/**
* id
*/
private Integer id;
/**
* 连接id
*/
@ApiModelProperty("连接id")
private Long queryDbId;
/**
* 表名
*/
@ApiModelProperty("表名")
private String tableName;
/**
* 任务汇聚表ID
*/
@ApiModelProperty("任务汇聚表ID")
private Integer dataCollectTaskLogId;
/**
* 清洗开始时间
*/
@ApiModelProperty("清洗开始时间")
private Date startTime;
/**
* 结束时间
*/
@ApiModelProperty("结束时间")
private Date endTime;
/**
* 花费时间
*/
@ApiModelProperty("花费时间")
private String duration;
/**
* 1运行中 2正常结束 3异常终止
*/
@ApiModelProperty("状态")
private Integer status;
/**
* 花费时间
*/
@ApiModelProperty("花费时间")
private String dataCollectCount;
/**
* 花费时间
*/
@ApiModelProperty("花费时间")
private String dataCleanCount;
/**
* 花费时间
*/
@ApiModelProperty("花费时间")
private String dataDirtyCount;
/**
* 创建时间
*/
@ApiModelProperty("创建时间")
private Date createTime;
/**
* 更新时间
*/
@ApiModelProperty("更新时间")
private Date updateTime;
}
package com.zq.dataoperation.holder;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class DatabaseHolder {
private static Map<Integer, JdbcTemplate> jdbcTemplateMap;
public static Map<Integer, JdbcTemplate> getJdbcTemplateMap() {
return jdbcTemplateMap;
}
public static void set(Map<Integer, JdbcTemplate> map) {
jdbcTemplateMap = map;
}
public static void add(Integer id, JdbcTemplate jdbcTemplate) {
if (jdbcTemplateMap == null) {
jdbcTemplateMap = new HashMap<>();
}
jdbcTemplateMap.put(id, jdbcTemplate);
}
public static JdbcTemplate getJdbcTemplate(Integer id) {
return jdbcTemplateMap.get(id);
}
public static Connection getConnection(Integer id) {
try {
return jdbcTemplateMap.get(id).getDataSource().getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
package com.zq.dataoperation.runner;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.zq.common.exception.BusinessException;
import com.zq.dataoperation.entity.QueryDb;
import com.zq.dataoperation.utils.ConnectionUtil;
import com.zq.dataoperation.service.CommonQueryService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
//@Slf4j
//@Component
//public class DatabaseConnectRunner implements ApplicationRunner {
//
// @Resource
// private CommonQueryService service;
//
// private Map<Integer, DruidPooledConnection> connectionMap = new HashMap<>();
//
// @Override
// public void run(ApplicationArguments args) throws SQLException {
// List<QueryDb> datasource = service.getDatasource();
// for (QueryDb ds : datasource) {
// DruidDataSource druidDataSource = DruidDataSourceBuilder.create().build();
// String url = "";
// String className = "";
// switch (ds.getDbType()) {
// case 1:
// className = "com.mysql.cj.jdbc.Driver";
// url = "jdbc:mysql://" + ds.getDbIp() + ":" + ds.getDbPort() + "/" + ds.getDbName() + "?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true";
// break;
// case 2:
// className = "oracle.jdbc.driver.OracleDriver";
// url = "jdbc:oracle:thin:@//" + ds.getDbIp() + ":" + ds.getDbPort() + "/" + ds.getDbName();
// break;
// case 3:
// className = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
// url = "jdbc:sqlserver://" + ds.getDbIp() + ":" + ds.getDbPort() + "/" + ds.getDbName();
// break;
// default:
// throw new BusinessException("不支持的数据库");
// }
// druidDataSource.setDriverClassName(className);
// druidDataSource.setUrl(url);
// druidDataSource.setUsername(ds.getUsername());
// druidDataSource.setPassword(ds.getPassword());
// druidDataSource.init();
// connectionMap.put(ds.getId(), druidDataSource.getConnection());
// }
// ConnectionUtil.set(connectionMap);
// log.info("成功加载数据库:{}个", connectionMap.size());
// }
//
// @PreDestroy
// public void onClose(){
// for (DruidPooledConnection connection : connectionMap.values()) {
// try {
// connection.close();
// } catch (SQLException e) {
// e.printStackTrace();
// }
// }
// }
//}
package com.zq.dataoperation.runner;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zq.common.exception.BusinessException;
import com.zq.dataoperation.dao.QueryDbDao;
import com.zq.dataoperation.entity.QueryDb;
import com.zq.dataoperation.utils.ConnectionUtil;
import com.zq.dataoperation.service.CommonQueryService;
import com.zq.dataoperation.holder.DatabaseHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;;
import javax.annotation.Resource;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class DatabaseRunner implements ApplicationRunner {
@Resource
private CommonQueryService service;
private Map<Integer, DruidPooledConnection> connectionMap = new HashMap<>();
private QueryDbDao queryDbDao;
@Override
public void run(ApplicationArguments args) throws SQLException {
List<QueryDb> datasource = service.getDatasource();
List<QueryDb> datasource = queryDbDao.selectList(new QueryWrapper<>());
for (QueryDb ds : datasource) {
DruidDataSource druidDataSource = DruidDataSourceBuilder.create().build();
String url = "";
......@@ -56,20 +53,8 @@ public class DatabaseRunner implements ApplicationRunner {
druidDataSource.setUsername(ds.getUsername());
druidDataSource.setPassword(ds.getPassword());
druidDataSource.init();
connectionMap.put(ds.getId(), druidDataSource.getConnection());
}
ConnectionUtil.set(connectionMap);
log.info("成功加载数据库:{}个", connectionMap.size());
}
@PreDestroy
public void onClose(){
for (DruidPooledConnection connection : connectionMap.values()) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
DatabaseHolder.add(ds.getId(), new JdbcTemplate(druidDataSource));
}
log.info("成功加载数据库:{}个", DatabaseHolder.getJdbcTemplateMap().size());
}
}
\ No newline at end of file
}
package com.zq.dataoperation.service;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zq.common.utils.AssertUtils;
import com.zq.common.vo.ResultVo;
import com.zq.datacollect.entity.DataCollectTaskLog;
import com.zq.datacollect.mapper.DataCollectTaskLogMapper;
import com.zq.dataoperation.dao.DataCleanRecordDao;
import com.zq.dataoperation.dao.QueryDbDao;
import com.zq.dataoperation.entity.DataCleanRecord;
import com.zq.dataoperation.entity.QueryDb;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@RequiredArgsConstructor
@Service
public class DataCleanRecordService extends ServiceImpl<DataCleanRecordDao, DataCleanRecord> {
@Resource
private DataCleanRecordDao dataCleanRecordDao;
@Resource
private QueryDbDao queryDbDao;
@Resource
private DataCollectTaskLogMapper dataCollectTaskLogMapper;
public Object runAndSave(DataCleanRecord dataCleanRecord) {
QueryDb queryDb = queryDbDao.selectById(dataCleanRecord.getQueryDbId());
AssertUtils.notNull(queryDb, "无连接表ID");
DataCollectTaskLog dataCollectTaskLog = dataCollectTaskLogMapper.selectById(dataCleanRecord.getDataCollectTaskLogId());
AssertUtils.notNull(dataCollectTaskLog, "无连接表ID");
DataCleanRecord build = DataCleanRecord.builder()
.queryDbId(dataCleanRecord.getQueryDbId())
.dataCollectTaskLogId(dataCollectTaskLog.getDataCollectSettingId())
.tableName(dataCleanRecord.getTableName())
.endTime(DateUtil.date())
.status(dataCollectTaskLog.getStatus())
.dataCollectCount(dataCleanRecord.getDataCollectCount())
.dataCleanCount(dataCleanRecord.getDataCleanCount())
.dataDirtyCount(dataCleanRecord.getDataDirtyCount())
.createTime(DateUtil.date())
.build();
DataCleanRecord record = dataCleanRecordDao.selectById(dataCleanRecord.getId());
int i = 0;
if (record == null) {
i = dataCleanRecordDao.insert(build);
} else {
i = dataCleanRecordDao.updateById(build);
}
return i>0 ? ResultVo.success("操作成功!") : ResultVo.fail("操作失败");
}
}
......@@ -16,8 +16,4 @@ public class ConnectionUtil {
public static Connection get(Integer id){
return connectionMap.get(id).getConnection();
}
public static Connection get(Integer id) throws SQLException {
return dataSourceMap.get(id).getConnection();
}
}
......@@ -140,7 +140,7 @@ public class FileUtil extends cn.hutool.core.io.FileUtil {
/**
* 导出excel
*/
public static void downloadExcel(List<Map<String, Object>> list, HttpServletResponse response, String fileName) throws IOException {
public static void downloadExcel(List<Map> list, HttpServletResponse response, String fileName) throws IOException {
String tempPath = fileName + ".xlsx";
File file = new File(tempPath);
BigExcelWriter writer = ExcelUtil.getBigWriter(file);
......@@ -158,26 +158,6 @@ public class FileUtil extends cn.hutool.core.io.FileUtil {
IoUtil.close(out);
}
/**
* 导出excel
*/
public static void outputExcel(List<Map> list, HttpServletResponse response, String filePath,String fileName) throws IOException {
// String tempPath = fileName + ".xlsx";
File file = new File(fileName+"."+filePath);
BigExcelWriter writer = ExcelUtil.getBigWriter(file);
// 一次性写出内容,使用默认样式,强制输出标题
writer.write(list, true);
//response为HttpServletResponse对象
response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;charset=utf-8");
//test.xls是弹出下载对话框的文件名,不能为中文,中文请自行编码
response.setHeader("Content-Disposition", "attachment;filename=file.xlsx");
ServletOutputStream out = response.getOutputStream();
// 终止后删除临时文件
file.deleteOnExit();
writer.flush(out, true);
//此处记得关闭输出Servlet流
IoUtil.close(out);
}
public static String getFileType(String type) {
String documents = "txt doc pdf ppt pps xlsx xls docx";
......
package com.zq.dataoperation.utils;
import com.zq.dataoperation.holder.DatabaseHolder;
import com.zq.dataoperation.vo.Column;
import lombok.SneakyThrows;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
......@@ -18,7 +20,7 @@ public class TableUtil {
public static Object getColumns(Integer queryId, String tableName) {
String driver = "com.mysql.cj.jdbc.Driver";
Class.forName(driver);
Connection connection = ConnectionUtil.get(queryId);
Connection connection = DatabaseHolder.getConnection(queryId);
DatabaseMetaData metaData = connection.getMetaData();
String primaryColumn = "";
ResultSet idxRs = metaData.getIndexInfo(connection.getCatalog(), connection.getCatalog(), tableName, false, false);
......
......@@ -34,8 +34,8 @@ public interface SpiderFlowMapper extends BaseMapper<SpiderFlow>{
})
IPage<SpiderFlow> selectSpiderPage(Page<SpiderFlow> page,@Param("name") String name);
@Insert("insert into sp_flow(id,name,xml,enabled) values(#{id},#{name},#{xml},'0')")
int insertSpiderFlow(@Param("id") String id, @Param("name") String name, @Param("xml") String xml);
@Insert("insert into sp_flow(id,name,xml,flow_type,enabled) values(#{id},#{name},#{xml},#{flowType},'0')")
int insertSpiderFlow(@Param("id") String id, @Param("name") String name, @Param("xml") String xml, @Param("flowType") String flowType);
@Update("update sp_flow set name = #{name},xml = #{xml} where id = #{id}")
int updateSpiderFlow(@Param("id") String id, @Param("name") String name, @Param("xml") String xml);
......
......@@ -39,6 +39,11 @@ public class SpiderFlow {
private String enabled;
/**
* 1模型2汇聚3清洗
*/
private String flowType;
private Date createDate;
private Date lastExecuteTime;
......
......@@ -128,7 +128,7 @@ public class SpiderFlowService extends ServiceImpl<SpiderFlowMapper, SpiderFlow>
}
}else{//insert 任务
String id = UUID.randomUUID().toString().replace("-", "");
sfMapper.insertSpiderFlow(id, spiderFlow.getName(), spiderFlow.getXml());
sfMapper.insertSpiderFlow(id, spiderFlow.getName(), spiderFlow.getXml(),spiderFlow.getFlowType());
spiderFlow.setId(id);
}
File file = new File(workspace,spiderFlow.getId() + File.separator + "xmls" + File.separator + System.currentTimeMillis() + ".xml");
......@@ -259,7 +259,7 @@ public class SpiderFlowService extends ServiceImpl<SpiderFlowMapper, SpiderFlow>
public PageVo<SpiderFlow> getPageList(SpiderFlowVo vo) {
LambdaQueryWrapper<SpiderFlow> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.orderByDesc(SpiderFlow::getCreateDate);
queryWrapper.eq(SpiderFlow::getFlowType,vo.getFlowType()).orderByDesc(SpiderFlow::getCreateDate);
return PagingUtils.paging(vo, sfMapper, queryWrapper, SpiderFlow.class);
}
}
......@@ -52,5 +52,10 @@ public class SpiderFlowVo extends PageReqVo{
@TableField(exist = false)
private Integer running;
/**
* 1模型2汇聚3清洗
*/
private String flowType;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment