Coverage for integrations / channels / automation / workflows.py: 37.4%

235 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Workflow Engine for HevolveBot Integration. 

3 

4Provides workflow definition and execution capabilities. 

5""" 

6 

7import secrets 

8from dataclasses import dataclass, field 

9from datetime import datetime 

10from enum import Enum 

11from typing import Any, Callable, Dict, List, Optional, Union 

12import copy 

13 

14 

class StepType(Enum):
    """Kinds of step a workflow can contain.

    Each member's value is the lowercase string form of its name.
    """

    ACTION = "action"
    CONDITION = "condition"
    LOOP = "loop"
    PARALLEL = "parallel"
    DELAY = "delay"
    SUBPROCESS = "subprocess"
    TRANSFORM = "transform"

24 

25 

class WorkflowStatus(Enum):
    """Lifecycle states of a single workflow execution.

    Each member's value is the lowercase string form of its name.
    """

    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

34 

35 

@dataclass
class WorkflowStep:
    """One node of a workflow graph.

    Only the fields relevant to ``step_type`` are consulted when the step
    runs; the remaining fields keep their defaults.
    """

    # Identity
    id: str
    name: str
    step_type: StepType

    # Callables, one per step flavour
    action: Optional[Callable] = None
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None
    transform: Optional[Callable[[Any], Any]] = None

    # Routing: IDs of the step to run afterwards
    on_true: Optional[str] = None  # taken when the condition evaluates truthy
    on_false: Optional[str] = None  # taken when the condition evaluates falsy
    next_step: Optional[str] = None  # unconditional successor

    # Timing and retry bookkeeping
    delay_seconds: float = 0
    retry_count: int = 0
    max_retries: int = 3

    # Composite steps
    parallel_steps: List[str] = field(default_factory=list)
    subprocess_id: Optional[str] = None

    # Free-form caller data
    metadata: Dict[str, Any] = field(default_factory=dict)

54 

55 

@dataclass
class Workflow:
    """A workflow definition: steps keyed by ID plus the ID of the first step."""

    id: str
    name: str
    description: str = ""
    steps: Dict[str, WorkflowStep] = field(default_factory=dict)
    entry_point: Optional[str] = None
    version: str = "1.0.0"
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    enabled: bool = True
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def add_step(self, step: WorkflowStep) -> None:
        """Store ``step`` under its ID, replacing any step with the same ID.

        The first step ever added becomes the entry point. ``updated_at`` is
        refreshed on every call.
        """
        step_id = step.id
        self.steps[step_id] = step
        if self.entry_point is None:
            self.entry_point = step_id
        self.updated_at = datetime.now()

    def remove_step(self, step_id: str) -> bool:
        """Delete the step with the given ID.

        Returns:
            True if the step existed and was removed, False otherwise.
        """
        if step_id not in self.steps:
            return False

        del self.steps[step_id]
        if self.entry_point == step_id:
            # Fall back to the first remaining step, or None when empty.
            self.entry_point = next(iter(self.steps), None)
        self.updated_at = datetime.now()
        return True

    def get_step(self, step_id: str) -> Optional[WorkflowStep]:
        """Return the step registered under ``step_id``, or None if absent."""
        return self.steps.get(step_id)

91 

92 

@dataclass
class WorkflowExecution:
    """Run-time record of one execution of a workflow."""

    # Identity
    id: str
    workflow_id: str

    # Progress
    status: WorkflowStatus = WorkflowStatus.PENDING
    current_step: Optional[str] = None

    # Data carried through the run
    context: Dict[str, Any] = field(default_factory=dict)
    results: Dict[str, Any] = field(default_factory=dict)

    # Timing and outcome
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None

    # One dict per step attempt, in execution order
    step_history: List[Dict[str, Any]] = field(default_factory=list)

106 

107 

class WorkflowEngine:
    """
    Executes and manages workflows.

    Features:
    - Register workflow definitions
    - Execute workflows with context
    - Support for conditional branching
    - Parallel step groups (members are currently run one after another)
    - Subprocess workflows
    - Execution tracking and history
    """

    def __init__(self):
        """Initialize the WorkflowEngine with empty registries."""
        self._workflows: Dict[str, Workflow] = {}
        self._executions: Dict[str, WorkflowExecution] = {}
        self._execution_history: List[WorkflowExecution] = []
        self._global_actions: Dict[str, Callable] = {}

    def register(
        self,
        workflow: Optional[Workflow] = None,
        workflow_id: Optional[str] = None,
        name: Optional[str] = None,
        description: str = "",
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Workflow:
        """
        Register a workflow.

        When ``workflow`` is given it is registered as-is and the remaining
        arguments are ignored; otherwise a new Workflow is built from them.

        Args:
            workflow: Optional pre-built Workflow object
            workflow_id: Optional custom ID (if not providing workflow);
                a random ``wf_<hex>`` ID is generated when omitted
            name: Workflow name (if not providing workflow)
            description: Workflow description
            tags: Optional tags
            metadata: Optional metadata

        Returns:
            The registered Workflow

        Raises:
            ValueError: If a workflow with the same ID is already registered
        """
        if workflow is None:
            workflow_id = workflow_id or f"wf_{secrets.token_hex(6)}"
            name = name or f"Workflow {workflow_id}"

            workflow = Workflow(
                id=workflow_id,
                name=name,
                description=description,
                tags=tags or [],
                metadata=metadata or {}
            )

        if workflow.id in self._workflows:
            raise ValueError(f"Workflow with ID '{workflow.id}' already exists")

        self._workflows[workflow.id] = workflow
        return workflow

    def unregister(self, workflow_id: str) -> bool:
        """
        Unregister a workflow.

        Args:
            workflow_id: The workflow ID

        Returns:
            True if removed, False if not found
        """
        if workflow_id in self._workflows:
            del self._workflows[workflow_id]
            return True
        return False

    def get_workflow(self, workflow_id: str) -> Optional[Workflow]:
        """
        Get a workflow by ID.

        Args:
            workflow_id: The workflow ID

        Returns:
            The workflow or None
        """
        return self._workflows.get(workflow_id)

    def list_workflows(
        self,
        enabled_only: bool = False,
        tags: Optional[List[str]] = None
    ) -> List[Workflow]:
        """
        List registered workflows.

        Args:
            enabled_only: Only return enabled workflows
            tags: Optional filter by tags (any match)

        Returns:
            List of matching workflows
        """
        workflows = list(self._workflows.values())

        if enabled_only:
            workflows = [w for w in workflows if w.enabled]

        if tags:
            workflows = [w for w in workflows if any(t in w.tags for t in tags)]

        return workflows

    def run(
        self,
        workflow_id: str,
        context: Optional[Dict[str, Any]] = None,
        execution_id: Optional[str] = None
    ) -> WorkflowExecution:
        """
        Execute a workflow synchronously.

        Errors raised while steps run are not propagated: they are recorded
        on the returned execution (``status`` FAILED, ``error`` set).

        Args:
            workflow_id: The workflow to execute
            context: Optional initial context/input data (deep-copied, so the
                caller's dict is never mutated)
            execution_id: Optional custom execution ID; a random
                ``exec_<hex>`` ID is generated when omitted

        Returns:
            The workflow execution record

        Raises:
            ValueError: If workflow not found, has no steps, or has no
                entry point
        """
        workflow = self._workflows.get(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow '{workflow_id}' not found")

        if not workflow.steps:
            raise ValueError(f"Workflow '{workflow_id}' has no steps")

        if not workflow.entry_point:
            raise ValueError(f"Workflow '{workflow_id}' has no entry point")

        execution_id = execution_id or f"exec_{secrets.token_hex(8)}"

        execution = WorkflowExecution(
            id=execution_id,
            workflow_id=workflow_id,
            context=copy.deepcopy(context or {}),
            current_step=workflow.entry_point,
            status=WorkflowStatus.RUNNING,
            started_at=datetime.now()
        )

        # Register before running so steps can look the execution up by ID.
        self._executions[execution_id] = execution

        # Execute the workflow; any failure is captured on the record.
        try:
            self._execute_workflow(workflow, execution)
        except Exception as e:
            execution.status = WorkflowStatus.FAILED
            execution.error = str(e)

        execution.completed_at = datetime.now()
        self._execution_history.append(execution)

        return execution

    def _execute_workflow(
        self,
        workflow: Workflow,
        execution: WorkflowExecution
    ) -> None:
        """
        Run steps starting at ``execution.current_step`` until none remain.

        Every attempt (including failed ones) appends a record to
        ``execution.step_history``. A failing step is retried up to its
        ``max_retries``; once exhausted, the exception propagates.

        Raises:
            ValueError: If the current step ID is not part of the workflow
            RuntimeError: If more than 1000 step attempts would be needed
        """
        max_steps = 1000  # Prevent infinite loops
        step_count = 0

        # Retries are counted per execution. Reading the counter from the
        # shared step definition would let one run eat into the retry budget
        # of every later run of the same workflow.
        retries_used: Dict[str, int] = {}

        while execution.current_step:
            # Honour a cancellation requested while the workflow is running
            # (e.g. by an action calling cancel_execution).
            if execution.status == WorkflowStatus.CANCELLED:
                return

            # Checked before running another step, so a workflow that
            # finishes on exactly the last allowed step is not reported as
            # an infinite loop.
            if step_count >= max_steps:
                raise RuntimeError("Maximum step count exceeded - possible infinite loop")
            step_count += 1

            step = workflow.get_step(execution.current_step)
            if not step:
                raise ValueError(f"Step '{execution.current_step}' not found")

            # Record step entry
            step_record = {
                "step_id": step.id,
                "step_name": step.name,
                "started_at": datetime.now().isoformat(),
                "result": None,
                "error": None
            }

            try:
                next_step = self._execute_step(step, execution)
                step_record["result"] = execution.results.get(step.id)
                step_record["next_step"] = next_step
                execution.current_step = next_step

            except Exception as e:
                step_record["error"] = str(e)

                attempts = retries_used.get(step.id, 0)
                if attempts >= step.max_retries:
                    raise

                # current_step is unchanged, so the next loop iteration
                # re-runs this same step.
                attempts += 1
                retries_used[step.id] = attempts
                # Mirrored for callers that inspect the step; never read back.
                step.retry_count = attempts
                step_record["retry"] = attempts

            finally:
                step_record["completed_at"] = datetime.now().isoformat()
                execution.step_history.append(step_record)

        # Do not overwrite a cancellation issued during the final step.
        if execution.status != WorkflowStatus.CANCELLED:
            execution.status = WorkflowStatus.COMPLETED

    def _execute_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute a single workflow step by dispatching on its type.

        Returns:
            The ID of the next step, or None if workflow is complete

        Raises:
            ValueError: If the step type has no handler (this includes
                StepType.LOOP, which is declared but not implemented here)
        """
        if step.step_type == StepType.ACTION:
            return self._execute_action_step(step, execution)

        elif step.step_type == StepType.CONDITION:
            return self._execute_condition_step(step, execution)

        elif step.step_type == StepType.TRANSFORM:
            return self._execute_transform_step(step, execution)

        elif step.step_type == StepType.DELAY:
            return self._execute_delay_step(step, execution)

        elif step.step_type == StepType.PARALLEL:
            return self._execute_parallel_step(step, execution)

        elif step.step_type == StepType.SUBPROCESS:
            return self._execute_subprocess_step(step, execution)

        else:
            raise ValueError(f"Unknown step type: {step.step_type}")

    def _execute_action_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute an action step.

        The action is called with the execution context; its return value is
        stored under the step ID in ``execution.results``.

        Returns:
            ``step.next_step``
        """
        if step.action:
            result = step.action(execution.context)
            execution.results[step.id] = result

            # Update context if result is a dict
            if isinstance(result, dict):
                execution.context.update(result)

        return step.next_step

    def _execute_condition_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute a condition step.

        Returns:
            ``step.on_true`` or ``step.on_false`` according to the truthiness
            of the condition's result; ``step.next_step`` when the step has
            no condition callable
        """
        if step.condition:
            result = step.condition(execution.context)
            execution.results[step.id] = result

            if result:
                return step.on_true
            else:
                return step.on_false

        return step.next_step

    def _execute_transform_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute a transform step.

        The transform receives ``context["_transform_input"]`` when that key
        is present, otherwise the whole context. A dict result is merged into
        the context; any other result is stored under ``_transform_output``.

        Returns:
            ``step.next_step``
        """
        if step.transform:
            # Get input from context or previous results
            input_data = execution.context.get("_transform_input", execution.context)
            result = step.transform(input_data)
            execution.results[step.id] = result

            # Update context
            if isinstance(result, dict):
                execution.context.update(result)
            else:
                execution.context["_transform_output"] = result

        return step.next_step

    def _execute_delay_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute a delay step.

        The requested delay is recorded in ``execution.results``; the actual
        sleep is capped at 0.1 seconds.

        Returns:
            ``step.next_step``
        """
        import time

        if step.delay_seconds > 0:
            # In a real implementation, this might be async
            # For testing, we just record it
            execution.results[step.id] = {
                "delayed": True,
                "seconds": step.delay_seconds
            }
            # Simulate a small delay for testing
            time.sleep(min(step.delay_seconds, 0.1))

        return step.next_step

    def _execute_parallel_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute the member steps of a parallel group, one after another.

        A member that raises, or whose ID is not part of the workflow, does
        not abort the group: an ``{"error": ...}`` entry is recorded for it.
        The successor IDs returned by member steps are ignored.

        Returns:
            ``step.next_step``

        Raises:
            ValueError: If the execution's workflow is no longer registered
        """
        workflow = self._workflows.get(execution.workflow_id)
        if not workflow:
            raise ValueError("Workflow not found")

        results = {}
        for parallel_step_id in step.parallel_steps:
            parallel_step = workflow.get_step(parallel_step_id)
            if parallel_step is None:
                # Surface a dangling reference instead of silently skipping it.
                results[parallel_step_id] = {
                    "error": f"Step '{parallel_step_id}' not found"
                }
                continue

            # Execute each parallel step
            # In a real implementation, this would be concurrent
            try:
                self._execute_step(parallel_step, execution)
                results[parallel_step_id] = execution.results.get(parallel_step_id)
            except Exception as e:
                results[parallel_step_id] = {"error": str(e)}

        execution.results[step.id] = results
        return step.next_step

    def _execute_subprocess_step(
        self,
        step: WorkflowStep,
        execution: WorkflowExecution
    ) -> Optional[str]:
        """
        Execute a subprocess (nested workflow).

        The nested workflow runs via ``run`` with a copy of the current
        context. Its ID, final status and results are stored under this
        step's ID, and its final context is merged back into the parent.

        Returns:
            ``step.next_step``
        """
        if step.subprocess_id:
            subprocess_result = self.run(
                step.subprocess_id,
                context=execution.context.copy()
            )
            execution.results[step.id] = {
                "subprocess_id": subprocess_result.id,
                "status": subprocess_result.status.value,
                "results": subprocess_result.results
            }

            # Merge subprocess context back
            execution.context.update(subprocess_result.context)

        return step.next_step

    def pause_execution(self, execution_id: str) -> bool:
        """
        Pause a running execution.

        Args:
            execution_id: The execution ID

        Returns:
            True if paused, False if not found or not running
        """
        if execution_id in self._executions:
            execution = self._executions[execution_id]
            if execution.status == WorkflowStatus.RUNNING:
                execution.status = WorkflowStatus.PAUSED
                return True
        return False

    def cancel_execution(self, execution_id: str) -> bool:
        """
        Cancel an execution.

        Args:
            execution_id: The execution ID

        Returns:
            True if cancelled, False if not found or already finished
        """
        if execution_id in self._executions:
            execution = self._executions[execution_id]
            if execution.status in (WorkflowStatus.PENDING, WorkflowStatus.RUNNING, WorkflowStatus.PAUSED):
                execution.status = WorkflowStatus.CANCELLED
                execution.completed_at = datetime.now()
                return True
        return False

    def get_execution(self, execution_id: str) -> Optional[WorkflowExecution]:
        """
        Get an execution by ID.

        Args:
            execution_id: The execution ID

        Returns:
            The execution or None
        """
        return self._executions.get(execution_id)

    def list_executions(
        self,
        workflow_id: Optional[str] = None,
        status: Optional[WorkflowStatus] = None
    ) -> List[WorkflowExecution]:
        """
        List workflow executions.

        Args:
            workflow_id: Optional filter by workflow
            status: Optional filter by status

        Returns:
            List of matching executions
        """
        executions = list(self._executions.values())

        if workflow_id:
            executions = [e for e in executions if e.workflow_id == workflow_id]

        if status:
            executions = [e for e in executions if e.status == status]

        return executions

    def register_global_action(
        self,
        name: str,
        action: Callable[[Dict[str, Any]], Any]
    ) -> None:
        """
        Register a global action under a name, replacing any previous one.

        Args:
            name: The action name
            action: The action function
        """
        self._global_actions[name] = action

    def get_global_action(self, name: str) -> Optional[Callable]:
        """Get a registered global action, or None if the name is unknown."""
        return self._global_actions.get(name)

    def create_step(
        self,
        step_id: str,
        name: str,
        step_type: StepType = StepType.ACTION,
        action: Optional[Callable] = None,
        condition: Optional[Callable] = None,
        transform: Optional[Callable] = None,
        next_step: Optional[str] = None,
        on_true: Optional[str] = None,
        on_false: Optional[str] = None,
        delay_seconds: float = 0,
        parallel_steps: Optional[List[str]] = None,
        subprocess_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> WorkflowStep:
        """
        Factory method to create a workflow step.

        The step is only constructed; it is not added to any workflow.

        Args:
            step_id: Unique step ID
            name: Step name
            step_type: Type of step
            action: Action function (for ACTION type)
            condition: Condition function (for CONDITION type)
            transform: Transform function (for TRANSFORM type)
            next_step: Default next step
            on_true: Next step if condition is true
            on_false: Next step if condition is false
            delay_seconds: Delay in seconds (for DELAY type)
            parallel_steps: List of step IDs (for PARALLEL type)
            subprocess_id: Workflow ID (for SUBPROCESS type)
            metadata: Optional metadata

        Returns:
            The created WorkflowStep
        """
        return WorkflowStep(
            id=step_id,
            name=name,
            step_type=step_type,
            action=action,
            condition=condition,
            transform=transform,
            next_step=next_step,
            on_true=on_true,
            on_false=on_false,
            delay_seconds=delay_seconds,
            parallel_steps=parallel_steps or [],
            subprocess_id=subprocess_id,
            metadata=metadata or {}
        )