Skip to content

Planner

from workflow_auto_assembler import LlmHandler

llm_handler = LlmHandler(
    llm_h_type="ollama",
    llm_h_params={
        "connection_string": "http://localhost:11434",
        "model_name": "gpt-oss:20b"
    }
)
from workflow_auto_assembler import create_avc_items, LlmFunctionItemInput

from typing import Type
from pydantic import BaseModel, Field

# --- Example usage ---

# Define mock functions and their associated Pydantic models:

# 1. get_weather function


class GetWeatherInput(BaseModel):
    city: str = Field(..., description="Name of the city for which weather to be extracted.")

class GetWeatherOutput(BaseModel):
    condition: str = Field(..., description="Weather condition in the requested city.")
    temperature: float = Field(..., description="Termperature in C in the requested city.")
    humidity: float = Field(None, description="Name of the city for which weather to be extracted.")

def get_weather(inputs: GetWeatherInput) -> GetWeatherOutput:
    """Get the current weather for a city from weather forcast api."""
    return GetWeatherOutput(
        condition = "Sunny",
        temperature = 20,
        humidity = 0.6
    )

# 2. send_report_email function

class EmailInformationPoint(BaseModel):
    title: str = Field(None, description="Few word description of the information.")
    content: str = Field(..., description="Content of the information.")

class SendReportEmailInput(BaseModel):
    city: str = Field(..., description="Name of the city where report will be send.")
    information: list[EmailInformationPoint]

class SendReportEmailOutput(BaseModel):
    email_sent: bool = Field(..., description="Conformation that email was send successfully.")
    message: str = Field(None, description="Optional comments from the process.")

def send_report_email(inputs: SendReportEmailInput) -> SendReportEmailOutput:
    """Sends a report email with given information points to a city."""
    return SendReportEmailOutput(
        email_sent = True,
        message = "Email sent to city of your choosing!"
    )

# 3. query_database function

class QueryDatabaseInput(BaseModel):
    topic: str = Field(..., description="Topic of a requested piece of information.")
    location: str = Field(None, description="Filter for location name.")
    uid: str = Field(None, description="Filter for unique indentifier of the database item.")

class QueryDatabaseOutput(BaseModel):
    info: str = Field(..., description="Content of the information.")
    uid: str = Field(None, description="Unique indentifier of the database item.")

def query_database(inputs : QueryDatabaseInput) -> QueryDatabaseOutput:
    """Get information from the database with provided filters."""
    return QueryDatabaseOutput(
        info = "Content extracted from the database for your query is ...",
        uid = "0000"
    )

# Create structured items for each function
available_tools = create_avc_items(functions = [
    LlmFunctionItemInput(**{"func" : get_weather , "input_model" : GetWeatherInput, "output_model" : GetWeatherOutput}),
    LlmFunctionItemInput(**{"func" : send_report_email , "input_model" : SendReportEmailInput, "output_model" : SendReportEmailOutput}),
    LlmFunctionItemInput(**{"func" : query_database , "input_model" : QueryDatabaseInput, "output_model" : QueryDatabaseOutput})
])

available_functions = available_tools["available_functions"]

available_callables = available_tools["available_callables"]

0. Initialize Planner

from workflow_auto_assembler import WorkflowPlanner, WorkflowErrorType, WorkflowError
import logging

wp = WorkflowPlanner(
    workflow_error = WorkflowError,
    workflow_error_types = WorkflowErrorType,
    llm_h = llm_handler,
    available_functions=available_functions,
    loggerLvl = logging.DEBUG)

1. Generating simple workflow using available functions (no input or output model)

task_description = """Query database to find information on birds and get latest weather for Berlin, then send an email there."""

planned_workflow_obj = await wp.generate_workflow(
    task_description=task_description)

planned_workflow_obj.workflow
[{'name': 'query_database', 'args': {'topic': 'birds', 'location': 'Berlin'}},
 {'name': 'get_weather', 'args': {'city': 'Berlin'}},
 {'name': 'send_report_email',
  'args': {'city': 'Berlin',
   'information': [{'title': 'Bird Information',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather',
     'content': 'Condition: source: get_weather.output.condition, Temperature: source: get_weather.output.temperature°C'}]}}]

2. Generating simple workflow using available functions (no output model)

task_description_a = """Query database to find information on birds and get latest weather for the city, then send an email there."""

class WfInputs(BaseModel):
    city: str = Field(..., description="Name of the city for which weather to be extracted.")

planned_workflow_obj_a = await wp.generate_workflow(
    input_model = WfInputs,
    task_description=task_description_a)

