diff --git a/gh_static_front/app.js b/gh_static_front/app.js
index 3fe8e82..b6a51f5 100644
--- a/gh_static_front/app.js
+++ b/gh_static_front/app.js
@@ -447,16 +447,19 @@ function displayMediaStats(data) {
     // Display media distribution
    const distributionHtml = Object.entries(data.media_stats.media_by_type)
-        .map(([type, stats]) => `
-            <div>
-                <span>${type}</span>
-                <span>${stats.count} (${stats.percentage}%)</span>
-            </div>
-        `)
+        .map(([type, count]) => {
+            const percentage = data.media_stats.media_type_percentages[type];
+            return `
+                <div>
+                    <span>${type}</span>
+                    <span>${count} (${percentage}%)</span>
+                </div>
+            `;
+        })
         .join('');
     document.getElementById('media-distribution').innerHTML = distributionHtml;
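The frontend change tracks a backend schema change in main.py (next diff): `media_by_type` goes back to plain per-type counts, and the percentages move into a new parallel `media_type_percentages` map. A minimal sketch of the two payload shapes as the `.map()` callback sees them, with made-up sample values:

```python
# Hypothetical sample values, not taken from a real chat export.
old_payload = {  # before: count and percentage nested under each type
    "media_by_type": {"image": {"count": 12, "percentage": 60.0},
                      "video": {"count": 8, "percentage": 40.0}}
}
new_payload = {  # after: flat counts plus a parallel percentage map
    "media_by_type": {"image": 12, "video": 8},
    "media_type_percentages": {"image": 60.0, "video": 40.0},
}

# What the rewritten .map() callback reads per entry:
for media_type, count in new_payload["media_by_type"].items():
    pct = new_payload["media_type_percentages"][media_type]
    print(f"{media_type}: {count} ({pct}%)")
```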
diff --git a/main.py b/main.py
index 756d270..16d53ef 100644
--- a/main.py
+++ b/main.py
@@ -153,6 +153,7 @@ class MediaItem(BaseModel):
 class MediaStats(BaseModel):
     total_media_shared: int
     media_by_type: Dict[str, int]
+    media_type_percentages: Dict[str, float]
     top_media_sharers: List[UserActivity]
     most_reacted_media: List[MediaItem]

@@ -209,8 +210,8 @@ def clean_message(text: str) -> str:
     """Remove Unicode control characters and normalize whitespace."""
     return UNICODE_CONTROL_CHARS.sub('', text).strip()

-# Pre-compile pattern for media messages
-MEDIA_PATTERN = re.compile(r'(?:‎)?\[?(?:\d{2}/\d{2}/\d{4},\s\d{1,2}:\d{2}(?::\d{2})?\s?(?:AM|PM)?\]?\s)?(?:.*?):\s?(?:‎)?(?:(image|video|GIF|sticker|audio|document) omitted|)')
+# Pre-compile patterns for media messages
+MEDIA_PATTERN = re.compile(r'(?:image|video|gif|sticker|audio|document)\s+omitted|\.(jpg|jpeg|png|gif|mp4|webp|pdf|doc|docx)>|\[Media:', re.IGNORECASE)

 # Valid media types
 VALID_MEDIA_TYPES = {'image', 'video', 'gif', 'sticker', 'audio', 'document'}
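The rewritten pattern matches media content anywhere in a message rather than anchoring on the whole timestamped line, which is why the call sites below switch from `.match()` to `.search()`. A quick sanity check against assumed sample messages (the `<attached: …>` form is a guess at the export style the `\.(jpg|…)>` branch targets):

```python
import re

MEDIA_PATTERN = re.compile(
    r'(?:image|video|gif|sticker|audio|document)\s+omitted'
    r'|\.(jpg|jpeg|png|gif|mp4|webp|pdf|doc|docx)>'
    r'|\[Media:',
    re.IGNORECASE,
)

samples = [
    "image omitted",                    # "<type> omitted" branch
    "<attached: 00000042-PHOTO.jpg>",   # file-extension branch
    "[Media: IMAGE] shared by Alice",   # standardized replacement branch
    "let's shoot a video tomorrow",     # plain text: should NOT match
]
for msg in samples:
    print(bool(MEDIA_PATTERN.search(msg)), repr(msg))
```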
""" - + # Skip media-related messages entirely + if MEDIA_PATTERN.search(str(message)): + return [], Counter(), Counter() + def normalize_word(word: str) -> str: """Return a 'normalized' version of the word for improved matching.""" # Lowercase the word @@ -309,7 +317,8 @@ def normalize_word(word: str) -> str: norm_word = normalize_word(raw_word) if ( len(norm_word) > 3 and # Remove very short words - norm_word not in COMMON_WORDS and # Filter out common words (using normalized form) + norm_word not in COMMON_WORDS and # Filter out common words + norm_word not in MEDIA_RELATED_WORDS and # Filter out media-related words not norm_word.isdigit() and # Remove pure numbers 'http' not in norm_word # Remove URLs or partial URLs ): @@ -347,50 +356,45 @@ def parse_whatsapp_chat(content: str) -> tuple[pd.DataFrame, List[MediaItem]]: message = clean_message(message) # Check for media messages - media_match = MEDIA_PATTERN.match(message) + media_match = MEDIA_PATTERN.search(message) if media_match: try: - # Extract media type from the match - # Groups: (1) = omitted type, (2) = attached type, (3) = file extension - omitted_type = media_match.group(1) - attached_type = media_match.group(2) # pylint: disable=unused-variable - file_extension = media_match.group(3) - - # Determine media type based on available information + # Extract media type from the message media_type = None - if omitted_type: - media_type = omitted_type.lower() - elif file_extension: - media_type = file_extension.lower() - - # Normalize to lowercase - normalized_type = media_type.lower() if media_type else '' - # First check if it's a valid media type directly - if normalized_type in VALID_MEDIA_TYPES: - mapped_type = normalized_type - else: - # If not, check if it's a known file extension - mapped_type = MEDIA_TYPE_MAP.get(normalized_type, 'unknown') - if mapped_type == 'unknown': - logger.warning("Unknown media type encountered: %s", media_type) + # Check for "X omitted" pattern + if 'omitted' in message.lower(): + for type_name in VALID_MEDIA_TYPES: + if type_name in message.lower(): + media_type = type_name + break - # Create the media item - media_item = MediaItem( - type=mapped_type, - sender=sender, - timestamp=timestamp, - reactions=0 # Will be updated later when processing reactions - ) + # Check for file extensions + if not media_type: + for ext, type_name in MEDIA_TYPE_MAP.items(): + if f'.{ext}' in message.lower(): + media_type = type_name + break - # Add to media items list - media_items.append(media_item) - - # Replace the original message with a standardized format - message = f"[Media: {mapped_type.upper()}] shared by {sender}" - - # Log the media item for debugging - logger.debug("Processed media item: %s from %s at %s", mapped_type, sender, timestamp) + if media_type: + # Create the media item + media_item = MediaItem( + type=media_type, + sender=sender, + timestamp=timestamp, + reactions=0 # Will be updated later when processing reactions + ) + + # Add to media items list + media_items.append(media_item) + + # Replace the original message with a standardized format + message = f"[Media: {media_type.upper()}] shared by {sender}" + + # Log the media item for debugging + logger.debug("Processed media item: %s from %s at %s", media_type, sender, timestamp) + else: + logger.debug("Could not determine media type for message: %s", message) except (AttributeError, IndexError) as e: logger.error("Error processing media message at line %d: %s", line_num, str(e)) @@ -511,19 +515,11 @@ def analyze_media_stats(df: pd.DataFrame, 
@@ -511,19 +515,11 @@ def analyze_media_stats(df: pd.DataFrame, media_items: List[MediaItem]) -> MediaStats:
         k: round((v / total_media) * 100, 2) if total_media > 0 else 0
         for k, v in media_by_type_dict.items()
     }
-
-    # Add distribution percentages to the type counts
-    media_by_type_dict = {
-        k: {
-            'count': v,
-            'percentage': media_distribution[k]
-        }
-        for k, v in media_by_type_dict.items()
-    }
-
+
     return MediaStats(
         total_media_shared=int(len(media_items)),
         media_by_type=media_by_type_dict,
+        media_type_percentages=media_distribution,
         top_media_sharers=top_sharers,
         most_reacted_media=most_reacted
     )
@@ -534,6 +530,11 @@ def analyze_chat_stats(df: pd.DataFrame) -> tuple[Dict, Dict]:
     all_words = Counter()

     for message in df['message']:
+        # DataFrame should already be cleaned of media messages
+        # but we'll double-check just in case
+        if MEDIA_PATTERN.search(str(message)):
+            continue
+
         _, emoji_counts, word_counts = process_message_stats(message)
         all_emojis.update(emoji_counts)
         all_words.update(word_counts)
@@ -548,7 +549,7 @@ async def analyze_sentiment_batch(messages_batch: List[str]) -> float:
     # Filter out media messages
     filtered_messages = [
         msg for msg in messages_batch
-        if not (msg.startswith('[Media:') or MEDIA_PATTERN.match(msg))
+        if not MEDIA_PATTERN.search(str(msg))
     ]

     if not filtered_messages:
@@ -690,17 +691,20 @@ async def analyze_chat(file: UploadFile = File(...)):
         logger.error("No messages could be parsed from the chat file")
         raise HTTPException(status_code=400, detail="No messages could be parsed from the chat file. Please ensure this is a valid WhatsApp chat export.")

-    # Then perform analysis on cleaned data
-    emoji_counts, word_counts = analyze_chat_stats(df)
-
-    # Basic statistics
-    logger.debug("Calculating user activity statistics")
-    most_active = df['sender'].value_counts().head(5).to_dict()
-
-    # Analyze media statistics
+    # First analyze media statistics
     logger.debug("Analyzing media statistics")
     media_stats = analyze_media_stats(df, media_items)

+    # Create a clean DataFrame without media messages for text analysis
+    clean_df = df[~df['message'].apply(lambda x: bool(MEDIA_PATTERN.search(str(x))))]
+
+    # Then perform analysis on cleaned data (excluding media messages)
+    emoji_counts, word_counts = analyze_chat_stats(clean_df)
+
+    # Basic statistics from clean data
+    logger.debug("Calculating user activity statistics")
+    most_active = clean_df['sender'].value_counts().head(5).to_dict()
+
     # Get chat insights using Claude via AWS Bedrock
     logger.debug("Preparing prompt for Claude analysis")
     # Intelligent message sampling - get messages from different time periods
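The `clean_df` filter is the load-bearing piece of the last hunk: rows matching `MEDIA_PATTERN` are dropped before emoji, word, and activity statistics, while the original `df` still feeds `analyze_media_stats`. A toy illustration with invented rows and an abbreviated pattern:

```python
import re
import pandas as pd

MEDIA_PATTERN = re.compile(r'\[Media:', re.IGNORECASE)  # abbreviated for the demo

df = pd.DataFrame({
    'sender': ['Alice', 'Bob', 'Alice'],
    'message': ['good morning!', '[Media: IMAGE] shared by Bob', 'running late, sorry'],
})

# Same expression as the diff: drop rows whose message matches the media pattern
clean_df = df[~df['message'].apply(lambda x: bool(MEDIA_PATTERN.search(str(x))))]

print(clean_df['sender'].value_counts().head(5).to_dict())  # {'Alice': 2}
```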