Skip to content

Commit

Permalink
[FEAT][Twitter integration]
Browse files Browse the repository at this point in the history
  • Loading branch information
kyegomez committed Jan 24, 2025
1 parent 11dcefb commit 9ee4a48
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "swarms-tools"
version = "0.1.0"
version = "0.1.1"
description = "Paper - Pytorch"
license = "MIT"
authors = ["Kye Gomez <[email protected]>"]
Expand Down
2 changes: 1 addition & 1 deletion swarms_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

from swarms_tools.finance import *
from swarms_tools.structs import *
from swarms_tools.social_media import *
from swarms_tools.social_media import *
20 changes: 18 additions & 2 deletions swarms_tools/social_media/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
from swarms_tools.social_media.twitter_tool import TwitterTool
from swarms_tools.social_media.twitter_tool import (
TwitterTool,
initialize_twitter_tool,
post_tweet,
reply_tweet,
like_tweet,
quote_tweet,
get_metrics,
)

__all__ = ["TwitterTool"]
__all__ = [
"TwitterTool",
"initialize_twitter_tool",
"post_tweet",
"reply_tweet",
"like_tweet",
"quote_tweet",
"get_metrics",
]
119 changes: 118 additions & 1 deletion swarms_tools/social_media/twitter_tool.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import os
import subprocess
from typing import Any, Callable, Dict, List, Optional

try:
import tweepy
except ImportError:
print("Tweepy is not installed. Please install it using 'pip install tweepy'.")
print(
"Tweepy is not installed. Please install it using 'pip install tweepy'."
)
subprocess.run(["pip", "install", "tweepy"])
raise

Expand Down Expand Up @@ -167,3 +170,117 @@ def _quote_tweet(self, tweet_id: int, quote: str) -> None:
print(f"Successfully quoted tweet {tweet_id}.")
except tweepy.TweepyException as e:
print(f"Failed to quote tweet {tweet_id}: {e}")


def initialize_twitter_tool() -> TwitterTool:
# Define your options with the necessary credentials
options = {
"id": "29998836",
"name": "mcsswarm",
"description": "An example Twitter Plugin for testing.",
"credentials": {
"apiKey": os.getenv("TWITTER_API_KEY"),
"apiSecretKey": os.getenv("TWITTER_API_SECRET_KEY"),
"accessToken": os.getenv("TWITTER_ACCESS_TOKEN"),
"accessTokenSecret": os.getenv(
"TWITTER_ACCESS_TOKEN_SECRET"
),
},
}

# Initialize the TwitterTool with your options
twitter_plugin = TwitterTool(options)
return twitter_plugin


def post_tweet(tweet: str) -> None:
"""
Posts a tweet with the given text.
Args:
tweet (str): The text of the tweet to post.
Raises:
tweepy.TweepyException: If there's an error posting the tweet.
"""
try:
twitter_plugin = initialize_twitter_tool()
twitter_plugin.post_tweet(tweet)
print(f"Tweet posted successfully: {tweet}")
except tweepy.TweepyException as e:
print(f"Failed to post tweet: {e}")


def reply_tweet(tweet_id: int, reply: str) -> None:
"""
Replies to a tweet with the given ID and reply text.
Args:
tweet_id (int): The ID of the tweet to reply to.
reply (str): The text of the reply.
Raises:
tweepy.TweepyException: If there's an error replying to the tweet.
"""
try:
twitter_plugin = initialize_twitter_tool()
twitter_plugin.reply_tweet(tweet_id, reply)
print(f"Successfully replied to tweet {tweet_id}.")
except tweepy.TweepyException as e:
print(f"Failed to reply to tweet {tweet_id}: {e}")


def like_tweet(tweet_id: int) -> None:
"""
Likes a tweet with the given ID.
Args:
tweet_id (int): The ID of the tweet to like.
Raises:
tweepy.TweepyException: If there's an error liking the tweet.
"""
try:
twitter_plugin = initialize_twitter_tool()
twitter_plugin.like_tweet(tweet_id)
print(f"Tweet {tweet_id} liked successfully.")
except tweepy.TweepyException as e:
print(f"Failed to like tweet {tweet_id}: {e}")


def quote_tweet(tweet_id: int, quote: str) -> None:
"""
Quotes a tweet with the given ID and adds a quote text.
Args:
tweet_id (int): The ID of the tweet to quote.
quote (str): The text of the quote.
Raises:
tweepy.TweepyException: If there's an error quoting the tweet.
"""
try:
twitter_plugin = initialize_twitter_tool()
twitter_plugin.quote_tweet(tweet_id, quote)
print(f"Successfully quoted tweet {tweet_id}.")
except tweepy.TweepyException as e:
print(f"Failed to quote tweet {tweet_id}: {e}")


def get_metrics() -> Dict[str, int]:
"""
Retrieves metrics from the Twitter plugin.
Returns:
Dict[str, int]: A dictionary containing metrics.
Raises:
tweepy.TweepyException: If there's an error fetching metrics.
"""
try:
twitter_plugin = initialize_twitter_tool()
metrics = twitter_plugin.get_metrics()
return metrics
except tweepy.TweepyException as e:
print(f"Failed to fetch metrics: {e}")
return {}

0 comments on commit 9ee4a48

Please sign in to comment.