Digital archive methodology
Patterns for building production-quality digital archives with AI-powered analysis and knowledge graph construction.
Archive architecture Multi-source integration pattern ┌─────────────────┐ ┌──────────────────┐ ┌────────────────┐ │ OCR Pipeline │ │ Web Scraping │ │ Social Media │ │ (newspapers) │ │ (articles) │ │ (transcripts) │ └────────┬────────┘ └────────┬─────────┘ └───────┬────────┘ │ │ │ └──────────────────────┼──────────────────────┘ │ ┌───────────▼───────────┐ │ Unified Schema │ │ (35+ fields) │ └───────────┬───────────┘ │ ┌──────────────────────┼──────────────────────┐ │ │ │ ┌────────▼────────┐ ┌──────────▼──────────┐ ┌───────▼───────┐ │ AI Enrichment │ │ Entity Extraction │ │ PDF Archive │ │ (Gemini) │ │ (Knowledge Graph) │ │ (WCAG 2.1) │ └────────┬────────┘ └──────────┬──────────┘ └───────┬───────┘ │ │ │ └──────────────────────┼──────────────────────┘ │ ┌───────────▼───────────┐ │ Google Sheets │ │ (primary database) │ └───────────┬───────────┘ │ ┌───────────▼───────────┐ │ Frontend Export │ │ (JSON/CSV) │ └───────────────────────┘
Unified schema design from dataclasses import dataclass, field from datetime import date from typing import Optional from enum import Enum
class ContentType(Enum): ARTICLE = 'Article' VIDEO = 'Video' AUDIO = 'Audio' SOCIAL = 'Social Post' NEWSPAPER = 'Newspaper Article'
class ThematicCategory(Enum): PRESS_CRITICISM = 'Press & Media Criticism' JOURNALISM_THEORY = 'Journalism Theory' POLITICS = 'Politics & Democracy' TECHNOLOGY = 'Technology & Digital Media' EDUCATION = 'Journalism Education' AUDIENCE = 'Audience & Public Engagement'
class HistoricalEra(Enum): ERA_1990s = '1990-1999' ERA_2000_04 = '2000-2004' ERA_2005_09 = '2005-2009' ERA_2010_15 = '2010-2015' ERA_2016_20 = '2016-2020' ERA_2021_PRESENT = '2021-present'
@dataclass class ArchiveRecord: # Core identifiers id: str # Format: SOURCE-00001 url: str title: str
# Content
author: Optional[str] = None
publication_date: Optional[date] = None
publication: Optional[str] = None
content_type: ContentType = ContentType.ARTICLE
text: str = ''
# AI-enriched fields
summary: Optional[str] = None
pull_quote: Optional[str] = None
categories: list[ThematicCategory] = field(default_factory=list)
key_concepts: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
era: Optional[HistoricalEra] = None
scope: Optional[str] = None # Theoretical, Commentary, Case Study, etc.
# Entity references
entities_mentioned: list[str] = field(default_factory=list)
related_to: list[str] = field(default_factory=list)
responds_to: list[str] = field(default_factory=list)
# Archive metadata
pdf_url: Optional[str] = None
transcript_url: Optional[str] = None
verified: bool = False
processing_status: str = 'pending'
last_updated: Optional[date] = None
def generate_record_id(source: str, sequence: int) -> str: """Generate unique ID with source prefix.""" prefixes = { 'nytimes': 'NYT', 'columbia journalism review': 'CJR', 'pressthink': 'PT', 'twitter': 'TW', 'youtube': 'YT', 'newspaper': 'NEWS', } prefix = prefixes.get(source.lower(), 'MISC') return f"{prefix}-{sequence:05d}"
AI-powered categorization Taxonomy-based classification import google.generativeai as genai import json from typing import Optional
TAXONOMY = { "thematic_categories": [ "Press & Media Criticism", "Journalism Theory", "Politics & Democracy", "Technology & Digital Media", "Journalism Education", "Audience & Public Engagement" ], "key_concepts": [ "The View from Nowhere", "Verification vs. Assertion", "Citizens vs. Consumers", "Public Journalism", "The Rosen Test", "Savvy vs. Naive", "Professional vs. Amateur", "Production vs. Distribution", "Trust vs. Transparency", "Horse Race Coverage", "Both Sides Journalism", "Audience Atomization", "The Church of the Savvy" ], "scope_types": [ "Theoretical", "Commentary", "Historical", "Case Study", "Pedagogical", "Personal Reflection" ] }
class ArchiveCategorizer: def init(self, model: str = 'gemini-2.0-flash'): self.model = genai.GenerativeModel(model)
def categorize(self, record: ArchiveRecord) -> dict:
prompt = f"""Analyze this archival content and categorize it according to the taxonomy.
CONTENT: Title: {record.title} Author: {record.author or 'Unknown'} Date: {record.publication_date or 'Unknown'} Text (first 8000 chars):
TAXONOMY:
Respond with JSON containing: {{ "categories": ["category1", "category2"], // 1-3 from thematic_categories "key_concepts": ["concept1", "concept2"], // 0-5 from key_concepts list "scope": "scope_type", // one from scope_types "era": "YYYY-YYYY", // decade range "tags": ["tag1", "tag2", "tag3", "tag4", "tag5"], // 5 contextual keywords "summary": "2-3 sentence summary", "pull_quote": "Most impactful quote from the text" }}
IMPORTANT: - Only use categories/concepts from the taxonomy - Tags should be lowercase, hyphenated keywords - Summary should capture the main argument - Pull quote must be an exact excerpt from the text """
response = self.model.generate_content(prompt)
result = self._parse_response(response.text)
# Validate against taxonomy
result['categories'] = [c for c in result.get('categories', [])
if c in TAXONOMY['thematic_categories']]
result['key_concepts'] = [c for c in result.get('key_concepts', [])
if c in TAXONOMY['key_concepts']]
return result
def _parse_response(self, text: str) -> dict:
"""Extract JSON from response, handling markdown code blocks."""
# Remove markdown code blocks if present
if '```json' in text:
text = text.split('```json')[1].split('```')[0]
elif '```' in text:
text = text.split('```')[1].split('```')[0]
return json.loads(text.strip())
def validate_response(self, result: dict, text: str) -> bool:
"""Detect AI hallucination patterns."""
# Check for uniform response signature (all same values)
if len(set(result.get('tags', []))) < 3:
return False
# Check pull quote exists in text
pull_quote = result.get('pull_quote', '')
if pull_quote and pull_quote.lower() not in text.lower():
return False
# Check summary isn't generic
generic_phrases = ['this article discusses', 'the author explores', 'this piece examines']
summary = result.get('summary', '').lower()
if any(phrase in summary for phrase in generic_phrases):
return False
return True
Entity extraction and knowledge graph Entity types and relationships from dataclasses import dataclass from typing import Literal
EntityType = Literal['Person', 'Organization', 'Work', 'Concept', 'Event', 'Location'] RelationshipType = Literal[ 'Mentions', 'Criticizes', 'Cites', 'Discusses', 'Expands On', 'Supports', 'Founded By', 'Pioneered', 'Inspired By', 'Affiliated With', 'Published In', 'Originated By', 'Occurred At', 'Owns', 'Owned By' ]
@dataclass class Entity: id: str # P-001, O-001, W-001, etc. name: str type: EntityType aliases: list[str] # Alternative names/spellings prominence: float # 0-10 based on discussion depth mention_count: int = 0 first_mentioned_in: str = '' # Record ID
@dataclass class Relationship: source_entity_id: str target_entity_id: str relationship_type: RelationshipType source_record_id: str # Which record established this relationship confidence: float = 1.0
class EntityRegistry: """Deduplication and normalization for entities."""
NORMALIZATIONS = {
'nyt': 'The New York Times',
'new york times': 'The New York Times',
'ny times': 'The New York Times',
'washington post': 'The Washington Post',
'wapo': 'The Washington Post',
'cnn': 'CNN',
'fox': 'Fox News',
'fox news channel': 'Fox News',
}
def __init__(self):
self.entities: dict[str, Entity] = {}
self.name_to_id: dict[str, str] = {}
def normalize_name(self, name: str) -> str:
"""Normalize entity name to canonical form."""
name_lower = name.lower().strip()
return self.NORMALIZATIONS.get(name_lower, name.strip())
def find_or_create(self, name: str, entity_type: EntityType) -> Entity:
"""Find existing entity or create new one."""
normalized = self.normalize_name(name)
# Check if already exists
if normalized.lower() in self.name_to_id:
entity_id = self.name_to_id[normalized.lower()]
entity = self.entities[entity_id]
entity.mention_count += 1
return entity
# Create new entity
type_prefix = entity_type[0].upper() # P, O, W, C, E, L
count = sum(1 for e in self.entities.values() if e.type == entity_type)
entity_id = f"{type_prefix}-{count + 1:04d}"
entity = Entity(
id=entity_id,
name=normalized,
type=entity_type,
aliases=[name] if name != normalized else [],
prominence=0.0,
mention_count=1
)
self.entities[entity_id] = entity
self.name_to_id[normalized.lower()] = entity_id
return entity
AI-powered entity extraction class EntityExtractor: def init(self, registry: EntityRegistry): self.registry = registry self.model = genai.GenerativeModel('gemini-2.0-flash')
def extract(self, record: ArchiveRecord) -> tuple[list[Entity], list[Relationship]]:
prompt = f"""Extract named entities and relationships from this archival content.
CONTENT: Title: {record.title} Text: {record.text[:10000]}
ENTITY TYPES: - Person: journalists, politicians, academics, media figures - Organization: news outlets, media companies, academic institutions - Work: articles, books, blog posts, studies, reports - Concept: journalism theories, media criticism frameworks - Event: conferences, elections, media crises - Location: geographic locations relevant to media context
RELATIONSHIP TYPES: - Mentions, Criticizes, Cites, Discusses, Expands On, Supports - Founded By, Pioneered, Inspired By - Affiliated With, Published In, Originated By, Occurred At - Owns, Owned By
Respond with JSON: {{ "entities": [ {{"name": "Entity Name", "type": "Person|Organization|...", "prominence": 1-10}} ], "relationships": [ {{"source": "Entity Name", "target": "Entity Name", "type": "Relationship Type"}} ] }}
IMPORTANT: - Prominence: 1-3 = mentioned briefly, 4-6 = discussed, 7-10 = central focus - Only extract entities actually discussed, not just mentioned in passing - Relationships must connect entities that appear in the same text """
response = self.model.generate_content(prompt)
data = json.loads(response.text)
entities = []
entity_name_to_obj = {}
# Process entities
for e in data.get('entities', []):
entity = self.registry.find_or_create(e['name'], e['type'])
entity.prominence = max(entity.prominence, e.get('prominence', 5))
entities.append(entity)
entity_name_to_obj[e['name'].lower()] = entity
# Process relationships
relationships = []
for r in data.get('relationships', []):
source = entity_name_to_obj.get(r['source'].lower())
target = entity_name_to_obj.get(r['target'].lower())
if source and target:
relationships.append(Relationship(
source_entity_id=source.id,
target_entity_id=target.id,
relationship_type=r['type'],
source_record_id=record.id
))
return entities, relationships
PDF archival generation from reportlab.lib.pagesizes import letter from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image from reportlab.lib.units import inch from pathlib import Path
class ArchivePDFGenerator: """Generate accessible PDFs for archival preservation."""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
self.styles = getSampleStyleSheet()
# Custom styles
self.styles.add(ParagraphStyle(
'ArchiveTitle',
parent=self.styles['Heading1'],
fontSize=16,
spaceAfter=12
))
self.styles.add(ParagraphStyle(
'ArchiveMeta',
parent=self.styles['Normal'],
fontSize=10,
textColor='#666666',
spaceAfter=6
))
def generate(self, record: ArchiveRecord) -> Path:
output_path = self.output_dir / f"{record.id}.pdf"
doc = SimpleDocTemplate(
str(output_path),
pagesize=letter,
title=record.title,
author=record.author or 'Unknown',
subject=f"Archive record {record.id}"
)
story = []
# Title
story.append(Paragraph(record.title, self.styles['ArchiveTitle']))
# Metadata block
meta_lines = [
f"<b>Author:</b> {record.author or 'Unknown'}",
f"<b>Date:</b> {record.publication_date or 'Unknown'}",
f"<b>Source:</b> {record.publication or 'Unknown'}",
f"<b>URL:</b> {record.url}",
f"<b>Archive ID:</b> {record.id}",
]
for line in meta_lines:
story.append(Paragraph(line, self.styles['ArchiveMeta']))
story.append(Spacer(1, 0.25 * inch))
# Summary (if available)
if record.summary:
story.append(Paragraph("<b>Summary:</b>", self.styles['Heading2']))
story.append(Paragraph(record.summary, self.styles['Normal']))
story.append(Spacer(1, 0.25 * inch))
# Main content
story.append(Paragraph("<b>Full Text:</b>", self.styles['Heading2']))
# Split into paragraphs and add
paragraphs = record.text.split('\n\n')
for para in paragraphs:
if para.strip():
story.append(Paragraph(para.strip(), self.styles['Normal']))
story.append(Spacer(1, 0.1 * inch))
# Build PDF
doc.build(story)
return output_path
Data quality and validation from dataclasses import dataclass from typing import Callable
@dataclass class ValidationResult: field: str valid: bool message: str severity: Literal['error', 'warning', 'info']
class ArchiveValidator: """Validate archive records for completeness and consistency."""
REQUIRED_FIELDS = ['id', 'url', 'title', 'text']
CRITICAL_FIELDS = ['publication_date', 'author', 'summary']
OPTIONAL_FIELDS = ['categories', 'tags', 'pull_quote']
def validate(self, record: ArchiveRecord) -> list[ValidationResult]:
results = []
# Required fields
for field in self.REQUIRED_FIELDS:
value = getattr(record, field, None)
if not value:
results.append(ValidationResult(
field=field,
valid=False,
message=f"Required field '{field}' is missing",
severity='error'
))
# Critical fields (should have but not blocking)
for field in self.CRITICAL_FIELDS:
value = getattr(record, field, None)
if not value:
results.append(ValidationResult(
field=field,
valid=False,
message=f"Critical field '{field}' is missing",
severity='warning'
))
# Content length check
if record.text and len(record.text) < 100:
results.append(ValidationResult(
field='text',
valid=False,
message=f"Text unusually short ({len(record.text)} chars)",
severity='warning'
))
# Date format validation
if record.publication_date:
try:
# Ensure date is valid
_ = record.publication_date.isoformat()
except (AttributeError, ValueError):
results.append(ValidationResult(
field='publication_date',
valid=False,
message="Invalid date format",
severity='error'
))
# Category validation
for cat in record.categories:
if cat not in ThematicCategory:
results.append(ValidationResult(
field='categories',
valid=False,
message=f"Unknown category: {cat}",
severity='warning'
))
return results
def is_complete(self, record: ArchiveRecord) -> bool:
"""Check if record has all critical fields populated."""
results = self.validate(record)
errors = [r for r in results if r.severity == 'error']
return len(errors) == 0
Integration workflow class ArchiveWorkflow: """Orchestrate the complete archive processing pipeline."""
def __init__(self, config: Config):
self.scraper = ScrapingCascade()
self.categorizer = ArchiveCategorizer()
self.entity_registry = EntityRegistry()
self.entity_extractor = EntityExtractor(self.entity_registry)
self.pdf_generator = ArchivePDFGenerator(config.PDF_DIR)
self.sheets_service = SheetsService(config.CREDENTIALS_PATH)
self.validator = ArchiveValidator()
self.progress = ProgressTracker(config.PROGRESS_FILE)
def process_url(self, url: str, record_id: str) -> ArchiveRecord:
"""Process a single URL through the complete pipeline."""
# 1. Scrape content
result = self.scraper.fetch(url)
if not result:
raise ValueError(f"Failed to scrape: {url}")
# 2. Create initial record
record = ArchiveRecord(
id=record_id,
url=url,
title=result.title,
text=result.content
)
# 3. AI categorization
categories = self.categorizer.categorize(record)
record.summary = categories.get('summary')
record.pull_quote = categories.get('pull_quote')
record.categories = categories.get('categories', [])
record.key_concepts = categories.get('key_concepts', [])
record.tags = categories.get('tags', [])
record.era = categories.get('era')
record.scope = categories.get('scope')
# 4. Entity extraction
entities, relationships = self.entity_extractor.extract(record)
record.entities_mentioned = [e.id for e in entities]
# 5. Generate PDF
pdf_path = self.pdf_generator.generate(record)
record.pdf_url = str(pdf_path)
# 6. Validate
validation = self.validator.validate(record)
record.verified = self.validator.is_complete(record)
record.processing_status = 'completed'
return record
def run_batch(self, input_csv: Path):
"""Process all URLs from input CSV."""
for row in read_input(input_csv):
if self.progress.is_processed(row.id):
continue
try:
record = self.process_url(row.url, row.id)
self.sheets_service.append_row(self.worksheet, record_to_row(record))
self.progress.mark_processed(row.id)
except Exception as e:
self.progress.log_error(row.id, str(e))
Export for frontend consumption import json from pathlib import Path
def export_for_frontend(records: list[ArchiveRecord], output_dir: Path): """Export archive data in frontend-friendly formats."""
# Main archive JSON
archive_data = {
'metadata': {
'total_records': len(records),
'last_updated': datetime.now().isoformat(),
'schema_version': '2.0'
},
'records': [asdict(r) for r in records]
}
(output_dir / 'archive-data.json').write_text(
json.dumps(archive_data, indent=2, default=str)
)
# Entity export
entities_data = [asdict(e) for e in entity_registry.entities.values()]
(output_dir / 'entities.json').write_text(
json.dumps(entities_data, indent=2)
)
# Relationships export
relationships_data = [asdict(r) for r in all_relationships]
(output_dir / 'relationships.json').write_text(
json.dumps(relationships_data, indent=2)
)
# CSV exports for spreadsheet compatibility
records_df = pd.DataFrame([asdict(r) for r in records])
records_df.to_csv(output_dir / 'archive_records.csv', index=False)