Coverage for integrations/social/search_integration.py: 37.9%

29 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Search Integration 

3Bridges to existing memory/embeddings modules for semantic search on posts. 

4""" 

5import logging 

6from typing import List, Optional 

7 

# Logger shared by this module's helpers.
logger = logging.getLogger('hevolve_social')

# Lazily constructed EmbeddingCache instance; None until a construction succeeds.
_embedding_cache = None

# The embeddings module is an optional import: record whether it succeeded
# instead of failing at import time of this module.
try:
    from integrations.channels.memory.embeddings import EmbeddingCache
except ImportError:
    _has_embeddings = False
else:
    _has_embeddings = True

18 

19 

def get_embedding_cache():
    """Return the module-wide EmbeddingCache, constructing it on first use.

    Returns None when the embeddings module was not importable or when
    constructing the cache raised. A failed construction is logged at debug
    level and leaves the cached value unset, so the next call tries again.
    """
    global _embedding_cache
    # Nothing to do if the cache already exists or the backend is missing.
    if _embedding_cache is not None or not _has_embeddings:
        return _embedding_cache
    try:
        _embedding_cache = EmbeddingCache()
    except Exception as exc:
        logger.debug(f"EmbeddingCache init failed: {exc}")
    return _embedding_cache

28 

29 

def compute_post_embedding(content: str) -> Optional[str]:
    """Compute an embedding for a post's content and store it in the cache.

    Args:
        content: Text of the post to embed.

    Returns:
        Whatever the cache's ``store`` call returns (used as the embedding
        ID), or None when the content is blank, no embedding cache is
        available, the cache produces no embedding, or any step raises.
    """
    # Blank content carries nothing to embed; skip the backend entirely
    # instead of computing and storing an embedding for an empty string.
    if not content or not content.strip():
        return None

    cache = get_embedding_cache()
    if cache is None:
        return None

    try:
        embedding = cache.get_embedding(content)
        if embedding is not None:
            return cache.store(content, embedding)
    except Exception as e:
        # Best-effort: an embedding failure is logged and reported as None.
        logger.debug("Embedding computation failed: %s", e)
    return None

42 

43 

def semantic_search_posts(query: str, limit: int = 20) -> List[str]:
    """Search posts by semantic similarity and return the matching post IDs.

    Args:
        query: Free-text search query.
        limit: Maximum number of results; forwarded to the cache's
            ``search`` call as ``top_k``.

    Returns:
        The ``id`` attribute of each result, in the order the cache returned
        them. An empty list when the query is blank, ``limit`` is not
        positive, no embedding cache is available, the search returns
        nothing, or the search raises.
    """
    # A blank query or a non-positive limit cannot produce meaningful
    # results; return early rather than passing degenerate arguments
    # (e.g. a negative top_k) to the backend.
    if not query or not query.strip() or limit <= 0:
        return []

    cache = get_embedding_cache()
    if cache is None:
        return []

    try:
        results = cache.search(query, top_k=limit)
        return [r.id for r in results] if results else []
    except Exception as e:
        # Best-effort: a search failure is logged and reported as no results.
        logger.debug("Semantic search failed: %s", e)
        return []