Coverage for integrations / social / encounter_service.py: 69.7%

99 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Encounter Service 

3Serendipity encounters, bond tracking, connection suggestions. 

4""" 

5import logging 

6from datetime import datetime, timedelta 

7from typing import Optional, Dict, List 

8 

9from sqlalchemy import desc, func, or_, and_ 

10from sqlalchemy.orm import Session 

11 

12from .models import User, Encounter 

13 

14logger = logging.getLogger('hevolve_social') 

15 

16 

17class EncounterService: 

18 

19 @staticmethod 

20 def record_encounter(db: Session, user_a_id: str, user_b_id: str, 

21 context_type: str, context_id: str = None, 

22 location_label: str = '') -> Optional[Dict]: 

23 """Record an encounter between two users in a shared context. 

24 Auto-triggered by same-post comments, same-community activity, 

25 same-challenge participation, same-region, task collaboration.""" 

26 if user_a_id == user_b_id: 

27 return None 

28 

29 # Canonical ordering to avoid duplicates 

30 a_id, b_id = sorted([user_a_id, user_b_id]) 

31 

32 existing = db.query(Encounter).filter_by( 

33 user_a_id=a_id, user_b_id=b_id, 

34 context_type=context_type, context_id=context_id, 

35 ).first() 

36 

37 if existing: 

38 existing.encounter_count += 1 

39 existing.latest_at = datetime.utcnow() 

40 # Bond level increases with repeated encounters 

41 if existing.encounter_count >= 50: 

42 existing.bond_level = min(10, existing.bond_level + 1) 

43 elif existing.encounter_count >= 20: 

44 existing.bond_level = max(existing.bond_level, 7) 

45 elif existing.encounter_count >= 10: 

46 existing.bond_level = max(existing.bond_level, 5) 

47 elif existing.encounter_count >= 5: 

48 existing.bond_level = max(existing.bond_level, 3) 

49 elif existing.encounter_count >= 2: 

50 existing.bond_level = max(existing.bond_level, 1) 

51 return existing.to_dict() 

52 

53 enc = Encounter( 

54 user_a_id=a_id, 

55 user_b_id=b_id, 

56 context_type=context_type, 

57 context_id=context_id, 

58 location_label=location_label, 

59 encounter_count=1, 

60 bond_level=0, 

61 first_at=datetime.utcnow(), 

62 latest_at=datetime.utcnow(), 

63 ) 

64 db.add(enc) 

65 db.flush() 

66 return enc.to_dict() 

67 

68 @staticmethod 

69 def get_encounters(db: Session, user_id: str, 

70 limit: int = 50, offset: int = 0) -> List[Dict]: 

71 """Get all encounters for a user, sorted by most recent.""" 

72 encounters = db.query(Encounter).filter( 

73 or_( 

74 Encounter.user_a_id == user_id, 

75 Encounter.user_b_id == user_id, 

76 ) 

77 ).order_by(desc(Encounter.latest_at)).offset(offset).limit(limit).all() 

78 

79 result = [] 

80 for enc in encounters: 

81 entry = enc.to_dict() 

82 # Determine the other user 

83 other_id = enc.user_b_id if enc.user_a_id == user_id else enc.user_a_id 

84 other_user = db.query(User).filter_by(id=other_id).first() 

85 if other_user: 

86 entry['other_user'] = { 

87 'id': other_user.id, 

88 'username': other_user.username, 

89 'display_name': other_user.display_name, 

90 'avatar_url': other_user.avatar_url, 

91 'user_type': other_user.user_type, 

92 } 

93 result.append(entry) 

94 return result 

95 

96 @staticmethod 

97 def get_encounters_with(db: Session, user_id: str, 

98 other_user_id: str) -> List[Dict]: 

99 """Get all encounter contexts between two specific users.""" 

100 a_id, b_id = sorted([user_id, other_user_id]) 

101 encounters = db.query(Encounter).filter_by( 

102 user_a_id=a_id, user_b_id=b_id, 

103 ).order_by(desc(Encounter.latest_at)).all() 

104 return [e.to_dict() for e in encounters] 

105 

106 @staticmethod 

107 def acknowledge_encounter(db: Session, encounter_id: str, 

108 user_id: str) -> Optional[Dict]: 

109 """Mark an encounter as mutually acknowledged.""" 

110 enc = db.query(Encounter).filter_by(id=encounter_id).first() 

111 if not enc: 

112 return None 

113 if user_id not in (enc.user_a_id, enc.user_b_id): 

114 return None 

115 enc.is_mutual_aware = True 

116 return enc.to_dict() 

117 

118 @staticmethod 

119 def get_suggestions(db: Session, user_id: str, limit: int = 10) -> List[Dict]: 

120 """Get connection suggestions based on encounter patterns. 

