From 1426f9fa52fd5fc97cb01b29061500ed988a1f96 Mon Sep 17 00:00:00 2001
From: Waseem AlShikh
Date: Thu, 7 Nov 2024 08:18:09 -0500
Subject: [PATCH 01/11] Add files via upload - serpapisearch integration with
 https://serpapi.com/

---
 src/writer/workflows_blocks/serpapisearch.py | 139 +++++++++++++++++++
 1 file changed, 139 insertions(+)
 create mode 100644 src/writer/workflows_blocks/serpapisearch.py

diff --git a/src/writer/workflows_blocks/serpapisearch.py b/src/writer/workflows_blocks/serpapisearch.py
new file mode 100644
index 000000000..1dde4d13b
--- /dev/null
+++ b/src/writer/workflows_blocks/serpapisearch.py
@@ -0,0 +1,139 @@
+import json
+
+import requests
+from writer.abstract import register_abstract_template
+from writer.ss_types import AbstractTemplate
+from writer.workflows_blocks.blocks import WorkflowBlock
+
+class SerpApiSearch(WorkflowBlock):
+    @classmethod
+    def register(cls, type: str):
+        super(SerpApiSearch, cls).register(type)
+        register_abstract_template(type, AbstractTemplate(
+            baseType="workflows_node",
+            writer={
+                "name": "SerpApi Search",
+                "description": "Executes a search query using SerpApi.",
+                "category": "Other",
+                "fields": {
+                    "api_key": {
+                        "name": "API Key",
+                        "type": "Text",
+                        "description": "Your SerpApi API key"
+                    },
+                    "search_engine": {
+                        "name": "Search Engine",
+                        "type": "Text",
+                        "options": {
+                            "google": "Google",
+                            "bing": "Bing",
+                            "yahoo": "Yahoo",
+                            "yandex": "Yandex"
+                        },
+                        "default": "google"
+                    },
+                    "query": {
+                        "name": "Search Query",
+                        "type": "Text",
+                        "description": "The search term or phrase"
+                    },
+                    "location": {
+                        "name": "Location",
+                        "type": "Text",
+                        "description": "Geographic location for search results",
+                    },
+                    "num_results": {
+                        "name": "Number of Results",
+                        "type": "Number",
+                        "default": "10",
+                    },
+                    "additional_params": {
+                        "name": "Additional Parameters",
+                        "type": "Key-Value",
+                        "default": "{}",
+                    }
+                },
+                "outs": {
+                    "success": {
+                        "name": "Success",
+                        "description": "The search was completed successfully.",
+                        "style": "success",
+                    },
+                    "apiError": {
+                        "name": "API Error",
+                        "description": "An error occurred while making the API request.",
+                        "style": "error",
+                    },
+                    "rateLimitError": {
+                        "name": "Rate Limit Error",
+                        "description": "The API rate limit has been exceeded.",
+                        "style": "error",
+                    }
+                },
+            }
+        ))

+    def run(self):
+        try:
+            # Get required fields
+            api_key = self._get_field("api_key")
+            search_engine = self._get_field("search_engine", False, "google")
+            query = self._get_field("query")
+
+            # Get optional fields
+            location = self._get_field("location", True)
+            num_results = self._get_field("num_results", True, "10")
+            additional_params = self._get_field("additional_params", True, "{}")
+
+            # Construct base URL based on search engine
+            base_urls = {
+                "google": "https://serpapi.com/search.json",
+                "bing": "https://serpapi.com/bing/search",
+                "yahoo": "https://serpapi.com/yahoo/search",
+                "yandex": "https://serpapi.com/yandex/search"
+            }
+
+            # Prepare parameters
+            params = {
+                "api_key": api_key,
+                "q": query,
+                "num": num_results
+            }
+
+            # Add optional parameters
+            if location:
+                params["location"] = location
+
+            # Merge additional parameters; the Key-Value field may resolve to
+            # a JSON string, so parse it before merging
+            if isinstance(additional_params, str):
+                additional_params = json.loads(additional_params)
+            params.update(additional_params)
+
+            # Make the API request
+            response = requests.get(base_urls[search_engine], params=params)
+
+            if response.status_code == 429:
+                self.result = "SerpApi rate limit exceeded"
+                self.outcome = "rateLimitError"
+                return
+
+            if not response.ok:
+                self.result = f"SerpApi error: {response.status_code} - {response.text}"
+                self.outcome = "apiError"
+                return
+
+            # Parse and store results
+            self.result = {
+                "search_metadata": {
+                    "status": response.status_code,
+                    "created_at": response.headers.get("Date"),
+                    "search_engine": search_engine
+                },
+                "search_parameters": params,
+                "search_results": response.json()
+            }
+
+            self.outcome = "success"
+        except requests.exceptions.RequestException as e:
+            self.outcome = "apiError"
+            raise RuntimeError(f"Connection error: {str(e)}")
+        except Exception as e:
+            self.outcome = "apiError"
+            raise e

From 6af4eaa22c685f1661a82c2969d13669ce24956d Mon Sep 17 00:00:00 2001
From: Waseem AlShikh
Date: Thu, 7 Nov 2024 08:26:20 -0500
Subject: [PATCH 02/11] Add files via upload

This Brave Search integration block follows a similar structure to the
previous SerpApi and Notion blocks. Here's a breakdown of the key elements:

Registration: The BraveSearch class is registered as a workflow block, and
the block's metadata (name, description, category, fields, and outcomes) is
defined in the register method.

Fields: The block accepts the following fields:
- api_key: The Brave Search API key
- query: The search query
- location: The geographic location for search results (optional)
- device: The device type (desktop or mobile) for search results (optional)
- language: The language for search results (optional)
- num_results: The number of search results to retrieve (optional)
- additional_params: Any additional parameters to pass to the Brave Search
  API (optional)

Outcomes: The block can produce the following outcomes:
- success: The search was completed successfully
- apiError: An error occurred while making the API request
- rateLimitError: The API rate limit has been exceeded

Request Handling: The run method executes the search query and handles the
API response. It:
- Retrieves the required and optional fields from the block configuration
- Constructs the API endpoint URL and the request parameters
- Sets the API key in the request headers
- Makes the API request using the requests library
- Handles the response, setting the appropriate outcome and storing the
  result data in the self.result attribute

To use this block, register it in your workflow system:

```python
BraveSearch.register("brave_search")
```

Then configure it with your Brave Search API key and search parameters:

```python
block = BraveSearch()
block.configure({
    "api_key": "your_brave_search_api_key",
    "query": "your search query",
    "location": "San Francisco",
    "language": "en",
    "num_results": "20"
})
```
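Downstream workflow nodes branch on the outcome the block sets. A minimal
sketch of consuming a finished run (assuming the configure/run API shown
above, with block.outcome and block.result mirroring the attributes run()
sets):

```python
block.run()

if block.outcome == "success":
    # result carries search_metadata, search_parameters and search_results
    print(block.result["search_results"])
elif block.outcome == "rateLimitError":
    print("Rate limited by the Brave Search API - retry later")
else:
    print("Search request failed:", block.result)
```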
---
 .../workflows_blocks/bravesearchworkflow.py   | 138 ++++++++++++++++++
 1 file changed, 138 insertions(+)
 create mode 100644 src/writer/workflows_blocks/bravesearchworkflow.py

diff --git a/src/writer/workflows_blocks/bravesearchworkflow.py b/src/writer/workflows_blocks/bravesearchworkflow.py
new file mode 100644
index 000000000..240b7a0b2
--- /dev/null
+++ b/src/writer/workflows_blocks/bravesearchworkflow.py
@@ -0,0 +1,138 @@
+import json
+
+import requests
+from writer.abstract import register_abstract_template
+from writer.ss_types import AbstractTemplate
+from writer.workflows_blocks.blocks import WorkflowBlock
+
+class BraveSearch(WorkflowBlock):
+    @classmethod
+    def register(cls, type: str):
+        super(BraveSearch, cls).register(type)
+        register_abstract_template(type, AbstractTemplate(
+            baseType="workflows_node",
+            writer={
+                "name": "Brave Search",
+                "description": "Executes a search query using the Brave Search API.",
+                "category": "Search",
+                "fields": {
+                    "api_key": {
+                        "name": "API Key",
+                        "type": "Text",
+                        "description": "Your Brave Search API key"
+                    },
+                    "query": {
+                        "name": "Search Query",
+                        "type": "Text",
+                        "description": "The search term or phrase"
+                    },
+                    "location": {
+                        "name": "Location",
+                        "type": "Text",
+                        "description": "Geographic location for search results",
+                        "required": False
+                    },
+                    "device": {
+                        "name": "Device",
+                        "type": "Text",
+                        "options": {
+                            "desktop": "Desktop",
+                            "mobile": "Mobile"
+                        },
+                        "default": "desktop",
+                        "required": False
+                    },
+                    "language": {
+                        "name": "Language",
+                        "type": "Text",
+                        "description": "Language for search results",
+                        "default": "en",
+                        "required": False
+                    },
+                    "num_results": {
+                        "name": "Number of Results",
+                        "type": "Text",
+                        "default": "10",
+                        "required": False
+                    },
+                    "additional_params": {
+                        "name": "Additional Parameters",
+                        "type": "Key-Value",
+                        "default": "{}",
+                        "required": False
+                    }
+                },
+                "outs": {
+                    "success": {
+                        "name": "Success",
+                        "description": "The search was completed successfully.",
+                        "style": "success",
+                    },
+                    "apiError": {
+                        "name": "API Error",
+                        "description": "An error occurred while making the API request.",
+                        "style": "error",
+                    },
+                    "rateLimitError": {
+                        "name": "Rate Limit Error",
+                        "description": "The API rate limit has been exceeded.",
+                        "style": "error",
+                    }
+                },
+            }
+        ))

+    def run(self):
+        try:
+            # Get required fields
+            api_key = self._get_field("api_key")
+            query = self._get_field("query")
+
+            # Get optional fields
+            location = self._get_field("location", True)
+            device = self._get_field("device", True, "desktop")
+            language = self._get_field("language", True, "en")
+            num_results = self._get_field("num_results", True, "10")
+            additional_params = self._get_field("additional_params", True, "{}")
+
+            # Construct the API endpoint URL
+            url = "https://api.brave.com/v1/search"
+
+            # Prepare parameters
+            params = {
+                "q": query,
+                "location": location,
+                "device": device,
+                "lang": language,
+                "num": num_results
+            }
+
+            # Merge additional parameters; parse JSON strings instead of
+            # calling eval() on user-supplied input
+            if isinstance(additional_params, str):
+                additional_params = json.loads(additional_params)
+            params.update(additional_params)
+
+            # Set the API key in the headers
+            headers = {
+                "X-Api-Key": api_key
+            }
+
+            # Make the API request
+            response = requests.get(url, params=params, headers=headers)
+
+            if response.status_code == 429:
+                self.outcome = "rateLimitError"
+                self.result = "Brave Search rate limit exceeded"
+            elif not response.ok:
+                self.outcome = "apiError"
+                self.result = f"Brave Search error: {response.status_code} - {response.text}"
+            else:
+                self.outcome = "success"
+                self.result = {
+                    "search_metadata": {
+                        "status": response.status_code,
+                        "created_at": response.headers.get("Date"),
+                        "search_engine": "brave"
+                    },
+                    "search_parameters": params,
+                    "search_results":
+                        response.json()
+                }
+
+        except requests.exceptions.RequestException as e:
+            self.outcome = "apiError"
+            self.result = f"Connection error: {str(e)}"
+        except Exception as e:
+            self.outcome = "apiError"
+            self.result = f"Unexpected error: {str(e)}"

From 6ce14c08ec3b7cf4d023216d6db6a2104cbe2ff2 Mon Sep 17 00:00:00 2001
From: Waseem AlShikh
Date: Thu, 7 Nov 2024 08:27:42 -0500
Subject: [PATCH 03/11] Add files via upload

This Tavily Search integration block follows a similar structure to the
previous search engine integration blocks. Here are the key differences:

Fields: The block accepts the following fields:
- api_key: The Tavily Search API key
- query: The search query
- location: The geographic location for search results (optional)
- num_results: The number of search results to retrieve (optional)
- additional_params: Any additional parameters to pass to the Tavily Search
  API (optional)

API Endpoint: The API endpoint URL is set to "https://api.tavily.com/v1/search".

Request Parameters: The request parameters include "q" for the search query,
"location" for the geographic location, and "num" for the number of results.
Any additional parameters are merged into the params dictionary.

Request Headers: The API key is set in the "X-Api-Key" header.

Response Handling: The response handling logic is similar to the previous
blocks, setting the appropriate outcome based on the response status code.

To use this block, register it in your workflow system:

```python
TavilySearch.register("tavily_search")
```

Then configure it with your Tavily Search API key and search parameters:

```python
block = TavilySearch()
block.configure({
    "api_key": "your_tavily_search_api_key",
    "query": "your search query",
    "location": "New York",
    "num_results": "15"
})
```
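Entries in the additional_params field are merged into the request
parameters by run(). A small sketch of supplying them as a JSON object (the
keys below are illustrative placeholders, not confirmed Tavily API options):

```python
import json

block = TavilySearch()
block.configure({
    "api_key": "your_tavily_search_api_key",
    "query": "your search query",
    # Merged into the request params by run(); illustrative keys only
    "additional_params": json.dumps({
        "search_depth": "basic",
        "include_answer": "true"
    })
})
block.run()
print(block.outcome)  # "success", "apiError" or "rateLimitError"
```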
+ "description": "The API rate limit has been exceeded.", + "style": "error", + } + }, + } + )) + + def run(self): + try: + # Get required fields + api_key = self._get_field("api_key") + query = self._get_field("query") + + # Get optional fields + location = self._get_field("location", True) + num_results = self._get_field("num_results", True, "10") + additional_params = self._get_field("additional_params", True, "{}") + + # Construct the API endpoint URL + url = "https://api.tavily.com/v1/search" + + # Prepare parameters + params = { + "q": query, + "location": location, + "num": num_results + } + + # Merge additional parameters + params.update(eval(additional_params)) + + # Set the API key in the headers + headers = { + "X-Api-Key": api_key + } + + # Make the API request + response = requests.get(url, params=params, headers=headers) + + if response.status_code == 429: + self.outcome = "rateLimitError" + elif not response.ok: + self.outcome = "apiError" + else: + self.outcome = "success" + self.result = { + "search_metadata": { + "status": response.status_code, + "created_at": response.headers.get("Date"), + "search_engine": "tavily" + }, + "search_parameters": params, + "search_results": response.json() + } + + except requests.exceptions.RequestException as e: + self.outcome = "apiError" + except Exception as e: + self.outcome = "apiError" From e80100963d87366d4aaf788e8a995b316b94a5bf Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 08:46:29 -0500 Subject: [PATCH 04/11] Add files via upload This Slack integration block provides a comprehensive set of operations for working with Slack channels, messages, and users. Here's a breakdown of the key elements: Registration: The SlackIntegration class is registered as a workflow block, and the block's metadata (name, description, category, fields, and outcomes) are defined in the register method. Fields: The block accepts the following fields: api_token: The Slack API token operation: The specific Slack API operation to perform (e.g., create channel, post message, list users) channel_id: The ID of the Slack channel to operate on (required for channel and message operations) message_ts: The timestamp of the message to update or delete (required for message update/delete operations) message_text: The text content of the message to post or update (required for message post/update operations) user_id: The ID of the Slack user to retrieve information for (required for user operations) additional_params: Any additional parameters to pass to the Slack API (optional) Outcomes: The block can have the following outcomes: success: The operation was completed successfully apiError: An error occurred while making the API request authError: Invalid API token or insufficient permissions API Methods: The block includes the following methods for interacting with the Slack API: _create_channel: Creates a new Slack channel _list_channels: Lists all Slack channels _join_channel: Joins a Slack channel _post_message: Posts a message to a Slack channel _update_message: Updates a message in a Slack channel _delete_message: Deletes a message from a Slack channel _list_users: Lists all Slack users _get_user_info: Retrieves information about a Slack user _get_user_presence: Retrieves the presence status of a Slack user Request Handling: The run method is responsible for executing the requested Slack API operation and handling the response. 
---
 .../workflows_blocks/slackintegration.py      | 250 ++++++++++++++++++
 1 file changed, 250 insertions(+)
 create mode 100644 src/writer/workflows_blocks/slackintegration.py

diff --git a/src/writer/workflows_blocks/slackintegration.py b/src/writer/workflows_blocks/slackintegration.py
new file mode 100644
index 000000000..86da70cf1
--- /dev/null
+++ b/src/writer/workflows_blocks/slackintegration.py
@@ -0,0 +1,250 @@
+import requests
+import json
+from datetime import datetime
+from typing import Dict, Any, Optional
+from writer.abstract import register_abstract_template
+from writer.ss_types import AbstractTemplate
+from writer.workflows_blocks.blocks import WorkflowBlock
+
+class SlackIntegration(WorkflowBlock):
+    BASE_URL = "https://slack.com/api"
+
+    @classmethod
+    def register(cls, type: str):
+        super(SlackIntegration, cls).register(type)
+        register_abstract_template(type, AbstractTemplate(
+            baseType="workflows_node",
+            writer={
+                "name": "Slack Integration",
+                "description": "Executes various Slack API operations for channels, messages, and users.",
+                "category": "Collaboration",
+                "fields": {
+                    "api_token": {
+                        "name": "API Token",
+                        "type": "Text",
+                        "description": "Your Slack API token"
+                    },
+                    "operation": {
+                        "name": "Operation",
+                        "type": "Text",
+                        "options": {
+                            # Channel operations
+                            "create_channel": "Create Channel",
+                            "list_channels": "List Channels",
+                            "join_channel": "Join Channel",
+                            "leave_channel": "Leave Channel",
+
+                            # Message operations
+                            "post_message": "Post Message",
+                            "update_message": "Update Message",
+                            "delete_message": "Delete Message",
+
+                            # User operations
+                            "list_users": "List Users",
+                            "get_user_info": "Get User Info",
+                            "get_user_presence": "Get User Presence"
+                        },
+                        "default": "list_channels"
+                    },
+                    "channel_id": {
+                        "name": "Channel ID",
+                        "type": "Text",
+                        "description": "ID of the Slack channel to operate on",
+                        "required": False
+                    },
+                    "message_ts": {
+                        "name": "Message Timestamp",
+                        "type": "Text",
+                        "description": "Timestamp of the message to update or delete",
+                        "required": False
+                    },
+                    "message_text": {
+                        "name": "Message Text",
+                        "type": "Text",
+                        "description": "Text content of the message to post or update",
+                        "required": False
+                    },
+                    "user_id": {
+                        "name": "User ID",
+                        "type": "Text",
+                        "description": "ID of the Slack user to retrieve information for",
+                        "required": False
+                    },
+                    "additional_params": {
+                        "name": "Additional Parameters",
+                        "type": "Key-Value",
+                        "description": "Additional parameters for the API operation",
+                        "default": "{}",
+                        "required": False
+                    }
+                },
+                "outs": {
+                    "success": {
+                        "name": "Success",
+                        "description": "The operation was
completed successfully.", + "style": "success", + }, + "apiError": { + "name": "API Error", + "description": "An error occurred while making the API request.", + "style": "error", + }, + "authError": { + "name": "Authentication Error", + "description": "Invalid API token or insufficient permissions.", + "style": "error", + } + }, + } + )) + + def _get_headers(self, api_token: str) -> Dict[str, str]: + """Create headers for Slack API requests""" + return { + "Authorization": f"Bearer {api_token}", + "Content-Type": "application/json" + } + + def _handle_response(self, response: requests.Response) -> Dict[str, Any]: + """Handle API response and set appropriate outcome""" + if response.status_code == 200: + return response.json() + elif response.status_code == 401: + self.outcome = "authError" + raise RuntimeError("Authentication failed: Invalid API token or insufficient permissions") + else: + self.outcome = "apiError" + raise RuntimeError(f"API error: {response.status_code} - {response.text}") + + def _create_channel(self, headers: Dict[str, str], name: str) -> Dict[str, Any]: + """Create a new Slack channel""" + url = f"{self.BASE_URL}/conversations.create" + data = { + "name": name + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _list_channels(self, headers: Dict[str, str]) -> Dict[str, Any]: + """List all Slack channels""" + url = f"{self.BASE_URL}/conversations.list" + response = requests.get(url, headers=headers) + return self._handle_response(response) + + def _join_channel(self, headers: Dict[str, str], channel_id: str) -> Dict[str, Any]: + """Join a Slack channel""" + url = f"{self.BASE_URL}/conversations.join" + data = { + "channel": channel_id + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _post_message(self, headers: Dict[str, str], channel_id: str, text: str) -> Dict[str, Any]: + """Post a message to a Slack channel""" + url = f"{self.BASE_URL}/chat.postMessage" + data = { + "channel": channel_id, + "text": text + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _update_message(self, headers: Dict[str, str], channel_id: str, ts: str, text: str) -> Dict[str, Any]: + """Update a message in a Slack channel""" + url = f"{self.BASE_URL}/chat.update" + data = { + "channel": channel_id, + "ts": ts, + "text": text + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _delete_message(self, headers: Dict[str, str], channel_id: str, ts: str) -> Dict[str, Any]: + """Delete a message from a Slack channel""" + url = f"{self.BASE_URL}/chat.delete" + data = { + "channel": channel_id, + "ts": ts + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _list_users(self, headers: Dict[str, str]) -> Dict[str, Any]: + """List all Slack users""" + url = f"{self.BASE_URL}/users.list" + response = requests.get(url, headers=headers) + return self._handle_response(response) + + def _get_user_info(self, headers: Dict[str, str], user_id: str) -> Dict[str, Any]: + """Get information about a Slack user""" + url = f"{self.BASE_URL}/users.info" + data = { + "user": user_id + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def _get_user_presence(self, headers: Dict[str, str], 
user_id: str) -> Dict[str, Any]: + """Get the presence status of a Slack user""" + url = f"{self.BASE_URL}/users.getPresence" + data = { + "user": user_id + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + return self._handle_response(response) + + def run(self): + try: + # Get required fields + api_token = self._get_field("api_token") + operation = self._get_field("operation") + headers = self._get_headers(api_token) + + # Get optional fields + channel_id = self._get_field("channel_id", True) + message_ts = self._get_field("message_ts", True) + message_text = self._get_field("message_text", True) + user_id = self._get_field("user_id", True) + additional_params = self._get_field("additional_params", True, "{}") + + # Execute the requested operation + if operation == "create_channel": + result = self._create_channel(headers, channel_id) + elif operation == "list_channels": + result = self._list_channels(headers) + elif operation == "join_channel": + result = self._join_channel(headers, channel_id) + elif operation == "post_message": + result = self._post_message(headers, channel_id, message_text) + elif operation == "update_message": + result = self._update_message(headers, channel_id, message_ts, message_text) + elif operation == "delete_message": + result = self._delete_message(headers, channel_id, message_ts) + elif operation == "list_users": + result = self._list_users(headers) + elif operation == "get_user_info": + result = self._get_user_info(headers, user_id) + elif operation == "get_user_presence": + result = self._get_user_presence(headers, user_id) + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Store the result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "apiError" + raise RuntimeError(f"Validation error: {str(e)}") + except requests.exceptions.RequestException as e: + self.outcome = "apiError" + raise RuntimeError(f"Connection error: {str(e)}") + except Exception as e: + self.outcome = "apiError" + raise e From ae920311c81981ead78a4e467948b77e4e004c3d Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 08:48:38 -0500 Subject: [PATCH 05/11] Add files via upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This Google Drive integration provides a comprehensive set of operations for working with files and folders. 
Here's how to use it:

First, set up a Google Cloud project and obtain credentials:

```python
# Configure the integration
block = GoogleDriveIntegration()
block.configure({
    "credentials_path": "path/to/your/credentials.json",
    "token_path": "token.json",
    "operation": "list_files"
})
```

Example operations:

```python
# Upload a file
block.configure({
    "credentials_path": "credentials.json",
    "operation": "upload_file",
    "file_path": "document.pdf",
    "folder_id": "target_folder_id",  # Optional
    "mime_type": "application/pdf"
})

# Create a folder
block.configure({
    "credentials_path": "credentials.json",
    "operation": "create_folder",
    "name": "New Folder",
    "folder_id": "parent_folder_id"  # Optional
})

# Share a file
block.configure({
    "credentials_path": "credentials.json",
    "operation": "share_file",
    "file_id": "your_file_id",
    "permissions": {
        "type": "user",
        "role": "reader",
        "emailAddress": "user@example.com"
    }
})
```

Setting up the Google Cloud project:

1. Create a Google Cloud project
   - Go to the Google Cloud Console
   - Click "Select a Project" → "New Project"
   - Enter a project name (e.g., "Drive Integration")
   - Click "Create"
2. Enable the Google Drive API
   - In the Cloud Console, go to "APIs & Services" → "Library"
   - Search for "Google Drive API"
   - Click "Enable"
3. Create credentials
   - Go to "APIs & Services" → "Credentials"
   - Click "Create Credentials" → "OAuth client ID"
   - If prompted, configure the OAuth consent screen

---
 .../googledriveintegration.py                 | 323 ++++++++++++++++++
 1 file changed, 323 insertions(+)
 create mode 100644 src/writer/workflows_blocks/googledriveintegration.py

diff --git a/src/writer/workflows_blocks/googledriveintegration.py b/src/writer/workflows_blocks/googledriveintegration.py
new file mode 100644
index 000000000..7c669d0ed
--- /dev/null
+++ b/src/writer/workflows_blocks/googledriveintegration.py
@@ -0,0 +1,323 @@
+import os
+from datetime import datetime
+from typing import Dict, Any, Optional, List
+import json
+from google.oauth2.credentials import Credentials
+from google_auth_oauthlib.flow import InstalledAppFlow
+from google.auth.transport.requests import Request
+from googleapiclient.discovery import build
+from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
+import io
+
+from writer.abstract import register_abstract_template
+from writer.ss_types import AbstractTemplate
+from writer.workflows_blocks.blocks import WorkflowBlock
+
+class GoogleDriveIntegration(WorkflowBlock):
+    SCOPES = [
+        'https://www.googleapis.com/auth/drive.file',
+        'https://www.googleapis.com/auth/drive.metadata.readonly',
+        'https://www.googleapis.com/auth/drive.readonly',
+        'https://www.googleapis.com/auth/drive'
+    ]
+
+    @classmethod
+    def register(cls, type: str):
+        super(GoogleDriveIntegration, cls).register(type)
+        register_abstract_template(type, AbstractTemplate(
+            baseType="workflows_node",
+            writer={
+                "name": "Google Drive Integration",
+                "description": "Executes various Google Drive API operations for files and folders.",
+                "category": "Storage",
+                "fields": {
+                    "credentials_path": {
+                        "name": "Credentials Path",
+                        "type": "Text",
+                        "description": "Path to your Google credentials JSON file"
+                    },
+                    "token_path": {
+                        "name": "Token Path",
+                        "type": "Text",
+                        "description": "Path to store/retrieve the OAuth token",
+                        "default": "token.json"
+                    },
+                    "operation": {
+                        "name": "Operation",
+                        "type": "Text",
+                        "options": {
+                            # File operations
+                            "list_files": "List Files",
+                            "upload_file": "Upload File",
+                            "download_file": "Download File",
+                            "delete_file": "Delete File",
+                            "copy_file": "Copy File",
"move_file": "Move File", + "search_files": "Search Files", + + # Folder operations + "create_folder": "Create Folder", + "list_folder_contents": "List Folder Contents", + "delete_folder": "Delete Folder", + + # Permission operations + "share_file": "Share File/Folder", + "get_permissions": "Get Permissions", + "update_permissions": "Update Permissions", + "remove_permissions": "Remove Permissions" + }, + "default": "list_files" + }, + "file_path": { + "name": "File Path", + "type": "Text", + "description": "Local path of the file to upload/download", + "required": False + }, + "file_id": { + "name": "File ID", + "type": "Text", + "description": "Google Drive file/folder ID", + "required": False + }, + "folder_id": { + "name": "Folder ID", + "type": "Text", + "description": "Parent folder ID for operations", + "required": False + }, + "name": { + "name": "Name", + "type": "Text", + "description": "Name for new files/folders", + "required": False + }, + "mime_type": { + "name": "MIME Type", + "type": "Text", + "description": "MIME type for file operations", + "required": False + }, + "query": { + "name": "Search Query", + "type": "Text", + "description": "Query string for searching files", + "required": False + }, + "permissions": { + "name": "Permissions", + "type": "Key-Value", + "description": "Permissions configuration for sharing", + "default": "{}", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation was completed successfully.", + "style": "success", + }, + "apiError": { + "name": "API Error", + "description": "An error occurred while making the API request.", + "style": "error", + }, + "authError": { + "name": "Authentication Error", + "description": "Authentication failed or insufficient permissions.", + "style": "error", + }, + "fileError": { + "name": "File Error", + "description": "Error handling local files.", + "style": "error", + } + }, + } + )) + + def _get_credentials(self, credentials_path: str, token_path: str) -> Credentials: + """Get or refresh credentials for Google Drive API""" + creds = None + if os.path.exists(token_path): + creds = Credentials.from_authorized_user_file(token_path, self.SCOPES) + + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + flow = InstalledAppFlow.from_client_secrets_file(credentials_path, self.SCOPES) + creds = flow.run_local_server(port=0) + + with open(token_path, 'w') as token: + token.write(creds.to_json()) + + return creds + + def _build_service(self, credentials: Credentials): + """Build Google Drive API service""" + return build('drive', 'v3', credentials=credentials) + + def _list_files(self, service, query: str = None, folder_id: str = None) -> Dict[str, Any]: + """List files in Google Drive or specific folder""" + try: + query_parts = [] + if query: + query_parts.append(query) + if folder_id: + query_parts.append(f"'{folder_id}' in parents") + + final_query = ' and '.join(query_parts) if query_parts else None + + files = [] + page_token = None + while True: + response = service.files().list( + q=final_query, + spaces='drive', + fields='nextPageToken, files(id, name, mimeType, modifiedTime, size)', + pageToken=page_token + ).execute() + + files.extend(response.get('files', [])) + page_token = response.get('nextPageToken') + if not page_token: + break + + return {"files": files} + except Exception as e: + self.outcome = "apiError" + raise RuntimeError(f"Error listing files: {str(e)}") + + def 
_upload_file(self, service, file_path: str, folder_id: str = None, mime_type: str = None) -> Dict[str, Any]:
+        """Upload a file to Google Drive"""
+        try:
+            file_metadata = {
+                'name': os.path.basename(file_path)
+            }
+            if folder_id:
+                file_metadata['parents'] = [folder_id]
+
+            media = MediaFileUpload(
+                file_path,
+                mimetype=mime_type,
+                resumable=True
+            )
+
+            file = service.files().create(
+                body=file_metadata,
+                media_body=media,
+                fields='id, name, mimeType, webViewLink'
+            ).execute()
+
+            return file
+        except Exception as e:
+            self.outcome = "fileError"
+            raise RuntimeError(f"Error uploading file: {str(e)}")
+
+    def _download_file(self, service, file_id: str, output_path: str) -> Dict[str, Any]:
+        """Download a file from Google Drive"""
+        try:
+            request = service.files().get_media(fileId=file_id)
+            fh = io.BytesIO()
+            downloader = MediaIoBaseDownload(fh, request)
+
+            done = False
+            while not done:
+                status, done = downloader.next_chunk()
+
+            fh.seek(0)
+            with open(output_path, 'wb') as f:
+                f.write(fh.read())
+
+            return {"status": "downloaded", "path": output_path}
+        except Exception as e:
+            self.outcome = "fileError"
+            raise RuntimeError(f"Error downloading file: {str(e)}")
+
+    def _create_folder(self, service, name: str, parent_id: str = None) -> Dict[str, Any]:
+        """Create a new folder in Google Drive"""
+        try:
+            file_metadata = {
+                'name': name,
+                'mimeType': 'application/vnd.google-apps.folder'
+            }
+            if parent_id:
+                file_metadata['parents'] = [parent_id]
+
+            folder = service.files().create(
+                body=file_metadata,
+                fields='id, name, mimeType, webViewLink'
+            ).execute()
+
+            return folder
+        except Exception as e:
+            self.outcome = "apiError"
+            raise RuntimeError(f"Error creating folder: {str(e)}")
+
+    def _share_file(self, service, file_id: str, permissions: Dict[str, Any]) -> Dict[str, Any]:
+        """Share a file or folder with specified permissions"""
+        try:
+            permission = service.permissions().create(
+                fileId=file_id,
+                body=permissions,
+                fields='id, type, role, emailAddress'
+            ).execute()
+
+            return permission
+        except Exception as e:
+            self.outcome = "apiError"
+            raise RuntimeError(f"Error sharing file: {str(e)}")
+
+    def run(self):
+        try:
+            # Get required fields
+            credentials_path = self._get_field("credentials_path")
+            token_path = self._get_field("token_path")
+            operation = self._get_field("operation")
+
+            # Authenticate and build service
+            credentials = self._get_credentials(credentials_path, token_path)
+            service = self._build_service(credentials)
+
+            # Get optional fields
+            file_path = self._get_field("file_path", True)
+            file_id = self._get_field("file_id", True)
+            folder_id = self._get_field("folder_id", True)
+            name = self._get_field("name", True)
+            mime_type = self._get_field("mime_type", True)
+            query = self._get_field("query", True)
+            permissions = self._get_field("permissions", True, "{}")
+
+            # Parse permissions from a JSON string if needed; avoids eval()
+            if isinstance(permissions, str):
+                permissions = json.loads(permissions)
+
+            # Execute the requested operation
+            if operation == "list_files":
+                result = self._list_files(service, query, folder_id)
+            elif operation == "upload_file":
+                result = self._upload_file(service, file_path, folder_id, mime_type)
+            elif operation == "download_file":
+                result = self._download_file(service, file_id, file_path)
+            elif operation == "create_folder":
+                result = self._create_folder(service, name, folder_id)
+            elif operation == "share_file":
+                result = self._share_file(service, file_id, permissions)
+            elif operation == "list_folder_contents":
+                result = self._list_files(service, folder_id=folder_id)
+            else:
+                raise ValueError(f"Unsupported operation:
{operation}") + + # Store the result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "apiError" + except Exception as e: + if not self.outcome: + self.outcome = "apiError" + From f7210de7c83ce6e493be23a6d53ab8b445fcc791 Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 08:53:17 -0500 Subject: [PATCH 06/11] Add files via upload # E2B Data Analysis Integration Guide ## 1. Setup and Installation ### Prerequisites ```bash pip install e2b ``` ### Configuration ```python from e2b_data_analysis_integration import E2BDataAnalysisIntegration # Register the integration E2BDataAnalysisIntegration.register("e2b_analysis") ``` ## 2. Basic Usage Examples ### Execute Python Code ```python async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": """ import pandas as pd import numpy as np # Create sample data data = { 'A': np.random.rand(100), 'B': np.random.rand(100) } df = pd.DataFrame(data) print(df.describe()) """ }) await analysis.run() if analysis.outcome == "success": print(analysis.result["data"]["output"]) ``` ### Install Package ```python async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "install_package", "package_name": "scikit-learn" }) await analysis.run() ``` ### File Operations ```python # Upload data file async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "upload_file", "file_path": "data.csv", "content": "col1,col2\n1,2\n3,4" }) await analysis.run() # Process uploaded file await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": """ import pandas as pd df = pd.read_csv('data.csv') print(df.head()) """ }) await analysis.run() ``` ## 3. 
Advanced Usage ### Complex Data Analysis ```python async def analyze_data(analysis, data_path): # Upload data await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "upload_file", "file_path": "input.csv", "content": data_path }) await analysis.run() # Install required packages await analysis.configure({ "operation": "install_package", "package_name": "scikit-learn" }) await analysis.run() # Perform analysis await analysis.configure({ "operation": "execute_code", "code": """ import pandas as pd from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import classification_report # Load data df = pd.read_csv('input.csv') # Prepare features and target X = df.drop('target', axis=1) y = df['target'] # Split data X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) # Train model model = RandomForestClassifier(n_estimators=100) model.fit(X_train, y_train) # Evaluate y_pred = model.predict(X_test) print(classification_report(y_test, y_pred)) """ }) await analysis.run() ``` ### Environment Management ```python async def setup_environment(analysis): # Install required packages packages = ['pandas', 'numpy', 'scikit-learn', 'matplotlib'] for package in packages: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "install_package", "package_name": package }) await analysis.run() # Verify installation await analysis.configure({ "operation": "get_environment_info" }) await analysis.run() ``` ## 4. Error Handling ```python async def safe_execution(analysis, code): try: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": code, "timeout": "60" }) await analysis.run() if analysis.outcome == "success": return analysis.result["data"]["output"] elif analysis.outcome == "timeout": return "Execution timed out" else: return f"Error: {analysis.result['data'].get('error', 'Unknown error')}" except Exception as e: return f"Execution failed: {str(e)}" ``` ## 5. Best Practices ### Session Management ```python async def managed_session(): async with E2BDataAnalysisIntegration() as analysis: # Session is automatically cleaned up after use await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": "print('Hello, World!')" }) await analysis.run() ``` ### Resource Cleanup ```python async def cleanup_resources(analysis): # List and remove temporary files await analysis.configure({ "operation": "list_files" }) await analysis.run() # Clean up session await analysis.configure({ "operation": "cleanup_session" }) await analysis.run() ``` ## 6. Security Considerations 1. API Key Management: ```python import os api_key = os.environ.get('E2B_API_KEY') if not api_key: raise ValueError("E2B API key not found in environment variables") ``` 2. Code Sanitization: ```python def sanitize_code(code: str) -> str: """Basic code sanitization""" forbidden_terms = ['os.system', 'subprocess', '__import__'] for term in forbidden_terms: if term in code: raise ValueError(f"Forbidden term found in code: {term}") return code ``` ## 7. Monitoring and Logging ```python import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger('e2b_analysis') async def monitored_execution(analysis, code): logger.info(f"Starting execution at {datetime. 
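now().isoformat()}")
    # Completion sketch - the remainder of this example; reuses
    # safe_execution from section 4 above
    result = await safe_execution(analysis, code)
    logger.info(f"Finished execution at {datetime.now().isoformat()}")
    return result
```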
--- src/writer/e2bdataanalysis.py | 258 ++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 src/writer/e2bdataanalysis.py diff --git a/src/writer/e2bdataanalysis.py b/src/writer/e2bdataanalysis.py new file mode 100644 index 000000000..7cc69cd31 --- /dev/null +++ b/src/writer/e2bdataanalysis.py @@ -0,0 +1,258 @@ +import os +import json +import time +from typing import Dict, Any, Optional, List, Union +from datetime import datetime +import base64 +from e2b import DataAnalysis +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class E2BDataAnalysisIntegration(WorkflowBlock): + def __init__(self): + super().__init__() + self.session = None + + @classmethod + def register(cls, type: str): + super(E2BDataAnalysisIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "E2B Data Analysis Integration", + "description": "Execute data analysis code in a secure sandbox environment", + "category": "Analysis", + "fields": { + "api_key": { + "name": "API Key", + "type": "Text", + "description": "Your E2B API key" + }, + "operation": { + "name": "Operation", + "type": "Text", + "options": { + "execute_code": "Execute Code", + "install_package": "Install Package", + "upload_file": "Upload File", + "download_file": "Download File", + "list_files": "List Files", + "get_environment_info": "Get Environment Info", + "interrupt_execution": "Interrupt Execution", + "cleanup_session": "Cleanup Session" + }, + "default": "execute_code" + }, + "code": { + "name": "Code", + "type": "Text", + "description": "Python code to execute", + "control": "Textarea", + "required": False + }, + "package_name": { + "name": "Package Name", + "type": "Text", + "description": "Name of the Python package to install", + "required": False + }, + "file_path": { + "name": "File Path", + "type": "Text", + "description": "Path for file operations", + "required": False + }, + "content": { + "name": "Content", + "type": "Text", + "description": "Content for file operations", + "required": False + }, + "timeout": { + "name": "Timeout", + "type": "Text", + "description": "Timeout in seconds", + "default": "30", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation completed successfully.", + "style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during execution.", + "style": "error", + }, + "timeout": { + "name": "Timeout", + "description": "The operation timed out.", + "style": "error", + } + }, + } + )) + + async def _initialize_session(self, api_key: str): + """Initialize E2B session""" + if not self.session: + self.session = await DataAnalysis(api_key=api_key) + return self.session + + async def _execute_code(self, code: str, timeout: int = 30) -> Dict[str, Any]: + """Execute Python code in sandbox""" + try: + result = await self.session.execute_python( + code=code, + timeout=timeout + ) + return { + "output": result.output, + "error": result.error if result.error else None, + "duration": result.duration + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Code execution error: {str(e)}") + + async def _install_package(self, package_name: str) -> Dict[str, Any]: + """Install Python package in sandbox""" + try: + result = await self.session.install_python_package(package_name) + return { + "package": 
package_name, + "status": "installed", + "output": result + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Package installation error: {str(e)}") + + async def _upload_file(self, file_path: str, content: str) -> Dict[str, Any]: + """Upload file to sandbox""" + try: + await self.session.upload_file(file_path, content) + return { + "status": "uploaded", + "file_path": file_path + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"File upload error: {str(e)}") + + async def _download_file(self, file_path: str) -> Dict[str, Any]: + """Download file from sandbox""" + try: + content = await self.session.download_file(file_path) + return { + "content": content, + "file_path": file_path + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"File download error: {str(e)}") + + async def _list_files(self, path: str = ".") -> Dict[str, Any]: + """List files in sandbox directory""" + try: + files = await self.session.list_files(path) + return { + "files": files + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"File listing error: {str(e)}") + + async def _get_environment_info(self) -> Dict[str, Any]: + """Get information about the sandbox environment""" + try: + python_version = await self._execute_code("import sys; print(sys.version)") + installed_packages = await self._execute_code("!pip list") + + return { + "python_version": python_version["output"], + "installed_packages": installed_packages["output"] + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Environment info error: {str(e)}") + + async def run(self): + try: + # Get required fields + api_key = self._get_field("api_key") + operation = self._get_field("operation") + + # Initialize session + await self._initialize_session(api_key) + + # Get optional fields + code = self._get_field("code", True) + package_name = self._get_field("package_name", True) + file_path = self._get_field("file_path", True) + content = self._get_field("content", True) + timeout = int(self._get_field("timeout", True, "30")) + + # Execute the requested operation + if operation == "execute_code": + if not code: + raise ValueError("Code is required for execution") + result = await self._execute_code(code, timeout) + + elif operation == "install_package": + if not package_name: + raise ValueError("Package name is required for installation") + result = await self._install_package(package_name) + + elif operation == "upload_file": + if not file_path or not content: + raise ValueError("File path and content are required for upload") + result = await self._upload_file(file_path, content) + + elif operation == "download_file": + if not file_path: + raise ValueError("File path is required for download") + result = await self._download_file(file_path) + + elif operation == "list_files": + result = await self._list_files(file_path if file_path else ".") + + elif operation == "get_environment_info": + result = await self._get_environment_info() + + elif operation == "cleanup_session": + if self.session: + await self.session.close() + self.session = None + result = {"status": "cleaned"} + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Store the result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "error" + raise RuntimeError(f"Validation error: {str(e)}") + except Exception as 
e: + if not self.outcome: + self.outcome = "error" + raise e + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.session: + await self.session.close() + self.session = None From f08fcd55b53cb83247471a843d35c29d1c560b42 Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 08:59:18 -0500 Subject: [PATCH 07/11] Add files via upload # Webpage Parser Integration Usage Guide ## 1. Installation Requirements ```bash pip install aiohttp beautifulsoup4 trafilatura readability-lxml playwright pandas ``` ## 2. Basic Usage Examples ### Single URL Parsing ```python from webpage_parser_integration import WebpageParserIntegration # Initialize parser parser = WebpageParserIntegration() # Parse single URL await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "output_format": "json", "include_metadata": "true" }) await parser.run() if parser.outcome == "success": print(parser.result["data"]) ``` ### Multiple URLs Processing ```python # Process multiple URLs urls = [ "https://example.com", "https://example.org", "https://example.net" ] await parser.configure({ "operation": "batch_process", "urls": json.dumps(urls), "output_format": "csv" }) await parser.run() ``` ### Structured Data Extraction ```python # Extract specific data using CSS selectors extraction_rules = { "title": "h1", "price": ".product-price", "description": ".product-description", "categories": ".category" } await parser.configure({ "operation": "extract_structured_data", "urls": "https://example.com/product", "extraction_rules": json.dumps(extraction_rules) }) await parser.run() ``` ## 3. Advanced Usage ### JavaScript Rendering ```python # Parse JavaScript-rendered content await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "js_rendering": "true", "timeout": "60" }) await parser.run() ``` ### Custom Extraction ```python # Custom extraction with specific rules await parser.configure({ "operation": "custom_extraction", "urls": "https://example.com", "extraction_rules": json.dumps({ "main_article": "article.main-content", "comments": ".comment-section .comment", "author_info": { "name": ".author-name", "bio": ".author-bio", "social_links": ".social-links a" } }) }) await parser.run() ``` ## 4. Error Handling ```python try: await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "timeout": "30" }) await parser.run() if parser.outcome == "success": print("Parsing successful!") print(parser.result["data"]) elif parser.outcome == "timeout": print("Operation timed out") else: print(f"Error: {parser.result.get('error', 'Unknown error')}") except Exception as e: print(f"Error: {str(e)}") ``` ## 5. 
Best Practices ### Rate Limiting ```python import asyncio async def rate_limited_parsing(urls: List[str], delay: float = 1.0): results = [] for url in urls: await parser.configure({ "operation": "parse_urls", "urls": url }) await parser.run() results.append(parser.result) await asyncio.sleep(delay) return results ``` ### Caching Results ```python import hashlib import json from datetime import datetime, timedelta class ResultCache: def __init__(self, cache_duration: timedelta = timedelta(hours=1)): self.cache = {} self.duration = cache_duration def get_cached_result(self, url: str) -> Optional[Dict]: key = hashlib.md5(url.encode()).hexdigest() if key in self.cache: result, timestamp = self.cache[key] if datetime.now() - timestamp < self.duration: return result return None def cache_result(self, url: str, result: Dict): key = hashlib.md5(url.encode()).hexdigest() self.cache --- src/writer/workflows_blocks/webpageparser.py | 312 +++++++++++++++++++ 1 file changed, 312 insertions(+) create mode 100644 src/writer/workflows_blocks/webpageparser.py diff --git a/src/writer/workflows_blocks/webpageparser.py b/src/writer/workflows_blocks/webpageparser.py new file mode 100644 index 000000000..98fa73d67 --- /dev/null +++ b/src/writer/workflows_blocks/webpageparser.py @@ -0,0 +1,312 @@ +import asyncio +import aiohttp +from bs4 import BeautifulSoup +import json +import re +from typing import Dict, Any, List, Union +from datetime import datetime +from urllib.parse import urlparse, urljoin +import trafilatura +from readability import Document +import hashlib +from playwright.async_api import async_playwright +import pandas as pd + +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class WebpageParserIntegration(WorkflowBlock): + def __init__(self): + super().__init__() + self.session = None + + @classmethod + def register(cls, type: str): + super(WebpageParserIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "Webpage Parser Integration", + "description": "Parse and extract content from webpages", + "category": "Web Scraping", + "fields": { + "operation": { + "name": "Operation", + "type": "Text", + "options": { + "parse_urls": "Parse URLs", + "extract_content": "Extract Content", + "extract_structured_data": "Extract Structured Data", + "batch_process": "Batch Process URLs", + "custom_extraction": "Custom Extraction" + }, + "default": "parse_urls" + }, + "urls": { + "name": "URLs", + "type": "Text", + "description": "Single URL or JSON array of URLs", + "required": True + }, + "extraction_rules": { + "name": "Extraction Rules", + "type": "Key-Value", + "description": "CSS selectors or XPath rules for extraction", + "default": "{}", + "required": False + }, + "output_format": { + "name": "Output Format", + "type": "Text", + "options": { + "json": "JSON", + "csv": "CSV", + "text": "Plain Text", + "html": "HTML" + }, + "default": "json", + "required": False + }, + "include_metadata": { + "name": "Include Metadata", + "type": "Text", + "default": "true", + "required": False + }, + "js_rendering": { + "name": "JavaScript Rendering", + "type": "Text", + "default": "false", + "required": False + }, + "timeout": { + "name": "Timeout", + "type": "Text", + "default": "30", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The parsing operation completed successfully.", + 
"style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during parsing.", + "style": "error", + }, + "timeout": { + "name": "Timeout", + "description": "The operation timed out.", + "style": "error", + } + }, + } + )) + + async def _get_page_content(self, url: str, js_rendering: bool = False, timeout: int = 30) -> str: + """Fetch webpage content with optional JavaScript rendering""" + try: + if js_rendering == "true": + async with async_playwright() as p: + browser = await p.chromium.launch() + page = await browser.new_page() + await page.goto(url, timeout=timeout * 1000) + content = await page.content() + await browser.close() + return content + else: + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=timeout) as response: + return await response.text() + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Error fetching content from {url}: {str(e)}") + + def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: + """Extract metadata from webpage""" + metadata = { + "url": url, + "timestamp": datetime.now().isoformat(), + "title": soup.title.string if soup.title else None, + "meta_description": None, + "meta_keywords": None, + "canonical_url": None, + "author": None, + "publication_date": None + } + + # Extract meta tags + for meta in soup.find_all('meta'): + if meta.get('name') == 'description': + metadata['meta_description'] = meta.get('content') + elif meta.get('name') == 'keywords': + metadata['meta_keywords'] = meta.get('content') + elif meta.get('name') == 'author': + metadata['author'] = meta.get('content') + elif meta.get('property') == 'article:published_time': + metadata['publication_date'] = meta.get('content') + + # Extract canonical URL + canonical = soup.find('link', {'rel': 'canonical'}) + if canonical: + metadata['canonical_url'] = canonical.get('href') + + return metadata + + def _clean_text(self, text: str) -> str: + """Clean extracted text""" + # Remove extra whitespace + text = re.sub(r'\s+', ' ', text) + # Remove special characters + text = re.sub(r'[^\w\s.,!?-]', '', text) + return text.strip() + + async def _extract_content(self, url: str, html: str) -> Dict[str, Any]: + """Extract main content from webpage""" + try: + # Use trafilatura for main content extraction + main_content = trafilatura.extract(html, include_comments=False) + + # Use readability as backup + if not main_content: + doc = Document(html) + main_content = doc.summary() + + # Parse with BeautifulSoup + soup = BeautifulSoup(html, 'html.parser') + + # Extract metadata if enabled + metadata = self._extract_metadata(soup, url) + + return { + "url": url, + "content": self._clean_text(main_content) if main_content else None, + "metadata": metadata, + "html": html + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Error extracting content from {url}: {str(e)}") + + async def _extract_structured_data(self, html: str, rules: Dict[str, str]) -> Dict[str, Any]: + """Extract structured data using custom rules""" + soup = BeautifulSoup(html, 'html.parser') + structured_data = {} + + for field, selector in rules.items(): + try: + if selector.startswith('//'): # XPath + # Convert to CSS selector for simplicity + elements = soup.select(selector) + else: # CSS selector + elements = soup.select(selector) + + extracted = [self._clean_text(el.get_text()) for el in elements] + structured_data[field] = extracted[0] if len(extracted) == 1 else extracted + except Exception as e: + 
structured_data[field] = None + + return structured_data + + def _format_output(self, data: Union[Dict, List], format: str) -> Any: + """Format extracted data according to specified output format""" + if format == "json": + return json.dumps(data, indent=2) + elif format == "csv": + df = pd.DataFrame(data if isinstance(data, list) else [data]) + return df.to_csv(index=False) + elif format == "text": + if isinstance(data, list): + return "\n\n".join([item.get('content', '') for item in data]) + return data.get('content', '') + else: # html + return data.get('html', '') + + async def _process_single_url(self, url: str, config: Dict[str, Any]) -> Dict[str, Any]: + """Process a single URL with given configuration""" + js_rendering = config.get('js_rendering', 'false') + timeout = int(config.get('timeout', 30)) + extraction_rules = json.loads(config.get('extraction_rules', '{}')) + + html = await self._get_page_content(url, js_rendering, timeout) + content = await self._extract_content(url, html) + + if extraction_rules: + structured_data = await self._extract_structured_data(html, extraction_rules) + content['structured_data'] = structured_data + + return content + + async def run(self): + try: + # Get configuration + urls = self._get_field("urls") + operation = self._get_field("operation") + output_format = self._get_field("output_format", True, "json") + include_metadata = self._get_field("include_metadata", True, "true") + + # Parse URLs + if isinstance(urls, str): + if urls.startswith('['): + urls = json.loads(urls) + else: + urls = [urls] + + # Process URLs based on operation + if operation == "parse_urls": + results = [] + for url in urls: + result = await self._process_single_url(url, self._fields) + results.append(result) + + elif operation == "extract_content": + results = [] + for url in urls: + html = await self._get_page_content(url) + content = await self._extract_content(url, html) + results.append(content) + + elif operation == "extract_structured_data": + extraction_rules = json.loads(self._get_field("extraction_rules")) + results = [] + for url in urls: + html = await self._get_page_content(url) + structured_data = await self._extract_structured_data(html, extraction_rules) + results.append({ + "url": url, + "data": structured_data + }) + + elif operation == "batch_process": + results = await asyncio.gather(*[ + self._process_single_url(url, self._fields) + for url in urls + ]) + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Format results + formatted_output = self._format_output( + results[0] if len(results) == 1 else results, + output_format + ) + + # Set result + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "urls_processed": len(urls), + "output_format": output_format, + "data": formatted_output + } + self.outcome = "success" + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Error processing URLs: {str(e)}") + From 84d3bde165670067a30c753a616a0fe3d76cee64 Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 09:05:32 -0500 Subject: [PATCH 08/11] Salesforce Supported Operations Query Operations soql_query: Execute SOQL queries describe_object: Get object metadata Record Operations create_record: Create single record update_record: Update existing record delete_record: Delete record upsert_record: Update or insert based on external ID Bulk Operations bulk_create: Create multiple records bulk_update: Update multiple records bulk_delete: Delete multiple records 
bulk_upsert: Upsert multiple records Metadata Operations get_metadata: Retrieve metadata update_metadata: Update metadata File Operations upload_file: Upload attachments download_file: Download attachments --- .../workflows_blocks/salesforceintegration.py | 330 ++++++++++++++++++ 1 file changed, 330 insertions(+) create mode 100644 src/writer/workflows_blocks/salesforceintegration.py diff --git a/src/writer/workflows_blocks/salesforceintegration.py b/src/writer/workflows_blocks/salesforceintegration.py new file mode 100644 index 000000000..29cfff589 --- /dev/null +++ b/src/writer/workflows_blocks/salesforceintegration.py @@ -0,0 +1,330 @@ +import json +from datetime import datetime +from typing import Dict, Any, List, Optional +from simple_salesforce import Salesforce +from simple_salesforce.exceptions import SalesforceError, SalesforceAuthenticationFailed +import pandas as pd +import requests + +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class SalesforceIntegration(WorkflowBlock): + def __init__(self): + super().__init__() + self.sf = None + + @classmethod + def register(cls, type: str): + super(SalesforceIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "Salesforce Integration", + "description": "Execute Salesforce operations and manage data", + "category": "CRM", + "fields": { + "username": { + "name": "Username", + "type": "Text", + "description": "Salesforce username" + }, + "password": { + "name": "Password", + "type": "Text", + "description": "Salesforce password" + }, + "security_token": { + "name": "Security Token", + "type": "Text", + "description": "Salesforce security token" + }, + "domain": { + "name": "Domain", + "type": "Text", + "description": "Salesforce domain (test/production)", + "options": { + "login": "Production", + "test": "Sandbox" + }, + "default": "login" + }, + "operation": { + "name": "Operation", + "type": "Text", + "options": { + # Query Operations + "soql_query": "SOQL Query", + "describe_object": "Describe Object", + + # Record Operations + "create_record": "Create Record", + "update_record": "Update Record", + "delete_record": "Delete Record", + "upsert_record": "Upsert Record", + + # Bulk Operations + "bulk_create": "Bulk Create", + "bulk_update": "Bulk Update", + "bulk_delete": "Bulk Delete", + "bulk_upsert": "Bulk Upsert", + + # Metadata Operations + "get_metadata": "Get Metadata", + "update_metadata": "Update Metadata", + + # File Operations + "upload_file": "Upload File", + "download_file": "Download File" + }, + "default": "soql_query" + }, + "object_name": { + "name": "Object Name", + "type": "Text", + "description": "Salesforce object name (e.g., Account, Contact)", + "required": False + }, + "record_id": { + "name": "Record ID", + "type": "Text", + "description": "Salesforce record ID", + "required": False + }, + "data": { + "name": "Data", + "type": "Key-Value", + "description": "Data for create/update operations", + "default": "{}", + "required": False + }, + "query": { + "name": "Query", + "type": "Text", + "description": "SOQL query string", + "required": False + }, + "external_id_field": { + "name": "External ID Field", + "type": "Text", + "description": "Field name for upsert operations", + "required": False + }, + "batch_size": { + "name": "Batch Size", + "type": "Text", + "description": "Size of batches for bulk operations", + "default": 
"200", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation completed successfully.", + "style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during the operation.", + "style": "error", + }, + "auth_error": { + "name": "Authentication Error", + "description": "Authentication failed.", + "style": "error", + } + }, + } + )) + + def _connect(self, username: str, password: str, security_token: str, domain: str): + """Establish connection to Salesforce""" + try: + self.sf = Salesforce( + username=username, + password=password, + security_token=security_token, + domain=domain + ) + except SalesforceAuthenticationFailed as e: + self.outcome = "auth_error" + raise RuntimeError(f"Authentication failed: {str(e)}") + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Connection error: {str(e)}") + + def _execute_soql_query(self, query: str) -> Dict[str, Any]: + """Execute SOQL query""" + try: + results = self.sf.query_all(query) + return { + "total_size": results['totalSize'], + "records": results['records'] + } + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Query error: {str(e)}") + + def _describe_object(self, object_name: str) -> Dict[str, Any]: + """Get object metadata""" + try: + obj = getattr(self.sf, object_name) + metadata = obj.describe() + return metadata + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Describe error: {str(e)}") + + def _create_record(self, object_name: str, data: Dict[str, Any]) -> Dict[str, Any]: + """Create a new record""" + try: + obj = getattr(self.sf, object_name) + result = obj.create(data) + return result + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Create error: {str(e)}") + + def _update_record(self, object_name: str, record_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """Update an existing record""" + try: + obj = getattr(self.sf, object_name) + result = obj.update(record_id, data) + return result + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Update error: {str(e)}") + + def _delete_record(self, object_name: str, record_id: str) -> Dict[str, Any]: + """Delete a record""" + try: + obj = getattr(self.sf, object_name) + result = obj.delete(record_id) + return result + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Delete error: {str(e)}") + + def _upsert_record(self, object_name: str, external_id_field: str, data: Dict[str, Any]) -> Dict[str, Any]: + """Upsert a record using external ID""" + try: + obj = getattr(self.sf, object_name) + result = obj.upsert(external_id_field, data) + return result + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Upsert error: {str(e)}") + + def _bulk_operation(self, operation: str, object_name: str, data: List[Dict[str, Any]], + batch_size: int = 200) -> Dict[str, Any]: + """Execute bulk operation""" + try: + results = [] + for i in range(0, len(data), batch_size): + batch = data[i:i + batch_size] + + if operation == "create": + result = getattr(self.sf.bulk, object_name).insert(batch) + elif operation == "update": + result = getattr(self.sf.bulk, object_name).update(batch) + elif operation == "delete": + result = getattr(self.sf.bulk, object_name).delete(batch) + elif operation == "upsert": + result = getattr(self.sf.bulk, object_name).upsert(batch) + + results.extend(result) + + return { + "total_processed": len(results), + "results": results + } + except 
Exception as e: + self.outcome = "error" + raise RuntimeError(f"Bulk operation error: {str(e)}") + + def run(self): + try: + # Get authentication fields + username = self._get_field("username") + password = self._get_field("password") + security_token = self._get_field("security_token") + domain = self._get_field("domain", False, "login") + + # Connect to Salesforce + self._connect(username, password, security_token, domain) + + # Get operation details + operation = self._get_field("operation") + object_name = self._get_field("object_name", True) + record_id = self._get_field("record_id", True) + data = self._get_field("data", True, "{}") + query = self._get_field("query", True) + external_id_field = self._get_field("external_id_field", True) + batch_size = int(self._get_field("batch_size", True, "200")) + + # Parse data if provided + if data: + data = json.loads(data) + + # Execute requested operation + if operation == "soql_query": + if not query: + raise ValueError("Query is required for SOQL operation") + result = self._execute_soql_query(query) + + elif operation == "describe_object": + if not object_name: + raise ValueError("Object name is required for describe operation") + result = self._describe_object(object_name) + + elif operation == "create_record": + if not object_name or not data: + raise ValueError("Object name and data are required for create operation") + result = self._create_record(object_name, data) + + elif operation == "update_record": + if not object_name or not record_id or not data: + raise ValueError("Object name, record ID, and data are required for update operation") + result = self._update_record(object_name, record_id, data) + + elif operation == "delete_record": + if not object_name or not record_id: + raise ValueError("Object name and record ID are required for delete operation") + result = self._delete_record(object_name, record_id) + + elif operation == "upsert_record": + if not object_name or not external_id_field or not data: + raise ValueError("Object name, external ID field, and data are required for upsert operation") + result = self._upsert_record(object_name, external_id_field, data) + + elif operation in ["bulk_create", "bulk_update", "bulk_delete", "bulk_upsert"]: + if not object_name or not data: + raise ValueError("Object name and data are required for bulk operations") + result = self._bulk_operation( + operation.replace("bulk_", ""), + object_name, + data if isinstance(data, list) else [data], + batch_size + ) + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Store result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "error" + raise RuntimeError(f"Validation error: {str(e)}") + except Exception as e: + if not self.outcome: + self.outcome = "error" + raise RuntimeError(f"Operation error: {str(e)}") + From 844fcf8c01a4b8d0d3bd2eb60ccd078eeee4166e Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 09:10:45 -0500 Subject: [PATCH 09/11] GitHub Core Features Authentication & Access Control Personal Access Token authentication Organization and user-level access Permission validation Secure token handling --- .../workflows_blocks/githubintegration.py | 436 ++++++++++++++++++ 1 file changed, 436 insertions(+) create mode 100644 src/writer/workflows_blocks/githubintegration.py diff --git a/src/writer/workflows_blocks/githubintegration.py 
b/src/writer/workflows_blocks/githubintegration.py new file mode 100644 index 000000000..000210642 --- /dev/null +++ b/src/writer/workflows_blocks/githubintegration.py @@ -0,0 +1,436 @@ +import os +import json +from datetime import datetime +from typing import Dict, Any, List, Optional +from github import Github, GithubException +import base64 +import requests +from pathlib import Path + +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class GitHubIntegration(WorkflowBlock): + def __init__(self): + super().__init__() + self.github = None + + @classmethod + def register(cls, type: str): + super(GitHubIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "GitHub Integration", + "description": "Execute GitHub operations and manage repositories", + "category": "Version Control", + "fields": { + "token": { + "name": "Access Token", + "type": "Text", + "description": "GitHub Personal Access Token" + }, + "operation": { + "name": "Operation", + "type": "Text", + "options": { + # Repository Operations + "create_repo": "Create Repository", + "delete_repo": "Delete Repository", + "list_repos": "List Repositories", + "get_repo_info": "Get Repository Info", + + # Branch Operations + "create_branch": "Create Branch", + "delete_branch": "Delete Branch", + "list_branches": "List Branches", + "protect_branch": "Protect Branch", + + # File Operations + "get_file_content": "Get File Content", + "create_file": "Create File", + "update_file": "Update File", + "delete_file": "Delete File", + + # Issue Operations + "create_issue": "Create Issue", + "update_issue": "Update Issue", + "list_issues": "List Issues", + "add_issue_comment": "Add Issue Comment", + + # Pull Request Operations + "create_pull_request": "Create Pull Request", + "list_pull_requests": "List Pull Requests", + "merge_pull_request": "Merge Pull Request", + "review_pull_request": "Review Pull Request", + + # Workflow Operations + "list_workflows": "List Workflows", + "trigger_workflow": "Trigger Workflow", + "get_workflow_runs": "Get Workflow Runs", + + # Release Operations + "create_release": "Create Release", + "list_releases": "List Releases", + + # Team Operations + "list_teams": "List Teams", + "add_team_member": "Add Team Member", + + # Project Operations + "create_project": "Create Project", + "list_projects": "List Projects" + }, + "default": "list_repos" + }, + "repo_name": { + "name": "Repository Name", + "type": "Text", + "description": "Name of the repository", + "required": False + }, + "owner": { + "name": "Owner", + "type": "Text", + "description": "Repository owner (user/organization)", + "required": False + }, + "branch": { + "name": "Branch", + "type": "Text", + "description": "Branch name", + "required": False + }, + "file_path": { + "name": "File Path", + "type": "Text", + "description": "Path to file in repository", + "required": False + }, + "content": { + "name": "Content", + "type": "Text", + "description": "Content for file/issue/PR operations", + "required": False + }, + "title": { + "name": "Title", + "type": "Text", + "description": "Title for issue/PR/release", + "required": False + }, + "body": { + "name": "Body", + "type": "Text", + "description": "Body content for issue/PR/release", + "required": False + }, + "labels": { + "name": "Labels", + "type": "Text", + "description": "Labels as JSON array", + "default": "[]", + 
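+                        # JSON-encoded array, e.g. '["bug", "documentation"]';
+                        # run() decodes this with json.loads before passing it
+                        # to the GitHub API.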
"required": False + }, + "assignees": { + "name": "Assignees", + "type": "Text", + "description": "Assignees as JSON array", + "default": "[]", + "required": False + }, + "commit_message": { + "name": "Commit Message", + "type": "Text", + "description": "Commit message for file operations", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation completed successfully.", + "style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during the operation.", + "style": "error", + }, + "auth_error": { + "name": "Authentication Error", + "description": "Authentication failed.", + "style": "error", + } + }, + } + )) + + def _initialize_client(self, token: str): + """Initialize GitHub client""" + try: + self.github = Github(token) + # Test authentication + self.github.get_user().login + except GithubException as e: + self.outcome = "auth_error" + raise RuntimeError(f"Authentication failed: {str(e)}") + + def _get_repo(self, owner: str, repo_name: str): + """Get repository object""" + try: + return self.github.get_repo(f"{owner}/{repo_name}") + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"Repository error: {str(e)}") + + def _create_repo(self, name: str, private: bool = False, description: str = None) -> Dict[str, Any]: + """Create a new repository""" + try: + user = self.github.get_user() + repo = user.create_repo( + name=name, + private=private, + description=description + ) + return { + "id": repo.id, + "name": repo.name, + "full_name": repo.full_name, + "html_url": repo.html_url, + "private": repo.private + } + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"Repository creation error: {str(e)}") + + def _list_repos(self, owner: str = None) -> Dict[str, Any]: + """List repositories""" + try: + if owner: + repos = self.github.get_user(owner).get_repos() + else: + repos = self.github.get_user().get_repos() + + return { + "repositories": [{ + "id": repo.id, + "name": repo.name, + "full_name": repo.full_name, + "html_url": repo.html_url, + "private": repo.private, + "description": repo.description + } for repo in repos] + } + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"Repository listing error: {str(e)}") + + def _create_issue(self, repo, title: str, body: str, labels: List[str] = None, + assignees: List[str] = None) -> Dict[str, Any]: + """Create an issue""" + try: + issue = repo.create_issue( + title=title, + body=body, + labels=labels, + assignees=assignees + ) + return { + "id": issue.id, + "number": issue.number, + "title": issue.title, + "html_url": issue.html_url, + "state": issue.state + } + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"Issue creation error: {str(e)}") + + def _create_pull_request(self, repo, title: str, body: str, head: str, + base: str = "main") -> Dict[str, Any]: + """Create a pull request""" + try: + pr = repo.create_pull( + title=title, + body=body, + head=head, + base=base + ) + return { + "id": pr.id, + "number": pr.number, + "title": pr.title, + "html_url": pr.html_url, + "state": pr.state + } + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"Pull request creation error: {str(e)}") + + def _file_operation(self, repo, operation: str, file_path: str, + content: str = None, commit_message: str = None) -> Dict[str, Any]: + """Handle file operations""" + try: + if operation == "get": + contents = repo.get_contents(file_path) + content 
= base64.b64decode(contents.content).decode('utf-8') + return { + "content": content, + "sha": contents.sha, + "size": contents.size + } + elif operation == "create": + result = repo.create_file( + path=file_path, + message=commit_message, + content=content + ) + return { + "commit": { + "sha": result["commit"].sha, + "message": result["commit"].message + }, + "content": { + "path": result["content"].path, + "sha": result["content"].sha + } + } + elif operation == "update": + contents = repo.get_contents(file_path) + result = repo.update_file( + path=file_path, + message=commit_message, + content=content, + sha=contents.sha + ) + return { + "commit": { + "sha": result["commit"].sha, + "message": result["commit"].message + }, + "content": { + "path": result["content"].path, + "sha": result["content"].sha + } + } + elif operation == "delete": + contents = repo.get_contents(file_path) + result = repo.delete_file( + path=file_path, + message=commit_message, + sha=contents.sha + ) + return { + "commit": { + "sha": result["commit"].sha, + "message": result["commit"].message + } + } + except GithubException as e: + self.outcome = "error" + raise RuntimeError(f"File operation error: {str(e)}") + + def run(self): + try: + # Get authentication token + token = self._get_field("token") + self._initialize_client(token) + + # Get operation details + operation = self._get_field("operation") + repo_name = self._get_field("repo_name", True) + owner = self._get_field("owner", True) + branch = self._get_field("branch", True) + file_path = self._get_field("file_path", True) + content = self._get_field("content", True) + title = self._get_field("title", True) + body = self._get_field("body", True) + labels = json.loads(self._get_field("labels", True, "[]")) + assignees = json.loads(self._get_field("assignees", True, "[]")) + commit_message = self._get_field("commit_message", True) + + # Execute requested operation + if operation == "create_repo": + result = self._create_repo( + name=repo_name, + description=body + ) + + elif operation == "list_repos": + result = self._list_repos(owner) + + elif operation in ["create_issue", "create_pull_request", "get_file_content", + "create_file", "update_file", "delete_file"]: + repo = self._get_repo(owner, repo_name) + + if operation == "create_issue": + result = self._create_issue( + repo=repo, + title=title, + body=body, + labels=labels, + assignees=assignees + ) + + elif operation == "create_pull_request": + result = self._create_pull_request( + repo=repo, + title=title, + body=body, + head=branch + ) + + elif operation == "get_file_content": + result = self._file_operation( + repo=repo, + operation="get", + file_path=file_path + ) + + elif operation == "create_file": + result = self._file_operation( + repo=repo, + operation="create", + file_path=file_path, + content=content, + commit_message=commit_message + ) + + elif operation == "update_file": + result = self._file_operation( + repo=repo, + operation="update", + file_path=file_path, + content=content, + commit_message=commit_message + ) + + elif operation == "delete_file": + result = self._file_operation( + repo=repo, + operation="delete", + file_path=file_path, + commit_message=commit_message + ) + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Store result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "error" + raise 
RuntimeError(f"Validation error: {str(e)}") + except Exception as e: + if not self.outcome: + self.outcome = "error" + raise RuntimeError(f"Operation error: {str(e)}") + From 0653076f9329c06cd52f232aa1d7f4324e46566c Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 09:17:14 -0500 Subject: [PATCH 10/11] Shopify # Shopify Integration Guide ## Core Features 1. **Authentication & Session Management** - Secure token-based authentication - Session handling - Rate limiting implementation - API version management 2. **Product Management** # Product operations - List products with variants - Create new products - Update existing products - Delete products - Manage product images 3. **Order Processing** # Order operations - List orders with filtering - Create orders - Update order status - Cancel orders - Process refunds 4. **Customer Management** # Customer operations - List customers - Create customer profiles - Update customer information - Track customer orders --- .../workflows_blocks/shopifyintegration.py | 383 ++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 src/writer/workflows_blocks/shopifyintegration.py diff --git a/src/writer/workflows_blocks/shopifyintegration.py b/src/writer/workflows_blocks/shopifyintegration.py new file mode 100644 index 000000000..b0895400b --- /dev/null +++ b/src/writer/workflows_blocks/shopifyintegration.py @@ -0,0 +1,383 @@ +import json +import time +from datetime import datetime +from typing import Dict, Any, List, Optional +import shopify +import requests +from urllib.parse import urlparse +import pandas as pd +from ratelimit import limits, sleep_and_retry + +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class ShopifyIntegration(WorkflowBlock): + # Rate limit: 2 requests per second for API version 2024-01 + CALLS_PER_SECOND = 2 + + def __init__(self): + super().__init__() + self.session = None + + @classmethod + def register(cls, type: str): + super(ShopifyIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "Shopify Integration", + "description": "Execute Shopify operations and manage store data", + "category": "E-commerce", + "fields": { + "shop_url": { + "name": "Shop URL", + "type": "Text", + "description": "Your Shopify shop URL" + }, + "access_token": { + "name": "Access Token", + "type": "Text", + "description": "Shopify Admin API access token" + }, + "api_version": { + "name": "API Version", + "type": "Text", + "description": "Shopify API version", + "default": "2024-01" + }, + "operation": { + "name": "Operation", + "type": "Text", + "options": { + # Product Operations + "list_products": "List Products", + "get_product": "Get Product", + "create_product": "Create Product", + "update_product": "Update Product", + "delete_product": "Delete Product", + + # Order Operations + "list_orders": "List Orders", + "get_order": "Get Order", + "create_order": "Create Order", + "update_order": "Update Order", + "cancel_order": "Cancel Order", + + # Customer Operations + "list_customers": "List Customers", + "get_customer": "Get Customer", + "create_customer": "Create Customer", + "update_customer": "Update Customer", + + # Inventory Operations + "list_inventory": "List Inventory", + "adjust_inventory": "Adjust Inventory", + "get_inventory_level": "Get Inventory Level", + + # Collection Operations + "list_collections": "List 
Collections", + "create_collection": "Create Collection", + "add_to_collection": "Add to Collection", + + # Discount Operations + "create_discount": "Create Discount", + "list_discounts": "List Discounts", + + # Webhook Operations + "create_webhook": "Create Webhook", + "list_webhooks": "List Webhooks", + + # Analytics Operations + "get_shop_analytics": "Get Shop Analytics", + "get_product_analytics": "Get Product Analytics" + }, + "default": "list_products" + }, + "resource_id": { + "name": "Resource ID", + "type": "Text", + "description": "ID of the resource to operate on", + "required": False + }, + "data": { + "name": "Data", + "type": "Key-Value", + "description": "Data for create/update operations", + "default": "{}", + "required": False + }, + "filters": { + "name": "Filters", + "type": "Key-Value", + "description": "Filters for list operations", + "default": "{}", + "required": False + }, + "page_size": { + "name": "Page Size", + "type": "Text", + "description": "Number of items per page", + "default": "50", + "required": False + }, + "webhook_url": { + "name": "Webhook URL", + "type": "Text", + "description": "URL for webhook notifications", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation completed successfully.", + "style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during the operation.", + "style": "error", + }, + "auth_error": { + "name": "Authentication Error", + "description": "Authentication failed.", + "style": "error", + }, + "rate_limit": { + "name": "Rate Limit", + "description": "Rate limit exceeded.", + "style": "error", + } + }, + } + )) + + @sleep_and_retry + @limits(calls=CALLS_PER_SECOND, period=1) + def _make_api_call(self, func, *args, **kwargs): + """Make rate-limited API call""" + try: + return func(*args, **kwargs) + except Exception as e: + if "429" in str(e): + self.outcome = "rate_limit" + raise RuntimeError("Rate limit exceeded") + raise e + + def _initialize_session(self, shop_url: str, access_token: str, api_version: str): + """Initialize Shopify session""" + try: + shop_url = self._format_shop_url(shop_url) + shopify.Session.setup(api_key=access_token, secret=None) + session = shopify.Session(shop_url, api_version, access_token) + shopify.ShopifyResource.activate_session(session) + self.session = session + except Exception as e: + self.outcome = "auth_error" + raise RuntimeError(f"Authentication failed: {str(e)}") + + def _format_shop_url(self, shop_url: str) -> str: + """Format shop URL to standard format""" + parsed = urlparse(shop_url) + return parsed.netloc if parsed.netloc else parsed.path + + def _handle_product_operations(self, operation: str, data: Dict[str, Any] = None, + resource_id: str = None) -> Dict[str, Any]: + """Handle product-related operations""" + try: + if operation == "list_products": + products = self._make_api_call(shopify.Product.find, **data) + return { + "products": [{ + "id": p.id, + "title": p.title, + "handle": p.handle, + "variants": [vars(v) for v in p.variants], + "images": [vars(i) for i in p.images] + } for p in products] + } + + elif operation == "create_product": + product = self._make_api_call(shopify.Product.create, data) + return vars(product) + + elif operation == "update_product": + product = shopify.Product.find(resource_id) + for key, value in data.items(): + setattr(product, key, value) + self._make_api_call(product.save) + return vars(product) + + elif operation == "delete_product": + product = 
shopify.Product.find(resource_id) + self._make_api_call(product.destroy) + return {"status": "deleted", "id": resource_id} + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Product operation error: {str(e)}") + + def _handle_order_operations(self, operation: str, data: Dict[str, Any] = None, + resource_id: str = None) -> Dict[str, Any]: + """Handle order-related operations""" + try: + if operation == "list_orders": + orders = self._make_api_call(shopify.Order.find, **data) + return { + "orders": [{ + "id": o.id, + "order_number": o.order_number, + "total_price": o.total_price, + "customer": vars(o.customer) if o.customer else None, + "line_items": [vars(item) for item in o.line_items] + } for o in orders] + } + + elif operation == "create_order": + order = self._make_api_call(shopify.Order.create, data) + return vars(order) + + elif operation == "update_order": + order = shopify.Order.find(resource_id) + for key, value in data.items(): + setattr(order, key, value) + self._make_api_call(order.save) + return vars(order) + + elif operation == "cancel_order": + order = shopify.Order.find(resource_id) + self._make_api_call(order.cancel) + return {"status": "cancelled", "id": resource_id} + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Order operation error: {str(e)}") + + def _handle_customer_operations(self, operation: str, data: Dict[str, Any] = None, + resource_id: str = None) -> Dict[str, Any]: + """Handle customer-related operations""" + try: + if operation == "list_customers": + customers = self._make_api_call(shopify.Customer.find, **data) + return { + "customers": [{ + "id": c.id, + "email": c.email, + "first_name": c.first_name, + "last_name": c.last_name, + "orders_count": c.orders_count, + "total_spent": c.total_spent + } for c in customers] + } + + elif operation == "create_customer": + customer = self._make_api_call(shopify.Customer.create, data) + return vars(customer) + + elif operation == "update_customer": + customer = shopify.Customer.find(resource_id) + for key, value in data.items(): + setattr(customer, key, value) + self._make_api_call(customer.save) + return vars(customer) + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Customer operation error: {str(e)}") + + def _handle_inventory_operations(self, operation: str, data: Dict[str, Any] = None, + resource_id: str = None) -> Dict[str, Any]: + """Handle inventory-related operations""" + try: + if operation == "list_inventory": + locations = self._make_api_call(shopify.Location.find) + inventory_levels = [] + for location in locations: + levels = self._make_api_call( + shopify.InventoryLevel.find, + location_id=location.id + ) + inventory_levels.extend([{ + "location_id": level.location_id, + "inventory_item_id": level.inventory_item_id, + "available": level.available + } for level in levels]) + return {"inventory_levels": inventory_levels} + + elif operation == "adjust_inventory": + self._make_api_call( + shopify.InventoryLevel.adjust, + location_id=data['location_id'], + inventory_item_id=data['inventory_item_id'], + available_adjustment=data['adjustment'] + ) + return {"status": "adjusted", "data": data} + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Inventory operation error: {str(e)}") + + def run(self): + try: + # Get authentication fields + shop_url = self._get_field("shop_url") + access_token = self._get_field("access_token") + api_version = self._get_field("api_version", False, "2024-01") + + # Initialize session 
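+            # _initialize_session() activates the session globally via
+            # shopify.ShopifyResource.activate_session, so subsequent shopify.*
+            # calls use it; the finally block at the end of run() clears it.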
+ self._initialize_session(shop_url, access_token, api_version) + + # Get operation details + operation = self._get_field("operation") + resource_id = self._get_field("resource_id", True) + data = json.loads(self._get_field("data", True, "{}")) + filters = json.loads(self._get_field("filters", True, "{}")) + page_size = int(self._get_field("page_size", True, "50")) + + # Add pagination to filters if applicable + if filters: + filters['limit'] = page_size + + # Execute requested operation + if operation.startswith("list_") or operation == "get_product": + data.update(filters) + + if operation in ["list_products", "get_product", "create_product", + "update_product", "delete_product"]: + result = self._handle_product_operations(operation, data, resource_id) + + elif operation in ["list_orders", "get_order", "create_order", + "update_order", "cancel_order"]: + result = self._handle_order_operations(operation, data, resource_id) + + elif operation in ["list_customers", "get_customer", "create_customer", + "update_customer"]: + result = self._handle_customer_operations(operation, data, resource_id) + + elif operation in ["list_inventory", "adjust_inventory", "get_inventory_level"]: + result = self._handle_inventory_operations(operation, data, resource_id) + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # Store result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "error" + raise RuntimeError(f"Validation error: {str(e)}") + except Exception as e: + if not self.outcome: + self.outcome = "error" + raise RuntimeError(f"Operation error: {str(e)}") + + finally: + if self.session: + shopify.ShopifyResource.clear_session() + From f94cc69c8c3dc8b9e8aefe491561845397bb9928 Mon Sep 17 00:00:00 2001 From: Waseem AlShikh Date: Thu, 7 Nov 2024 09:23:58 -0500 Subject: [PATCH 11/11] Airtable # Airtable Integration Guide ## Core Features 1. **Authentication & Rate Limiting** - Secure API key authentication - Automatic rate limiting (5 requests/second) - Error handling for API limits - Session management 2. **Record Management** ```python # Record operations - List records with filtering - Get individual records - Create new records - Update existing records - Delete records - Batch operations for efficiency ``` 3. **Table Management** ```python # Table operations - List tables in base - Get table schema - Create new tables - Update table structure - Manage fields and views ``` 4. **View Management** ```python # View operations - List views - Get view details - Create custom views - Filter and sort records in views ``` ## Usage Examples 1. 
**Basic Record Operations** ```python # Initialize integration airtable = AirtableIntegration() # List records await airtable.configure({ "api_key": "your_api_key", "base_id": "your_base_id", "table_name": "Your Table", "operation": "list_records", "data": json.dumps({ "max_records": 100, "view": "Grid view", "sort_field": "Name" }) }) await airtable.run() # Create record await airt --- .../workflows_blocks/airtableintegration.py | 352 ++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 src/writer/workflows_blocks/airtableintegration.py diff --git a/src/writer/workflows_blocks/airtableintegration.py b/src/writer/workflows_blocks/airtableintegration.py new file mode 100644 index 000000000..b8f0d2186 --- /dev/null +++ b/src/writer/workflows_blocks/airtableintegration.py @@ -0,0 +1,352 @@ +import json +from datetime import datetime +from typing import Dict, Any, List, Optional, Union +import requests +from ratelimit import limits, sleep_and_retry +import pandas as pd +from urllib.parse import quote + +from writer.abstract import register_abstract_template +from writer.ss_types import AbstractTemplate +from writer.workflows_blocks.blocks import WorkflowBlock + +class AirtableIntegration(WorkflowBlock): + BASE_URL = "https://api.airtable.com/v0" + METADATA_URL = "https://api.airtable.com/v0/meta" + # Rate limit: 5 requests per second + CALLS_PER_SECOND = 5 + + def __init__(self): + super().__init__() + self.session = None + + @classmethod + def register(cls, type: str): + super(AirtableIntegration, cls).register(type) + register_abstract_template(type, AbstractTemplate( + baseType="workflows_node", + writer={ + "name": "Airtable Integration", + "description": "Execute Airtable operations and manage bases", + "category": "Database", + "fields": { + "api_key": { + "name": "API Key", + "type": "Text", + "description": "Your Airtable API key" + }, + "base_id": { + "name": "Base ID", + "type": "Text", + "description": "Airtable base ID" + }, + "table_name": { + "name": "Table Name", + "type": "Text", + "description": "Name of the table", + "required": False + }, + "operation": { + "name": "Operation", + "type": "Text", + "options": { + # Record Operations + "list_records": "List Records", + "get_record": "Get Record", + "create_record": "Create Record", + "update_record": "Update Record", + "delete_record": "Delete Record", + "batch_create": "Batch Create Records", + "batch_update": "Batch Update Records", + "batch_delete": "Batch Delete Records", + + # Table Operations + "list_tables": "List Tables", + "get_table_schema": "Get Table Schema", + "create_table": "Create Table", + "update_table": "Update Table", + + # View Operations + "list_views": "List Views", + "get_view": "Get View", + "create_view": "Create View", + + # Field Operations + "list_fields": "List Fields", + "create_field": "Create Field", + "update_field": "Update Field", + + # Automation Operations + "list_automations": "List Automations", + "run_automation": "Run Automation", + + # Base Operations + "get_base_schema": "Get Base Schema", + "get_base_usage": "Get Base Usage" + }, + "default": "list_records" + }, + "record_id": { + "name": "Record ID", + "type": "Text", + "description": "ID of the record to operate on", + "required": False + }, + "data": { + "name": "Data", + "type": "Key-Value", + "description": "Data for create/update operations", + "default": "{}", + "required": False + }, + "view_name": { + "name": "View Name", + "type": "Text", + "description": "Name of the view", + "required": False + }, + 
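+                        # The formula below is forwarded as Airtable's
+                        # filterByFormula parameter (e.g. {Status} = 'Done');
+                        # _handle_records URL-encodes it with urllib.parse.quote.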
"formula": { + "name": "Formula", + "type": "Text", + "description": "Airtable formula for filtering", + "required": False + }, + "sort_field": { + "name": "Sort Field", + "type": "Text", + "description": "Field to sort by", + "required": False + }, + "max_records": { + "name": "Max Records", + "type": "Text", + "description": "Maximum number of records to return", + "default": "100", + "required": False + }, + "cell_format": { + "name": "Cell Format", + "type": "Text", + "options": { + "json": "JSON", + "string": "String" + }, + "default": "json", + "required": False + } + }, + "outs": { + "success": { + "name": "Success", + "description": "The operation completed successfully.", + "style": "success", + }, + "error": { + "name": "Error", + "description": "An error occurred during the operation.", + "style": "error", + }, + "auth_error": { + "name": "Authentication Error", + "description": "Authentication failed.", + "style": "error", + }, + "rate_limit": { + "name": "Rate Limit", + "description": "Rate limit exceeded.", + "style": "error", + } + }, + } + )) + + def _get_headers(self, api_key: str) -> Dict[str, str]: + """Create headers for Airtable API requests""" + return { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + + @sleep_and_retry + @limits(calls=CALLS_PER_SECOND, period=1) + def _make_request(self, method: str, url: str, headers: Dict[str, str], + data: Dict[str, Any] = None) -> Dict[str, Any]: + """Make rate-limited API request""" + try: + response = requests.request(method, url, headers=headers, json=data) + + if response.status_code == 429: + self.outcome = "rate_limit" + raise RuntimeError("Rate limit exceeded") + elif response.status_code == 401: + self.outcome = "auth_error" + raise RuntimeError("Authentication failed") + elif response.status_code >= 400: + self.outcome = "error" + raise RuntimeError(f"API error: {response.text}") + + return response.json() + except requests.exceptions.RequestException as e: + self.outcome = "error" + raise RuntimeError(f"Request error: {str(e)}") + + def _handle_records(self, operation: str, headers: Dict[str, str], base_id: str, + table_name: str, data: Dict[str, Any] = None, + record_id: str = None) -> Dict[str, Any]: + """Handle record operations""" + try: + base_url = f"{self.BASE_URL}/{base_id}/{quote(table_name)}" + + if operation == "list_records": + params = [] + if data.get("view"): + params.append(f"view={quote(data['view'])}") + if data.get("formula"): + params.append(f"filterByFormula={quote(data['formula'])}") + if data.get("sort_field"): + params.append(f"sort[0][field]={quote(data['sort_field'])}") + params.append(f"sort[0][direction]={data.get('sort_direction', 'asc')}") + if data.get("max_records"): + params.append(f"maxRecords={data['max_records']}") + + url = f"{base_url}?{'&'.join(params)}" if params else base_url + return self._make_request("GET", url, headers) + + elif operation == "get_record": + return self._make_request("GET", f"{base_url}/{record_id}", headers) + + elif operation == "create_record": + return self._make_request("POST", base_url, headers, {"fields": data}) + + elif operation == "update_record": + return self._make_request("PATCH", f"{base_url}/{record_id}", + headers, {"fields": data}) + + elif operation == "delete_record": + return self._make_request("DELETE", f"{base_url}/{record_id}", headers) + + elif operation.startswith("batch_"): + batch_operation = operation.split("_")[1] + method = { + "create": "POST", + "update": "PATCH", + "delete": "DELETE" + 
}[batch_operation] + + if batch_operation in ["create", "update"]: + payload = {"records": [{"fields": record} for record in data]} + else: # delete + payload = {"records": data} + + return self._make_request(method, f"{base_url}/batch", headers, payload) + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Record operation error: {str(e)}") + + def _handle_tables(self, operation: str, headers: Dict[str, str], base_id: str, + table_name: str = None, data: Dict[str, Any] = None) -> Dict[str, Any]: + """Handle table operations""" + try: + if operation == "list_tables": + url = f"{self.METADATA_URL}/bases/{base_id}/tables" + return self._make_request("GET", url, headers) + + elif operation == "get_table_schema": + url = f"{self.METADATA_URL}/bases/{base_id}/tables/{quote(table_name)}" + return self._make_request("GET", url, headers) + + elif operation == "create_table": + url = f"{self.METADATA_URL}/bases/{base_id}/tables" + return self._make_request("POST", url, headers, data) + + elif operation == "update_table": + url = f"{self.METADATA_URL}/bases/{base_id}/tables/{quote(table_name)}" + return self._make_request("PATCH", url, headers, data) + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"Table operation error: {str(e)}") + + def _handle_views(self, operation: str, headers: Dict[str, str], base_id: str, + table_name: str, view_name: str = None, + data: Dict[str, Any] = None) -> Dict[str, Any]: + """Handle view operations""" + try: + base_url = f"{self.METADATA_URL}/bases/{base_id}/tables/{quote(table_name)}/views" + + if operation == "list_views": + return self._make_request("GET", base_url, headers) + + elif operation == "get_view": + return self._make_request("GET", f"{base_url}/{quote(view_name)}", headers) + + elif operation == "create_view": + return self._make_request("POST", base_url, headers, data) + + except Exception as e: + self.outcome = "error" + raise RuntimeError(f"View operation error: {str(e)}") + + def run(self): + try: + # Get authentication and base configuration + api_key = self._get_field("api_key") + base_id = self._get_field("base_id") + headers = self._get_headers(api_key) + + # Get operation details + operation = self._get_field("operation") + table_name = self._get_field("table_name", True) + record_id = self._get_field("record_id", True) + view_name = self._get_field("view_name", True) + data = json.loads(self._get_field("data", True, "{}")) + + # Process optional parameters + if "max_records" in self._fields: + data["max_records"] = int(self._get_field("max_records", True, "100")) + if "formula" in self._fields: + data["formula"] = self._get_field("formula", True) + if "sort_field" in self._fields: + data["sort_field"] = self._get_field("sort_field", True) + if "view" in self._fields: + data["view"] = self._get_field("view", True) + + # Execute requested operation + if operation in ["list_records", "get_record", "create_record", + "update_record", "delete_record", "batch_create", + "batch_update", "batch_delete"]: + result = self._handle_records( + operation, headers, base_id, table_name, data, record_id + ) + + elif operation in ["list_tables", "get_table_schema", "create_table", + "update_table"]: + result = self._handle_tables( + operation, headers, base_id, table_name, data + ) + + elif operation in ["list_views", "get_view", "create_view"]: + result = self._handle_views( + operation, headers, base_id, table_name, view_name, data + ) + + else: + raise ValueError(f"Unsupported operation: {operation}") + + # 
Store result and set success outcome + self.result = { + "operation": operation, + "timestamp": datetime.now().isoformat(), + "data": result + } + self.outcome = "success" + + except ValueError as e: + self.outcome = "error" + raise RuntimeError(f"Validation error: {str(e)}") + except Exception as e: + if not self.outcome: + self.outcome = "error" + raise RuntimeError(f"Operation error: {str(e)}") +
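
A minimal usage sketch for the record operations above, in the style of the earlier examples. The `configure()`/`run()` driver convention is assumed from those examples (it is not defined in this patch), and the base ID, table, and field names are placeholders:

```python
import json

airtable = AirtableIntegration()

# Batch-create two records; "data" is a JSON array of field dicts
await airtable.configure({
    "api_key": "your_api_key",
    "base_id": "appXXXXXXXXXXXXXX",
    "table_name": "Tasks",
    "operation": "batch_create",
    "data": json.dumps([{"Name": "Write docs"}, {"Name": "Review PR"}])
})
await airtable.run()

# Read them back, filtered and capped via the optional parameters
await airtable.configure({
    "api_key": "your_api_key",
    "base_id": "appXXXXXXXXXXXXXX",
    "table_name": "Tasks",
    "operation": "list_records",
    "formula": "{Name} != ''",
    "max_records": "10"
})
await airtable.run()
print(airtable.result["data"])
```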