Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,32 @@
<artifactId>dom4j</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.anarres.lzo</groupId>
<artifactId>lzo-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.anarres.lzo</groupId>
<artifactId>lzo-hadoop</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void returnOutputStreamHolder(OutputStreamHolder holder) throws Exception
throw new RuntimeException("output strem config not found!");
}
}
public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) throws Exception{
public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) {
if(jdbcResourceHolderMap.get(sourceId)==null){
JdbcResourceHolder holder=new JdbcResourceHolder(meta,config);
jdbcResourceHolderMap.put(sourceId,holder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,5 @@ public void init(DataCollectionMeta meta){
public String getIdentifier() {
return identifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import com.robin.core.fileaccess.meta.VfsParam;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemManager;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.FileType;
import org.apache.commons.vfs2.*;
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder;
import org.apache.commons.vfs2.provider.http.HttpFileSystemConfigBuilder;
Expand All @@ -17,15 +14,19 @@
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;

import java.io.FileNotFoundException;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ApacheVfsFileSystemAccessor extends AbstractFileSystemAccessor {
private Map<String, FileObject> objectMap = new HashMap<>();

public ApacheVfsFileSystemAccessor() {
this.identifier= Const.FILESYSTEM.VFS.getValue();
this.identifier = Const.FILESYSTEM.VFS.getValue();
try {
manager = new StandardFileSystemManager();
logger.info(" manager {} ", manager);
Expand All @@ -39,29 +40,30 @@ public ApacheVfsFileSystemAccessor() {
private static final Logger logger = LoggerFactory.getLogger(ApacheVfsFileSystemAccessor.class);

@Override
public Pair<BufferedReader,InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
VfsParam param = new VfsParam();
InputStream stream;
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());

FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
stream=getInResource(fo, meta);
return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())),stream);
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
stream = getInResource(fileObject, meta);
return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())), stream);
} catch (Exception ex) {
throw new IOException(ex);
}
}

@Override
public Pair<BufferedWriter,OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
BufferedWriter writer;
OutputStream outputStream;
try {
FileObject fo = checkFileExist(meta, resourcePath);
outputStream=fo.getContent().getOutputStream();
FileObject fileObject = createNotExists(meta, resourcePath);
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
outputStream = fileObject.getContent().getOutputStream();
writer = getWriterByPath(resourcePath, outputStream, meta.getEncode());
return Pair.of(writer,outputStream);
return Pair.of(writer, outputStream);
} catch (Exception ex) {
throw new IOException(ex);
}
Expand All @@ -71,8 +73,9 @@ public Pair<BufferedWriter,OutputStream> getOutResourceByWriter(DataCollectionMe
public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
OutputStream out;
try {
FileObject fo = checkFileExist(meta, resourcePath);
out = getOutputStreamByPath(resourcePath, fo.getContent().getOutputStream());
FileObject fileObject = createNotExists(meta, resourcePath);
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
out = getOutputStreamByPath(resourcePath, fileObject.getContent().getOutputStream());
return out;
} catch (Exception ex) {
throw new IOException(ex);
Expand All @@ -84,9 +87,9 @@ public InputStream getInResourceByStream(DataCollectionMeta meta, String resourc
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());

FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
return getInResource(fo, meta);
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
return getInResource(fileObject, meta);
} catch (Exception ex) {
throw new IOException(ex);
}
Expand Down Expand Up @@ -117,7 +120,7 @@ private InputStream getRawInResource(FileObject fo, DataCollectionMeta meta) thr
return reader;
}