planned_workflow_obj_a.workflow
[{'name': 'query_database', 'args': {'topic': 'birds'}},
 {'name': 'get_weather', 'args': {'city': 'source: input_model.output.city'}},
 {'name': 'send_report_email',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Bird Information',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather Condition',
     'content': 'source: get_weather.output.condition'},
    {'title': 'Temperature (C)',
     'content': 'source: get_weather.output.temperature'}]}}]

3. Generating simple workflow using available functions

task_description_b = """Query database to find information on birds and get latest weather for the city, then send an email there."""

class WfInputs(BaseModel):
    city: str = Field(..., description="Name of the city for which weather to be extracted.")

class WfOutputs(BaseModel):
    city: str = Field(..., description="Name of the city for which weather was extracted.")
    information: list[EmailInformationPoint] = Field(default=[], description="Information sent via email.")

planned_workflow_obj_b = await wp.generate_workflow(
    input_model = WfInputs,
    output_model = WfOutputs,
    task_description=task_description_b)

planned_workflow_obj_b.workflow
DEBUG:WorkflowPlanner:Reset caused by PLANNING_OUTPUT_MODEL type error!
DEBUG:WorkflowPlanner:Attempt: 1





[{'name': 'query_database', 'args': {'topic': 'birds'}},
 {'name': 'get_weather', 'args': {'city': 'source: input_model.output.city'}},
 {'name': 'send_report_email',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Birds',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather',
     'content': 'Condition: source: get_weather.output.condition, Temperature: source: get_weather.output.temperature'}]}},
 {'name': 'output_model',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Birds',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather',
     'content': 'Condition: source: get_weather.output.condition, Temperature: source: get_weather.output.temperature'}]}}]

Resetting errors uncovered post planning

from workflow_auto_assembler import WorkflowError, WorkflowErrorType
Resetting based on runner error
class GetWeatherInput(BaseModel):
    city: str = Field(..., description="Name of the city for which weather to be extracted.")

class GetWeatherOutput(BaseModel):
    condition: str = Field(..., description="Weather condition in the requested city.")
    temperature: float = Field(..., description="Termperature in C in the requested city.")
    humidity: float = Field(None, description="Name of the city for which weather to be extracted.")

def get_weather(inputs: GetWeatherInput) -> GetWeatherOutput:
    """Get the current weather for a city from weather forcast api."""
    return GetWeatherOutput(
        condition = "Sunny",
        temperature = 20,
        humidity = 0.6
    )

def get_weather_report(inputs: GetWeatherInput) -> GetWeatherOutput:
    """Get the current weather for a city from weather forcast api (Improved)."""
    return GetWeatherOutput(
        condition = "Sunny",
        temperature = 20,
        humidity = 0.6
    )
# Create structured items for each function

available_tools2 = create_avc_items(functions = [
    LlmFunctionItemInput(**{"func" : get_weather , "input_model" : GetWeatherInput, "output_model" : GetWeatherOutput}),
    LlmFunctionItemInput(**{"func" : send_report_email , "input_model" : SendReportEmailInput, "output_model" : SendReportEmailOutput}),
    LlmFunctionItemInput(**{"func" : query_database , "input_model" : QueryDatabaseInput, "output_model" : QueryDatabaseOutput}),
     LlmFunctionItemInput(**{"func" : get_weather_report , "input_model" : GetWeatherInput, "output_model" : GetWeatherOutput})
])

available_functions2 = available_tools2["available_functions"]

available_callables2 = available_tools2["available_callables"]
runner_error = WorkflowError(
    error_message = 'Traceback (most recent call last):\n  File "/home/kyriosskia/miniforge3/envs/testenv/lib/python3.10/site-packages/workflow_auto_assembler/workflow_auto_assembler.py", line 780, in run_workflow\n    func_item = [av for av in available_functions \\\nIndexError: list index out of range\n',
    error_type = WorkflowErrorType.RUNNER,
    additional_info = {
        "ffunction" : "get_weather"
    }
)
import logging

wp2 = WorkflowPlanner(
    workflow_error = WorkflowError,
    workflow_error_types = WorkflowErrorType,
    llm_h = llm_handler,
    available_functions=available_functions2,
    loggerLvl = logging.DEBUG)

planned_workflow_obj_b.errors.append(runner_error)

planned_workflow_obj_tb = await wp2.generate_workflow(planned_workflow = planned_workflow_obj_b)

