-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
3rdparty tools patch 1 #619
base: dev
Are you sure you want to change the base?
Conversation
- serpapisearch integration with https://serpapi.com/
This Brave Search integration block follows a similar structure to the previous SerpApi and Notion blocks. Here's a breakdown of the key elements: Registration: The BraveSearch class is registered as a workflow block, and the block's metadata (name, description, category, fields, and outcomes) are defined in the register method. Fields: The block accepts the following fields: api_key: The Brave Search API key query: The search query location: The geographic location for search results (optional) device: The device type (desktop or mobile) for search results (optional) language: The language for search results (optional) num_results: The number of search results to retrieve (optional) additional_params: Any additional parameters to pass to the Brave Search API (optional) Outcomes: The block can have the following outcomes: success: The search was completed successfully apiError: An error occurred while making the API request rateLimitError: The API rate limit has been exceeded Request Handling: The run method is responsible for executing the search query and handling the API response. It: Retrieves the required and optional fields from the block configuration Constructs the API endpoint URL and the request parameters Sets the API key in the request headers Makes the API request using the requests library Handles the response, setting the appropriate outcome and storing the result data in the self.result attribute To use this block, you would need to: Register it in your workflow system: pythonCopyBraveSearch.register("brave_search") Configure it with your Brave Search API key and search parameters: pythonCopyblock = BraveSearch() block.configure({ "api_key": "your_brave_search_api_key", "query": "your search query", "location": "San Francisco", "language": "en", "num_results": "20" }) Let me know if you have any other questions or if you'd like me to make any additional improvements to the Brave Search integration block.
This Tavily Search integration block follows a similar structure to the previous search engine integration blocks. Here are the key differences: Fields: The block accepts the following fields: api_key: The Tavily Search API key query: The search query location: The geographic location for search results (optional) num_results: The number of search results to retrieve (optional) additional_params: Any additional parameters to pass to the Tavily Search API (optional) API Endpoint: The API endpoint URL is set to "https://api.tavily.com/v1/search". Request Parameters: The request parameters include "q" for the search query, "location" for the geographic location, and "num" for the number of results. Any additional parameters are merged into the params dictionary. Request Headers: The API key is set in the "X-Api-Key" header. Response Handling: The response handling logic is similar to the previous blocks, setting the appropriate outcome based on the response status code. To use this block, you would need to: Register it in your workflow system: pythonCopyTavilySearch.register("tavily_search") Configure it with your Tavily Search API key and search parameters: pythonCopyblock = TavilySearch() block.configure({ "api_key": "your_tavily_search_api_key", "query": "your search query", "location": "New York", "num_results": "15" })
This Slack integration block provides a comprehensive set of operations for working with Slack channels, messages, and users. Here's a breakdown of the key elements: Registration: The SlackIntegration class is registered as a workflow block, and the block's metadata (name, description, category, fields, and outcomes) are defined in the register method. Fields: The block accepts the following fields: api_token: The Slack API token operation: The specific Slack API operation to perform (e.g., create channel, post message, list users) channel_id: The ID of the Slack channel to operate on (required for channel and message operations) message_ts: The timestamp of the message to update or delete (required for message update/delete operations) message_text: The text content of the message to post or update (required for message post/update operations) user_id: The ID of the Slack user to retrieve information for (required for user operations) additional_params: Any additional parameters to pass to the Slack API (optional) Outcomes: The block can have the following outcomes: success: The operation was completed successfully apiError: An error occurred while making the API request authError: Invalid API token or insufficient permissions API Methods: The block includes the following methods for interacting with the Slack API: _create_channel: Creates a new Slack channel _list_channels: Lists all Slack channels _join_channel: Joins a Slack channel _post_message: Posts a message to a Slack channel _update_message: Updates a message in a Slack channel _delete_message: Deletes a message from a Slack channel _list_users: Lists all Slack users _get_user_info: Retrieves information about a Slack user _get_user_presence: Retrieves the presence status of a Slack user Request Handling: The run method is responsible for executing the requested Slack API operation and handling the response. It: Retrieves the required and optional fields from the block configuration Constructs the API headers with the provided API token Calls the appropriate API method based on the selected operation Handles the response, setting the appropriate outcome and storing the result data in the self.result attribute To use this block, you would need to: Register it in your workflow system: pythonCopySlackIntegration.register("slack_integration") Configure it with your Slack API token and the desired operation: pythonCopyblock = SlackIntegration() block.configure({ "api_token": "your_slack_api_token", "operation": "post_message", "channel_id": "C012345678", "message_text": "Hello, Slack!" }) This is a comprehensive Slack integration block that covers a wide range of Slack API operations. Let me know if you have any other questions or if you'd like me to make any additional improvements to the implementation.
This Google Drive integration provides a comprehensive set of operations for working with files and folders. Here's how to use it: First, you need to set up Google Cloud Project and obtain credentials: pythonCopy# Configure the integration block = GoogleDriveIntegration() block.configure({ "credentials_path": "path/to/your/credentials.json", "token_path": "token.json", "operation": "list_files" }) Example operations: pythonCopy# Upload a file block.configure({ "credentials_path": "credentials.json", "operation": "upload_file", "file_path": "document.pdf", "folder_id": "target_folder_id", # Optional "mime_type": "application/pdf" }) # Create a folder block.configure({ "credentials_path": "credentials.json", "operation": "create_folder", "name": "New Folder", "folder_id": "parent_folder_id" # Optional }) # Share a file block.configure({ "credentials_path": "credentials.json", "operation": "share_file", "file_id": "your_file_id", "permissions": { "type": "user", "role": "reader", "emailAddress": "[email protected]" } }) Setting Up Google Cloud Project Create a Google Cloud Project Go to Google Cloud Console Click "Select a Project" → "New Project" Enter a project name (e.g., "Drive Integration") Click "Create" Enable the Google Drive API In the Cloud Console, go to "APIs & Services" → "Library" Search for "Google Drive API" Click "Enable" Create Credentials Go to "APIs & Services" → "Credentials" Click "Create Credentials" → "OAuth client ID" If prompted, configure the OAuth consent screen
# E2B Data Analysis Integration Guide ## 1. Setup and Installation ### Prerequisites ```bash pip install e2b ``` ### Configuration ```python from e2b_data_analysis_integration import E2BDataAnalysisIntegration # Register the integration E2BDataAnalysisIntegration.register("e2b_analysis") ``` ## 2. Basic Usage Examples ### Execute Python Code ```python async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": """ import pandas as pd import numpy as np # Create sample data data = { 'A': np.random.rand(100), 'B': np.random.rand(100) } df = pd.DataFrame(data) print(df.describe()) """ }) await analysis.run() if analysis.outcome == "success": print(analysis.result["data"]["output"]) ``` ### Install Package ```python async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "install_package", "package_name": "scikit-learn" }) await analysis.run() ``` ### File Operations ```python # Upload data file async with E2BDataAnalysisIntegration() as analysis: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "upload_file", "file_path": "data.csv", "content": "col1,col2\n1,2\n3,4" }) await analysis.run() # Process uploaded file await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": """ import pandas as pd df = pd.read_csv('data.csv') print(df.head()) """ }) await analysis.run() ``` ## 3. Advanced Usage ### Complex Data Analysis ```python async def analyze_data(analysis, data_path): # Upload data await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "upload_file", "file_path": "input.csv", "content": data_path }) await analysis.run() # Install required packages await analysis.configure({ "operation": "install_package", "package_name": "scikit-learn" }) await analysis.run() # Perform analysis await analysis.configure({ "operation": "execute_code", "code": """ import pandas as pd from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import classification_report # Load data df = pd.read_csv('input.csv') # Prepare features and target X = df.drop('target', axis=1) y = df['target'] # Split data X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) # Train model model = RandomForestClassifier(n_estimators=100) model.fit(X_train, y_train) # Evaluate y_pred = model.predict(X_test) print(classification_report(y_test, y_pred)) """ }) await analysis.run() ``` ### Environment Management ```python async def setup_environment(analysis): # Install required packages packages = ['pandas', 'numpy', 'scikit-learn', 'matplotlib'] for package in packages: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "install_package", "package_name": package }) await analysis.run() # Verify installation await analysis.configure({ "operation": "get_environment_info" }) await analysis.run() ``` ## 4. Error Handling ```python async def safe_execution(analysis, code): try: await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": code, "timeout": "60" }) await analysis.run() if analysis.outcome == "success": return analysis.result["data"]["output"] elif analysis.outcome == "timeout": return "Execution timed out" else: return f"Error: {analysis.result['data'].get('error', 'Unknown error')}" except Exception as e: return f"Execution failed: {str(e)}" ``` ## 5. Best Practices ### Session Management ```python async def managed_session(): async with E2BDataAnalysisIntegration() as analysis: # Session is automatically cleaned up after use await analysis.configure({ "api_key": "your_e2b_api_key", "operation": "execute_code", "code": "print('Hello, World!')" }) await analysis.run() ``` ### Resource Cleanup ```python async def cleanup_resources(analysis): # List and remove temporary files await analysis.configure({ "operation": "list_files" }) await analysis.run() # Clean up session await analysis.configure({ "operation": "cleanup_session" }) await analysis.run() ``` ## 6. Security Considerations 1. API Key Management: ```python import os api_key = os.environ.get('E2B_API_KEY') if not api_key: raise ValueError("E2B API key not found in environment variables") ``` 2. Code Sanitization: ```python def sanitize_code(code: str) -> str: """Basic code sanitization""" forbidden_terms = ['os.system', 'subprocess', '__import__'] for term in forbidden_terms: if term in code: raise ValueError(f"Forbidden term found in code: {term}") return code ``` ## 7. Monitoring and Logging ```python import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger('e2b_analysis') async def monitored_execution(analysis, code): logger.info(f"Starting execution at {datetime.
# Webpage Parser Integration Usage Guide ## 1. Installation Requirements ```bash pip install aiohttp beautifulsoup4 trafilatura readability-lxml playwright pandas ``` ## 2. Basic Usage Examples ### Single URL Parsing ```python from webpage_parser_integration import WebpageParserIntegration # Initialize parser parser = WebpageParserIntegration() # Parse single URL await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "output_format": "json", "include_metadata": "true" }) await parser.run() if parser.outcome == "success": print(parser.result["data"]) ``` ### Multiple URLs Processing ```python # Process multiple URLs urls = [ "https://example.com", "https://example.org", "https://example.net" ] await parser.configure({ "operation": "batch_process", "urls": json.dumps(urls), "output_format": "csv" }) await parser.run() ``` ### Structured Data Extraction ```python # Extract specific data using CSS selectors extraction_rules = { "title": "h1", "price": ".product-price", "description": ".product-description", "categories": ".category" } await parser.configure({ "operation": "extract_structured_data", "urls": "https://example.com/product", "extraction_rules": json.dumps(extraction_rules) }) await parser.run() ``` ## 3. Advanced Usage ### JavaScript Rendering ```python # Parse JavaScript-rendered content await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "js_rendering": "true", "timeout": "60" }) await parser.run() ``` ### Custom Extraction ```python # Custom extraction with specific rules await parser.configure({ "operation": "custom_extraction", "urls": "https://example.com", "extraction_rules": json.dumps({ "main_article": "article.main-content", "comments": ".comment-section .comment", "author_info": { "name": ".author-name", "bio": ".author-bio", "social_links": ".social-links a" } }) }) await parser.run() ``` ## 4. Error Handling ```python try: await parser.configure({ "operation": "parse_urls", "urls": "https://example.com", "timeout": "30" }) await parser.run() if parser.outcome == "success": print("Parsing successful!") print(parser.result["data"]) elif parser.outcome == "timeout": print("Operation timed out") else: print(f"Error: {parser.result.get('error', 'Unknown error')}") except Exception as e: print(f"Error: {str(e)}") ``` ## 5. Best Practices ### Rate Limiting ```python import asyncio async def rate_limited_parsing(urls: List[str], delay: float = 1.0): results = [] for url in urls: await parser.configure({ "operation": "parse_urls", "urls": url }) await parser.run() results.append(parser.result) await asyncio.sleep(delay) return results ``` ### Caching Results ```python import hashlib import json from datetime import datetime, timedelta class ResultCache: def __init__(self, cache_duration: timedelta = timedelta(hours=1)): self.cache = {} self.duration = cache_duration def get_cached_result(self, url: str) -> Optional[Dict]: key = hashlib.md5(url.encode()).hexdigest() if key in self.cache: result, timestamp = self.cache[key] if datetime.now() - timestamp < self.duration: return result return None def cache_result(self, url: str, result: Dict): key = hashlib.md5(url.encode()).hexdigest() self.cache
Supported Operations Query Operations soql_query: Execute SOQL queries describe_object: Get object metadata Record Operations create_record: Create single record update_record: Update existing record delete_record: Delete record upsert_record: Update or insert based on external ID Bulk Operations bulk_create: Create multiple records bulk_update: Update multiple records bulk_delete: Delete multiple records bulk_upsert: Upsert multiple records Metadata Operations get_metadata: Retrieve metadata update_metadata: Update metadata File Operations upload_file: Upload attachments download_file: Download attachments
# Shopify Integration Guide ## Core Features 1. **Authentication & Session Management** - Secure token-based authentication - Session handling - Rate limiting implementation - API version management 2. **Product Management** # Product operations - List products with variants - Create new products - Update existing products - Delete products - Manage product images 3. **Order Processing** # Order operations - List orders with filtering - Create orders - Update order status - Cancel orders - Process refunds 4. **Customer Management** # Customer operations - List customers - Create customer profiles - Update customer information - Track customer orders
# Airtable Integration Guide ## Core Features 1. **Authentication & Rate Limiting** - Secure API key authentication - Automatic rate limiting (5 requests/second) - Error handling for API limits - Session management 2. **Record Management** ```python # Record operations - List records with filtering - Get individual records - Create new records - Update existing records - Delete records - Batch operations for efficiency ``` 3. **Table Management** ```python # Table operations - List tables in base - Get table schema - Create new tables - Update table structure - Manage fields and views ``` 4. **View Management** ```python # View operations - List views - Get view details - Create custom views - Filter and sort records in views ``` ## Usage Examples 1. **Basic Record Operations** ```python # Initialize integration airtable = AirtableIntegration() # List records await airtable.configure({ "api_key": "your_api_key", "base_id": "your_base_id", "table_name": "Your Table", "operation": "list_records", "data": json.dumps({ "max_records": 100, "view": "Grid view", "sort_field": "Name" }) }) await airtable.run() # Create record await airt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started the code review. I don't really know what to do with it other than consider it as a great specification of what needs to be implemented. I only check e2bdataanalysis.py
.
@@ -0,0 +1,258 @@ | |||
import os |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should move this module into src/writer/workflows_blocks
.
raise RuntimeError(f"Environment info error: {str(e)}") | ||
|
||
async def run(self): | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if importlib.util.find_spec('e2b') is not None:
self.result = f"You should install `e2b` library to use E2B Data Analysis Integration"
self.outcome = "error"
return
from typing import Dict, Any, Optional, List, Union | ||
from datetime import datetime | ||
import base64 | ||
from e2b import DataAnalysis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e2b
is not part of writer dependency. You have to check and import e2b at the runtime when developper try using it.
async def _initialize_session(self, api_key: str): | ||
"""Initialize E2B session""" | ||
if not self.session: | ||
self.session = await DataAnalysis(api_key=api_key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async def _initialize_session(self, api_key: str):
from e2b import DataAnalysis
"""Initialize E2B session"""
if not self.session:
self.session = await DataAnalysis(api_key=api_key)
self.outcome = "error" | ||
raise RuntimeError(f"Environment info error: {str(e)}") | ||
|
||
async def run(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ramedina86 I don't think we support async invocation in workflow currently
class E2BDataAnalysisIntegration(WorkflowBlock): | ||
def __init__(self): | ||
super().__init__() | ||
self.session = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.session
is reserved for in WorkflowBlock
. You should declare an attribute e2b_session
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.e2b_session: Any = None
@@ -0,0 +1,258 @@ | |||
import os | |||
import json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some import are not used in the module
|
||
async def _process_single_url(self, url: str, config: Dict[str, Any]) -> Dict[str, Any]: | ||
"""Process a single URL with given configuration""" | ||
js_rendering = config.get('js_rendering', 'false') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a string, it should be a bool
async def _get_page_content(self, url: str, js_rendering: bool = False, timeout: int = 30) -> str: |
So it should be
js_rendering = config.get('js_rendering', 'false') | |
js_rendering = config.get('js_rendering', 'false') == 'true' |
channel_id = self._get_field("channel_id", True) | ||
message_ts = self._get_field("message_ts", True) | ||
message_text = self._get_field("message_text", True) | ||
user_id = self._get_field("user_id", True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These fields are not JSON
channel_id = self._get_field("channel_id", True) | |
message_ts = self._get_field("message_ts", True) | |
message_text = self._get_field("message_text", True) | |
user_id = self._get_field("user_id", True) | |
channel_id = self._get_field("channel_id") | |
message_ts = self._get_field("message_ts") | |
message_text = self._get_field("message_text") | |
user_id = self._get_field("user_id") |
# Execute the requested operation | ||
if operation == "create_channel": | ||
result = self._create_channel(headers, channel_id) | ||
elif operation == "list_channels": | ||
result = self._list_channels(headers) | ||
elif operation == "join_channel": | ||
result = self._join_channel(headers, channel_id) | ||
elif operation == "post_message": | ||
result = self._post_message(headers, channel_id, message_text) | ||
elif operation == "update_message": | ||
result = self._update_message(headers, channel_id, message_ts, message_text) | ||
elif operation == "delete_message": | ||
result = self._delete_message(headers, channel_id, message_ts) | ||
elif operation == "list_users": | ||
result = self._list_users(headers) | ||
elif operation == "get_user_info": | ||
result = self._get_user_info(headers, user_id) | ||
elif operation == "get_user_presence": | ||
result = self._get_user_presence(headers, user_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some fields are optional and can be missing from the configuration. I think it's better to handle it with a proper exception before doing the API call.
Maybe somthing like
# Execute the requested operation | |
if operation == "create_channel": | |
result = self._create_channel(headers, channel_id) | |
elif operation == "list_channels": | |
result = self._list_channels(headers) | |
elif operation == "join_channel": | |
result = self._join_channel(headers, channel_id) | |
elif operation == "post_message": | |
result = self._post_message(headers, channel_id, message_text) | |
elif operation == "update_message": | |
result = self._update_message(headers, channel_id, message_ts, message_text) | |
elif operation == "delete_message": | |
result = self._delete_message(headers, channel_id, message_ts) | |
elif operation == "list_users": | |
result = self._list_users(headers) | |
elif operation == "get_user_info": | |
result = self._get_user_info(headers, user_id) | |
elif operation == "get_user_presence": | |
result = self._get_user_presence(headers, user_id) | |
def prevent_field_as_empty_str(field: str, value: str): | |
if value == "": | |
raise ValueError(f"Missing parameters: operation '{operation}' needs a valid '{field}' param") | |
# Execute the requested operation | |
if operation == "create_channel": | |
prevent_field_as_empty_str('channel_id', channel_id) | |
result = self._create_channel(headers, channel_id) | |
elif operation == "list_channels": | |
result = self._list_channels(headers) | |
elif operation == "join_channel": | |
prevent_field_as_empty_str('channel_id', channel_id) | |
result = self._join_channel(headers, channel_id) | |
elif operation == "post_message": | |
prevent_field_as_empty_str('channel_id', channel_id) | |
prevent_field_as_empty_str('message_text', message_text) | |
result = self._post_message(headers, channel_id, message_text) | |
elif operation == "update_message": | |
prevent_field_as_empty_str('channel_id', channel_id) | |
prevent_field_as_empty_str('message_text', message_text) | |
prevent_field_as_empty_str('message_ts', message_ts) | |
result = self._update_message(headers, channel_id, message_ts, message_text) | |
elif operation == "delete_message": | |
prevent_field_as_empty_str('channel_id', channel_id) | |
prevent_field_as_empty_str('message_ts', message_ts) | |
result = self._delete_message(headers, channel_id, message_ts) | |
elif operation == "list_users": | |
result = self._list_users(headers) | |
elif operation == "get_user_info": | |
prevent_field_as_empty_str('user_id', user_id) | |
result = self._get_user_info(headers, user_id) | |
elif operation == "get_user_presence": | |
prevent_field_as_empty_str('user_id', user_id) | |
result = self._get_user_presence(headers, user_id) |
It the same for the other integration. Maybe we need a more robust validation, or maybe let Workflow send bad request is fine 🤷
It makes me think we can introduce dynamic required
fields, like channel_id
could become required if the operation is post_message
elif operation == "create_folder": | ||
result = self._create_folder(service, name, folder_id) | ||
elif operation == "share_file": | ||
result = self._share_file(service, file_id, eval(permissions)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eval
is really dangerous, it means to execute any Py code coming from outside.
I don't get why it's needed since we already parse the JSON
result = self._share_file(service, file_id, eval(permissions)) | |
result = self._share_file(service, file_id, permissions) |
} | ||
|
||
# Merge additional parameters | ||
params.update(eval(additional_params)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eval
is really dangerous, it means to execute any Py code coming from outside.
I don't get why it's needed since we already parse the JSON
v = self.evaluator.evaluate_field(self.instance_path, field_key, base_context=self.execution_env, as_json=as_json, default_field_value=default_field_value) |
writer-framework/src/writer/core.py
Lines 1470 to 1473 in 00dcd1d
if (replaced is not None) and as_json: | |
replaced_as_json = None | |
try: | |
replaced_as_json = json.loads(replaced) |
params.update(eval(additional_params)) | |
params.update(additional_params) |
operation = self._get_field("operation") | ||
repo_name = self._get_field("repo_name", True) | ||
owner = self._get_field("owner", True) | ||
branch = self._get_field("branch", True) | ||
file_path = self._get_field("file_path", True) | ||
content = self._get_field("content", True) | ||
title = self._get_field("title", True) | ||
body = self._get_field("body", True) | ||
labels = json.loads(self._get_field("labels", True, "[]")) | ||
assignees = json.loads(self._get_field("assignees", True, "[]")) | ||
commit_message = self._get_field("commit_message", True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some fields are not JSON, and some are parsed twice
operation = self._get_field("operation") | |
repo_name = self._get_field("repo_name", True) | |
owner = self._get_field("owner", True) | |
branch = self._get_field("branch", True) | |
file_path = self._get_field("file_path", True) | |
content = self._get_field("content", True) | |
title = self._get_field("title", True) | |
body = self._get_field("body", True) | |
labels = json.loads(self._get_field("labels", True, "[]")) | |
assignees = json.loads(self._get_field("assignees", True, "[]")) | |
commit_message = self._get_field("commit_message", True) | |
operation = self._get_field("operation") | |
repo_name = self._get_field("repo_name") | |
owner = self._get_field("owner") | |
branch = self._get_field("branch") | |
file_path = self._get_field("file_path") | |
content = self._get_field("content") | |
title = self._get_field("title") | |
body = self._get_field("body") | |
labels = self._get_field("labels", True, "[]") | |
assignees = self._get_field("assignees", True, "[]") | |
commit_message = self._get_field("commit_message") |
table_name = self._get_field("table_name", True) | ||
record_id = self._get_field("record_id", True) | ||
view_name = self._get_field("view_name", True) | ||
data = json.loads(self._get_field("data", True, "{}")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some fields are not JSON, and data
is parsed twice
table_name = self._get_field("table_name", True) | |
record_id = self._get_field("record_id", True) | |
view_name = self._get_field("view_name", True) | |
data = json.loads(self._get_field("data", True, "{}")) | |
table_name = self._get_field("table_name") | |
record_id = self._get_field("record_id") | |
view_name = self._get_field("view_name") | |
data = self._get_field("data", True, "{}") |
data["max_records"] = int(self._get_field("max_records", True, "100")) | ||
if "formula" in self._fields: | ||
data["formula"] = self._get_field("formula", True) | ||
if "sort_field" in self._fields: | ||
data["sort_field"] = self._get_field("sort_field", True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some fields are not JSON
data["max_records"] = int(self._get_field("max_records", True, "100")) | |
if "formula" in self._fields: | |
data["formula"] = self._get_field("formula", True) | |
if "sort_field" in self._fields: | |
data["sort_field"] = self._get_field("sort_field", True) | |
data["max_records"] = int(self._get_field("max_records", False, "100")) | |
if "formula" in self._fields: | |
data["formula"] = self._get_field("formula") | |
if "sort_field" in self._fields: | |
data["sort_field"] = self._get_field("sort_field") |
if "view" in self._fields: | ||
data["view"] = self._get_field("view", True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
view
is not a field, maybe you mean view_name
?
if "view" in self._fields: | |
data["view"] = self._get_field("view", True) | |
if "view_name" in self._fields: | |
data["view_name"] = self._get_field("view_name") |
code = self._get_field("code", True) | ||
package_name = self._get_field("package_name", True) | ||
file_path = self._get_field("file_path", True) | ||
content = self._get_field("content", True) | ||
timeout = int(self._get_field("timeout", True, "30")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some fields are not JSON
code = self._get_field("code", True) | |
package_name = self._get_field("package_name", True) | |
file_path = self._get_field("file_path", True) | |
content = self._get_field("content", True) | |
timeout = int(self._get_field("timeout", True, "30")) | |
code = self._get_field("code") | |
package_name = self._get_field("package_name") | |
file_path = self._get_field("file_path") | |
content = self._get_field("content") | |
timeout = int(self._get_field("timeout", False, "30")) |
Integration Work Summary
1. SerpApi Integration
2. Notion Integration
3. Brave Search Integration
4. Tavily Search Integration
5. Slack Integration
6. Google Drive Integration
7. Salesforce Integration
8. Shopify Integration
9. Airtable Integration
Common Features Across All Integrations
Authentication
Error Handling
Data Operations
Performance Optimization
Security Features
Integration Architecture Patterns
Consistent Structure
Rate Limiting
Documentation
Best Practices Implemented
Code Organization
Error Management
Configuration
Security
Testing Considerations
Unit Testing
Integration Testing
Future Enhancements
Monitoring
Scalability
Additional Features