public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws Exception {
public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws IOException {
InputStream reader;
if (fo.exists()) {
if (FileType.FOLDER.equals(fo.getType())) {
Expand All @@ -134,8 +137,7 @@ public static InputStream getInResource(FileObject fo, DataCollectionMeta meta)

public List<String> listFilePath(VfsParam param, String path) {
List<String> list = new ArrayList<>();
try {
FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param));
try (FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param))) {
if (FileType.FOLDER.equals(fo.getType())) {
FileObject[] object = fo.getChildren();
if (!ObjectUtils.isEmpty(object)) {
Expand All @@ -146,48 +148,51 @@ public List<String> listFilePath(VfsParam param, String path) {
}
}
}

} catch (Exception ex) {
ex.printStackTrace();
}
return list;
}

public FileObject checkFileExist(DataCollectionMeta meta, String resourcePath) throws Exception {
public FileObject createNotExists(DataCollectionMeta meta, String resourcePath) throws Exception {
VfsParam param = new VfsParam();
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
if (fo.exists()) {
if (FileType.FOLDER.equals(fo.getType())) {
logger.error("File {} is a directory!", resourcePath);
throw new FileNotFoundException("File " + resourcePath + " is a directory!");
try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
if (fo.exists()) {
if (FileType.FOLDER.equals(fo.getType())) {
logger.error("File {} is a directory!", resourcePath);
throw new FileNotFoundException("File " + resourcePath + " is a directory!");
} else {
logger.warn("File {} already exists!,Overwrite", resourcePath);
}
} else {
logger.warn("File " + resourcePath + " already exists!,Overwrite");
}
} else {
if (!fo.getParent().exists()) {
fo.getParent().createFolder();
if (!fo.getParent().exists()) {
fo.getParent().createFolder();
}
//fo.createFile();
}
//fo.createFile();
return fo;
} catch (FileSystemException ex) {
logger.info("{}", ex.getMessage());
}
return fo;
return null;
}

public boolean checkFileExist(VfsParam param, String resourcePath) throws Exception {
FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
if (fo.exists()) {
return true;
} else {
return false;
try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
return fo.exists();
} catch (FileSystemException ex) {
throw ex;
}
}

@Override
public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
OutputStream out;
try {
FileObject fo = checkFileExist(meta, resourcePath);
out = fo.getContent().getOutputStream();
FileObject fileObject = createNotExists(meta, resourcePath);
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
out = fileObject.getContent().getOutputStream();
return out;
} catch (Exception ex) {
throw new IOException(ex);
Expand All @@ -199,9 +204,9 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());

FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
return getRawInResource(fo, meta);
FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject));
return getRawInResource(fileObject, meta);
} catch (Exception ex) {
throw new IOException(ex);
}
Expand All @@ -224,11 +229,7 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception {
FileSystemOptions opts = new FileSystemOptions();
if (Const.VFS_PROTOCOL.SFTP.getValue().equalsIgnoreCase(param.getProtocol())) {
SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(opts, "no");
if (param.isLockDir()) {
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, true);
} else {
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, false);
}
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, param.isLockDir());
} else if (Const.VFS_PROTOCOL.FTP.getValue().equalsIgnoreCase(param.getProtocol())) {
FtpFileSystemConfigBuilder builder = FtpFileSystemConfigBuilder.getInstance();
if (param.isLockDir()) {
Expand All @@ -238,23 +239,14 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception {
logger.debug("--- using passive mode ------");
builder.setPassiveMode(opts, true);
}
} else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol())) {
HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance();
if(!ObjectUtils.isEmpty(param.getProxyHost())){
builder.setProxyHost(opts,param.getProxyHost());
}
if(!ObjectUtils.isEmpty(param.getProxyPort())){
builder.setProxyPort(opts, param.getProxyPort());
}
}else if(Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())){
} else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol()) || Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())) {
HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance();
if(!ObjectUtils.isEmpty(param.getProxyHost())){
builder.setProxyHost(opts,param.getProxyHost());
if (!ObjectUtils.isEmpty(param.getProxyHost())) {
builder.setProxyHost(opts, param.getProxyHost());
}
if(!ObjectUtils.isEmpty(param.getProxyPort())){
if (!ObjectUtils.isEmpty(param.getProxyPort())) {
builder.setProxyPort(opts, param.getProxyPort());
}

}

return opts;
Expand Down Expand Up @@ -287,8 +279,9 @@ public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOExc
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());
FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
return fo.exists();
try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
return fo.exists();
}
} catch (Exception ex) {
throw new IOException(ex);
}
Expand All @@ -299,12 +292,26 @@ public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) thr
VfsParam param = new VfsParam();
try {
ConvertUtil.convertToTarget(param, meta.getResourceCfgMap());

FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param));
return fo.getContent().getSize();
try (FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) {
return fileObject.getContent().getSize();
}
} catch (Exception ex) {
throw new IOException(ex);
}
}

private synchronized String setProcessId(FileObject fileObject) {
String processId = Thread.currentThread().getId() + "_" + System.currentTimeMillis();
objectMap.put(processId, fileObject);
return processId;
}


public void closeWithProcessId(String processId) throws IOException {
if (objectMap.containsKey(processId)) {
objectMap.get(processId).close();
objectMap.remove(processId);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package com.robin.core.fileaccess.iterator;

import com.robin.comm.dal.pool.ResourceAccessHolder;
import com.robin.core.base.util.Const;
import com.robin.core.base.util.IOUtils;
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
import com.robin.core.fileaccess.fs.ApacheVfsFileSystemAccessor;
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import com.robin.core.fileaccess.meta.DataSetColumnMeta;
import com.robin.core.fileaccess.util.ResourceUtil;
Expand Down Expand Up @@ -68,11 +70,11 @@ public AbstractFileIterator(DataCollectionMeta colmeta, AbstractFileSystemAccess
}

@Override
public void beforeProcess(String resourcePath) {
checkAccessUtil(resourcePath);
public void beforeProcess() {
checkAccessUtil(colmeta.getPath());
Assert.notNull(accessUtil, "ResourceAccessUtil is required!");
try {
Pair<BufferedReader, InputStream> pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(resourcePath));
Pair<BufferedReader, InputStream> pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
this.reader = pair.getKey();
this.instream = pair.getValue();
} catch (Exception ex) {
Expand All @@ -89,10 +91,6 @@ public void afterProcess() {
}
}

@Override
public void init() {

}

protected void checkAccessUtil(String inputPath) {
try {
Expand Down Expand Up @@ -131,6 +129,13 @@ public void close() throws IOException {
if (instream != null) {
instream.close();
}
if(accessUtil!=null){
if(ApacheVfsFileSystemAccessor.class.isAssignableFrom(accessUtil.getClass())) {
if(!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID))) {
((ApacheVfsFileSystemAccessor)accessUtil).closeWithProcessId(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID).toString());
}
}
}
}
@Override
public String getIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected AbstractResIterator(DataCollectionMeta colmeta){
}
}
@Override
public void beforeProcess(String param) {
public void beforeProcess() {

}

Expand Down
Loading
Loading