You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

126 lines
3.6 KiB
Python

import logging
from agentkit.context import ConversationContext
from agentkit.types import AgentKitFlowStep
class BreakLoopInterrupt(Exception):
pass
class AgentKitFlowExecutor:
def __init__(self, flow_script: list[AgentKitFlowStep]):
self.flow = flow_script
def _execute_step(self, step: AgentKitFlowStep, context: ConversationContext):
"""
Execute a single flow step.
Args:
step (dict): The step to execute.
Returns:
Any: The result of the step execution.
"""
step_type = step["type"]
if step_type == "call":
return self._execute_call(step, context)
elif step_type == "if_else":
return self._execute_if_else(step, context)
elif step_type == "loop":
return self._execute_loop(step)
elif step_type == "break_loop":
return self._execute_break_loop(step)
else:
raise ValueError(f"Unsupported step type: {step_type}")
def _execute_call(self, step: dict, context: ConversationContext):
"""
Execute a 'call' step.
Args:
step (dict): The call step configuration.
Returns:
Any: The result of the call.
"""
func_id = step.get("id")
config = {k: self._resolve_value(v) for k, v in step.get("config", {}).items()}
output_map = step.get("output_map", {})
# Simulate calling a function (replace with actual function calls if needed)
logging.info(f"Calling function {func_id} with config: {config}")
result = {key: f"mocked_value_{key}" for key in output_map.keys()} # Mocked output
# Map outputs to variables
for key, var_name in output_map.items():
self.variables[var_name] = result.get(key)
return result
def _execute_if_else(self, step: dict, context: ConversationContext):
"""
Execute an 'if_else' step.
Args:
step (dict): The if_else step configuration.
Returns:
Any: The result of the executed branch.
"""
condition = step.get("condition")
condition_input = {k: self._resolve_value(v) for k, v in step.get("condition_input", {}).items()}
# Evaluate the condition
condition_result = eval(condition, {}, condition_input)
# Execute the appropriate branch
branch = step["true_branch"] if condition_result else step["false_branch"]
for sub_step in branch:
self._execute_step(sub_step, context)
def _execute_loop(self, step: dict, context: ConversationContext):
"""
Execute a 'loop' step.
Args:
step (dict): The loop step configuration.
Returns:
Any: The result of the loop execution.
"""
loop_num = step.get("loop_num")
index_var = step.get("index_var")
loop_body = step.get("loop_body", [])
for i in range(loop_num):
if index_var:
context.state[index_var] = i
for sub_step in loop_body:
self._execute_step(sub_step, context)
def _execute_break_loop(self):
"""
Execute a 'break_loop' step.
Args:
step (dict): The break_loop step configuration.
Returns:
None
"""
raise BreakLoopInterrupt()
def execute(self, context: ConversationContext):
"""
Execute the entire flow.
Returns:
None
"""
try:
for step in self.flow:
self._execute_step(step)
except BreakLoopInterrupt:
pass