diff --git a/common/pom.xml b/common/pom.xml
index b72fce11..56de9af4 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -299,6 +299,32 @@
             <artifactId>dom4j</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.anarres.lzo</groupId>
+            <artifactId>lzo-core</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.anarres.lzo</groupId>
+            <artifactId>lzo-hadoop</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.tukaani</groupId>
+            <artifactId>xz</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <optional>true</optional>
+        </dependency>
         <sourceDirectory>src/main/java</sourceDirectory>
diff --git a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
index 14de0c7e..b40b636f 100644
--- a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
+++ b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
@@ -156,7 +156,7 @@ public void returnOutputStreamHolder(OutputStreamHolder holder) throws Exception
             throw new RuntimeException("output strem config not found!");
         }
     }
-    public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) throws Exception{
+    public JdbcResourceHolder getPoolJdbcHolder(Long sourceId, DataCollectionMeta meta, HikariConfig config) {
        if(jdbcResourceHolderMap.get(sourceId)==null){
            JdbcResourceHolder holder=new JdbcResourceHolder(meta,config);
            jdbcResourceHolderMap.put(sourceId,holder);
diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
index b57c9c03..6a318936 100644
--- a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
+++ b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
@@ -104,4 +104,5 @@ public void init(DataCollectionMeta meta){
     public String getIdentifier() {
         return identifier;
     }
+
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
index cf619c71..0e860532 100644
--- a/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
+++ b/common/src/main/java/com/robin/core/fileaccess/fs/ApacheVfsFileSystemAccessor.java
@@ -5,10 +5,7 @@
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.VfsParam;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.vfs2.FileObject;
-import org.apache.commons.vfs2.FileSystemManager;
-import org.apache.commons.vfs2.FileSystemOptions;
-import org.apache.commons.vfs2.FileType;
+import org.apache.commons.vfs2.*;
 import org.apache.commons.vfs2.impl.StandardFileSystemManager;
 import org.apache.commons.vfs2.provider.ftp.FtpFileSystemConfigBuilder;
 import org.apache.commons.vfs2.provider.http.HttpFileSystemConfigBuilder;
@@ -17,15 +14,19 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ObjectUtils;
 
+import java.io.FileNotFoundException;
 import java.io.*;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class ApacheVfsFileSystemAccessor extends AbstractFileSystemAccessor {
+    private Map<String, FileObject> objectMap = new HashMap<>();
     public ApacheVfsFileSystemAccessor() {
-        this.identifier= Const.FILESYSTEM.VFS.getValue();
+        this.identifier = Const.FILESYSTEM.VFS.getValue();
         try {
             manager = new StandardFileSystemManager();
             logger.info(" manager {} ", manager);
@@ -39,29 +40,30 @@ public ApacheVfsFileSystemAccessor() {
     private static final Logger logger = LoggerFactory.getLogger(ApacheVfsFileSystemAccessor.class);
 
     @Override
-    public
Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { + public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { VfsParam param = new VfsParam(); InputStream stream; try { ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - stream=getInResource(fo, meta); - return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())),stream); + FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + stream = getInResource(fileObject, meta); + return Pair.of(new BufferedReader(new InputStreamReader(stream, meta.getEncode())), stream); } catch (Exception ex) { throw new IOException(ex); } } @Override - public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException { + public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException { BufferedWriter writer; OutputStream outputStream; try { - FileObject fo = checkFileExist(meta, resourcePath); - outputStream=fo.getContent().getOutputStream(); + FileObject fileObject = createNotExists(meta, resourcePath); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + outputStream = fileObject.getContent().getOutputStream(); writer = getWriterByPath(resourcePath, outputStream, meta.getEncode()); - return Pair.of(writer,outputStream); + return Pair.of(writer, outputStream); } catch (Exception ex) { throw new IOException(ex); } @@ -71,8 +73,9 @@ public Pair getOutResourceByWriter(DataCollectionMe public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { OutputStream out; try { - FileObject fo = checkFileExist(meta, resourcePath); - out = getOutputStreamByPath(resourcePath, fo.getContent().getOutputStream()); + FileObject fileObject = createNotExists(meta, resourcePath); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + out = getOutputStreamByPath(resourcePath, fileObject.getContent().getOutputStream()); return out; } catch (Exception ex) { throw new IOException(ex); @@ -84,9 +87,9 @@ public InputStream getInResourceByStream(DataCollectionMeta meta, String resourc VfsParam param = new VfsParam(); try { ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - return getInResource(fo, meta); + FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + return getInResource(fileObject, meta); } catch (Exception ex) { throw new IOException(ex); } @@ -117,7 +120,7 @@ private InputStream getRawInResource(FileObject fo, DataCollectionMeta meta) thr return reader; } - public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws Exception { + public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) throws IOException { InputStream reader; if (fo.exists()) { if (FileType.FOLDER.equals(fo.getType())) { @@ -134,8 +137,7 @@ public static InputStream getInResource(FileObject fo, DataCollectionMeta meta) public List listFilePath(VfsParam param, 
String path) { List list = new ArrayList<>(); - try { - FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param)); + try (FileObject fo = manager.resolveFile(getUriByParam(param, path).toString(), getOptions(param))) { if (FileType.FOLDER.equals(fo.getType())) { FileObject[] object = fo.getChildren(); if (!ObjectUtils.isEmpty(object)) { @@ -146,39 +148,41 @@ public List listFilePath(VfsParam param, String path) { } } } - } catch (Exception ex) { ex.printStackTrace(); } return list; } - public FileObject checkFileExist(DataCollectionMeta meta, String resourcePath) throws Exception { + public FileObject createNotExists(DataCollectionMeta meta, String resourcePath) throws Exception { VfsParam param = new VfsParam(); ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - if (fo.exists()) { - if (FileType.FOLDER.equals(fo.getType())) { - logger.error("File {} is a directory!", resourcePath); - throw new FileNotFoundException("File " + resourcePath + " is a directory!"); + try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) { + if (fo.exists()) { + if (FileType.FOLDER.equals(fo.getType())) { + logger.error("File {} is a directory!", resourcePath); + throw new FileNotFoundException("File " + resourcePath + " is a directory!"); + } else { + logger.warn("File {} already exists!,Overwrite", resourcePath); + } } else { - logger.warn("File " + resourcePath + " already exists!,Overwrite"); - } - } else { - if (!fo.getParent().exists()) { - fo.getParent().createFolder(); + if (!fo.getParent().exists()) { + fo.getParent().createFolder(); + } + //fo.createFile(); } - //fo.createFile(); + return fo; + } catch (FileSystemException ex) { + logger.info("{}", ex.getMessage()); } - return fo; + return null; } public boolean checkFileExist(VfsParam param, String resourcePath) throws Exception { - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - if (fo.exists()) { - return true; - } else { - return false; + try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) { + return fo.exists(); + } catch (FileSystemException ex) { + throw ex; } } @@ -186,8 +190,9 @@ public boolean checkFileExist(VfsParam param, String resourcePath) throws Except public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException { OutputStream out; try { - FileObject fo = checkFileExist(meta, resourcePath); - out = fo.getContent().getOutputStream(); + FileObject fileObject = createNotExists(meta, resourcePath); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + out = fileObject.getContent().getOutputStream(); return out; } catch (Exception ex) { throw new IOException(ex); @@ -199,9 +204,9 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat VfsParam param = new VfsParam(); try { ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - return getRawInResource(fo, meta); + FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); + meta.getResourceCfgMap().put(Const.ITERATOR_PROCESSID, setProcessId(fileObject)); + return getRawInResource(fileObject, meta); } catch 
(Exception ex) { throw new IOException(ex); } @@ -224,11 +229,7 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception { FileSystemOptions opts = new FileSystemOptions(); if (Const.VFS_PROTOCOL.SFTP.getValue().equalsIgnoreCase(param.getProtocol())) { SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(opts, "no"); - if (param.isLockDir()) { - SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, true); - } else { - SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, false); - } + SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, param.isLockDir()); } else if (Const.VFS_PROTOCOL.FTP.getValue().equalsIgnoreCase(param.getProtocol())) { FtpFileSystemConfigBuilder builder = FtpFileSystemConfigBuilder.getInstance(); if (param.isLockDir()) { @@ -238,23 +239,14 @@ private static FileSystemOptions getOptions(VfsParam param) throws Exception { logger.debug("--- using passive mode ------"); builder.setPassiveMode(opts, true); } - } else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol())) { - HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance(); - if(!ObjectUtils.isEmpty(param.getProxyHost())){ - builder.setProxyHost(opts,param.getProxyHost()); - } - if(!ObjectUtils.isEmpty(param.getProxyPort())){ - builder.setProxyPort(opts, param.getProxyPort()); - } - }else if(Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())){ + } else if (Const.VFS_PROTOCOL.HTTP.getValue().equalsIgnoreCase(param.getProtocol()) || Const.VFS_PROTOCOL.HTTPS.getValue().equalsIgnoreCase(param.getProtocol())) { HttpFileSystemConfigBuilder builder = HttpFileSystemConfigBuilder.getInstance(); - if(!ObjectUtils.isEmpty(param.getProxyHost())){ - builder.setProxyHost(opts,param.getProxyHost()); + if (!ObjectUtils.isEmpty(param.getProxyHost())) { + builder.setProxyHost(opts, param.getProxyHost()); } - if(!ObjectUtils.isEmpty(param.getProxyPort())){ + if (!ObjectUtils.isEmpty(param.getProxyPort())) { builder.setProxyPort(opts, param.getProxyPort()); } - } return opts; @@ -287,8 +279,9 @@ public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOExc VfsParam param = new VfsParam(); try { ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - return fo.exists(); + try (FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) { + return fo.exists(); + } } catch (Exception ex) { throw new IOException(ex); } @@ -299,12 +292,26 @@ public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) thr VfsParam param = new VfsParam(); try { ConvertUtil.convertToTarget(param, meta.getResourceCfgMap()); - - FileObject fo = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param)); - return fo.getContent().getSize(); + try (FileObject fileObject = manager.resolveFile(getUriByParam(param, resourcePath).toString(), getOptions(param))) { + return fileObject.getContent().getSize(); + } } catch (Exception ex) { throw new IOException(ex); } } + + private synchronized String setProcessId(FileObject fileObject) { + String processId = Thread.currentThread().getId() + "_" + System.currentTimeMillis(); + objectMap.put(processId, fileObject); + return processId; + } + + + public void closeWithProcessId(String processId) throws IOException { + if (objectMap.containsKey(processId)) { + 
objectMap.get(processId).close(); + objectMap.remove(processId); + } + } } diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java index ebe09cb3..93d0c514 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java @@ -16,8 +16,10 @@ package com.robin.core.fileaccess.iterator; import com.robin.comm.dal.pool.ResourceAccessHolder; +import com.robin.core.base.util.Const; import com.robin.core.base.util.IOUtils; import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; +import com.robin.core.fileaccess.fs.ApacheVfsFileSystemAccessor; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.meta.DataSetColumnMeta; import com.robin.core.fileaccess.util.ResourceUtil; @@ -68,11 +70,11 @@ public AbstractFileIterator(DataCollectionMeta colmeta, AbstractFileSystemAccess } @Override - public void beforeProcess(String resourcePath) { - checkAccessUtil(resourcePath); + public void beforeProcess() { + checkAccessUtil(colmeta.getPath()); Assert.notNull(accessUtil, "ResourceAccessUtil is required!"); try { - Pair pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(resourcePath)); + Pair pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); this.reader = pair.getKey(); this.instream = pair.getValue(); } catch (Exception ex) { @@ -89,10 +91,6 @@ public void afterProcess() { } } - @Override - public void init() { - - } protected void checkAccessUtil(String inputPath) { try { @@ -131,6 +129,13 @@ public void close() throws IOException { if (instream != null) { instream.close(); } + if(accessUtil!=null){ + if(ApacheVfsFileSystemAccessor.class.isAssignableFrom(accessUtil.getClass())) { + if(!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID))) { + ((ApacheVfsFileSystemAccessor)accessUtil).closeWithProcessId(colmeta.getResourceCfgMap().get(Const.ITERATOR_PROCESSID).toString()); + } + } + } } @Override public String getIdentifier() { diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java index b7963f11..45d860c0 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java @@ -33,7 +33,7 @@ protected AbstractResIterator(DataCollectionMeta colmeta){ } } @Override - public void beforeProcess(String param) { + public void beforeProcess() { } diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java new file mode 100644 index 00000000..093b44ac --- /dev/null +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java @@ -0,0 +1,73 @@ +package com.robin.core.fileaccess.iterator; + +import com.google.common.collect.Lists; +import com.robin.core.base.util.Const; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; + +import java.io.IOException; +import java.util.List; + + 
+public class ArffFileIterator extends PlainTextFileIterator{
+    public ArffFileIterator(){
+        identifier= Const.FILEFORMATSTR.ARFF.getValue();
+    }
+
+    public ArffFileIterator(DataCollectionMeta colmeta) {
+        super(colmeta);
+        identifier= Const.FILEFORMATSTR.ARFF.getValue();
+    }
+
+    @Override
+    public void beforeProcess() {
+        super.beforeProcess();
+        if(CollectionUtils.isEmpty(colmeta.getColumnList())){
+            if(!ObjectUtils.isEmpty(reader)){
+                try {
+                    while (!(readLineStr = reader.readLine()).equalsIgnoreCase("@data")) {
+                        if(StringUtils.startsWithIgnoreCase(readLineStr,"@RELATION ")){
+                            String relationName=readLineStr.substring(10).replace("'","");
+                            colmeta.getResourceCfgMap().put("relationName",relationName);
+                        }else if(StringUtils.startsWithIgnoreCase(readLineStr,"@attribute ")){
+                            colmeta.addColumnMeta(parseDefine(readLineStr.substring(11)));
+                        }
+                    }
+                }catch (IOException ex){
+                    logger.info("{}",ex.getMessage());
+                }
+            }
+        }
+    }
+    private DataSetColumnMeta parseDefine(String content){
+        String[] arr=content.trim().split("\\||\\t");
+        String type=arr[1].trim();
+        String columnName=arr[0].trim();
+        DataSetColumnMeta columnMeta=null;
+        if("REAL".equalsIgnoreCase(arr[1]) || "numeric".equalsIgnoreCase(arr[1])){
+            columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_DOUBLE,null);
+        }else if("string".equalsIgnoreCase(arr[1])){
+            columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_STRING,null);
+        }else if("date".equalsIgnoreCase(type)){
+            columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_TIMESTAMP,null);
+        }else if(type.startsWith("{")){
+            List<String> nominalValues= Lists.newArrayList(type.substring(1,type.length()-1).split(","));
+            columnMeta=new DataSetColumnMeta(columnName,Const.META_TYPE_STRING,null);
+            columnMeta.setNominalValues(nominalValues);
+        }
+        return columnMeta;
+    }
+
+    @Override
+    public boolean hasNext() {
+        boolean hasRecord=super.hasNext();
+        if(readLineStr.contains(",") || readLineStr.contains("@")){
+            return hasRecord;
+        }else{
+            return false;
+        }
+    }
+}
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
index 6919ae43..c337e5e4 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
@@ -6,8 +6,7 @@
 import java.util.Map;
 
 public interface IResourceIterator extends Iterator<Map<String, Object>>, Closeable {
-    void init();
-    void beforeProcess(String param);
+    void beforeProcess();
     void afterProcess();
     String getIdentifier();
     void setInputStream(InputStream inputStream);
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
index 106490ec..56ac7b6d 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
@@ -20,8 +20,10 @@
 import com.robin.core.convert.util.ConvertUtil;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
+import org.springframework.util.CollectionUtils;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -36,8 +38,8 @@ public JsonFileIterator(DataCollectionMeta metaList) {
         identifier= Const.FILEFORMATSTR.JSON.getValue();
     }
     @Override
-
public void init() { - super.beforeProcess(colmeta.getPath()); + public void beforeProcess() { + super.beforeProcess(); jreader=new JsonReader(reader); try{ jreader.beginArray(); @@ -59,7 +61,7 @@ public boolean hasNext() { @Override public Map next() throws NoSuchElementException { - Map retmap=new HashMap(); + Map retmap=new HashMap<>(); DataSetColumnMeta meta=null; try{ if(jreader.hasNext()){ @@ -80,11 +82,11 @@ public Map next() throws NoSuchElementException { jreader.endObject(); } }catch(IOException ex){ - logger.error("{}",ex); - return null; + logger.error("{}",ex.getMessage()); + return Collections.emptyMap(); }catch (Exception e) { logger.error("{}",e.getMessage()); - return null; + return Collections.emptyMap(); } return retmap; } diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java index 36c88c5c..c042ce4e 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java @@ -20,14 +20,16 @@ import com.robin.core.convert.util.ConvertUtil; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.springframework.util.ObjectUtils; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; public class PlainTextFileIterator extends AbstractFileIterator{ - private String readLineStr=null; - private String split=","; + protected String readLineStr=null; + protected String split=","; public PlainTextFileIterator(){ identifier= Const.FILEFORMATSTR.CSV.getValue(); } @@ -36,19 +38,16 @@ public PlainTextFileIterator(DataCollectionMeta metaList) { identifier= Const.FILEFORMATSTR.CSV.getValue(); } - @Override - public void init() { - super.beforeProcess(colmeta.getPath()); - - } @Override public boolean hasNext() { boolean hasNext=false; try{ if(reader!=null){ - readLineStr=reader.readLine(); - if(readLineStr!=null) { + do{ + readLineStr=reader.readLine(); + } while(ObjectUtils.isEmpty(readLineStr)); + if(!ObjectUtils.isEmpty(readLineStr)) { hasNext=true; } } @@ -70,11 +69,11 @@ public Map next(){ } return map; }else{ - return null; + return Collections.emptyMap(); } }catch(Exception ex){ logger.error("{}",ex.getMessage()); - return null; + return Collections.emptyMap(); } } diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java index d306d3d4..a10419af 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java @@ -36,11 +36,6 @@ public class TextFileIteratorFactory { } public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta) throws IOException{ IResourceIterator iterator=getIter(colmeta); - try { - iterator.init(); - }catch (Exception ex){ - log.error("{}",ex.getMessage()); - } return iterator; } public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta colmeta, AbstractFileSystemAccessor utils){ @@ -52,8 +47,7 @@ public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta c if (!ObjectUtils.isEmpty(iterclass)) { iterator = (AbstractFileIterator) iterclass.getConstructor(DataCollectionMeta.class, 
AbstractFileSystemAccessor.class).newInstance(colmeta,utils); } - iterator.beforeProcess(colmeta.getPath()); - iterator.init(); + iterator.beforeProcess(); }catch (Exception ex){ throw new MissingConfigException(ex); } @@ -63,13 +57,13 @@ public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta c public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta, BufferedReader reader) throws IOException { IResourceIterator iterator=getIter(colmeta); iterator.setReader(reader); - iterator.init(); + iterator.beforeProcess(); return iterator; } public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta, InputStream in) throws IOException{ IResourceIterator iterator=getIter(colmeta); iterator.setInputStream(in); - iterator.init(); + iterator.beforeProcess(); return iterator; } public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colmeta,InputStream in) throws IOException{ @@ -81,7 +75,7 @@ public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colm } IResourceIterator iterator=getIter(colmeta); iterator.setInputStream(in); - iterator.init(); + iterator.beforeProcess(); return iterator; } private static IResourceIterator getIter(DataCollectionMeta colmeta) throws MissingConfigException { @@ -93,7 +87,7 @@ private static IResourceIterator getIter(DataCollectionMeta colmeta) throws Miss if (!ObjectUtils.isEmpty(iterclass)) { iterator = iterclass.getConstructor(DataCollectionMeta.class).newInstance(colmeta); } - iterator.beforeProcess(colmeta.getPath()); + iterator.beforeProcess(); }catch (Exception ex){ throw new MissingConfigException(ex); } diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java index 46bc9a70..549cbcf6 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java @@ -47,7 +47,8 @@ public XmlFileIterator(DataCollectionMeta metaList) { } @Override - public void init() { + public void beforeProcess() { + super.beforeProcess(); try{ factory=XMLInputFactory.newFactory(); if(instream!=null) { diff --git a/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java b/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java index c3a8ff15..ec9fe306 100644 --- a/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java +++ b/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java @@ -25,7 +25,7 @@ public AbstractQueueIterator(){ } @Override - public void init() { + public void beforeProcess() { Assert.notNull(colmeta,""); schema= AvroUtils.getSchemaFromMeta(colmeta); cfgMap=colmeta.getResourceCfgMap(); diff --git a/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java b/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java index 4e6df963..123eae81 100644 --- a/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java +++ b/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java @@ -12,6 +12,7 @@ import org.apache.commons.dbutils.QueryRunner; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; import java.io.BufferedReader; @@ -23,7 +24,7 @@ 
@Slf4j -public class JdbcResIterator extends AbstractResIterator { +public class JdbcResIterator extends AbstractResIterator { private JdbcResourceHolder holder; private DbConnectionHolder connectionHolder; private String querySql; @@ -31,57 +32,46 @@ public class JdbcResIterator extends AbstractResIterator { private Statement statement; private PreparedStatement preparedStatement; - public JdbcResIterator(){ - + public JdbcResIterator() { + } public JdbcResIterator(DataCollectionMeta colmeta) { super(colmeta); } - @Retryable(maxAttempts = 10,backoff = @Backoff(delay = 60000)) - public void getConnection() throws Exception{ - if(holder==null) { - holder = SpringContextHolder.getBean(ResourceAccessHolder.class).getPoolJdbcHolder(colmeta.getDbSourceId(), colmeta,null); + @Retryable(maxAttempts = 10, backoff = @Backoff(delay = 60000)) + public void getConnection() { + if (holder == null) { + holder = SpringContextHolder.getBean(ResourceAccessHolder.class).getPoolJdbcHolder(colmeta.getDbSourceId(), colmeta, null); } - if(connectionHolder==null) { - connectionHolder = SpringContextHolder.getBean(ResourceAccessHolder.class).getConnectionHolder(colmeta.getDbSourceId(), colmeta.getDbMeta(),null); + if (connectionHolder == null) { + connectionHolder = SpringContextHolder.getBean(ResourceAccessHolder.class).getConnectionHolder(colmeta.getDbSourceId(), colmeta.getDbMeta(), null); } } - @Override - public void init() { - try { - getConnection(); - }catch (Exception ex){ - log.error("{}",ex); - } - } @Override - public void beforeProcess(String param) { + public void beforeProcess() { try { - if (param.toLowerCase().startsWith("select ")) { - querySql = param; - Object[] objs = null; - if (colmeta.getResourceCfgMap().containsKey("queryParams")) { - objs = (Object[]) colmeta.getResourceCfgMap().get("queryParams"); - } - if (!Objects.isNull(objs)) { - QueryRunner qRunner = new QueryRunner(); - preparedStatement = connectionHolder.getConnection().prepareStatement(querySql); - qRunner.fillStatement(preparedStatement, objs); - rs = preparedStatement.executeQuery(); - } else { - statement = connectionHolder.getConnection().createStatement(); - rs = statement.executeQuery(querySql); - } - }else{ + getConnection(); + Assert.notNull(colmeta.getResourceCfgMap().get("selectSql"),""); + querySql = colmeta.getResourceCfgMap().get("selectSql").toString(); + Object[] objs = null; + if (colmeta.getResourceCfgMap().containsKey("queryParams")) { + objs = (Object[]) colmeta.getResourceCfgMap().get("queryParams"); + } + if (!Objects.isNull(objs)) { + QueryRunner qRunner = new QueryRunner(); + preparedStatement = connectionHolder.getConnection().prepareStatement(querySql); + qRunner.fillStatement(preparedStatement, objs); + rs = preparedStatement.executeQuery(); + } else { statement = connectionHolder.getConnection().createStatement(); - rs = statement.executeQuery("select * from "+param); + rs = statement.executeQuery(querySql); } - }catch (SQLException ex){ + } catch (SQLException ex) { } } @@ -100,9 +90,9 @@ public void close() throws IOException { @Override public boolean hasNext() { - try{ + try { return rs.next(); - }catch (SQLException ex){ + } catch (SQLException ex) { return false; } } diff --git a/common/src/main/resources/META-INF/services/com.robin.core.fileaccess.iterator.IResourceIterator b/common/src/main/resources/META-INF/services/com.robin.core.fileaccess.iterator.IResourceIterator index fb8f76bb..c0019ebf 100644 --- 
a/common/src/main/resources/META-INF/services/com.robin.core.fileaccess.iterator.IResourceIterator +++ b/common/src/main/resources/META-INF/services/com.robin.core.fileaccess.iterator.IResourceIterator @@ -1,4 +1,5 @@ com.robin.core.fileaccess.iterator.JsonFileIterator com.robin.core.fileaccess.iterator.PlainTextFileIterator +com.robin.core.fileaccess.iterator.ArffFileIterator com.robin.core.fileaccess.iterator.XmlFileIterator com.robin.core.resaccess.iterator.JdbcResIterator diff --git a/common/src/test/java/com/robin/comm/test/TestExcelOperation.java b/common/src/test/java/com/robin/comm/test/TestExcelOperation.java index ad41d08d..29fd308b 100644 --- a/common/src/test/java/com/robin/comm/test/TestExcelOperation.java +++ b/common/src/test/java/com/robin/comm/test/TestExcelOperation.java @@ -51,17 +51,10 @@ public void testGenerate() throws Exception { AbstractResIterator iterator = new AbstractResIterator() { Map map = new HashMap<>(); int row = 0; - - @Override - public void init() { - - } - @Override - public void beforeProcess(String param) { + public void beforeProcess() { } - @Override public void afterProcess() { diff --git a/common/src/test/java/com/robin/comm/test/TestJsonRead.java b/common/src/test/java/com/robin/comm/test/TestJsonRead.java index 51031a51..7783b2e2 100644 --- a/common/src/test/java/com/robin/comm/test/TestJsonRead.java +++ b/common/src/test/java/com/robin/comm/test/TestJsonRead.java @@ -18,36 +18,36 @@ import java.util.Map; public class TestJsonRead { - public static void main(String[] args){ - Logger logger=LoggerFactory.getLogger(TestJsonRead.class); - try{ - DataCollectionMeta colmeta=new DataCollectionMeta(); - Map ftpparam=new HashMap(); - ftpparam.put("hostName", "localhost"); - ftpparam.put("protocol", "sftp"); - ftpparam.put("port", 22); - ftpparam.put("userName", "root"); - ftpparam.put("password", "root"); - colmeta.setResourceCfgMap(ftpparam); - colmeta.setPath("/tmp/robin/testdata/test1.avro.gz"); - colmeta.setEncode("UTF-8"); - //ftpparam.put("schemaContent", "{\"namespace\":\"com.robin.avro\",\"name\":\"Content\",\"type\":\"record\",\"fields\":[{\"name\":\"info_id\",\"type\":\"string\"},{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"string\"}]}"); - List> list=new ArrayList>(); - AbstractFileSystemAccessor util= FileSystemAccessorFactory.getResourceAccessorByType(Const.FILESYSTEM.VFS.getValue()); - //BufferedReader reader=util.getInResourceByReader(colmeta);//new BufferedReader(new FileReader(new File("e:/test1.data"))); - InputStream reader=util.getInResourceByStream(colmeta,colmeta.getPath() ); - IResourceIterator jreader=TextFileIteratorFactory.getProcessIteratorByType(colmeta, reader); - while(jreader.hasNext()){ - Map map=jreader.next(); - logger.info("{}",map); - list.add(map); - } - System.out.println(list); - System.out.println(list.size()); - }catch(Exception ex){ - ex.printStackTrace(); - } - } + public static void main(String[] args) { + Logger logger = LoggerFactory.getLogger(TestJsonRead.class); + + DataCollectionMeta colmeta = new DataCollectionMeta(); + Map ftpparam = new HashMap(); + ftpparam.put("hostName", "localhost"); + ftpparam.put("protocol", "sftp"); + ftpparam.put("port", 22); + ftpparam.put("userName", "root"); + ftpparam.put("password", "root"); + colmeta.setResourceCfgMap(ftpparam); + colmeta.setPath("/tmp/robin/testdata/test1.avro.gz"); + colmeta.setEncode("UTF-8"); + + //ftpparam.put("schemaContent", 
"{\"namespace\":\"com.robin.avro\",\"name\":\"Content\",\"type\":\"record\",\"fields\":[{\"name\":\"info_id\",\"type\":\"string\"},{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"string\"}]}"); + List> list = new ArrayList>(); + AbstractFileSystemAccessor util = FileSystemAccessorFactory.getResourceAccessorByType(Const.FILESYSTEM.VFS.getValue()); + try (InputStream reader = util.getInResourceByStream(colmeta, colmeta.getPath()); + IResourceIterator jreader = TextFileIteratorFactory.getProcessIteratorByType(colmeta, reader)) { + while (jreader.hasNext()) { + Map map = jreader.next(); + logger.info("{}", map); + list.add(map); + } + System.out.println(list); + System.out.println(list.size()); + } catch (Exception ex) { + ex.printStackTrace(); + } + } } diff --git a/common/src/test/java/com/robin/comm/test/TestXMLReader.java b/common/src/test/java/com/robin/comm/test/TestXMLReader.java index 23b63c9f..2a5bb80e 100644 --- a/common/src/test/java/com/robin/comm/test/TestXMLReader.java +++ b/common/src/test/java/com/robin/comm/test/TestXMLReader.java @@ -30,10 +30,7 @@ public static void main(String[] args) { colmeta.setEncode("UTF-8"); colmeta.setPath("f:/test.xml"); colmeta.setFileFormat(Const.FILETYPE_XML); - try { - - IResourceIterator iter = TextFileIteratorFactory.getProcessIteratorByType(colmeta); - //iter.beforeProcess(colmeta.getPath()); + try(IResourceIterator iter = TextFileIteratorFactory.getProcessIteratorByType(colmeta)) { while (iter.hasNext()) { System.out.println(iter.next()); } @@ -42,6 +39,20 @@ public static void main(String[] args) { ex.printStackTrace(); } } + @Test + public void testArffRead(){ + DataCollectionMeta colmeta = new DataCollectionMeta(); + colmeta.setPath("file:///f:/iris.arff"); + colmeta.setFileFormat(Const.FILEFORMATSTR.ARFF.getValue()); + try(IResourceIterator iterator=TextFileIteratorFactory.getProcessIteratorByType(colmeta)){ + while(iterator.hasNext()){ + System.out.println(iterator.next()); + } + }catch (Exception ex){ + ex.printStackTrace(); + } + + } @Test public void test1() throws Exception { diff --git a/core/src/main/java/com/robin/core/base/util/Const.java b/core/src/main/java/com/robin/core/base/util/Const.java index ff4f982a..5a913a0a 100644 --- a/core/src/main/java/com/robin/core/base/util/Const.java +++ b/core/src/main/java/com/robin/core/base/util/Const.java @@ -94,6 +94,7 @@ public class Const { public static final Integer COLUMN_VALID = 1; public static final Integer COLUMN_INVALID = 0; + public static final String ITERATOR_PROCESSID="$processId"; public enum FILEFORMAT { @@ -426,7 +427,9 @@ public enum FILEFORMATSTR { XML("xml"), PLAIN("txt"), CSV("csv"), - PROTOBUF("proto"); + PROTOBUF("proto"), + + ARFF("arff"); private String value; FILEFORMATSTR(String value){ this.value=value; diff --git a/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java b/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java index cb3db526..12381451 100644 --- a/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java +++ b/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java @@ -56,7 +56,7 @@ public static InputStream getInputStreamByCompressType(String path, InputStream inputStream=new LZ4FrameInputStream(wrapInputStream(rawstream)); break; case COMPRESS_TYPE_LZMA: - inputStream=new LZMAInputStream(wrapInputStream(rawstream)); + inputStream=new XZInputStream(wrapInputStream(rawstream)); break; case COMPRESS_TYPE_ZSTD: inputStream=new 
ZstdCompressorInputStream(wrapInputStream(rawstream)); diff --git a/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java b/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java index 4c7959ca..06a9d3a1 100644 --- a/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java +++ b/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java @@ -11,6 +11,7 @@ import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; import org.tukaani.xz.LZMA2Options; import org.tukaani.xz.LZMAOutputStream; +import org.tukaani.xz.XZOutputStream; import org.xerial.snappy.SnappyOutputStream; import java.io.BufferedOutputStream; @@ -64,7 +65,7 @@ public static OutputStream getOutputStreamByCompressType(String path,OutputStrea outputStream=new LZ4FrameOutputStream(wrapOutputStream(rawstream)); break; case COMPRESS_TYPE_LZMA: - outputStream=new LZMAOutputStream(wrapOutputStream(rawstream),new LZMA2Options(),false); + outputStream=new XZOutputStream(wrapOutputStream(rawstream),new LZMA2Options()); break; case COMPRESS_TYPE_ZSTD: outputStream=new ZstdCompressorOutputStream(wrapOutputStream(rawstream)); diff --git a/dataming/pom.xml b/dataming/pom.xml index f071fd37..9a1eb4c5 100644 --- a/dataming/pom.xml +++ b/dataming/pom.xml @@ -170,6 +170,11 @@ true + + com.github.haifengl + smile-core + 2.6.0 + diff --git a/dataming/src/main/java/com/robin/dataming/smile/algorithm/AbstractModeler.java b/dataming/src/main/java/com/robin/dataming/smile/algorithm/AbstractModeler.java new file mode 100644 index 00000000..20d8ea0a --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/algorithm/AbstractModeler.java @@ -0,0 +1,25 @@ +package com.robin.dataming.smile.algorithm; + +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import smile.data.DataFrame; + +import java.util.Map; + +public abstract class AbstractModeler { + protected T model; + protected DataCollectionMeta collectionMeta; + protected IResourceIterator iterator; + + public AbstractModeler(DataCollectionMeta collectionMeta,IResourceIterator iterator){ + this.collectionMeta=collectionMeta; + this.iterator=iterator; + } + + public abstract T train(DataFrame trainDf, Map optionalMap); + public abstract String validate(DataFrame validateDf); + + public T getModel(){ + return model; + } +} diff --git a/dataming/src/main/java/com/robin/dataming/smile/algorithm/DBSCANModeler.java b/dataming/src/main/java/com/robin/dataming/smile/algorithm/DBSCANModeler.java new file mode 100644 index 00000000..faa069f6 --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/algorithm/DBSCANModeler.java @@ -0,0 +1,41 @@ +package com.robin.dataming.smile.algorithm; + +import com.google.gson.Gson; +import com.robin.comm.util.json.GsonUtil; +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import smile.clustering.DBSCAN; +import smile.data.DataFrame; + +import java.util.HashMap; +import java.util.Map; + +public class DBSCANModeler extends AbstractModeler> { + private Gson gson= GsonUtil.getGson(); + public DBSCANModeler(DataCollectionMeta collectionMeta, IResourceIterator iterator) { + super(collectionMeta, iterator); + } + + public DBSCAN train(DataFrame trainDf, Map optionalMap){ + try{ + int minPts=(Integer)optionalMap.getOrDefault("minPts",4); + double radius=(Double)optionalMap.getOrDefault("radius",0.1); + 
model=DBSCAN.fit(trainDf.toArray(),minPts,radius); + return model; + }catch (Exception ex){ + ex.printStackTrace(); + } + return null; + } + + @Override + public String validate(DataFrame validateDf) { + Map rangeMap=new HashMap<>(); + validateDf.stream().forEach(f->{ + int range=model.predict(f.toArray()); + rangeMap.computeIfAbsent(range,key->1); + rangeMap.computeIfPresent(range,(key,v)->v=v+1); + }); + return gson.toJson(rangeMap); + } +} diff --git a/dataming/src/main/java/com/robin/dataming/smile/algorithm/DecisionTreeModeler.java b/dataming/src/main/java/com/robin/dataming/smile/algorithm/DecisionTreeModeler.java new file mode 100644 index 00000000..55405a45 --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/algorithm/DecisionTreeModeler.java @@ -0,0 +1,41 @@ +package com.robin.dataming.smile.algorithm; + +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.springframework.util.CollectionUtils; +import smile.classification.DecisionTree; +import smile.data.DataFrame; +import smile.data.formula.Formula; +import smile.validation.metric.Accuracy; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DecisionTreeModeler extends AbstractModeler { + + public DecisionTreeModeler(DataCollectionMeta collectionMeta, IResourceIterator iterator) { + super(collectionMeta, iterator); + } + + @Override + public DecisionTree train(DataFrame trainDf, Map optionalMap) { + List fields=collectionMeta.getColumnList().stream().filter(f-> CollectionUtils.isEmpty(f.getNominalValues())).collect(Collectors.toList()); + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + + Formula formula=Formula.of(labelField.getColumnName(),fields.stream().map(DataSetColumnMeta::getColumnName).collect(Collectors.toList()).toArray(new String[]{})); + model=DecisionTree.fit(formula,trainDf); + return model; + } + + @Override + public String validate(DataFrame validateDf) { + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + + int[] predictIndex=model.predict(validateDf); + int[] trueIndex=validateDf.intVector(labelField.getColumnName()).stream().toArray(); + System.out.println(String.format("Accuracy:%f", Accuracy.of(trueIndex,predictIndex))); + return model.toString(); + } +} diff --git a/dataming/src/main/java/com/robin/dataming/smile/algorithm/RandomForestModeler.java b/dataming/src/main/java/com/robin/dataming/smile/algorithm/RandomForestModeler.java new file mode 100644 index 00000000..e90ebd32 --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/algorithm/RandomForestModeler.java @@ -0,0 +1,39 @@ +package com.robin.dataming.smile.algorithm; + +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.springframework.util.CollectionUtils; +import smile.classification.RandomForest; +import smile.data.DataFrame; +import smile.data.formula.Formula; +import smile.validation.metric.Accuracy; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class RandomForestModeler extends AbstractModeler { + public 
RandomForestModeler(DataCollectionMeta collectionMeta, IResourceIterator iterator) { + super(collectionMeta, iterator); + } + + @Override + public RandomForest train(DataFrame trainDf, Map optionalMap) { + List fields=collectionMeta.getColumnList().stream().filter(f->CollectionUtils.isEmpty(f.getNominalValues())).collect(Collectors.toList()); + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + + Formula formula=Formula.of(labelField.getColumnName(),fields.stream().map(DataSetColumnMeta::getColumnName).collect(Collectors.toList()).toArray(new String[]{})); + model=RandomForest.fit(formula,trainDf); + return model; + } + + @Override + public String validate(DataFrame validateDf) { + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + int[] predictIndex=model.predict(validateDf); + int[] trueIndex=validateDf.intVector(labelField.getColumnName()).stream().toArray(); + System.out.println(String.format("Accuracy:%f", Accuracy.of(trueIndex,predictIndex))); + return model.trees()[0].toString(); + } +} diff --git a/dataming/src/main/java/com/robin/dataming/smile/algorithm/SmileSVMModeler.java b/dataming/src/main/java/com/robin/dataming/smile/algorithm/SmileSVMModeler.java new file mode 100644 index 00000000..b5d900d4 --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/algorithm/SmileSVMModeler.java @@ -0,0 +1,41 @@ +package com.robin.dataming.smile.algorithm; + +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.springframework.util.CollectionUtils; +import smile.classification.Classifier; +import smile.classification.SVM; +import smile.data.DataFrame; +import smile.validation.metric.Accuracy; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SmileSVMModeler extends AbstractModeler{ + + + public SmileSVMModeler(DataCollectionMeta collectionMeta, IResourceIterator iterator) { + super(collectionMeta, iterator); + } + + @Override + public Classifier train(DataFrame trainDf, Map optionalMap) { + List fields=collectionMeta.getColumnList().stream().filter(f-> CollectionUtils.isEmpty(f.getNominalValues())).collect(Collectors.toList()); + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + double[][] dsArr= trainDf.select(fields.stream().map(DataSetColumnMeta::getColumnName).collect(Collectors.toList()).toArray(new String[]{})).toArray(); + model=SVM.fit(dsArr,trainDf.intVector(labelField.getColumnName()).array(),1,1); + return model; + } + + @Override + public String validate(DataFrame validateDf) { + List fields=collectionMeta.getColumnList().stream().filter(f-> CollectionUtils.isEmpty(f.getNominalValues())).collect(Collectors.toList()); + DataSetColumnMeta labelField=collectionMeta.getColumnList().stream().filter(f->!CollectionUtils.isEmpty(f.getNominalValues())).findFirst().orElse(null); + int[] trueIndex=validateDf.intVector(labelField.getColumnName()).array(); + int[] predictIndex=model.predict(validateDf.select(fields.stream().map(DataSetColumnMeta::getColumnName).collect(Collectors.toList()).toArray(new String[]{})).toArray()); + System.out.println(String.format("Accuracy:%f", 
Accuracy.of(trueIndex,predictIndex))); + return model.toString(); + } +} diff --git a/dataming/src/main/java/com/robin/dataming/smile/utils/SmileUtils.java b/dataming/src/main/java/com/robin/dataming/smile/utils/SmileUtils.java new file mode 100644 index 00000000..33b91813 --- /dev/null +++ b/dataming/src/main/java/com/robin/dataming/smile/utils/SmileUtils.java @@ -0,0 +1,115 @@ +package com.robin.dataming.smile.utils; + +import com.robin.core.base.util.Const; +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.util.CollectionUtils; +import smile.data.DataFrame; +import smile.data.Tuple; +import smile.data.type.DataType; +import smile.data.type.DataTypes; +import smile.data.type.StructField; +import smile.data.type.StructType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class SmileUtils { + private static final Random random=new Random(1123123L); + public static DataFrame construct(DataCollectionMeta colMeta, IResourceIterator iterator){ + StructType structType = getStructType(colMeta); + List datas = parseTuples(colMeta, iterator, structType); + StructType boxType=structType.boxed(datas); + return DataFrame.of(datas,boxType); + } + + public static StructType getStructType(DataCollectionMeta colMeta) { + List fields=new ArrayList<>(); + for(DataSetColumnMeta columnMeta: colMeta.getColumnList()){ + fields.add(new StructField(columnMeta.getColumnName(),wrapDataType(columnMeta))); + } + StructType structType=new StructType(fields); + return structType; + } + + public static List parseTuples(DataCollectionMeta colMeta, IResourceIterator iterator, StructType structType) { + List datas=new ArrayList<>(); + + while(iterator.hasNext()){ + Map map = iterator.next(); + List objects=wrapValue(colMeta.getColumnList(),map); + datas.add(Tuple.of(objects.toArray(), structType)); + } + return datas; + } + + private static DataType wrapDataType(DataSetColumnMeta columnMeta){ + DataType type=null; + switch (columnMeta.getColumnType()){ + case Const.META_TYPE_BIGINT: + type= DataTypes.LongType; + break; + case Const.META_TYPE_INTEGER: + type=DataTypes.IntegerType; + break; + case Const.META_TYPE_DOUBLE: + type=DataTypes.DoubleType; + break; + case Const.META_TYPE_DECIMAL: + type=DataTypes.DecimalType; + break; + case Const.META_TYPE_TIMESTAMP: + type=DataTypes.TimeType; + break; + case Const.META_TYPE_SHORT: + type=DataTypes.ShortType; + break; + case Const.META_TYPE_STRING: + if(CollectionUtils.isEmpty(columnMeta.getNominalValues())) { + type = DataTypes.StringType; + }else{ + type=DataTypes.IntegerType; + } + break; + default: + type=DataTypes.StringType; + } + return type; + } + public static List wrapValue(List columnMetas, Map map){ + List objects=new ArrayList<>(columnMetas.size()); + for(DataSetColumnMeta columnMeta:columnMetas){ + Object value=map.getOrDefault(columnMeta.getColumnName(),null); + if(!CollectionUtils.isEmpty(columnMeta.getNominalValues())){ + Integer pos=columnMeta.getNominalValues().indexOf(value.toString()); + if(pos>=-1){ + value=pos+1; + } + } + objects.add(value); + } + return objects; + } + public static Pair splitTrainAndValidate(DataCollectionMeta colMeta, IResourceIterator iterator, int trainRates){ + List trainList=new ArrayList<>(); + List validateList=new ArrayList<>(); + StructType 
structType=SmileUtils.getStructType(colMeta); + List allDatas= SmileUtils.parseTuples(colMeta,iterator,structType); + allDatas.stream().forEach(f->{ + if(random.nextInt(100) splitTrainAndValidates(Instances allData RemovePercentage dtValidate=new RemovePercentage(); dtValidate.setPercentage(validatePercentage); dtValidate.setInputFormat(allDatas); - Instances trainDatas= Filter.useFilter(allDatas,dtTrain); - Instances validateDatas=Filter.useFilter(allDatas,dtValidate); + Instances trainDatas= Filter.useFilter(allDatas,dtValidate); + Instances validateDatas=Filter.useFilter(allDatas,dtTrain); return Pair.of(trainDatas,validateDatas); } } diff --git a/dataming/src/test/java/com/robin/dataming/LogisticRegressionTest.java b/dataming/src/test/java/com/robin/dataming/LogisticRegressionTest.java deleted file mode 100644 index 38e5861a..00000000 --- a/dataming/src/test/java/com/robin/dataming/LogisticRegressionTest.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.robin.dataming; - - -import com.google.common.collect.Lists; -import com.robin.core.base.util.Const; -import com.robin.core.base.util.ResourceConst; -import com.robin.core.fileaccess.iterator.IResourceIterator; -import com.robin.core.fileaccess.iterator.TextFileIteratorFactory; -import com.robin.core.fileaccess.meta.DataCollectionMeta; -import com.robin.core.fileaccess.meta.DataSetColumnMeta; -import com.robin.dataming.weka.algorithm.LogisticRegressionModeler; -import com.robin.dataming.weka.utils.WekaUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.Test; -import weka.classifiers.functions.Logistic; -import weka.core.Instances; - -import java.util.HashMap; - - -public class LogisticRegressionTest { - @Test - public void testIris() throws Exception{ - DataCollectionMeta meta=new DataCollectionMeta(); - meta.setResType(ResourceConst.ResourceType.TYPE_LOCALFILE.getValue()); - meta.setSourceType(ResourceConst.IngestType.TYPE_LOCAL.getValue()); - meta.setFileFormat(Const.FILESUFFIX_CSV); - meta.setPath("file:///e:/iris.csv"); - //"erwidth","banlength","banwidth","class" - meta.addColumnMeta("erlength", Const.META_TYPE_DOUBLE,null); - meta.addColumnMeta("erwidth", Const.META_TYPE_DOUBLE,null); - meta.addColumnMeta("banlength", Const.META_TYPE_DOUBLE,null); - meta.addColumnMeta("banwidth", Const.META_TYPE_DOUBLE,null); - DataSetColumnMeta columnMeta=meta.createColumnMeta("class",Const.META_TYPE_STRING,null); - columnMeta.setNominalValues(Lists.newArrayList(new String[]{"setosa", "versicolor","virginica"})); - meta.addColumnMeta(columnMeta); - - IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(meta); - Instances instances= WekaUtils.getInstancesByResource(meta,iterator,4); - Pair datas=WekaUtils.splitTrainAndValidates(instances,80.0); - LogisticRegressionModeler modeler=new LogisticRegressionModeler(); - Logistic classifier= modeler.train(4,new HashMap<>(),datas.getLeft()); - System.out.println(modeler.evaluate(datas.getRight())); - } -} diff --git a/dataming/src/test/java/com/robin/dataming/smile/DBSCANTest.java b/dataming/src/test/java/com/robin/dataming/smile/DBSCANTest.java new file mode 100644 index 00000000..6918f20f --- /dev/null +++ b/dataming/src/test/java/com/robin/dataming/smile/DBSCANTest.java @@ -0,0 +1,45 @@ +package com.robin.dataming.smile; + +import com.robin.core.base.util.Const; +import com.robin.core.base.util.ResourceConst; +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.iterator.TextFileIteratorFactory; +import 
com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.dataming.smile.algorithm.DBSCANModeler; +import com.robin.dataming.smile.utils.SmileUtils; +import org.apache.commons.lang3.tuple.Pair; +import smile.clustering.DBSCAN; +import smile.data.DataFrame; + +import java.util.HashMap; +import java.util.Map; + +public class DBSCANTest { + public static void main(String[] args){ + + DataCollectionMeta meta=new DataCollectionMeta(); + meta.setResType(ResourceConst.ResourceType.TYPE_LOCALFILE.getValue()); + meta.setSourceType(ResourceConst.IngestType.TYPE_LOCAL.getValue()); + meta.setFileFormat(Const.FILESUFFIX_CSV); + meta.setPath("file:///e:/iris.csv"); + //"erwidth","banlength","banwidth","class" + meta.addColumnMeta("erlength", Const.META_TYPE_DOUBLE,null); + meta.addColumnMeta("erwidth", Const.META_TYPE_DOUBLE,null); + meta.addColumnMeta("banlength", Const.META_TYPE_DOUBLE,null); + meta.addColumnMeta("banwidth", Const.META_TYPE_DOUBLE,null); + + try(IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(meta)){ + DBSCANModeler modeler=new DBSCANModeler(meta,iterator); + Pair dfPair=SmileUtils.splitTrainAndValidate(meta,iterator,80); + Map optionalMap=new HashMap<>(); + optionalMap.put("radius",0.6); + DBSCAN dbscan= modeler.train(dfPair.getKey(),optionalMap); + System.out.println(dbscan.k); + System.out.println(dbscan); + System.out.println(modeler.validate(dfPair.getValue())); + }catch (Exception ex){ + ex.printStackTrace(); + } + + } +} diff --git a/dataming/src/test/java/com/robin/dataming/smile/DecisionTreeTest.java b/dataming/src/test/java/com/robin/dataming/smile/DecisionTreeTest.java new file mode 100644 index 00000000..cee04991 --- /dev/null +++ b/dataming/src/test/java/com/robin/dataming/smile/DecisionTreeTest.java @@ -0,0 +1,51 @@ +package com.robin.dataming.smile; + +import com.google.common.collect.Lists; +import com.robin.core.base.util.Const; +import com.robin.core.base.util.ResourceConst; +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.iterator.TextFileIteratorFactory; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import com.robin.dataming.smile.algorithm.DecisionTreeModeler; +import com.robin.dataming.smile.algorithm.RandomForestModeler; +import com.robin.dataming.smile.algorithm.SmileSVMModeler; +import com.robin.dataming.smile.utils.SmileUtils; +import org.apache.commons.lang3.tuple.Pair; +import smile.classification.Classifier; +import smile.classification.DecisionTree; +import smile.data.DataFrame; + +import java.util.HashMap; +import java.util.Map; + +public class DecisionTreeTest { + public static void main(String[] args) { + + DataCollectionMeta meta = new DataCollectionMeta(); + meta.setResType(ResourceConst.ResourceType.TYPE_LOCALFILE.getValue()); + meta.setSourceType(ResourceConst.IngestType.TYPE_LOCAL.getValue()); + meta.setFileFormat(Const.FILESUFFIX_CSV); + meta.setPath("file:///e:/iris.csv"); + //"erwidth","banlength","banwidth","class" + meta.addColumnMeta("erlength", Const.META_TYPE_DOUBLE, null); + meta.addColumnMeta("erwidth", Const.META_TYPE_DOUBLE, null); + meta.addColumnMeta("banlength", Const.META_TYPE_DOUBLE, null); + meta.addColumnMeta("banwidth", Const.META_TYPE_DOUBLE, null); + DataSetColumnMeta columnMeta=meta.createColumnMeta("label",Const.META_TYPE_STRING,null); + columnMeta.setNominalValues(Lists.newArrayList(new String[]{"setosa", "versicolor","virginica"})); + meta.addColumnMeta(columnMeta); 
+ try(IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(meta)){ + DecisionTreeModeler modeler=new DecisionTreeModeler(meta,iterator); + Pair dfPair= SmileUtils.splitTrainAndValidate(meta,iterator,80); + Map optionalMap=new HashMap<>(); + DecisionTree tree= modeler.train(dfPair.getKey(),optionalMap); + System.out.println(modeler.validate(dfPair.getValue())); + SmileSVMModeler smileSVMModeler=new SmileSVMModeler(meta,iterator); + Classifier classifier=smileSVMModeler.train(dfPair.getKey(),optionalMap); + System.out.println(smileSVMModeler.validate(dfPair.getValue())); + }catch (Exception ex){ + ex.printStackTrace(); + } + } +} diff --git a/dataming/src/test/java/com/robin/dataming/weka/LogisticRegressionTest.java b/dataming/src/test/java/com/robin/dataming/weka/LogisticRegressionTest.java new file mode 100644 index 00000000..6cf0de88 --- /dev/null +++ b/dataming/src/test/java/com/robin/dataming/weka/LogisticRegressionTest.java @@ -0,0 +1,40 @@ +package com.robin.dataming.weka; + + +import com.robin.core.base.util.Const; +import com.robin.core.base.util.ResourceConst; +import com.robin.core.fileaccess.iterator.IResourceIterator; +import com.robin.core.fileaccess.iterator.TextFileIteratorFactory; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.dataming.weka.algorithm.LogisticRegressionModeler; +import com.robin.dataming.weka.utils.WekaUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Test; +import weka.classifiers.functions.Logistic; +import weka.core.Instances; + +import java.util.HashMap; + + +public class LogisticRegressionTest { + @Test + public void testIris() throws Exception{ + DataCollectionMeta meta=new DataCollectionMeta(); + meta.setResType(ResourceConst.ResourceType.TYPE_LOCALFILE.getValue()); + meta.setSourceType(ResourceConst.IngestType.TYPE_LOCAL.getValue()); + meta.setFileFormat(Const.FILEFORMATSTR.ARFF.getValue()); + meta.setPath("file:///f:/iris.arff"); + + + try(IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(meta)) { + Instances instances = WekaUtils.getInstancesByResource(meta, iterator, 4); + Pair datas = WekaUtils.splitTrainAndValidates(instances, 80.0); + LogisticRegressionModeler modeler = new LogisticRegressionModeler(); + Logistic classifier = modeler.train(4, new HashMap<>(), datas.getLeft()); + System.out.println(classifier.toString()); + System.out.println(modeler.evaluate(datas.getRight())); + }catch (Exception ex){ + + } + } +} diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/AvroFileIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/AvroFileIterator.java index 537f6a94..6822f6c1 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/AvroFileIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/AvroFileIterator.java @@ -52,8 +52,7 @@ public boolean hasNext() { } @Override - public void init() { - + public void beforeProcess() { try { schema= AvroUtils.getSchemaFromMeta(colmeta); doInit(colmeta.getPath()); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomParquetReader.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomParquetReader.java index d1c9e3a4..84659800 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomParquetReader.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomParquetReader.java @@ -21,7 +21,7 @@ public CustomParquetReader(Configuration conf, Path 
file, ReadSupport readSuppor public static CustomParquetReader.Builder builder(InputFile file,DataCollectionMeta colMeta) { return new CustomParquetReader.Builder(file,colMeta); } - public static class Builder extends org.apache.parquet.hadoop.ParquetReader.Builder { + public static class Builder extends org.apache.parquet.hadoop.ParquetReader.Builder<Map<String, Object>> { private boolean enableCompatibility; private DataCollectionMeta colMeta; /** @deprecated */ @@ -48,8 +48,8 @@ public CustomParquetReader.Builder withCompatibility(boolean enableCompatibility } @Override - protected ReadSupport getReadSupport() { - return new CustomReadSupport(colMeta); + protected ReadSupport<Map<String, Object>> getReadSupport() { + return new CustomReadSupport(); } } } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomReadSupport.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomReadSupport.java index d04c6d7d..fdf52e8f 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomReadSupport.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/CustomReadSupport.java @@ -15,11 +15,10 @@ import java.util.List; import java.util.Map; -public class CustomReadSupport extends ReadSupport { +public class CustomReadSupport extends ReadSupport<Map<String, Object>> { MessageType type; - DataCollectionMeta colMeta; - public CustomReadSupport(DataCollectionMeta colMeta){ - this.colMeta=colMeta; + + public CustomReadSupport(){ } @@ -31,13 +30,13 @@ public ReadContext init(InitContext context) { @Override - public RecordMaterializer prepareForRead(Configuration configuration, Map map, MessageType messageType, ReadContext readContext) { + public RecordMaterializer<Map<String, Object>> prepareForRead(Configuration configuration, Map map, MessageType messageType, ReadContext readContext) { - java.util.Map metadata = readContext.getReadSupportMetadata(); + Map metadata = readContext.getReadSupportMetadata(); MessageType parquetSchema = readContext.getRequestedSchema(); List types=parquetSchema.getFields(); - return new RecordMaterializer() { + return new RecordMaterializer<Map<String, Object>>() { Map record; @Override diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/OrcFileIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/OrcFileIterator.java index 586109a9..2c50abcb 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/OrcFileIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/OrcFileIterator.java @@ -120,7 +120,7 @@ public void wrapValue(TypeDescription schema,String columnName, ColumnVector vec } @Override - public void init() { + public void beforeProcess() { try { if(colmeta.getSourceType().equals(ResourceConst.IngestType.TYPE_HDFS.getValue())){ HDFSUtil util=new HDFSUtil(colmeta); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java index 933ddff0..44c67ffd 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java @@ -1,21 +1,21 @@ package com.robin.comm.fileaccess.iterator; -import com.robin.comm.fileaccess.util.FileSeekableInputStream; import com.robin.comm.fileaccess.util.ParquetUtil; import com.robin.comm.fileaccess.util.SeekableInputStream; import com.robin.core.base.util.Const; import com.robin.core.base.util.IOUtils; -import com.robin.core.base.util.ResourceConst;
import com.robin.core.fileaccess.iterator.AbstractFileIterator; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.AvroUtils; import com.robin.core.fileaccess.util.ResourceUtil; import com.robin.hadoop.hdfs.HDFSUtil; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; @@ -23,14 +23,18 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.apache.slider.server.appmaster.management.Timestamp; import org.springframework.util.ObjectUtils; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.net.URI; import java.net.URL; +import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,46 +46,39 @@ public class ParquetFileIterator extends AbstractFileIterator { private Schema schema; private MessageType msgtype; private GenericData.Record record; - private ParquetReader ireader; + private ParquetReader<Map<String, Object>> ireader; private boolean useAvroEncode = false; - public ParquetFileIterator(){ - identifier= Const.FILEFORMATSTR.PARQUET.getValue(); + private final long maxSize = Integer.MAX_VALUE; + + public ParquetFileIterator() { + identifier = Const.FILEFORMATSTR.PARQUET.getValue(); } + public ParquetFileIterator(DataCollectionMeta colmeta) { super(colmeta); - identifier= Const.FILEFORMATSTR.PARQUET.getValue(); + identifier = Const.FILEFORMATSTR.PARQUET.getValue(); } private List fields; Map rsMap; - private FileSeekableInputStream seekableInputStream = null; private File tmpFile; @Override - public void init() { + public void beforeProcess() { Configuration conf; InputFile file; - // max allowable parquet file size to load in memory for no hdfs input - long maxSize=1024L*1024L*300L; try { checkAccessUtil(null); - if(colmeta.getResourceCfgMap().containsKey("parquetMaxLoadableSize")){ - maxSize=Long.parseLong(colmeta.getResourceCfgMap().get("parquetMaxLoadableSize").toString()); - } if (colmeta.getResourceCfgMap().containsKey("file.useAvroEncode") && "true".equalsIgnoreCase(colmeta.getResourceCfgMap().get("file.useAvroEncode").toString())) { useAvroEncode = true; } - if (colmeta.getSourceType().equals(ResourceConst.IngestType.TYPE_HDFS.getValue())) { + + if (Const.FILESYSTEM.HDFS.getValue().equals(colmeta.getFsType())) { conf = new HDFSUtil(colmeta).getConfig(); - if (colmeta.getColumnList().isEmpty()) { - ParquetMetadata meta = ParquetFileReader.readFooter(conf, new Path(colmeta.getPath()), ParquetMetadataConverter.NO_FILTER); - msgtype = meta.getFileMetaData().getSchema(); - parseSchemaByType(); - } else { - schema = AvroUtils.getSchemaFromMeta(colmeta); - } + file=HadoopInputFile.fromPath(new Path(colmeta.getPath()), conf); + getSchema(file,false); if (!useAvroEncode) { - ParquetReader.Builder builder=ParquetReader.builder(new CustomReadSupport(colmeta),new Path(ResourceUtil.getProcessPath(colmeta.getPath()))).withConf(conf); +
ParquetReader.Builder<Map<String, Object>> builder = ParquetReader.builder(new CustomReadSupport(), new Path(ResourceUtil.getProcessPath(colmeta.getPath()))).withConf(conf); ireader = builder.build(); } else { preader = AvroParquetReader @@ -89,52 +86,54 @@ public void init() { } } else { // no hdfs input source - if (ResourceConst.IngestType.TYPE_LOCAL.getValue().equals(colmeta.getSourceType())) { - long size = accessUtil.getInputStreamSize(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); - seekableInputStream = new FileSeekableInputStream(colmeta.getPath()); - file = ParquetUtil.makeInputFile(seekableInputStream, size); + if (Const.FILESYSTEM.LOCAL.getValue().equals(colmeta.getFsType())) { + file = new LocalInputFile(Paths.get(colmeta.getPath())); } else { instream = accessUtil.getRawInputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); - long size = accessUtil.getInputStreamSize(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); - //file size too large ,may cause OOM,download as tmpfile - if(size>maxSize) { - String tmpPath = (!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get("output.tmppath"))) ? colmeta.getResourceCfgMap().get("output.tmppath").toString() : FileUtils.getUserDirectoryPath(); + long size = instream.available();//accessUtil.getInputStreamSize(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); + //file size too large ,can not store in ByteArrayOutputStream + if (size >= maxSize) { + String tmpPath = (!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get("output.tmppath"))) ? colmeta.getResourceCfgMap().get("output.tmppath").toString() : FileUtils.getTempDirectoryPath(); String tmpFilePath = "file:///" + tmpPath + ResourceUtil.getProcessFileName(colmeta.getPath()); tmpFile = new File(new URL(tmpFilePath).toURI()); copyToLocal(tmpFile, instream); - seekableInputStream = new FileSeekableInputStream(tmpPath); - file = ParquetUtil.makeInputFile(seekableInputStream, size); - }else{ - ByteArrayOutputStream byteout=new ByteArrayOutputStream(instream.available()); - IOUtils.copyBytes(instream,byteout,8000); - SeekableInputStream seekableInputStream=new SeekableInputStream(byteout.toByteArray()); + file = new LocalInputFile(Paths.get(new URI(tmpFilePath))); + } else { + ByteArrayOutputStream byteout = new ByteArrayOutputStream((int) size); + IOUtils.copyBytes(instream, byteout, 8000); + SeekableInputStream seekableInputStream = new SeekableInputStream(byteout.toByteArray()); + file = ParquetUtil.makeInputFile(seekableInputStream); } } - if (colmeta.getColumnList().isEmpty()) { - ParquetMetadata meta = ParquetFileReader.readFooter(file, ParquetMetadataConverter.NO_FILTER); - msgtype = meta.getFileMetaData().getSchema(); - //read footer and schema,must return header - if(!ObjectUtils.isEmpty(seekableInputStream)){ - seekableInputStream.seek(0L); - } - } else { - schema = AvroUtils.getSchemaFromMeta(colmeta); - } + getSchema(file,true); if (!useAvroEncode) { - parseSchemaByType(); - fields = schema.getFields(); ireader = CustomParquetReader.builder(file, colmeta).build(); } else { preader = AvroParquetReader.builder(file).build(); } } + fields = schema.getFields(); } catch (Exception ex) { - logger.error("{0}", ex.getMessage()); + logger.error("{}", ex.getMessage()); } } + private void getSchema(InputFile file,boolean seekFrist) throws IOException { + if (colmeta.getColumnList().isEmpty()) { + ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build(); + ParquetFileReader ireader = 
ParquetFileReader.open(file, options); + ParquetMetadata meta = ireader.getFooter(); + msgtype = meta.getFileMetaData().getSchema(); + parseSchemaByType(); + if(seekFrist) { + file.newStream().seek(0L); + } + } else { + schema = AvroUtils.getSchemaFromMeta(colmeta); + } + } + @Override public boolean hasNext() { try { @@ -160,7 +159,11 @@ public Map next() { throw new NoSuchElementException(""); } for (Schema.Field field : fields) { - retMap.put(field.name(), record.get(field.name())); + Object value=record.get(field.name()); + if(LogicalTypes.timestampMillis().equals(field.schema().getLogicalType())){ + value=new Timestamp((Long)value); + } + retMap.put(field.name(), value); } return retMap; } else { @@ -169,12 +172,14 @@ public Map next() { } private void parseSchemaByType() { + List colList = msgtype.getFields(); for (Type type : colList) { colmeta.addColumnMeta(type.getName(), ParquetReaderUtil.parseColumnType(type.asPrimitiveType()), null); } schema = AvroUtils.getSchemaFromMeta(colmeta); + } @@ -186,9 +191,6 @@ public Schema getSchema() { public void remove() { try { reader.read(); - if(tmpFile!=null){ - FileUtils.deleteQuietly(tmpFile); - } } catch (Exception ex) { ex.printStackTrace(); } @@ -201,16 +203,14 @@ public MessageType getMessageType() { @Override public void close() throws IOException { super.close(); - if(!ObjectUtils.isEmpty(ireader)){ + if (!ObjectUtils.isEmpty(ireader)) { ireader.close(); } - if(!ObjectUtils.isEmpty(preader)){ + if (!ObjectUtils.isEmpty(preader)) { preader.close(); } - if(!ObjectUtils.isEmpty(seekableInputStream)) { - seekableInputStream.closeQuitly(); - } - if(!ObjectUtils.isEmpty(tmpFile)){ + + if (!ObjectUtils.isEmpty(tmpFile)) { FileUtils.deleteQuietly(tmpFile); } } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetReaderUtil.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetReaderUtil.java index 68adfe3c..d592ecbf 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetReaderUtil.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetReaderUtil.java @@ -1,7 +1,7 @@ package com.robin.comm.fileaccess.iterator; import com.robin.core.base.util.Const; -import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -14,7 +14,7 @@ public static String parseColumnType(PrimitiveType type){ if(type.getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT32)){ rettype=Const.META_TYPE_INTEGER; }else if(type.getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT64)){ - if(type.getOriginalType().equals(OriginalType.DATE) || type.getOriginalType().equals(OriginalType.TIME_MILLIS)){ + if(LogicalTypeAnnotation.dateType().equals(type.getLogicalTypeAnnotation()) || LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS).equals(type.getLogicalTypeAnnotation())){ rettype=Const.META_TYPE_TIMESTAMP; }else { rettype = Const.META_TYPE_BIGINT; diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetStreamIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetStreamIterator.java index e2ed5910..e55c29ec 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetStreamIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetStreamIterator.java @@ -44,7 +44,8 @@ public ParquetStreamIterator(DataCollectionMeta colmeta) { private static final 
int COPY_BUFFER_SIZE = 8192; @Override - public void init() { + public void beforeProcess() { + super.beforeProcess(); conf=new HDFSUtil(colmeta).getConfig(); try { if (colmeta.getColumnList().isEmpty()) { diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ProtoBufFileIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ProtoBufFileIterator.java index ef55da6e..dd240759 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ProtoBufFileIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ProtoBufFileIterator.java @@ -34,7 +34,7 @@ public ProtoBufFileIterator(DataCollectionMeta colmeta) { @Override - public void init() { + public void beforeProcess() { try { if (!CollectionUtils.isEmpty(colmeta.getColumnList())) { schemaBuilder = DynamicSchema.newBuilder(); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java index dc052546..7a92f6ca 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java @@ -207,6 +207,7 @@ private static PositionOutputStream makePositionOutputStream(@NonNull AbstractFi final OutputStream output = accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); return new PositionOutputStream() { private long position = 0; + private boolean isClosed=false; @Override public void write(int b) throws IOException { @@ -228,12 +229,17 @@ public void write(@Nonnull byte[] b, int off, int len) throws IOException { @Override public void flush() throws IOException { - output.flush(); + if(!isClosed) { + output.flush(); + } } @Override public void close() throws IOException { - output.close(); + if(!isClosed) { + output.close(); + isClosed = true; + } } @Override diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/CustomWriteSupport.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/CustomWriteSupport.java index 467d643c..6307e682 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/CustomWriteSupport.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/CustomWriteSupport.java @@ -7,6 +7,7 @@ import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.Type; @@ -52,7 +53,7 @@ public void write(T obj) { switch (type.asPrimitiveType().getPrimitiveTypeName()) { case BOOLEAN: - recordConsumer.addBoolean(ObjectUtils.isEmpty(tobj) ? false : Boolean.parseBoolean(tobj.toString())); + recordConsumer.addBoolean(!ObjectUtils.isEmpty(tobj) && Boolean.parseBoolean(tobj.toString())); break; case FLOAT: recordConsumer.addFloat(ObjectUtils.isEmpty(tobj) ? Float.valueOf("0.0") : Float.valueOf(tobj.toString())); @@ -64,10 +65,10 @@ public void write(T obj) { recordConsumer.addInteger(ObjectUtils.isEmpty(tobj) ? 
0 : Integer.valueOf(tobj.toString())); break; case INT64: - Long realVal = 0L; + long realVal = 0L; if (!ObjectUtils.isEmpty(tobj)) { - if (type.getOriginalType() == OriginalType.DATE || type.getOriginalType() == OriginalType.TIME_MILLIS - || type.getOriginalType() == OriginalType.TIMESTAMP_MILLIS) { + if (LogicalTypeAnnotation.dateType().equals(type.getLogicalTypeAnnotation()) + || LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS).equals(type.getLogicalTypeAnnotation())){ realVal = ((Timestamp) tobj).getTime(); } else { realVal = Long.valueOf(tobj.toString()); @@ -76,10 +77,10 @@ public void write(T obj) { recordConsumer.addLong(realVal); break; case INT96: - Long realVal1 = 0L; + long realVal1 = 0L; if (!ObjectUtils.isEmpty(tobj)) { - if (type.getOriginalType() == OriginalType.DATE || type.getOriginalType() == OriginalType.TIME_MILLIS - || type.getOriginalType() == OriginalType.TIMESTAMP_MILLIS) { + if (LogicalTypeAnnotation.dateType().equals(type.getLogicalTypeAnnotation()) + || LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS).equals(type.getLogicalTypeAnnotation())) { realVal1 = ((Timestamp) tobj).getTime(); } else { realVal1 = Long.valueOf(tobj.toString()); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java index a4e7d6c9..f5b3a652 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java @@ -2,7 +2,6 @@ import com.robin.comm.fileaccess.util.ParquetUtil; import com.robin.core.base.util.Const; -import com.robin.core.base.util.ResourceConst; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.AvroUtils; import com.robin.core.fileaccess.util.ResourceUtil; @@ -11,12 +10,14 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; -import org.springframework.util.ObjectUtils; import javax.naming.OperationNotSupportedException; import java.io.IOException; @@ -27,7 +28,9 @@ public class ParquetFileWriter extends AbstractFileWriter { private Schema avroSchema; - private ParquetWriter pwriter; + //private ParquetWriter pwriter; + private ParquetWriter avroWriter; + private ParquetWriter<Map<String, Object>> mapWriter; private MessageType schema; private boolean useAvroEncode=false; public ParquetFileWriter(){ @@ -82,17 +85,19 @@ public void beginWrite() throws IOException { if(colmeta.getResourceCfgMap().containsKey("file.useAvroEncode") && "true".equalsIgnoreCase(colmeta.getResourceCfgMap().get("file.useAvroEncode").toString())){ useAvroEncode=true; } - if(!ObjectUtils.isEmpty(colmeta.getSourceType()) && colmeta.getSourceType().equals(ResourceConst.IngestType.TYPE_HDFS.getValue())){ + if(Const.FILESYSTEM.HDFS.getValue().equals(colmeta.getFsType())){ if(useAvroEncode) { - pwriter = AvroParquetWriter.builder(new Path(colmeta.getPath())).withSchema(avroSchema).withCompressionCodec(codecName).withConf(new 
HDFSUtil(colmeta).getConfig()).build(); + Configuration conf=new HDFSUtil(colmeta).getConfig(); + OutputFile outputFile= HadoopOutputFile.fromPath(new Path(colmeta.getPath()),conf); + avroWriter = AvroParquetWriter.builder(outputFile).withSchema(avroSchema).withCompressionCodec(codecName).withConf(conf).build(); }else { - pwriter = new CustomParquetWriter(new Path(colmeta.getPath()), schema, true, codecName); + mapWriter = new CustomParquetWriter(new Path(colmeta.getPath()), schema, true, codecName); } }else{ if(useAvroEncode) { - pwriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build(); + avroWriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build(); }else { - pwriter = new CustomParquetWriter.Builder<Map<String, Object>>(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath())), schema).withCompressionCodec(codecName).build(); + mapWriter = new CustomParquetWriter.Builder<Map<String, Object>>(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath())), schema).withCompressionCodec(codecName).build(); } } } @@ -102,7 +107,6 @@ public void writeRecord(Map map) throws IOException, OperationNo if(useAvroEncode) { GenericRecord record = new GenericData.Record(avroSchema); - for (int i = 0; i < colmeta.getColumnList().size(); i++) { String name = colmeta.getColumnList().get(i).getColumnName(); Object value = getMapValueByMeta(map, name); @@ -114,14 +118,13 @@ public void writeRecord(Map map) throws IOException, OperationNo } } } - try { - pwriter.write(record); + avroWriter.write(record); } catch (IOException ex) { logger.error("", ex); } }else { - pwriter.write(map); + mapWriter.write(map); } } @@ -132,16 +135,17 @@ public void writeRecord(List map) throws IOException,OperationNotSupport @Override public void finishWrite() throws IOException { - if(pwriter!=null){ - pwriter.close(); + if(avroWriter!=null){ + avroWriter.close(); + } + if(mapWriter!=null){ + mapWriter.close(); } } @Override public void flush() throws IOException { - if(pwriter!=null){ - } } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java index aca821ec..c6bd6e3d 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java @@ -13,6 +13,7 @@ import com.robin.core.fileaccess.writer.AbstractFileWriter; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; +import org.tukaani.xz.LZMAOutputStream; import javax.naming.OperationNotSupportedException; import java.io.IOException; diff --git a/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/KafkaIterator.java b/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/KafkaIterator.java index a5aeea4a..3dbca397 100644 --- a/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/KafkaIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/KafkaIterator.java @@ -34,7 +34,7 @@ public KafkaIterator(DataCollectionMeta collectionMeta){ this.identifier= ResourceConst.ResourceType.TYPE_KAFKA.toString(); } @Override - public void init() { + public void beforeProcess() 
{ Properties props = new Properties(); brokerUrl=colmeta.getResourceCfgMap().get("brokerUrl").toString(); groupId=colmeta.getResourceCfgMap().get("groupId").toString(); diff --git a/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/RabbitMQIterator.java b/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/RabbitMQIterator.java index 03a1d426..f140da4f 100644 --- a/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/RabbitMQIterator.java +++ b/hadooptool/src/main/java/com/robin/comm/resaccess/iterator/RabbitMQIterator.java @@ -32,13 +32,9 @@ public RabbitMQIterator(DataCollectionMeta collectionMeta){ this.identifier= ResourceConst.ResourceType.TYPE_RABBIT.toString(); } - @Override - public void beforeProcess(String param) { - - } @Override - public void init() { + public void beforeProcess() { connectionFactory=new CachingConnectionFactory(); int port=5672; Assert.notNull(cfgMap,""); diff --git a/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSSecurityUtil.java b/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSSecurityUtil.java index d5ccb71d..5e7c952d 100644 --- a/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSSecurityUtil.java +++ b/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSSecurityUtil.java @@ -1,21 +1,21 @@ package com.robin.hadoop.hdfs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.Method; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; -import java.util.function.Consumer; import java.util.function.Function; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.log4j.Logger; - public class HDFSSecurityUtil { - private static Logger logger=Logger.getLogger(HDFSSecurityUtil.class); + private static Logger logger= LoggerFactory.getLogger(HDFSSecurityUtil.class); - public static Object executeHdfsMethodWithSecurity(final Configuration config,final String methodName, final Object[] param) { + static Object executeHdfsMethodWithSecurity(final Configuration config,final String methodName, final Object[] param) { List list = new ArrayList(); for (int i = 0; i < param.length; i++) { list.add(param[i].getClass()); @@ -37,12 +37,11 @@ public static Object executeHdfsMethodWithSecurity(final Configuration config,fi return null; }); } catch (Exception ex) { - ex.printStackTrace(); - logger.error(ex); + logger.error("{}",ex.getMessage()); } return ret; } - public static Object executeHadoopMethodWithSecurity(final Configuration config,final Object obj,final String methodName, final Object[] param) { + static Object executeHadoopMethodWithSecurity(final Configuration config,final Object obj,final String methodName, final Object[] param) { List list = new ArrayList(); for (int i = 0; i < param.length; i++) { list.add(param[i].getClass()); @@ -64,19 +63,18 @@ public static Object executeHadoopMethodWithSecurity(final Configuration config, return null; }); }catch(Exception ex){ - ex.printStackTrace(); - logger.error(ex); + logger.error("{}",ex.getMessage()); } return ret; } - public static Object executeSecurityWithProxy(final Configuration config, final Function consumer){ - Object ret = null; + static T executeSecurityWithProxy(final Configuration config, final Function consumer){ + T ret = null; try { UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); 
UserGroupInformation.getCurrentUser().doAs((PrivilegedAction) () -> consumer.apply(config)); }catch(Exception ex){ - ex.printStackTrace(); + logger.error("{}",ex.getMessage()); } return ret; } diff --git a/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSUtil.java b/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSUtil.java index 2691d0d4..6056d4c4 100644 --- a/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSUtil.java +++ b/hadooptool/src/main/java/com/robin/hadoop/hdfs/HDFSUtil.java @@ -104,7 +104,7 @@ public String upload(final String filePath,String toUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.upload(config, filePath, toUrl); } else { - return (String) HDFSSecurityUtil.executeSecurityWithProxy(config,config->HDFSCallUtil.upload(config,filePath,toUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config,config->HDFSCallUtil.upload(config,filePath,toUrl)); } } @SuppressWarnings("unchecked") @@ -112,7 +112,7 @@ public boolean uploadByInputStream(final InputStream in,String toUrl, int buffer if(!useSecurity) { return HDFSCallUtil.uploadByInputStream(config, in, toUrl, bufferSize); } else { - return (Boolean) HDFSSecurityUtil.executeSecurityWithProxy(config, config-> HDFSCallUtil.uploadByInputStream(config,in,toUrl, bufferSize)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config-> HDFSCallUtil.uploadByInputStream(config,in,toUrl, bufferSize)); } } @SuppressWarnings("unchecked") @@ -120,7 +120,7 @@ public String uploadByInputStream(final InputStream in,String toUrl, int bufferS if(!useSecurity) { return HDFSCallUtil.uploadByInputStream(config, in, toUrl, bufferSize, fromCharset, toCharset); } else { - return (String) HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.uploadByInputStream(config,in,toUrl, bufferSize, fromCharset, toCharset)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.uploadByInputStream(config,in,toUrl, bufferSize, fromCharset, toCharset)); } } @SuppressWarnings("unchecked") @@ -156,7 +156,7 @@ public List listFile(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.listFile(config, hdfsUrl); } else { - return (List) HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.listFile(config,hdfsUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.listFile(config,hdfsUrl)); } } @SuppressWarnings("unchecked") @@ -164,7 +164,7 @@ public List> listFileAndDirectory(String hdfsUrl) throws Hdf if(!useSecurity) { return HDFSCallUtil.listFileAndDirectory(config, hdfsUrl); } else { - return (List>) HDFSSecurityUtil.executeSecurityWithProxy(config,config->HDFSCallUtil.listFileAndDirectory(config,hdfsUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config,config->HDFSCallUtil.listFileAndDirectory(config,hdfsUrl)); } } @SuppressWarnings("unchecked") @@ -172,7 +172,7 @@ public List listFileName(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.listFileName(config, hdfsUrl); } else { - return (List) HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.listFileName(config,hdfsUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.listFileName(config,hdfsUrl)); } } @SuppressWarnings("unchecked") @@ -196,7 +196,7 @@ public boolean isDirectory(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.isDirectory(config, hdfsUrl); } else { - return (Boolean) HDFSSecurityUtil.executeSecurityWithProxy(config, 
config->HDFSCallUtil.isDirectory(config,hdfsUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config->HDFSCallUtil.isDirectory(config,hdfsUrl)); } } @SuppressWarnings("unchecked") @@ -220,7 +220,7 @@ public boolean exists(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.exists(config, hdfsUrl); } else { - return (Boolean) HDFSSecurityUtil.executeSecurityWithProxy(config, config1 -> HDFSCallUtil.exists(config,hdfsUrl)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config1 -> HDFSCallUtil.exists(config,hdfsUrl)); } } @SuppressWarnings("unchecked") @@ -228,7 +228,7 @@ public String read(String hdfsUrl,String encode) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.read(config, hdfsUrl,encode); } else { - return (String) HDFSSecurityUtil.executeSecurityWithProxy(config, config1 -> HDFSCallUtil.read(config,hdfsUrl,encode)); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config1 -> HDFSCallUtil.read(config,hdfsUrl,encode)); } } @SuppressWarnings("unchecked") @@ -261,7 +261,7 @@ public void copy(String fromPath,String toPath) throws HdfsException{ if(!useSecurity) { HDFSCallUtil.copy(config, fromPath, toPath); } else { - HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "copy", new Object[]{config,fromPath,toPath}); + HDFSSecurityUtil.executeSecurityWithProxy(config, config1->{HDFSCallUtil.copy(config,fromPath,toPath);return null;}); } } @SuppressWarnings({"unchecked","unused"}) @@ -269,7 +269,7 @@ public byte[] readByte(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.readByte(config, hdfsUrl); } else { - return (byte[]) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "readByte", new Object[]{config,hdfsUrl}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config1->HDFSCallUtil.readByte(config,hdfsUrl)); } } @SuppressWarnings({"unchecked","unused"}) @@ -277,7 +277,7 @@ public FSDataOutputStream createFile(String hdfsUrl,Boolean overwriteOrigion) th if(!useSecurity) { return HDFSCallUtil.createFile(config, hdfsUrl,overwriteOrigion); } else { - return (FSDataOutputStream) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "createFile", new Object[]{config,hdfsUrl,overwriteOrigion}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, config1->HDFSCallUtil.createFile(config,hdfsUrl,overwriteOrigion)); } } @SuppressWarnings({"unchecked","unused"}) @@ -293,7 +293,7 @@ public BufferedReader readStream(String hdfsUrl,String encode) throws HdfsExcept if(!useSecurity) { return HDFSCallUtil.readStream(config, hdfsUrl, encode); } else { - return (BufferedReader) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "readStream", new Object[]{config,hdfsUrl, encode}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->HDFSCallUtil.readStream(config, hdfsUrl, encode)); } } @@ -302,7 +302,7 @@ public FileSystem getFileSystem(){ if (!useSecurity) { return HDFSCallUtil.getFileSystem(config); } else { - return (FileSystem) HDFSSecurityUtil.executeSecurityWithProxy(config,config1 -> HDFSCallUtil.getFileSystem(config)); + return HDFSSecurityUtil.executeSecurityWithProxy(config,config1 -> HDFSCallUtil.getFileSystem(config)); } }catch (Exception ex){ logger.error("{}",ex.getMessage()); @@ -320,7 +320,7 @@ public Long getHDFSFileSize(String hdfsUrl) throws HdfsException{ if(!useSecurity) { return HDFSCallUtil.getHDFSFileSize(config, hdfsUrl); } else { - return (Long) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "getHDFSFileSize", new 
Object[]{config,hdfsUrl}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->HDFSCallUtil.getHDFSFileSize(config,hdfsUrl)); } } @@ -328,7 +328,7 @@ public String read(Configuration config,String hdfsUrl,String encode) throws Hdf if(!useSecurity) { return HDFSCallUtil.read(config, hdfsUrl,encode); } else { - return (String) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "read", new Object[]{config,hdfsUrl,encode}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->HDFSCallUtil.read(config,hdfsUrl,encode)); } } @@ -336,7 +336,14 @@ public BufferedReader getHDFSDataByReader(String path,String encode) throws Exce if(!useSecurity) { return HDFSCallUtil.getHDFSDataByReader(config, path, encode); } else { - return (BufferedReader) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "getHDFSDataByReader", new Object[]{config,path, encode}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f-> { + try { + return HDFSCallUtil.getHDFSDataByReader(config, path, encode); + } catch (Exception ex) { + logger.error("{}",ex.getMessage()); + } + return null; + }); } } @@ -344,7 +351,14 @@ public BufferedInputStream getHDFSDataByInputStream(String path) throws Exceptio if(!useSecurity) { return HDFSCallUtil.getHDFSDataByInputStream(config, path); } else { - return (BufferedInputStream) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "getHDFSDataByInputStream", new Object[]{config,path}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->{ + try { + return HDFSCallUtil.getHDFSDataByInputStream(config, path); + }catch (Exception ex){ + logger.error("{}",ex.getMessage()); + } + return null; + }); } } @@ -352,7 +366,14 @@ public BufferedInputStream getHDFSDataByRawInputStream(String path) throws Excep if(!useSecurity) { return HDFSCallUtil.getHDFSRawInputStream(config, path); } else { - return (BufferedInputStream) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "getHDFSRawInputStream", new Object[]{config,path}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->{ + try { + return HDFSCallUtil.getHDFSRawInputStream(config, path); + }catch (Exception ex){ + logger.error("{}",ex.getMessage()); + } + return null; + }); } } @@ -360,7 +381,14 @@ public BufferedWriter getHDFSDataByWriter(String path, String encode) throws Exc if(!useSecurity) { return HDFSCallUtil.getHDFSDataByWriter(config, path, encode); } else { - return (BufferedWriter) HDFSSecurityUtil.executeHdfsMethodWithSecurity(config, "getHDFSDataByWriter", new Object[]{config,path, encode}); + return HDFSSecurityUtil.executeSecurityWithProxy(config, f->{ + try { + return HDFSCallUtil.getHDFSDataByWriter(config, path, encode); + }catch (Exception ex){ + logger.error("{}",ex.getMessage()); + } + return null; + }); } } diff --git a/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java b/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java index 0a3ca14a..e9fcf537 100644 --- a/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java +++ b/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java @@ -36,8 +36,8 @@ public void testWrite(){ builder.addColumn("tdate",Const.META_TYPE_BIGINT,null); builder.resourceCfg("hostName", "127.0.0.1").resourceCfg("protocol", "ftp") - .resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PROTOBUF.getValue()) - .resPath("/tmp/test1.proto.snappy").protocol(Const.VFS_PROTOCOL.FTP.getValue()).fsType(Const.FILESYSTEM.VFS.getValue()); + 
.resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PARQUET.getValue()) + .resPath("/tmp/test1.parquet.snappy").protocol(Const.VFS_PROTOCOL.FTP.getValue()).fsType(Const.FILESYSTEM.VFS.getValue()); DataCollectionMeta colmeta=builder.build(); final AbstractFileWriter jwriter=(AbstractFileWriter) TextFileWriterFactory.getWriterByType(colmeta); @@ -51,6 +51,7 @@ public boolean executeAdditionalOperation(Map map, ResultSetMeta throws SQLException { try{ map.put("tdate",((Timestamp)map.get("start_time")).getTime()); + map.remove("start_time"); jwriter.writeRecord(map); }catch(Exception ex){ ex.printStackTrace(); @@ -71,24 +72,19 @@ public boolean executeAdditionalOperation(Map map, ResultSetMeta } @Test public void testRead(){ - try { - DataCollectionMeta.Builder builder = new DataCollectionMeta.Builder(); - builder.addColumn("id", Const.META_TYPE_BIGINT, null); - builder.addColumn("line_code", Const.META_TYPE_STRING, null); - builder.addColumn("line_name", Const.META_TYPE_STRING, null); - builder.addColumn("tdate", Const.META_TYPE_BIGINT, null); + DataCollectionMeta.Builder builder = new DataCollectionMeta.Builder(); - builder.resourceCfg("hostName", "127.0.0.1").resourceCfg("protocol", "ftp") - .resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PROTOBUF.getValue()) - .resPath("/tmp/test1.proto.snappy").protocol(Const.VFS_PROTOCOL.FTP.getValue()).fsType(Const.FILESYSTEM.VFS.getValue()); + builder.resourceCfg("hostName", "127.0.0.1").resourceCfg("protocol", "ftp") + .resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PARQUET.getValue()) + .resPath("/tmp/test1.parquet.snappy").protocol(Const.VFS_PROTOCOL.FTP.getValue()).fsType(Const.FILESYSTEM.VFS.getValue()); - DataCollectionMeta colmeta = builder.build(); - IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(colmeta); + DataCollectionMeta colmeta = builder.build(); + try(IResourceIterator iterator= TextFileIteratorFactory.getProcessIteratorByType(colmeta)){ while (iterator.hasNext()){ log.info("get record {}",iterator.next()); } }catch (IOException ex){ - + ex.printStackTrace(); } } diff --git a/pom.xml b/pom.xml index 2bfcf5ba..6663dc00 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ 4.0 1.6.2 1.8.2 - 1.10.0 + 1.14.3 9.1-901.jdbc3 1.0.3 1.3.10 @@ -127,7 +127,7 @@ 1.8.9 1.0.4 1.1.2.6 - 1.6 + 1.9 3.6.0 1.0.0-rc7 1.21.0