diff --git a/common/pom.xml b/common/pom.xml
index b72fce11..56de9af4 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -299,6 +299,32 @@
dom4j
true
+
+ org.anarres.lzo
+ lzo-core
+ true
+
+
+ org.anarres.lzo
+ lzo-hadoop
+ true
+
+
+ org.xerial.snappy
+ snappy-java
+ true
+
+
+ org.tukaani
+ xz
+ true
+
+
+
+ org.lz4
+ lz4-java
+ true
+
src/main/java
diff --git a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
index 14de0c7e..b40b636f 100644
--- a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
+++ b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
@@ -156,7 +156,7 @@ public void returnOutputStreamHolder(OutputStreamHolder holder) throws Exception
throw new RuntimeException("output strem config not found!");
}
}
- public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) throws Exception{
+ public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) {
if(jdbcResourceHolderMap.get(sourceId)==null){
JdbcResourceHolder holder=new JdbcResourceHolder(meta,config);
jdbcResourceHolderMap.put(sourceId,holder);
diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
index b57c9c03..6a318936 100644
--- a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
+++ b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
@@ -104,4 +104,5 @@ public void init(DataCollectionMeta meta){
public String getIdentifier() {
return identifier;
}
+
}
diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
index cf619c71..0e860532 100644
--- a/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
+++ b/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
@@ -5,10 +5,7 @@
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import com.robin.core.fileaccess.meta.VfsParam;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.vfs2.FileObject;
-import org.apache.commons.vfs2.FileSystemManager;
-import org.apache.commons.vfs2.FileSystemOptions;
-import org.apache.commons.vfs2.FileType;
+import org.apache.commons.vfs2.*;
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder;
import org.apache.commons.vfs2.provider.http.HttpFileSystemConfigBuilder;
@@ -17,15 +14,19 @@
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
+import java.io.FileNotFoundException;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class ApacheVfsFileSystemAccessor extends AbstractFileSystemAccessor {
+ private Map objectMap = new HashMap<>();
public ApacheVfsFileSystemAccessor() {
- this.identifier= Const.FILESYSTEM.VFS.getValue();
+ this.identifier = Const.FILESYSTEM.VFS.getValue();
try {
manager = new StandardFileSystemManager();
logger.info(" manager {} ", manager);
@@ -39,29 +40,30 @@ public ApacheVfsFileSystemAccessor() {
private static final Logger logger = LoggerFactory.getLogger(ApacheVfsFileSystemAccessor.class);
@Override
- public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
+ public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
VfsParam param = new VfsParam();
InputStream stream;
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
-
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- stream=getInResource(fo, meta);
- return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())),stream);
+ FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ stream = getInResource(fileObject, meta);
+ return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())), stream);
} catch (Exception ex) {
throw new IOException(ex);
}
}
@Override
- public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
+ public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
BufferedWriter writer;
OutputStream outputStream;
try {
- FileObject fo = checkFileExist(meta, resourcePath);
- outputStream=fo.getContent().getOutputStream();
+ FileObject fileObject = createNotExists(meta, resourcePath);
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ outputStream = fileObject.getContent().getOutputStream();
writer = getWriterByPath(resourcePath, outputStream, meta.getEncode());
- return Pair.of(writer,outputStream);
+ return Pair.of(writer, outputStream);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -71,8 +73,9 @@ public Pair getOutResourceByWriter(DataCollectionMe
public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
OutputStream out;
try {
- FileObject fo = checkFileExist(meta, resourcePath);
- out = getOutputStreamByPath(resourcePath, fo.getContent().getOutputStream());
+ FileObject fileObject = createNotExists(meta, resourcePath);
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ out = getOutputStreamByPath(resourcePath, fileObject.getContent().getOutputStream());
return out;
} catch (Exception ex) {
throw new IOException(ex);
@@ -84,9 +87,9 @@ public InputStream getInResourceByStream(DataCollectionMeta meta, String resourc
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
-
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- return getInResource(fo, meta);
+ FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ return getInResource(fileObject, meta);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -117,7 +120,7 @@ private InputStream getRawInResource(FileObject fo, DataCollectionMeta meta) thr
return reader;
}
- public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws Exception {
+ public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws IOException {
InputStream reader;
if (fo.exists()) {
if (FileType.FOLDER.equals(fo.getType())) {
@@ -134,8 +137,7 @@ public static InputStream getInResource(FileObject fo, DataCollectionMeta meta)
public List listFilePath(VfsParam param, String path) {
List list = new ArrayList<>();
- try {
- FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param));
+ try (FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param))) {
if (FileType.FOLDER.equals(fo.getType())) {
FileObject[] object = fo.getChildren();
if (!ObjectUtils.isEmpty(object)) {
@@ -146,39 +148,41 @@ public List listFilePath(VfsParam param, String path) {
}
}
}
-
} catch (Exception ex) {
ex.printStackTrace();
}
return list;
}
- public FileObject checkFileExist(DataCollectionMeta meta, String resourcePath) throws Exception {
+ public FileObject createNotExists(DataCollectionMeta meta, String resourcePath) throws Exception {
VfsParam param = new VfsParam();
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- if (fo.exists()) {
- if (FileType.FOLDER.equals(fo.getType())) {
- logger.error("File {} is a directory!", resourcePath);
- throw new FileNotFoundException("File " + resourcePath + " is a directory!");
+ try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
+ if (fo.exists()) {
+ if (FileType.FOLDER.equals(fo.getType())) {
+ logger.error("File {} is a directory!", resourcePath);
+ throw new FileNotFoundException("File " + resourcePath + " is a directory!");
+ } else {
+ logger.warn("File {} already exists!,Overwrite", resourcePath);
+ }
} else {
- logger.warn("File " + resourcePath + " already exists!,Overwrite");
- }
- } else {
- if (!fo.getParent().exists()) {
- fo.getParent().createFolder();
+ if (!fo.getParent().exists()) {
+ fo.getParent().createFolder();
+ }
+ //fo.createFile();
}
- //fo.createFile();
+ return fo;
+ } catch (FileSystemException ex) {
+ logger.info("{}", ex.getMessage());
}
- return fo;
+ return null;
}
public boolean checkFileExist(VfsParam param, String resourcePath) throws Exception {
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- if (fo.exists()) {
- return true;
- } else {
- return false;
+ try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
+ return fo.exists();
+ } catch (FileSystemException ex) {
+ throw ex;
}
}
@@ -186,8 +190,9 @@ public boolean checkFileExist(VfsParam param, String resourcePath) throws Except
public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
OutputStream out;
try {
- FileObject fo = checkFileExist(meta, resourcePath);
- out = fo.getContent().getOutputStream();
+ FileObject fileObject = createNotExists(meta, resourcePath);
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ out = fileObject.getContent().getOutputStream();
return out;
} catch (Exception ex) {
throw new IOException(ex);
@@ -199,9 +204,9 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
-
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- return getRawInResource(fo, meta);
+ FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
+ meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
+ return getRawInResource(fileObject, meta);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -224,11 +229,7 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception {
FileSystemOptions opts = new FileSystemOptions();
if (Const.VFS_PROTOCOL.SFTP.getValue().equalsIgnoreCase(param.getProtocol())) {
SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(opts, "no");
- if (param.isLockDir()) {
- SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, true);
- } else {
- SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, false);
- }
+ SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, param.isLockDir());
} else if (Const.VFS_PROTOCOL.FTP.getValue().equalsIgnoreCase(param.getProtocol())) {
FtpFileSystemConfigBuilder builder = FtpFileSystemConfigBuilder.getInstance();
if (param.isLockDir()) {
@@ -238,23 +239,14 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception {
logger.debug("--- using passive mode ------");
builder.setPassiveMode(opts, true);
}
- } else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol())) {
- HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance();
- if(!ObjectUtils.isEmpty(param.getProxyHost())){
- builder.setProxyHost(opts,param.getProxyHost());
- }
- if(!ObjectUtils.isEmpty(param.getProxyPort())){
- builder.setProxyPort(opts, param.getProxyPort());
- }
- }else if(Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())){
+ } else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol()) || Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())) {
HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance();
- if(!ObjectUtils.isEmpty(param.getProxyHost())){
- builder.setProxyHost(opts,param.getProxyHost());
+ if (!ObjectUtils.isEmpty(param.getProxyHost())) {
+ builder.setProxyHost(opts, param.getProxyHost());
}
- if(!ObjectUtils.isEmpty(param.getProxyPort())){
+ if (!ObjectUtils.isEmpty(param.getProxyPort())) {
builder.setProxyPort(opts, param.getProxyPort());
}
-
}
return opts;
@@ -287,8 +279,9 @@ public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOExc
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- return fo.exists();
+ try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
+ return fo.exists();
+ }
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -299,12 +292,26 @@ public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) thr
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
-
- FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
- return fo.getContent().getSize();
+ try (FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
+ return fileObject.getContent().getSize();
+ }
} catch (Exception ex) {
throw new IOException(ex);
}
}
+
+ private synchronized String setProcessId(FileObject fileObject) {
+ String processId = Thread.currentThread().getId() + "_" + System.currentTimeMillis();
+ objectMap.put(processId, fileObject);
+ return processId;
+ }
+
+
+ public void closeWithProcessId(String processId) throws IOException {
+ if (objectMap.containsKey(processId)) {
+ objectMap.get(processId).close();
+ objectMap.remove(processId);
+ }
+ }
}
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
index ebe09cb3..93d0c514 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
@@ -16,8 +16,10 @@
package com.robin.core.fileaccess.iterator;
import com.robin.comm.dal.pool.ResourceAccessHolder;
+import com.robin.core.base.util.Const;
import com.robin.core.base.util.IOUtils;
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+import com.robin.core.fileaccess.fs.ApacheVfsFileSystemAccessor;
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import com.robin.core.fileaccess.meta.DataSetColumnMeta;
import com.robin.core.fileaccess.util.ResourceUtil;
@@ -68,11 +70,11 @@ public AbstractFileIterator(DataCollectionMeta colmeta, AbstractFileSystemAccess
}
@Override
- public void beforeProcess(String resourcePath) {
- checkAccessUtil(resourcePath);
+ public void beforeProcess() {
+ checkAccessUtil(colmeta.getPath());
Assert.notNull(accessUtil, "ResourceAccessUtil is required!");
try {
- Pair pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(resourcePath));
+ Pair pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
this.reader = pair.getKey();
this.instream = pair.getValue();
} catch (Exception ex) {
@@ -89,10 +91,6 @@ public void afterProcess() {
}
}
- @Override
- public void init() {
-
- }
protected void checkAccessUtil(String inputPath) {
try {
@@ -131,6 +129,13 @@ public void close() throws IOException {
if (instream != null) {
instream.close();
}
+ if(accessUtil!=null){
+ if(ApacheVfsFileSystemAccessor.class.isAssignableFrom(accessUtil.getClass())) {
+ if(!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID))) {
+ ((ApacheVfsFileSystemAccessor)accessUtil).closeWithProcessId(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID).toString());
+ }
+ }
+ }
}
@Override
public String getIdentifier() {
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
index b7963f11..45d860c0 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
@@ -33,7 +33,7 @@ protected AbstractResIterator(DataCollectionMeta colmeta){
}
}
@Override
- public void beforeProcess(String param) {
+ public void beforeProcess() {
}
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java
new file mode 100644
index 00000000..093b44ac
--- /dev/null
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java
@@ -0,0 +1,73 @@
+package com.robin.core.fileaccess.iterator;
+
+import com.google.common.collect.Lists;
+import com.robin.core.base.util.Const;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
+import com.robin.core.fileaccess.meta.DataSetColumnMeta;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.StringUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+
+public class ArffFileIterator extends PlainTextFileIterator{
+ public ArffFileIterator(){
+ identifier= Const.FILEFORMATSTR.ARFF.getValue();
+ }
+
+ public ArffFileIterator(DataCollectionMeta colmeta) {
+ super(colmeta);
+ identifier= Const.FILEFORMATSTR.ARFF.getValue();
+ }
+
+ @Override
+ public void beforeProcess() {
+ super.beforeProcess();
+ if(CollectionUtils.isEmpty(colmeta.getColumnList())){
+ if(!ObjectUtils.isEmpty(reader)){
+ try {
+ while (!(readLineStr = reader.readLine()).equalsIgnoreCase("@data")) {
+ if(StringUtils.startsWithIgnoreCase(readLineStr,"@RELATION ")){
+ String relationName=readLineStr.substring(10).replace("'","");
+ colmeta.getResourceCfgMap().put("relationName",relationName);
+ }else if(StringUtils.startsWithIgnoreCase(readLineStr,"@attribute ")){
+ colmeta.addColumnMeta(parseDefine(readLineStr.substring(11)));
+ }
+ }
+ }catch (IOException ex){
+ logger.info("{}",ex.getMessage());
+ }
+ }
+ }
+ }
+ private DataSetColumnMeta parseDefine(String content){
+ String[] arr=content.trim().split("\\||\\t");
+ String type=arr[1].trim();
+ String columnName=arr[0].trim();
+ DataSetColumnMeta columnMeta=null;
+ if("REAL".equalsIgnoreCase(arr[1]) || "numeric".equalsIgnoreCase(arr[1])){
+ columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_DOUBLE,null);
+ }else if("string".equalsIgnoreCase(arr[1])){
+ columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_STRING,null);
+ }else if("date".equalsIgnoreCase(type)){
+ columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_TIMESTAMP,null);
+ }else if(type.startsWith("{")){
+ List nominalValues= Lists.newArrayList(type.substring(1,type.length()-1).split(","));
+ columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_STRING,null);
+ columnMeta.setNominalValues(nominalValues);
+ }
+ return columnMeta;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean hasRecord=super.hasNext();
+ if(readLineStr.contains(",") || readLineStr.contains("@")){
+ return hasRecord;
+ }else{
+ return false;
+ }
+ }
+}
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
index 6919ae43..c337e5e4 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
@@ -6,8 +6,7 @@
import java.util.Map;
public interface IResourceIterator extends Iterator