# Social media intelligence

Systematic approaches for monitoring, analyzing, and investigating social media for journalism.

## When to activate

- Tracking how a story spreads across platforms
- Investigating potential coordinated inauthentic behavior
- Monitoring breaking news across social platforms
- Analyzing account networks and relationships
- Detecting bot activity or manipulation campaigns
- Building evidence trails for digital investigations
- Archiving social content before deletion

## Real-time monitoring

### Multi-platform tracker

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib


class Platform(Enum):
    """Platforms this toolkit knows how to label posts with."""
    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"


@dataclass
class SocialPost:
    """A single captured post, normalized across platforms."""
    platform: Platform
    post_id: str
    author: str
    content: str
    timestamp: datetime
    url: str
    engagement: Dict[str, int] = field(default_factory=dict)
    media_urls: List[str] = field(default_factory=list)
    archived_urls: List[str] = field(default_factory=list)
    content_hash: str = ""

    def __post_init__(self):
        # Hash content for duplicate detection (non-cryptographic use;
        # MD5 is fine for dedup keys).
        self.content_hash = hashlib.md5(
            f"{self.platform.value}:{self.content}".encode()
        ).hexdigest()


@dataclass
class MonitoringQuery:
    """A reusable search specification for one investigation."""
    keywords: List[str]
    platforms: List[Platform]
    accounts: List[str] = field(default_factory=list)
    hashtags: List[str] = field(default_factory=list)
    exclude_terms: List[str] = field(default_factory=list)
    start_date: Optional[datetime] = None

    def to_search_string(self, platform: Platform) -> str:
        """Generate platform-specific search query."""
        parts = []
        # Keywords: quoted, OR-joined
        if self.keywords:
            parts.append(' OR '.join(f'"{k}"' for k in self.keywords))
        # Hashtags: OR-joined with leading '#'
        if self.hashtags:
            parts.append(' OR '.join(f'#{h}' for h in self.hashtags))
        # Exclusions: '-term' syntax
        if self.exclude_terms:
            parts.append(' '.join(f'-{t}' for t in self.exclude_terms))
        return ' '.join(parts)
```

### Breaking news monitor

```python
from collections import defaultdict
from datetime import datetime, timedelta


class BreakingNewsDetector:
    """Detect sudden spikes in keyword mentions."""

    def __init__(self, baseline_window_hours: int = 24):
        self.baseline_window = timedelta(hours=baseline_window_hours)
        # keyword -> list of mention timestamps
        self.mention_history = defaultdict(list)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record a mention of a keyword."""
        self.mention_history[keyword].append(timestamp)
        # Prune data older than 2x the baseline window
        cutoff = datetime.now() - self.baseline_window * 2
        self.mention_history[keyword] = [
            t for t in self.mention_history[keyword] if t > cutoff
        ]

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0) -> bool:
        """Check if keyword is spiking above baseline."""
        now = datetime.now()
        # Mentions in the last hour
        recent = sum(
            1 for t in self.mention_history[keyword]
            if t > now - timedelta(hours=1)
        )
        # Average hourly rate over the baseline window
        baseline_hourly = len([
            t for t in self.mention_history[keyword]
            if t > now - self.baseline_window
        ]) / self.baseline_window.total_seconds() * 3600
        if baseline_hourly == 0:
            # Arbitrary threshold for brand-new topics with no baseline
            return recent > 10
        return recent > baseline_hourly * threshold_multiplier

    def get_trending(self, top_n: int = 10) -> List[tuple]:
        """Get (keyword, recent_count) pairs sorted by spike intensity."""
        spikes = []
        for keyword in self.mention_history:
            if self.is_spiking(keyword):
                recent = sum(
                    1 for t in self.mention_history[keyword]
                    if t > datetime.now() - timedelta(hours=1)
                )
                spikes.append((keyword, recent))
        return sorted(spikes, key=lambda x: x[1], reverse=True)[:top_n]
```

## Account analysis

### Authenticity indicators

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class AccountAnalysis:
    """Observed facts about one account plus authenticity signals."""
    username: str
    platform: Platform
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0
    # Authenticity signals (None = not yet assessed)
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect named red flags with human-readable explanations."""
        flags = {}
        # Account age: very new accounts are suspicious
        if self.created_date:
            age_days = (datetime.now() - self.created_date).days
            if age_days < 30:
                flags['new_account'] = f"Created {age_days} days ago"
        # Follower ratio: following many, followed by few
        if self.following_count > 0:
            ratio = self.follower_count / self.following_count
            if ratio < 0.1:
                flags['low_follower_ratio'] = f"Ratio: {ratio:.2f}"
        # Posting frequency: superhuman output
        if self.created_date and self.post_count > 0:
            age_days = max(1, (datetime.now() - self.created_date).days)
            posts_per_day = self.post_count / age_days
            if posts_per_day > 50:
                flags['excessive_posting'] = f"{posts_per_day:.0f} posts/day"
        # Stock photo check
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "Profile appears to be stock image"
        return flags

    def authenticity_score(self) -> int:
        """0-100 score, higher = more likely authentic."""
        score = 100
        flags = self.calculate_red_flags()
        penalty_per_flag = 20
        score -= len(flags) * penalty_per_flag
        return max(0, score)
