diff --git a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
index b40b636f..6bb112e7 100644
--- a/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
+++ b/common/src/main/java/com/robin/comm/dal/pool/ResourceAccessHolder.java
@@ -120,7 +120,7 @@ public InputStreamHolder getPoolInputStreamObject() throws Exception{
         if(inputStreamPool!=null){
             return inputStreamPool.borrowObject();
         }else{
-            throw new RuntimeException("input strem config not found!");
+            throw new RuntimeException("inputstream config not found!");
         }
     }
     public void returnInputStreamHolder(InputStreamHolder holder) throws Exception{
diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
index 6a318936..9e07a42e 100644
--- a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
+++ b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java
@@ -30,6 +30,7 @@ public AbstractFileSystemAccessor(){
     }
 
+
     public static String[] retrieveResource(String path){
         String[] ret=new String[2];
         ret[1]=getFileSuffix(path);
@@ -98,6 +99,9 @@ protected static OutputStream getOutputStreamByPath(String path, OutputStream ou
     @Override
     public void init(DataCollectionMeta meta){
 
+    }
+    public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) {
+
     }
     @Override
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
index 93d0c514..77fc62df 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
@@ -141,4 +141,7 @@ public void close() throws IOException {
     public String getIdentifier() {
         return identifier;
     }
+    public AbstractFileSystemAccessor getFileSystemAccessor(){
+        return accessUtil;
+    }
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java
index 34841a47..0cdacef9 100644
--- a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java
+++ b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java
@@ -108,9 +108,11 @@ public void close() throws IOException{
         if(writer!=null){
             writer.close();
         }
+        accessUtil.finishWrite(colmeta,out);
         if(out!=null){
             out.close();
         }
+
     }
     protected String getOutputStringByType(Map valueMap,String columnName){
         String columnType=columnMap.get(columnName);
diff --git a/core/src/main/java/com/robin/core/base/dao/SimpleJdbcDao.java b/core/src/main/java/com/robin/core/base/dao/SimpleJdbcDao.java
index 4ef70f28..e6b0a4e8 100644
--- a/core/src/main/java/com/robin/core/base/dao/SimpleJdbcDao.java
+++ b/core/src/main/java/com/robin/core/base/dao/SimpleJdbcDao.java
@@ -420,6 +420,16 @@ public static int executeOperationWithQuery(final Connection conn, String sql, b
         }
         return qRunner.query(conn, sql, extractor::extractData);
     }
+    public static int executeOperationWithHandler(final Connection conn, String sql, boolean pmdKnownBroken, final ResultSetHandler<Integer> handler) throws SQLException {
+        QueryRunner qRunner;
+        if (pmdKnownBroken) {
+            qRunner = new QueryRunner(pmdKnownBroken);
+        } else {
+            qRunner = new QueryRunner();
+        }
+        return qRunner.query(conn, sql, handler);
+    }
+
     public static int executeOperationWithQuery(final Connection conn, String sql, Object[] param, boolean pmdKnownBroken, final ResultSetOperationExtractor extractor) throws SQLException {
         QueryRunner qRunner = new QueryRunner(pmdKnownBroken);
diff --git a/core/src/main/java/com/robin/core/base/util/FileUtils.java b/core/src/main/java/com/robin/core/base/util/FileUtils.java
index aadf5c42..73ff4811 100644
--- a/core/src/main/java/com/robin/core/base/util/FileUtils.java
+++ b/core/src/main/java/com/robin/core/base/util/FileUtils.java
@@ -2,7 +2,9 @@
 import cn.hutool.core.io.FileUtil;
 import com.google.common.collect.Lists;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.ObjectUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -104,4 +106,9 @@ public static boolean setWithGroupAndUser(File file,String group,String user){
         }
         return true;
     }
+    public static String getWorkingPath(DataCollectionMeta meta){
+        return !ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.WORKINGPATHPARAM))
+                ? meta.getResourceCfgMap().get(ResourceConst.WORKINGPATHPARAM).toString()
+                : org.apache.commons.io.FileUtils.getTempDirectoryPath();
+    }
 }
diff --git a/core/src/main/java/com/robin/core/base/util/ResourceConst.java b/core/src/main/java/com/robin/core/base/util/ResourceConst.java
index 742d4304..2b98504b 100644
--- a/core/src/main/java/com/robin/core/base/util/ResourceConst.java
+++ b/core/src/main/java/com/robin/core/base/util/ResourceConst.java
@@ -2,6 +2,8 @@
 public class ResourceConst {
 
+    public static final String WORKINGPATHPARAM="output.workingPath";
+    public static final String USETMPFILETAG="output.usingTmpFiles";
     public enum IngestType {
         TYPE_HDFS(1L,"HDFS"),
@@ -116,6 +118,38 @@ public enum S3PARAM{
             this.value=value;
         }
+        public String getValue() {
+            return value;
+        }
+    }
+    public enum OSSPARAM{
+        ENDPOIN("endpoint"),
+        REGION("region"),
+        ACESSSKEYID("accessKeyId"),
+        SECURITYACCESSKEY("securityAccessKey"),
+        BUCKETNAME("bucketName"),
+        OBJECTNAME("objectName");
+        private String value;
+        OSSPARAM(String value){
+            this.value=value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+    public enum COSPARAM{
+        HTTPPROTOCOL("protocol"),
+        REGION("region"),
+        ACESSSKEY("accessKey"),
+        SECURITYKEY("securityKey"),
+        BUCKETNAME("bucketName"),
+        OBJECTNAME("objectName");
+        private String value;
+        COSPARAM(String value){
+            this.value=value;
+        }
+
         public String getValue() {
             return value;
         }
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java
new file mode 100644
index 00000000..64fa0d91
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java
@@ -0,0 +1,195 @@
+package com.robin.comm.fileaccess.fs;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+import com.qcloud.cos.http.HttpProtocol;
+import com.qcloud.cos.model.*;
+import com.qcloud.cos.region.Region;
+import com.qcloud.cos.transfer.TransferManager;
+import com.qcloud.cos.transfer.TransferManagerConfiguration;
+import com.qcloud.cos.transfer.Upload;
+import com.robin.core.base.exception.ResourceNotAvailableException;
+import com.robin.core.base.util.ResourceConst;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
+import com.robin.core.fileaccess.util.ResourceUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.lang.NonNull;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tencent COS FileSystemAccessor
+ */
+@Slf4j
+public class COSFileSystemAccessor extends AbstractFileSystemAccessor {
+    private COSClient cosClient;
+
+    @Override
+    public void init(DataCollectionMeta meta) {
+        Assert.isTrue(!CollectionUtils.isEmpty(meta.getResourceCfgMap()),"config map is empty!");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.HTTPPROTOCOL.getValue()),"must provide protocol");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.REGION.getValue()),"must provide region");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.ACESSSKEY.getValue()),"must provide accessKey");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.SECURITYKEY.getValue()),"must provide securityKey");
+        Region region=new Region(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.REGION.getValue()).toString());
+        ClientConfig config=new ClientConfig(region);
+        HttpProtocol protocol="https".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.HTTPPROTOCOL.getValue()).toString())?
+                HttpProtocol.https:HttpProtocol.http;
+        config.setHttpProtocol(protocol);
+        COSCredentials cosCredentials = new BasicCOSCredentials(meta.getResourceCfgMap().get(ResourceConst.COSPARAM.ACESSSKEY.getValue()).toString(),
+                meta.getResourceCfgMap().get(ResourceConst.COSPARAM.SECURITYKEY.getValue()).toString());
+        cosClient = new COSClient(cosCredentials, config);
+    }
+
+    @Override
+    public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream);
+    }
+
+    @Override
+    public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
+        OutputStream outputStream = getOutputStream(meta);
+        return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
+    }
+
+    @Override
+    public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStream(meta);
+    }
+
+    @Override
+    public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStreamByPath(resourcePath, getOutputStream(meta));
+    }
+
+    @Override
+    public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return getInputStreamByPath(resourcePath, inputStream);
+    }
+
+    @Override
+    public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getInputStreamByConfig(meta);
+    }
+
+    @Override
+    public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= meta.getResourceCfgMap().get("bucketName").toString();
+        return exists(bucketName,resourcePath);
+    }
+    @Override
+    public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= meta.getResourceCfgMap().get("bucketName").toString();
+        if(exists(bucketName,resourcePath)){
+            ObjectMetadata metadata=cosClient.getObjectMetadata(bucketName,resourcePath);
+            if(!ObjectUtils.isEmpty(metadata)){
+                return metadata.getContentLength();
+            }
+        }
+        return 0;
+    }
+    private InputStream getInputStreamByConfig(DataCollectionMeta meta) {
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName");
+        String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString();
+        String objectName= meta.getPath();
+        return getObject(bucketName,objectName);
+    }
+    private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
+        OutputStream outputStream;
+        if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
+            String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
+            outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
+        }else {
+            outputStream = new ByteArrayOutputStream();
+        }
+        return outputStream;
+    }
+    private COSObjectInputStream getObject(@NonNull String bucketName,@NonNull String key) {
+        GetObjectRequest request = new GetObjectRequest(bucketName, key);
+        COSObject object = cosClient.getObject(request);
+        if (!ObjectUtils.isEmpty(object)) {
+            return object.getObjectContent();
+        } else {
+            throw new ResourceNotAvailableException("key " + key + " not found in cos ");
+        }
+    }
+    private boolean exists(String bucketName,String key){
+        return cosClient.doesObjectExist(bucketName,key);
+    }
+    private boolean bucketExists(String bucketName){
+        return cosClient.doesBucketExist(bucketName);
+    }
+    private TransferManager getManager() {
+        ExecutorService threadPool = Executors.newFixedThreadPool(32);
+
+        // Pass in a thread pool; if none is supplied, TransferManager creates a single-threaded pool by default.
+        TransferManager transferManager = new TransferManager(cosClient, threadPool);
+        // Configure the high-level transfer settings:
+        // multipart copy threshold and part size are 5MB and 1MB respectively
+        TransferManagerConfiguration transferManagerConfiguration = new TransferManagerConfiguration();
+        transferManagerConfiguration.setMultipartCopyThreshold(5 * 1024 * 1024L);
+        transferManagerConfiguration.setMultipartCopyPartSize(1024 * 1024L);
+        transferManager.setConfiguration(transferManagerConfiguration);
+        return transferManager;
+    }
+
+    @Override
+    public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) {
+        Assert.notNull(meta.getResourceCfgMap().get("bucketName"),"must provide bucketName");
+        try{
+            upload(meta,outputStream);
+        }catch (InterruptedException | IOException ex){
+            log.error("{}",ex.getMessage());
+        }
+    }
+
+    private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throws IOException,InterruptedException {
+        String bucketName=meta.getResourceCfgMap().get("bucketName").toString();
+        TransferManager transferManager=getManager();
+        PutObjectRequest request=null;
+        String tmpFilePath=null;
+        if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())){
+            ObjectMetadata objectMetadata = new ObjectMetadata();
+            objectMetadata.setContentLength(((ByteArrayOutputStream)outputStream).size());
+            request = new PutObjectRequest(bucketName, meta.getPath(), new ByteArrayInputStream(((ByteArrayOutputStream)outputStream).toByteArray()),objectMetadata);
+        }else{
+            outputStream.close();
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
+            tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
+            request=new PutObjectRequest(bucketName,meta.getPath(),new File(tmpFilePath));
+        }
+
+        try {
+            Upload upload = transferManager.upload(request, null);
+            UploadResult result = upload.waitForUploadResult();
+            if (!ObjectUtils.isEmpty(result)) {
+                return true;
+            }
+        } finally {
+            if (!ObjectUtils.isEmpty(transferManager)) {
+                transferManager.shutdownNow(true);
+            }
+            if(!ObjectUtils.isEmpty(tmpFilePath)){
+                FileUtils.deleteQuietly(new File(tmpFilePath));
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java
new file mode 100644
index 00000000..378110d3
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java
@@ -0,0 +1,163 @@
+package com.robin.comm.fileaccess.fs;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.CredentialsProviderFactory;
+import com.aliyun.oss.common.comm.ResponseMessage;
+import com.aliyun.oss.model.Bucket;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.PutObjectResult;
+import com.robin.core.base.exception.MissingConfigException;
+import com.robin.core.base.util.Const;
+import com.robin.core.base.util.ResourceConst;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
+import com.robin.core.fileaccess.util.ResourceUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Aliyun OSS FileSystemAccessor
+ */
+@Slf4j
+public class OSSFileSystemAccessor extends AbstractFileSystemAccessor {
+    public OSSFileSystemAccessor(){
+        this.identifier= Const.FILESYSTEM.ALIYUN.getValue();
+    }
+    private OSS ossClient;
+
+    @Override
+    public void init(DataCollectionMeta meta){
+        Assert.isTrue(!CollectionUtils.isEmpty(meta.getResourceCfgMap()),"config map is empty!");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ENDPOIN.getValue()),"must provide endpoint");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.REGION.getValue()),"must provide region");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ACESSSKEYID.getValue()),"must provide accessKey");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue()),"must provide securityAccessKey");
+
+        String endpoint=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ENDPOIN.getValue()).toString();
+        String region=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.REGION.getValue()).toString();
+        String accessKeyId=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ACESSSKEYID.getValue()).toString();
+        String securityAccessKey=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue()).toString();
+
+        CredentialsProvider credentialsProvider= CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId,securityAccessKey);
+        ossClient= OSSClientBuilder.create().endpoint(endpoint).credentialsProvider(credentialsProvider)
+                .region(region).build();
+    }
+
+    @Override
+    public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream);
+    }
+
+    @Override
+    public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
+        OutputStream outputStream = getOutputStream(meta);
+        return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
+    }
+
+    @Override
+    public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStream(meta);
+    }
+
+    @Override
+    public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStreamByPath(resourcePath, getOutputStream(meta));
+    }
+
+    @Override
+    public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return getInputStreamByPath(resourcePath, inputStream);
+    }
+
+    @Override
+    public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getInputStreamByConfig(meta);
+    }
+
+    @Override
+    public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= meta.getResourceCfgMap().get("bucketName").toString();
+        return ossClient.doesObjectExist(bucketName,resourcePath);
+    }
+
+    @Override
+    public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= meta.getResourceCfgMap().get("bucketName").toString();
+        if(exists(meta,resourcePath)){
+            OSSObject object=ossClient.getObject(bucketName,resourcePath);
+            return object.getObjectMetadata().getContentLength();
+        }
+        return 0;
+    }
+    @Override
+    public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) {
+        Assert.notNull(meta.getResourceCfgMap().get("bucketName"),"must provide bucketName");
+        String bucketName=meta.getResourceCfgMap().get("bucketName").toString();
+        try{
+            putObject(bucketName,meta,outputStream);
+        }catch (IOException ex){
+            log.error("{}",ex.getMessage());
+        }
+    }
+    private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
+        OutputStream outputStream;
+        if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
+            String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
+            outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
+        }else {
+            outputStream = new ByteArrayOutputStream();
+        }
+        return outputStream;
+    }
+    private InputStream getInputStreamByConfig(DataCollectionMeta meta) {
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName");
+        String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString();
+        String objectName= meta.getPath();
+        return getObject(bucketName,objectName);
+    }
+    private Bucket createBucket(String bucketName){
+        return ossClient.createBucket(bucketName);
+    }
+    private boolean putObject(String bucketName,DataCollectionMeta meta,OutputStream outputStream) throws IOException{
+        PutObjectResult result;
+        String tmpFilePath=null;
+        if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
+            result = ossClient.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(((ByteArrayOutputStream)outputStream).toByteArray()));
+        }else{
+            outputStream.close();
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
+            tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
+            result=ossClient.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)));
+        }
+        ResponseMessage message=result.getResponse();
+        if(message.isSuccessful() && !ObjectUtils.isEmpty(tmpFilePath)){
+            FileUtils.deleteQuietly(new File(tmpFilePath));
+        }
+        return message.isSuccessful();
+    }
+    private InputStream getObject(String bucketName,String objectName){
+        if(ossClient.doesObjectExist(bucketName,objectName)) {
+            OSSObject object = ossClient.getObject(bucketName, objectName);
+            if (object.getResponse().isSuccessful()) {
+                return object.getObjectContent();
+            } else {
+                throw new RuntimeException("objectName " + objectName + " can not get!");
+            }
+        }else{
+            throw new MissingConfigException(" key "+objectName+" not in OSS bucket "+bucketName);
+        }
+    }
+}
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java
new file mode 100644
index 00000000..fb7d317a
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java
@@ -0,0 +1,114 @@
+package com.robin.comm.fileaccess.fs;
+
+
+import com.robin.core.base.util.Const;
+import com.robin.core.base.util.ResourceConst;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
+import com.robin.core.fileaccess.util.ResourceUtil;
+import com.robin.dfs.aws.AwsUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Amazon AWS FileSystemAccessor
+ */
+public class S3FileSystemAccessor extends AbstractFileSystemAccessor {
+    private S3Client client;
+    private S3AsyncClient asyncClient;
+    private Region region;
+    public S3FileSystemAccessor(){
+        this.identifier= Const.FILESYSTEM.S3.getValue();
+    }
+
+    @Override
+    public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream stream=getRawInputStream(meta, resourcePath);
+        return Pair.of(getReaderByPath(resourcePath, stream, meta.getEncode()),stream);
+    }
+
+    @Override
+    public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
+        OutputStream outputStream=getOutputStream(meta);
+        return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
+    }
+
+    @Override
+    public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStreamByPath(resourcePath, getOutputStream(meta));
+    }
+
+    @Override
+    public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStream(meta);
+        //Pair<OutputStreamPublisher, CompletableFuture<PutObjectResponse>> pair = AwsUtils.putAsync(asyncClient, bucketName, resourcePath);
+        //futureMap.put(resourcePath, pair.getValue());
+    }
+
+
+
+    @Override
+    public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return null;
+    }
+
+    @Override
+    public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString();
+        return AwsUtils.getObject(client, bucketName, resourcePath);
+    }
+
+    @Override
+    public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString();
+        return AwsUtils.exists(client,bucketName,meta.getPath());
+    }
+
+    @Override
+    public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void init(DataCollectionMeta meta) {
+        Assert.notNull(meta, "");
+        if (!CollectionUtils.isEmpty(meta.getResourceCfgMap())) {
+            if (meta.getResourceCfgMap().containsKey(ResourceConst.S3PARAM.ACCESSKEY.getValue()) &&
+                    meta.getResourceCfgMap().containsKey(ResourceConst.S3PARAM.SECRET.getValue())) {
+                Object regionName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.REGION.getValue());
+                region = ObjectUtils.isEmpty(regionName) ? Region.US_EAST_1 : Region.of(regionName.toString());
+                client = AwsUtils.getClientByCredential(region, meta.getResourceCfgMap().get(ResourceConst.S3PARAM.ACCESSKEY.getValue()).toString(), meta.getResourceCfgMap().get(ResourceConst.S3PARAM.SECRET.getValue()).toString());
+                asyncClient = AwsUtils.getAsyncClientByCredential(region, meta.getResourceCfgMap().get(ResourceConst.S3PARAM.ACCESSKEY.getValue()).toString(), meta.getResourceCfgMap().get(ResourceConst.S3PARAM.SECRET.getValue()).toString());
+            }
+        }
+    }
+
+
+    @Override
+    public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) {
+        String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString();
+        ByteArrayOutputStream outputStream1=(ByteArrayOutputStream) outputStream;
+        int size=outputStream1.size();
+        AwsUtils.put(client,bucketName,meta.getPath(),new ByteArrayInputStream(outputStream1.toByteArray()),new Long(size));
+    }
+    private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
+        OutputStream outputStream;
+        if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
+            String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
+            outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
+        }else {
+            outputStream = new ByteArrayOutputStream();
+        }
+        return outputStream;
+    }
+}
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java
index 44c67ffd..539d4d4e 100644
--- a/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java
@@ -93,7 +93,7 @@ public void beforeProcess() {
         long size = instream.available();//accessUtil.getInputStreamSize(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
         //file size too large ,can not store in ByteArrayOutputStream
         if (size >= maxSize) {
-            String tmpPath = (!ObjectUtils.isEmpty(colmeta.getResourceCfgMap().get("output.tmppath"))) ? colmeta.getResourceCfgMap().get("output.tmppath").toString() : FileUtils.getTempDirectoryPath();
+            String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(colmeta);
             String tmpFilePath = "file:///" + tmpPath + ResourceUtil.getProcessFileName(colmeta.getPath());
             tmpFile = new File(new URL(tmpFilePath).toURI());
             copyToLocal(tmpFile, instream);
diff --git a/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java b/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java
new file mode 100644
index 00000000..a360734e
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java
@@ -0,0 +1,137 @@
+package com.robin.dfs.aws;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.util.CollectionUtils;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.*;
+import software.amazon.awssdk.utils.async.OutputStreamPublisher;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@Slf4j
+public class AwsUtils {
+    public static S3Client getHttpClient(){
+        return S3Client.builder().httpClientBuilder(ApacheHttpClient.builder()).build();
+    }
+    public static S3Client getClientByCredential(Region region,String accessKey,String secret){
+        return S3Client.builder().region(region).credentialsProvider(()->new AwsCredentials() {
+            @Override
+            public String accessKeyId() {
+                return accessKey;
+            }
+            @Override
+            public String secretAccessKey() {
+                return secret;
+            }
+        }).build();
+    }
+    public static S3AsyncClient getAsyncClientByCredential(Region region,String accessKey,String secret){
+        return S3AsyncClient.builder().region(region).credentialsProvider(()->new AwsCredentials() {
+            @Override
+            public String accessKeyId() {
+                return accessKey;
+            }
+            @Override
+            public String secretAccessKey() {
+                return secret;
+            }
+        }).build();
+    }
+    public static S3Client getClientByRegion(Region region){
+        ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create();
+        return S3Client.builder().credentialsProvider(credentialsProvider).region(region).build();
+    }
+    public static S3AsyncClient getAsyncClientByRegion(Region region){
+        ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create();
+        return S3AsyncClient.builder().credentialsProvider(credentialsProvider).region(region).build();
+    }
+    public static void createBucket(S3Client client,String bucketName){
+        try {
+            client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
+            client.waiter().waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build());
+        }catch (S3Exception ex){
+
+        }
+    }
+    public static boolean put(S3Client client,String bucketName, String key, InputStream stream,Long length){
+        createBucket(client,bucketName);
+        PutObjectRequest request=PutObjectRequest.builder().bucket(bucketName).key(key).build();
+        PutObjectResponse response= client.putObject(request, RequestBody.fromInputStream(stream,length));
+        return response.bucketKeyEnabled();
+    }
+    public static Pair<OutputStreamPublisher, CompletableFuture<PutObjectResponse>> putAsync(S3AsyncClient s3AsyncClient,String bucketName, String key){
+
+        OutputStreamPublisher publisher=new OutputStreamPublisher();
+
+        AsyncRequestBody body=AsyncRequestBody.fromPublisher(publisher);
+
+        CompletableFuture<PutObjectResponse> responseFuture = s3AsyncClient.putObject(r->r.bucket(bucketName).key(key),body);
+        return Pair.of(publisher,responseFuture);
+    }
+    public static ResponseInputStream<GetObjectResponse> getObject(S3Client client,String bucketName,String key){
+        try {
+            GetObjectRequest request = GetObjectRequest.builder().bucket(bucketName).key(key).build();
+
+            return client.getObject(request);
+        }catch (S3Exception ex){
+
+        }
+        return null;
+    }
+    public static boolean exists(S3Client client,String bucketName,String objectName){
+        HeadObjectRequest objectRequest= HeadObjectRequest.builder().bucket(bucketName).key(objectName).build();
+        try{
+            client.headObject(objectRequest);
+            return true;
+        }catch (NoSuchKeyException ex){
+            return false;
+        }
+    }
+    public static boolean bucketExists(S3Client client,String bucketName){
+        HeadBucketRequest bucketRequest= HeadBucketRequest.builder().bucket(bucketName).build();
+        try{
+            client.headBucket(bucketRequest);
+            return true;
+        }catch (NoSuchBucketException ex){
+            return false;
+        }
+    }
+    public static List<Map<String,Object>> list(S3Client client, String bucketName){
+        List<Map<String,Object>> retList=new ArrayList<>();
+        try {
+            ListObjectsRequest request = ListObjectsRequest.builder().bucket(bucketName).build();
+            ListObjectsResponse response = client.listObjects(request);
+            if(!CollectionUtils.isEmpty(response.contents())){
+                for(S3Object obj:response.contents()){
+                    Map<String,Object> tmap=new HashMap<>();
+                    tmap.put("key",obj.key());
+                    tmap.put("owner",obj.owner().displayName());
+                    tmap.put("size",obj.size());
+                    retList.add(tmap);
+                }
+            }
+        }catch (S3Exception ex){
+
+        }
+        return retList;
+    }
+
+
+
+}
diff --git a/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor b/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor
index d0366d02..8548b10a 100644
--- a/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor
+++ b/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor
@@ -1 +1,4 @@
-com.robin.comm.fileaccess.fs.HdfsFileSystemAccessor
\ No newline at end of file
+com.robin.comm.fileaccess.fs.HdfsFileSystemAccessor
+com.robin.comm.fileaccess.fs.OSSFileSystemAccessor
+com.robin.comm.fileaccess.fs.COSFileSystemAccessor
+com.robin.comm.fileaccess.fs.S3FileSystemAccessor
\ No newline at end of file
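Usage sketch (illustrative only, not part of the patch above). It shows how the new accessors are expected to be wired together with DataCollectionMeta and ResourceConst, and how finishWrite() completes the upload; AbstractFileWriter.close() now performs that call automatically. The sketch assumes DataCollectionMeta has a no-arg constructor, a setPath(String) setter and a mutable, non-null map behind getResourceCfgMap(); the endpoint, region, bucket and credential values are placeholders. The same pattern applies to COSFileSystemAccessor (COSPARAM keys) and S3FileSystemAccessor (S3PARAM keys).

    import com.robin.comm.fileaccess.fs.OSSFileSystemAccessor;
    import com.robin.core.base.util.ResourceConst;
    import com.robin.core.fileaccess.meta.DataCollectionMeta;

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class OssAccessorUsageSketch {
        public static void main(String[] args) throws Exception {
            DataCollectionMeta meta = new DataCollectionMeta();      // assumed no-arg constructor
            meta.setPath("demo/part-00000.csv");                     // assumed setter for the object key
            Map<String, Object> cfg = meta.getResourceCfgMap();      // assumed mutable, non-null map
            cfg.put(ResourceConst.OSSPARAM.ENDPOIN.getValue(), "https://oss-cn-hangzhou.aliyuncs.com");
            cfg.put(ResourceConst.OSSPARAM.REGION.getValue(), "cn-hangzhou");
            cfg.put(ResourceConst.OSSPARAM.ACESSSKEYID.getValue(), "<accessKeyId>");
            cfg.put(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue(), "<securityAccessKey>");
            cfg.put(ResourceConst.OSSPARAM.BUCKETNAME.getValue(), "demo-bucket");
            cfg.put(ResourceConst.USETMPFILETAG, "false");           // buffer in memory; upload happens in finishWrite

            OSSFileSystemAccessor accessor = new OSSFileSystemAccessor();
            accessor.init(meta);
            try (OutputStream out = accessor.getRawOutputStream(meta, meta.getPath())) {
                out.write("hello oss".getBytes(StandardCharsets.UTF_8));
                accessor.finishWrite(meta, out);                     // putObject() to the configured bucket
            }
        }
    }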