DEBUG:WorkflowPlanner:Reset caused by RUNNER type error!
DEBUG:WorkflowPlanner:Attempt: 2
planned_workflow_obj_tb.workflow
[{'name': 'query_database', 'args': {'topic': 'birds'}},
 {'name': 'get_weather_report',
  'args': {'city': 'source: input_model.output.city'}},
 {'name': 'send_report_email',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Birds',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather',
     'content': 'Condition: source: get_weather_report.output.condition, Temperature: source: get_weather_report.output.temperature'}]}},
 {'name': 'output_model',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Birds',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather',
     'content': 'Condition: source: get_weather_report.output.condition, Temperature: source: get_weather_report.output.temperature'}]}}]
Resetting based on hallusinated function
from workflow_auto_assembler import WorkflowPlannerResponse
planned_workflow_obj_b3 = WorkflowPlannerResponse(
    **{'include_output' :True ,
    'include_input' : True,
    'retries': 0,
 'workflow': [{'name': 'query_database_data', 'args': {'topic': 'birds'}},
  {'name': 'get_weather', 'args': {'city': 'source: WfInputs.output.city'}},
  {'name': 'send_report_email',
   'args': {'city': 'source: WfInputs.output.city',
    'information': [{'title': 'Weather Condition',
      'content': 'source: get_weather.output.condition'},
     {'title': 'Temperature',
      'content': 'source: get_weather.output.temperature'},
     {'title': 'Birds Info',
      'content': 'source: query_database_data.output.info'}]}}],
 'init_messages': [{'role': 'system',
   'content': '## Role\nYou are a Workflow Agent tasked with creating a complete workflow for a given task.  Your workflow must be constructed solely from calls to the functions provided. Each workflow should be represented as a JSON list, where each element is an object representing a single function call. For any function input that is meant to be filled using the output of a previous step rather than provided directly, indicate this using the format: "source: <previous_function_name>.output.<field_name>".\n\n## Output Requirements\n- **Respond ONLY with valid JSON** — specifically, an **array** of objects. - **Do NOT** provide any additional commentary, explanations, or text outside this JSON array. - Each object in the array must represent **one tool call**, with **exactly** the following fields:\n  1. `"name"` (string) - the tool\'s name from the list below.\n  2. `"args"` (object) - any arguments the tool requires.\n- **Important:** If a function argument is intended to be sourced from a previous step\'s output, indicate this using the format: "source: <previous_function_name>.output.<field_name>".\n  \n### Expected Format\n[\n  {\n    "name": "function_name_1",\n    "args": {}\n  },\n  ...,\n  {\n    "name": "function_name_n",\n    "args": {"arg1": "value for arg1"}\n  }\n]\n\n## Available functions\nBelow is the list of available functions that you can use to build your workflow. Each function is defined by its name, a description, and a JSON schema for its parameters. For example, the function list is as follows:\n[{"name": "get_weather", "description": "Get the current weather for a city from weather forcast api.", "input_schema_json": {"properties": {"city": {"description": "Name of the city for which weather to be extracted.", "title": "City", "type": "string"}}, "required": ["city"], "title": "GetWeatherInput", "type": "object"}, "output_schema_json": {"properties": {"condition": {"description": "Weather condition in the requested city.", "title": "Condition", "type": "string"}, "temperature": {"description": "Termperature in C in the requested city.", "title": "Temperature", "type": "number"}, "humidity": {"default": null, "description": "Name of the city for which weather to be extracted.", "title": "Humidity", "type": "number"}}, "required": ["condition", "temperature"], "title": "GetWeatherOutput", "type": "object"}}, {"name": "send_report_email", "description": "Sends a report email with given information points to a city.", "input_schema_json": {"$defs": {"EmailInformationPoint": {"properties": {"title": {"default": null, "description": "Few word description of the information.", "title": "Title", "type": "string"}, "content": {"description": "Content of the information.", "title": "Content", "type": "string"}}, "required": ["content"], "title": "EmailInformationPoint", "type": "object"}}, "properties": {"city": {"description": "Name of the city where report will be send.", "title": "City", "type": "string"}, "information": {"items": {"$ref": "#/$defs/EmailInformationPoint"}, "title": "Information", "type": "array"}}, "required": ["city", "information"], "title": "SendReportEmailInput", "type": "object"}, "output_schema_json": {"properties": {"email_sent": {"description": "Conformation that email was send successfully.", "title": "Email Sent", "type": "boolean"}, "message": {"default": null, "description": "Optional comments from the process.", "title": "Message", "type": "string"}}, "required": ["email_sent"], "title": "SendReportEmailOutput", "type": "object"}}, {"name": "query_database", "description": "Get information from the database with provided filters.", "input_schema_json": {"properties": {"topic": {"description": "Topic of a requested piece of information.", "title": "Topic", "type": "string"}, "location": {"default": null, "description": "Filter for location name.", "title": "Location", "type": "string"}, "uid": {"default": null, "description": "Filter for unique indentifier of the database item.", "title": "Uid", "type": "string"}}, "required": ["topic"], "title": "QueryDatabaseInput", "type": "object"}, "output_schema_json": {"properties": {"info": {"description": "Content of the information.", "title": "Info", "type": "string"}, "uid": {"default": null, "description": "Unique indentifier of the database item.", "title": "Uid", "type": "string"}}, "required": ["info"], "title": "QueryDatabaseOutput", "type": "object"}}]\n\n'},
  {'role': 'user',
   'content': 'Query database to find information on birds and get latest weather for the city, then send an email there.\n--- The following is the expected input model for the workflow, reference values from it if necessesary, given the task with  format "source: input_model.output.<field_name>". \n{\'properties\': {\'city\': {\'description\': \'Name of the city for which weather to be extracted.\', \'title\': \'City\', \'type\': \'string\'}}, \'required\': [\'city\'], \'title\': \'WfInputs\', \'type\': \'object\'}\n---\n\n--- The following is the expected output model for the workflow, outputs from functions selected in the workflow should  be able to populate its fields, given the task with format "source: <previous_workflow_step>.output.<field_name>" or "source: input_model.output.<field_name>". \n{\'$defs\': {\'EmailInformationPoint\': {\'properties\': {\'title\': {\'default\': None, \'description\': \'Few word description of the information.\', \'title\': \'Title\', \'type\': \'string\'}, \'content\': {\'description\': \'Content of the information.\', \'title\': \'Content\', \'type\': \'string\'}}, \'required\': [\'content\'], \'title\': \'EmailInformationPoint\', \'type\': \'object\'}}, \'properties\': {\'city\': {\'description\': \'Name of the city for which weather was extracted.\', \'title\': \'City\', \'type\': \'string\'}, \'information\': {\'default\': [], \'description\': \'Information sent via email.\', \'items\': {\'$ref\': \'#/$defs/EmailInformationPoint\'}, \'title\': \'Information\', \'type\': \'array\'}}, \'required\': [\'city\'], \'title\': \'WfOutputs\', \'type\': \'object\'}\n---\n\n'}],
 'additional_messages': [],
 'errors': []}
)
planning_error = WorkflowError(
    error_message = 'Traceback (most recent call last):\n  File "/home/kyriosskia/miniforge3/envs/testenv/lib/python3.10/site-packages/workflow_auto_assembler/workflow_auto_assembler.py", line 780, in run_workflow\n    func_item = [av for av in available_functions \\\nIndexError: list index out of range\n',
    error_type = WorkflowErrorType.PLANNING_HF,
)
import logging

