代码备份 inited.get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143

```java
/*
* Datart
* <p>
* Copyright 2021
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package datart.api.service.jdbc;

import cn.hutool.json.JSONUtil;
import datart.api.feign.dataobject.JdbcProperties;
import datart.api.feign.dataobject.Source;
import datart.api.service.SourceService;
import datart.api.service.custom.result.QueryResult;
import datart.api.service.custom.result.Row;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 本地维护数据源
* <pre>
* 1. 数据源创建(初始化)
* 2. TODO 数据源动态更新
* 3. 数据源查询
* </pre>
*
* @since 1.0.8
*/
@Slf4j
@Component
public class JdbcDataProvider {

@Autowired
private SourceService sourceService;

private AtomicBoolean inited = new AtomicBoolean();

private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

@SneakyThrows
public void init() {
// 已初始化 或 并发设置失败
if (inited.get() || !inited.compareAndSet(false, true)) {
return;
}

List<Source> sourceList = sourceService.listOrgSources();

for (Source source : sourceList) {
JdbcProperties jdbcProperties = JSONUtil.toBean(source.getConfig(), JdbcProperties.class);

DataSource dataSource = DataSourceFactory.createDataSource(jdbcProperties);

dataSourceMap.put(source.getId(), dataSource);
}
}

/**
* 直接执行,返回所有数据,用于支持已经支持分页的数据库,或者不需要分页的查询。
*
* @param sql 直接提交至数据源执行的SQL,通常已经包含了分页
* @return 全量数据
* @throws SQLException SQL执行异常
*/
public QueryResult execute(String sourceId, String sql) throws SQLException {
init();

try (Connection conn = getConn(sourceId)) {
try (Statement statement = conn.createStatement()) {
try (ResultSet rs = statement.executeQuery(sql)) {
return parseResultSet(rs);
}
}
}
}

private Connection getConn(String sourceId) throws SQLException {
return dataSourceMap.get(sourceId).getConnection();
}

private QueryResult parseResultSet(ResultSet rs) throws SQLException {
return parseResultSet(rs, Long.MAX_VALUE);
}

private QueryResult parseResultSet(ResultSet rs, long count) throws SQLException {
QueryResult queryResult = new QueryResult();

int c = 0;
while (rs.next()) {
Row row = new Row();
queryResult.add(row);

for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
String columnName = rs.getMetaData().getColumnLabel(i);
Object columnValue = getObjFromResultSet(rs, i);
row.put(columnName, columnValue);
}
c++;
if (c >= count) {
break;
}
}

return queryResult;
}

private Object getObjFromResultSet(ResultSet rs, int columnIndex) throws SQLException {
Object obj = rs.getObject(columnIndex);
if (obj instanceof Boolean) {
obj = rs.getObject(columnIndex).toString();
} else if (obj instanceof LocalDateTime) {
obj = rs.getTimestamp(columnIndex);
}
return obj;
}

}

```