```

### Network mapping

```python
from collections import defaultdict
from typing import Set, Dict


class AccountNetwork:
    """Map relationships between accounts."""

    def __init__(self):
        # interactions[from][to] = count of interactions
        self.interactions = defaultdict(lambda: defaultdict(int))
        self.accounts = {}

    def add_interaction(self, from_account: str, to_account: str,
                        interaction_type: str = "mention"):
        """Record an interaction between accounts."""
        self.interactions[from_account][to_account] += 1

    def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
        """Find groups of accounts that frequently interact."""
        # Build undirected adjacency with minimum threshold
        adjacency = defaultdict(set)
        for from_acc, targets in self.interactions.items():
            for to_acc, count in targets.items():
                if count >= min_interactions:
                    adjacency[from_acc].add(to_acc)
                    adjacency[to_acc].add(from_acc)
        # Find connected components via iterative DFS
        visited = set()
        clusters = []
        for account in adjacency:
            if account in visited:
                continue
            cluster = set()
            stack = [account]
            while stack:
                current = stack.pop()
                if current in visited:
                    continue
                visited.add(current)
                cluster.add(current)
                stack.extend(adjacency[current] - visited)
            # Singletons are not clusters
            if len(cluster) > 1:
                clusters.append(cluster)
        return sorted(clusters, key=len, reverse=True)

    def coordination_score(self, accounts: Set[str]) -> float:
        """Score how coordinated a group of accounts appears (0.0-1.0)."""
        if len(accounts) < 2:
            return 0.0
        # Fraction of possible directed pairs that actually interacted
        total_possible = len(accounts) * (len(accounts) - 1)
        actual_connections = 0
        for acc in accounts:
            for other in accounts:
                if acc != other and self.interactions[acc][other] > 0:
                    actual_connections += 1
        return actual_connections / total_possible if total_possible > 0 else 0
```

## Narrative tracking

### Claim propagation tracker

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional


@dataclass
class Claim:
    """A claim under investigation and every place it has appeared."""
    text: str
    first_seen: datetime
    first_seen_url: str
    variations: List[str] = field(default_factory=list)
    appearances: List[Dict] = field(default_factory=list)

    def add_appearance(self, url: str, platform: Platform,
                       timestamp: datetime, author: str):
        """Track where this claim has appeared."""
        self.appearances.append({
            'url': url,
            'platform': platform.value,
            'timestamp': timestamp,
            'author': author,
        })

    def spread_timeline(self) -> List[Dict]:
        """Get chronological spread of the claim."""
        return sorted(self.appearances, key=lambda x: x['timestamp'])

    def platforms_reached(self) -> Dict[str, int]:
        """Count appearances by platform."""
        counts = defaultdict(int)
        for app in self.appearances:
            counts[app['platform']] += 1
        return dict(counts)

    def velocity(self, window_hours: int = 24) -> float:
        """Calculate spread rate in appearances per hour."""
        if not self.appearances:
            return 0.0
        recent = [
            a for a in self.appearances
            if a['timestamp'] > datetime.now() - timedelta(hours=window_hours)
        ]
        return len(recent) / window_hours
```

### Hashtag analysis

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import List, Dict


class HashtagAnalyzer:
    """Analyze hashtag usage patterns."""

    def __init__(self):
        # lowercased hashtag -> list of SocialPost
        self.hashtag_posts = defaultdict(list)

    def add_post(self, hashtags: List[str], post: SocialPost):
        """Record a post's hashtags."""
        for tag in hashtags:
            self.hashtag_posts[tag.lower()].append(post)

    def co_occurrence(self, hashtag: str, top_n: int = 10) -> List[tuple]:
        """Find hashtags that commonly appear with this one."""
        co_tags = Counter()
        for post in self.hashtag_posts.get(hashtag.lower(), []):
            # Extract hashtags directly from post content
            tags = [word.lower() for word in post.content.split()
                    if word.startswith('#')]
            for tag in tags:
                if tag != f'#{hashtag.lower()}':
                    co_tags[tag] += 1
        return co_tags.most_common(top_n)

    def posting_pattern(self, hashtag: str) -> Dict:
        """Analyze when posts with this hashtag appear."""
        posts = self.hashtag_posts.get(hashtag.lower(), [])
        hour_counts = Counter(p.timestamp.hour for p in posts)
        day_counts = Counter(p.timestamp.strftime('%A') for p in posts)
        return {
            'by_hour': dict(hour_counts),
            'by_day': dict(day_counts),
            'total_posts': len(posts),
            'unique_authors': len(set(p.author for p in posts)),
        }