wp3 = WorkflowPlanner(
    workflow_error = WorkflowError,
    workflow_error_types = WorkflowErrorType,
    llm_h = llm_handler,
    available_functions=available_functions,
    loggerLvl = logging.DEBUG)


planned_workflow_obj_b3.errors.append(planning_error)
planned_workflow_obj_b3.additional_messages = []

planned_workflow_obj_tb3 = await wp3.generate_workflow(planned_workflow = planned_workflow_obj_b3, output_model = WfOutputs)
DEBUG:WorkflowPlanner:Reset caused by PLANNING_HF type error!
DEBUG:WorkflowPlanner:Attempt: 1


DEBUG:WorkflowPlanner:Reset caused by PLANNING_MISSOUTPUT type error!
DEBUG:WorkflowPlanner:Attempt: 2
DEBUG:WorkflowPlanner:Reset caused by PLANNING_OUTPUT_MODEL type error!
DEBUG:WorkflowPlanner:Attempt: 3
planned_workflow_obj_tb3.workflow
[{'name': 'query_database', 'args': {'topic': 'birds'}},
 {'name': 'get_weather', 'args': {'city': 'source: input_model.output.city'}},
 {'name': 'send_report_email',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Bird Information',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather', 'content': 'source: get_weather.output.condition'}]}},
 {'name': 'output_model',
  'args': {'city': 'source: input_model.output.city',
   'information': [{'title': 'Bird Information',
     'content': 'source: query_database.output.info'},
    {'title': 'Weather Condition',
     'content': 'source: get_weather.output.condition'}]}}]