Commit 34f3215

ODP-993 - NIFI-12642|NIFI-12671 Implement AWS option for FileResourceService

shubhluck committed Feb 15, 2024
1 parent e618500

Showing 29 changed files with 723 additions and 170 deletions.
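The central change (visible in the PutS3Object diff below) is that the upload payload can now come either from the incoming FlowFile or from a FileResource supplied by a configured FileResourceService, selected through the new RESOURCE_TRANSFER_SOURCE and FILE_RESOURCE_SERVICE properties. A minimal sketch of that selection logic, using only types that appear in this commit (the package, class, and method names below are illustrative and not part of the commit):

package org.apache.nifi.processors.aws.s3.example;   // illustrative package, not part of the commit

import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.transfer.ResourceTransferSource;

import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;

final class ResourceTransferSketch {

    private ResourceTransferSketch() {
    }

    // Mirrors the payload selection added to PutS3Object: read from the FileResource when a
    // FileResourceService is configured as the transfer source, otherwise from the FlowFile content.
    static void withPayload(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) throws IOException {
        final ResourceTransferSource source =
                ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());
        final Optional<FileResource> fileResource = getFileResource(source, context, flowFile.getAttributes());

        try (final InputStream in = fileResource
                .map(FileResource::getInputStream)
                .orElseGet(() -> session.read(flowFile))) {
            final ObjectMetadata objectMetadata = new ObjectMetadata();
            objectMetadata.setContentLength(fileResource.map(FileResource::getSize).orElseGet(flowFile::getSize));
            // ... hand 'in' and 'objectMetadata' to the S3 client, as the PutS3Object diff below does
        }
    }
}

When no FileResourceService is configured, the Optional is empty and the stream falls back to the FlowFile content, so existing flows keep their current behaviour.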
@@ -32,6 +32,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;

/**
* Base class for AWS processors that uses AWSCredentialsProvider interface for creating AWS clients.
*
@@ -33,10 +33,8 @@
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@@ -45,7 +43,6 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
@@ -60,11 +57,13 @@
import java.util.List;
import java.util.Map;

import static java.lang.String.format;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveRegion;

public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {

@@ -164,16 +163,6 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {
.dependsOn(SIGNER_OVERRIDE, CUSTOM_SIGNER)
.build();

public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");

public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractAWSProcessor.REGION)
.allowableValues(getAvailableS3Regions())
.build();

public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
@@ -454,38 +443,11 @@ protected final CannedAccessControlList createCannedACL(final ProcessContext con
return cannedAcl;
}

private Region parseRegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}

try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}

private Region resolveRegion(final ProcessContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();

if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}

return parseRegionValue(regionValue);
}

private boolean isAttributeDefinedRegion(final ProcessContext context) {
String regionValue = context.getProperty(S3_REGION).getValue();
return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue);
}

private static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
}

private AwsClientDetails getAwsClientDetails(final ProcessContext context, final Map<String, String> attributes) {
final Region region = resolveRegion(context, attributes);
return new AwsClientDetails(region);
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.util;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.exception.ProcessException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Utility class for AWS region methods. This class uses AWS SDK v1.
*
*/
public final class RegionUtilV1 {

private RegionUtilV1() {
}

public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new AllowableValue("attribute-defined-region",
"Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
"Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as region.");

public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.description("The AWS Region to connect to.")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();

public static final PropertyDescriptor S3_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(REGION)
.allowableValues(getAvailableS3Regions())
.build();

public static Region parseRegionValue(String regionValue) {
if (regionValue == null) {
throw new ProcessException(String.format("[%s] was selected as region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION, S3_REGION_ATTRIBUTE));
}

try {
return Region.getRegion(Regions.fromName(regionValue));
} catch (Exception e) {
throw new ProcessException(String.format("The [%s] attribute contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
}
}

public static Region resolveRegion(final PropertyContext context, final Map<String, String> attributes) {
String regionValue = context.getProperty(S3_REGION).getValue();

if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
regionValue = attributes.get(S3_REGION_ATTRIBUTE);
}

return parseRegionValue(regionValue);
}

public static AllowableValue[] getAvailableS3Regions() {
final AllowableValue[] availableRegions = getAvailableRegions();
return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
}

public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
}

public static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for (final Regions region : Regions.values()) {
values.add(createAllowableValue(region));
}
return values.toArray(new AllowableValue[0]);
}

}
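For context, a small usage sketch for the new utility (illustrative, not part of the commit): a processor whose S3 Region property is set to the attribute-defined value resolves the concrete region from the s3.region FlowFile attribute at runtime.

// Illustrative usage of RegionUtilV1; 'context' is assumed to be the processor's PropertyContext
// and its S3_REGION property is assumed to be set to ATTRIBUTE_DEFINED_REGION.
final Map<String, String> attributes = Map.of(RegionUtilV1.S3_REGION_ATTRIBUTE, "us-east-1");
final Region region = RegionUtilV1.resolveRegion(context, attributes); // resolves to the us-east-1 Region
// A missing or unparseable attribute value causes resolveRegion to throw a ProcessException.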
@@ -138,8 +138,8 @@ public abstract class AbstractAwsProcessor<T extends SdkClient, U extends AwsSyn
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.required(true)
.allowableValues(RegionUtil.getAvailableRegions())
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
.allowableValues(RegionUtilV2.getAvailableRegions())
.defaultValue(RegionUtilV2.createAllowableValue(Region.US_WEST_2).getValue())
.build();

public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
@@ -25,10 +25,10 @@
import java.util.List;

/**
* Utility class for AWS region methods.
* Utility class for AWS region methods. This class uses AWS SDK v2.
*
*/
public abstract class RegionUtil {
public abstract class RegionUtilV2 {

/**
* Creates an AllowableValue from a Region.
15 changes: 15 additions & 0 deletions nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -38,6 +38,15 @@
<artifactId>nifi-listed-entity</artifactId>
<version>1.23.2.3.2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-resource-transfer</artifactId>
<version>1.23.2.3.2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-abstract-processors</artifactId>
@@ -137,6 +146,12 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-textract</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service</artifactId>
<version>1.23.2.3.2.2.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -40,6 +40,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.aws.s3.S3FileResourceService.S3_REGION;


@SupportsBatching
@WritesAttributes({
@@ -60,6 +60,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;

@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -100,6 +100,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -59,14 +60,15 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.transfer.ResourceTransferSource;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
@@ -89,6 +91,11 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;

@SupportsBatching
@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -286,6 +293,8 @@ public class PutS3Object extends AbstractS3Processor {
SECRET_KEY,
CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE,
RESOURCE_TRANSFER_SOURCE,
FILE_RESOURCE_SERVICE,
OBJECT_TAGS_PREFIX,
REMOVE_TAG_PREFIX,
STORAGE_CLASS,
@@ -529,6 +538,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());

attributes.put(S3_BUCKET_KEY, bucket);
attributes.put(S3_OBJECT_KEY, key);

@@ -547,12 +558,12 @@
*/
try {
final FlowFile flowFileCopy = flowFile;
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(ff.getSize());
Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
try (InputStream in = optFileResource
.map(FileResource::getInputStream)
.orElseGet(() -> session.read(flowFileCopy))) {
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));

final String contentType = context.getProperty(CONTENT_TYPE)
.evaluateAttributeExpressions(ff).getValue();
@@ -869,9 +880,10 @@ public void process(final InputStream rawIn) throws IOException {
throw (e);
}
}
}
}
});
} catch (IOException e) {
getLogger().error("Error during upload of flow files: " + e.getMessage());
throw e;
}

if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -882,25 +894,25 @@ public void process(final InputStream rawIn) throws IOException {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);

getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
try {
removeLocalState(cacheKey);
} catch (IOException e) {
getLogger().info("Error trying to delete key {} from cache: {}",
new Object[]{cacheKey, e.getMessage()});
}
} catch (final ProcessException | AmazonClientException pe) {
extractExceptionDetails(pe, session, flowFile);
if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
getLogger().info(pe.getMessage());

} catch (final ProcessException | AmazonClientException | IOException e) {
extractExceptionDetails(e, session, flowFile);
if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
getLogger().info(e.getMessage());
session.rollback();
} else {
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
getLogger().error("Failed to put {} to Amazon S3 due to {}", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}

}

private final Lock s3BucketLock = new ReentrantLock();
(Remaining changed files in this commit are not shown here.)
