-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Twitter : adding ads_stream #21
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
from typing import Any, Iterable, Mapping, MutableMapping, Optional | ||
import logging | ||
import requests | ||
import time | ||
from datetime import datetime | ||
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream | ||
from .tweets_stream import TwitterStream | ||
|
||
logger = logging.getLogger("airbyte") | ||
|
||
class PromotedTweetActive(TwitterStream): | ||
#fetch the active promoted twwet ids | ||
url_base = "https://ads-api.x.com/12/" | ||
primary_key = "entity_id" | ||
|
||
@property | ||
def use_cache(self) -> bool: | ||
return True | ||
|
||
def path( | ||
self, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
next_page_token: Mapping[str, Any] = None | ||
) -> str: | ||
return f"stats/accounts/{self.account_id}/active_entities" | ||
|
||
def request_params( | ||
self, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None | ||
) -> MutableMapping[str, Any]: | ||
return { | ||
"entity": "PROMOTED_TWEET", | ||
"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), | ||
"end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") | ||
} | ||
|
||
def parse_response( | ||
self, | ||
response: requests.Response, | ||
stream_slice: Mapping[str, Any] = None, | ||
**kwargs | ||
) -> Iterable[Mapping]: | ||
if 'data' in response.json(): | ||
for entity in response.json()['data']: | ||
yield entity | ||
time.sleep(2) | ||
|
||
|
||
class PromotedTweetBilling(HttpSubStream, PromotedTweetActive): | ||
#gets billing info for each promotted tweet | ||
primary_key = "id" | ||
|
||
def path( | ||
self, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
next_page_token: Mapping[str, Any] = None | ||
) -> str: | ||
return f"stats/accounts/{self.account_id}" | ||
|
||
def stream_slices( | ||
self, | ||
sync_mode=None, | ||
stream_state: Mapping[str, Any] = None, | ||
**kwargs | ||
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
for parent_slice in super().stream_slices(sync_mode=sync_mode): | ||
active_tweet = parent_slice["parent"] | ||
if "ALL_ON_TWITTER" in active_tweet.get("placements", []): | ||
yield { | ||
"promoted_tweet_id": active_tweet.get("entity_id"), | ||
"activity_start_time": active_tweet.get("activity_start_time"), | ||
"activity_end_time": active_tweet.get("activity_end_time") | ||
} | ||
|
||
def request_params( | ||
self, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None | ||
) -> MutableMapping[str, Any]: | ||
promoted_tweet_id = stream_slice.get("promoted_tweet_id") if stream_slice else None | ||
|
||
return { | ||
"entity": "PROMOTED_TWEET", | ||
"entity_ids": promoted_tweet_id, | ||
"granularity": "DAY", | ||
"placement": "ALL_ON_TWITTER", | ||
"metric_groups": "BILLING", | ||
"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), | ||
"end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") | ||
} | ||
|
||
def parse_response( | ||
self, | ||
response: requests.Response, | ||
stream_slice: Mapping[str, Any] = None, | ||
**kwargs | ||
) -> Iterable[Mapping]: | ||
if 'data' in response.json(): | ||
data = response.json()['data'] | ||
for record in data: | ||
billing_data = { | ||
"id": stream_slice.get("promoted_tweet_id"), | ||
"activity_start_time": stream_slice.get("activity_start_time"), | ||
"activity_end_time": stream_slice.get("activity_end_time"), | ||
"billed_engagements": record.get("billed_engagements", []), | ||
"billed_charge_local_micro": record.get("billed_charge_local_micro", []), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same |
||
**record | ||
} | ||
yield billing_data | ||
|
||
time.sleep(2) | ||
|
||
class PromotedTweetEngagement(HttpSubStream, PromotedTweetActive): | ||
# fetches engagement metrics on promoted tweets | ||
primary_key = "id" | ||
|
||
def path( | ||
self, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
next_page_token: Mapping[str, Any] = None | ||
) -> str: | ||
return f"stats/accounts/{self.account_id}" | ||
|
||
def stream_slices( | ||
self, | ||
sync_mode=None, | ||
stream_state: Mapping[str, Any] = None, | ||
**kwargs | ||
) -> Iterable[Optional[Mapping[str, Any]]]: | ||
for parent_slice in super().stream_slices(sync_mode=sync_mode): | ||
active_tweet = parent_slice["parent"] | ||
if "ALL_ON_TWITTER" in active_tweet.get("placements", []): | ||
yield { | ||
"promoted_tweet_id": active_tweet.get("entity_id"), | ||
"activity_start_time": active_tweet.get("activity_start_time"), | ||
"activity_end_time": active_tweet.get("activity_end_time") | ||
} | ||
|
||
def request_params( | ||
self, | ||
next_page_token: Optional[Mapping[str, Any]] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
stream_slice: Mapping[str, Any] = None | ||
) -> MutableMapping[str, Any]: | ||
promoted_tweet_id = stream_slice.get("promoted_tweet_id") if stream_slice else None | ||
|
||
return { | ||
"entity": "PROMOTED_TWEET", | ||
"entity_ids": promoted_tweet_id, | ||
"granularity": "TOTAL", | ||
"placement": "ALL_ON_TWITTER", | ||
"metric_groups": "ENGAGEMENT", | ||
"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), | ||
"end_time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") | ||
} | ||
|
||
def parse_response( | ||
self, | ||
response: requests.Response, | ||
stream_slice: Mapping[str, Any] = None, | ||
**kwargs | ||
) -> Iterable[Mapping]: | ||
if 'data' in response.json(): | ||
data = response.json()['data'] | ||
for record in data: | ||
id_data = record.get("id_data", []) | ||
for data_point in id_data: | ||
metrics = data_point.get("metrics", {}) | ||
engagement_data = { | ||
"id": stream_slice.get("promoted_tweet_id"), | ||
"activity_start_time": stream_slice.get("activity_start_time"), | ||
"activity_end_time": stream_slice.get("activity_end_time"), | ||
"impressions": metrics.get("impressions", [None])[0], | ||
"likes": metrics.get("likes", [None])[0], | ||
"engagements": metrics.get("engagements", [None])[0], | ||
"clicks": metrics.get("clicks", [None])[0], | ||
"retweets": metrics.get("retweets", [None])[0], | ||
"replies": metrics.get("replies", [None])[0], | ||
"follows": metrics.get("follows", [None])[0], | ||
"app_clicks": metrics.get("app_clicks", [None])[0], | ||
"card_engagements": metrics.get("card_engagements", [None])[0], | ||
"qualified_impressions": metrics.get("qualified_impressions", [None])[0], | ||
"tweets_send": metrics.get("tweets_send", [None])[0], | ||
"poll_card_vote": metrics.get("poll_card_vote", [None])[0], | ||
"carousel_swipes": metrics.get("carousel_swipes", [None])[0], | ||
} | ||
yield engagement_data | ||
|
||
time.sleep(2) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
{ | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"type": "object", | ||
"properties": { | ||
"entity_id": { | ||
"type": ["null", "string"] | ||
}, | ||
"activity_start_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"activity_end_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"placements": { | ||
"type": ["null", "array"], | ||
"items": { | ||
"type": "string" | ||
} | ||
} | ||
}} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the schema contains all those fields ? :
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also the date could be usefull |
||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"type": "object", | ||
"properties": { | ||
"id": { | ||
"type": ["null", "string"] | ||
}, | ||
"activity_start_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"activity_end_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"billed_engagements": { | ||
"type": ["null", "array"], | ||
"items": { | ||
"type": "integer" | ||
} | ||
}, | ||
"billed_charge_local_micro": { | ||
"type": ["null", "array"], | ||
"items": { | ||
"type": "integer" | ||
} | ||
}, | ||
"date": { | ||
"type": ["null", "string"], | ||
"format": "date" | ||
}, | ||
"placement": { | ||
"type": ["null", "string"] | ||
}, | ||
"granularity": { | ||
"type": ["null", "string"] | ||
} | ||
}} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
{ | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"type": "object", | ||
"properties": { | ||
"id": { | ||
"type": ["null", "string"] | ||
}, | ||
"activity_start_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"activity_end_time": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"impressions": { | ||
"type": ["null", "number"] | ||
}, | ||
"likes": { | ||
"type": ["null", "number"] | ||
}, | ||
"engagements": { | ||
"type": ["null", "number"] | ||
}, | ||
"clicks": { | ||
"type": ["null", "number"] | ||
}, | ||
"retweets": { | ||
"type": ["null", "number"] | ||
}, | ||
"replies": { | ||
"type": ["null", "number"] | ||
}, | ||
"follows": { | ||
"type": ["null", "number"] | ||
}, | ||
"app_clicks": { | ||
"type": ["null", "number"] | ||
}, | ||
"card_engagements": { | ||
"type": ["null", "number"] | ||
}, | ||
"qualified_impressions": { | ||
"type": ["null", "number"] | ||
}, | ||
"tweets_send": { | ||
"type": ["null", "number"] | ||
}, | ||
"poll_card_vote": { | ||
"type": ["null", "number"] | ||
}, | ||
"carousel_swipes": { | ||
"type": ["null", "number"] | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont see any
billed_engagments
for the endpoint.https://developer.x.com/en/docs/x-ads-api/analytics/api-reference/synchronous
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metrics from the BILLING endpoint are described here : https://docs.x.com/x-ads-api/analytics#available-metrics-by-metrics-group