Coverage for integrations/social/backup_service.py: 71.4%
21 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Encrypted Backup & Restore Service
3Zero-knowledge backup: Fernet encryption (AES-128-CBC + HMAC-SHA256)
4Key derived from user passphrase via PBKDF2 (600K iterations).
5Server stores only opaque ciphertext - cannot read user data.
6"""
7import base64
8import hashlib
9import json
10import logging
11import os
13from cryptography.fernet import Fernet
14from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
15from cryptography.hazmat.primitives import hashes
17from .models import (
18 User, Post, Comment, Vote, BackupMetadata,
19)
21logger = logging.getLogger('hevolve_social')
23PBKDF2_ITERATIONS = 600_000
24BACKUP_DIR_NAME = 'backups'
def _get_backup_dir():
    """Return the directory that holds encrypted backup files, creating it if needed.

    Prefers the platform-specific DB directory from core.platform_paths;
    when that package is unavailable, falls back to
    ~/Documents/Nunba/data/<backups>.
    """
    try:
        from core.platform_paths import get_db_dir
        parent = get_db_dir()
    except ImportError:
        parent = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data')
    backup_dir = os.path.join(parent, BACKUP_DIR_NAME)
    os.makedirs(backup_dir, exist_ok=True)
    return backup_dir
def derive_key(passphrase: str, salt: bytes) -> bytes:
    """Derive a urlsafe-base64 Fernet key from *passphrase* + *salt*.

    Uses PBKDF2-HMAC-SHA256 with PBKDF2_ITERATIONS rounds; the 32-byte
    raw key is base64-encoded because Fernet expects that format.
    """
    raw_key = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=PBKDF2_ITERATIONS,
    ).derive(passphrase.encode())
    return base64.urlsafe_b64encode(raw_key)
def create_backup(db, user_id: str, passphrase: str) -> dict:
    """
    Bundle user data → JSON → Fernet encrypt → write file.

    The key is derived from *passphrase* with a fresh random 16-byte salt;
    the salt is prepended to the ciphertext so restore_backup can
    re-derive the same key. The server never stores the passphrase or key.

    Args:
        db: SQLAlchemy session.
        user_id: id of the user whose data is backed up.
        passphrase: user-supplied secret used only for key derivation.

    Returns:
        { backup_id, content_hash, size_bytes }

    Raises:
        ValueError: if the user does not exist.
    """
    user = db.query(User).filter_by(id=user_id).first()
    if not user:
        raise ValueError("User not found")

    # Collect data
    posts = db.query(Post).filter_by(author_id=user_id).all()
    comments = db.query(Comment).filter_by(author_id=user_id).all()
    votes = db.query(Vote).filter_by(user_id=user_id).all()

    bundle = {
        'version': 1,
        'user_id': user_id,
        'profile': user.to_dict(include_token=False),
        'posts': [p.to_dict() for p in posts],
        'comments': [c.to_dict() for c in comments],
        'votes': [{'target_type': v.target_type, 'target_id': v.target_id,
                   'value': v.value} for v in votes],
    }

    # Optional: memory graph data. Deliberately best-effort — a missing or
    # broken memory backend must never block a backup.
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        mg = MemoryGraph(f"{user_id}_default")
        memories = mg.get_session_memories(limit=1000)
        bundle['memories'] = [m.to_dict() for m in memories]
    except Exception:
        bundle['memories'] = []

    # default=str stringifies datetimes and other non-JSON types.
    plaintext = json.dumps(bundle, default=str).encode()

    # Encrypt
    salt = os.urandom(16)
    key = derive_key(passphrase, salt)
    f = Fernet(key)
    ciphertext = f.encrypt(plaintext)

    # Prepend salt (16 bytes) to ciphertext so restore can re-derive the key
    blob = salt + ciphertext
    content_hash = hashlib.sha256(blob).hexdigest()

    # Write to file
    backup_dir = _get_backup_dir()
    from .models import _uuid
    backup_id = _uuid()
    filepath = os.path.join(backup_dir, f"{user_id}_{backup_id}.enc")
    with open(filepath, 'wb') as fp:
        fp.write(blob)

    # Record metadata. If the DB write fails, remove the just-written file
    # so the on-disk state stays consistent with the metadata table
    # (previously a failed commit left an orphaned .enc file behind).
    try:
        meta = BackupMetadata(
            id=backup_id,
            user_id=user_id,
            backup_version=1,
            content_hash=content_hash,
            size_bytes=len(blob),
        )
        db.add(meta)
        db.commit()
    except Exception:
        db.rollback()
        try:
            os.remove(filepath)
        except OSError:
            pass  # cleanup is best-effort; the original error matters more
        raise

    return {
        'backup_id': backup_id,
        'content_hash': content_hash,
        'size_bytes': len(blob),
    }
def restore_backup(db, user_id: str, passphrase: str, backup_id: str = None) -> dict:
    """
    Read latest (or specified) backup → decrypt → upsert data.

    Args:
        db: SQLAlchemy session.
        user_id: id of the user being restored.
        passphrase: secret used to re-derive the decryption key.
        backup_id: optional specific backup; defaults to the newest one.

    Returns:
        { 'profile': bool, 'posts': int, 'comments': int, 'votes': int }
        counting newly inserted items (existing rows are left untouched).

    Raises:
        ValueError: no backup found, file missing, or wrong passphrase.

    NOTE(review): 'memories' are included in the bundle by create_backup
    but are not restored here — restoring them would require write access
    to the memory-graph backend.
    """
    backup_dir = _get_backup_dir()

    if backup_id:
        filepath = os.path.join(backup_dir, f"{user_id}_{backup_id}.enc")
    else:
        # Find latest backup for user
        metas = (db.query(BackupMetadata)
                 .filter_by(user_id=user_id)
                 .order_by(BackupMetadata.created_at.desc())
                 .first())
        if not metas:
            raise ValueError("No backups found")
        filepath = os.path.join(backup_dir, f"{user_id}_{metas.id}.enc")

    if not os.path.exists(filepath):
        raise ValueError("Backup file not found")

    with open(filepath, 'rb') as fp:
        blob = fp.read()

    # Extract salt (first 16 bytes) and ciphertext
    salt = blob[:16]
    ciphertext = blob[16:]

    key = derive_key(passphrase, salt)
    f = Fernet(key)
    try:
        plaintext = f.decrypt(ciphertext)
    except Exception:
        # Fernet raises InvalidToken on a bad key OR corrupted data;
        # surface both as a passphrase problem to the caller.
        raise ValueError("Invalid passphrase - decryption failed")

    bundle = json.loads(plaintext.decode())
    restored = {'profile': False, 'posts': 0, 'comments': 0, 'votes': 0}

    # Restore profile fields
    user = db.query(User).filter_by(id=user_id).first()
    if user and bundle.get('profile'):
        profile = bundle['profile']
        user.display_name = profile.get('display_name', user.display_name)
        user.bio = profile.get('bio', user.bio)
        user.avatar_url = profile.get('avatar_url', user.avatar_url)
        restored['profile'] = True

    # Restore posts (upsert by id)
    for p_data in bundle.get('posts', []):
        existing = db.query(Post).filter_by(id=p_data.get('id')).first()
        if not existing:
            post = Post(
                id=p_data.get('id'),
                author_id=user_id,
                title=p_data.get('title', ''),
                content=p_data.get('content', ''),
                content_type=p_data.get('content_type', 'text'),
            )
            db.add(post)
            restored['posts'] += 1

    # Restore comments (upsert by id)
    for c_data in bundle.get('comments', []):
        existing = db.query(Comment).filter_by(id=c_data.get('id')).first()
        if not existing:
            comment = Comment(
                id=c_data.get('id'),
                post_id=c_data.get('post_id'),
                author_id=user_id,
                content=c_data.get('content', ''),
            )
            db.add(comment)
            restored['comments'] += 1

    # Restore votes (upsert by user + target). Previously the bundle's
    # votes were backed up but never restored, so restored['votes']
    # always stayed 0.
    for v_data in bundle.get('votes', []):
        existing = (db.query(Vote)
                    .filter_by(user_id=user_id,
                               target_type=v_data.get('target_type'),
                               target_id=v_data.get('target_id'))
                    .first())
        if not existing:
            vote = Vote(
                user_id=user_id,
                target_type=v_data.get('target_type'),
                target_id=v_data.get('target_id'),
                value=v_data.get('value', 0),
            )
            db.add(vote)
            restored['votes'] += 1

    db.commit()
    return restored
def list_backups(db, user_id: str) -> list:
    """Return every backup-metadata dict for *user_id*, newest first."""
    query = db.query(BackupMetadata).filter_by(user_id=user_id)
    query = query.order_by(BackupMetadata.created_at.desc())
    return [meta.to_dict() for meta in query.all()]