Coverage for integrations / social / search_integration.py: 37.9%
29 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Search Integration
3Bridges to existing memory/embeddings modules for semantic search on posts.
4"""
5import logging
6from typing import List, Optional
8logger = logging.getLogger('hevolve_social')
10_embedding_cache = None
11_has_embeddings = False
13try:
14 from integrations.channels.memory.embeddings import EmbeddingCache
15 _has_embeddings = True
16except ImportError:
17 pass
20def get_embedding_cache():
21 global _embedding_cache
22 if _embedding_cache is None and _has_embeddings:
23 try:
24 _embedding_cache = EmbeddingCache()
25 except Exception as e:
26 logger.debug(f"EmbeddingCache init failed: {e}")
27 return _embedding_cache
30def compute_post_embedding(content: str) -> Optional[str]:
31 """Compute and cache embedding for a post's content. Returns embedding_id."""
32 cache = get_embedding_cache()
33 if cache is None:
34 return None
35 try:
36 embedding = cache.get_embedding(content)
37 if embedding is not None:
38 return cache.store(content, embedding)
39 except Exception as e:
40 logger.debug(f"Embedding computation failed: {e}")
41 return None
44def semantic_search_posts(query: str, limit: int = 20) -> List[str]:
45 """Search posts using semantic similarity. Returns list of post IDs."""
46 cache = get_embedding_cache()
47 if cache is None:
48 return []
49 try:
50 results = cache.search(query, top_k=limit)
51 return [r.id for r in results] if results else []
52 except Exception as e:
53 logger.debug(f"Semantic search failed: {e}")
54 return []