Coverage for integrations / channels / automation / workflows.py: 37.4%
235 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Workflow Engine for HevolveBot Integration.
4Provides workflow definition and execution capabilities.
5"""
7import secrets
8from dataclasses import dataclass, field
9from datetime import datetime
10from enum import Enum
11from typing import Any, Callable, Dict, List, Optional, Union
12import copy
15class StepType(Enum):
16 """Types of workflow steps."""
17 ACTION = "action"
18 CONDITION = "condition"
19 LOOP = "loop"
20 PARALLEL = "parallel"
21 DELAY = "delay"
22 SUBPROCESS = "subprocess"
23 TRANSFORM = "transform"
26class WorkflowStatus(Enum):
27 """Status of a workflow execution."""
28 PENDING = "pending"
29 RUNNING = "running"
30 PAUSED = "paused"
31 COMPLETED = "completed"
32 FAILED = "failed"
33 CANCELLED = "cancelled"
36@dataclass
37class WorkflowStep:
38 """A single step in a workflow."""
39 id: str
40 name: str
41 step_type: StepType
42 action: Optional[Callable] = None
43 condition: Optional[Callable[[Dict[str, Any]], bool]] = None
44 transform: Optional[Callable[[Any], Any]] = None
45 on_true: Optional[str] = None # Next step if condition is true
46 on_false: Optional[str] = None # Next step if condition is false
47 next_step: Optional[str] = None # Default next step
48 delay_seconds: float = 0
49 retry_count: int = 0
50 max_retries: int = 3
51 parallel_steps: List[str] = field(default_factory=list)
52 subprocess_id: Optional[str] = None
53 metadata: Dict[str, Any] = field(default_factory=dict)
56@dataclass
57class Workflow:
58 """A complete workflow definition."""
59 id: str
60 name: str
61 description: str = ""
62 steps: Dict[str, WorkflowStep] = field(default_factory=dict)
63 entry_point: Optional[str] = None
64 version: str = "1.0.0"
65 created_at: datetime = field(default_factory=datetime.now)
66 updated_at: datetime = field(default_factory=datetime.now)
67 enabled: bool = True
68 tags: List[str] = field(default_factory=list)
69 metadata: Dict[str, Any] = field(default_factory=dict)
71 def add_step(self, step: WorkflowStep) -> None:
72 """Add a step to the workflow."""
73 self.steps[step.id] = step
74 if self.entry_point is None:
75 self.entry_point = step.id
76 self.updated_at = datetime.now()
78 def remove_step(self, step_id: str) -> bool:
79 """Remove a step from the workflow."""
80 if step_id in self.steps:
81 del self.steps[step_id]
82 if self.entry_point == step_id:
83 self.entry_point = next(iter(self.steps), None)
84 self.updated_at = datetime.now()
85 return True
86 return False
88 def get_step(self, step_id: str) -> Optional[WorkflowStep]:
89 """Get a step by ID."""
90 return self.steps.get(step_id)
93@dataclass
94class WorkflowExecution:
95 """Tracks the execution of a workflow."""
96 id: str
97 workflow_id: str
98 status: WorkflowStatus = WorkflowStatus.PENDING
99 current_step: Optional[str] = None
100 context: Dict[str, Any] = field(default_factory=dict)
101 results: Dict[str, Any] = field(default_factory=dict)
102 started_at: Optional[datetime] = None
103 completed_at: Optional[datetime] = None
104 error: Optional[str] = None
105 step_history: List[Dict[str, Any]] = field(default_factory=list)
108class WorkflowEngine:
109 """
110 Executes and manages workflows.
112 Features:
113 - Register workflow definitions
114 - Execute workflows with context
115 - Support for conditional branching
116 - Support for parallel execution
117 - Subprocess workflows
118 - Execution tracking and history
119 """
121 def __init__(self):
122 """Initialize the WorkflowEngine."""
123 self._workflows: Dict[str, Workflow] = {}
124 self._executions: Dict[str, WorkflowExecution] = {}
125 self._execution_history: List[WorkflowExecution] = []
126 self._global_actions: Dict[str, Callable] = {}
128 def register(
129 self,
130 workflow: Workflow = None,
131 workflow_id: Optional[str] = None,
132 name: Optional[str] = None,
133 description: str = "",
134 tags: Optional[List[str]] = None,
135 metadata: Optional[Dict[str, Any]] = None
136 ) -> Workflow:
137 """
138 Register a workflow.
140 Args:
141 workflow: Optional pre-built Workflow object
142 workflow_id: Optional custom ID (if not providing workflow)
143 name: Workflow name (if not providing workflow)
144 description: Workflow description
145 tags: Optional tags
146 metadata: Optional metadata
148 Returns:
149 The registered Workflow
150 """
151 if workflow is None:
152 workflow_id = workflow_id or f"wf_{secrets.token_hex(6)}"
153 name = name or f"Workflow {workflow_id}"
155 workflow = Workflow(
156 id=workflow_id,
157 name=name,
158 description=description,
159 tags=tags or [],
160 metadata=metadata or {}
161 )
163 if workflow.id in self._workflows:
164 raise ValueError(f"Workflow with ID '{workflow.id}' already exists")
166 self._workflows[workflow.id] = workflow
167 return workflow
169 def unregister(self, workflow_id: str) -> bool:
170 """
171 Unregister a workflow.
173 Args:
174 workflow_id: The workflow ID
176 Returns:
177 True if removed, False if not found
178 """
179 if workflow_id in self._workflows:
180 del self._workflows[workflow_id]
181 return True
182 return False
184 def get_workflow(self, workflow_id: str) -> Optional[Workflow]:
185 """
186 Get a workflow by ID.
188 Args:
189 workflow_id: The workflow ID
191 Returns:
192 The workflow or None
193 """
194 return self._workflows.get(workflow_id)
196 def list_workflows(
197 self,
198 enabled_only: bool = False,
199 tags: Optional[List[str]] = None
200 ) -> List[Workflow]:
201 """
202 List registered workflows.
204 Args:
205 enabled_only: Only return enabled workflows
206 tags: Optional filter by tags (any match)
208 Returns:
209 List of matching workflows
210 """
211 workflows = list(self._workflows.values())
213 if enabled_only:
214 workflows = [w for w in workflows if w.enabled]
216 if tags:
217 workflows = [w for w in workflows if any(t in w.tags for t in tags)]
219 return workflows
221 def run(
222 self,
223 workflow_id: str,
224 context: Optional[Dict[str, Any]] = None,
225 execution_id: Optional[str] = None
226 ) -> WorkflowExecution:
227 """
228 Execute a workflow.
230 Args:
231 workflow_id: The workflow to execute
232 context: Optional initial context/input data
233 execution_id: Optional custom execution ID
235 Returns:
236 The workflow execution record
238 Raises:
239 ValueError: If workflow not found or has no steps
240 """
241 workflow = self._workflows.get(workflow_id)
242 if not workflow:
243 raise ValueError(f"Workflow '{workflow_id}' not found")
245 if not workflow.steps:
246 raise ValueError(f"Workflow '{workflow_id}' has no steps")
248 if not workflow.entry_point:
249 raise ValueError(f"Workflow '{workflow_id}' has no entry point")
251 execution_id = execution_id or f"exec_{secrets.token_hex(8)}"
253 execution = WorkflowExecution(
254 id=execution_id,
255 workflow_id=workflow_id,
256 context=copy.deepcopy(context or {}),
257 current_step=workflow.entry_point,
258 status=WorkflowStatus.RUNNING,
259 started_at=datetime.now()
260 )
262 self._executions[execution_id] = execution
264 # Execute the workflow
265 try:
266 self._execute_workflow(workflow, execution)
267 except Exception as e:
268 execution.status = WorkflowStatus.FAILED
269 execution.error = str(e)
271 execution.completed_at = datetime.now()
272 self._execution_history.append(execution)
274 return execution
276 def _execute_workflow(
277 self,
278 workflow: Workflow,
279 execution: WorkflowExecution
280 ) -> None:
281 """Execute workflow steps."""
282 max_steps = 1000 # Prevent infinite loops
283 step_count = 0
285 while execution.current_step and step_count < max_steps:
286 step_count += 1
287 step = workflow.get_step(execution.current_step)
289 if not step:
290 raise ValueError(f"Step '{execution.current_step}' not found")
292 # Record step entry
293 step_record = {
294 "step_id": step.id,
295 "step_name": step.name,
296 "started_at": datetime.now().isoformat(),
297 "result": None,
298 "error": None
299 }
301 try:
302 next_step = self._execute_step(step, execution)
303 step_record["result"] = execution.results.get(step.id)
304 step_record["next_step"] = next_step
305 execution.current_step = next_step
307 except Exception as e:
308 step_record["error"] = str(e)
310 # Retry logic
311 if step.retry_count < step.max_retries:
312 step.retry_count += 1
313 step_record["retry"] = step.retry_count
314 continue
315 else:
316 raise
318 finally:
319 step_record["completed_at"] = datetime.now().isoformat()
320 execution.step_history.append(step_record)
322 if step_count >= max_steps:
323 raise RuntimeError("Maximum step count exceeded - possible infinite loop")
325 execution.status = WorkflowStatus.COMPLETED
327 def _execute_step(
328 self,
329 step: WorkflowStep,
330 execution: WorkflowExecution
331 ) -> Optional[str]:
332 """
333 Execute a single workflow step.
335 Returns:
336 The ID of the next step, or None if workflow is complete
337 """
338 if step.step_type == StepType.ACTION:
339 return self._execute_action_step(step, execution)
341 elif step.step_type == StepType.CONDITION:
342 return self._execute_condition_step(step, execution)
344 elif step.step_type == StepType.TRANSFORM:
345 return self._execute_transform_step(step, execution)
347 elif step.step_type == StepType.DELAY:
348 return self._execute_delay_step(step, execution)
350 elif step.step_type == StepType.PARALLEL:
351 return self._execute_parallel_step(step, execution)
353 elif step.step_type == StepType.SUBPROCESS:
354 return self._execute_subprocess_step(step, execution)
356 else:
357 raise ValueError(f"Unknown step type: {step.step_type}")
359 def _execute_action_step(
360 self,
361 step: WorkflowStep,
362 execution: WorkflowExecution
363 ) -> Optional[str]:
364 """Execute an action step."""
365 if step.action:
366 result = step.action(execution.context)
367 execution.results[step.id] = result
369 # Update context if result is a dict
370 if isinstance(result, dict):
371 execution.context.update(result)
373 return step.next_step
375 def _execute_condition_step(
376 self,
377 step: WorkflowStep,
378 execution: WorkflowExecution
379 ) -> Optional[str]:
380 """Execute a condition step."""
381 if step.condition:
382 result = step.condition(execution.context)
383 execution.results[step.id] = result
385 if result:
386 return step.on_true
387 else:
388 return step.on_false
390 return step.next_step
392 def _execute_transform_step(
393 self,
394 step: WorkflowStep,
395 execution: WorkflowExecution
396 ) -> Optional[str]:
397 """Execute a transform step."""
398 if step.transform:
399 # Get input from context or previous results
400 input_data = execution.context.get("_transform_input", execution.context)
401 result = step.transform(input_data)
402 execution.results[step.id] = result
404 # Update context
405 if isinstance(result, dict):
406 execution.context.update(result)
407 else:
408 execution.context["_transform_output"] = result
410 return step.next_step
412 def _execute_delay_step(
413 self,
414 step: WorkflowStep,
415 execution: WorkflowExecution
416 ) -> Optional[str]:
417 """Execute a delay step."""
418 import time
420 if step.delay_seconds > 0:
421 # In a real implementation, this might be async
422 # For testing, we just record it
423 execution.results[step.id] = {
424 "delayed": True,
425 "seconds": step.delay_seconds
426 }
427 # Simulate a small delay for testing
428 time.sleep(min(step.delay_seconds, 0.1))
430 return step.next_step
432 def _execute_parallel_step(
433 self,
434 step: WorkflowStep,
435 execution: WorkflowExecution
436 ) -> Optional[str]:
437 """Execute parallel steps."""
438 workflow = self._workflows.get(execution.workflow_id)
439 if not workflow:
440 raise ValueError("Workflow not found")
442 results = {}
443 for parallel_step_id in step.parallel_steps:
444 parallel_step = workflow.get_step(parallel_step_id)
445 if parallel_step:
446 # Execute each parallel step
447 # In a real implementation, this would be concurrent
448 try:
449 self._execute_step(parallel_step, execution)
450 results[parallel_step_id] = execution.results.get(parallel_step_id)
451 except Exception as e:
452 results[parallel_step_id] = {"error": str(e)}
454 execution.results[step.id] = results
455 return step.next_step
457 def _execute_subprocess_step(
458 self,
459 step: WorkflowStep,
460 execution: WorkflowExecution
461 ) -> Optional[str]:
462 """Execute a subprocess (nested workflow)."""
463 if step.subprocess_id:
464 subprocess_result = self.run(
465 step.subprocess_id,
466 context=execution.context.copy()
467 )
468 execution.results[step.id] = {
469 "subprocess_id": subprocess_result.id,
470 "status": subprocess_result.status.value,
471 "results": subprocess_result.results
472 }
474 # Merge subprocess context back
475 execution.context.update(subprocess_result.context)
477 return step.next_step
479 def pause_execution(self, execution_id: str) -> bool:
480 """
481 Pause a running execution.
483 Args:
484 execution_id: The execution ID
486 Returns:
487 True if paused, False if not found or not running
488 """
489 if execution_id in self._executions:
490 execution = self._executions[execution_id]
491 if execution.status == WorkflowStatus.RUNNING:
492 execution.status = WorkflowStatus.PAUSED
493 return True
494 return False
496 def cancel_execution(self, execution_id: str) -> bool:
497 """
498 Cancel an execution.
500 Args:
501 execution_id: The execution ID
503 Returns:
504 True if cancelled, False if not found
505 """
506 if execution_id in self._executions:
507 execution = self._executions[execution_id]
508 if execution.status in (WorkflowStatus.PENDING, WorkflowStatus.RUNNING, WorkflowStatus.PAUSED):
509 execution.status = WorkflowStatus.CANCELLED
510 execution.completed_at = datetime.now()
511 return True
512 return False
514 def get_execution(self, execution_id: str) -> Optional[WorkflowExecution]:
515 """
516 Get an execution by ID.
518 Args:
519 execution_id: The execution ID
521 Returns:
522 The execution or None
523 """
524 return self._executions.get(execution_id)
526 def list_executions(
527 self,
528 workflow_id: Optional[str] = None,
529 status: Optional[WorkflowStatus] = None
530 ) -> List[WorkflowExecution]:
531 """
532 List workflow executions.
534 Args:
535 workflow_id: Optional filter by workflow
536 status: Optional filter by status
538 Returns:
539 List of matching executions
540 """
541 executions = list(self._executions.values())
543 if workflow_id:
544 executions = [e for e in executions if e.workflow_id == workflow_id]
546 if status:
547 executions = [e for e in executions if e.status == status]
549 return executions
551 def register_global_action(
552 self,
553 name: str,
554 action: Callable[[Dict[str, Any]], Any]
555 ) -> None:
556 """
557 Register a global action that can be used in workflows.
559 Args:
560 name: The action name
561 action: The action function
562 """
563 self._global_actions[name] = action
565 def get_global_action(self, name: str) -> Optional[Callable]:
566 """Get a registered global action."""
567 return self._global_actions.get(name)
569 def create_step(
570 self,
571 step_id: str,
572 name: str,
573 step_type: StepType = StepType.ACTION,
574 action: Optional[Callable] = None,
575 condition: Optional[Callable] = None,
576 transform: Optional[Callable] = None,
577 next_step: Optional[str] = None,
578 on_true: Optional[str] = None,
579 on_false: Optional[str] = None,
580 delay_seconds: float = 0,
581 parallel_steps: Optional[List[str]] = None,
582 subprocess_id: Optional[str] = None,
583 metadata: Optional[Dict[str, Any]] = None
584 ) -> WorkflowStep:
585 """
586 Factory method to create a workflow step.
588 Args:
589 step_id: Unique step ID
590 name: Step name
591 step_type: Type of step
592 action: Action function (for ACTION type)
593 condition: Condition function (for CONDITION type)
594 transform: Transform function (for TRANSFORM type)
595 next_step: Default next step
596 on_true: Next step if condition is true
597 on_false: Next step if condition is false
598 delay_seconds: Delay in seconds (for DELAY type)
599 parallel_steps: List of step IDs (for PARALLEL type)
600 subprocess_id: Workflow ID (for SUBPROCESS type)
601 metadata: Optional metadata
603 Returns:
604 The created WorkflowStep
605 """
606 return WorkflowStep(
607 id=step_id,
608 name=name,
609 step_type=step_type,
610 action=action,
611 condition=condition,
612 transform=transform,
613 next_step=next_step,
614 on_true=on_true,
615 on_false=on_false,
616 delay_seconds=delay_seconds,
617 parallel_steps=parallel_steps or [],
618 subprocess_id=subprocess_id,
619 metadata=metadata or {}
620 )