Coverage for integrations / agent_engine / compute_borrowing.py: 71.6%

74 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Compute Borrowing Service — request, offer, and settle compute across peers. 

3 

4Builds on peer_discovery.py gossip protocol and hosting_reward_service.py 

5contribution scoring. Peers with idle compute advertise capacity; 

6nodes under pressure can borrow and pay via Spark settlement. 

7""" 

8import logging 

9from datetime import datetime, timedelta 

10from typing import Dict, List, Optional 

11 

12logger = logging.getLogger(__name__) 

13 

14# In-memory tracking (transient / short-lived — not persisted) 

15_compute_offers: Dict[str, Dict] = {} # node_id → {resources, timestamp} 

16_compute_requests: Dict[str, Dict] = {} # request_id → {node_id, task_type, ...} 

17# _compute_debts removed — now persisted via ComputeEscrow table 

18 

19 

class ComputeBorrowingService:
    """Cross-node compute sharing via gossip protocol.

    Stateless staticmethods operating on the module-level offer/request
    registries; debt bookkeeping is persisted via the ComputeEscrow table.
    """

    @staticmethod
    def offer_compute(db, node_id: str, available_resources: Dict) -> Dict:
        """Advertise idle compute capacity to the network.

        Args:
            db: Database session (unused here; kept for API symmetry).
            node_id: Offering node
            available_resources: {cpu_pct_free, ram_gb_free, gpu_free_gb}

        Returns: Offer confirmation {'success': True, 'offer': {...}}
        """
        offer = {
            'node_id': node_id,
            'resources': available_resources,
            'timestamp': datetime.utcnow().isoformat(),
            'status': 'available',
        }
        _compute_offers[node_id] = offer

        # Best-effort broadcast via gossip — a missing or failing
        # peer-discovery layer must not prevent the local offer registering.
        try:
            from integrations.social.peer_discovery import get_peer_discovery
            pd = get_peer_discovery()
            pd.broadcast({
                'type': 'compute_offer',
                'payload': offer,
            })
        except Exception as e:
            logger.debug("Gossip broadcast failed: %s", e)

        return {'success': True, 'offer': offer}

    @staticmethod
    def request_compute(db, node_id: str, task_type: str,
                        min_resources: Dict) -> Dict:
        """Request compute from the network.

        Args:
            db: Database session used to persist the escrow record.
            node_id: Requesting node
            task_type: 'inference' | 'training' | 'federation'
            min_resources: {min_cpu_pct, min_ram_gb, min_gpu_gb}

        Returns: Matched offer or no_match
        """
        # Find the first available offer satisfying every documented
        # dimension. BUG FIX: min_gpu_gb is part of the API but was never
        # checked; offers without a 'gpu_free_gb' key default to 0, so
        # requests that don't specify a GPU minimum match exactly as before.
        matched = None
        for offer in _compute_offers.values():
            if offer['status'] != 'available':
                continue
            res = offer['resources']
            if (res.get('cpu_pct_free', 0) >= min_resources.get('min_cpu_pct', 0) and
                    res.get('ram_gb_free', 0) >= min_resources.get('min_ram_gb', 0) and
                    res.get('gpu_free_gb', 0) >= min_resources.get('min_gpu_gb', 0)):
                matched = offer
                break

        if not matched:
            # Broadcast request via gossip for deferred matching (best-effort)
            try:
                from integrations.social.peer_discovery import get_peer_discovery
                pd = get_peer_discovery()
                pd.broadcast({
                    'type': 'compute_request',
                    'payload': {
                        'requester': node_id,
                        'task_type': task_type,
                        'min_resources': min_resources,
                    },
                })
            except Exception:
                pass
            return {'matched': False, 'reason': 'no_available_offers'}

        # Reserve the offer so concurrent requests can't double-book it.
        # NOTE(review): request_id is not unique per request — a repeat
        # request between the same pair for the same task_type overwrites
        # the earlier entry; confirm this is intended.
        matched['status'] = 'reserved'
        request_id = f"{node_id}_{matched['node_id']}_{task_type}"
        _compute_requests[request_id] = {
            'requester': node_id,
            'provider': matched['node_id'],
            'task_type': task_type,
            'resources': matched['resources'],
            'started_at': datetime.utcnow().isoformat(),
        }

        # Persist escrow record (fail-open: match succeeds even if DB write fails)
        # Estimate cost: 1 Spark per GPU-GB offered per hour (minimum 1).
        # BUG FIX: offers advertise 'gpu_free_gb' (see offer_compute), not
        # 'gpu_gb' — the old key never existed, so the estimate was always
        # the default 1. Legacy 'gpu_gb' is kept as a fallback.
        res = matched.get('resources', {})
        estimated_spark = max(1, int(res.get('gpu_free_gb', res.get('gpu_gb', 1))))
        try:
            from integrations.social.models import ComputeEscrow
            escrow = ComputeEscrow(
                debtor_node_id=node_id,
                creditor_node_id=matched['node_id'],
                request_id=request_id,
                task_type=task_type,
                spark_amount=estimated_spark,
                status='pending',
                created_at=datetime.utcnow(),
                expires_at=datetime.utcnow() + timedelta(hours=24),
            )
            db.add(escrow)
            db.commit()
        except Exception as e:
            logger.warning("ComputeEscrow write failed (fail-open): %s", e)
            try:
                db.rollback()
            except Exception:
                pass

        return {
            'matched': True,
            'request_id': request_id,
            'provider': matched['node_id'],
            'resources': matched['resources'],
        }

    @staticmethod
    def settle_compute_debt(db, debtor_node_id: str,
                            creditor_node_id: str,
                            spark_amount: float) -> Dict:
        """Pay a peer for borrowed compute cycles via Spark transfer.

        Awards Spark to the provider's operator, marks the oldest pending
        escrow between the pair as settled, and reports the debtor's
        remaining pending debt.

        Returns: {'settled': True, ...} on success, {'error': ...} otherwise.
        """
        try:
            from integrations.social.models import PeerNode, ComputeEscrow
            provider = db.query(PeerNode).filter_by(
                node_id=creditor_node_id).first()
            if not provider or not provider.node_operator_id:
                return {'error': 'provider_not_found'}

            # Award Spark to provider
            from integrations.social.resonance_engine import ResonanceService
            ResonanceService.award_spark(
                db, str(provider.node_operator_id), int(spark_amount),
                'compute_borrowing_settlement', debtor_node_id,
                f'Compute settlement: {spark_amount} Spark from {debtor_node_id}')

            # Settle escrow record in DB (fail-open: the Spark award above
            # stands even if escrow bookkeeping fails)
            remaining_debt = 0
            try:
                escrow = db.query(ComputeEscrow).filter(
                    ComputeEscrow.debtor_node_id == debtor_node_id,
                    ComputeEscrow.creditor_node_id == creditor_node_id,
                    ComputeEscrow.status == 'pending',
                ).first()
                if escrow:
                    escrow.spark_amount = int(spark_amount)
                    escrow.status = 'settled'
                    escrow.settled_at = datetime.utcnow()
                    db.commit()

                # Calculate remaining debt from any other pending escrows
                from sqlalchemy import func
                pending_sum = db.query(func.coalesce(
                    func.sum(ComputeEscrow.spark_amount), 0
                )).filter(
                    ComputeEscrow.debtor_node_id == debtor_node_id,
                    ComputeEscrow.status == 'pending',
                ).scalar()
                remaining_debt = float(pending_sum)
            except Exception as e:
                logger.warning("ComputeEscrow settle failed (fail-open): %s", e)
                try:
                    db.rollback()
                except Exception:
                    pass

            logger.info(
                "Compute settlement: %s → %s (%s Spark)",
                debtor_node_id, creditor_node_id, spark_amount)

            return {
                'settled': True,
                'debtor': debtor_node_id,
                'creditor': creditor_node_id,
                'amount': spark_amount,
                'remaining_debt': remaining_debt,
            }
        except Exception as e:
            # Boundary catch: DB/session failures surface as an error payload
            # rather than an exception, matching the service's API style.
            return {'error': str(e)}

    @staticmethod
    def get_status(db=None) -> Dict:
        """Current compute borrowing status.

        Args:
            db: Optional session; when given, total pending debt is summed
                from the ComputeEscrow table (fail-open to 0.0 on error).

        Returns: Offer/request counts and total outstanding Spark debt.
        """
        total_debt = 0.0
        if db is not None:
            try:
                from integrations.social.models import ComputeEscrow
                from sqlalchemy import func
                pending_sum = db.query(func.coalesce(
                    func.sum(ComputeEscrow.spark_amount), 0
                )).filter(
                    ComputeEscrow.status == 'pending',
                ).scalar()
                total_debt = round(float(pending_sum), 2)
            except Exception as e:
                logger.debug("ComputeEscrow query failed (fail-open): %s", e)
                total_debt = 0.0

        return {
            'active_offers': sum(
                1 for o in _compute_offers.values()
                if o['status'] == 'available'),
            'reserved_offers': sum(
                1 for o in _compute_offers.values()
                if o['status'] == 'reserved'),
            'active_requests': len(_compute_requests),
            'total_debt_spark': total_debt,
        }