Skip to content

Commit 55f121d

Browse files
Merge pull request #218 from robinhood-jim/develop
Develop
2 parents b1f448e + 9fffff0 commit 55f121d

File tree

235 files changed

+18714
-587
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

235 files changed

+18714
-587
lines changed

common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public Pair<BufferedReader, InputStream> getInResourceByReader(String resourcePa
4949
VfsParam param = new VfsParam();
5050
InputStream stream;
5151
try {
52-
ConvertUtil.convertToTarget(param, colmeta.getResourceCfgMap());
52+
ConvertUtil.convertToTarget(colmeta.getResourceCfgMap(), param);
5353
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
5454
checkAndSetFileObject(fileObject);
5555
stream = getInResource(fileObject, colmeta);
@@ -92,7 +92,7 @@ public OutputStream getOutResourceByStream(String resourcePath) throws IOExcepti
9292
public InputStream getInResourceByStream(String resourcePath) throws IOException {
9393
VfsParam param = new VfsParam();
9494
try {
95-
ConvertUtil.convertToTarget(param, colmeta.getResourceCfgMap());
95+
ConvertUtil.convertToTarget(colmeta.getResourceCfgMap(), param);
9696
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
9797
checkAndSetFileObject(fileObject);
9898
return getInResource(fileObject, colmeta);
@@ -104,7 +104,7 @@ public InputStream getInResourceByStream(String resourcePath) throws IOException
104104
public static FileObject getFileObject(FileSystemManager manager, DataCollectionMeta meta, String resPath) throws IOException {
105105
VfsParam param = new VfsParam();
106106
try {
107-
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
107+
ConvertUtil.convertToTarget(meta.getResourceCfgMap(), param);
108108
return manager.resolveFile(getUriByParam(param, resPath).toString(), getOptions(param));
109109
} catch (Exception ex) {
110110
throw new IOException(ex);
@@ -162,7 +162,7 @@ public List<String> listFilePath(VfsParam param, String path) {
162162

163163
public FileObject createNotExists(DataCollectionMeta meta, String resourcePath) throws Exception {
164164
VfsParam param = new VfsParam();
165-
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
165+
ConvertUtil.convertToTarget(meta.getResourceCfgMap(), param);
166166
try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
167167
if (fo.exists()) {
168168
if (FileType.FOLDER.equals(fo.getType())) {
@@ -209,7 +209,7 @@ public OutputStream getRawOutputStream(String resourcePath) throws IOException {
209209
public InputStream getRawInputStream(String resourcePath) throws IOException {
210210
VfsParam param = new VfsParam();
211211
try {
212-
ConvertUtil.convertToTarget(param, colmeta.getResourceCfgMap());
212+
ConvertUtil.convertToTarget(colmeta.getResourceCfgMap(), param);
213213
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
214214
checkAndSetFileObject(fileObject);
215215
return getRawInResource(fileObject, colmeta);
@@ -284,7 +284,7 @@ public VfsParam returnFtpParam(String hostName, int port, String userName, Strin
284284
public boolean exists(String resourcePath) throws IOException {
285285
VfsParam param = new VfsParam();
286286
try {
287-
ConvertUtil.convertToTarget(param, colmeta.getResourceCfgMap());
287+
ConvertUtil.convertToTarget(colmeta.getResourceCfgMap(), param);
288288
} catch (Exception ex) {
289289
throw new IOException(ex);
290290
}
@@ -299,7 +299,7 @@ public boolean exists(String resourcePath) throws IOException {
299299
public long getInputStreamSize(String resourcePath) throws IOException {
300300
VfsParam param = new VfsParam();
301301
try {
302-
ConvertUtil.convertToTarget(param, colmeta.getResourceCfgMap());
302+
ConvertUtil.convertToTarget(colmeta.getResourceCfgMap(), param);
303303
} catch (Exception ex) {
304304
throw new IOException(ex);
305305
}

common/src/main/java/com/robin/core/fileaccess/fs/FileSystemAccessorFactory.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.robin.core.fileaccess.fs;
22

33
import com.robin.core.base.exception.MissingConfigException;
4+
import com.robin.core.base.util.Const;
45
import com.robin.core.fileaccess.meta.DataCollectionMeta;
56
import lombok.NonNull;
67
import org.springframework.util.ObjectUtils;
@@ -21,7 +22,27 @@ public static AbstractFileSystemAccessor getResourceAccessorByType(@NonNull Stri
2122
try {
2223
Class<? extends IFileSystemAccessor> clazz = accessorMap.get(resType);
2324
if (!ObjectUtils.isEmpty(clazz)) {
24-
accessor = (AbstractFileSystemAccessor) clazz.getConstructor().newInstance();
25+
if(LocalFileSystemAccessor.class.isAssignableFrom(clazz)){
26+
accessor=LocalFileSystemAccessor.getInstance();
27+
}else {
28+
accessor = (AbstractFileSystemAccessor) clazz.getConstructor().newInstance();
29+
}
30+
}
31+
} catch (Exception ex) {
32+
throw new MissingConfigException(ex);
33+
}
34+
return accessor;
35+
}
36+
public static AbstractFileSystemAccessor getResourceAccessorByType(@NonNull Const.FILESYSTEM fsType) throws MissingConfigException {
37+
AbstractFileSystemAccessor accessor = null;
38+
try {
39+
Class<? extends IFileSystemAccessor> clazz = accessorMap.get(fsType.getValue());
40+
if (!ObjectUtils.isEmpty(clazz)) {
41+
if(LocalFileSystemAccessor.class.isAssignableFrom(clazz)){
42+
accessor=LocalFileSystemAccessor.getInstance();
43+
}else {
44+
accessor = (AbstractFileSystemAccessor) clazz.getConstructor().newInstance();
45+
}
2546
}
2647
} catch (Exception ex) {
2748
throw new MissingConfigException(ex);
@@ -47,10 +68,30 @@ public static AbstractFileSystemAccessor getResourceAccessorByType(@NonNull Stri
4768
}
4869
return accessor;
4970
}
71+
public static AbstractFileSystemAccessor getResourceAccessorByType(@NonNull Const.FILESYSTEM resType, @NonNull DataCollectionMeta colmeta) throws MissingConfigException {
72+
AbstractFileSystemAccessor accessor = null;
73+
try {
74+
Class<? extends IFileSystemAccessor> clazz = accessorMap.get(resType.getValue());
75+
if (!ObjectUtils.isEmpty(clazz)) {
76+
if(LocalFileSystemAccessor.class.isAssignableFrom(clazz)){
77+
return LocalFileSystemAccessor.getInstance();
78+
}else {
79+
accessor = (AbstractFileSystemAccessor) clazz.getConstructor().newInstance();
80+
if (!ObjectUtils.isEmpty(colmeta)) {
81+
accessor.init(colmeta);
82+
}
83+
}
84+
}
85+
} catch (Exception ex) {
86+
throw new MissingConfigException(ex);
87+
}
88+
return accessor;
89+
}
5090

5191
private static void discoverAccessor(Map<String, Class<? extends IFileSystemAccessor>> accessorMap) {
5292
ServiceLoader.load(IFileSystemAccessor.class).iterator().forEachRemaining(i -> {
5393
accessorMap.put(i.getIdentifier(), i.getClass());
5494
});
95+
accessorMap.put(Const.FILESYSTEM.LOCAL.getValue(), LocalFileSystemAccessor.class);
5596
}
5697
}

common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.robin.core.fileaccess.util.PolandNotationUtil;
3131
import com.robin.core.fileaccess.util.ResourceUtil;
3232
import com.robin.core.fileaccess.util.SqlContentResolver;
33+
import org.apache.avro.Schema;
3334
import org.apache.calcite.config.Lex;
3435
import org.apache.calcite.sql.*;
3536
import org.apache.commons.lang3.StringUtils;
@@ -73,6 +74,7 @@ public abstract class AbstractFileIterator implements IResourceIterator {
7374

7475
protected SqlSegment segment;
7576
protected Iterator<Map.Entry<String,Map<String,Object>>> groupIter;
77+
protected Schema avroSchema;
7678

7779
public AbstractFileIterator() {
7880

@@ -328,4 +330,21 @@ private void appendByType(StringBuilder builder,Object value){
328330
builder.append(value).append("|");
329331
}
330332
}
333+
334+
@Override
335+
public void remove() {
336+
hasNext();
337+
}
338+
339+
public Schema getSchema() {
340+
return avroSchema;
341+
}
342+
343+
public Map<String, DataSetColumnMeta> getColumnMap() {
344+
return columnMap;
345+
}
346+
347+
public SqlSegment getSegment() {
348+
return segment;
349+
}
331350
}

common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.robin.core.fileaccess.iterator;
22
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
3+
import org.apache.avro.Schema;
34

45
import java.io.BufferedReader;
56
import java.io.Closeable;

common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm
116116
public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colmeta,InputStream in) throws IOException{
117117
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
118118
colmeta.setContent(content);
119-
String fileFormat=content.getFileFormat();
119+
String fileFormat=content.getFileFormat().getValue();
120120
if(StringUtils.isEmpty(colmeta.getFileFormat())){
121121
colmeta.setFileFormat(fileFormat);
122122
}
@@ -170,7 +170,7 @@ private static String getFileType(DataCollectionMeta colmeta) {
170170
if(ObjectUtils.isEmpty(fileType)){
171171
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
172172
colmeta.setContent(content);
173-
fileType=content.getFileFormat();
173+
fileType=content.getFileFormat().getValue();
174174
}
175175
return fileType;
176176
}

common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public abstract class AbstractFileWriter implements IResourceWriter {
4949
protected AbstractFileSystemAccessor accessUtil;
5050
protected String identifier;
5151
protected boolean useBufferedWriter=false;
52+
protected boolean useRawOutputStream=false;
5253

5354
public AbstractFileWriter(){
5455

@@ -105,7 +106,11 @@ public void initalize() throws IOException{
105106
public void beginWrite() throws IOException{
106107
if(out==null){
107108
checkAccessUtil(colmeta.getPath());
108-
out = accessUtil.getOutResourceByStream(ResourceUtil.getProcessPath(colmeta.getPath()));
109+
if(!useRawOutputStream) {
110+
out = accessUtil.getOutResourceByStream(ResourceUtil.getProcessPath(colmeta.getPath()));
111+
}else{
112+
out = accessUtil.getRawOutputStream(ResourceUtil.getProcessPath(colmeta.getPath()));
113+
}
109114
if(useBufferedWriter) {
110115
writer = new BufferedWriter(new OutputStreamWriter(out));
111116
}

common/src/main/java/com/robin/core/fileaccess/writer/AbstractResourceWriter.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,7 @@
1616
import java.io.IOException;
1717
import java.util.*;
1818

19-
/**
20-
* <p>Project: frame</p>
21-
* <p>
22-
* <p>Description:com.robin.core.fileaccess.writer</p>
23-
* <p>
24-
* <p>Copyright: Copyright (c) 2018 create at 2018年10月31日</p>
25-
* <p>
26-
* <p>Company: zhcx_DEV</p>
27-
*
28-
* @author robinjim
29-
* @version 1.0
30-
*/
19+
3120
public abstract class AbstractResourceWriter implements IResourceWriter{
3221
protected DataCollectionMeta colmeta;
3322

common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private static String getFileSuffix(DataCollectionMeta colmeta) {
113113
if(ObjectUtils.isEmpty(fileSuffix)){
114114
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
115115
colmeta.setContent(content);
116-
fileSuffix=content.getFileFormat();
116+
fileSuffix=content.getFileFormat().getValue();
117117
}
118118
return fileSuffix;
119119
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
com.robin.core.fileaccess.fs.ApacheVfsFileSystemAccessor
2-
com.robin.core.fileaccess.fs.LocalFileSystemAccessor
1+
com.robin.core.fileaccess.fs.ApacheVfsFileSystemAccessor
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.robin.core.base.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
@Target(ElementType.METHOD)
9+
@Retention(RetentionPolicy.RUNTIME)
10+
public @interface ServerlessFunction {
11+
String value() default "";
12+
String initFunc() default "";
13+
String initParam() default "";
14+
String method() default "";
15+
}

0 commit comments

Comments
 (0)