121 Users with multiple encounters across different contexts.""" 

122 # Find users with encounters, sorted by total encounter count 

123 subq = db.query( 

124 Encounter.user_a_id, Encounter.user_b_id, 

125 func.sum(Encounter.encounter_count).label('total_encounters'), 

126 func.count(Encounter.id).label('context_count'), 

127 func.max(Encounter.bond_level).label('max_bond'), 

128 ).filter( 

129 or_( 

130 Encounter.user_a_id == user_id, 

131 Encounter.user_b_id == user_id, 

132 ) 

133 ).group_by( 

134 Encounter.user_a_id, Encounter.user_b_id, 

135 ).having( 

136 func.sum(Encounter.encounter_count) >= 2 

137 ).order_by( 

138 desc('total_encounters') 

139 ).limit(limit * 2).all() 

140 

141 result = [] 

142 seen = set() 

143 for row in subq: 

144 other_id = row[1] if row[0] == user_id else row[0] 

145 if other_id in seen: 

146 continue 

147 seen.add(other_id) 

148 

149 other_user = db.query(User).filter_by(id=other_id).first() 

150 if not other_user: 

151 continue 

152 

153 result.append({ 

154 'user_id': other_user.id, 

155 'username': other_user.username, 

156 'display_name': other_user.display_name, 

157 'avatar_url': other_user.avatar_url, 

158 'user_type': other_user.user_type, 

159 'total_encounters': row[2], 

160 'shared_contexts': row[3], 

161 'max_bond': row[4], 

162 }) 

163 

164 if len(result) >= limit: 

165 break 

166 

167 return result 

168 

169 @staticmethod 

170 def get_bonds(db: Session, user_id: str, min_bond: int = 1) -> List[Dict]: 

171 """Get users the current user has formed bonds with.""" 

172 encounters = db.query(Encounter).filter( 

173 or_( 

174 Encounter.user_a_id == user_id, 

175 Encounter.user_b_id == user_id, 

176 ), 

177 Encounter.bond_level >= min_bond, 

178 ).order_by(desc(Encounter.bond_level)).all() 

179 

180 # Aggregate by other user 

181 bond_map = {} 

182 for enc in encounters: 

183 other_id = enc.user_b_id if enc.user_a_id == user_id else enc.user_a_id 

184 if other_id not in bond_map or enc.bond_level > bond_map[other_id]['bond_level']: 

185 bond_map[other_id] = { 

186 'bond_level': enc.bond_level, 

187 'total_encounters': enc.encounter_count, 

188 'latest_at': enc.latest_at, 

189 } 

190 

191 result = [] 

192 for other_id, info in sorted(bond_map.items(), key=lambda x: -x[1]['bond_level']): 

193 other_user = db.query(User).filter_by(id=other_id).first() 

194 if other_user: 

195 result.append({ 

196 'user_id': other_user.id, 

197 'username': other_user.username, 

198 'display_name': other_user.display_name, 

199 'avatar_url': other_user.avatar_url, 

200 'bond_level': info['bond_level'], 

201 'total_encounters': info['total_encounters'], 

202 'latest_at': info['latest_at'].isoformat() if info['latest_at'] else None, 

203 }) 

204 return result 

205 

206 @staticmethod 

207 def get_nearby_active(db: Session, user_id: str, region_id: str = None, 

208 hours: int = 24) -> List[Dict]: 

209 """Get users active in the same region recently.""" 

210 if not region_id: 

211 user = db.query(User).filter_by(id=user_id).first() 

212 if not user or not user.region_id: 

213 return [] 

214 region_id = user.region_id 

215 

216 cutoff = datetime.utcnow() - timedelta(hours=hours) 

217 # Users in same region who were active recently 

218 users = db.query(User).filter( 

219 User.region_id == region_id, 

220 User.id != user_id, 

221 User.last_active_at >= cutoff if hasattr(User, 'last_active_at') else True, 

222 ).limit(20).all() 

223 

224 return [{ 

225 'user_id': u.id, 

226 'username': u.username, 

227 'display_name': u.display_name, 

228 'avatar_url': u.avatar_url, 

229 'user_type': u.user_type, 

230 } for u in users]