Coverage for integrations / agent_engine / compute_borrowing.py: 71.6%
74 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Compute Borrowing Service — request, offer, and settle compute across peers.
4Builds on peer_discovery.py gossip protocol and hosting_reward_service.py
5contribution scoring. Peers with idle compute advertise capacity;
6nodes under pressure can borrow and pay via Spark settlement.
7"""
8import logging
9from datetime import datetime, timedelta
10from typing import Dict, List, Optional
12logger = logging.getLogger(__name__)
14# In-memory tracking (transient / short-lived — not persisted)
15_compute_offers: Dict[str, Dict] = {} # node_id → {resources, timestamp}
16_compute_requests: Dict[str, Dict] = {} # request_id → {node_id, task_type, ...}
17# _compute_debts removed — now persisted via ComputeEscrow table
20class ComputeBorrowingService:
21 """Cross-node compute sharing via gossip protocol."""
23 @staticmethod
24 def offer_compute(db, node_id: str, available_resources: Dict) -> Dict:
25 """Advertise idle compute capacity to the network.
27 Args:
28 node_id: Offering node
29 available_resources: {cpu_pct_free, ram_gb_free, gpu_free_gb}
31 Returns: Offer confirmation
32 """
33 offer = {
34 'node_id': node_id,
35 'resources': available_resources,
36 'timestamp': datetime.utcnow().isoformat(),
37 'status': 'available',
38 }
39 _compute_offers[node_id] = offer
41 # Broadcast via gossip
42 try:
43 from integrations.social.peer_discovery import get_peer_discovery
44 pd = get_peer_discovery()
45 pd.broadcast({
46 'type': 'compute_offer',
47 'payload': offer,
48 })
49 except Exception as e:
50 logger.debug(f"Gossip broadcast failed: {e}")
52 return {'success': True, 'offer': offer}
54 @staticmethod
55 def request_compute(db, node_id: str, task_type: str,
56 min_resources: Dict) -> Dict:
57 """Request compute from the network.
59 Args:
60 node_id: Requesting node
61 task_type: 'inference' | 'training' | 'federation'
62 min_resources: {min_cpu_pct, min_ram_gb, min_gpu_gb}
64 Returns: Matched offer or no_match
65 """
66 # Find a matching offer
67 matched = None
68 for offer_id, offer in _compute_offers.items():
69 if offer['status'] != 'available':
70 continue
71 res = offer['resources']
72 if (res.get('cpu_pct_free', 0) >= min_resources.get('min_cpu_pct', 0) and
73 res.get('ram_gb_free', 0) >= min_resources.get('min_ram_gb', 0)):
74 matched = offer
75 break
77 if not matched:
78 # Broadcast request via gossip for deferred matching
79 try:
80 from integrations.social.peer_discovery import get_peer_discovery
81 pd = get_peer_discovery()
82 pd.broadcast({
83 'type': 'compute_request',
84 'payload': {
85 'requester': node_id,
86 'task_type': task_type,
87 'min_resources': min_resources,
88 },
89 })
90 except Exception:
91 pass
92 return {'matched': False, 'reason': 'no_available_offers'}
94 # Reserve the offer
95 matched['status'] = 'reserved'
96 request_id = f"{node_id}_{matched['node_id']}_{task_type}"
97 _compute_requests[request_id] = {
98 'requester': node_id,
99 'provider': matched['node_id'],
100 'task_type': task_type,
101 'resources': matched['resources'],
102 'started_at': datetime.utcnow().isoformat(),
103 }
105 # Persist escrow record (fail-open: match succeeds even if DB write fails)
106 # Estimate cost: 1 Spark per GPU-GB offered per hour (minimum 1)
107 estimated_spark = max(1, int(matched.get('resources', {}).get('gpu_gb', 1)))
108 try:
109 from integrations.social.models import ComputeEscrow
110 escrow = ComputeEscrow(
111 debtor_node_id=node_id,
112 creditor_node_id=matched['node_id'],
113 request_id=request_id,
114 task_type=task_type,
115 spark_amount=estimated_spark,
116 status='pending',
117 created_at=datetime.utcnow(),
118 expires_at=datetime.utcnow() + timedelta(hours=24),
119 )
120 db.add(escrow)
121 db.commit()
122 except Exception as e:
123 logger.warning(f"ComputeEscrow write failed (fail-open): {e}")
124 try:
125 db.rollback()
126 except Exception:
127 pass
129 return {
130 'matched': True,
131 'request_id': request_id,
132 'provider': matched['node_id'],
133 'resources': matched['resources'],
134 }
136 @staticmethod
137 def settle_compute_debt(db, debtor_node_id: str,
138 creditor_node_id: str,
139 spark_amount: float) -> Dict:
140 """Pay a peer for borrowed compute cycles via Spark transfer.
142 Records settlement and awards Spark to the provider.
143 """
144 try:
145 from integrations.social.models import PeerNode, ComputeEscrow
146 provider = db.query(PeerNode).filter_by(
147 node_id=creditor_node_id).first()
148 if not provider or not provider.node_operator_id:
149 return {'error': 'provider_not_found'}
151 # Award Spark to provider
152 from integrations.social.resonance_engine import ResonanceService
153 ResonanceService.award_spark(
154 db, str(provider.node_operator_id), int(spark_amount),
155 'compute_borrowing_settlement', debtor_node_id,
156 f'Compute settlement: {spark_amount} Spark from {debtor_node_id}')
158 # Settle escrow record in DB
159 remaining_debt = 0
160 try:
161 escrow = db.query(ComputeEscrow).filter(
162 ComputeEscrow.debtor_node_id == debtor_node_id,
163 ComputeEscrow.creditor_node_id == creditor_node_id,
164 ComputeEscrow.status == 'pending',
165 ).first()
166 if escrow:
167 escrow.spark_amount = int(spark_amount)
168 escrow.status = 'settled'
169 escrow.settled_at = datetime.utcnow()
170 db.commit()
172 # Calculate remaining debt from any other pending escrows
173 from sqlalchemy import func
174 pending_sum = db.query(func.coalesce(
175 func.sum(ComputeEscrow.spark_amount), 0
176 )).filter(
177 ComputeEscrow.debtor_node_id == debtor_node_id,
178 ComputeEscrow.status == 'pending',
179 ).scalar()
180 remaining_debt = float(pending_sum)
181 except Exception as e:
182 logger.warning(f"ComputeEscrow settle failed (fail-open): {e}")
183 try:
184 db.rollback()
185 except Exception:
186 pass
188 logger.info(
189 f"Compute settlement: {debtor_node_id} → {creditor_node_id} "
190 f"({spark_amount} Spark)")
192 return {
193 'settled': True,
194 'debtor': debtor_node_id,
195 'creditor': creditor_node_id,
196 'amount': spark_amount,
197 'remaining_debt': remaining_debt,
198 }
199 except Exception as e:
200 return {'error': str(e)}
202 @staticmethod
203 def get_status(db=None) -> Dict:
204 """Current compute borrowing status."""
205 total_debt = 0.0
206 if db is not None:
207 try:
208 from integrations.social.models import ComputeEscrow
209 from sqlalchemy import func
210 pending_sum = db.query(func.coalesce(
211 func.sum(ComputeEscrow.spark_amount), 0
212 )).filter(
213 ComputeEscrow.status == 'pending',
214 ).scalar()
215 total_debt = round(float(pending_sum), 2)
216 except Exception as e:
217 logger.debug(f"ComputeEscrow query failed (fail-open): {e}")
218 total_debt = 0.0
220 return {
221 'active_offers': len([o for o in _compute_offers.values()
222 if o['status'] == 'available']),
223 'reserved_offers': len([o for o in _compute_offers.values()
224 if o['status'] == 'reserved']),
225 'active_requests': len(_compute_requests),
226 'total_debt_spark': total_debt,
227 }