```

## Evidence preservation

### Archive before it disappears

```python
import requests
from datetime import datetime
from typing import Optional


class SocialArchiver:
    """Archive social content before deletion."""

    def __init__(self):
        # original URL -> archive metadata
        self.archived = {}

    def archive_to_wayback(self, url: str) -> Optional[str]:
        """Submit URL to the Internet Archive; return archived URL or None."""
        try:
            save_url = f"https://web.archive.org/save/{url}"
            response = requests.get(save_url, timeout=30)
            if response.status_code == 200:
                archived_url = response.url
                self.archived[url] = {
                    'wayback': archived_url,
                    'archived_at': datetime.now().isoformat(),
                }
                return archived_url
        except Exception as e:
            # Best-effort: archiving failure must not abort an investigation
            print(f"Archive failed: {e}")
        return None

    def archive_to_archive_today(self, url: str) -> Optional[str]:
        """Submit URL to archive.today; return archived URL or None."""
        try:
            response = requests.post(
                'https://archive.today/submit/',
                data={'url': url},
                timeout=60,
            )
            if response.status_code == 200:
                return response.url
        except Exception as e:
            print(f"Archive.today failed: {e}")
        return None

    def full_archive(self, url: str) -> dict:
        """Archive to multiple services for redundancy."""
        results = {
            'original_url': url,
            'archived_at': datetime.now().isoformat(),
            'archives': {},
        }
        wayback = self.archive_to_wayback(url)
        if wayback:
            results['archives']['wayback'] = wayback
        archive_today = self.archive_to_archive_today(url)
        if archive_today:
            results['archives']['archive_today'] = archive_today
        return results
```

## Coordination detection

### Behavioral signals checklist
**Coordinated inauthentic behavior indicators**

**Timing patterns**

- [ ] Multiple accounts posting same content within minutes
- [ ] Synchronized posting times across accounts
- [ ] Burst activity followed by dormancy
- [ ] Posts appear faster than human typing speed

**Content patterns**

- [ ] Identical or near-identical text across accounts
- [ ] Same images/media shared by multiple accounts
- [ ] Identical typos or formatting errors
- [ ] Copy-paste artifacts visible

**Account patterns**

- [ ] Accounts created around same time
- [ ] Similar naming conventions (name + numbers)
- [ ] Generic or stock profile photos
- [ ] Minimal personal content, mostly shares
- [ ] Follow the same accounts
- [ ] Engage with each other disproportionately

**Network patterns**

- [ ] Form dense clusters in network analysis
- [ ] Amplify same external sources
- [ ] Target same accounts or hashtags
- [ ] Cross-platform coordination visible

### Automated coordination scoring

```python
from typing import List


def coordination_likelihood(posts: List[SocialPost]) -> dict:
    """Score how likely posts represent coordinated activity.

    Returns a dict with 'score' (0-100), the triggered 'signals',
    'posts_analyzed', and 'unique_authors'.
    """
    if len(posts) < 2:
        return {'score': 0, 'signals': []}
    signals = []
    score = 0
    # Check for identical content: many duplicates suggests copy-paste
    contents = [p.content for p in posts]
    unique_contents = set(contents)
    if len(unique_contents) < len(contents) * 0.5:
        signals.append("High content duplication")
        score += 30
    # Check timing clusters: many posts less than a minute apart.
    # Use total_seconds(), not .seconds (which wraps every 24 hours
    # and would falsely flag day-plus gaps as rapid).
    timestamps = sorted(p.timestamp for p in posts)
    rapid_posts = 0
    for i in range(1, len(timestamps)):
        if (timestamps[i] - timestamps[i - 1]).total_seconds() < 60:
            rapid_posts += 1
    if rapid_posts > len(posts) * 0.3:
        signals.append("Suspicious timing clusters")
        score += 25
    # Check unique authors: a handful of accounts producing many posts
    authors = set(p.author for p in posts)
    if len(authors) < 5 and len(contents) / len(authors) > 2:
        signals.append("Few authors, many similar posts")
        score += 20
    return {
        'score': min(100, score),
        'signals': signals,
        'posts_analyzed': len(posts),
        'unique_authors': len(authors),
    }
```

## Platform-specific tools

| Platform | Monitoring Tool | Notes |
|----------|-----------------|-------|
| Twitter/X | TweetDeck, Brandwatch | API increasingly restricted |
| Facebook | CrowdTangle (limited) | Academic access only now |
| Instagram | Later, Brandwatch | No public API for search |
| TikTok | Exolyt, Pentos | Limited historical data |
| Reddit | Pushshift, Arctic Shift | Archive access varies |
| YouTube | YouTube Data API | Good metadata access |
| Bluesky | Firehose API | Open, real-time access |

## Ethical guidelines

- Archive public content only
- Don't create fake accounts for monitoring
- Respect platform terms of service
- Protect sources who share social content
- Verify before publishing claims about coordination
- Consider context before amplifying harmful content