From 643dfe42be075265680e615193f8bed3b9d19bcd Mon Sep 17 00:00:00 2001
From: c200bzh
Date: Mon, 27 Jan 2025 16:04:00 +0800
Subject: [PATCH 1/2] Twitter: adding ads_stream for promoted tweet billing

---
 .../source_twitter_fetcher/ads_stream.py      |  76 ++++++
 .../schemas/billing_tweet_promoted.json       |  25 ++
 .../source_twitter_fetcher/source.py          | 238 +++---------------
 .../source_twitter_fetcher/tweets_stream.py   | 180 +++++++++++++
 4 files changed, 317 insertions(+), 202 deletions(-)
 create mode 100644 source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
 create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
 create mode 100644 source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py

diff --git a/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py b/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
new file mode 100644
index 0000000..2d022c0
--- /dev/null
+++ b/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
@@ -0,0 +1,76 @@
+from typing import Any, Iterable, Mapping, MutableMapping, Optional
+import logging
+import requests
+import time
+from datetime import datetime
+from .tweets_stream import TweetPromoted
+
+logger = logging.getLogger("airbyte")
+
+class PromotedTweetBilling(TweetPromoted):
+    #start_time and account_id are set in the tweets_stream file
+    url_base = "https://ads-api.x.com/9/"
+    primary_key = "id"
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return f"stats/jobs/accounts/{self.account_id}"
+
+    def stream_slices(
+        self,
+        sync_mode = None, # will inherit from TweetPromoted
+        stream_state: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Optional[Mapping[str, Any]]]:
+        # Reuse TweetPromoted's stream_slices to get promoted tweets
+        for slice in super().stream_slices(stream_state=stream_state, **kwargs):
+            tweet = slice.get("parent", {})
+            yield {
+                "promoted_tweet_id": tweet.get("id"),
+                # "created_at": tweet.get("created_at")
+            }
+
+    def request_params(
+        self,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None
+    ) -> MutableMapping[str, Any]:
+        promoted_tweet_id = stream_slice.get("promoted_tweet_id") if stream_slice else None
+
+        params = {
+            "entity": "PROMOTED_TWEET",
+            "entity_ids": promoted_tweet_id,
+            "granularity": "DAY",
+            "placement": "ALL_ON_TWITTER",
+            "metric_groups": "BILLING",
+            "start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+            "end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
+        }
+
+        return params
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_slice: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Mapping]:
+        if 'data' in response.json():
+            data = response.json()['data']
+            for record in data:
+                billing_data = {
+                    "id": stream_slice.get("promoted_tweet_id"),
+                    # "created_at": stream_slice.get("created_at"),
+                    # "stats_date": record.get("date"),
+                    "billed_engagements": record.get("billed_engagements", []),
+                    "billed_charge_local_micro": record.get("billed_charge_local_micro", []),
+                    **record
+                }
+                yield billing_data
+
+        time.sleep(2)
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
new file mode 100644
index 0000000..625750c
--- /dev/null
+++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
@@ -0,0 +1,25 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+      "id": {
+        "type": ["null", "string"]
+      },
+      "promoted_tweet_id": {
+        "type": ["null", "string"]
+      },
+      "billed_engagements": {
+        "type": ["null", "array"],
+        "items": {
+          "type": "integer"
+        }
+      },
+      "billed_charge_local_micro": {
+        "type": ["null", "array"],
+        "items": {
+          "type": "integer"
+        }
+      }
+    }
+  }
+  
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py
index c0d64f9..80ebd3c 100644
--- a/source-twitter-fetcher/source_twitter_fetcher/source.py
+++ b/source-twitter-fetcher/source_twitter_fetcher/source.py
@@ -1,217 +1,51 @@
-#
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
-#
-
 from abc import ABC
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
-import logging
-import requests
-import time
-from datetime import datetime, timedelta
+from typing import Any, List, Mapping, Tuple
+from datetime import datetime
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
-from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
-from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
-from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
+
+from .tweets_stream import Account, Tweet, TweetMetrics, TweetPromoted
+from .ads_stream import PromotedTweetBilling
 from .auth import TwitterOAuth
 
-logger = logging.getLogger("airbyte")
-
-class TwitterStream(HttpStream):
-
-    url_base = "https://api.x.com/2/"
-
-    def __init__(self, start_time: str = None, account_id: str = None, **kwargs):
-        super().__init__(**kwargs)
-        self.start_time = start_time
-        self.account_id = account_id
-
-    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
-        return None
-
-    # Lets see if the API is correctly made
-    def backoff_time(self, response: requests.Response) -> Optional[float]:
-        logger.warn("API rate limit: %s\n%s", response.json(),response.hearders)
-
-        delay_time = response.headers.get("Retry-After")
-        if delay_time:
-            return int(delay_time)
-
-class Account(TwitterStream):
-    primary_key= "id"
-
-    def path(
-        self, stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None) -> str:
-        return f"users/me?user.fields=public_metrics,protected,description,url,most_recent_tweet_id,pinned_tweet_id,created_at,verified_type"
-
-    def parse_response(
-        self,
-        response: requests.Response,
-        stream_slice: Mapping[str, Any] = None,
-        **kwargs
-    ) -> Iterable[Mapping]:
-        logger.debug("Response: %s", response.json())
-        data=response.json()['data']
-        yield data
-
-class Tweet(TwitterStream):
-    primary_key = "id"
-
-    @property
-    def use_cache(self) -> bool:
-        return True
-
-    def path(
-        self, stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None) -> str:
-        return f"users/{self.account_id}/tweets"
-
-    def next_page_token(
-        self, response: requests.Response
-    ) -> Optional[Mapping[str, Any]]:
-        if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0:
-            logger.debug('DBG-NT: %s', response.json()['meta']['next_token'])
-            return {"pagination_token": response.json()['meta']['next_token']}
-
-    def request_params(
-        self, next_page_token: Optional[Mapping[str, Any]] = None,stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None
-    ) -> MutableMapping[str, Any]:
-        params = {
-            "tweet.fields" : "text,public_metrics,author_id,referenced_tweets,created_at",
-            "max_results": 100
-        }
-        # Add condition later:
-        params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
-        if next_page_token:
-            params.update(**next_page_token)
-        return params
-
-
-    def parse_response(
-        self, response: requests.Response,
-        stream_slice: Mapping[str, Any] = None, **kwargs
-    ) -> Iterable[Mapping]:
-        logger.debug("Full response %s", response.json())
-        if 'data' in response.json():
-            data=response.json()['data']
-            for t in data:
-                yield t
-        time.sleep(2)
-
-class TweetMetrics(HttpSubStream, Tweet):
-    primary_key = "id"
-
-    def path(
-        self, stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None
-    ) -> str:
-        tweet_id = stream_slice.get("id")
-        logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
-        return f"tweets/{tweet_id}"
-
-    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        limit_date = datetime.today()- timedelta(31)
-        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
-            tweet = parent_slice["parent"]
-            if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date:
-                yield {"id": tweet.get('id') }
-            else:
-                logger.info("Not calling full metrics endpoint for tweet %s, tweet too old", tweet.get('id'))
-
-    def request_params(
-        self, stream_state: Optional[Mapping[str, Any]],
-        stream_slice: Optional[Mapping[str, Any]] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        params = {
-            "tweet.fields" : "non_public_metrics,organic_metrics,created_at",
-        }
-        # Add condition later:
-        logger.debug(f"DBG-FULL - query params: %s", params)
-        return params
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        if 'data' in response.json():
-            data=response.json()['data']
-            logger.debug("DBG-FULL-T: id %s", data.get('id'))
-            yield data
-        time.sleep(2)
-
-class TweetPromoted(HttpSubStream, Tweet):
-    primary_key = "id"
-
-    def path(
-        self, stream_state: Mapping[str, Any] = None,
-        stream_slice: Mapping[str, Any] = None,
-        next_page_token: Mapping[str, Any] = None
-    ) -> str:
-        tweet_id = stream_slice.get("id")
-        logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
-        return f"tweets/{tweet_id}"
-
-    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        limit_date = datetime.today()- timedelta(31)
-        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
-            tweet = parent_slice["parent"]
-            if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date:
-                yield {"id": tweet.get('id') }
-            else:
-                logger.info("Not calling promoted_metrics endpoint for tweet %s, tweet too old", tweet.get('id'))
-
-    def request_params(
-        self, stream_state: Optional[Mapping[str, Any]],
-        stream_slice: Optional[Mapping[str, Any]] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        params = {
-            "tweet.fields" : "promoted_metrics",
-        }
-        # Add condition later:
-        logger.debug(f"DBG-FULL - query params: %s", params)
-        return params
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        if 'data' in response.json():
-            data=response.json()['data']
-            yield data
-        elif 'error' in response.json():
-            logger.info("No promoted Metrics for this tweet")
-        time.sleep(2)
-
-
-# Source
 class SourceTwitterFetcher(AbstractSource):
     def check_connection(self, logger, config) -> Tuple[bool, any]:
         return True, None
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        auth=TwitterOAuth(
-            config, token_refresh_endpoint="https://api.x.com/2/oauth2/token")
+        auth = TwitterOAuth(
+            config,
+            token_refresh_endpoint="https://api.x.com/2/oauth2/token"
+        )
+
         tweet = Tweet(
-            authenticator=auth,
-            account_id=config["account_id"],
-            start_time=datetime.strptime(config['start_time'], "%Y-%m-%dT%H:%M:%SZ"),
-        )
-        tweetMetrics = TweetMetrics(
-            authenticator=auth,
-            account_id=config['account_id'],
-            parent=tweet
-        )
-        tweetPromoted = TweetPromoted(
-            authenticator=auth,
-            account_id=config['account_id'],
-            parent=tweet
-        )
+            authenticator=auth,
+            account_id=config["account_id"],
+            start_time=datetime.strptime(config['start_time'], "%Y-%m-%dT%H:%M:%SZ"),
+        )
+
+        tweet_metrics = TweetMetrics(
+            authenticator=auth,
+            account_id=config['account_id'],
+            parent=tweet
+        )
+
+        tweet_promoted = TweetPromoted(
+            authenticator=auth,
+            account_id=config['account_id'],
+            parent=tweet
+        )
+
+        promoted_tweet_billing = PromotedTweetBilling(
+            authenticator=auth,
+            account_id=config['account_id'],
+            parent=tweet_promoted
+        )
+
         return [
             Account(authenticator=auth, account_id=config["account_id"]),
             tweet,
-            tweetMetrics,
-            tweetPromoted
-        ]
+            tweet_metrics,
+            tweet_promoted,
+            promoted_tweet_billing
+        ]
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py b/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py
new file mode 100644
index 0000000..31e3a3c
--- /dev/null
+++ b/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py
@@ -0,0 +1,180 @@
+from typing import Any, Iterable, Mapping, MutableMapping, Optional
+import logging
+import requests
+import time
+from datetime import datetime, timedelta
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
+from airbyte_cdk.models import SyncMode
+
+logger = logging.getLogger("airbyte")
+
+class TwitterStream(HttpStream):
+    url_base = "https://api.x.com/2/"
+
+    def __init__(self, start_time: str = None, account_id: str = None, **kwargs):
+        super().__init__(**kwargs)
+        self.start_time = start_time
+        self.account_id = account_id
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        return None
+
+    def backoff_time(self, response: requests.Response) -> Optional[float]:
+        logger.warn("API rate limit: %s\n%s", response.json(), response.headers)
+
+        delay_time = response.headers.get("Retry-After")
+        if delay_time:
+            return int(delay_time)
+
+class Account(TwitterStream):
+    primary_key = "id"
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return "users/me?user.fields=public_metrics,protected,description,url,most_recent_tweet_id,pinned_tweet_id,created_at,verified_type"
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_slice: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Mapping]:
+        logger.debug("Response: %s", response.json())
+        data = response.json()['data']
+        yield data
+
+class Tweet(TwitterStream):
+    primary_key = "id"
+
+    @property
+    def use_cache(self) -> bool:
+        return True
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return f"users/{self.account_id}/tweets"
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0:
+            logger.debug('DBG-NT: %s', response.json()['meta']['next_token'])
+            return {"pagination_token": response.json()['meta']['next_token']}
+
+    def request_params(
+        self,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None
+    ) -> MutableMapping[str, Any]:
+        params = {
+            "tweet.fields": "text,public_metrics,author_id,referenced_tweets,created_at",
+            "max_results": 100
+        }
+        params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
+        if next_page_token:
+            params.update(**next_page_token)
+        return params
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_slice: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Mapping]:
+        logger.debug("Full response %s", response.json())
+        if 'data' in response.json():
+            data = response.json()['data']
+            for t in data:
+                yield t
+        time.sleep(2)
+
+class TweetMetrics(HttpSubStream, Tweet):
+    primary_key = "id"
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        tweet_id = stream_slice.get("id")
+        logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
+        return f"tweets/{tweet_id}"
+
+    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        limit_date = datetime.today() - timedelta(31)
+        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
+            tweet = parent_slice["parent"]
+            if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date:
+                yield {"id": tweet.get('id')}
+            else:
+                logger.info("Not calling full metrics endpoint for tweet %s, tweet too old", tweet.get('id'))
+
+    def request_params(
+        self,
+        stream_state: Optional[Mapping[str, Any]],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> MutableMapping[str, Any]:
+        params = {
+            "tweet.fields": "non_public_metrics,organic_metrics,created_at",
+        }
+        logger.debug(f"DBG-FULL - query params: %s", params)
+        return params
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        if 'data' in response.json():
+            data = response.json()['data']
+            logger.debug("DBG-FULL-T: id %s", data.get('id'))
+            yield data
+        time.sleep(2)
+
+class TweetPromoted(HttpSubStream, Tweet):
+    primary_key = "id"
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        tweet_id = stream_slice.get("id")
+        logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
+        return f"tweets/{tweet_id}"
+
+    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        limit_date = datetime.today() - timedelta(31)
+        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
+            tweet = parent_slice["parent"]
+            if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date:
+                yield {"id": tweet.get('id')}
+            else:
+                logger.info("Not calling promoted_metrics endpoint for tweet %s, tweet too old", tweet.get('id'))
+
+    def request_params(
+        self,
+        stream_state: Optional[Mapping[str, Any]],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> MutableMapping[str, Any]:
+        params = {
+            "tweet.fields": "promoted_metrics",
+        }
+        logger.debug(f"DBG-FULL - query params: %s", params)
+        return params
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        if 'data' in response.json():
+            data = response.json()['data']
+            yield data
+        elif 'error' in response.json():
+            logger.info("No promoted Metrics for this tweet")
+        time.sleep(2)
\ No newline at end of file

From c65f5aec0b9739910e12051a06099d894e2c4cda Mon Sep 17 00:00:00 2001
From: c200bzh
Date: Wed, 29 Jan 2025 16:05:48 +0800
Subject: [PATCH 2/2] Twitter: adding streams from the X Ads API to get engagement on promoted tweets and active promoted tweet ids

---
 .../source_twitter_fetcher/ads_stream.py      | 159 +++++++++++++---
 .../schemas/active_promoted_tweet.json        |  22 +++
 .../schemas/billing_tweet_promoted.json       |  22 ++-
 .../schemas/promoted_tweets_engagement.json   |  56 ++++++
 .../source_twitter_fetcher/source.py          |  19 ++-
 5 files changed, 252 insertions(+), 26 deletions(-)
 create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/active_promoted_tweet.json
 create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_tweets_engagement.json

diff --git a/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py b/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
index 2d022c0..79d9c7b 100644
--- a/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
+++ b/source-twitter-fetcher/source_twitter_fetcher/ads_stream.py
@@ -3,13 +3,54 @@
 import requests
 import time
 from datetime import datetime
-from .tweets_stream import TweetPromoted
+from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
+from .tweets_stream import TwitterStream
 
 logger = logging.getLogger("airbyte")
 
-class PromotedTweetBilling(TweetPromoted):
-    #start_time and account_id are set in the tweets_stream file
-    url_base = "https://ads-api.x.com/9/"
+class PromotedTweetActive(TwitterStream):
+    #fetch the active promoted tweet ids
+    url_base = "https://ads-api.x.com/12/"
+    primary_key = "entity_id"
+
+    @property
+    def use_cache(self) -> bool:
+        return True
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return f"stats/accounts/{self.account_id}/active_entities"
+
+    def request_params(
+        self,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None
+    ) -> MutableMapping[str, Any]:
+        return {
+            "entity": "PROMOTED_TWEET",
+            "start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+            "end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
+        }
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_slice: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Mapping]:
+        if 'data' in response.json():
+            for entity in response.json()['data']:
+                yield entity
+        time.sleep(2)
+
+
+class PromotedTweetBilling(HttpSubStream, PromotedTweetActive):
+    #gets billing info for each promoted tweet
     primary_key = "id"
 
     def path(
@@ -18,21 +59,22 @@ def path(
         stream_slice: Mapping[str, Any] = None,
         next_page_token: Mapping[str, Any] = None
     ) -> str:
-        return f"stats/jobs/accounts/{self.account_id}"
+        return f"stats/accounts/{self.account_id}"
 
     def stream_slices(
-        self,
-        sync_mode = None, # will inherit from TweetPromoted
+        self,
+        sync_mode=None,
         stream_state: Mapping[str, Any] = None,
         **kwargs
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        # Reuse TweetPromoted's stream_slices to get promoted tweets
-        for slice in super().stream_slices(stream_state=stream_state, **kwargs):
-            tweet = slice.get("parent", {})
-            yield {
-                "promoted_tweet_id": tweet.get("id"),
-                # "created_at": tweet.get("created_at")
-            }
+        for parent_slice in super().stream_slices(sync_mode=sync_mode):
+            active_tweet = parent_slice["parent"]
+            if "ALL_ON_TWITTER" in active_tweet.get("placements", []):
+                yield {
+                    "promoted_tweet_id": active_tweet.get("entity_id"),
+                    "activity_start_time": active_tweet.get("activity_start_time"),
+                    "activity_end_time": active_tweet.get("activity_end_time")
+                }
 
     def request_params(
         self,
@@ -42,7 +84,7 @@ def request_params(
     ) -> MutableMapping[str, Any]:
         promoted_tweet_id = stream_slice.get("promoted_tweet_id") if stream_slice else None
 
-        params = {
+        return {
             "entity": "PROMOTED_TWEET",
             "entity_ids": promoted_tweet_id,
             "granularity": "DAY",
@@ -51,8 +93,6 @@ def request_params(
             "metric_groups": "BILLING",
             "start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
             "end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
         }
-
-        return params
 
     def parse_response(
         self,
@@ -65,12 +105,91 @@ def parse_response(
     ) -> Iterable[Mapping]:
         if 'data' in response.json():
             data = response.json()['data']
             for record in data:
                 billing_data = {
                     "id": stream_slice.get("promoted_tweet_id"),
-                    # "created_at": stream_slice.get("created_at"),
-                    # "stats_date": record.get("date"),
+                    "activity_start_time": stream_slice.get("activity_start_time"),
+                    "activity_end_time": stream_slice.get("activity_end_time"),
                     "billed_engagements": record.get("billed_engagements", []),
                     "billed_charge_local_micro": record.get("billed_charge_local_micro", []),
                     **record
                 }
                 yield billing_data
 
-        time.sleep(2)
\ No newline at end of file
+        time.sleep(2)
+
+class PromotedTweetEngagement(HttpSubStream, PromotedTweetActive):
+    # fetches engagement metrics on promoted tweets
+    primary_key = "id"
+
+    def path(
+        self,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        return f"stats/accounts/{self.account_id}"
+
+    def stream_slices(
+        self,
+        sync_mode=None,
+        stream_state: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Optional[Mapping[str, Any]]]:
+        for parent_slice in super().stream_slices(sync_mode=sync_mode):
+            active_tweet = parent_slice["parent"]
+            if "ALL_ON_TWITTER" in active_tweet.get("placements", []):
+                yield {
+                    "promoted_tweet_id": active_tweet.get("entity_id"),
+                    "activity_start_time": active_tweet.get("activity_start_time"),
+                    "activity_end_time": active_tweet.get("activity_end_time")
+                }
+
+    def request_params(
+        self,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None
+    ) -> MutableMapping[str, Any]:
+        promoted_tweet_id = stream_slice.get("promoted_tweet_id") if stream_slice else None
+
+        return {
+            "entity": "PROMOTED_TWEET",
+            "entity_ids": promoted_tweet_id,
+            "granularity": "TOTAL",
+            "placement": "ALL_ON_TWITTER",
+            "metric_groups": "ENGAGEMENT",
+            "start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+            "end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
+        }
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        stream_slice: Mapping[str, Any] = None,
+        **kwargs
+    ) -> Iterable[Mapping]:
+        if 'data' in response.json():
+            data = response.json()['data']
+            for record in data:
+                id_data = record.get("id_data", [])
+                for data_point in id_data:
+                    metrics = data_point.get("metrics", {})
+                    engagement_data = {
+                        "id": stream_slice.get("promoted_tweet_id"),
+                        "activity_start_time": stream_slice.get("activity_start_time"),
+                        "activity_end_time": stream_slice.get("activity_end_time"),
+                        "impressions": metrics.get("impressions", [None])[0],
+                        "likes": metrics.get("likes", [None])[0],
+                        "engagements": metrics.get("engagements", [None])[0],
+                        "clicks": metrics.get("clicks", [None])[0],
+                        "retweets": metrics.get("retweets", [None])[0],
+                        "replies": metrics.get("replies", [None])[0],
+                        "follows": metrics.get("follows", [None])[0],
+                        "app_clicks": metrics.get("app_clicks", [None])[0],
+                        "card_engagements": metrics.get("card_engagements", [None])[0],
+                        "qualified_impressions": metrics.get("qualified_impressions", [None])[0],
+                        "tweets_send": metrics.get("tweets_send", [None])[0],
+                        "poll_card_vote": metrics.get("poll_card_vote", [None])[0],
+                        "carousel_swipes": metrics.get("carousel_swipes", [None])[0],
+                    }
+                    yield engagement_data
+
+        time.sleep(2)
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/active_promoted_tweet.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/active_promoted_tweet.json
new file mode 100644
index 0000000..a9423ca
--- /dev/null
+++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/active_promoted_tweet.json
@@ -0,0 +1,22 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+      "entity_id": {
+        "type": ["null", "string"]
+      },
+      "activity_start_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
+      },
+      "activity_end_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
+      },
+      "placements": {
+        "type": ["null", "array"],
+        "items": {
+          "type": "string"
+        }
+      }
+    }}
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
index 625750c..6a224c3 100644
--- a/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
+++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/billing_tweet_promoted.json
@@ -5,8 +5,13 @@
       "id": {
         "type": ["null", "string"]
       },
-      "promoted_tweet_id": {
-        "type": ["null", "string"]
+      "activity_start_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
+      },
+      "activity_end_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
       },
       "billed_engagements": {
         "type": ["null", "array"],
@@ -19,7 +24,16 @@
         "items": {
           "type": "integer"
         }
-      }
+      },
+      "date": {
+        "type": ["null", "string"],
+        "format": "date"
+      },
+      "placement": {
+        "type": ["null", "string"]
+      },
+      "granularity": {
+        "type": ["null", "string"]
       }
-    }
+    }}
\ No newline at end of file
diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_tweets_engagement.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_tweets_engagement.json
new file mode 100644
index 0000000..e042854
--- /dev/null
+++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_tweets_engagement.json
@@ -0,0 +1,56 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+      "id": {
+        "type": ["null", "string"]
+      },
+      "activity_start_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
+      },
+      "activity_end_time": {
+        "type": ["null", "string"],
+        "format": "date-time"
+      },
+      "impressions": {
+        "type": ["null", "number"]
+      },
+      "likes": {
+        "type": ["null", "number"]
+      },
+      "engagements": {
+        "type": ["null", "number"]
+      },
+      "clicks": {
+        "type": ["null", "number"]
+      },
+      "retweets": {
+        "type": ["null", "number"]
+      },
+      "replies": {
+        "type": ["null", "number"]
+      },
+      "follows": {
+        "type": ["null", "number"]
+      },
+      "app_clicks": {
+        "type": ["null", "number"]
+      },
+      "card_engagements": {
+        "type": ["null", "number"]
+      },
+      "qualified_impressions": {
+        "type": ["null", "number"]
+      },
+      "tweets_send": {
+        "type": ["null", "number"]
+      },
+      "poll_card_vote": {
+        "type": ["null", "number"]
+      },
+      "carousel_swipes": {
+        "type": ["null", "number"]
+      }
+    }
+  }
diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py
index 80ebd3c..89e241d 100644
--- a/source-twitter-fetcher/source_twitter_fetcher/source.py
+++ b/source-twitter-fetcher/source_twitter_fetcher/source.py
@@ -5,7 +5,7 @@
 from airbyte_cdk.sources.streams import Stream
 
 from .tweets_stream import Account, Tweet, TweetMetrics, TweetPromoted
-from .ads_stream import PromotedTweetBilling
+from .ads_stream import PromotedTweetActive, PromotedTweetBilling, PromotedTweetEngagement
 from .auth import TwitterOAuth
 
 class SourceTwitterFetcher(AbstractSource):
@@ -36,16 +36,31 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
             parent=tweet
         )
 
+        promoted_tweet_active = PromotedTweetActive(
+            authenticator=auth,
+            account_id=config['account_id'],
+            start_time=datetime.strptime(config['start_time'], "%Y-%m-%dT%H:%M:%SZ"),
+        )
+
         promoted_tweet_billing = PromotedTweetBilling(
             authenticator=auth,
             account_id=config['account_id'],
             parent=tweet_promoted
         )
 
+        promoted_tweet_engagement = PromotedTweetEngagement(
+            authenticator=auth,
+            account_id=config['account_id'],
+            parent=promoted_tweet_active
+        )
+
         return [
             Account(authenticator=auth, account_id=config["account_id"]),
             tweet,
             tweet_metrics,
             tweet_promoted,
-            promoted_tweet_billing
+            promoted_tweet_active,
+            promoted_tweet_billing,
+            promoted_tweet_engagement
         ]
\ No newline at end of file
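
Editor's note (not part of the patch): the two commits chain streams together — PromotedTweetActive lists the currently active PROMOTED_TWEET entities from the Ads API, and PromotedTweetBilling / PromotedTweetEngagement are HttpSubStream children that query stats per active entity. The snippet below is a minimal, illustrative sketch of how one might exercise the new ads streams locally with the Airbyte CDK; it is not defined by this patch. The config file path and any OAuth field names inside it are assumptions — only account_id and start_time (format "%Y-%m-%dT%H:%M:%SZ") are taken from the diff, and running it hits the live X APIs, so valid Ads credentials are required.

# Hypothetical local smoke test for the new ads streams (sketch only, untested).
import json

from airbyte_cdk.models import SyncMode
from source_twitter_fetcher.source import SourceTwitterFetcher

# Assumed config location and contents: account_id, start_time, plus whatever
# OAuth fields TwitterOAuth expects (those names are not specified in this patch).
config = json.load(open("secrets/config.json"))

# Stream names default to snake_case of the class name in the Airbyte CDK.
streams = {s.name: s for s in SourceTwitterFetcher().streams(config)}

# PromotedTweetActive is a plain stream: each record is an active promoted-tweet entity.
for record in streams["promoted_tweet_active"].read_records(sync_mode=SyncMode.full_refresh):
    print(record["entity_id"], record.get("placements"))

# PromotedTweetEngagement is a substream: each slice carries one promoted_tweet_id
# taken from the parent's active entities, so iterate slices explicitly.
engagement = streams["promoted_tweet_engagement"]
for stream_slice in engagement.stream_slices(sync_mode=SyncMode.full_refresh):
    for record in engagement.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
        print(record["id"], record.get("engagements"))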