Skip to content

Commit 727a123

Browse files
Submit job related modules
1 parent 1910060 commit 727a123

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+7925
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
*
3+
* Copyright 2020 WeBank
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.webank.wedatasphere.exchangis.job.config;
19+
20+
import com.webank.wedatasphere.exchangis.job.config.builder.JobConfBuilder;
21+
22+
import java.io.BufferedReader;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.InputStreamReader;
26+
import java.nio.charset.StandardCharsets;
27+
28+
/**
29+
* @author enjoyyin
30+
* 2018/11/22
31+
*/
32+
public abstract class AbstractJobTemplate {
33+
34+
public static final String IGNORE_EMPTY_KEY_SIGN = "@";
35+
protected static final String COMMON_TEMPLATE_FILE = "exchangis.tpl";
36+
protected static final String READER_DIRECTORY = "/readers";
37+
protected static final String WRITER_DIRECTORY = "/writers";
38+
protected static final String DEFAULT_SUFFIX = ".tpl";
39+
40+
/**
41+
* //TODO cache the template
42+
* @param location
43+
* @return
44+
*/
45+
protected String getTemplateContent(String location){
46+
return getTemplateContent(location, false);
47+
}
48+
49+
protected String getTemplateContent(String location, boolean includeLineBreak){
50+
String lineBreak = includeLineBreak?"\n":"";
51+
InputStream inputStream = JobConfBuilder.class.getResourceAsStream(location);
52+
if(null != inputStream) {
53+
StringBuilder buffer = new StringBuilder();
54+
try {
55+
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
56+
String tmp = null;
57+
while ((tmp = reader.readLine()) != null) {
58+
if(!includeLineBreak){
59+
tmp = tmp.trim();
60+
}
61+
buffer.append(tmp).append(lineBreak);
62+
}
63+
return buffer.toString();
64+
} catch (IOException e) {
65+
throw new RuntimeException(e);
66+
}
67+
}
68+
return "";
69+
}
70+
71+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
*
3+
* Copyright 2020 WeBank
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.webank.wedatasphere.exchangis.job.config;
19+
20+
/**
21+
* @author enjoyyin
22+
* 2018/10/29
23+
*/
24+
public enum DataConfType{
25+
/**
26+
* reader
27+
*/
28+
READER,
29+
/**
30+
* writer
31+
*/
32+
WRITER
33+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
*
3+
* Copyright 2020 WeBank
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.webank.wedatasphere.exchangis.job.config;
19+
20+
import com.webank.wedatasphere.exchangis.common.util.DateTool;
21+
import com.webank.wedatasphere.exchangis.job.domain.JobHistoryReq;
22+
import com.webank.wedatasphere.exchangis.job.domain.JobInfo;
23+
import jodd.util.StringUtil;
24+
import org.apache.commons.lang3.StringUtils;
25+
import org.springframework.stereotype.Component;
26+
27+
import java.util.regex.Matcher;
28+
29+
import static com.webank.wedatasphere.exchangis.common.util.DateTool.TIME_REGULAR_PATTERN;
30+
31+
@Component
32+
public class HistoryInterval {
33+
public static final String DAY = "DAY";
34+
public static final String HOUR = "HOUR";
35+
public static final String MINUTE = "MINUTE";
36+
private static final String[] TIME_INTERVAL = new String[]{DAY,HOUR,MINUTE};
37+
38+
private static final String TIME_PLACEHOLDER_DATE_TIME = "${yyyy-MM-dd HH:mm:ss}";
39+
private static final String TIME_PLACEHOLDER_TIMESTAMP = "${timestamp}";
40+
private static final String[] TIME_PLACEHOLDER = new String[]
41+
{TIME_PLACEHOLDER_DATE_TIME, TIME_PLACEHOLDER_TIMESTAMP};
42+
43+
44+
public boolean checkTime(JobInfo jobInfo, JobHistoryReq req){
45+
boolean flag = false;
46+
String unit = req.getUnit();
47+
if(!StringUtils.isEmpty(jobInfo.getJobConfig()) && !StringUtil.isEmpty(unit) && req.getStep() != 0){
48+
for(String time : TIME_INTERVAL){
49+
if(req.getUnit().equals(time)){
50+
flag = true;
51+
break;
52+
}
53+
}
54+
if(flag){
55+
flag = false;
56+
String[] placeholders = DateTool.TIME_PLACEHOLDER;
57+
if(unit.equals(HOUR) || unit.equals(MINUTE)){
58+
placeholders = TIME_PLACEHOLDER;
59+
}
60+
for(String p : placeholders){
61+
if(jobInfo.getJobConfig().contains(p)){
62+
flag = true;
63+
break;
64+
}
65+
}
66+
if(!flag){
67+
Matcher matcher = TIME_REGULAR_PATTERN.matcher(jobInfo.getJobConfig());
68+
return matcher.find();
69+
}
70+
}
71+
}
72+
return flag;
73+
}
74+
75+
public boolean checkStep(JobHistoryReq req){
76+
boolean flag = true;
77+
if(req.getUnit().equals(DAY)){
78+
if(req.getStep()> 30 || req.getStep() <= 0){
79+
flag = false;
80+
}
81+
}
82+
if(req.getUnit().equals(HOUR)){
83+
if(req.getStep()> 24 || req.getStep() <= 0){
84+
flag = false;
85+
}
86+
}
87+
if(req.getUnit().equals(MINUTE)){
88+
if(req.getStep()> 60 || req.getStep() <= 0){
89+
flag = false;
90+
}
91+
}
92+
return flag;
93+
}
94+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
*
3+
* Copyright 2020 WeBank
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.webank.wedatasphere.exchangis.job.config;
19+
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
/**
25+
* @author enjoyyin
26+
* 2019/4/1
27+
*/
28+
public enum TransportType {
29+
//
30+
STREAM("stream"),
31+
32+
RECORD("record");
33+
34+
private static Map<String, TransportType> typeMap = new HashMap<>();
35+
36+
private String type;
37+
38+
TransportType(String type){
39+
this.type = type;
40+
}
41+
42+
public String v(){
43+
return this.type;
44+
}
45+
public static TransportType type(String type){
46+
TransportType result = typeMap.get(type != null? type.toLowerCase(): type);
47+
return result == null ? TransportType.RECORD : result;
48+
}
49+
50+
static{
51+
typeMap.put(STREAM.type, STREAM);
52+
typeMap.put(RECORD.type, RECORD);
53+
}
54+
}

0 commit comments

Comments
 (0)