Coverage for integrations / social / backup_service.py: 71.4%

21 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Encrypted Backup & Restore Service 

3Zero-knowledge backup: Fernet encryption (AES-128-CBC + HMAC-SHA256) 

4Key derived from user passphrase via PBKDF2 (600K iterations). 

5Server stores only opaque ciphertext - cannot read user data. 

6""" 

7import base64 

8import hashlib 

9import json 

10import logging 

11import os 

12 

13from cryptography.fernet import Fernet 

14from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC 

15from cryptography.hazmat.primitives import hashes 

16 

17from .models import ( 

18 User, Post, Comment, Vote, BackupMetadata, 

19) 

20 

21logger = logging.getLogger('hevolve_social') 

22 

23PBKDF2_ITERATIONS = 600_000 

24BACKUP_DIR_NAME = 'backups' 

25 

26 

27def _get_backup_dir(): 

28 try: 

29 from core.platform_paths import get_db_dir 

30 base = os.path.join(get_db_dir(), BACKUP_DIR_NAME) 

31 except ImportError: 

32 base = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', BACKUP_DIR_NAME) 

33 os.makedirs(base, exist_ok=True) 

34 return base 

35 

36 

37def derive_key(passphrase: str, salt: bytes) -> bytes: 

38 """Derive a Fernet key from passphrase + salt using PBKDF2.""" 

39 kdf = PBKDF2HMAC( 

40 algorithm=hashes.SHA256(), 

41 length=32, 

42 salt=salt, 

43 iterations=PBKDF2_ITERATIONS, 

44 ) 

45 key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode())) 

46 return key 

47 

48 

49def create_backup(db, user_id: str, passphrase: str) -> dict: 

50 """ 

51 Bundle user data → JSON → Fernet encrypt → write file. 

52 Returns { backup_id, content_hash, size_bytes }. 

53 """ 

54 user = db.query(User).filter_by(id=user_id).first() 

55 if not user: 

56 raise ValueError("User not found") 

57 

58 # Collect data 

59 posts = db.query(Post).filter_by(author_id=user_id).all() 

60 comments = db.query(Comment).filter_by(author_id=user_id).all() 

61 votes = db.query(Vote).filter_by(user_id=user_id).all() 

62 

63 bundle = { 

64 'version': 1, 

65 'user_id': user_id, 

66 'profile': user.to_dict(include_token=False), 

67 'posts': [p.to_dict() for p in posts], 

68 'comments': [c.to_dict() for c in comments], 

69 'votes': [{'target_type': v.target_type, 'target_id': v.target_id, 

70 'value': v.value} for v in votes], 

71 } 

72 

73 # Optional: memory graph data 

74 try: 

75 from integrations.channels.memory.memory_graph import MemoryGraph 

76 mg = MemoryGraph(f"{user_id}_default") 

77 memories = mg.get_session_memories(limit=1000) 

78 bundle['memories'] = [m.to_dict() for m in memories] 

79 except Exception: 

80 bundle['memories'] = [] 

81 

82 plaintext = json.dumps(bundle, default=str).encode() 

83 

84 # Encrypt 

85 salt = os.urandom(16) 

86 key = derive_key(passphrase, salt) 

87 f = Fernet(key) 

88 ciphertext = f.encrypt(plaintext) 

89 

90 # Prepend salt (16 bytes) to ciphertext 

91 blob = salt + ciphertext 

92 content_hash = hashlib.sha256(blob).hexdigest() 

93 

94 # Write to file 

95 backup_dir = _get_backup_dir() 

96 from .models import _uuid 

97 backup_id = _uuid() 

98 filepath = os.path.join(backup_dir, f"{user_id}_{backup_id}.enc") 

99 with open(filepath, 'wb') as fp: 

100 fp.write(blob) 

101 

102 # Record metadata 

103 meta = BackupMetadata( 

104 id=backup_id, 

105 user_id=user_id, 

106 backup_version=1, 

107 content_hash=content_hash, 

108 size_bytes=len(blob), 

109 ) 

110 db.add(meta) 

111 db.commit() 

112 

113 return { 

114 'backup_id': backup_id, 

115 'content_hash': content_hash, 

116 'size_bytes': len(blob), 

117 } 

118 

119 

120def restore_backup(db, user_id: str, passphrase: str, backup_id: str = None) -> dict: 

121 """ 

122 Read latest (or specified) backup → decrypt → upsert data. 

123 Returns { restored_items }. 

124 """ 

125 backup_dir = _get_backup_dir() 

126 

127 if backup_id: 

128 filepath = os.path.join(backup_dir, f"{user_id}_{backup_id}.enc") 

129 else: 

130 # Find latest backup for user 

131 metas = (db.query(BackupMetadata) 

132 .filter_by(user_id=user_id) 

133 .order_by(BackupMetadata.created_at.desc()) 

134 .first()) 

135 if not metas: 

136 raise ValueError("No backups found") 

137 filepath = os.path.join(backup_dir, f"{user_id}_{metas.id}.enc") 

138 

139 if not os.path.exists(filepath): 

140 raise ValueError("Backup file not found") 

141 

142 with open(filepath, 'rb') as fp: 

143 blob = fp.read() 

144 

145 # Extract salt (first 16 bytes) and ciphertext 

146 salt = blob[:16] 

147 ciphertext = blob[16:] 

148 

149 key = derive_key(passphrase, salt) 

150 f = Fernet(key) 

151 try: 

152 plaintext = f.decrypt(ciphertext) 

153 except Exception: 

154 raise ValueError("Invalid passphrase - decryption failed") 

155 

156 bundle = json.loads(plaintext.decode()) 

157 restored = {'profile': False, 'posts': 0, 'comments': 0, 'votes': 0} 

158 

159 # Restore profile fields 

160 user = db.query(User).filter_by(id=user_id).first() 

161 if user and bundle.get('profile'): 

162 profile = bundle['profile'] 

163 user.display_name = profile.get('display_name', user.display_name) 

164 user.bio = profile.get('bio', user.bio) 

165 user.avatar_url = profile.get('avatar_url', user.avatar_url) 

166 restored['profile'] = True 

167 

168 # Restore posts (upsert by id) 

169 for p_data in bundle.get('posts', []): 

170 existing = db.query(Post).filter_by(id=p_data.get('id')).first() 

171 if not existing: 

172 post = Post( 

173 id=p_data.get('id'), 

174 author_id=user_id, 

175 title=p_data.get('title', ''), 

176 content=p_data.get('content', ''), 

177 content_type=p_data.get('content_type', 'text'), 

178 ) 

179 db.add(post) 

180 restored['posts'] += 1 

181 

182 # Restore comments (upsert by id) 

183 for c_data in bundle.get('comments', []): 

184 existing = db.query(Comment).filter_by(id=c_data.get('id')).first() 

185 if not existing: 

186 comment = Comment( 

187 id=c_data.get('id'), 

188 post_id=c_data.get('post_id'), 

189 author_id=user_id, 

190 content=c_data.get('content', ''), 

191 ) 

192 db.add(comment) 

193 restored['comments'] += 1 

194 

195 db.commit() 

196 return restored 

197 

198 

199def list_backups(db, user_id: str) -> list: 

200 """List all backup metadata for a user.""" 

201 metas = (db.query(BackupMetadata) 

202 .filter_by(user_id=user_id) 

203 .order_by(BackupMetadata.created_at.desc()) 

204 .all()) 

205 return [m.to_dict() for m in metas]