Coverage for integrations / agent_engine / trading_tools.py: 28.7%
108 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Trading Tools — AutoGen tool registration for paper/live trading agents.
4Provides market data, technical indicators, sentiment analysis, and
5paper trade execution. Follows finance_tools.py pattern.
7Live trading is gated by constitutional vote (paper_trading=True default).
8"""
9import logging
10from datetime import datetime
11from typing import Dict, List, Optional
13logger = logging.getLogger(__name__)
16def get_market_data(symbol: str, timeframe: str = '1d',
17 market: str = 'crypto') -> Dict:
18 """Fetch price data for a symbol.
20 Args:
21 symbol: Ticker symbol (e.g. 'BTC-USD', 'AAPL')
22 timeframe: '1m', '5m', '1h', '1d', '1w'
23 market: 'crypto' or 'stocks'
25 Returns: {symbol, timeframe, prices: [...], latest_price, volume}
26 """
27 try:
28 import yfinance as yf
29 ticker = yf.Ticker(symbol)
30 period_map = {'1m': '1d', '5m': '5d', '1h': '5d', '1d': '1mo', '1w': '3mo'}
31 period = period_map.get(timeframe, '1mo')
32 hist = ticker.history(period=period, interval=timeframe)
33 if hist.empty:
34 return {'error': f'No data for {symbol}', 'symbol': symbol}
36 prices = [
37 {'date': str(idx), 'open': round(r['Open'], 4),
38 'high': round(r['High'], 4), 'low': round(r['Low'], 4),
39 'close': round(r['Close'], 4), 'volume': int(r['Volume'])}
40 for idx, r in hist.tail(50).iterrows()
41 ]
42 latest = prices[-1] if prices else {}
43 return {
44 'symbol': symbol, 'timeframe': timeframe, 'market': market,
45 'prices': prices, 'latest_price': latest.get('close'),
46 'volume': latest.get('volume'),
47 }
48 except ImportError:
49 return {'error': 'yfinance not installed', 'symbol': symbol,
50 'hint': 'pip install yfinance'}
51 except Exception as e:
52 return {'error': str(e), 'symbol': symbol}
55def get_technical_indicators(symbol: str,
56 indicators: Optional[List[str]] = None) -> Dict:
57 """Calculate technical indicators for a symbol.
59 Args:
60 symbol: Ticker symbol
61 indicators: List of ['rsi', 'macd', 'bollinger']. Defaults to all.
63 Returns: {symbol, indicators: {rsi: float, macd: {...}, bollinger: {...}}}
64 """
65 indicators = indicators or ['rsi', 'macd', 'bollinger']
66 result = {'symbol': symbol, 'indicators': {}}
68 try:
69 import yfinance as yf
70 hist = yf.Ticker(symbol).history(period='3mo', interval='1d')
71 if hist.empty:
72 return {'error': f'No data for {symbol}'}
73 closes = hist['Close']
75 if 'rsi' in indicators:
76 delta = closes.diff()
77 gain = delta.where(delta > 0, 0).rolling(14).mean()
78 loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
79 rs = gain / loss.replace(0, float('inf'))
80 rsi = 100 - (100 / (1 + rs))
81 result['indicators']['rsi'] = round(float(rsi.iloc[-1]), 2)
83 if 'macd' in indicators:
84 ema12 = closes.ewm(span=12).mean()
85 ema26 = closes.ewm(span=26).mean()
86 macd_line = ema12 - ema26
87 signal = macd_line.ewm(span=9).mean()
88 result['indicators']['macd'] = {
89 'macd': round(float(macd_line.iloc[-1]), 4),
90 'signal': round(float(signal.iloc[-1]), 4),
91 'histogram': round(float(macd_line.iloc[-1] - signal.iloc[-1]), 4),
92 }
94 if 'bollinger' in indicators:
95 sma20 = closes.rolling(20).mean()
96 std20 = closes.rolling(20).std()
97 result['indicators']['bollinger'] = {
98 'upper': round(float(sma20.iloc[-1] + 2 * std20.iloc[-1]), 4),
99 'middle': round(float(sma20.iloc[-1]), 4),
100 'lower': round(float(sma20.iloc[-1] - 2 * std20.iloc[-1]), 4),
101 'current': round(float(closes.iloc[-1]), 4),
102 }
104 return result
105 except ImportError:
106 return {'error': 'yfinance not installed', 'hint': 'pip install yfinance'}
107 except Exception as e:
108 return {'error': str(e), 'symbol': symbol}
111def get_market_sentiment(symbol: str) -> Dict:
112 """Analyse news-based sentiment for a symbol.
114 Uses existing news tools if available, otherwise returns neutral.
115 """
116 try:
117 from integrations.agent_engine.news_tools import fetch_news_feeds
118 articles = fetch_news_feeds(query=symbol, limit=10)
119 if not articles or 'error' in articles:
120 return {'symbol': symbol, 'sentiment': 'neutral', 'score': 0.0,
121 'articles_analyzed': 0}
123 items = articles.get('articles', [])
124 positive = sum(1 for a in items if 'up' in (a.get('title', '') + a.get('summary', '')).lower()
125 or 'surge' in (a.get('title', '') + a.get('summary', '')).lower()
126 or 'bull' in (a.get('title', '') + a.get('summary', '')).lower())
127 negative = sum(1 for a in items if 'down' in (a.get('title', '') + a.get('summary', '')).lower()
128 or 'crash' in (a.get('title', '') + a.get('summary', '')).lower()
129 or 'bear' in (a.get('title', '') + a.get('summary', '')).lower())
131 total = len(items) or 1
132 score = (positive - negative) / total
133 sentiment = 'bullish' if score > 0.2 else 'bearish' if score < -0.2 else 'neutral'
135 return {'symbol': symbol, 'sentiment': sentiment,
136 'score': round(score, 3), 'articles_analyzed': len(items)}
137 except ImportError:
138 return {'symbol': symbol, 'sentiment': 'neutral', 'score': 0.0,
139 'articles_analyzed': 0, 'note': 'news tools not available'}
140 except Exception as e:
141 return {'error': str(e), 'symbol': symbol}
144def place_paper_trade(symbol: str, side: str, amount: float,
145 stop_loss: float, portfolio_id: str = None) -> Dict:
146 """Execute a simulated paper trade.
148 Args:
149 symbol: Ticker symbol
150 side: 'buy' or 'sell'
151 amount: Trade amount in portfolio currency
152 stop_loss: Stop-loss price (mandatory)
153 portfolio_id: Portfolio to trade in (optional)
155 Returns: Trade record dict
156 """
157 if side not in ('buy', 'sell'):
158 return {'error': f'Invalid side: {side}. Must be buy or sell.'}
159 if not stop_loss:
160 return {'error': 'Stop-loss is mandatory for all trades.'}
162 # Get current price
163 data = get_market_data(symbol, '1d')
164 if 'error' in data:
165 return data
166 price = data.get('latest_price')
167 if not price:
168 return {'error': f'Cannot determine price for {symbol}'}
170 quantity = amount / price
172 try:
173 from integrations.social.models import db_session, PaperTrade, PaperPortfolio
174 with db_session() as db:
175 # Find or validate portfolio
176 portfolio = None
177 if portfolio_id:
178 portfolio = db.query(PaperPortfolio).filter_by(
179 id=portfolio_id, status='active').first()
181 if portfolio:
182 # Check budget
183 if amount > portfolio.current_balance:
184 return {'error': 'Insufficient balance',
185 'available': portfolio.current_balance}
186 # Check cumulative loss halt
187 if portfolio.total_pnl < 0:
188 loss_pct = abs(portfolio.total_pnl) / portfolio.initial_balance * 100
189 if loss_pct >= 10:
190 return {'error': 'Trading halted: cumulative loss exceeds 10%',
191 'loss_pct': round(loss_pct, 2)}
192 portfolio.current_balance -= amount
194 trade = PaperTrade(
195 portfolio_id=portfolio_id or 'unlinked',
196 symbol=symbol, side=side, quantity=quantity,
197 entry_price=price, stop_loss=stop_loss,
198 status='open',
199 )
200 db.add(trade)
201 if portfolio:
202 portfolio.total_trades = (portfolio.total_trades or 0) + 1
203 return trade.to_dict()
204 except ImportError:
205 # No DB — return simulated result
206 return {
207 'id': 'paper_sim', 'symbol': symbol, 'side': side,
208 'quantity': round(quantity, 6), 'entry_price': price,
209 'stop_loss': stop_loss, 'status': 'open',
210 'opened_at': datetime.utcnow().isoformat(),
211 }
214def get_portfolio_status(portfolio_id: str = None) -> Dict:
215 """Get current portfolio positions, P&L, and risk metrics."""
216 try:
217 from integrations.social.models import db_session, PaperPortfolio, PaperTrade
218 with db_session(commit=False) as db:
219 if portfolio_id:
220 portfolio = db.query(PaperPortfolio).filter_by(id=portfolio_id).first()
221 if not portfolio:
222 return {'error': 'Portfolio not found'}
223 open_trades = db.query(PaperTrade).filter_by(
224 portfolio_id=portfolio_id, status='open').all()
225 return {
226 'portfolio': portfolio.to_dict(),
227 'open_positions': [t.to_dict() for t in open_trades],
228 'position_count': len(open_trades),
229 }
230 else:
231 portfolios = db.query(PaperPortfolio).filter_by(status='active').all()
232 return {'portfolios': [p.to_dict() for p in portfolios]}
233 except ImportError:
234 return {'error': 'Database not available'}
237def get_trade_history(portfolio_id: str = None, limit: int = 20) -> Dict:
238 """Get trade history for audit trail."""
239 try:
240 from integrations.social.models import db_session, PaperTrade
241 with db_session(commit=False) as db:
242 query = db.query(PaperTrade)
243 if portfolio_id:
244 query = query.filter_by(portfolio_id=portfolio_id)
245 trades = query.order_by(PaperTrade.opened_at.desc()).limit(limit).all()
246 return {'trades': [t.to_dict() for t in trades], 'count': len(trades)}
247 except ImportError:
248 return {'error': 'Database not available'}