From e4ed4ea4ea21b005393ccbc447c655bea18c02e3 Mon Sep 17 00:00:00 2001 From: Ricardo Ruiz Saiz Date: Fri, 5 Oct 2018 12:39:28 +0200 Subject: [PATCH] Change: Changed the way the existence of a package is checked. Added: Checked the existence of some of the metadata to be backed up. Added more logging to the API Handler. Fix: Fixed a bug in the creation of the package that caused the processor to fail even when everything was created correctly. Signed-off-by: Ricardo Ruiz Saiz --- .../atos/qrowd/handlers/CKAN_API_Handler.java | 137 ++++++++++-------- .../CKAN_Flowfile_Uploader.java | 9 +- 2 files changed, 85 insertions(+), 61 deletions(-) diff --git a/CKAN_API_Handler/src/main/java/net/atos/qrowd/handlers/CKAN_API_Handler.java b/CKAN_API_Handler/src/main/java/net/atos/qrowd/handlers/CKAN_API_Handler.java index 94b48d0..79a759f 100644 --- a/CKAN_API_Handler/src/main/java/net/atos/qrowd/handlers/CKAN_API_Handler.java +++ b/CKAN_API_Handler/src/main/java/net/atos/qrowd/handlers/CKAN_API_Handler.java @@ -29,7 +29,6 @@ import org.apache.http.entity.mime.content.StringBody; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; import java.io.BufferedReader; @@ -70,41 +69,37 @@ public CKAN_API_Handler(String HOST, String api_key) this.httpclient = HttpClientBuilder.create().build(); } - // ToDo: Check if the package exists marked as delete, then reactivate it? 
- public boolean packageExists() throws IOException{ + public boolean packageExists(String package_id) throws IOException{ String line; StringBuilder sb = new StringBuilder(); HttpPost postRequest; + Gson gson = new Gson(); - HttpEntity reqEntity = MultipartEntityBuilder.create() - .addPart("id",new StringBody(package_id,ContentType.TEXT_PLAIN)) - .build(); - - postRequest = new HttpPost(HOST+"/api/action/package_show"); - postRequest.setEntity(reqEntity); + postRequest = new HttpPost(HOST+"/api/3/action/package_search?q=name:"+package_id); postRequest.setHeader("X-CKAN-API-Key", api_key); HttpResponse response = httpclient.execute(postRequest); int statusCode = response.getStatusLine().getStatusCode(); - BufferedReader br = new BufferedReader( new InputStreamReader((response.getEntity().getContent()))); while ((line = br.readLine()) != null) { sb.append(line); - sb.append("\n"); } - - if(statusCode==200) - { - log.info("Package with id "+package_id+" exists"); - //Check if that package is deleted - - log.info(sb); - return true; + // Parse the response into a POJO to be able to get results from it. + // ToDo: If no result is returned, raise an error (when converting to POJO fails or return code !=200?) + if(statusCode==200) { + CkanFullList CkanFullList = gson.fromJson(sb.toString(), CkanFullList.class); + //by default we get the first package_ of the list of packages + if (CkanFullList.getPackage().getPackages().size() == 1) { + log.info("Package: "+package_id+" was found in CKAN."); + return true; + } else { + log.warn("Package: "+package_id+" not found"); + //ToDo: Null, really? 
+ return false; + } }else{ - log.warn("Package with id "+package_id+" not found"); - log.warn(sb); return false; } } @@ -183,7 +178,7 @@ public Package_ getPackageByName(String name) throws IOException { } - public void createPackage() throws IOException{ + public void createPackage(String package_id) throws IOException{ HttpPost postRequest; StringBuilder sb = new StringBuilder(); @@ -203,9 +198,6 @@ public void createPackage() throws IOException{ HttpResponse response = httpclient.execute(postRequest); int statusCode = response.getStatusLine().getStatusCode(); - String message = EntityUtils.toString(response.getEntity()); - log.warn("***** Response: "+message); - BufferedReader br = new BufferedReader( new InputStreamReader((response.getEntity().getContent()))); sb.append(statusCode); @@ -233,27 +225,33 @@ public void createPackagePojo(Package_ dataset, String name) throws IOException{ MultipartEntityBuilder multipart = MultipartEntityBuilder.create() .addPart("name",new StringBody(name,ContentType.TEXT_PLAIN)); // ToDo: Improve this way of handling null values in the returned dataset - if(dataset.getAuthor()!=null) { - multipart.addPart("author", new StringBody(dataset.getAuthor(), ContentType.TEXT_PLAIN)); - } - if(dataset.getAuthorEmail()!=null) { - multipart.addPart("author_email", new StringBody(dataset.getAuthorEmail(), ContentType.TEXT_PLAIN)); - } - if(dataset.getOwnerOrg()!=null) { - multipart.addPart("owner_org", new StringBody(dataset.getOwnerOrg(), ContentType.TEXT_PLAIN)); - } - if(dataset.getNotes()!=null) { - multipart.addPart("notes", new StringBody(dataset.getNotes(), ContentType.TEXT_PLAIN)); - } - if(dataset.getPrivate()!=null) { - multipart.addPart("private", new StringBody(dataset.getPrivate().toString(), ContentType.TEXT_PLAIN)); - } - if(dataset.getLicenseTitle()!=null) { - multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN)); - } - if(dataset.getLicenseId()!=null) { - 
multipart.addPart("license_id", new StringBody(dataset.getLicenseId(), ContentType.TEXT_PLAIN)); - } + if(dataset.getAuthor()!=null) { + multipart.addPart("author", new StringBody(dataset.getAuthor(), ContentType.TEXT_PLAIN)); + } + if(dataset.getAuthorEmail()!=null) { + multipart.addPart("author_email", new StringBody(dataset.getAuthorEmail(), ContentType.TEXT_PLAIN)); + } + if(dataset.getOwnerOrg()!=null) { + multipart.addPart("owner_org", new StringBody(dataset.getOwnerOrg(), ContentType.TEXT_PLAIN)); + } + if(dataset.getNotes()!=null) { + multipart.addPart("notes", new StringBody(dataset.getNotes(), ContentType.TEXT_PLAIN)); + } + if(dataset.getPrivate()!=null) { + multipart.addPart("private", new StringBody(dataset.getPrivate().toString(), ContentType.TEXT_PLAIN)); + } + if(dataset.getLicenseTitle()!=null) { + multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN)); + } + if(dataset.getLicenseId()!=null) { + multipart.addPart("license_id", new StringBody(dataset.getLicenseId(), ContentType.TEXT_PLAIN)); + } + if(dataset.getLicenseTitle() != null){ + multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN)); + } + if(dataset.getMaintainerEmail() != null){ + multipart.addPart("maintainer_email", new StringBody(dataset.getMaintainerEmail(), ContentType.TEXT_PLAIN)); + } HttpEntity reqEntity = multipart.build(); @@ -365,18 +363,29 @@ public void uploadFilePojo(Resource resource, String dataset_name, String resour HttpPost postRequest; ContentBody cbFile = new FileBody(file, ContentType.TEXT_HTML); - //ToDo: Handle error when any of the attributes of the resource is null as in the package - HttpEntity reqEntity = MultipartEntityBuilder.create() + + MultipartEntityBuilder multipart = MultipartEntityBuilder.create() .addPart("file", cbFile) - //Cannot use getKey() because sometimes it is empty and causes error (resource with no filename in it) .addPart("key", new 
StringBody(resourceFileName.split("\\.")[0],ContentType.TEXT_PLAIN)) .addPart("name", new StringBody(resourceFileName,ContentType.TEXT_PLAIN)) - .addPart("url",new StringBody(resource.getUrl(),ContentType.TEXT_PLAIN)) .addPart("package_id",new StringBody(dataset_name,ContentType.TEXT_PLAIN)) - .addPart("format",new StringBody(resource.getFormat(),ContentType.TEXT_PLAIN)) - .addPart("upload",cbFile) - .addPart("description",new StringBody(resource.getDescription(),ContentType.TEXT_PLAIN)) - .build(); + .addPart("upload",cbFile); + //ToDo: Add the rest of the fields¿? + if(resource.getUrl() != null){ + multipart.addPart("url",new StringBody(resource.getUrl(),ContentType.TEXT_PLAIN)); + } + if(resource.getFormat() != null) + { + multipart.addPart("format",new StringBody(resource.getFormat(),ContentType.TEXT_PLAIN)); + } + if(resource.getDescription() != null){ + multipart.addPart("description",new StringBody(resource.getDescription(),ContentType.TEXT_PLAIN)); + } + if(resource.getMimetype() != null) + { + multipart.addPart("mimetype",new StringBody(resource.getMimetype().toString(),ContentType.TEXT_PLAIN)); + } + HttpEntity reqEntity = multipart.build(); postRequest = new HttpPost(HOST+"/api/action/resource_create"); postRequest.setEntity(reqEntity); @@ -414,9 +423,8 @@ public Boolean createOrUpdateResource(String path) throws IOException { ResourceResponse resResponse = gson.fromJson(sb.toString(),ResourceResponse.class); System.out.println(resResponse); - String resource_packageId = resResponse.getResult().getResults().get(0).getPackageId(); - String id = resResponse.getResult().getResults().get(0).getId(); - + String resource_packageId; + String id; //This is needed to check that the resource belongs to the current package Package_ foundPackage = getPackageByName(package_id); String foundPackageId = "Not_found"; @@ -435,6 +443,8 @@ public Boolean createOrUpdateResource(String path) throws IOException { //if the count is 1, get all the needed data to update the 
resource }else if(resResponse.getResult().getCount()==1) { + resource_packageId = resResponse.getResult().getResults().get(0).getPackageId(); + id = resResponse.getResult().getResults().get(0).getId(); //If the resource's package_id is the same as the current package id (search for package by name and get the id) if( foundPackage != null && resource_packageId.equals(foundPackageId)) { log.info("Resource found in the current package, updating it"); @@ -520,6 +530,8 @@ private void uploadFile(String path) throws IOException { File file = new File(path); SimpleDateFormat dateFormatGmt = new SimpleDateFormat("yyyyMMdd_HHmmss"); String date=dateFormatGmt.format(new Date()); + StringBuilder sb = new StringBuilder(); + String line; HttpPost postRequest; ContentBody cbFile = new FileBody(file, ContentType.TEXT_HTML); @@ -540,8 +552,19 @@ private void uploadFile(String path) throws IOException { HttpResponse response = httpclient.execute(postRequest); int statusCode = response.getStatusLine().getStatusCode(); + BufferedReader br = new BufferedReader( + new InputStreamReader((response.getEntity().getContent()))); + sb.append(statusCode); + sb.append("\n"); + while ((line = br.readLine()) != null) { + sb.append(line); + sb.append("\n"); + } + if(statusCode!=200){ + log.error("Error creating a resource: "+ file.getName().split("\\.")[0] +"in package:"+package_id); log.error("statusCode =!=" +statusCode); + log.error(sb); } else log.info("Request returns statusCode 200: OK"); } diff --git a/nifi-nifiCKANFlowfileUploader-processors/src/main/java/net/atos/qrowd/processors/nifiCKANprocessor/CKAN_Flowfile_Uploader.java b/nifi-nifiCKANFlowfileUploader-processors/src/main/java/net/atos/qrowd/processors/nifiCKANprocessor/CKAN_Flowfile_Uploader.java index 9437a70..3d39f5e 100644 --- a/nifi-nifiCKANFlowfileUploader-processors/src/main/java/net/atos/qrowd/processors/nifiCKANprocessor/CKAN_Flowfile_Uploader.java +++ 
b/nifi-nifiCKANFlowfileUploader-processors/src/main/java/net/atos/qrowd/processors/nifiCKANprocessor/CKAN_Flowfile_Uploader.java @@ -202,10 +202,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (!ckan_api_handler.organizationExists()) { ckan_api_handler.createOrganization(); } - if (!ckan_api_handler.packageExists()) { - ckan_api_handler.createPackage(); + if (!ckan_api_handler.packageExists(filenameNoExtension)) { + ckan_api_handler.createPackage(filenameNoExtension); } - if(ckan_api_handler.createOrUpdateResource(file.toAbsolutePath().toString())) { + if(ckan_api_handler.createOrUpdateResource(file.toFile().toString())) { + getLogger().info("File tried to be uploaded to CKAN: {}", new Object[]{file.toFile().toString()}); session.transfer(flowFile, REL_SUCCESS); ckan_api_handler.close(); }else @@ -216,11 +217,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session { getLogger().log(LogLevel.ERROR, "Error while uploading file {} to CKAN {}: Organization {}.", new Object[]{file, url, organizationId }); + getLogger().error(ioe.toString()); session.transfer(session.penalize(flowFile), REL_FAILURE); } - // It is critical that we commit the session before we perform the Delete. Otherwise, we could have a case where we // ingest the file, delete it, and then NiFi is restarted before the session is committed. That would result in data loss. // As long as we commit the session right here, we are safe.