diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java index 11a5b8c2762f1..903ad2809d416 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java @@ -32,6 +32,8 @@ import java.util.List; import java.util.Map; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; + /** * Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients. * diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index f236f67ef840e..ca30e30c8fb3f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -33,10 +33,8 @@ import com.amazonaws.services.s3.model.Grantee; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; @@ -45,7 +43,6 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.AbstractAWSProcessor; @@ -60,11 +57,13 @@ import java.util.List; import java.util.Map; -import static java.lang.String.format; import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER; import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion; public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor { @@ -164,16 +163,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider .dependsOn(SIGNER_OVERRIDE, CUSTOM_SIGNER) .build(); - public static final String S3_REGION_ATTRIBUTE = "s3.region" ; - static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region", - "Use '" + S3_REGION_ATTRIBUTE + "' Attribute", - "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region."); - - public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(AbstractAWSProcessor.REGION) - .allowableValues(getAvailableS3Regions()) - .build(); - public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder() .name("encryption-service") .displayName("Encryption Service") @@ -454,38 +443,11 @@ protected final CannedAccessControlList createCannedACL(final ProcessContext con return cannedAcl; } - private Region parseRegionValue(String regionValue) { - if (regionValue == null) { - throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE)); - } - - try { - return Region.getRegion(Regions.fromName(regionValue)); - } catch (Exception e) { - throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e); - } - } - - private Region resolveRegion(final ProcessContext context, final Map attributes) { - String regionValue = context.getProperty(S3_REGION).getValue(); - - if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) { - regionValue = attributes.get(S3_REGION_ATTRIBUTE); - } - - return parseRegionValue(regionValue); - } - private boolean isAttributeDefinedRegion(final ProcessContext context) { String regionValue = context.getProperty(S3_REGION).getValue(); return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue); } - private static AllowableValue[] getAvailableS3Regions() { - final AllowableValue[] availableRegions = getAvailableRegions(); - return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION); - } - private AwsClientDetails getAwsClientDetails(final ProcessContext context, final Map attributes) { final Region region = resolveRegion(context, attributes); return new AwsClientDetails(region); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java new file mode 100644 index 0000000000000..887c92d0ac6bd --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.util; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Utility class for AWS region methods. This class uses AWS SDK v1. + * + */ +public final class RegionUtilV1 { + + private RegionUtilV1() { + } + + public static final String S3_REGION_ATTRIBUTE = "s3.region" ; + public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region", + "Use '" + S3_REGION_ATTRIBUTE + "' Attribute", + "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region."); + + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .name("Region") + .description("The AWS Region to connect to.") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(REGION) + .allowableValues(getAvailableS3Regions()) + .build(); + + public static Region parseRegionValue(String regionValue) { + if (regionValue == null) { + throw new ProcessException(String.format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE)); + } + + try { + return Region.getRegion(Regions.fromName(regionValue)); + } catch (Exception e) { + throw new ProcessException(String.format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e); + } + } + + public static Region resolveRegion(final PropertyContext context, final Map attributes) { + String regionValue = context.getProperty(S3_REGION).getValue(); + + if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) { + regionValue = attributes.get(S3_REGION_ATTRIBUTE); + } + + return parseRegionValue(regionValue); + } + + public static AllowableValue[] getAvailableS3Regions() { + final AllowableValue[] availableRegions = getAvailableRegions(); + return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION); + } + + public static AllowableValue createAllowableValue(final Regions region) { + return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName()); + } + + public static AllowableValue[] getAvailableRegions() { + final List values = new ArrayList<>(); + for (final Regions region : Regions.values()) { + values.add(createAllowableValue(region)); + } + return values.toArray(new AllowableValue[0]); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java index 476acd97f97b5..8658d2024bcf5 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java @@ -138,8 +138,8 @@ public abstract class AbstractAwsProcessornifi-listed-entity 1.23.2.3.2.2.0-SNAPSHOT + + org.apache.nifi + nifi-resource-transfer + 1.23.2.3.2.2.0-SNAPSHOT + + + org.apache.nifi + nifi-file-resource-service-api + org.apache.nifi nifi-aws-abstract-processors @@ -137,6 +146,12 @@ com.amazonaws aws-java-sdk-textract + + org.apache.nifi + nifi-file-resource-service + 1.23.2.3.2.2.0-SNAPSHOT + test + diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index b93d7ebc4a6ec..bfffd8d2bcb22 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -40,6 +40,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.s3.S3FileResourceService.S3_REGION; + @SupportsBatching @WritesAttributes({ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index cac2a6a3f7419..c988df5227a31 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -60,6 +60,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @SupportsBatching @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class}) @InputRequirement(Requirement.INPUT_REQUIRED) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 954bfa08563cf..b79b0bd297585 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -100,6 +100,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @PrimaryNodeOnly @TriggerSerially @TriggerWhenEmpty diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 10632082cb685..069b8d805e06a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeSet; @@ -59,14 +60,15 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResource; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3Client; @@ -89,6 +91,11 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; +import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; + @SupportsBatching @SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class}) @InputRequirement(Requirement.INPUT_REQUIRED) @@ -286,6 +293,8 @@ public class PutS3Object extends AbstractS3Processor { SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, + RESOURCE_TRANSFER_SOURCE, + FILE_RESOURCE_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, @@ -529,6 +538,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final FlowFile ff = flowFile; final Map attributes = new HashMap<>(); final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key()); + final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); + attributes.put(S3_BUCKET_KEY, bucket); attributes.put(S3_OBJECT_KEY, key); @@ -547,12 +558,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session */ try { final FlowFile flowFileCopy = flowFile; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - final ObjectMetadata objectMetadata = new ObjectMetadata(); - objectMetadata.setContentLength(ff.getSize()); + Optional optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); + try (InputStream in = optFileResource + .map(FileResource::getInputStream) + .orElseGet(() -> session.read(flowFileCopy))) { + final ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize)); final String contentType = context.getProperty(CONTENT_TYPE) .evaluateAttributeExpressions(ff).getValue(); @@ -869,9 +880,10 @@ public void process(final InputStream rawIn) throws IOException { throw (e); } } - } - } - }); + } catch (IOException e) { + getLogger().error("Error during upload of flow files: " + e.getMessage()); + throw e; + } if (!attributes.isEmpty()) { flowFile = session.putAllAttributes(flowFile, attributes); @@ -882,25 +894,25 @@ public void process(final InputStream rawIn) throws IOException { final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis); - getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); + getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis}); try { removeLocalState(cacheKey); } catch (IOException e) { getLogger().info("Error trying to delete key {} from cache: {}", new Object[]{cacheKey, e.getMessage()}); } - } catch (final ProcessException | AmazonClientException pe) { - extractExceptionDetails(pe, session, flowFile); - if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) { - getLogger().info(pe.getMessage()); + + } catch (final ProcessException | AmazonClientException | IOException e) { + extractExceptionDetails(e, session, flowFile); + if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) { + getLogger().info(e.getMessage()); session.rollback(); } else { - getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe}); + getLogger().error("Failed to put {} to Amazon S3 due to {}", flowFile, e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } } - } private final Lock s3BucketLock = new ReentrantLock(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/S3FileResourceService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/S3FileResourceService.java new file mode 100644 index 0000000000000..5d1f2d31fe749 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/S3FileResourceService.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3Object; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.processors.aws.util.RegionUtilV1; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"Amazon", "S3", "AWS", "file", "resource"}) +@SeeAlso({FetchS3Object.class}) +@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource for other components.") +public class S3FileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.BUCKET) + .build(); + + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractS3Processor.KEY) + .build(); + + public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(RegionUtilV1.S3_REGION) + .build(); + + private static final List PROPERTIES = Arrays.asList( + BUCKET, + KEY, + S3_REGION, + AWS_CREDENTIALS_PROVIDER_SERVICE); + + private final Cache clientCache = Caffeine.newBuilder().build(); + + private volatile PropertyContext context; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.context = null; + clientCache.asMap().values().forEach(AmazonS3::shutdown); + clientCache.invalidateAll(); + clientCache.cleanUp(); + } + + @Override + public FileResource getFileResource(Map attributes) { + final AWSCredentialsProviderService awsCredentialsProviderService = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE) + .asControllerService(AWSCredentialsProviderService.class); + final AmazonS3 client = getS3Client(attributes, awsCredentialsProviderService.getCredentialsProvider()); + + try { + return fetchObject(client, attributes); + } catch (final ProcessException | SdkClientException e) { + throw new ProcessException("Failed to fetch s3 object", e); + } + } + + /** + * Fetches s3 object from the provided bucket and returns it as FileResource + * + * @param client amazon s3 client + * @param attributes configuration attributes + * @return fetched s3 object as FileResource + * @throws ProcessException if the object 'bucketName/key' does not exist + */ + private FileResource fetchObject(final AmazonS3 client, final Map attributes) throws ProcessException, + SdkClientException { + final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + + if (isBlank(bucketName) || isBlank(key)) { + throw new ProcessException("Bucket name or key value is missing"); + } + + if (!client.doesObjectExist(bucketName, key)) { + throw new ProcessException(String.format("Object '%s/%s' does not exist in s3", bucketName, key)); + } + + final S3Object object = client.getObject(bucketName, key); + return new FileResource(object.getObjectContent(), object.getObjectMetadata().getContentLength()); + } + + protected AmazonS3 getS3Client(Map attributes, AWSCredentialsProvider credentialsProvider) { + final Region region = resolveRegion(context, attributes); + return clientCache.get(region, ignored -> AmazonS3Client.builder() + .withRegion(region.getName()) + .withCredentials(credentialsProvider) + .build()); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java index 9a79839b6834d..822f98acfccbf 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java @@ -49,6 +49,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + @SupportsBatching @WritesAttributes({ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java index 90c50e82ab548..6ae050eb4d352 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java @@ -38,7 +38,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService; -import org.apache.nifi.processors.aws.s3.AbstractS3Processor; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.StringUtils; @@ -109,8 +109,8 @@ public class StandardS3EncryptionService extends AbstractControllerService imple .displayName("KMS Region") .description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.") .required(false) - .allowableValues(AbstractS3Processor.getAvailableRegions()) - .defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .allowableValues(RegionUtilV1.getAvailableRegions()) + .defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue()) .build(); private String keyValue = ""; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java index d1c2af4acb73e..0f104f2349823 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java @@ -54,6 +54,8 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION; + @SupportsBatching @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"}) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 390d4c908ba7c..30b9bcde8690e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -15,3 +15,4 @@ org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService +org.apache.nifi.processors.aws.s3.S3FileResourceService diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index 6e4081683738c..1c2e499c8c9e1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -37,9 +37,14 @@ import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.Tag; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.file.FileUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; @@ -49,8 +54,10 @@ import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.fail; @@ -63,6 +70,8 @@ * @see ITListS3 */ public abstract class AbstractS3IT { + private static final Logger logger = LoggerFactory.getLogger(AbstractS3IT.class); + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; protected final static String REGION = System.getProperty("it.aws.region", "us-west-1"); @@ -76,6 +85,7 @@ public abstract class AbstractS3IT { // Static so multiple Tests can use same client protected static AmazonS3Client client; protected static AWSKMS kmsClient; + private final List addedKeys = new ArrayList<>(); @BeforeAll public static void oneTimeSetup() { @@ -116,36 +126,42 @@ public static void oneTimeSetup() { } } + @BeforeEach + public void clearKeys() { + addedKeys.clear(); + } + + @AfterEach + public void emptyBucket() { + if (!client.doesBucketExistV2(BUCKET_NAME)) { + return; + } + + ObjectListing objectListing = client.listObjects(BUCKET_NAME); + while (true) { + for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + client.deleteObject(BUCKET_NAME, objectSummary.getKey()); + } + + if (objectListing.isTruncated()) { + objectListing = client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + } + @AfterAll public static void oneTimeTearDown() { - // Empty the bucket before deleting it. try { - if (client == null) { + if (client == null || !client.doesBucketExistV2(BUCKET_NAME)) { return; } - ObjectListing objectListing = client.listObjects(BUCKET_NAME); - - while (true) { - for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { - client.deleteObject(BUCKET_NAME, objectSummary.getKey()); - } - - if (objectListing.isTruncated()) { - objectListing = client.listNextBatchOfObjects(objectListing); - } else { - break; - } - } - DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME); client.deleteBucket(dbr); } catch (final AmazonS3Exception e) { - System.err.println("Unable to delete bucket " + BUCKET_NAME + e.toString()); - } - - if (client.doesBucketExist(BUCKET_NAME)) { - fail("Incomplete teardown, subsequent tests might fail"); + logger.error("Unable to delete bucket {}", BUCKET_NAME, e); } } @@ -171,6 +187,23 @@ protected void putFileWithUserMetadata(String key, File file, Map objectTags) { PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file); putRequest.setTagging(new ObjectTagging(objectTags)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java index 1f4154703d5d5..b06c80c849843 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Map; +import org.apache.nifi.processors.aws.util.RegionUtilV1; + /** * Provides integration level testing with actual AWS S3 resources for {@link DeleteS3Object} and requires additional configuration and resources to work. @@ -40,7 +42,7 @@ public void testSimpleDelete() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object()); runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(DeleteS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -60,7 +62,7 @@ public void testDeleteFolder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object()); runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(DeleteS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -88,7 +90,7 @@ public void testDeleteFolderUsingCredentialsProviderService() throws Throwable { runner.assertValid(serviceImpl); runner.setProperty(DeleteS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); - runner.setProperty(DeleteS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -108,7 +110,7 @@ public void testDeleteFolderNoExpressionLanguage() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object()); runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(DeleteS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); runner.setProperty(DeleteS3Object.KEY, "folder/delete-me"); @@ -126,7 +128,7 @@ public void testTryToDeleteNotExistingFile() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object()); runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(DeleteS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java index 1b5d073d915e4..8ee7e7de038f7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.aws.s3; import com.amazonaws.services.s3.model.ObjectMetadata; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; @@ -42,7 +43,7 @@ public void testSimpleGet() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -65,7 +66,7 @@ public void testSimpleGetEncrypted() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -96,7 +97,7 @@ public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable runner.assertValid(serviceImpl); runner.setProperty(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -114,7 +115,7 @@ public void testTryToFetchNotExistingFile() { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); @@ -134,7 +135,7 @@ public void testContentsOfFileRetrieved() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); final Map attrs = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java index d89b3a887055b..c2401ffea7a08 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java @@ -212,12 +212,4 @@ public void testUserMetadataWritten() throws FileNotFoundException { flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.2", "dummyvalue2"); } - private void waitForFilesAvailable() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 74f3b4febb968..404b185336722 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -26,10 +26,13 @@ import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.Region; import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.Tag; import org.apache.commons.codec.binary.Base64; +import org.apache.nifi.fileresource.service.StandardFileResourceService; +import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; @@ -41,6 +44,8 @@ import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService; +import org.apache.nifi.processors.aws.util.RegionUtilV1; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; @@ -67,6 +72,12 @@ import java.util.Map; import java.util.regex.Pattern; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -189,7 +200,7 @@ private void testPutThenFetch(String sseAlgorithm) throws IOException { runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); runner.enqueue(new byte[0], attrs); @@ -219,6 +230,61 @@ public void testPutThenFetchWithSSE() throws IOException { testPutThenFetch(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); } + @Test + public void testPutFromLocalFile() throws Exception { + TestRunner runner = initTestRunner(); + String attributeName = "file.path"; + Path resourcePath = getResourcePath(SAMPLE_FILE_RESOURCE_NAME); + + String serviceId = FileResourceService.class.getSimpleName(); + FileResourceService service = new StandardFileResourceService(); + runner.addControllerService(serviceId, service); + runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName)); + runner.enableControllerService(service); + + runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(FILE_RESOURCE_SERVICE, serviceId); + + Map attributes = new HashMap<>(); + attributes.put(attributeName, resourcePath.toString()); + runner.enqueue(resourcePath, attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0); + flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + List objectSummaries = client.listObjects(BUCKET_NAME).getObjectSummaries(); + assertThat(objectSummaries, hasSize(1)); + assertEquals(objectSummaries.get(0).getKey(), resourcePath.getFileName().toString()); + assertThat(objectSummaries.get(0).getSize(), greaterThan(0L)); + } + + @Test + public void testPutFromNonExistentLocalFile() throws Exception { + TestRunner runner = initTestRunner(); + String attributeName = "file.path"; + + String serviceId = FileResourceService.class.getSimpleName(); + FileResourceService service = new StandardFileResourceService(); + runner.addControllerService(serviceId, service); + runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName)); + runner.enableControllerService(service); + + runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(FILE_RESOURCE_SERVICE, serviceId); + + String filePath = "nonexistent.txt"; + + Map attributes = new HashMap<>(); + attributes.put(attributeName, filePath); + + runner.enqueue("", attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1); + assertThat(client.listObjects(BUCKET_NAME).getObjectSummaries(), empty()); + } @Test public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable { @@ -234,7 +300,7 @@ public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable { runner.assertValid(serviceImpl); runner.setProperty(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); @@ -256,7 +322,7 @@ public void testMetaData() throws IOException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); PropertyDescriptor prop1 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-1"); runner.setProperty(prop1, "TESTING-1-2-3"); @@ -323,7 +389,7 @@ public void testContentDispositionNull() throws IOException { runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(FetchS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); runner.enqueue(new byte[0], attrs); @@ -467,7 +533,7 @@ public void testDynamicProperty() { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY); @@ -505,7 +571,7 @@ public void testProvenance() { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.KEY, "${filename}"); @@ -589,7 +655,7 @@ public void testMultipartProperties() { runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5"); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.KEY, AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME); @@ -808,7 +874,7 @@ public void testMultipartSmallerThanMinimum() throws IOException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); @@ -853,7 +919,7 @@ public void testMultipartBetweenMinimumAndMaximum() throws IOException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); @@ -894,7 +960,7 @@ public void testMultipartLargerThanObjectMaximum() throws IOException { final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); @@ -924,7 +990,7 @@ public void testS3MultipartAgeoff() throws InterruptedException, IOException { final ProcessContext context = runner.getProcessContext(); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); // set check interval and age off to minimum values @@ -1201,7 +1267,7 @@ private static TestRunner createEncryptionTestRunner(Processor processor, String final ConfigurationContext context = mock(ConfigurationContext.class); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); if (strategyName != null) { @@ -1265,7 +1331,7 @@ private TestRunner initTestRunner() { TestRunner runner = TestRunners.newTestRunner(PutS3Object.class); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); return runner; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java index 64ea47d7ef1b4..ef4109e903336 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java @@ -19,6 +19,7 @@ import com.amazonaws.services.s3.model.GetObjectTaggingRequest; import com.amazonaws.services.s3.model.GetObjectTaggingResult; import com.amazonaws.services.s3.model.Tag; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -50,7 +51,7 @@ public void testSimpleTag() { // Set up processor final TestRunner runner = TestRunners.newTestRunner(new TagS3Object()); runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(TagS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagValue); @@ -84,7 +85,7 @@ public void testAppendTag() { // Set up processor final TestRunner runner = TestRunners.newTestRunner(new TagS3Object()); runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(TagS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagValue); @@ -119,7 +120,7 @@ public void testReplaceTags() { // Set up processor final TestRunner runner = TestRunners.newTestRunner(new TagS3Object()); runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(TagS3Object.S3_REGION, REGION); + runner.setProperty(RegionUtilV1.S3_REGION, REGION); runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagValue); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/S3FileResourceServiceTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/S3FileResourceServiceTest.java new file mode 100644 index 0000000000000..d14d3ff4174cc --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/S3FileResourceServiceTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class S3FileResourceServiceTest { + private static final String CONTROLLER_SERVICE = "AWSCredentialsService"; + private static final String BUCKET_NAME = "test-bucket"; + private static final String KEY = "key"; + private static final long CONTENT_LENGTH = 10L; + + @Mock + private AmazonS3 client; + + @Mock + private S3Object s3Object; + + @Mock + private ObjectMetadata metadata; + + @Mock + private S3ObjectInputStream inputStream; + + @InjectMocks + private TestS3FileResourceService service; + private TestRunner runner; + + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.addControllerService("S3FileResourceService", service); + } + + @Test + void testGetFileResourceHappyPath() throws InitializationException { + setupS3Client(); + setupService(); + + FileResource fileResource = service.getFileResource(Collections.emptyMap()); + assertFileResource(fileResource); + } + + @Test + void testNonExistingObject() throws InitializationException { + when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(false); + setupService(); + + assertThrows(ProcessException.class, () -> service.getFileResource(Collections.emptyMap()), "Failed to fetch s3 object"); + verify(client).doesObjectExist(BUCKET_NAME, KEY); + verifyNoMoreInteractions(client); + } + + @Test + void testValidBlobUsingELButMissingAttribute() throws InitializationException { + setupService("${s3.bucket}", "${key}"); + + assertThrows(ProcessException.class, + () -> service.getFileResource(Collections.emptyMap()), "Bucket name or key value is missing"); + verifyNoInteractions(client); + } + + @Test + void testValidBlobUsingEL() throws InitializationException { + String bucketProperty = "s3.bucket"; + String keyProperty = "key"; + setupService("${" + bucketProperty + "}", "${" + keyProperty + "}"); + setupS3Client(); + FileResource fileResource = service.getFileResource(new HashMap() + {{ + put(bucketProperty, BUCKET_NAME); + put(keyProperty, KEY); + }}); + assertFileResource(fileResource); + } + + private void assertFileResource(FileResource fileResource) { + assertNotNull(fileResource); + assertEquals(fileResource.getInputStream(), inputStream); + assertEquals(fileResource.getSize(), CONTENT_LENGTH); + verify(client).doesObjectExist(BUCKET_NAME, KEY); + verify(client).getObject(BUCKET_NAME, KEY); + verify(s3Object).getObjectMetadata(); + verify(metadata).getContentLength(); + verify(s3Object).getObjectContent(); + } + + private void setupService() throws InitializationException { + setupService(BUCKET_NAME, KEY); + } + + private void setupService(String bucket, String key) throws InitializationException { + final AWSCredentialsProviderService credentialsService = new AWSCredentialsProviderControllerService(); + + runner.addControllerService(CONTROLLER_SERVICE, credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(service, AWS_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(service, S3FileResourceService.KEY, key); + runner.setProperty(service, S3FileResourceService.BUCKET, bucket); + + runner.enableControllerService(service); + } + + private void setupS3Client() { + when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(true); + when(client.getObject(BUCKET_NAME, KEY)).thenReturn(s3Object); + when(s3Object.getObjectContent()).thenReturn(inputStream); + when(s3Object.getObjectMetadata()).thenReturn(metadata); + when(metadata.getContentLength()).thenReturn(CONTENT_LENGTH); + } + + private static class TestS3FileResourceService extends S3FileResourceService { + + private final AmazonS3 client; + + private TestS3FileResourceService(AmazonS3 client) { + this.client = client; + } + + @Override + protected AmazonS3 getS3Client(Map attributes, AWSCredentialsProvider credentialsProvider) { + return client; + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java index 9df4a65f3a302..b952133d19a97 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.DeleteVersionRequest; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -58,7 +59,7 @@ protected AmazonS3Client getS3Client(final ProcessContext context, final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -77,7 +78,7 @@ public void testDeleteObjectSimple() { @Test public void testDeleteObjectSimpleRegionFromFlowFileAttribute() { - runner.setProperty(DeleteS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -91,7 +92,7 @@ public void testDeleteObjectSimpleRegionFromFlowFileAttribute() { @Test public void testDeleteObjectS3Exception() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "delete-key"); @@ -107,7 +108,7 @@ public void testDeleteObjectS3Exception() { @Test public void testDeleteVersionSimple() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); runner.setProperty(DeleteS3Object.VERSION_ID, "test-version"); final Map attrs = new HashMap<>(); @@ -128,7 +129,7 @@ public void testDeleteVersionSimple() { @Test public void testDeleteVersionFromExpressions() { - runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(DeleteS3Object.BUCKET, "${s3.bucket}"); runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}"); final Map attrs = new HashMap<>(); @@ -164,7 +165,7 @@ public void testGetPropertyDescriptors() { assertTrue(pd.contains(processor.OWNER)); assertTrue(pd.contains(processor.READ_ACL_LIST)); assertTrue(pd.contains(processor.READ_USER_LIST)); - assertTrue(pd.contains(processor.S3_REGION)); + assertTrue(pd.contains(RegionUtilV1.S3_REGION)); assertTrue(pd.contains(processor.SECRET_KEY)); assertTrue(pd.contains(processor.SIGNER_OVERRIDE)); assertTrue(pd.contains(processor.S3_CUSTOM_SIGNER_CLASS_NAME)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index aec54224ec806..0e7de1f647bcb 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -30,6 +30,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -77,7 +78,7 @@ protected AmazonS3Client createClient(final ProcessContext context, final AWSCre @Test public void testGetObject() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -144,7 +145,7 @@ public void testGetObject() throws IOException { @Test public void testGetObjectWithRequesterPays() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true"); final Map attrs = new HashMap<>(); @@ -202,7 +203,7 @@ public void testGetObjectWithRequesterPays() throws IOException { @Test public void testGetObjectVersion() throws IOException { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}"); final Map attrs = new HashMap<>(); @@ -242,7 +243,7 @@ public void testGetObjectVersion() throws IOException { @Test public void testGetObjectExceptionGoesToFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -256,7 +257,7 @@ public void testGetObjectExceptionGoesToFailure() { @Test public void testFetchObject_FailAdditionalAttributesBucketName() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -285,7 +286,7 @@ public void testFetchObject_FailAdditionalAttributesBucketName() { @Test public void testFetchObject_FailAdditionalAttributesAuthentication() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -309,7 +310,7 @@ public void testFetchObject_FailAdditionalAttributesAuthentication() { @Test public void testFetchObject_FailAdditionalAttributesNetworkFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -327,7 +328,7 @@ public void testFetchObject_FailAdditionalAttributesNetworkFailure() { @Test public void testGetObjectReturnsNull() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -341,7 +342,7 @@ public void testGetObjectReturnsNull() { @Test public void testFlowFileAccessExceptionGoesToFailure() { - runner.setProperty(FetchS3Object.S3_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1"); runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); final Map attrs = new HashMap<>(); attrs.put("filename", "request-key"); @@ -366,7 +367,7 @@ public void testGetPropertyDescriptors() { assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE)); assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE)); assertTrue(pd.contains(FetchS3Object.KEY)); - assertTrue(pd.contains(FetchS3Object.S3_REGION)); + assertTrue(pd.contains(RegionUtilV1.S3_REGION)); assertTrue(pd.contains(FetchS3Object.SECRET_KEY)); assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE)); assertTrue(pd.contains(FetchS3Object.S3_CUSTOM_SIGNER_CLASS_NAME)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 031d833530e4b..c89e8051df3f4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -34,6 +34,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.state.MockStateManager; @@ -80,7 +81,7 @@ protected AmazonS3Client createClient(ProcessContext context, AWSCredentials cre @Test public void testList() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); Date lastModified = new Date(); @@ -131,7 +132,7 @@ public void testList() { @Test public void testListWithRecords() throws InitializationException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); final MockRecordWriter recordWriter = new MockRecordWriter(null, false); @@ -181,7 +182,7 @@ public void testListWithRecords() throws InitializationException { @Test public void testListWithRequesterPays() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.REQUESTER_PAYS, "true"); @@ -227,7 +228,7 @@ public void testListWithRequesterPays() { @Test public void testListWithRequesterPays_invalid() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions runner.setProperty(ListS3.REQUESTER_PAYS, "true"); @@ -237,7 +238,7 @@ public void testListWithRequesterPays_invalid() { @Test public void testListVersion2() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.LIST_TYPE, "2"); @@ -283,7 +284,7 @@ public void testListVersion2() { @Test public void testListVersion2WithRequesterPays() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.REQUESTER_PAYS, "true"); runner.setProperty(ListS3.LIST_TYPE, "2"); @@ -330,7 +331,7 @@ public void testListVersion2WithRequesterPays() { @Test public void testListVersions() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.USE_VERSIONS, "true"); @@ -374,7 +375,7 @@ public void testListVersions() { @Test public void testListObjectsNothingNew() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); Calendar calendar = Calendar.getInstance(); @@ -410,7 +411,7 @@ public void testListObjectsNothingNew() throws IOException { @Test public void testListIgnoreByMinAge() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.MIN_AGE, "30 sec"); @@ -461,7 +462,7 @@ public void testListIgnoreByMinAge() throws IOException { @Test public void testListIgnoreByMaxAge() throws IOException { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.MAX_AGE, "30 sec"); Date lastModifiedNow = new Date(); @@ -509,7 +510,7 @@ public void testListIgnoreByMaxAge() throws IOException { @Test public void testWriteObjectTags() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true"); @@ -536,7 +537,7 @@ public void testWriteObjectTags() { @Test public void testWriteUserMetadata() { - runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "eu-west-1"); runner.setProperty(ListS3.BUCKET, "test-bucket"); runner.setProperty(ListS3.WRITE_USER_METADATA, "true"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index f262e3ebc03d4..407431bbe3d80 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -34,10 +34,14 @@ import com.amazonaws.services.s3.model.Tag; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.signer.AwsSignerType; +import org.apache.nifi.processors.aws.util.RegionUtilV1; +import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -47,6 +51,7 @@ import org.mockito.Mockito; import java.io.File; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.Date; @@ -54,12 +59,17 @@ import java.util.List; import java.util.Map; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestPutS3Object { @@ -83,6 +93,33 @@ protected AmazonS3Client createClient(final ProcessContext context, final AWSCre runner.setVariable("java.io.tmpdir", System.getProperty("java.io.tmpdir")); } + @Test + public void testPutSinglePartFromLocalFileSource() throws Exception { + prepareTest(); + + String serviceId = "fileresource"; + FileResourceService service = mock(FileResourceService.class); + InputStream localFileInputStream = mock(InputStream.class); + when(service.getIdentifier()).thenReturn(serviceId); + long contentLength = 10L; + when(service.getFileResource(anyMap())).thenReturn(new FileResource(localFileInputStream, contentLength)); + + runner.addControllerService(serviceId, service); + runner.enableControllerService(service); + runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(FILE_RESOURCE_SERVICE, serviceId); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(mockS3Client).putObject(captureRequest.capture()); + PutObjectRequest putObjectRequest = captureRequest.getValue(); + assertEquals(localFileInputStream, putObjectRequest.getInputStream()); + assertEquals(putObjectRequest.getMetadata().getContentLength(), contentLength); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + } + @Test public void testPutSinglePart() { runner.setProperty("x-custom-prop", "hello"); @@ -91,7 +128,7 @@ public void testPutSinglePart() { runner.run(1); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); PutObjectRequest request = captureRequest.getValue(); assertEquals("test-bucket", request.getBucketName()); @@ -101,6 +138,7 @@ public void testPutSinglePart() { MockFlowFile ff0 = flowFiles.get(0); ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt"); + ff0.assertContentEquals("Test Content"); ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag"); ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version"); } @@ -109,7 +147,7 @@ public void testPutSinglePart() { public void testPutSinglePartException() { prepareTest(); - Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail")); + when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail")); runner.run(1); @@ -145,7 +183,7 @@ public void testObjectTags() { runner.run(1); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); PutObjectRequest request = captureRequest.getValue(); List tagSet = request.getTagging().getTagSet(); @@ -164,7 +202,7 @@ public void testStorageClasses() { runner.run(1); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); PutObjectRequest request = captureRequest.getValue(); assertEquals(storageClass.toString(), request.getStorageClass()); @@ -180,7 +218,7 @@ public void testFilenameWithNationalCharacters() throws UnsupportedEncodingExcep runner.run(1); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); PutObjectRequest request = captureRequest.getValue(); ObjectMetadata objectMetadata = request.getMetadata(); @@ -203,7 +241,7 @@ private void prepareTest() { } private void prepareTest(String filename) { - runner.setProperty(PutS3Object.S3_REGION, "ap-northeast-1"); + runner.setProperty(RegionUtilV1.S3_REGION, "ap-northeast-1"); runner.setProperty(PutS3Object.BUCKET, "test-bucket"); runner.assertValid(); @@ -216,7 +254,7 @@ private void prepareTest(String filename) { } private void prepareTestWithRegionInAttributes(String filename, String region) { - runner.setProperty(PutS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(PutS3Object.BUCKET, "test-bucket"); runner.assertValid(); @@ -236,10 +274,10 @@ private void initMocks() { putObjectResult.setVersionId("test-version"); putObjectResult.setETag("test-etag"); - Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); + when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); MultipartUploadListing uploadListing = new MultipartUploadListing(); - Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); + when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); } @Test @@ -290,7 +328,7 @@ public void testGetPropertyDescriptors() { assertTrue(pd.contains(PutS3Object.OWNER)); assertTrue(pd.contains(PutS3Object.READ_ACL_LIST)); assertTrue(pd.contains(PutS3Object.READ_USER_LIST)); - assertTrue(pd.contains(PutS3Object.S3_REGION)); + assertTrue(pd.contains(RegionUtilV1.S3_REGION)); assertTrue(pd.contains(PutS3Object.SECRET_KEY)); assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE)); assertTrue(pd.contains(PutS3Object.S3_CUSTOM_SIGNER_CLASS_NAME)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java index 924a1a8000e24..cb52073d24f65 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java @@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -66,7 +67,7 @@ protected AmazonS3Client createClient(final ProcessContext context, final AWSCre public void testTagObjectSimple() { final String tagKey = "k"; final String tagVal = "v"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -94,7 +95,7 @@ public void testTagObjectSimple() { @Test public void testTagObjectSimpleRegionFromFlowFileAttribute() { - runner.setProperty(TagS3Object.S3_REGION, "attribute-defined-region"); + runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "k"); runner.setProperty(TagS3Object.TAG_VALUE, "v"); @@ -114,7 +115,7 @@ public void testTagObjectSimpleRegionFromFlowFileAttribute() { public void testTagObjectVersion() { final String tagKey = "k"; final String tagVal = "v"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.VERSION_ID, "test-version"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); @@ -144,7 +145,7 @@ public void testTagObjectAppendToExistingTags() { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -179,7 +180,7 @@ public void testTagObjectAppendUpdatesExistingTagValue() { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -208,7 +209,7 @@ public void testTagObjectReplacesExistingTags() { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -243,7 +244,7 @@ public void testTagObjectS3Exception() { final String tagKey = "nk"; final String tagVal = "nv"; - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, tagKey); runner.setProperty(TagS3Object.TAG_VALUE, tagVal); @@ -269,7 +270,7 @@ public void testGetPropertyDescriptors() { assertTrue(pd.contains(TagS3Object.CREDENTIALS_FILE)); assertTrue(pd.contains(TagS3Object.ENDPOINT_OVERRIDE)); assertTrue(pd.contains(TagS3Object.KEY)); - assertTrue(pd.contains(TagS3Object.S3_REGION)); + assertTrue(pd.contains(RegionUtilV1.S3_REGION)); assertTrue(pd.contains(TagS3Object.SECRET_KEY)); assertTrue(pd.contains(TagS3Object.SIGNER_OVERRIDE)); assertTrue(pd.contains(TagS3Object.S3_CUSTOM_SIGNER_CLASS_NAME)); @@ -289,7 +290,7 @@ public void testGetPropertyDescriptors() { @Test public void testBucketEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "${not.existant.attribute}"); runner.setProperty(TagS3Object.TAG_KEY, "key"); runner.setProperty(TagS3Object.TAG_VALUE, "val"); @@ -304,7 +305,7 @@ public void testBucketEvaluatedAsBlank() { @Test public void testTagKeyEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}"); runner.setProperty(TagS3Object.TAG_VALUE, "val"); @@ -319,7 +320,7 @@ public void testTagKeyEvaluatedAsBlank() { @Test public void testTagValEvaluatedAsBlank() { - runner.setProperty(TagS3Object.S3_REGION, "us-west-2"); + runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2"); runner.setProperty(TagS3Object.BUCKET, "test-bucket"); runner.setProperty(TagS3Object.TAG_KEY, "tagKey"); runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java index ceb2eac4bd057..ac26328dd786a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java @@ -21,6 +21,7 @@ import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -73,7 +74,7 @@ protected void setupCredFile() { } public void setupEndpointAndRegion() { - runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_REGION, "us-east-1"); + runner.setProperty(RegionUtilV1.REGION, "us-east-1"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd"); runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, mockWebServer.url("/").toString()); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java index b3a4a970e8b1f..cbfad186bc63f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java @@ -29,6 +29,7 @@ import org.apache.http.message.BasicStatusLine; import org.apache.http.protocol.HttpContext; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.aws.util.RegionUtilV1; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner;