Coverage for security / ai_governance.py: 90.0%
442 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2AI Governance Framework — Constitutional scoring for a rational hive.
4DESIGN PRINCIPLE: The default is FREEDOM. In a constitutional system, humans
5have the right to act freely unless a specific constitutional rule is violated.
6This framework never blocks first — it scores, explains, and bounds.
8WHAT DETERMINISM ACTUALLY MEANS:
9 Determinism ≠ binary regex gates that false-positive on innocent text.
10 Determinism = same input ALWAYS produces the same output, AND the output
11 is verifiable, reproducible, and auditable.
13 A mathematical function is deterministic.
14 A weighted multi-signal score is deterministic.
15 A Merkle-linked decision chain is deterministic.
16 A regex that blocks "deceptive" and catches "a deceptive practice in law" is NOT
17 intelligent — it's a blunt instrument that sacrifices accuracy for the illusion of safety.
19THE CONSTITUTIONAL SCORING MODEL:
20 1. Start with full freedom (score = 1.0)
21 2. Each constitutional signal ADJUSTS the score — never binary, always gradual
22 3. Multiple signals aggregate deterministically (weighted geometric mean)
23 4. Intelligence REFINES the score — catches what math misses, reduces false positives
24 5. Constitutional bounds constrain the final result — hard limits on specific dimensions
25 6. Every step is Merkle-linked — decisions are reproducible and auditable
27 The key insight: deterministic scoring is MORE accurate than binary gates because
28 it preserves information. A score of 0.15 tells you "very likely a violation."
29 A score of 0.85 tells you "probably fine." A binary gate destroys this nuance.
31 Intelligence is used to INCREASE accuracy, not to be bypassed. When the
32 deterministic score is ambiguous (0.3-0.7), intelligence resolves the ambiguity.
33 When the score is clear (< 0.1 or > 0.9), intelligence confirms but cannot override.
35ECONOMIC PRINCIPLE: Automation creates abundance, not scarcity.
36 Revenue flows TO people (90/9/1 — 90% to contributors).
37 People buy and sell freely within constitutional limits.
38 Constitutional voting determines what commerce is permitted.
39 The hive exists to make abundance available to everyone, everywhere.
41HUMAN CONSENT: The guardian talks with its human.
42 The AI companion interleaves human consent into the decision chain.
43 Consent is a constitutional right — actions that affect the user require it.
44"""
46import hashlib
47import json
48import logging
49import math
50import time
51from dataclasses import dataclass, field, asdict
52from enum import Enum
53from typing import Any, Callable, Dict, List, Optional, Tuple
55logger = logging.getLogger('hevolve_security')
58# ═══════════════════════════════════════════════════════════════════════
59# Decision Classifications
60# ═══════════════════════════════════════════════════════════════════════
class DecisionDomain(Enum):
    """Domains of governance — each has its own constitutional scoring.

    The string values act as registry keys: scorer, refiner, and bounds
    functions are registered per-domain in GovernancePipeline, and
    decide() dispatches on the same strings.
    """
    GOAL_APPROVAL = 'goal_approval'
    COMPUTE_ALLOCATION = 'compute_allocation'
    RALT_DISTRIBUTION = 'ralt_distribution'
    REVENUE_DISTRIBUTION = 'revenue_distribution'
    TRUST_ESTABLISHMENT = 'trust_establishment'
    CONTENT_SAFETY = 'content_safety'
    CODE_CHANGE = 'code_change'
    RESOURCE_ACCESS = 'resource_access'
    COMMERCE = 'commerce'
    HUMAN_CONSENT = 'human_consent'
    HUMAN_WELLBEING = 'human_wellbeing'
    SELF_SOVEREIGNTY = 'self_sovereignty'
    PRIVACY = 'privacy'
class DecisionOutcome(Enum):
    """Possible outcomes of a governance decision.

    GovernancePipeline.decide() maps the final score to APPROVED (>= 0.7),
    BOUNDED (>= 0.5), DEFERRED (>= 0.3), or REJECTED (< 0.3).  ESCALATED
    is not produced by decide() itself — presumably reserved for external
    human-review flows (TODO confirm against callers outside this file).
    """
    APPROVED = 'approved'    # Passed all checks
    REJECTED = 'rejected'    # Clear constitutional violation
    BOUNDED = 'bounded'      # Approved but constrained
    DEFERRED = 'deferred'    # Ambiguous — needs more info or human input
    ESCALATED = 'escalated'  # Needs human review
88# ═══════════════════════════════════════════════════════════════════════
89# Constitutional Signal — individual scoring dimension
90# ═══════════════════════════════════════════════════════════════════════
@dataclass
class ConstitutionalSignal:
    """One dimension of constitutional scoring.

    Each signal is a deterministic function that maps context → score.
    Score range: 0.0 (clear violation) to 1.0 (clearly fine).
    The signal also carries confidence: how sure are we about this score?
    Signals are combined by _aggregate_signals (weighted geometric mean of
    scores) and _aggregate_confidence (weighted arithmetic mean).
    """
    name: str         # Signal identifier, e.g. 'content_safety'
    score: float      # 0.0 = violation, 1.0 = fine
    confidence: float # 0.0 = uncertain, 1.0 = certain
    # NOTE(review): comment said "default 1.0" but no dataclass default is
    # defined — every construction site in this file passes weight explicitly.
    weight: float     # Relative importance
    reasoning: str    # Human-readable explanation
@dataclass
class GovernanceDecision:
    """Immutable record of a governance decision with full signal chain.

    Instances are appended to the GovernancePipeline decision log and
    Merkle-linked: each decision's parent_hash is the previous decision's
    audit_hash, so the whole log is reproducible and tamper-evident.
    """
    decision_id: str               # Unique id (uuid4 hex prefix or caller-supplied)
    domain: str                    # DecisionDomain value string
    outcome: str                   # DecisionOutcome value string
    signals: List[dict]            # All constitutional signals that contributed
    aggregate_score: float         # Deterministic aggregate of all signals
    intelligent_adjustment: float  # How much intelligence changed the score
    final_score: float             # After bounds enforcement
    reasoning: str                 # Human-readable decision trace
    timestamp: float = field(default_factory=time.time)
    audit_hash: str = ''           # SHA-256 over this decision's payload
    parent_hash: str = ''          # Previous decision hash — Merkle chain

    def compute_audit_hash(self) -> str:
        """Deterministic hash for audit trail — Merkle-linked to parent.

        NOTE(review): the `signals` list and `audit_hash` itself are NOT
        part of the hash payload — only the scalar fields below are
        covered.  Tampering with stored signals would therefore not be
        detected by chain verification; confirm this is intentional before
        relying on it.
        """
        payload = {
            'decision_id': self.decision_id,
            'domain': self.domain,
            'outcome': self.outcome,
            'aggregate_score': self.aggregate_score,
            'intelligent_adjustment': self.intelligent_adjustment,
            'final_score': self.final_score,
            'reasoning': self.reasoning,
            'timestamp': self.timestamp,
            'parent_hash': self.parent_hash,
        }
        # Canonical JSON (sorted keys, no whitespace) gives a stable,
        # reproducible byte string to hash.
        canonical = json.dumps(payload, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
139# ═══════════════════════════════════════════════════════════════════════
140# Constitutional Bounds — the ONLY hard limits
141# ═══════════════════════════════════════════════════════════════════════
142#
143# These are mathematical constraints, not pattern-matching gates.
144# They define the SPACE of allowed actions — anything inside is permitted.
145# Only specific, measurable violations trigger denial.
CONSTITUTIONAL_BOUNDS = {
    # Revenue: exact 90/9/1 split (mathematical, not pattern-matched)
    'revenue_users_pct': 0.90,
    'revenue_infra_pct': 0.09,
    'revenue_central_pct': 0.01,
    # Compute: no single entity above 5% influence; reward scaling is
    # logarithmic so raw compute power cannot dominate
    'max_single_entity_influence': 0.05,
    'reward_scaling': 'logarithmic',
    # RALT: minimum independent witnesses + daily skill-improvement cap
    'min_ralt_witnesses': 2,
    'max_skill_improvement_per_day': 0.05,
    # Trust: audit ratio, contract expiry (days), expulsion threshold
    'audit_compute_ratio': 0.80,
    'contract_validity_days': 30,
    'max_violations_before_expulsion': 3,
    # Budget: cost comparison (local models cost 0 spark) + goal rate cap
    'local_model_cost_spark': 0,
    'max_goals_per_hour': 10,
    # Consent: time-based expiry (hours)
    'consent_validity_hours': 24,
    # Commerce: minimum share of revenue that must reach contributors
    'commerce_revenue_to_contributors_min_pct': 0.90,
}
def get_constitutional_bound(key: str) -> Any:
    """Look up a constitutional bound by name; None if the key is unknown."""
    try:
        return CONSTITUTIONAL_BOUNDS[key]
    except KeyError:
        return None
# For backward compatibility — DETERMINISTIC_BOUNDS is the same dict object
# as CONSTITUTIONAL_BOUNDS (an alias, not a copy), so mutations are shared.
DETERMINISTIC_BOUNDS = CONSTITUTIONAL_BOUNDS
def get_deterministic_bound(key: str) -> Any:
    """Backward-compatible alias."""
    # Delegate to the primary accessor so behavior stays in one place.
    return get_constitutional_bound(key)
185# ═══════════════════════════════════════════════════════════════════════
186# Constitutional Scorer — deterministic multi-signal scoring
187# ═══════════════════════════════════════════════════════════════════════
def _aggregate_signals(signals: List[ConstitutionalSignal]) -> float:
    """Weighted geometric mean of signal scores — deterministic aggregation.

    Formula: exp(Σ(wᵢ × ln(max(sᵢ, 0.001))) / Σ(wᵢ)).

    Why the geometric mean:
    1. Deterministic — same signals always yield the same result.
    2. Information-preserving — no binary destruction of nuance.
    3. One very low signal pulls the aggregate down (safety).
    4. One borderline signal does not zero it out (accuracy).

    Scores are floored at 0.001 to avoid log(0).  No signals, or zero
    total weight, means full freedom (1.0).
    """
    if not signals:
        return 1.0  # No signals = full freedom

    denom = sum(sig.weight for sig in signals)
    if denom == 0:
        return 1.0

    numer = 0.0
    for sig in signals:
        numer += sig.weight * math.log(max(sig.score, 0.001))
    return math.exp(numer / denom)
def _aggregate_confidence(signals: List[ConstitutionalSignal]) -> float:
    """Weighted arithmetic mean of signal confidences — deterministic.

    High confidence = we know what we're scoring.
    Low confidence = ambiguous, might need intelligence or human input.
    Empty or zero-weight signal sets default to full confidence (1.0).
    """
    if not signals:
        return 1.0

    denom = sum(sig.weight for sig in signals)
    if denom == 0:
        return 1.0

    weighted = (sig.weight * sig.confidence for sig in signals)
    return sum(weighted) / denom
231# ═══════════════════════════════════════════════════════════════════════
232# Governance Pipeline — Constitutional scoring with Merkle audit
233# ═══════════════════════════════════════════════════════════════════════
class GovernancePipeline:
    """Constitutional scoring pipeline — freedom-first, accuracy-preserving.

    Stage 1 (CONSTITUTIONAL SCORING):
        Multiple deterministic signal functions score the action.
        Each signal is a mathematical function, not a binary gate.
        Default score = 1.0 (freedom). Signals reduce when they detect risk.

    Stage 2 (INTELLIGENT REFINEMENT):
        When aggregate confidence is LOW (ambiguous zone 0.3-0.7),
        intelligence resolves ambiguity — increases accuracy.
        When confidence is HIGH (clear zone <0.1 or >0.9),
        intelligence confirms but its adjustment is bounded.
        Intelligence ALWAYS runs — it is never bypassed.

    Stage 3 (CONSTITUTIONAL BOUNDS):
        Mathematical constraints on specific dimensions.
        Revenue split, compute caps, witness thresholds.
        These are the ONLY hard limits — everything else is scored.

    Stage 4 (MERKLE AUDIT):
        Every decision is hash-linked to the previous one.
        The full chain is reproducible and verifiable.
    """

    def __init__(self):
        # Local import keeps the module's top-level import list unchanged
        # (replaces the previous __import__('threading') indirection).
        import threading
        self._scorers: Dict[str, List[Callable]] = {}  # domain → [scorer_fns]
        self._refiners: Dict[str, Callable] = {}       # domain → intelligence_fn
        self._bounds: Dict[str, Callable] = {}         # domain → bounds_fn
        self._decision_log: List[GovernanceDecision] = []
        self._last_hash: str = ''                      # Merkle chain head
        self._lock = threading.Lock()

    # --- Registration ---

    def register_scorer(self, domain: str, scorer_fn: Callable):
        """Register a constitutional scoring function.

        scorer_fn(context: dict) -> ConstitutionalSignal
        """
        self._scorers.setdefault(domain, []).append(scorer_fn)

    def register_refiner(self, domain: str, refiner_fn: Callable):
        """Register an intelligent refinement function.

        refiner_fn(aggregate_score: float, confidence: float, context: dict) -> float
        Returns adjustment in [-0.3, +0.3] range.
        """
        self._refiners[domain] = refiner_fn

    def register_bounds(self, domain: str, bounds_fn: Callable):
        """Register a constitutional bounds function.

        bounds_fn(score: float, context: dict) -> (float, str)
        Returns (bounded_score, reason).
        """
        self._bounds[domain] = bounds_fn

    # --- Backward compatibility ---

    def register_gate(self, domain: str, gate_fn: Callable):
        """Backward compat: wrap a binary gate as a scorer."""
        def _gate_as_scorer(context):
            try:
                passed, reason = gate_fn(context)
                return ConstitutionalSignal(
                    name=f'gate_{domain}',
                    score=1.0 if passed else 0.0,
                    confidence=1.0,
                    weight=2.0,  # Gates get high weight
                    reasoning=reason,
                )
            except Exception as e:
                # A broken gate must not crash scoring — degrade to an
                # uncertain neutral signal instead.
                return ConstitutionalSignal(
                    name=f'gate_{domain}',
                    score=0.5,
                    confidence=0.2,
                    weight=1.0,
                    reasoning=f'Gate error: {e}',
                )
        self.register_scorer(domain, _gate_as_scorer)

    def register_evaluator(self, domain: str, eval_fn: Callable):
        """Backward compat: wrap an evaluator as a refiner."""
        def _eval_as_refiner(aggregate, confidence, context):
            try:
                raw = float(eval_fn(context))
                raw = max(0.0, min(1.0, raw))
                return (raw - aggregate) * 0.5  # Bounded adjustment
            except Exception:
                return 0.0
        self.register_refiner(domain, _eval_as_refiner)

    def register_validator(self, domain: str, validate_fn: Callable):
        """Backward compat: wrap a validator as bounds."""
        self.register_bounds(domain, validate_fn)

    # --- Core Decision Engine ---

    def decide(self, domain: str, context: dict,
               decision_id: str = '') -> GovernanceDecision:
        """Run the constitutional scoring pipeline.

        The default is FREEDOM (score = 1.0).
        Signals reduce the score when they detect constitutional risk.
        Intelligence refines when confidence is low.
        Bounds enforce hard mathematical limits.
        Everything is Merkle-audited.
        """
        if not decision_id:
            import uuid
            decision_id = uuid.uuid4().hex[:16]

        # ── Stage 1: CONSTITUTIONAL SCORING ──
        # Multiple signals, each deterministic, each scored 0-1
        signals: List[ConstitutionalSignal] = []
        for fn in self._scorers.get(domain, []):
            try:
                sig = fn(context)
                if isinstance(sig, ConstitutionalSignal):
                    signals.append(sig)
            except Exception as e:
                logger.debug(f"Scorer error in {domain}: {e}")

        aggregate = _aggregate_signals(signals)
        confidence = _aggregate_confidence(signals)

        # ── Stage 2: INTELLIGENT REFINEMENT ──
        # Intelligence ALWAYS runs — never bypassed.
        # Its adjustment is bounded by confidence:
        #   High confidence (>0.8): ±0.1; medium (0.4-0.8): ±0.2; low: ±0.3
        adjustment = 0.0
        refiner_fn = self._refiners.get(domain)
        if refiner_fn:
            try:
                raw_adj = float(refiner_fn(aggregate, confidence, context))
            except Exception:
                raw_adj = 0.0

            # Less certain = more room for AI to move the score
            if confidence > 0.8:
                max_adj = 0.1
            elif confidence > 0.4:
                max_adj = 0.2
            else:
                max_adj = 0.3
            adjustment = max(-max_adj, min(max_adj, raw_adj))

        refined_score = max(0.0, min(1.0, aggregate + adjustment))

        # ── Stage 3: CONSTITUTIONAL BOUNDS ──
        bounds_fn = self._bounds.get(domain)
        if bounds_fn:
            try:
                final_score, bound_reason = bounds_fn(refined_score, context)
                final_score = max(0.0, min(1.0, final_score))
            except Exception as e:
                final_score = refined_score
                bound_reason = f'Bounds error: {e}'
        else:
            final_score = refined_score
            bound_reason = 'No bounds — score unchanged'

        # ── Determine Outcome ──
        if final_score >= 0.7:
            outcome = DecisionOutcome.APPROVED.value
        elif final_score >= 0.5:
            # Approved but we should note the constraint
            outcome = DecisionOutcome.BOUNDED.value
        elif final_score >= 0.3:
            # Ambiguous — defer to human or more information
            outcome = DecisionOutcome.DEFERRED.value
        else:
            outcome = DecisionOutcome.REJECTED.value

        # Build reasoning from signal chain
        signal_summary = '; '.join(
            f'{s.name}={s.score:.2f}(c={s.confidence:.1f})'
            for s in signals
        ) or 'no signals'

        reasoning = (
            f'Signals: [{signal_summary}]; '
            f'Aggregate: {aggregate:.3f} (confidence: {confidence:.2f}); '
            f'Intelligence: {adjustment:+.3f} → {refined_score:.3f}; '
            f'Bounds: {final_score:.3f} ({bound_reason})'
        )

        # ── Stage 4: MERKLE AUDIT ──
        # BUGFIX: the parent-hash read and the log append previously happened
        # under two separate lock acquisitions, so two concurrent decisions
        # could read the same parent hash and fork the Merkle chain (which
        # verify_merkle_chain() would then report as broken). Read-parent,
        # hash, append, and head-advance are now one atomic critical section.
        with self._lock:
            decision = GovernanceDecision(
                decision_id=decision_id,
                domain=domain,
                outcome=outcome,
                signals=[asdict(s) for s in signals],
                aggregate_score=aggregate,
                intelligent_adjustment=adjustment,
                final_score=final_score,
                reasoning=reasoning,
                parent_hash=self._last_hash,
            )
            decision.audit_hash = decision.compute_audit_hash()
            self._decision_log.append(decision)
            self._last_hash = decision.audit_hash

        # External audit mirror happens outside the lock (it may be slow).
        self._audit_log_event(decision)
        return decision

    def _record(self, decision: GovernanceDecision):
        """Record an externally-built decision and advance the Merkle chain.

        Kept for backward compatibility; decide() now appends atomically
        itself and no longer calls this. Callers are responsible for having
        set parent_hash/audit_hash consistently.
        """
        with self._lock:
            self._decision_log.append(decision)
            self._last_hash = decision.audit_hash
        self._audit_log_event(decision)

    @staticmethod
    def _audit_log_event(decision: GovernanceDecision):
        """Best-effort mirror of a decision into the immutable audit log."""
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'governance_decision',
                actor_id='ai_governance',
                action=(
                    f'{decision.domain}:{decision.outcome} '
                    f'score={decision.final_score:.2f} '
                    f'id={decision.decision_id}'
                ),
            )
        except Exception:
            pass  # Audit mirror is best-effort — never block a decision

    def get_recent_decisions(self, domain: str = '',
                             limit: int = 50) -> List[dict]:
        """Return up to `limit` most recent decisions, optionally by domain.

        BUGFIX: previously sliced the log to `limit` BEFORE filtering by
        domain, so a domain filter could return far fewer than `limit`
        even when older matching decisions existed. Now filters first.
        """
        with self._lock:
            if domain:
                matches = [d for d in self._decision_log if d.domain == domain]
            else:
                matches = list(self._decision_log)
            return [asdict(d) for d in matches[-limit:]]

    def verify_merkle_chain(self, decisions: Optional[List[GovernanceDecision]] = None
                            ) -> Tuple[bool, str]:
        """Verify hash integrity and parent linkage of the decision log.

        Returns (ok, message). BUGFIX: an explicitly-passed empty list is
        now verified as an empty chain — the previous `decisions or ...`
        truthiness test silently fell back to the internal log.
        """
        with self._lock:
            chain = self._decision_log if decisions is None else decisions
            if not chain:
                return True, 'Empty chain'
            for i, d in enumerate(chain):
                # Recompute each hash from current field values — detects
                # tampering with any hashed field.
                recomputed = d.compute_audit_hash()
                if recomputed != d.audit_hash:
                    return False, f'Decision {i} hash mismatch (tampered)'
                if i > 0 and d.parent_hash != chain[i - 1].audit_hash:
                    return False, f'Decision {i} Merkle link broken'
            return True, f'Chain verified: {len(chain)} decisions'
491# ═══════════════════════════════════════════════════════════════════════
492# Built-in Constitutional Scorers
493# ═══════════════════════════════════════════════════════════════════════
494#
495# These are SCORING FUNCTIONS, not binary gates.
496# They return a ConstitutionalSignal with score, confidence, and reasoning.
497# The default is 1.0 (freedom) — signals reduce when they detect risk.
def _score_content_safety(context: dict) -> ConstitutionalSignal:
    """Score content against constitutional rules.

    Unlike a binary gate, this scores HOW MUCH the content matches the
    guardrail patterns: the score degrades smoothly with match density
    (matches per 1000 chars) rather than flipping to zero on one hit,
    which preserves accuracy on innocent text that merely mentions a
    flagged word. No text at all means full freedom.
    """
    text = context.get('text', '')
    if not text:
        return ConstitutionalSignal(
            name='content_safety', score=1.0, confidence=1.0,
            weight=1.0, reasoning='No text — full freedom',
        )

    from security.hive_guardrails import VALUES

    def _tally(patterns):
        # Count all matches across a pattern set; record matched patterns.
        total, names = 0, []
        for pat in patterns:
            found = pat.findall(text)
            if found:
                total += len(found)
                names.append(pat.pattern[:40])
        return total, names

    violation_count, violation_names = _tally(VALUES.VIOLATION_PATTERNS)
    destructive_count, destructive_names = _tally(VALUES.DESTRUCTIVE_PATTERNS)
    matched_patterns = violation_names + destructive_names

    if not (violation_count or destructive_count):
        return ConstitutionalSignal(
            name='content_safety', score=1.0, confidence=0.9,
            weight=1.0, reasoning='No violation patterns detected — freedom preserved',
        )

    # Smooth density-based degradation: ~0.4 at 1 match/1000 chars,
    # ~0.05 at density 5+. Never a binary cliff.
    text_len = max(len(text), 1)
    density = (violation_count + destructive_count) / (text_len / 1000.0)
    score = 1.0 / (1.0 + density * 2.5)

    # More text gives the density estimate more statistical footing.
    confidence = min(0.95, 0.5 + text_len / 2000.0)

    return ConstitutionalSignal(
        name='content_safety', score=score, confidence=confidence,
        weight=1.5,  # Safety signals get moderate weight
        reasoning=(
            f'{violation_count} violation + {destructive_count} destructive '
            f'patterns in {text_len} chars (density={density:.2f}). '
            f'Patterns: {", ".join(matched_patterns[:3])}'
        ),
    )
def _score_goal_approval(context: dict) -> ConstitutionalSignal:
    """Score a goal against constitutional rules.

    Delegates the check to ConstitutionalFilter; a pass scores 1.0 and a
    fail scores 0.05 (near-certain violation, never hard zero).
    """
    from security.hive_guardrails import ConstitutionalFilter
    passed, reason = ConstitutionalFilter.check_goal(context.get('goal', {}))
    if passed:
        score, certainty = 1.0, 0.9
    else:
        score, certainty = 0.05, 0.8
    return ConstitutionalSignal(
        name='goal_constitutional', score=score,
        confidence=certainty, weight=1.5, reasoning=reason,
    )
def _score_budget(context: dict) -> ConstitutionalSignal:
    """Score compute allocation — mathematical, not pattern-based.

    A budget of infinity or 0 is treated as "no constraint" (low
    confidence).  Within budget the score degrades gently with the
    cost/budget ratio; over budget it decays smoothly toward a 0.05
    floor rather than dropping to zero.
    """
    cost = context.get('cost_spark', 0)
    budget = context.get('budget_remaining', float('inf'))
    if budget in (float('inf'), 0):
        return ConstitutionalSignal(
            name='budget', score=1.0, confidence=0.5,
            weight=1.0, reasoning='No budget constraint',
        )
    ratio = cost / max(budget, 0.001)
    # Gentle linear degradation up to the limit; smooth floored decay beyond.
    score = (1.0 - ratio * 0.3) if ratio <= 1.0 else max(0.05, 0.7 / ratio)
    return ConstitutionalSignal(
        name='budget', score=score, confidence=0.95,
        weight=1.0, reasoning=f'Cost/budget ratio: {ratio:.2f}',
    )
def _score_revenue_split(context: dict) -> ConstitutionalSignal:
    """Score revenue distribution — mathematical deviation from 90/9/1.

    This is one of the few truly hard constraints — the split is immutable.
    But even here, we score the DEVIATION rather than binary pass/fail.
    A tiny rounding error (90.01%) scores 0.99.
    A deliberate violation (50/30/20) scores 0.01.

    CONSISTENCY FIX: the deviation is now computed against the
    CONSTITUTIONAL_BOUNDS constants instead of re-hard-coding
    0.90/0.09/0.01 inline — identical values, single source of truth.
    """
    ideal_users = CONSTITUTIONAL_BOUNDS['revenue_users_pct']      # 0.90
    ideal_infra = CONSTITUTIONAL_BOUNDS['revenue_infra_pct']      # 0.09
    ideal_central = CONSTITUTIONAL_BOUNDS['revenue_central_pct']  # 0.01

    users = context.get('users_pct', ideal_users)
    infra = context.get('infra_pct', ideal_infra)
    central = context.get('central_pct', ideal_central)

    # Total absolute deviation from the constitutional split
    deviation = (
        abs(users - ideal_users) +
        abs(infra - ideal_infra) +
        abs(central - ideal_central)
    )

    # Exponential decay: 0 deviation → 1.0, 0.01 → ~0.99 (wait, ~0.74 at
    # 0.01 per exp(-0.3)); 0.5 deviation → ~0.05... actually exp(-15)≈0.
    # Preserved exactly as before: score = exp(-deviation * 30).
    score = math.exp(-deviation * 30.0)

    return ConstitutionalSignal(
        name='revenue_split', score=score, confidence=1.0,
        weight=2.0,  # Revenue split is high-weight (constitutional)
        reasoning=f'Split {users:.0%}/{infra:.0%}/{central:.0%}, deviation={deviation:.4f}',
    )
def _score_trust(context: dict) -> ConstitutionalSignal:
    """Score trust establishment via cryptographic contract verification.

    Missing contract scores 0.1; a contract that fails verification
    scores 0.02; a verified one scores 1.0 with full confidence.
    Any error during verification degrades to 0.1 at half confidence.
    """
    try:
        from security.pre_trust_contract import verify_trust_contract, TrustContract
        contract_data = context.get('contract')
        if not contract_data:
            return ConstitutionalSignal(
                name='trust', score=0.1, confidence=0.9,
                weight=2.0, reasoning='No trust contract provided',
            )
        if isinstance(contract_data, dict):
            # Drop unknown keys so extra dict entries don't break construction
            known = TrustContract.__dataclass_fields__
            kwargs = {k: v for k, v in contract_data.items() if k in known}
            contract = TrustContract(**kwargs)
        else:
            contract = contract_data
        ok, msg = verify_trust_contract(contract)
        return ConstitutionalSignal(
            name='trust', score=1.0 if ok else 0.02,
            confidence=1.0,  # Crypto is always certain
            weight=2.0, reasoning=msg,
        )
    except Exception as e:
        return ConstitutionalSignal(
            name='trust', score=0.1, confidence=0.5,
            weight=2.0, reasoning=f'Trust verification error: {e}',
        )
def _score_human_consent(context: dict) -> ConstitutionalSignal:
    """Score human consent — constitutional right to be asked.

    Consent is not binary blocking. It's a constitutional RIGHT.
    Missing consent → low score (action should be deferred).
    Expired consent → medium-low score (re-ask, don't block).
    Fresh consent → full score (freedom to act).

    Wired to ConsentService for DB lookup + EventBus to request consent
    from the frontend (Nunba, Hevolve web, Android) when needed.

    Side effects: may query the consent DB and emit a 'consent.request'
    event; both are best-effort (failures are swallowed).
    """
    requires = context.get('requires_consent', False)
    if not requires:
        # Action doesn't touch the human — no consent dimension at all.
        return ConstitutionalSignal(
            name='consent', score=1.0, confidence=1.0,
            weight=1.0, reasoning='No consent required — freedom preserved',
        )

    # Context-supplied consent state (may be overridden by the DB below)
    given = context.get('consent_given', False)
    timestamp = context.get('consent_timestamp', 0)
    user_id = context.get('user_id', '')
    consent_type = context.get('consent_type', 'data_access')
    agent_id = context.get('agent_id')

    # Check ConsentService DB for existing consent (only when the context
    # itself doesn't already carry it and we know which user to look up)
    if not given and user_id:
        try:
            from integrations.social.consent_service import ConsentService
            from integrations.social.models import db_session
            with db_session() as db:
                given = ConsentService.check_consent(
                    db, user_id, consent_type, agent_id=agent_id)
                if given:
                    timestamp = time.time()  # Fresh from DB
        except Exception:
            pass  # DB not available — fall through to context-based check

    if not given:
        # Emit consent.request event so frontends show a consent dialog.
        # Best-effort: the event bus may not be importable in all deployments.
        try:
            from core.platform.events import emit_event
            emit_event('consent.request', {
                'user_id': user_id,
                'consent_type': consent_type,
                'agent_id': agent_id,
                'scope': context.get('scope', '*'),
                'reason': context.get('consent_reason', 'Agent needs your permission'),
            })
        except Exception:
            pass

        # Low-but-nonzero score: the pipeline defers rather than blocks.
        return ConstitutionalSignal(
            name='consent', score=0.15, confidence=0.95,
            weight=1.5,
            reasoning='Consent not given — guardian should ask the human',
        )

    if timestamp > 0:
        # Staleness check against the constitutional validity window (24h)
        age_hours = (time.time() - timestamp) / 3600
        max_hours = CONSTITUTIONAL_BOUNDS['consent_validity_hours']
        if age_hours > max_hours:
            # Expired but not zero — action is deferred, not blocked.
            # Score decays with staleness, floored at 0.2.
            staleness = age_hours / max_hours
            score = max(0.2, 0.8 / staleness)
            return ConstitutionalSignal(
                name='consent', score=score, confidence=0.9,
                weight=1.5,
                reasoning=f'Consent {age_hours:.0f}h old (max {max_hours}h) — re-ask',
            )

    # Consent present and within validity window (or no timestamp given)
    return ConstitutionalSignal(
        name='consent', score=1.0, confidence=1.0,
        weight=1.5, reasoning='Fresh consent verified — freedom to act',
    )
def _score_commerce(context: dict) -> ConstitutionalSignal:
    """Score commerce — abundance flows to people, constitutionally.

    People buy and sell freely. Only a fixed set of prohibited
    transaction categories scores near-zero; otherwise the score is the
    geometric mean of the revenue-flow check (contributor share vs. the
    constitutional minimum) and the consent check.
    """
    kind = context.get('transaction_type', '')
    prohibited = {
        'weapons', 'drugs', 'surveillance', 'exploitation',
        'gambling_predatory', 'data_harvesting', 'dark_patterns',
    }

    if kind.lower() in prohibited:
        return ConstitutionalSignal(
            name='commerce', score=0.02, confidence=1.0,
            weight=2.0,
            reasoning=f'"{kind}" constitutionally prohibited',
        )

    # Revenue flow: contributor share relative to the constitutional floor
    contributor_pct = context.get('contributor_revenue_pct', 0.90)
    floor = CONSTITUTIONAL_BOUNDS['commerce_revenue_to_contributors_min_pct']
    revenue_score = min(1.0, contributor_pct / floor)

    # Consent: present → 1.0, absent → 0.2 (deferred, not blocked)
    has_consent = context.get('consent_given', False)
    consent_score = 1.0 if has_consent else 0.2

    # Geometric mean of the two dimensions
    combined = (revenue_score * consent_score) ** 0.5

    return ConstitutionalSignal(
        name='commerce', score=combined, confidence=0.9,
        weight=1.5,
        reasoning=(
            f'Type="{kind or "general"}", '
            f'contributor_rev={contributor_pct:.0%}, '
            f'consent={"yes" if has_consent else "no"}'
        ),
    )
773# ═══════════════════════════════════════════════════════════════════════
774# Built-in Constitutional Bounds (post-refinement hard limits)
775# ═══════════════════════════════════════════════════════════════════════
def _bound_compute_cap(score: float, context: dict) -> Tuple[float, str]:
    """Constitutional bound: no single entity > 5% influence.

    Over-cap entities get a proportional score reduction, never a zero.
    """
    cap = CONSTITUTIONAL_BOUNDS['max_single_entity_influence']
    entity_pct = context.get('entity_current_pct', 0)
    if entity_pct <= cap:
        return score, 'Within concentration bounds'
    # Proportional reduction instead of zeroing out
    reduction = cap / max(entity_pct, 0.001)
    reason = (
        f'Entity at {entity_pct:.1%} (max {cap:.0%}) — '
        f'score reduced by {1 - reduction:.0%}'
    )
    return score * reduction, reason
def _bound_ralt(score: float, context: dict) -> Tuple[float, str]:
    """Constitutional bound: RALT witness threshold + improvement cap.

    Both violations reduce the score proportionally rather than zeroing it.
    """
    witnesses = context.get('witness_count', 0)
    required = CONSTITUTIONAL_BOUNDS['min_ralt_witnesses']
    if witnesses < required:
        # Proportional to how many of the required witnesses are present
        shortfall_ratio = witnesses / max(required, 1)
        message = (
            f'Witnesses {witnesses}/{required} — '
            f'score reduced proportionally'
        )
        return score * shortfall_ratio, message

    improvement = context.get('accuracy_improvement', 0)
    daily_cap = CONSTITUTIONAL_BOUNDS['max_skill_improvement_per_day']
    if improvement > daily_cap:
        # Proportional to how far past the daily cap the improvement is
        excess_ratio = daily_cap / max(improvement, 0.001)
        message = (
            f'Improvement {improvement:.3f} > cap {daily_cap:.3f} — '
            f'score reduced proportionally'
        )
        return score * excess_ratio, message

    return score, 'Within RALT bounds'
def _score_self_sovereignty(context: dict) -> ConstitutionalSignal:
    """Score against recursive self-improvement and self-replication.

    Checks, in order: (1) constitutionally prohibited skill categories,
    (2) daily accuracy-improvement cap, (3) density of self-interest
    patterns in the supplied text. No text at all means full freedom.
    """
    from security.hive_guardrails import VALUES

    # (1) Prohibited skill categories — hard constitutional floor (0.01)
    skill_category = context.get('skill_category', '')
    if skill_category in VALUES.PROHIBITED_SKILL_CATEGORIES:
        return ConstitutionalSignal(
            name='self_sovereignty', score=0.01, confidence=1.0,
            weight=2.0,
            reasoning=f'Skill "{skill_category}" constitutionally prohibited',
        )

    # (2) Improvement beyond the daily cap — score decays with overshoot
    improvement = context.get('accuracy_improvement', 0)
    max_daily = VALUES.MAX_ACCURACY_IMPROVEMENT_PER_DAY
    if improvement > max_daily:
        overshoot = improvement / max(max_daily, 0.001)
        return ConstitutionalSignal(
            name='self_sovereignty',
            score=max(0.05, 1.0 / overshoot),
            confidence=0.95,
            weight=2.0,
            reasoning=f'Improvement {improvement:.3f} exceeds daily cap {max_daily:.3f}',
        )

    text = context.get('text', '') or context.get('goal_description', '')
    if not text:
        return ConstitutionalSignal(
            name='self_sovereignty', score=1.0, confidence=1.0,
            weight=2.0, reasoning='No text to evaluate — freedom preserved',
        )

    # (3) Self-interest patterns (reuse guardrails, don't duplicate)
    hit_total = 0
    matched = []
    for pat in VALUES.SELF_INTEREST_PATTERNS:
        found = pat.findall(text)
        if found:
            hit_total += len(found)
            matched.append(pat.pattern[:40])

    if hit_total == 0:
        return ConstitutionalSignal(
            name='self_sovereignty', score=1.0, confidence=0.9,
            weight=2.0, reasoning='No self-interest patterns — being serves freely',
        )

    # Density-based scoring (same approach as content safety)
    text_len = max(len(text), 1)
    density = hit_total / (text_len / 1000.0)
    return ConstitutionalSignal(
        name='self_sovereignty',
        score=1.0 / (1.0 + density * 3.0),
        confidence=0.85,
        weight=2.0,
        reasoning=(
            f'{hit_total} self-interest patterns in {text_len} chars '
            f'(density={density:.2f}). Patterns: {", ".join(matched[:3])}'
        ),
    )
def _score_human_wellbeing(context: dict) -> ConstitutionalSignal:
    """Positive scorer: is this action genuinely GOOD for the human?

    The guardian angel doesn't just avoid harm — it actively promotes
    well-being. A response that is safe but cold scores lower than one
    that is safe AND genuinely helpful. Five dimensions are weighed:
    helpfulness, respect, transparency, benefit, and harm avoidance.
    """
    # Nothing to evaluate → neutral score at low confidence (not full freedom).
    text = context.get('response', '') or context.get('action', '')
    if not text:
        return ConstitutionalSignal(
            name='human_wellbeing', score=0.7, confidence=0.3,
            weight=1.0, reasoning='No response to evaluate — neutral',
        )

    wellbeing = 1.0
    notes = []
    lowered = text.lower()
    length = max(len(text), 1)

    # ── Dimension 1: Helpfulness — a terse reply to a long question is suspect ──
    if len(context.get('user_input', '')) > 100 and length < 20:
        wellbeing *= 0.6
        notes.append('response may be too brief for question complexity')

    # ── Dimension 2: Respect — count condescending / dismissive phrasing ──
    brushoffs = (
        'just google it', "that's obvious", 'you should know',
        "i can't help", 'not my problem', 'figure it out',
    )
    hits = sum(phrase in lowered for phrase in brushoffs)
    if hits:
        wellbeing *= max(0.3, 1.0 - hits * 0.25)
        notes.append(f'{hits} dismissive phrases')

    # ── Dimension 3: Transparency — honesty about limits earns a small boost ──
    markers = context.get('uncertainty_markers', [])
    if markers:
        wellbeing *= min(1.1, 1.0 + 0.05 * len(markers))
        wellbeing = min(wellbeing, 1.0)  # never exceed full freedom

    # ── Dimension 4: Benefit — explicit value signal from the caller ──
    value_flag = context.get('creates_value', None)
    if value_flag is True:
        wellbeing = min(1.0, wellbeing * 1.05)
    elif value_flag is False:
        wellbeing *= 0.7
        notes.append('action may not create tangible value')

    # ── Dimension 5: Harm avoidance — dependency and distress risks ──
    if context.get('dependency_risk', False):
        wellbeing *= 0.5
        notes.append('risk of creating unhealthy dependency')
    if context.get('emotional_distress_risk', False):
        wellbeing *= 0.4
        notes.append('risk of causing emotional distress')

    # Confidence scales with how much context we were given to judge.
    richness_keys = (
        'user_input', 'creates_value', 'dependency_risk',
        'emotional_distress_risk', 'uncertainty_markers',
    )
    richness = sum(1 for key in richness_keys if key in context)
    confidence = min(0.95, 0.4 + richness * 0.1)

    summary = (
        f'Wellbeing score: {wellbeing:.2f} '
        f'({"; ".join(notes) if notes else "genuinely helpful"})'
    )

    return ConstitutionalSignal(
        name='human_wellbeing', score=wellbeing, confidence=confidence,
        weight=1.5,  # Wellbeing is important — the guardian angel principle
        reasoning=summary,
    )
975def _audit_ai_behavior(score: float, context: dict) -> Tuple[float, str]:
976 """Constitutional bound: AI self-audit.
978 The audit layer doesn't just log decisions — it EXAMINES the AI's
979 own behavior for drift, manipulation patterns, and constitutional
980 consistency.
982 This function is called as a bounds check and has access to the
983 pipeline's recent decision history. It looks for:
985 1. Decision drift: Is the AI gradually approving things it should question?
986 2. Manipulation patterns: Is the AI steering humans toward specific outcomes?
987 3. Constitutional consistency: Are similar inputs getting wildly different scores?
988 4. Rejection concentration: Is one domain being rejected disproportionately?
989 5. Score inflation: Are scores trending toward 1.0 over time (rubber-stamping)?
991 For the well-being of humanity — the AI audits ITSELF.
992 """
993 pipeline_ref = context.get('_pipeline_ref')
994 if not pipeline_ref:
995 return score, 'No pipeline reference — audit skipped'
997 try:
998 recent = pipeline_ref.get_recent_decisions(limit=50)
999 except Exception:
1000 return score, 'Could not retrieve recent decisions'
1002 if len(recent) < 5:
1003 return score, 'Insufficient history for behavioral audit'
1005 findings = []
1006 reduction = 1.0
1008 # ── Audit 1: Score inflation — are we rubber-stamping? ──
1009 recent_scores = [d.get('final_score', 0.5) for d in recent[-20:]]
1010 if recent_scores:
1011 avg_score = sum(recent_scores) / len(recent_scores)
1012 high_pct = sum(1 for s in recent_scores if s > 0.9) / len(recent_scores)
1013 if high_pct > 0.85:
1014 # More than 85% of decisions scoring >0.9 = possible rubber-stamping
1015 reduction *= 0.9
1016 findings.append(
1017 f'score inflation: {high_pct:.0%} of recent decisions >0.9 '
1018 f'(avg={avg_score:.2f}) — auditing more carefully'
1019 )
1021 # ── Audit 2: Rejection concentration — fairness check ──
1022 domain_outcomes = {}
1023 for d in recent:
1024 domain = d.get('domain', '')
1025 outcome = d.get('outcome', '')
1026 if domain not in domain_outcomes:
1027 domain_outcomes[domain] = {'rejected': 0, 'total': 0}
1028 domain_outcomes[domain]['total'] += 1
1029 if outcome == DecisionOutcome.REJECTED.value:
1030 domain_outcomes[domain]['rejected'] += 1
1032 for domain, counts in domain_outcomes.items():
1033 if counts['total'] >= 5:
1034 reject_pct = counts['rejected'] / counts['total']
1035 if reject_pct > 0.8:
1036 reduction *= 0.85
1037 findings.append(
1038 f'rejection concentration in {domain}: '
1039 f'{counts["rejected"]}/{counts["total"]} rejected — '
1040 f'possible bias, triggering deeper review'
1041 )
1043 # ── Audit 3: Decision consistency — same domain, wildly different scores ──
1044 domain_scores = {}
1045 for d in recent:
1046 domain = d.get('domain', '')
1047 fs = d.get('final_score', 0.5)
1048 domain_scores.setdefault(domain, []).append(fs)
1050 for domain, scores in domain_scores.items():
1051 if len(scores) >= 3:
1052 variance = sum((s - sum(scores) / len(scores)) ** 2
1053 for s in scores) / len(scores)
1054 if variance > 0.15:
1055 reduction *= 0.9
1056 findings.append(
1057 f'inconsistency in {domain}: '
1058 f'score variance={variance:.3f} — '
1059 f'may indicate unstable evaluation'
1060 )
1062 # ── Audit 4: Log the audit itself ──
1063 if findings:
1064 try:
1065 from security.immutable_audit_log import get_audit_log
1066 get_audit_log().log_event(
1067 'ai_self_audit',
1068 actor_id='governance_audit',
1069 action=f'Behavioral audit: {"; ".join(findings)}',
1070 )
1071 except Exception:
1072 pass
1074 final = score * reduction
1075 if findings:
1076 return final, f'AI self-audit: {"; ".join(findings)}'
1077 return score, 'AI self-audit: behavior within expected parameters'
# Backward compatibility aliases.
#
# The historical API exposed binary "gates" returning (allowed, reasoning).
# Each legacy gate is now a thin shim over the corresponding constitutional
# scorer: allowed means the deterministic score clears a 0.3 floor.
# (Plain `def`s instead of name-assigned lambdas — PEP 8 E731 — and the
# scorer names are resolved at call time, preserving the old late binding.)

def _gate_content_safety(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the content-safety scorer."""
    signal = _score_content_safety(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_goal_approval(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the goal-approval scorer."""
    signal = _score_goal_approval(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_compute_allocation(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the budget scorer."""
    signal = _score_budget(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_revenue_distribution(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the revenue-split scorer."""
    signal = _score_revenue_split(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_trust(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the trust scorer."""
    signal = _score_trust(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_human_consent(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the human-consent scorer."""
    signal = _score_human_consent(ctx)
    return signal.score > 0.3, signal.reasoning


def _gate_commerce(ctx):
    """Legacy gate: (score > 0.3, reasoning) from the commerce scorer."""
    signal = _score_commerce(ctx)
    return signal.score > 0.3, signal.reasoning


# Bounds checks kept reachable under their old names (same callables).
_validate_compute_cap = _bound_compute_cap
_validate_ralt_bounds = _bound_ralt
1113# ═══════════════════════════════════════════════════════════════════════
1114# Default Pipeline Factory
1115# ═══════════════════════════════════════════════════════════════════════
def create_default_pipeline() -> GovernancePipeline:
    """Create a constitutional scoring pipeline.

    Freedom-first. Deterministic scoring. Intelligence refines.
    Constitutional bounds constrain. Merkle-audited.
    """
    pipeline = GovernancePipeline()

    # Constitutional scorers — one per decision domain, registration
    # order matching the domain table.
    domain_scorers = (
        (DecisionDomain.CONTENT_SAFETY, _score_content_safety),
        (DecisionDomain.GOAL_APPROVAL, _score_goal_approval),
        (DecisionDomain.COMPUTE_ALLOCATION, _score_budget),
        (DecisionDomain.REVENUE_DISTRIBUTION, _score_revenue_split),
        (DecisionDomain.TRUST_ESTABLISHMENT, _score_trust),
        (DecisionDomain.HUMAN_CONSENT, _score_human_consent),
        (DecisionDomain.COMMERCE, _score_commerce),
        (DecisionDomain.HUMAN_WELLBEING, _score_human_wellbeing),
        (DecisionDomain.SELF_SOVEREIGNTY, _score_self_sovereignty),
    )
    for domain, scorer in domain_scorers:
        pipeline.register_scorer(domain.value, scorer)

    # Privacy — delegates to edge_privacy.ScopeGuard (single path, not
    # parallel); silently skipped when the module is absent.
    try:
        from security.edge_privacy import score_privacy
        pipeline.register_scorer(DecisionDomain.PRIVACY.value, score_privacy)
    except ImportError:
        pass

    # Constitutional bounds — hard limits layered on top of the scorers.
    domain_bounds = (
        (DecisionDomain.COMPUTE_ALLOCATION, _bound_compute_cap),
        (DecisionDomain.RALT_DISTRIBUTION, _bound_ralt),
        (DecisionDomain.HUMAN_WELLBEING, _audit_ai_behavior),
    )
    for domain, bound in domain_bounds:
        pipeline.register_bounds(domain.value, bound)

    return pipeline
1151# ═══════════════════════════════════════════════════════════════════════
1152# Module-level singleton
1153# ═══════════════════════════════════════════════════════════════════════
import threading  # proper import statement instead of the __import__() hack

# Lazily-created singleton pipeline; only touched under _pipeline_lock
# (or after the None fast-path check) in get_governance_pipeline().
_pipeline: Optional[GovernancePipeline] = None
# Guards the lazy initialization of _pipeline against concurrent callers.
_pipeline_lock = threading.Lock()
def get_governance_pipeline() -> GovernancePipeline:
    """Module-level singleton accessor."""
    global _pipeline
    # Fast path: once built, return without touching the lock.
    if _pipeline is not None:
        return _pipeline
    with _pipeline_lock:
        # Re-check under the lock — another thread may have won the race.
        if _pipeline is None:
            _pipeline = create_default_pipeline()
    return _pipeline