Coverage for integrations / agent_engine / trading_tools.py: 28.7%

108 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Trading Tools — AutoGen tool registration for paper/live trading agents. 

3 

4Provides market data, technical indicators, sentiment analysis, and 

5paper trade execution. Follows finance_tools.py pattern. 

6 

7Live trading is gated by constitutional vote (paper_trading=True default). 

8""" 

9import logging 

10from datetime import datetime 

11from typing import Dict, List, Optional 

12 

13logger = logging.getLogger(__name__) 

14 

15 

def get_market_data(symbol: str, timeframe: str = '1d',
                    market: str = 'crypto') -> Dict:
    """Fetch recent OHLCV price data for a symbol via yfinance.

    Args:
        symbol: Ticker symbol (e.g. 'BTC-USD', 'AAPL')
        timeframe: '1m', '5m', '1h', '1d', '1w'. Other values are passed
            to yfinance unchanged with a one-month lookback.
        market: 'crypto' or 'stocks'. Only echoed back in the result; it
            does not change how the data is fetched.

    Returns: {symbol, timeframe, market, prices: [...], latest_price, volume}
        on success, where prices holds at most the 50 most recent bars.
        On failure returns {error, symbol} (plus 'hint' when yfinance is
        missing) instead of raising.
    """
    # Lookback window requested for each documented timeframe.
    period_map = {'1m': '1d', '5m': '5d', '1h': '5d', '1d': '1mo', '1w': '3mo'}
    # yfinance spells the weekly interval '1wk'; '1w' is not a valid interval
    # there, so the documented '1w' timeframe has to be translated.
    interval_map = {'1w': '1wk'}

    try:
        import yfinance as yf
        ticker = yf.Ticker(symbol)
        period = period_map.get(timeframe, '1mo')
        interval = interval_map.get(timeframe, timeframe)
        hist = ticker.history(period=period, interval=interval)
        if hist.empty:
            return {'error': f'No data for {symbol}', 'symbol': symbol}

        prices = [
            {'date': str(idx), 'open': round(r['Open'], 4),
             'high': round(r['High'], 4), 'low': round(r['Low'], 4),
             'close': round(r['Close'], 4), 'volume': int(r['Volume'])}
            for idx, r in hist.tail(50).iterrows()
        ]
        latest = prices[-1] if prices else {}
        return {
            # The caller's own timeframe label is echoed, not the yfinance one.
            'symbol': symbol, 'timeframe': timeframe, 'market': market,
            'prices': prices, 'latest_price': latest.get('close'),
            'volume': latest.get('volume'),
        }
    except ImportError:
        return {'error': 'yfinance not installed', 'symbol': symbol,
                'hint': 'pip install yfinance'}
    except Exception as e:
        # Tool boundary: report the failure as data rather than raising.
        return {'error': str(e), 'symbol': symbol}

53 

54 

def get_technical_indicators(symbol: str,
                             indicators: Optional[List[str]] = None) -> Dict:
    """Calculate technical indicators for a symbol.

    Indicators are computed from three months of daily closes fetched
    through yfinance; only the most recent value of each is reported.

    Args:
        symbol: Ticker symbol
        indicators: List of ['rsi', 'macd', 'bollinger']. Defaults to all.

    Returns: {symbol, indicators: {rsi: float, macd: {...}, bollinger: {...}}}
        on success. On failure returns {error, symbol} (plus 'hint' when
        yfinance is missing) instead of raising.
    """
    indicators = indicators or ['rsi', 'macd', 'bollinger']
    result = {'symbol': symbol, 'indicators': {}}

    try:
        import yfinance as yf
        hist = yf.Ticker(symbol).history(period='3mo', interval='1d')
        if hist.empty:
            return {'error': f'No data for {symbol}', 'symbol': symbol}
        closes = hist['Close']

        if 'rsi' in indicators:
            result['indicators']['rsi'] = _latest_rsi(closes)

        if 'macd' in indicators:
            # 12/26-span EMAs with a 9-span signal line.
            ema12 = closes.ewm(span=12).mean()
            ema26 = closes.ewm(span=26).mean()
            macd_line = ema12 - ema26
            signal = macd_line.ewm(span=9).mean()
            result['indicators']['macd'] = {
                'macd': round(float(macd_line.iloc[-1]), 4),
                'signal': round(float(signal.iloc[-1]), 4),
                'histogram': round(float(macd_line.iloc[-1] - signal.iloc[-1]), 4),
            }

        if 'bollinger' in indicators:
            # 20-period SMA with bands two standard deviations either side.
            sma20 = closes.rolling(20).mean()
            std20 = closes.rolling(20).std()
            result['indicators']['bollinger'] = {
                'upper': round(float(sma20.iloc[-1] + 2 * std20.iloc[-1]), 4),
                'middle': round(float(sma20.iloc[-1]), 4),
                'lower': round(float(sma20.iloc[-1] - 2 * std20.iloc[-1]), 4),
                'current': round(float(closes.iloc[-1]), 4),
            }

        return result
    except ImportError:
        return {'error': 'yfinance not installed', 'symbol': symbol,
                'hint': 'pip install yfinance'}
    except Exception as e:
        # Tool boundary: report the failure as data rather than raising.
        return {'error': str(e), 'symbol': symbol}


def _latest_rsi(closes, window: int = 14) -> float:
    """Return the latest simple-moving-average RSI of a close-price series.

    With fewer than `window` observations the rolling means are NaN and so
    is the result.
    """
    delta = closes.diff()
    # `where` also zeroes the leading NaN that diff() produces.
    avg_gain = float(delta.where(delta > 0, 0).rolling(window).mean().iloc[-1])
    avg_loss = float((-delta.where(delta < 0, 0)).rolling(window).mean().iloc[-1])
    if avg_loss == 0:
        # No losing bars in the window: RS is unbounded, so RSI saturates at
        # 100. A completely flat window (no gains either) is reported as a
        # neutral 50 rather than as an extreme reading.
        return 100.0 if avg_gain > 0 else 50.0
    rs = avg_gain / avg_loss
    return round(100 - (100 / (1 + rs)), 2)

109 

110 

def get_market_sentiment(symbol: str) -> Dict:
    """Analyse news-based sentiment for a symbol.

    Uses existing news tools if available, otherwise returns neutral.

    Each article's title and summary are scanned for bullish keywords
    (up / surge / bull) and bearish keywords (down / crash / bear), matched
    as whole words with common inflections. An article may count towards
    both sides. The score is (positive - negative) / article count, and is
    labelled 'bullish' above 0.2, 'bearish' below -0.2, otherwise 'neutral'.

    Returns: {symbol, sentiment, score, articles_analyzed}, with an extra
        'note' when the news tools cannot be imported, or {error, symbol}
        on any other failure.
    """
    try:
        import re
        from integrations.agent_engine.news_tools import fetch_news_feeds
        # NOTE(review): assumes fetch_news_feeds returns a dict holding an
        # 'articles' list of dicts with 'title'/'summary' keys - confirm
        # against news_tools.
        articles = fetch_news_feeds(query=symbol, limit=10)
        if not articles or 'error' in articles:
            return {'symbol': symbol, 'sentiment': 'neutral', 'score': 0.0,
                    'articles_analyzed': 0}

        items = articles.get('articles', [])

        # Whole-word matching: a bare substring test counts 'up' inside
        # "update"/"supply" and 'bear' inside "bearing" as signals.
        bullish_words = re.compile(r'\b(?:up|surg(?:e|es|ed|ing)|bull(?:s|ish)?)\b')
        bearish_words = re.compile(r'\b(?:down|crash(?:es|ed|ing)?|bear(?:s|ish)?)\b')

        positive = 0
        negative = 0
        for article in items:
            # `or ''` tolerates explicit None values; the separating space
            # stops a keyword forming across the title/summary boundary.
            text = f"{article.get('title') or ''} {article.get('summary') or ''}".lower()
            if bullish_words.search(text):
                positive += 1
            if bearish_words.search(text):
                negative += 1

        total = len(items) or 1  # avoid dividing by zero on an empty list
        score = (positive - negative) / total
        sentiment = 'bullish' if score > 0.2 else 'bearish' if score < -0.2 else 'neutral'

        return {'symbol': symbol, 'sentiment': sentiment,
                'score': round(score, 3), 'articles_analyzed': len(items)}
    except ImportError:
        return {'symbol': symbol, 'sentiment': 'neutral', 'score': 0.0,
                'articles_analyzed': 0, 'note': 'news tools not available'}
    except Exception as e:
        # Tool boundary: report the failure as data rather than raising.
        return {'error': str(e), 'symbol': symbol}

142 

143 

def place_paper_trade(symbol: str, side: str, amount: float,
                      stop_loss: float, portfolio_id: Optional[str] = None) -> Dict:
    """Execute a simulated paper trade.

    The trade is priced at the latest close reported by get_market_data.
    When a portfolio is given, the amount is checked against its balance
    and trading is refused once cumulative losses reach 10% of the initial
    balance; the amount is then deducted from the balance for either side.

    Args:
        symbol: Ticker symbol
        side: 'buy' or 'sell'
        amount: Trade amount in portfolio currency (must be positive)
        stop_loss: Stop-loss price (mandatory)
        portfolio_id: Portfolio to trade in (optional). If given, it must
            match an active portfolio.

    Returns: Trade record dict, or an {error, ...} dict when validation,
        pricing, or a portfolio check fails.
    """
    if side not in ('buy', 'sell'):
        return {'error': f'Invalid side: {side}. Must be buy or sell.'}
    if not stop_loss:
        return {'error': 'Stop-loss is mandatory for all trades.'}
    # A zero/negative amount would yield a negative quantity and *credit*
    # the portfolio balance; the chained comparison also rejects NaN and inf.
    if not isinstance(amount, (int, float)) or not 0 < amount < float('inf'):
        return {'error': f'Invalid amount: {amount}. Must be a positive number.'}

    # Get current price
    data = get_market_data(symbol, '1d')
    if 'error' in data:
        return data
    price = data.get('latest_price')
    if not price:
        return {'error': f'Cannot determine price for {symbol}'}

    quantity = amount / price

    try:
        from integrations.social.models import db_session, PaperTrade, PaperPortfolio
        with db_session() as db:
            # Find and validate portfolio
            portfolio = None
            if portfolio_id:
                portfolio = db.query(PaperPortfolio).filter_by(
                    id=portfolio_id, status='active').first()
                if portfolio is None:
                    # Without this guard an unknown or inactive id would skip
                    # the balance and loss-halt checks below, yet the trade
                    # would still be recorded against that id.
                    return {'error': 'Portfolio not found or inactive',
                            'portfolio_id': portfolio_id}

            if portfolio:
                # Check budget
                if amount > portfolio.current_balance:
                    return {'error': 'Insufficient balance',
                            'available': portfolio.current_balance}
                # Check cumulative loss halt (skipped when the initial
                # balance is zero/unset, which would otherwise divide by zero)
                if (portfolio.total_pnl or 0) < 0 and portfolio.initial_balance:
                    loss_pct = abs(portfolio.total_pnl) / portfolio.initial_balance * 100
                    if loss_pct >= 10:
                        return {'error': 'Trading halted: cumulative loss exceeds 10%',
                                'loss_pct': round(loss_pct, 2)}
                portfolio.current_balance -= amount

            trade = PaperTrade(
                portfolio_id=portfolio_id or 'unlinked',
                symbol=symbol, side=side, quantity=quantity,
                entry_price=price, stop_loss=stop_loss,
                status='open',
            )
            db.add(trade)
            if portfolio:
                portfolio.total_trades = (portfolio.total_trades or 0) + 1
            return trade.to_dict()
    except ImportError:
        # No DB — return simulated result
        return {
            'id': 'paper_sim', 'symbol': symbol, 'side': side,
            'quantity': round(quantity, 6), 'entry_price': price,
            'stop_loss': stop_loss, 'status': 'open',
            'opened_at': datetime.utcnow().isoformat(),
        }

212 

213 

def get_portfolio_status(portfolio_id: str = None) -> Dict:
    """Report the state of one portfolio, or list every active portfolio.

    With a portfolio_id, returns {portfolio, open_positions, position_count}
    for that portfolio and its open trades, or {'error': 'Portfolio not found'}.
    Without one, returns {portfolios: [...]} covering all active portfolios.
    Returns {'error': 'Database not available'} when the models cannot be
    imported.
    """
    try:
        from integrations.social.models import db_session, PaperPortfolio, PaperTrade
        with db_session(commit=False) as session:
            # No id supplied: summarise every active portfolio.
            if not portfolio_id:
                active = session.query(PaperPortfolio).filter_by(status='active').all()
                return {'portfolios': [entry.to_dict() for entry in active]}

            record = session.query(PaperPortfolio).filter_by(id=portfolio_id).first()
            if not record:
                return {'error': 'Portfolio not found'}

            positions = session.query(PaperTrade).filter_by(
                portfolio_id=portfolio_id, status='open').all()
            return {
                'portfolio': record.to_dict(),
                'open_positions': [position.to_dict() for position in positions],
                'position_count': len(positions),
            }
    except ImportError:
        return {'error': 'Database not available'}

235 

236 

def get_trade_history(portfolio_id: str = None, limit: int = 20) -> Dict:
    """Return the most recently opened paper trades for audit purposes.

    Trades are ordered by opened_at descending and capped at `limit`; when
    portfolio_id is given only that portfolio's trades are included.
    Returns {trades: [...], count}, or {'error': 'Database not available'}
    when the models cannot be imported.
    """
    try:
        from integrations.social.models import db_session, PaperTrade
        with db_session(commit=False) as session:
            trade_query = session.query(PaperTrade)
            if portfolio_id:
                trade_query = trade_query.filter_by(portfolio_id=portfolio_id)
            newest_first = trade_query.order_by(PaperTrade.opened_at.desc())
            records = newest_first.limit(limit).all()
            serialized = [record.to_dict() for record in records]
            return {'trades': serialized, 'count': len(records)}
    except ImportError:
        return {'error': 'Database not available'}