# Importing libraries import asyncio from email.header import decode_header from fastmcp import FastMCP, Client from src.utils.cache import RedisCacheManagement from src.handlers.response_handler import build_final_cache_response from src.handlers.email_handler import get_action_detail, update_email_body,redeem_points_email_handler,smtp_port import imaplib from email.utils import parsedate_to_datetime from flask import Flask, request, jsonify, session, Response, abort, stream_with_context from flask_session import Session from flask_cors import CORS from uuid import uuid4 from agent_generic_streamlit import CustomOpenAIAgents, TopicModule, AgentModule, ActionModule, \ KaptureGlobalSearchModule, ProductRecommendationModule, ActionWithoutTopicModule, generate_content_kapture from agent_generic_streamlit import chat_with_agent, display_welcome_message, RedisStateStore, LongTermMemoryManager, send_data_opensearch_for_dashboard, identify_case_type,open_source_chat_with_agent, LLMConfiguration, user_input_validation from agent_generic_streamlit_lang import graph_store, CustomConfigAgents, get_or_build_graph, \ chat_with_langgraph_agent_flow_builder, get_all_attached_flow_details_with_topic, get_updated_flow_list, \ send_flow_notification, get_action_details, FlowModule, audit_scheduler_task, get_flow_name, get_previous_flows, \ CustomUserInput from build_multi_agents import final_formatting_of_responses, get_all_attached_flow_details_with_topic2, MultiAgentState, MultiAgentToolModule,get_supervisor_agent_details,get_all_agent_details from sentence_transformers import SentenceTransformer, util import traceback import spacy import re import json from huggingface_hub import login from flask_redis import FlaskRedis from datetime import datetime, timezone, timedelta from langchain_core.messages import SystemMessage, HumanMessage import asyncio import uuid import logging import atexit from flask import Flask, request, jsonify from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.cron import CronTrigger from apscheduler.jobstores.redis import RedisJobStore from apscheduler.triggers.date import DateTrigger from pytz import timezone as pytztimezone from pytz.exceptions import UnknownTimeZoneError from get_patient_information import parse_paragraphs from redis import Redis from ai_solution import process_prescription from ai_solution_attribute_extraction import AIPrescriptionProcessor import os from dotenv import load_dotenv from imapclient import IMAPClient import email from email.header import decode_header from datetime import datetime import time from langchain_openai import ChatOpenAI from langchain.prompts import ChatPromptTemplate import threading from build_multi_agents import _process_topics_concurrent_helper,call_to_get_topic_and_tone_details,generate_hash_key,get_tone_details,_process_topics_concurrent_helper2,process_all_agents_parallel from langgraph_supervisor import create_supervisor from opensearchpy import OpenSearch import secrets import string from agentic_rag import TicketSummarizer from rank_bm25 import BM25Okapi from difflib import SequenceMatcher from build_mcp_tools import add_mcp_tools,delete_mcp_tool,get_tools_on_tag from langchain_mcp_adapters.client import MultiServerMCPClient from llama_index.tools.mcp import BasicMCPClient,McpToolSpec from fastmcp.client.auth import OAuth from fastmcp.client import StreamableHttpTransport from agent_creation_helper_function import AgentCreationHelper, TopicCreationHelper, FlowCreationHelper, get_all_actions, get_top_relevant_actions from langgraph.checkpoint.redis import RedisSaver from build_mcp_tools import generate_token from security.extract_sensitive_data import IdentifySensitiveData,EncryptionData,load_list_for_key,flush_tempfile,PHIEliminationPipeline from anamoly_detection.loyalty_transcations_isolation_forest import LoyaltyAnomalyDetector from agentic_rag import AgenticRagDataExtractionModule from src.handlers.response_handler import extract_parsed_json from memory.memory_management import AgentMemoryManager,SummaryMemoryManager from llama_index.core.llms import ChatMessage from anamoly_detection.utils import update_member_miles,execute_anomaly_detection # pii_engine = IdentifySensitiveData() identify_data_engine = PHIEliminationPipeline() encryption_engine = EncryptionData() anamoly_detection_engine = LoyaltyAnomalyDetector() rag_data_class = AgenticRagDataExtractionModule() in_memory_store = RedisSaver( redis_url="redis://localhost:6379") checkpointer = RedisSaver( redis_url="redis://localhost:6379") sub_agents_store = {} redis_client2 = Redis(host="localhost", port=6379, db=3) QUEUE_KEY = "mail_queue" PROCESSED_KEY = "processed_uids" load_dotenv() topic_module_obj = TopicModule() agent_module_object = AgentModule() action_module_object = ActionModule() kapture_module_object = KaptureGlobalSearchModule() question_module_object = ProductRecommendationModule() custom_config_agent_obj = CustomConfigAgents() flow_module_object = FlowModule() multi_agent_tool_object = MultiAgentToolModule() ticket_summ_obj = TicketSummarizer() llm_config_obj = LLMConfiguration() cache_mang_obj = RedisCacheManagement() # logging.basicConfig(level=logging.DEBUG) # logging.basicConfig(level=logging.INFO) # logger = logging.getLogger(__name__) LOG_FILE = r"C:\Users\ManikantaSivaKumarBo\Downloads\nlp_research\49_Kapture-Agent-Reasoning-Engine\Agent_Dev_Server_deployables\Service Connect Agent\logfile.txt" print(logging.getLogger().handlers) logging.getLogger().handlers.clear() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] [PID:%(process)d] [Thread:%(threadName)s] [%(filename)s:%(lineno)d] %(message)s", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize the Flask app & configuration application = Flask(__name__) application.config['CACHE_TYPE'] = 'simple' application.config['CACHE_DEFAULT_TIMEOUT'] = 300 application.config['SESSION_TYPE'] = 'filesystem' application.config['SESSION_COOKIE_NAME'] = 'agent_chatbot' application.config['SESSION_COOKIE_HTTPONLY'] = True application.config['SESSION_COOKIE_SECURE'] = True application.config['SESSION_COOKIE_SAMESITE'] = 'Lax' application.config['JSON_SORT_KEYS'] = False application.json.sort_keys = False Session(application) CORS(application) nlp = spacy.load("en_core_web_sm") STOPWORDS = nlp.Defaults.stop_words application.secret_key = os.urandom(24) # Handle Redis Memory related APIS memory_manager = LongTermMemoryManager() long_term_memory_manager = AgentMemoryManager() summary_long_term_manager = SummaryMemoryManager(long_term_memory_manager) # Get the agent objects separately to remove each call custom_agent_obj = CustomOpenAIAgents(long_term_memory_manager,summary_long_term_manager) # Load model for semantic caching login(token="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") model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2") application.config['REDIS_URL'] = "redis://localhost:6379/0" redis_store = FlaskRedis(application) state_store = RedisStateStore(redis_store) mcp_endpoint = os.getenv("MCP_SERVER_URL") jobstores = { "default": RedisJobStore( host="localhost", port=6379, db=5, ) } client_mcp = Client("http://localhost:8000/mcp") langchain_mcp_client = MultiServerMCPClient({ "KaptureMCPServer": { "url": "http://localhost:8000/mcp", "transport": "streamable_http", } }) mcp_client = BasicMCPClient("http://localhost:8000/mcp") mcp_tool_spec = McpToolSpec(client=mcp_client) scheduler = BackgroundScheduler(executors={'default': ThreadPoolExecutor(max_workers=10)}, job_defaults={'coalesce': True, 'max_instances': 1},jobstores=jobstores) scheduler.start() redis_client = Redis(host="localhost", port=6379, db=6) date_format = "%Y-%m-%dT%H:%M:%S" pattern = r'\b[a-zA-Z]+\d+|\d+[a-zA-Z]+\b' failed_trigger_message = "Failed due to empty input, failed parsing the user_input" failed_start_flow_message = "Flow has failed to start" no_session_id_flow_message = "session id not found, flow not started" unexpected_error_message = "Unexpected Error" mime_type = "application/json" unexpected_error_occurred_message = "An unexpected error occurred" invalid_value_message = "Invalid value provided" missing_key_message = "Missing key in state or session" user_id_missing_message = "User ID is missing" user_id_not_provided_message = "User ID is not provided" flow_id_not_provided_message = "Flow ID is not provided" user_input_required_message = "User input is required" user_input_not_provided_message = "User input is not provided" session_id_missing_message = "Session ID is missing" invalid_json_payload_message = "Invalid or missing JSON payload" user_pk_id_missing_message = "user_pk_id is missing" payload_missing_message = "payload is missing" # llm = ChatOpenAI(model="gpt-4o", temperature=0, timeout =15,max_retries=2) llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3) def extract_keywords(text): text = text.lower() text = re.sub(r"[^a-z0-9 ]+", " ", text) tokens = [word for word in text.split() if word not in STOPWORDS] return tokens def keyword_overlap(prompt_keywords, agent_keywords): return len(set(prompt_keywords) & set(agent_keywords)) def fuzzy_score(a, b): if not a or not b: return 0 return SequenceMatcher(None, a.lower(), b.lower()).ratio() def extract_entities(query): """Extract important entities from a query.""" doc = nlp(query) entities = {ent.text.lower() for ent in doc.ents} # print(entities) return entities def normalize_response(response): if hasattr(response, "to_dict"): return response.to_dict() if not isinstance(response, (str, dict, list)): return str(response) return response def stream_events(generator): """Converts generator outputs into SSE.""" for chunk in generator: json_str = json.dumps(chunk) yield f"data: {json_str}\n\n" def format_message(msg): return { "type": msg.type, "content": msg.content, "name": getattr(msg, "name", None), "tool_calls": getattr(msg, "tool_calls", None) } def escape_curly_braces(text): return text.replace("{", "{{").replace("}", "}}") def preprocess_query(query): """Normalize and preprocess the query.""" query = query.lower().strip() doc = nlp(query) return " ".join([token.lemma_ for token in doc]) def validate_auth(request, config): auth_type = config.get("authType", "NONE") if auth_type == "NONE": return True elif auth_type == "BEARER": token = request.headers.get("Authorization") return token == f"Bearer {config['accessToken']}" elif auth_type == "BASIC": import base64 auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): return False encoded = auth_header.split(" ")[1] decoded = base64.b64decode(encoded).decode("utf-8") user, pwd = decoded.split(":") return user == config["userName"] and pwd == config["password"] return False @application.route('/kapture_agent_ping', methods=['GET']) def ping(): return jsonify({ 'status': 'success', 'message': 'pong' }), 200 async def get_all_details(agent_id,topic_id,identified_topic_id): (attached_workflows, flow_id), action_details, is_product_recommendation = await asyncio.gather( get_all_attached_flow_details_with_topic2(agent_id=agent_id,topic_id=topic_id), action_module_object.get_action_details(agent_id=agent_id,topic_id=topic_id), topic_module_obj.get_product_recommendation_details(topic_id=identified_topic_id) ) return attached_workflows, flow_id, action_details, is_product_recommendation async def get_all_details_mcp(agent_id,topic_id,identified_topic_id): (attached_workflows, flow_id), is_product_recommendation = await asyncio.gather( get_all_attached_flow_details_with_topic2(agent_id=agent_id,topic_id=topic_id), topic_module_obj.get_product_recommendation_details(topic_id=identified_topic_id) ) return attached_workflows, flow_id, is_product_recommendation @application.route('/kapture_agent_call_specific_tool', methods=['POST', 'GET']) def process_user_input_to_call_specific_tool(intent_response, instructions, topic_id, tone_details, memory_type, token_limit, user_pk_id, model_type, model_name, third_party_data_source, customer_id,output_guard_rails,data_encryption): global response, action_list, flow_list save_response = True extracted_entities = [] mapping = [] current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_id = data.get("user_id", "") if not user_id: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state collected_params = state_store.get_collected_params(session_id) user_input = data.get("user_input", "").strip() agent_id = data.get("agent_id", "").strip() if not user_input: return jsonify({"error": user_input_required_message, 'status': 400}) state["processed_input"] = user_input flow_list = [] action_list = [] human_input = False our_code_start_time = time.time() preprocessed_input = preprocess_query(user_input) identified_intent = intent_response identified_topic_id = topic_id attached_workflows, flow_id, action_details, is_product_recommendation = asyncio.run(get_all_details(agent_id, topic_id, identified_topic_id)) style = tone_details st_time = time.time() summary_long_term_manager.retrieve_summarized_memory(user_id, agent_id, user_input) summary_long_term_manager.search_unsummarized_docs(user_id, agent_id, user_input) logger.info(f"Length of agent memories\n, {len(summary_long_term_manager.memory_manager.agent_memories)}") logging.info(f"long term memory latency check {time.time()-st_time} seconds") if not attached_workflows: if topic_id is None or instructions is None: print('In no topic code') action_without_topic_obj = ActionWithoutTopicModule() index_metadata_names, _, action_instructions = action_without_topic_obj.get_datasource_index_details(agent_id=agent_id, user_pk_id=user_pk_id) action_instructions = "The Data Retrieval Tool is designed to fetch relevant information from indexed data sources (such as databases, vector stores, or search indexes) based on a user’s query. It interprets the query, applies semantic and/or keyword search, and returns the most relevant records. This tool ensures that users receive accurate and contextually aligned information, which can then be used to answer questions, generate summaries, or provide detailed responses." if index_metadata_names: agent_module_object = AgentModule() response_type = asyncio.run(agent_module_object.get_agent_setting_details(agent_id=agent_id)) current_utc_time = datetime.now(timezone.utc) instructions = "You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'." if data_encryption: docs_,mapping = rag_data_class.extract_index_docs(data_encryption,index_metadata_names,user_input) raw_input = user_input user_input = encryption_engine.query_anonymize(raw_input,mapping) custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions, model_type=model_type, data_encryption=data_encryption,raw_input = raw_input) else: custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions,model_type =model_type) print("In chat with RAG function") llm_start_time = time.time() if model_type.lower() == "subscription": instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit,output_guard_rails) response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) if data_encryption and mapping: try: response, extracted_entities = EncryptionData().deanonymize2(text=response, entities=mapping) except Exception as e: print("Error occured in decryption -->", e) total_llm_time = time.time() - llm_start_time print(f"Total time taken by llm: {total_llm_time:.2f} sec") else: if third_party_data_source: index_metadata_names = ["siebel_attachments"] agent_module_object = AgentModule() response_type = asyncio.run(agent_module_object.get_agent_setting_details(agent_id=agent_id)) current_utc_time = datetime.now(timezone.utc) instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." # custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, # response_type, # index_metadata_names, # action_instructions) custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions, model_type=model_type) print("In chat with RAG function") llm_start_time = time.time() if model_type.lower() == "subscription": instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: instructions = "You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'." custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit,output_guard_rails) response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) total_llm_time = time.time() - llm_start_time print(f"Total time taken by llm: {total_llm_time:.2f} sec") else: response = 'Your request falls outside the allowed topic scope for this system.\nI can only assist with queries that are relevant to the supported domain and intended use cases or any data sources. Please rephrase your question or ask something aligned with the approved topics.' our_code_end_time = time.time() our_code_execution_time = round(our_code_end_time - our_code_start_time, 2) return Response( json.dumps({ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "agent_response":response , "status": 200, "human_input": human_input, "file_upload": {"file_upload": False, "config_data": {}}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, "total_code_execution_time": our_code_execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) }), mimetype=mime_type ) elif "kapture global search" in identified_intent.lower() or "mgss web" in identified_intent.lower() or "mazda product recommendation" in identified_intent.lower() or "product recommendation-mazda usa vehicles" in identified_intent.lower() or "product recommendation for mazda usa vehicles" in identified_intent.lower(): print("In separate web search class") if not action_details: return jsonify({"error": f"No Actions assigned to identified {identified_intent} topic", "details": f"No Actions assigned to identified {identified_intent} topic", 'status': 400}) else: response, action_list = kapture_module_object.get_kapture_global_search_response(action_details, user_input, tone_details) custom_agent_obj.set_memory_mode(user_id=user_id, mode=memory_type) custom_agent_obj.initialize_memory(user_id, agent_id, session_id, token_limit) else: if not action_details: return jsonify({"error": f"No Actions assigned to identified {identified_intent} topic", "details": f"No Actions assigned to identified {identified_intent} topic", 'status': 400}) if model_type.lower() == "subscription": custom_tools_list = action_module_object.create_custom_methods_tools_new(action_details, user_input, tone_details, collected_params, is_product_recommendation, state_store, session_id) custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) print("In chat with agent function") response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: custom_tools_list = action_module_object.opensource_create_custom_methods_tools_new(action_details, user_input, tone_details, collected_params, is_product_recommendation, state_store, session_id) custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions,custom_tools_list,user_id, agent_id,session_id,memory_type, token_limit,output_guard_rails) print("In chat with agent function") response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory,user_input) if output_guard_rails: memory = custom_agent_obj.short_term_memory_manager.initialize_working_memory(session_id, 10000) response = custom_agent_obj.output_guard_rail_validation(response,output_guard_rails,memory) save_response = False action_module_object.reset() else: flow_executor_agent, flow_data = get_or_build_graph(session_id=session_id, flow_id=flow_id) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" flow_instructions = flow_data["flow_instructions"] if not flow_instructions: return jsonify({"error": "Flow instructions is required", "details": "No flow instructions available to execute",'status': 400}) final_flow_instructions = customer_user_id + flow_instructions custom_system_prompt = SystemMessage(content=(final_flow_instructions)) messages = [custom_system_prompt, HumanMessage(content=user_input)] initial_state = { "messages": messages, "variables_list": [], "workflow_steps": flow_data["workflow_steps"], 'current_node_id': flow_data["current_node_id"], 'flow_instruction': '', 'execution_path': [], 'variables': {'user_input': user_input,"user_id":user_id}, 'errors': [], 'response': '', 'condition_result': 'false', 'wait_result': '', "session": session_id, 'flow_id': flow_id, "human_input": "", "file_upload" : {"file_upload": False, "config_data": {}}, "human_input_type": {}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, 'loop_count': 0 } config = {"configurable": {"thread_id": session_id}} response, flow_list = chat_with_langgraph_agent_flow_builder(flow_executor_agent, initial_state, config, trigger_time=current_time) current_utc_time = datetime.now(timezone.utc) timestamp = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') # if memory_type == 'LONG_TERM': # custom_agent_obj.append_to_memory([("user", user_input), ("assistant", response)]) summary_long_term_manager.memory_manager.agent_memories = [] messages = [ ChatMessage(role="user", content=user_input), ChatMessage(role="assistant", content=response if isinstance(response, str) else response.get("agent_response",'')) ] # long_term_memory_manager.store_long_term_memory(messages,user_id,agent_id,session_id) threading.Thread( target=long_term_memory_manager.store_long_term_memory, args=(messages, user_id, agent_id, session_id), daemon=True ).start() check_json_response_type = extract_parsed_json(response) user_input = user_input.split("##Model settings ##")[0] if user_input.split("##Model settings") else user_input if isinstance(response, str): state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list" : flow_list, "agent_response": response }, user_input=user_input, processed_input=preprocessed_input ) response = normalize_response(response) matches = re.findall(pattern, user_input) if save_response and (not matches or not response): cache_mang_obj.update_cache(agent_id=agent_id, user_id=user_id, user_query=user_input, identified_topic_name=identified_intent, flow_list= flow_list, human_input=human_input, response=response,response_type=check_json_response_type) our_code_end_time = time.time() our_code_execution_time = round((our_code_end_time - our_code_start_time), 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response, "status": 200, "timestamp": timestamp, "total_code_execution_time": our_code_execution_time } send_data_opensearch_for_dashboard(payload=history_payload) final_result = { "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "encrypted_data": extracted_entities, "agent_response": response, "status": 200, "human_input": human_input, "file_upload": {"file_upload": False, "config_data": {}}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, "total_code_execution_time": our_code_execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) } if check_json_response_type: final_result.update({"response_type":'JSON'}) return Response( json.dumps(final_result), mimetype=mime_type ) else: state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", '') }, user_input=user_input, processed_input=preprocessed_input ) response = normalize_response(response) human_input = True matches = re.findall(pattern, user_input) if not matches or not response: cache_mang_obj.update_cache(agent_id=agent_id, user_id=user_id, user_query=user_input, identified_topic_name=identified_intent, flow_list=flow_list, human_input=human_input, response=response,response_type=check_json_response_type) our_code_end_time = time.time() our_code_execution_time = round((our_code_end_time - our_code_start_time), 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", ''), "status": 200, "timestamp": timestamp, "total_code_execution_time": our_code_execution_time } send_data_opensearch_for_dashboard(payload=history_payload) final_result = { "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "encrypted_data":extracted_entities, "agent_response": response.get("agent_response",''), "status": 200, "human_input": human_input, "file_upload": response.get("file_upload", {}), "feedback_node": response.get("feedback_node", {}), "human_input_type": response.get("human_input_type", ''), "human_input_query": response.get("human_input_query", ''), "session": response.get("session", ""), "flow_id": response.get("flow_id", ""), "total_code_execution_time": our_code_execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) } return Response( json.dumps(final_result), mimetype=mime_type ) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route('/kapture_agent_identify_intent_with_agent_and_get_response', methods=['POST', 'GET']) def process_user_input(): timestamp = time.time() try: data = request.get_json() input_time = time.time() if not data: return _error_response(invalid_json_payload_message, 400) session_id, user_id, agent_id, user_input = _extract_and_validate_inputs(data) user_pk_id = data.get("user_pk_id") customer_id = data.get("customer_id", "") if not user_pk_id: return jsonify({"error": user_pk_id_missing_message, 'status': 400}) if isinstance(session_id, Response): return session_id # Early return if validation failed collected_params = state_store.get_collected_params(session_id) if not user_input: return _error_response(user_input_required_message, 400) our_code_start_time = time.time() cached_response, topic_name, flow_list, human_input, _ = cache_mang_obj.get_cached_response(agent_id=agent_id, user_id =user_id, user_query=user_input) if cached_response: return _build_final_response(user_id=user_id, agent_id=agent_id, session_id=session_id, user_input=user_input, identified_intent=topic_name, identified_topic_id='', action_list=[], response=cached_response, start_time=input_time, timestamp=timestamp) memory_type, token_limit,third_party_data_source, is_mcp_enabled,guard_rails,data_encryption = asyncio.run(_fetch_agent_settings(agent_id)) predefined_topics = topic_module_obj.get_topic_details(agent_id=agent_id) tone_details, language_details = agent_module_object.get_tone_details(agent_id=agent_id) input_guard_rails = guard_rails.get("inputGuardRail",[]) output_guard_rails = guard_rails.get("outputGuardRail",[]) if not memory_type: return _error_response("Please configure memory setting first", 400, "Please configure memory setting first to go ahead") preprocessed_input = preprocess_query(user_input) model_type, model_name = llm_config_obj.get_model_config_details(agent_id) if model_type is None or model_name is None: return jsonify({"error": "Model type or model name is not configured.", "details": "Please configure the model type or model name first", 'status': 400}) if model_type.lower() == "subscription": intent_response, instructions, topic_id = custom_agent_obj.identify_intent_with_agent( user_input, tone_details, language_details, predefined_topics,session_id, user_id, agent_id,input_guard_rails) else: intent_response, instructions, topic_id = custom_agent_obj.opensource_identify_intent_with_agent(user_input, tone_details, language_details, predefined_topics,session_id,input_guard_rails) if intent_response.lower() == "blocked": response = "We're sorry, your input violates our safety and content guidelines" if not instructions else instructions return _build_final_response(user_id=user_id, agent_id=agent_id, session_id=session_id, user_input=user_input, identified_intent='', identified_topic_id='', action_list=[], response=response, start_time=input_time, timestamp=timestamp) response, action_list, identified_intent, identified_topic_id = _handle_intent( intent_response, instructions, topic_id, agent_id, user_id, session_id, user_input, tone_details, memory_type, token_limit, collected_params, state_store,model_type,user_pk_id, customer_id,output_guard_rails ) timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') if memory_type == 'LONG_TERM': custom_agent_obj.append_to_memory([("user", user_input), ("assistant", response)]) return _finalize_and_build_response( user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, response, preprocessed_input, our_code_start_time, timestamp, human_input ) except ValueError as e: return _error_response(invalid_value_message, 400, str(e)) except KeyError as e: return _error_response(missing_key_message, 400, str(e)) except Exception as e: return _error_response(unexpected_error_occurred_message, 500, str(e)) # ---------------------- Helper Functions ---------------------- async def process_user_input_to_call_specific_mcp_tool(intent_response, instructions, topic_id, tone_details, memory_type, token_limit,user_pk_id,model_type,model_name,third_party_data_source,customer_id,output_guard_rails,data_encryption): global response, action_list, flow_list current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: save_response = True mapping = [] extracted_entities = [] data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_id = data.get("user_id", "") customer_id = data.get("customer_id", "") if not user_id: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state collected_params = state_store.get_collected_params(session_id) user_input = data.get("user_input", "").strip() agent_id = data.get("agent_id", "").strip() if not user_input: return jsonify({"error": user_input_required_message, 'status': 400}) state["processed_input"] = user_input flow_list = [] action_list = [] human_input = False our_code_start_time = time.time() preprocessed_input = preprocess_query(user_input) identified_intent = intent_response identified_topic_id = topic_id attached_workflows, flow_id, is_product_recommendation = asyncio.run(get_all_details_mcp(agent_id=agent_id, topic_id=topic_id, identified_topic_id=topic_id)) style = tone_details # if True: if not attached_workflows: if is_product_recommendation: tools = asyncio.run(action_module_object.get_action_details(agent_id=agent_id,topic_id=topic_id)) for config in tools: name = config.get("apiName") id = config.get("id") try: # mcp.remove_tool(name) asyncio.run(delete_mcp_tool([id])) asyncio.run(add_mcp_tools(topic_name=intent_response, actions_ids=[id])) except: print("No such tool exists") token_response = await generate_token() token = token_response.get("access_token", '') async with Client( transport=StreamableHttpTransport(mcp_endpoint, headers={"Authorization": f"Bearer {token}"})) as client: action_details = await get_tools_on_tag(client, identified_intent, token) if topic_id is None or instructions is None: print('In no topic code') action_without_topic_obj = ActionWithoutTopicModule() index_metadata_names, _, action_instructions = action_without_topic_obj.get_datasource_index_details( agent_id=agent_id, user_pk_id=user_pk_id) action_instructions = "The Data Retrieval Tool is designed to fetch relevant information from indexed data sources (such as databases, vector stores, or search indexes) based on a user’s query. It interprets the query, applies semantic and/or keyword search, and returns the most relevant records. This tool ensures that users receive accurate and contextually aligned information, which can then be used to answer questions, generate summaries, or provide detailed responses." if index_metadata_names: agent_module_object = AgentModule() response_type = asyncio.run(agent_module_object.get_agent_setting_details(agent_id=agent_id)) current_utc_time = datetime.now(timezone.utc) # instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." instructions = "You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'." if data_encryption: docs_, mapping = rag_data_class.extract_index_docs(data_encryption, index_metadata_names, user_input) raw_input = user_input user_input = encryption_engine.query_anonymize(raw_input, mapping) custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions, model_type=model_type, data_encryption=data_encryption, raw_input=raw_input) else: custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions, model_type=model_type) # custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit) print("In chat with RAG function") llm_start_time = time.time() if model_type.lower() == "subscription": instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit,output_guard_rails) response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) if data_encryption and mapping: try: response, extracted_entities = EncryptionData().deanonymize2(text=response, entities=mapping) except Exception as e: print("Error occured in decryption -->", e) # response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) total_llm_time = time.time() - llm_start_time print(f"Total time taken by llm: {total_llm_time:.2f} sec") else: if third_party_data_source: index_metadata_names = ["siebel_attachments"] agent_module_object = AgentModule() response_type = asyncio.run(agent_module_object.get_agent_setting_details(agent_id=agent_id)) current_utc_time = datetime.now(timezone.utc) instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." # custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, # response_type, # index_metadata_names, # action_instructions) custom_tools_list = action_without_topic_obj.create_custom_rag_tools(user_input, style, response_type, index_metadata_names, action_instructions, model_type=model_type) print("In chat with RAG function") llm_start_time = time.time() # custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, # agent_id, session_id, memory_type, # # token_limit) if model_type.lower() == "subscription": instructions = f"You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'. 6. Current date and time is : {current_utc_time} for reference if needed or user asked question based on current date and time scenario." custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: instructions = "You are an assistant with access to various tools. 1. Only call a tool **if and only if** the user's question **clearly and directly matches** a tool’s description and purpose. 2. Do not make assumptions or attempt partial matches.3. If no tool description is a perfect or near-perfect match for the user's input, respond only with: 'No context available'.4. Do not attempt to interpret or guess the user's intent beyond the tool description.5. If the query is vague, off-topic, or general, do not call any tool — instead respond with: 'No context available'." custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit,output_guard_rails) response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) total_llm_time = time.time() - llm_start_time print(f"Total time taken by llm: {total_llm_time:.2f} sec") else: return jsonify({"error": "Matching Topic and Assigned Datasource not Found", "details": "Not Find suitable topic and Datasource of user input", 'status': 400}) elif "kapture global search" in identified_intent.lower() or "mgss web" in identified_intent.lower() or "mazda product recommendation" in identified_intent.lower() or "product recommendation-mazda usa vehicles" in identified_intent.lower() or "product recommendation for mazda usa vehicles" in identified_intent.lower(): print("In separate web search class") if not action_details: return jsonify({"error": f"No Actions assigned to identified {identified_intent} topic", "details": f"No Actions assigned to identified {identified_intent} topic", 'status': 400}) else: response, action_list = kapture_module_object.get_kapture_global_search_response(action_details, user_input, tone_details) custom_agent_obj.set_memory_mode(user_id=user_id, mode=memory_type) custom_agent_obj.initialize_memory(user_id, agent_id, session_id, token_limit) else: print("MCP TOOLS", len(action_details)) if not action_details: return jsonify({"error": f"No Actions assigned to identified {identified_intent} topic", "details": f"No Actions assigned to identified {identified_intent} topic", 'status': 400}) if model_type.lower() == "subscription": custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, action_details, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) print("In chat with agent function") response, action_list = await chat_with_agent(custom_tools_agent, user_input) else: custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions,action_details,user_id, agent_id,session_id,memory_type, token_limit,output_guard_rails) print("In chat with agent function") response, action_list = open_source_chat_with_agent(custom_tools_agent, action_details, memory,user_input) if output_guard_rails: memory = custom_agent_obj.short_term_memory_manager.initialize_working_memory(session_id, 10000) response = custom_agent_obj.output_guard_rail_validation(response, output_guard_rails,memory) save_response = False action_module_object.reset() else: flow_executor_agent, flow_data = get_or_build_graph(session_id=session_id, flow_id=flow_id) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" flow_instructions = flow_data["flow_instructions"] if not flow_instructions: return jsonify({"error": "Flow instructions is required", "details": "No flow instructions available to execute",'status': 400}) final_flow_instructions = customer_user_id + flow_instructions custom_system_prompt = SystemMessage(content=(final_flow_instructions)) messages = [custom_system_prompt, HumanMessage(content=user_input)] initial_state = { "messages": messages, "variables_list": [], "workflow_steps": flow_data["workflow_steps"], 'current_node_id': flow_data["current_node_id"], 'flow_instruction': '', 'execution_path': [], 'variables': {'user_input': user_input,"user_id":user_id}, 'errors': [], 'response': '', 'condition_result': 'false', 'wait_result': '', "session": session_id, 'flow_id': flow_id, "human_input": "", "file_upload" : {"file_upload": False, "config_data": {}}, "human_input_type": {}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, 'loop_count': 0 } config = {"configurable": {"thread_id": session_id}} response, flow_list = chat_with_langgraph_agent_flow_builder(flow_executor_agent, initial_state, config, trigger_time=current_time) current_utc_time = datetime.now(timezone.utc) timestamp = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') if memory_type == 'LONG_TERM': custom_agent_obj.append_to_memory([("user", user_input), ("assistant", response)]) check_json_response_type = extract_parsed_json(response) if isinstance(response, str): state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list" : flow_list, "agent_response": response }, user_input=user_input, processed_input=preprocessed_input ) response = normalize_response(response) matches = re.findall(pattern, user_input) if save_response and (not matches or not response): cache_mang_obj.update_cache(agent_id=agent_id, user_id=user_id, user_query=user_input, identified_topic_name=identified_intent, flow_list=flow_list, human_input=human_input , response=response,response_type=check_json_response_type) our_code_end_time = time.time() our_code_execution_time = round((our_code_end_time - our_code_start_time), 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response, "status": 200, "timestamp": timestamp, "total_code_execution_time": our_code_execution_time } send_data_opensearch_for_dashboard(payload=history_payload) return Response( json.dumps({ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "encrypted_data": extracted_entities, "agent_response": response, "status": 200, "human_input": human_input, "file_upload": {"file_upload": False, "config_data": {}}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, "total_code_execution_time": our_code_execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) }), mimetype=mime_type ) else: state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", '') }, user_input=user_input, processed_input=preprocessed_input ) response = normalize_response(response) human_input = True matches = re.findall(pattern, user_input) if not matches or not response: cache_mang_obj.update_cache(agent_id=agent_id, user_id=user_id, user_query=user_input, identified_topic_name=identified_intent, flow_list=flow_list, human_input=human_input, response=response,response_type=check_json_response_type) our_code_end_time = time.time() our_code_execution_time = round((our_code_end_time - our_code_start_time), 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", ''), "status": 200, "timestamp": timestamp, "total_code_execution_time": our_code_execution_time } send_data_opensearch_for_dashboard(payload=history_payload) return Response( json.dumps({ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "flow_list": flow_list, "encrypted_data": extracted_entities, "agent_response": response.get("agent_response",''), "status": 200, "human_input": human_input, "file_upload": response.get("file_upload", {}), "feedback_node": response.get("feedback_node", {}), "human_input_type": response.get("human_input_type", ''), "human_input_query": response.get("human_input_query", ''), "session": response.get("session", ""), "flow_id": response.get("flow_id", ""), "total_code_execution_time": our_code_execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) }), mimetype=mime_type ) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) def _finalize_and_stream_response(user_id, agent_id, session_id, user_input, start_time, timestamp, streamed_event=None): if streamed_event is None: streamed_event = {} chat_entry = { "multi_agent_id":agent_id, "session_id":session_id, "user_id":user_id, "streamed_event": streamed_event } state_store.update_session_state( session_id, chat_entry=chat_entry, user_input=user_input, processed_input=user_input ) execution_time = round(time.time() - start_time, 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "streamed_event": streamed_event, "status": 200, "timestamp": timestamp, "total_code_execution_time": execution_time } final_payload = { "user_input": user_input, "streamed_event": streamed_event, "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "status": 200, "total_code_execution_time": execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) } send_data_opensearch_for_dashboard(payload=history_payload) return final_payload def _finalize_and_build_response(user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, response, preprocessed_input, start_time, timestamp, human_input, supervisor_actions=None): # Update session with actual agent response if supervisor_actions is None: supervisor_actions = [] chat_entry = { "User": user_input, "Identified Topic": identified_intent, "Topic ID": identified_topic_id, "Action List": action_list, "Agent": response } if supervisor_actions: print("chat entry", supervisor_actions) chat_entry.update({"supervisor_agent_list":supervisor_actions}) state_store.update_session_state( session_id, chat_entry=chat_entry, user_input=user_input, processed_input=preprocessed_input ) # Normalize response response = normalize_response(response) # Update semantic cache if no regex matches or empty response matches = re.findall(pattern, user_input) if not matches or not response: # update_semantic_cache(preprocessed_input, response, user_input) cache_mang_obj.update_cache(agent_id=agent_id, user_id=user_id, user_query=user_input, identified_topic_name=identified_intent, flow_list=flow_list, human_input=human_input, response=response,response_type=None) return _build_final_response( user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, response, start_time, timestamp,supervisor_actions=supervisor_actions ) def _finalize_response_with_cache(user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, cached_response, preprocessed_input, start_time, timestamp): # Update session with cached response state_store.update_session_state( session_id, chat_entry={ "User": user_input, "Identified Topic": identified_intent, "Topic ID": identified_topic_id, "Action List": action_list, "Agent": cached_response }, user_input=user_input, processed_input=preprocessed_input ) return _build_final_response( user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, cached_response, start_time, timestamp ) def _build_final_response(user_id, agent_id, session_id, user_input, identified_intent, identified_topic_id, action_list, response, start_time, timestamp, supervisor_actions=None): """Shared logic for finalizing response (cached or not).""" if supervisor_actions is None: supervisor_actions = [] execution_time = round(time.time() - start_time, 2) history_payload = { "user_id": user_id, "agent_id": agent_id, "session_id": session_id, "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response, "status": 200, "timestamp": timestamp, "total_code_execution_time": execution_time } final_payload = { "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "action_list": action_list, "agent_response": response, "status": 200, "total_code_execution_time": execution_time, "chat_history": state_store.get_chat_history(session_id=session_id) } if supervisor_actions: history_payload.update({"supervisor_agent_list": supervisor_actions}) final_payload.update({"supervisor_agent_list": supervisor_actions}) send_data_opensearch_for_dashboard(payload=history_payload) return Response( json.dumps(final_payload), mimetype=mime_type ) def _extract_and_validate_inputs(data): session_id = data.get("session_id", "") if not session_id: return _error_response(session_id_missing_message, 400) user_id = data.get("user_id", "") if not user_id: return _error_response(user_id_missing_message, 400, user_id_not_provided_message) agent_id = data.get("agent_id", "").strip() user_input = data.get("user_input", "").strip() return session_id, user_id, agent_id, user_input def _get_or_create_session_state(session_id): state = state_store.get(session_id) if state is None: state = {"chat_history": [], "user_input": "", "processed_input": "", "collected_params": {}} state_store[session_id] = state else: print("Existing session found. Loaded state:", state) return state async def _fetch_agent_settings(agent_id): memory_type, token_limit,third_party_data_source,is_mcp_enabled,guard_rails,data_encryption = await agent_module_object.get_memory_setting_details(agent_id=agent_id) return memory_type, token_limit, third_party_data_source,is_mcp_enabled,guard_rails,data_encryption def _handle_intent(intent_response, instructions, topic_id, agent_id, user_id, session_id, user_input, tone_details, memory_type, token_limit, collected_params, state_store,model_type,user_pk_id,customer_id,output_guard_rails): identified_intent, _ = intent_response, topic_id if topic_id is None or instructions is None: return _handle_no_topic(agent_id, user_input, tone_details, instructions, user_id, session_id, memory_type, token_limit,model_type,user_pk_id,customer_id,output_guard_rails) if _is_web_search_intent(identified_intent): return _handle_web_search(agent_id, topic_id, identified_intent, user_input, tone_details) return _handle_standard_intent(agent_id, topic_id, identified_intent, user_input, tone_details, collected_params, instructions, user_id, session_id, memory_type, token_limit, state_store,model_type, customer_id,output_guard_rails) def _handle_no_topic(agent_id, user_input, style, instructions, user_id, session_id, memory_type, token_limit,model_type,user_pk_id, customer_id,output_guard_rails): action_without_topic_obj = ActionWithoutTopicModule() index_metadata_names = action_without_topic_obj.get_datasource_index_details(agent_id=agent_id, user_pk_id=user_pk_id) if not index_metadata_names: return _error_response("Matching Topic and Assigned Datasource not Found", 400, "Not Find suitable topic and Datasource of user input") instructions = "Always call only single relevant tool from all available tools to retrieve data. Provide accurate, concise information from the retrieve data." response_type = agent_module_object.get_agent_setting_details(agent_id=agent_id) custom_tools_list = action_without_topic_obj.create_custom_rag_tools( user_input, style, response_type, index_metadata_names, instructions,model_type=model_type ) # custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, # agent_id, session_id, memory_type, token_limit) # response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) if model_type.lower() == "subscription": custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) else: custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit,output_guard_rails) response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) return response, action_list, "Unknown", None def _handle_web_search(agent_id, topic_id, identified_intent, user_input, tone_details): print("In separate web search class") action_details = kapture_module_object.get_action_details(agent_id=agent_id, topic_id=topic_id) if not action_details: return _error_response(f"No Actions assigned to identified {identified_intent} topic", 400, f"No Actions assigned to identified {identified_intent} topic") response, action_list = kapture_module_object.get_kapture_global_search_response( action_details, user_input, tone_details ) return response, action_list, identified_intent, topic_id def _handle_standard_intent(agent_id, topic_id, identified_intent, user_input, tone_details, collected_params, instructions, user_id, session_id, memory_type, token_limit, state_store,model_type, customer_id,output_guard_rails): attached_workflows, flow_id, action_details, is_product_recommendation = asyncio.run( get_all_details(agent_id, topic_id, topic_id)) if not action_details: return _error_response(f"No Actions assigned to identified {identified_intent} topic", 400, f"No Actions assigned to identified {identified_intent} topic") # custom_tools_list = action_module_object.create_custom_methods_tools_new( # action_details, user_input, tone_details, collected_params, # is_product_recommendation, state_store, session_id # ) # custom_tools_agent = custom_agent_obj.load_tools_agent( # instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit # ) # print("In chat with agent function") # response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) if model_type.lower() == "subscription": custom_tools_list = action_module_object.create_custom_methods_tools_new(action_details, user_input, tone_details, collected_params, is_product_recommendation, state_store, session_id) custom_tools_agent = custom_agent_obj.load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit, customer_id,output_guard_rails) print("In chat with agent function") response, action_list = asyncio.run(chat_with_agent(custom_tools_agent, user_input)) action_module_object.reset() else: custom_tools_list = action_module_object.opensource_create_custom_methods_tools_new(action_details, user_input, tone_details, collected_params, is_product_recommendation, state_store, session_id) custom_tools_agent, memory = custom_agent_obj.opensource_load_tools_agent(instructions, custom_tools_list, user_id, agent_id, session_id, memory_type, token_limit) print("In chat with agent function") response, action_list = open_source_chat_with_agent(custom_tools_agent, custom_tools_list, memory, user_input) action_module_object.reset() return response, action_list, identified_intent, topic_id def _is_web_search_intent(identified_intent): keywords = [ "kapture global search", "mgss web", "mazda product recommendation", "product recommendation-mazda usa vehicles", "product recommendation for mazda usa vehicles" ] return any(kw in identified_intent.lower() for kw in keywords) def _error_response(error, status, details=None): payload = {"error": error, "status": status} if details: payload["details"] = details return jsonify(payload) @application.route('/kapture_agent_identify_topic', methods=['POST', 'GET']) def process_user_input_to_identify_topic(): our_code_start_time = time.time() input_time= time.time() global identified_sub_topic try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, "details": session_id_missing_message, 'status': 400}) user_pk_id = data.get("user_pk_id", "") if not user_pk_id: return jsonify({"error": user_pk_id_missing_message, 'status': 400}) user_id = data.get("user_id", "") customer_id= data.get("customer_id","") if not user_id: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) settings = data.get("anomaly_model_settings", "") state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state else: print("Existing session found. Loaded state:", state) case_creation = False identified_sub_topic = [] user_input = data.get("user_input", "").strip() agent_id = data.get("agent_id", "").strip() if not user_input: return jsonify({"error": user_input_required_message, 'status': 400}) if not agent_id: return jsonify({"error": "Agent ID is missing", "details": "Agent ID is missing", 'status': 400}) cached_response, topic_name, flow_list_info, human_input, response_type = cache_mang_obj.get_cached_response(agent_id=agent_id, user_id =user_id, user_query=user_input) if cached_response: return build_final_cache_response(user_id=user_id, agent_id=agent_id, session_id=session_id, user_input=user_input, identified_intent=topic_name, identified_topic_id='', action_list=[],flow_list=flow_list_info, human_input=human_input, response=cached_response, our_code_start_time=our_code_start_time, timestamp=input_time, state_store=state_store,response_type=response_type) # predefined_topics = topic_module_obj.get_topic_details(agent_id=agent_id) # memory_type, token_limit = agent_module_object.get_memory_setting_details(agent_id=agent_id) predefined_topics, tone_details, language_details, model_type, model_name = asyncio.run(call_to_get_topic_and_tone_details(agent_id)) memory_type, token_limit, third_party_data_source,enableMCP,guard_rails,data_encryption = asyncio.run(agent_module_object.get_memory_setting_details(agent_id=agent_id)) # model_type, model_name = llm_config_obj.get_model_config_details(agent_id) print("details about models:", model_type, model_name) input_guard_rails = guard_rails.get("inputGuardRail") output_guard_rails = guard_rails.get("outputGuardRail") if model_type is None or model_name is None: return jsonify({"error": "Model type or model name is not configured.", "details": "Please configure the model type or model name first", 'status': 400}) if not memory_type: return jsonify({"error": "Please configure memory setting first", "details": "Please configure memory setting first to go ahead", 'status': 400}) if model_type.lower() == "subscription": intent_response, instructions, topic_id = custom_agent_obj.identify_intent_with_agent(user_input, tone_details, language_details, predefined_topics,session_id, user_id, agent_id,input_guard_rails) else: intent_response, instructions, topic_id = custom_agent_obj.opensource_identify_intent_with_agent(user_input, tone_details, language_details, predefined_topics,session_id,input_guard_rails) identified_intent = intent_response identified_topic_id = topic_id blocked_message = "We're sorry, your input violates our safety and content guidelines" if not instructions else instructions print("Identified Topic Details :", identified_intent) if intent_response.lower() == "blocked": return _build_final_response(user_id=user_id, agent_id=agent_id, session_id=session_id, user_input=user_input, identified_intent='', identified_topic_id='', action_list=[], response=blocked_message, start_time=input_time, timestamp=input_time) ## Create Summaries for long term threading.Thread( target=summary_long_term_manager.summarize_sessions_missing_summary, kwargs={ "user_id": user_id, "agent_id": agent_id, "current_session_id": session_id }, daemon=True ).start() is_case_type,final_sub_topic= identify_case_type(identified_intent) # update instructions if settings: user_input += "##Model settings ## " + json.dumps(settings) + "##"+json.dumps(data.get("anamoly_data_features",[])) if is_case_type: print("Case creation type") case_creation = True identified_sub_topic = final_sub_topic return Response( json.dumps({ "user_input": user_input, "identified_topic": identified_intent, "topic_id": identified_topic_id, "topic_list": identified_sub_topic, "case_creation": case_creation, "status": 200, }), mimetype=mime_type ) else: final_payload = { "user_input": user_input, "agent_id": agent_id, "session_id": session_id, "user_id": user_id } input_time2= time.time() with application.test_request_context(json=final_payload): try: if enableMCP: response = asyncio.run(process_user_input_to_call_specific_mcp_tool(intent_response=intent_response, instructions=instructions, topic_id=topic_id, tone_details=tone_details, memory_type= memory_type, token_limit= token_limit,user_pk_id=user_pk_id,model_type=model_type, model_name =model_name,third_party_data_source=third_party_data_source, customer_id=customer_id,output_guard_rails=output_guard_rails,data_encryption=data_encryption)) else: response = process_user_input_to_call_specific_tool(intent_response=intent_response,instructions=instructions, topic_id=topic_id,tone_details=tone_details,memory_type=memory_type, token_limit=token_limit,user_pk_id=user_pk_id,model_type=model_type, model_name =model_name, third_party_data_source=third_party_data_source, customer_id=customer_id,output_guard_rails=output_guard_rails,data_encryption=data_encryption) except Exception as e: traceback.print_exc() print("Exception occured at specific tool decision --->") output_time_total = time.time() - input_time output_time_topic_only = time.time() - input_time2 print(f"Total captured time: {output_time_total:.2f} sec") print(f"Total captured topic time: {output_time_topic_only:.2f} sec") return response except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: traceback.print_exc() return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route('/kapture_agent_welcome_message', methods=['GET', 'POST']) def process_welcome_message(): try: data = request.get_json() agent_id = data.get("agent_id", "").strip() session_id = data.get("session_id", "").strip() if session_id: session["session_id"] = session_id if session_id not in state_store: state_store[session_id] = { "chat_history": [], "user_input": "", "processed_input": "", } print("state_store_after_receiving_session_id :", state_store) elif "session_id" not in session: session["session_id"] = str(uuid4()) print(f"New session ID created: {session['session_id']}") response = display_welcome_message(agent_id) return jsonify({ "welcome_message": response }) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) @application.route('/kapture_agent_reset_chat_history', methods=['GET', 'POST']) def reset_chat_history(): try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") print("Received session_id is :", session_id) if not session_id: return jsonify({"error": session_id_missing_message, "details": session_id_missing_message, 'status': 400}) if session_id: state_store.reset_session_state(session_id=session_id) return jsonify({"message": "Chat history has been reset for this session.", "status": 200}) else: return jsonify({"error": "Session ID not found", 'status': 400}) except Exception as e: return jsonify({"error": str(e), "status": 500}) @application.route('/kapture_agent_close_session_history', methods=['GET', 'POST']) def reset_session_history(): try: data = request.get_json() session_id = data.get("session_id", "") user_id = data.get("user_id", "") if not user_id: return jsonify({"details": "User ID is required.","error":"User ID is required not provided" ,"status": 400}) if not session_id: return jsonify({"details": "Session ID is required.","error":"Session ID is required not provided" ,"status": 400}) # custom_agent_obj.clear_session(session_id=session_id) if session_id: state_store.delete(session_id=session_id) if session_id in graph_store: del graph_store[session_id] return jsonify({"message": "Session history has been reset.", "status": 200}) else: return jsonify({"details": "Session ID is required.","error":"Session ID is required not provided" ,"status": 400}) except Exception as e: return jsonify({"status": 500, "details": "Unexpected error occurred" , "error":str(e)}) ####### Cache Mechanism APIs ################### @application.route('/kapture_agent_clear_agent_cache', methods=['GET', 'POST']) def clear_agent_cache(): try: data = request.get_json() user_id = data.get("user_id", "") agent_id = data.get("agent_id", "") if not user_id: return jsonify({"details": "User ID is required.","error":"User ID not provided" ,"status": 400}) if not agent_id: return jsonify({"details": "Agent ID is required.","error":"Agent ID not provided" ,"status": 400}) if agent_id and user_id: cache_mang_obj.reset_agent_cache(user_id=user_id,agent_id=agent_id) return jsonify({"message": f"Agent Cache has been reset for agent_id: {agent_id}", "status": 200}) else: return jsonify({"details": "Agent ID is required.","error":"Agent ID is required not provided" ,"status": 400}) except Exception as e: return jsonify({"status": 500, "details": "Unexpected error occurred" , "error":str(e)}) @application.route('/kapture_agent_clear_user_cache', methods=['GET', 'POST']) def clear_user_cache(): try: data = request.get_json() user_id = data.get("user_id", "") if not user_id: return jsonify({"details": "User ID is required.","error":"User ID not provided" ,"status": 400}) if user_id: cache_mang_obj.reset_user_cache_by_user_id(user_id=user_id) return jsonify({"message": f"User Cache has been reset for user id: {user_id}", "status": 200}) else: return jsonify({"details": "User ID is required.","error":"User ID is not provided" ,"status": 400}) except Exception as e: return jsonify({"status": 500, "details": "Unexpected error occurred" , "error":str(e)}) ############ Generalize code for the product recommendation ################# session_store = {} def init_session(session_id): session_store[session_id] = { 'answers': [], 'current_question': 1, 'is_first_iteration': True } @application.route('/kapture_agent_vehicle_recommendation', methods=['POST']) def vehicle_recommendation(): data = request.json if data is None: return jsonify({"error": "Invalid or missing JSON data"}), 400 agent_id = data.get('agent_id') session_id = data.get('session_id') if not agent_id or not session_id: return jsonify({"error": "Missing agent_id or session_id"}), 400 user_id = data.get("user_id", "") if not user_id: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) questions = question_module_object.get_question_details_for_agent(agent_id=agent_id) session_data = session_store.get(session_id) if not session_data: init_session(session_id) first_q = questions[0] return jsonify({ "question": first_q["questionLabel"], "options": first_q["options"], "type": "question" }) current_q_index = session_data['current_question'] user_input = data.get('user_input') if user_input is not None: if session_data['is_first_iteration']: prev_q = questions[current_q_index - 1] if user_input not in prev_q["options"]: return jsonify({ "question": "Invalid choice. Please select a valid option.\n" + prev_q["questionLabel"], "options": prev_q["options"], "type": "question" }), 200 selected_value = prev_q["options"].get(user_input) session_data['answers'].append({prev_q["questionKey"]: selected_value}) else: first_q = questions[0] session_data['is_first_iteration'] = True return jsonify({ "question": first_q["questionLabel"], "options": first_q["options"], "type": "question" }) if len(session_data['answers']) == len(questions): final_answers = {} for ans in session_data['answers']: final_answers.update(ans) # print('Final Response we are getting:', final_answers) final_payload = { "user_input": str(final_answers), "agent_id": agent_id, "session_id": session_id, "user_id": user_id } with application.test_request_context(json=final_payload): response = process_user_input() init_session(session_id) session_store[session_id]['is_first_iteration'] = False return response next_q = questions[len(session_data['answers'])] session_data['current_question'] = len(session_data['answers']) + 1 session_store[session_id] = session_data return jsonify({ "question": next_q["questionLabel"], "options": next_q["options"], "type": "question" }) ## Redis Memory manager APIS @application.route('/kapture_agent_delete_all_redis_sessions', methods=['POST']) def delete_all_sessions(): data = request.get_json() agent_id = data.get("agent_id", "") user_id = data.get("user_id", "") if not agent_id: return jsonify({"error": "agent_id is required"}), 400 if not user_id: return jsonify({"error": "user id is required"}), 400 message = memory_manager.delete_all_sessions_for_agent(user_id, agent_id) return jsonify({"message": message}) # Route: Delete specific session for an agent @application.route('/kapture_agent_delete_selected_redis_session', methods=['POST']) def delete_session(): data = request.get_json() agent_id = data.get("agent_id", "") session_id = data.get("session_id", "") user_id = data.get("user_id", "") if not agent_id or not session_id: return jsonify({"error": "agent_id and session_id are required"}), 400 if not user_id: return jsonify({"error": "user id is required"}), 400 message = memory_manager.delete_session_for_agent(user_id, agent_id, session_id) return jsonify({"message": message}) @application.route("/kapture_agent_get_all_session_memory_data_for_selected_agent", methods=["POST"]) def get_first_questions(): data = request.get_json() user_id = data.get("user_id", "") agent_id = data.get("agent_id", "") if not user_id: return jsonify({"error": "Missing required parameter: user_id"}), 400 try: results = memory_manager.get_first_questions_by_user_and_agent(user_id, agent_id) return jsonify({"user_id": user_id, "agent_id": agent_id, "sessions": results}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 ## Flow execution api for flow builder @application.route('/kapture_agent_kapture_execution_of_flow', methods= ["POST"]) def flow_executor_only(): current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_id = data.get("user_id", "") if not user_id: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) flow_id = data.get("flow_id", "") if not user_id: return jsonify({"error": user_id_missing_message, "details": flow_id_not_provided_message, 'status': 400}) user_input = data.get("user_input", "").strip() if not user_input: return jsonify({"error": user_input_required_message, "details": user_input_not_provided_message, 'status': 400}) state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state flow_executor_agent, flow_data = get_or_build_graph(session_id=session_id, flow_id=flow_id) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" flow_instructions = flow_data["flow_instructions"] if not flow_instructions: return jsonify({"error": "Flow instructions is required", "details": "No flow instructions available to execute", 'status': 400}) final_flow_instructions = customer_user_id +"#" +flow_instructions custom_system_prompt = SystemMessage(content=(final_flow_instructions)) messages = [custom_system_prompt,HumanMessage(content=user_input)] # Save the Mermaid graph as a PNG image # graph_bytes = flow_executor_agent.get_graph().draw_mermaid_png() # Save it to a file # with open("flow_graph.png", "wb") as f: # f.write(graph_bytes) initial_state = { "messages": messages, "variables_list": [], "workflow_steps": flow_data["workflow_steps"], 'current_node_id': flow_data["current_node_id"], 'flow_instruction': '', 'execution_path': [], 'variables': {'user_input': user_input,"user_id":user_id}, 'errors': [], 'response': '', 'condition_result':'false', 'wait_result': '', "session": session_id, 'flow_id': flow_id, "human_input": "", "file_upload" : {"file_upload": False, "config_data": {}}, "human_input_type": {}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, 'loop_count': 0 } config = {"configurable": {"thread_id": session_id}} response, action_list = chat_with_langgraph_agent_flow_builder(flow_executor_agent, initial_state, config, trigger_time=current_time) flow_list = action_list if isinstance(response,str): state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response, "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response, "date_time": current_time, "chat_history": state_store.get_chat_history(session_id=session_id), "human_input": False, "file_upload": {"file_upload": False, "config_data": {}}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, "status": 200 }), mimetype=mime_type ) else: state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", ''), "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response.get("agent_response",''), "date_time": current_time, "chat_history": state_store.get_chat_history(session_id=session_id), "status": 200, "human_input_type": response.get("human_input_type",''), "human_input_query": response.get("human_input_query",''), "file_upload": response.get("file_upload", {}), "feedback_node": response.get("feedback_node", {}), "human_input": True, "session": response["session"], "flow_id": response["flow_id"] }), mimetype=mime_type ) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route("/kapture_agent_flow_builder_human_input_resume", methods= ["POST"]) def flow_builder_resume_with_human_input(): try: current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") data = request.get_json() flow_id = data.get("flow_id") user_input = data.get("user_input",'') session_id = data.get("session_id") scheduler = data.get("scheduler",False) flow_executor_agent, _ = get_or_build_graph(session_id=session_id, flow_id=flow_id) config = {"configurable": {"thread_id": session_id}} response, action_list = chat_with_langgraph_agent_flow_builder(flow_executor_agent, user_input, config, trigger_time=current_time, scheduler=scheduler) flow_list = action_list if isinstance(response, str): state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response, "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": flow_list, "agent_response": response, "date_time": current_time, "human_input": False, "file_upload": {"file_upload": False, "config_data": {}}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, "chat_history": state_store.get_chat_history(session_id=session_id), "status": 200 }), mimetype=mime_type ) else: state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": action_list, "flow_list": flow_list, "agent_response": response.get("agent_response",''), "human_input_query": response.get("human_input_query", ''), "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": flow_list, "agent_response": response.get("agent_response",''), "date_time": current_time, "chat_history": state_store.get_chat_history(session_id=session_id), "human_input_type": response.get("human_input_type", ''), "human_input_query": response.get("human_input_query", ''), "status": 200, "file_upload": response.get("file_upload", {}), "feedback_node": response.get("feedback_node", {}), "human_input": True, "session": response["session"], "flow_id": response["flow_id"] }), mimetype=mime_type ) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) def schedule_email_task(email_config=None, flow_id='', session_id='', user_id='', trigger_time=None, type=''): """ Main entry: Orchestrates email reading, parsing, and flow execution. """ if email_config is None: email_config = {} if not trigger_time: trigger_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: messages = fetch_matching_emails(email_config) user_inputs = [process_email(msg, email_config) for msg in messages] for user_input in filter(None, user_inputs): handle_flow_execution(user_input, flow_id, session_id, user_id, trigger_time, type) if not any(user_inputs): handle_failed_trigger(flow_id, session_id, user_id, trigger_time, type,notification_check=False) except Exception as e: logger.error(f"schedule_email_task failed: {e}") handle_failed_trigger(flow_id, session_id, user_id, trigger_time, type, str(e)) # ---------- Email Handling ---------- def fetch_matching_emails(email_config): """ Connect to mailbox and return matching email messages. """ email_host = email_config["emailHost"] email_port = email_config["emailPort"] email_user = email_config["emailUser"] email_pass = email_config["emailPass"] mail_box_name = email_config.get("mailBoxName", "INBOX") email_from = email_config.get("emailFrom") email_subject = email_config.get("emailSubject") action = email_config.get("action", "") now = datetime.now() start_time = now - timedelta(minutes=1) mail = imaplib.IMAP4_SSL(email_host, email_port) mail.login(email_user, email_pass) mail.select(mail_box_name, readonly=False) criteria = '(UNSEEN)' if email_from: criteria += f' FROM "{email_from}"' if email_subject: criteria += f' SUBJECT "{email_subject}"' _, messages = mail.search(None, criteria) email_ids = messages[0].split() results = [] for e_id in email_ids: fetch_cmd = "(RFC822)" if action.lower() == "mark_as_read" else "(BODY.PEEK[])" _, msg_data = mail.fetch(e_id, fetch_cmd) msg = email.message_from_bytes(msg_data[0][1]) # Optional: filter by date if "do nothing" if action == "do nothing" and not is_recent(msg, start_time): continue results.append(msg) mail.logout() return results def is_recent(msg, start_time): """Check if email timestamp falls within allowed window.""" msg_dt = parsedate_to_datetime(msg["Date"]) if msg_dt.tzinfo: msg_dt = msg_dt.astimezone(start_time.tzinfo) return start_time <= msg_dt <= datetime.now() # ---------- Email Parsing & LLM ---------- def process_email(msg, email_config): """ Extracts subject, body, and builds user_input with LLM. """ description = email_config["description"] subject, encoding = decode_header(msg["subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding or "utf-8") body = extract_body(msg) try: return build_user_input_with_llm(description, subject, body) except Exception as e: logger.warning(f"LLM parsing failed: {e}") return None def extract_body(msg): """Return plain-text body of an email.""" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": return part.get_payload(decode=True).decode(errors="ignore") return msg.get_payload(decode=True).decode(errors="ignore") def build_user_input_with_llm(description, subject, body): """Generate cleaned user_input using LLM.""" system_prompt = f""" You are a user input parser assistant. Your task is to generate a single clean **user_input string** for LangGraph flow extraction. ... Description: {description} Email Subject: {subject} Email Body: {body} Output only in the format: user_input: """ llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3) prompt = ChatPromptTemplate.from_messages([("system", system_prompt), ("human", "Return user input in text format.")]) response = (prompt | llm).invoke({}) return response.content.strip() # ---------- Flow Execution & Error Handling ---------- def handle_flow_execution(user_input, flow_id, session_id, user_id, trigger_time, type): try: build_flow_graph_and_execute( flow_id=flow_id, session_id=session_id, user_id=user_id, user_input=user_input, trigger_time=trigger_time, type=type ) except Exception as e: handle_flow_failure(flow_id, session_id, user_id, user_input, trigger_time, str(e), type) def handle_flow_failure(flow_id, session_id, user_id, user_input, trigger_time, error, type): """Log failure, update redis, send notifications.""" end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") audit_scheduler_task(flow_id, user_input, user_id, trigger_time, end_time, "Failed", [], error, type) redis_client.hset(flow_id, session_id, json.dumps({ "status": "Error", "message": error, "flow_id": flow_id, "session_id": session_id, "user_id": user_id, "user_input": user_input })) send_flow_notification(session_id, flow_id, 'Failed', failed_start_flow_message, trigger_time, end_time) def handle_failed_trigger(flow_id, session_id, user_id, trigger_time, type, error="No valid user input",notification_check=True): """Handle scenario when no valid input was found.""" redis_client.hset(flow_id, session_id, json.dumps({ "status": "Failed", "message": error, "flow_id": flow_id, "session_id": session_id, "user_id": user_id, "user_input": None, "type": "EMAIL" })) if notification_check: send_flow_notification(session_id, flow_id, 'Failed', f"Flow failed: {error}", trigger_time, datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def start_email_listener_idle(email_config: dict, flow_id='', session_id='', user_id='', trigger_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), type=''): host = email_config["emailHost"] port = email_config["emailPort"] user = email_config["emailUser"] password = email_config["emailPass"] mailbox = email_config.get("mailBoxName", "INBOX") description = email_config["description"] email_subject = email_config.get("emailSubject", "") email_from = email_config.get("emailFrom", "") action = email_config.get("action", "") # Extract action def parse_user_input(subject, body, description): prompt = f""" You are a user input parser assistant. Generate a single clean **user_input string** for LangGraph flow extraction. Rules: - Capture only the main intent (action + details). - Ignore greetings, closings, signatures. - Output must be exactly: user_input: Description: {description} Email Subject: {subject} Email Body: {body} """ llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3) prompt = ChatPromptTemplate.from_messages([ ("system", prompt), ("human", "Return user input in a text format.") ]) chain = prompt | llm response = chain.invoke({}) return response.content.strip() def process_new_mail(server, uid): try: print("process new mail",datetime.now()) raw_message = server.fetch([uid], ["RFC822"])[uid][b"RFC822"] msg = email.message_from_bytes(raw_message) subject, encoding = decode_header(msg["subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding or "utf-8") body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": body = part.get_payload(decode=True).decode(errors="ignore") break else: body = msg.get_payload(decode=True).decode(errors="ignore") user_input = parse_user_input(subject, body, description) if not user_input: raise ValueError("Failed to parse user_input") trigger_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") build_flow_graph_and_execute( flow_id=flow_id, session_id=session_id, user_id=user_id, user_input=user_input, trigger_time=trigger_time, type=type ) # Mark as read if configured if action == "mark as read": server.add_flags([uid], r'\Seen') # Mark UID processed in Redis redis_client.sadd(PROCESSED_KEY, uid) logger.info(f"✅ Processed UID={uid} : {subject}") except Exception as e: logger.error(f"❌ Error processing UID={uid}: {e}") # Push back into queue for retry redis_client.lpush(QUEUE_KEY, uid) # ------------------------------ # Worker thread: pulls from Redis # ------------------------------ def worker(): # Each worker creates its own connection while True: try: worker_server = IMAPClient(host, port=port, ssl=True, use_uid=True) worker_server.login(user, password) worker_server.select_folder(mailbox) # Blocking pop from Redis queue _, uid_bytes = redis_client.brpop(QUEUE_KEY) uid = int(uid_bytes) if not redis_client.sismember(PROCESSED_KEY, uid): process_new_mail(worker_server, uid) else: logger.info(f"⚠️ Skipping already processed UID={uid}") except Exception as e: logger.error(f"[Worker] Error: {e}. Reconnecting in 5s...") time.sleep(5) finally: if 'worker_server' in locals(): worker_server.logout() # ------------------------------ # Listener thread: pushes UIDs # ------------------------------ def idle_listener(server): while True: try: server.idle() responses = server.idle_check(timeout=60) server.idle_done() for resp in responses: print("GOT MAIL") if resp[1] == b"EXISTS": # Fixed: str, not bytes search_criteria = ["UNSEEN"] if email_from: search_criteria += ["FROM", email_from] if email_subject: search_criteria += ["SUBJECT", email_subject] messages = server.search(search_criteria) for uid in messages: # Push only if not processed if not redis_client.sismember(PROCESSED_KEY, uid): redis_client.lpush(QUEUE_KEY, uid) logger.info(f"📥 Queued UID={uid}") except Exception as e: logger.error(f"[Listener] Error: {e}. Reconnecting...") time.sleep(5) server = IMAPClient(host, port=port, ssl=True, use_uid=True) server.login(user, password) server.select_folder(mailbox) def search_and_queue_unseen(server): """Search for unseen emails and push to Redis queue if not processed.""" try: search_criteria = ["UNSEEN"] if email_from: search_criteria += ["FROM", email_from] if email_subject: search_criteria += ["SUBJECT", email_subject] messages = server.search(search_criteria) if not messages: logger.info("📭 No new unseen messages found.") return logger.info(f"📬 Found {len(messages)} unseen message(s).") for uid in messages: if not redis_client.sismember(PROCESSED_KEY, uid): redis_client.lpush(QUEUE_KEY, uid) logger.info(f"📥 Queued UID={uid}") else: logger.info(f"⚠️ Skipping already processed UID={uid}") except Exception as e: logger.error(f"❌ Error during unseen mail search: {e}") def idle_listener_new(server): """Listen for new emails using IMAP IDLE and periodic polling.""" try: last_poll = time.time() logger.info("📨 Entered IMAP IDLE listener...") while True: try: print("🕓 IDLE MODE STARTED", time.time(), datetime.now()) server.idle() responses = server.idle_check(timeout=60) server.idle_done() if responses: print(f"📨 IDLE responses: {responses}") for resp in responses: # Handle "EXISTS" response when a new email arrives if resp[1] == b"EXISTS": logger.info("📩 New mail detected via IDLE") search_and_queue_unseen(server) # 🔁 Poll every 90 seconds (for backup) if time.time() - last_poll > 90: logger.info("🔄 Performing periodic mail poll...") search_and_queue_unseen(server) last_poll = time.time() print("new poll", datetime.now()) server.noop() except Exception as e: logger.error(f"⚠️ Error during IDLE cycle: {e}") time.sleep(5) continue except Exception as e: logger.error(f"❌ IMAP listener fatal error: {e}. Retrying in 10s...") time.sleep(10) # ------------------------------ # Start service # ------------------------------ server = IMAPClient(host, port=port, ssl=True, use_uid=True) server.login(user, password) server.select_folder(mailbox) logger.info(f"📨 Listening on {mailbox} with IDLE (FROM={email_from}, SUBJECT={email_subject})...") # Start worker threads (now without passing server) for _ in range(2): threading.Thread(target=worker, daemon=True).start() # Start listener threading.Thread(target=idle_listener_new, args=(server,), daemon=True).start() # Keep main thread alive while True: time.sleep(60) def start_email_listener_idle2(email_config: dict, flow_id='', session_id='', user_id='', trigger_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), type=''): host = email_config["emailHost"] port = email_config["emailPort"] user = email_config["emailUser"] password = email_config["emailPass"] mailbox = email_config.get("mailBoxName", "INBOX") description = email_config["description"] email_subject = email_config.get("emailSubject", "") email_from = email_config.get("emailFrom", "") action = email_config.get("action", "") def parse_user_input(subject, body, description): prompt = f""" You are a user input parser assistant. Generate a single clean **user_input string** for LangGraph flow extraction. Rules: - Capture only the main intent (action + details). - Ignore greetings, closings, signatures. - Output must be exactly: user_input: Description: {description} Email Subject: {subject} Email Body: {body} """ llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3) prompt = ChatPromptTemplate.from_messages([ ("system", prompt), ("human", "Return user input in a text format.") ]) chain = prompt | llm response = chain.invoke({}) return response.content.strip() def process_new_mail(msg,server,uid): try: print("process new mail",datetime.now()) # raw_message = server.fetch([uid], ["RFC822"])[uid][b"RFC822"] # msg = email.message_from_bytes(raw_message) subject, encoding = decode_header(msg["subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding or "utf-8") body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": body = part.get_payload(decode=True).decode(errors="ignore") break else: body = msg.get_payload(decode=True).decode(errors="ignore") user_input = parse_user_input(subject, body, description) if not user_input: raise ValueError("Failed to parse user_input") trigger_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") build_flow_graph_and_execute( flow_id=flow_id, session_id=session_id, user_id=user_id, user_input=user_input, trigger_time=trigger_time, type=type ) # Mark as read if configured if action == "mark as read": server.add_flags([uid], r'\Seen') print(f"✅ Processed and marked message UID={uid} as read.") # Mark UID processed in Redis redis_client.sadd(PROCESSED_KEY, uid) logger.info(f"✅ Processed UID={uid} : {subject}") except Exception as e: logger.error(f"❌ Error processing UID={uid}: {e}") # Push back into queue for retry redis_client.lpush(QUEUE_KEY, uid) def process_pending_from_queue(client): """Process any UIDs still present in the Redis queue.""" try: while redis_client.llen(QUEUE_KEY) > 0: uid_bytes = redis_client.rpop(QUEUE_KEY) if not uid_bytes: break uid = int(uid_bytes) if redis_client.sismember(PROCESSED_KEY, uid): logger.info(f"⚠️ Skipping already processed UID={uid}") continue logger.info(f"🔁 Processing pending UID={uid} from Redis queue") message_data = client.fetch([uid], ["RFC822"]).get(uid) if not message_data: logger.warning(f"⚠️ Message UID={uid} not found in mailbox.") continue msg = email.message_from_bytes(message_data[b"RFC822"]) process_new_mail(msg, client, uid) redis_client.sadd(PROCESSED_KEY, uid) except Exception as e: logger.error(f"❌ Error while processing Redis queue: {e}") def check_new_mail(client): """Connect, fetch unread emails, and process them.""" try: search_criteria = ["UNSEEN"] if email_from: search_criteria += ["FROM", email_from] if email_subject: search_criteria += ["SUBJECT", email_subject] messages = client.search(search_criteria) if not messages: print("📭 No new messages.") return print(f"📬 Found {len(messages)} new message(s).") for uid, message_data in client.fetch(messages, "RFC822").items(): raw_email = message_data[b"RFC822"] msg = email.message_from_bytes(raw_email) if not redis_client.sismember(PROCESSED_KEY, uid): redis_client.lpush(QUEUE_KEY, uid) logger.info(f"📥 Queued UID={uid}") process_new_mail(msg, client, uid) else: logger.info(f"⚠️ Skipping already processed UID={uid}") # client.add_flags(uid, [b"\\Seen"]) process_pending_from_queue(client) except Exception as e: print(f"❌ Error checking mail: {e}") redis_client.lpush(QUEUE_KEY, uid) def listen_for_mail(): while True: try: with IMAPClient(host, ssl=True) as client: client.login(user, password) client.select_folder(mailbox) print("🕓 Entering IDLE mode...") logger.info(f"📨 Listening on {mailbox} with IDLE (FROM={email_from}, SUBJECT={email_subject})...") last_poll = time.time() print("last poll time", last_poll, datetime.now()) while True: print("IDLE MODE STARTED", time.time(),datetime.now()) client.idle() responses = client.idle_check(timeout=60) client.idle_done() if responses: print(f"📨 IDLE responses: {responses}") check_new_mail(client) if time.time() - last_poll > 90: check_new_mail(client) last_poll = time.time() print("new poll", datetime.now()) client.noop() except Exception as e: print(f"❌ IMAP connection error: {e}. Retrying in 10s...",datetime.now()) time.sleep(10) server = IMAPClient(host, port=port, ssl=True) server.login(user, password) server.select_folder(mailbox) listener_thread = threading.Thread( target=listen_for_mail, daemon=True ) listener_thread.start() def build_flow_graph_and_execute(flow_id='', session_id='', user_id='', user_input='',trigger_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),type =''): action_list = [] try: send_flow_notification(session_id, flow_id, 'Started', "Flow has started execution", trigger_time, datetime.now().strftime(date_format)) flow_executor_agent, flow_data = get_or_build_graph(session_id=session_id, flow_id=flow_id) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" flow_instructions = flow_data["flow_instructions"] if not flow_instructions: flow_instructions = '' final_flow_instructions = customer_user_id + flow_instructions custom_system_prompt = SystemMessage(content=(final_flow_instructions)) messages = [custom_system_prompt, HumanMessage(content=user_input)] # Save the Mermaid graph as a PNG image # graph_bytes = flow_executor_agent.get_graph().draw_mermaid_png() # Save it to a file # with open("flow_graph.png", "wb") as f: # f.write(graph_bytes) initial_state = { "messages": messages, "variables_list": [], "workflow_steps": flow_data["workflow_steps"], 'current_node_id': flow_data["current_node_id"], 'flow_instruction': '', 'execution_path': [], 'variables': {'user_input': user_input,"user_id":user_id}, 'errors': [], 'response': '', 'condition_result': 'false', 'wait_result': '', "session": session_id, 'flow_id': flow_id, "human_input": "", "human_input_type": {}, "feedback_node": {"feedback_node_type": False, "config_data": {}}, 'loop_count': 0 } config = {"configurable": {"thread_id": session_id}} response, action_list = chat_with_langgraph_agent_flow_builder(flow_executor_agent, initial_state, config,trigger_time=trigger_time,scheduler=True,type=type) flow_list = action_list existing = redis_client.hget(flow_id, session_id) if existing: record = json.loads(existing) else: record = {} redis_client.hset(flow_id,session_id, json.dumps({"final_status": "Completed",**record})) return action_list, flow_list, response except Exception as e: end_time = datetime.now().strftime(date_format) audit_scheduler_task(flow_id, user_input, user_id, trigger_time, end_time, "Failed", action_list,str(e),type) redis_client.hset(flow_id, session_id, json.dumps({"status": "Error", **{ "message": str(e), "flow_id": flow_id, "session_id": session_id, "user_id": user_id, "user_input": user_input, "type": type }})) send_flow_notification(session_id, flow_id, 'Failed', failed_start_flow_message, trigger_time, end_time ) return None,None,None @application.route("/kapture_agent_trigger_schedule_task", methods=["POST"]) def trigger_scheduler_tasks(): """ Main entrypoint for scheduling tasks (cron/email/webhook). Decides based on request payload and delegates to helpers. """ triggered_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") data = request.get_json(force=True) expression = data.get("expression", "") email_config = data.get("email_config", "") webhook_config = data.get("webhook_config", "") flow_id = data.get("flow_id", "") user_id = data.get("user_id", {}) user_input = data.get("user_input", "") tz_name = data.get("timezone", "UTC") current_message = "Flow is in Pending status" # Clean up previous scheduled jobs cleanup_previous_jobs(flow_id) # Dispatch by type if expression: return handle_cron_schedule(expression, flow_id, user_id, user_input, tz_name, triggered_time,current_message) elif email_config: return handle_email_schedule(email_config, flow_id, user_id, user_input, tz_name, triggered_time,current_message) elif webhook_config: return handle_webhook_schedule(webhook_config, flow_id, user_id, user_input, tz_name, triggered_time,current_message) return jsonify({"message": "No valid schedule configuration provided"}), 400 # ---------- Housekeeping ---------- def cleanup_previous_jobs(flow_id): """Remove previously scheduled jobs for the same flow.""" flow_details = get_flow_name(flow_id) flow_reference_id = flow_details.get("data", {}).get("flowReferenceId", " ") previous_flow_ids = [ dat.get("workflowId", "") for dat in get_previous_flows(flow_reference_id).get("data", []) ] if not previous_flow_ids: return for prev_id in previous_flow_ids: data = redis_client.hgetall(prev_id) records = {k.decode(): v.decode() for k, v in data.items()} for rec in records: if scheduler.get_job(rec): scheduler.remove_job(rec) redis_client.hdel(flow_id, rec) # ---------- Cron Scheduling ---------- def handle_cron_schedule(expression, flow_id, user_id, user_input, tz_name, triggered_time,current_message): """Handle CRON-based scheduling of flows.""" # Normalize user_input user_input = normalize_user_input(user_input) if not user_input: return fail_schedule(flow_id, user_id, user_input, "CRON", triggered_time, "Flow has failed due to incorrect user input format") # Validate timezone try: tz = pytztimezone(tz_name) except UnknownTimeZoneError: return jsonify({"message": f"Invalid timezone: {tz_name}"}), 400 # Validate cron expression try: minute, hour, dom, month, dow = expression.split() cron_trigger = CronTrigger( minute=minute, hour=hour, day=dom, month=month, day_of_week=dow, timezone=tz ) except Exception as e: return jsonify({"message": f"Invalid cron expression: {expression}, error: {e}"}), 400 all_task_details = [] for usr_inpt in user_input: schedule_id = str(uuid.uuid4()) try: scheduler.add_job( build_flow_graph_and_execute, trigger=cron_trigger, id=schedule_id, kwargs={"flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": usr_inpt, "type": "CRON"}, replace_existing=True, ) redis_client.hset(flow_id, schedule_id, json.dumps({ "status": "Pending", "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": user_input, "type": "CRON" })) send_flow_notification(schedule_id, flow_id, 'Pending', current_message, triggered_time, datetime.now().strftime(date_format)) all_task_details.append({ "message": "Task scheduled", "schedule_id": schedule_id, "flow_id": flow_id, "expression": expression, "timezone": tz_name, "type": "CRON" }) except Exception as e: handle_schedule_failure(flow_id, schedule_id, user_id, user_input, triggered_time, str(e), "CRON") all_task_details.append({ "message": f"Task scheduling failed: {e}", "schedule_id": schedule_id, "flow_id": flow_id, "expression": expression, "timezone": tz_name, "type": "CRON" }) return jsonify(all_task_details) # ---------- Email Scheduling ---------- def handle_email_schedule(email_config, flow_id, user_id, user_input, tz_name, triggered_time,current_message): """Handle EMAIL-based scheduling of flows.""" schedule_id = str(uuid.uuid4()) all_task_details = [] try: # scheduler.add_job( # schedule_email_task, id=schedule_id, seconds=60, trigger="interval", # kwargs={"email_config": email_config, "flow_id": flow_id, # "session_id": schedule_id, "user_id": user_id, "type": "EMAIL"}, # replace_existing=True, # ) scheduler.add_job( start_email_listener_idle2, 'date', id=schedule_id, kwargs={"email_config": email_config, "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "type": "EMAIL"}, replace_existing=True ) redis_client.hset(flow_id, schedule_id, json.dumps({ "status": "Pending", "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": user_input, "type": "EMAIL" })) send_flow_notification(schedule_id, flow_id, 'Pending', current_message, triggered_time, datetime.now().strftime(date_format)) all_task_details.append({ "message": "Task scheduled", "schedule_id": schedule_id, "flow_id": flow_id, "expression": "", "timezone": tz_name, "type": "EMAIL" }) except Exception as e: handle_schedule_failure(flow_id, schedule_id, user_id, user_input, triggered_time, str(e), "EMAIL") all_task_details.append({ "message": f"Task scheduling failed: {e}", "schedule_id": schedule_id, "flow_id": flow_id, "expression": "", "timezone": tz_name, "type": "EMAIL" }) return jsonify(all_task_details) # ---------- Webhook Scheduling ---------- def handle_webhook_schedule(webhook_config, flow_id, user_id, user_input, tz_name, triggered_time,current_message): """Handle WEBHOOK-based scheduling of flows.""" schedule_id = str(uuid.uuid4()) ts_millis = int(time.time() * 1000) ts_iso = datetime.now(timezone.utc).isoformat() user_input = normalize_user_input(user_input) config = { "webhook_config": webhook_config, "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": json.dumps(user_input), "type": "WEBHOOK", ts_millis: f"{ts_iso}|{triggered_time}" } redis_client.hset(flow_id, schedule_id, json.dumps({"status": "Pending", **config})) send_flow_notification(schedule_id, flow_id, 'Pending', current_message, triggered_time, datetime.now().strftime(date_format)) return jsonify({ "message": "Webhook Task scheduled", "schedule_id": schedule_id, "flow_id": flow_id, "timezone": tz_name, "type": "WEBHOOK" }) # ---------- Common Helpers ---------- def normalize_user_input(user_input): """Standardize user_input into a list of strings.""" if isinstance(user_input, dict): return [escape_curly_braces(json.dumps(user_input, indent=2))] if isinstance(user_input, list): return [json.dumps(i) for i in user_input] if isinstance(user_input, str): return [user_input] return None def fail_schedule(flow_id, user_id, user_input, type, triggered_time, message): """Register a failed schedule attempt.""" schedule_id = str(uuid.uuid4()) redis_client.hset(flow_id, schedule_id, json.dumps({ "status": "Failed", "message": message, "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": user_input, "type": type })) send_flow_notification(schedule_id, flow_id, 'Failed', message, triggered_time, datetime.now().strftime(date_format)) return jsonify({"message": message}), 500 def handle_schedule_failure(flow_id, schedule_id, user_id, user_input, triggered_time, error, type): """Handle errors when adding a job to APScheduler.""" end_time = datetime.now().strftime(date_format) audit_scheduler_task(flow_id, user_input, user_id, triggered_time, end_time, "Failed", [], error, type) redis_client.hset(flow_id, schedule_id, json.dumps({ "status": "Error", "message": error, "flow_id": flow_id, "session_id": schedule_id, "user_id": user_id, "user_input": user_input, "type": type })) send_flow_notification(schedule_id, flow_id, 'Failed', failed_start_flow_message, triggered_time, end_time) logger.error(f"Error scheduling task: {error}") @application.route("/kapture_agent_stop_scheduled_task", methods=["GET"]) def stop_task(): try: session_id= request.args.get("session_id") flow_id = request.args.get("flow_id") if not session_id: return jsonify({"message": "session_id is required"}), 400 scheduler.remove_job(session_id) redis_client.hdel(flow_id, session_id) # redis_client.delete(flow_id) logger.info(f"Stopped job ID={session_id}") return jsonify({"message": f"Stopped schedule {session_id}"}) except Exception as e: logger.error(f"Error stopping task: {str(e)}") return jsonify({"message": f"Error stopping task: {str(e)}"}), 500 @application.route("/kapture_agent_stop_all_tasks", methods=["GET"]) def stop_all_tasks(): try: flow_id = request.args.get("flow_id") jobs = scheduler.get_jobs() if flow_id: data = redis_client.hgetall(flow_id) records ={key.decode('utf-8'): value.decode('utf-8') for key, value in data.items()} for rec in records: if scheduler.get_job(rec): scheduler.remove_job(rec) redis_client.hdel(flow_id, rec) else: for job in jobs: scheduler.remove_job(job.id) redis_client.hdel(str(job.kwargs.get("flow_id",'')), job.id) return jsonify({"message": "Stopped all tasks"} if not flow_id else {"message": f"Stopped all tasks related to {flow_id}"}) except Exception as e: logger.error(f"Error stopping task: {str(e)}") return jsonify({"message": f"Error stopping task: {str(e)}"}), 500 @application.route("/kapture_agent_list_scheduled_tasks", methods=["GET"]) def list_tasks(): try: jobs = scheduler.get_jobs() job_list = [] for job in jobs: job_list.append( { "schedule_id": job.id, "next_run_time": str(job.next_run_time), "trigger": str(job.trigger), "flow_id":str(job.kwargs.get("flow_id",'')), "type":str(job.kwargs.get("type",'')) } ) return jsonify({"schedules": job_list}) except Exception as e: logger.error(f"Error listing tasks: {str(e)}") return jsonify({"message": f"Error listing tasks: {str(e)}"}), 500 @application.route("/kapture_agent_cron_scheduler_task_status", methods=["GET"]) def task_status(): try: flow_id = request.args.get("flow_id") session_id = request.args.get("session_id") current_time =datetime.now().strftime("%Y-%m-%d %H:%M:%S") if not flow_id: return jsonify({"message": "flow id is required"}), 400 data = redis_client.hget(flow_id, session_id) #close the task if not data: return jsonify({"message": "Task not found"}), 404 record = json.loads(data) user_input = record['user_input'] if record.get("status") == "completed": if isinstance(record['result'], str): state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": record['action_list'], "flow_list": record['action_list'], "agent_response": record['final_response'], "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": record['action_list'], "flow_list": record['action_list'], "agent_response": record['final_response'], "date_time": current_time, "chat_history": state_store.get_chat_history(session_id=session_id), "human_input": False, "status": 200 }), mimetype=mime_type ) else: state_store.update_session_state( session_id, chat_entry={ "user_input": user_input, "action_list": record['action_list'], "flow_list": record['action_list'], "agent_response": record['result'].get("agent_response", ''), "human_input_query": record['result'].get("human_input_query", ''), "date_time": current_time }, user_input=user_input, processed_input=user_input ) return Response( json.dumps({ "user_input": user_input, "action_list": record['action_list'], "flow_list": record['action_list'], "agent_response": record['result'].get("agent_response", ''), "date_time": current_time, "chat_history": state_store.get_chat_history(session_id=session_id), "status": 200, "human_input_type": record['result'].get("human_input_type", ''), "human_input_query": record['result'].get("human_input_query", ''), "human_input": True, "session": record['result']["session"], "flow_id": record['result']["flow_id"] }), mimetype=mime_type ) else: return jsonify({"status": record.get("status"), "message": f"The flow id {flow_id} is in {record.get('status')}",'status_code': 200}) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route("/kapture_agent/flows//webhook", methods=["POST"]) def webhook_handler(flow_id: str): triggered_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: # ----------- Input validation ----------- data, form_data = request.get_json(force=True), request.form if not flow_id: return jsonify({"message": "flow id is required"}), 400 if not data and form_data: return jsonify({"message": "Missing data"}), 400 # ----------- Load config ----------- config = get_latest_config_from_redis(flow_id) if not validate_auth(request, config.get("webhook_config", {})): abort(401, description="Unauthorized") # ----------- Extract body ----------- body = extract_request_body(data, form_data) if not body: return handle_missing_body(flow_id, config, triggered_time) user_input = escape_curly_braces(json.dumps(body, indent=2)) # ----------- Execute Job ----------- return schedule_webhook_job(flow_id, config, user_input, triggered_time) except Exception as e: return jsonify({ "error": unexpected_error_message, "message": str(e), "status_code": 500 }), 500 # ----------------- Helpers ----------------- def get_latest_config_from_redis(flow_id: str) -> dict: """Fetch the latest config record from Redis for a given flowId.""" all_fields = redis_client.hgetall(flow_id) latest_ts, config = -1, {} for field, value in all_fields.items(): record = json.loads(value.decode("utf-8")) ts_keys = [int(k) for k in record.keys() if k.isdigit()] if ts_keys: record_ts = max(ts_keys) if record_ts > latest_ts: latest_ts, config = record_ts, record return config def extract_request_body(data, form_data) -> dict: """Safely extract body from JSON or form-data.""" try: return data if data else dict(form_data) except Exception: return {} def handle_missing_body(flow_id: str, config: dict, triggered_time: str): """Handle the case when webhook body is missing.""" end_time = datetime.now().strftime(date_format) session_id = config.get("session_id") redis_client.hset(flow_id, session_id, json.dumps({ "status": "Error", "message": "User input not found", "flow_id": flow_id, "session_id": session_id, "user_id": config.get("user_id"), "user_input": None, "type": "WEBHOOK" })) send_flow_notification(session_id, flow_id, 'Failed', "Flow failed due to user input not found", triggered_time, end_time) return jsonify({ "error": "Missing body", "message": "Unexpected Error due to missing body", "status_code": 500 }), 500 def schedule_webhook_job(flow_id: str, config: dict, user_input: str, triggered_time: str): """Schedule a job for webhook-triggered execution.""" try: session_id = config.get("session_id") if not session_id: return handle_missing_session(flow_id, config, user_input, triggered_time) job = scheduler.add_job( build_flow_graph_and_execute, trigger=DateTrigger(run_date=datetime.now(timezone.utc)), kwargs={ "flow_id": flow_id, "session_id": session_id, "user_id": config.get("user_id"), "user_input": user_input, "type": config.get("type") }, id=session_id, replace_existing=True, ) return handle_job_status(flow_id, config, user_input, triggered_time, job) except Exception as e: return handle_job_error(flow_id, config, user_input, triggered_time, str(e)) def handle_missing_session(flow_id: str, config: dict, user_input: str, triggered_time: str): """Handle missing session id in config.""" end_time = datetime.now().strftime(date_format) session_id = config.get("session_id") audit_scheduler_task(flow_id, user_input, config.get("user_id"), triggered_time, end_time, "Failed", [], no_session_id_flow_message, "WEBHOOK") redis_client.hset(flow_id, session_id, json.dumps({ "status": "Error", "message": no_session_id_flow_message, "flow_id": flow_id, "session_id": session_id, "user_id": config.get("user_id"), "user_input": user_input, })) send_flow_notification(session_id, flow_id, 'Failed', "Flow not started due to session id not found", triggered_time, end_time) return jsonify({ "error": unexpected_error_message, "message": no_session_id_flow_message, "status_code": 500 }), 500 def handle_job_status(flow_id: str, config: dict, user_input: str, triggered_time: str, job): """Return response based on job status.""" session_id = config.get("session_id") if not job.pending: return jsonify({ "status": "success", "session_id": session_id, "message": "Job has started execution", "status_code": 200 }), 200 # Handle case where job not started end_time = datetime.now().strftime(date_format) redis_client.hset(flow_id, session_id, json.dumps({ "status": "Error", "message": "Job has not started execution", "flow_id": flow_id, "session_id": session_id, "user_id": config.get("user_id"), "user_input": user_input, })) send_flow_notification(session_id, flow_id, 'Failed', "Flow has not started execution", triggered_time, end_time) return jsonify({ "status": "success", "session_id": session_id, "message": "Job has not started execution", "status_code": 200, }), 200 def handle_job_error(flow_id: str, config: dict, user_input: str, triggered_time: str, error_msg: str): """Handle errors during job scheduling.""" end_time = datetime.now().strftime(date_format) session_id = config.get("session_id") audit_scheduler_task(flow_id, user_input, config.get("user_id"), triggered_time, end_time, "Failed", [], error_msg, "WEBHOOK") redis_client.hset(flow_id, session_id, json.dumps({ "status": "Error", "message": error_msg, "flow_id": flow_id, "session_id": session_id, "user_id": config.get("user_id"), "user_input": user_input, "type": "WEBHOOK" })) send_flow_notification(session_id, flow_id, 'Failed', failed_start_flow_message, triggered_time, end_time) return jsonify({ "error": unexpected_error_message, "message": error_msg, "status_code": 500 }), 500 @application.route("/get_patient_information", methods=["POST"]) def parse_api(): if not request.is_json: return jsonify({"error": f"Content-Type must be {mime_type}"}), 415 body = request.get_json(silent=True) if body is None: msg = "Invalid JSON" return jsonify({"error": msg}), 400 paragraphs = body.get("paragraphs") if not isinstance(paragraphs, list) or not all(isinstance(p, (str, type(None))) for p in paragraphs): return jsonify({"error": "'paragraphs' must be a list of strings"}), 422 try: return jsonify(parse_paragraphs(paragraphs)), 200 except Exception: return jsonify({"error": "Failed to parse paragraphs"}), 500 @application.route("/get_patient_information_ai_solution", methods=["POST"]) def parse_ai_api(): if not request.is_json: return jsonify({"error": "Content-Type must be application/json"}), 415 body = request.get_json(silent=True) if body is None: msg = "Invalid JSON" return jsonify({"error": msg}), 400 paragraphs = body.get("paragraphs") if not isinstance(paragraphs, list) or not all(isinstance(p, (str, type(None))) for p in paragraphs): return jsonify({"error": "'paragraphs' must be a list of strings"}), 422 try: api_key = os.getenv("AZURE_API_KEY") processor = AIPrescriptionProcessor(api_key=api_key) return jsonify(processor.process_prescription(paragraphs)), 200 except Exception as e: return jsonify({"error": "Failed to parse paragraphs", "details": str(e)}), 500 @application.route("/kapture_agent_generate_content", methods=["POST"]) def kapture_generate_content(): if not request.is_json: return jsonify({"error": "Content-Type must be application/json"}), 415 body = request.get_json(silent=True) if body is None: msg = "JSON body is not provided" return jsonify({"error": msg}), 400 category = body.get("category","topic") sub_category = body.get("sub_category","Description") content = body.get("content") answer = {} if not content: return jsonify({"error": "Content is not provided"}), 500 try: generated_content = generate_content_kapture(category,sub_category,content) if generated_content: answer['result'] = generated_content answer['status'] = "success" else: answer['result'] = None answer['status'] = "error" answer['error'] = "Failed to generate content" return jsonify(answer) except Exception as e: return jsonify({"status":"error","error": "Failed to generate content", "details": str(e), "result":None}), 500 @application.route('/kapture_agent_build_sub_agents', methods=['POST']) def build_multi_sub_agents(): global response, action_list, flow_list st_time = time.perf_counter() try: data = request.get_json() if not data: return jsonify({"error": "Invalid or missing JSON payload", 'status': 400}) code = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(5)) session_id = data.get("session_id",code) multi_agent_id = data.get("multi_agent_id", "").strip() sub_agents_details = get_supervisor_agent_details(multi_agent_id) agent_ids = [] for item in sub_agents_details.get("assignments", []): config = item.get("subAgent") if not config: continue id = config.get("id") topic_ids = [i.get("id") for i in item.get("topics")] agent_ids.append({"agent_id": id, "topic_ids": topic_ids}) collected_params = state_store.get_collected_params(session_id) final_agents = asyncio.run( process_all_agents_parallel(agent_ids, session_id, '', state_store, checkpointer, in_memory_store, llm, collected_params) ) print("TIME FOR BUILDING SUB AGENTS", time.perf_counter() - st_time) hash_key = generate_hash_key(multi_agent_id) if final_agents: sub_agents_store.update({hash_key:final_agents}) return jsonify( {"status": "success", "message": f"{len(final_agents)} Sub agents have updated for multi-agent-id {multi_agent_id}"}), 200 else: return jsonify( {"status": "error", "message": f"No agents have been updated for the multi-agent-id {multi_agent_id}"}), 200 except Exception as e: return jsonify( {"status": "error", "error": "Failed to generate sub_agents", "details": str(e), "result": None}), 500 @application.route('/kapture_agent_execute_multi_agent_process', methods=['POST']) def build_execute_multi_agent_process(): our_code_start_time = time.time() st_time= time.perf_counter() try: data = request.get_json() if not data: return jsonify({"error": "Invalid or missing JSON payload", 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": "Session ID is missing", 'status': 400}) user_id = data.get("user_id", "") if not user_id: return jsonify({"error": "User ID is missing", "details": "User ID is not provided", 'status': 400}) state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state collected_params = state_store.get_collected_params(session_id) user_input = data.get("user_input", "").strip() multi_agent_id = data.get("multi_agent_id", "").strip() sub_agents_details = get_supervisor_agent_details(multi_agent_id) supervisor_prompt = sub_agents_details.get("superVisorPrompt", "").strip() is_supervisor = sub_agents_details.get("isSuperVisorAgent", "") state["processed_input"] = user_input hash_key = generate_hash_key(multi_agent_id) CustomUserInput().user_input = user_input if not user_input: return jsonify({"error": "User input is required", 'status': 400}) configuration = {"configurable": {"thread_id": session_id}} if not sub_agents_store.get(hash_key): agent_ids = [] for item in sub_agents_details.get("assignments", []): config = item.get("subAgent") if not config: continue id = config.get("id") topic_ids = [i.get("id") for i in item.get("topics")] agent_ids.append({"agent_id": id, "topic_ids": topic_ids}) try: final_agents = asyncio.run( process_all_agents_parallel(agent_ids, session_id, user_input, state_store, checkpointer, in_memory_store, llm, collected_params) ) # loop = asyncio.get_event_loop() # asyncio.set_event_loop(loop) # final_agents = loop.run_until_complete( # process_all_agents_parallel(agent_ids, session_id, user_input, state_store, checkpointer, # in_memory_store, llm, collected_params) # ) # loop.close() if final_agents: sub_agents_store.update({hash_key: final_agents}) except Exception as e: print("EXCEPTION OCCURED", e) final_agents = [] print("TIME FOR BUILDING SUB AGENTS", time.perf_counter() - st_time) else: final_agents = sub_agents_store.get(hash_key) seen_topic_ids = set() sub_agents = [] cached_response, topic_name, flow_list, human_input, _ = cache_mang_obj.get_cached_response(agent_id=multi_agent_id, user_id =user_id, user_query=user_input) if cached_response: return _finalize_response_with_cache( user_id, multi_agent_id, session_id, user_input, topic_name, '', [], cached_response, user_input, our_code_start_time, datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') ) if final_agents: for res in final_agents: topic_id = res.get("topic_id") if topic_id not in seen_topic_ids: seen_topic_ids.add(topic_id) if res.get("sub_agent"): sub_agents.append(res.get("sub_agent")) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" supervisor_prebuilt_workflow = create_supervisor( agents=sub_agents, output_mode="last_message", model=llm, prompt=(supervisor_prompt+customer_user_id), state_schema=MultiAgentState, add_handoff_messages=True, add_handoff_back_messages =True ) supervisor_prebuilt = supervisor_prebuilt_workflow.compile(name=f"{multi_agent_id}_agent", checkpointer=checkpointer, store=in_memory_store) # graph_bytes = supervisor_prebuilt.get_graph().draw_mermaid_png() # # with open("multi_agent_graph.png", "wb") as f: # f.write(graph_bytes) result = supervisor_prebuilt.invoke({"messages": [HumanMessage(content=user_input)],"user_input":user_input,"session_id":session_id}, config=configuration) multi_agent_tool_object.reset() CustomUserInput().user_input = '' if "__interrupt__" in result: pass for message in result["messages"]: message.pretty_print() a2a_workflow_list = final_formatting_of_responses(result["messages"]) response = result["messages"][-1].content timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') return _finalize_and_build_response( user_id, multi_agent_id, session_id, user_input, '', '', [], response, user_input, our_code_start_time, timestamp,supervisor_actions = a2a_workflow_list ) except ValueError as e: return jsonify({"error": "Invalid value provided", "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": "Missing key in state or session", "details": str(e), 'status': 400}) except Exception as e: traceback.print_exc() return jsonify({"error": "An unexpected error occurred", "details": str(e), 'status': 500}) @application.route('/kapture_agent_stream_multi_agent_process', methods=['POST']) def stream_multi_agent_process(): our_code_start_time = time.time() st_time= time.perf_counter() try: data = request.get_json() if not data: return jsonify({"error": "Invalid or missing JSON payload", 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": "Session ID is missing", 'status': 400}) user_id = data.get("user_id", "") if not user_id: return jsonify({"error": "User ID is missing", "details": "User ID is not provided", 'status': 400}) state = state_store.get(session_id) if state is None: # Create new session state state = { "chat_history": [], "user_input": "", "processed_input": "", "collected_params": {} } state_store[session_id] = state collected_params = state_store.get_collected_params(session_id) user_input = data.get("user_input", "").strip() multi_agent_id = data.get("multi_agent_id", "").strip() sub_agents_details = get_supervisor_agent_details(multi_agent_id) supervisor_prompt = sub_agents_details.get("superVisorPrompt", "").strip() is_supervisor = sub_agents_details.get("isSuperVisorAgent", "") state["processed_input"] = user_input hash_key = generate_hash_key(multi_agent_id) CustomUserInput().user_input = user_input if not user_input: return jsonify({"error": "User input is required", 'status': 400}) configuration = {"configurable": {"thread_id": session_id}} if not sub_agents_store.get(hash_key): agent_ids = [] for item in sub_agents_details.get("assignments", []): config = item.get("subAgent") if not config: continue id = config.get("id") topic_ids = [i.get("id") for i in item.get("topics")] agent_ids.append({"agent_id": id, "topic_ids": topic_ids}) try: final_agents = asyncio.run( process_all_agents_parallel(agent_ids, session_id, user_input, state_store, checkpointer, in_memory_store, llm, collected_params) ) # loop = asyncio.get_event_loop() # asyncio.set_event_loop(loop) # final_agents = loop.run_until_complete( # process_all_agents_parallel(agent_ids, session_id, user_input, state_store, checkpointer, # in_memory_store, llm, collected_params) # ) # loop.close() if final_agents: sub_agents_store.update({hash_key: final_agents}) except Exception as e: print("EXCEPTION OCCURED", e) final_agents = [] print("TIME FOR BUILDING SUB AGENTS", time.perf_counter() - st_time) else: final_agents = sub_agents_store.get(hash_key) seen_topic_ids = set() sub_agents = [] # cached_response = get_semantic_cached_response(user_input) # # if cached_response: # return _finalize_response_with_cache( # user_id, multi_agent_id, session_id, user_input, # '', '', # [], cached_response, user_input, # our_code_start_time, datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') # ) if final_agents: for res in final_agents: topic_id = res.get("topic_id") if topic_id not in seen_topic_ids: seen_topic_ids.add(topic_id) if res.get("sub_agent"): sub_agents.append(res.get("sub_agent")) customer_user_id = f"Use the value {user_id} as the current user identifier (user_id or email_id) whenever user context is needed" supervisor_prebuilt_workflow = create_supervisor( agents=sub_agents, output_mode="last_message", model=llm, prompt=(supervisor_prompt+customer_user_id), state_schema=MultiAgentState, add_handoff_messages=True, add_handoff_back_messages =True ) supervisor_prebuilt = supervisor_prebuilt_workflow.compile(name=f"{multi_agent_id}_agent", checkpointer=checkpointer, store=in_memory_store) def generate(): sent = set() result = supervisor_prebuilt.stream({"messages": [HumanMessage(content=user_input)],"user_input":user_input,"session_id":session_id}, config=configuration) for event in result: for node, payload in event.items(): if payload and "messages" in payload: for msg in payload["messages"]: uid = getattr(msg, "id", f"{msg.type}:{hash(str(msg.content))}") if uid not in sent: sent.add(uid) final_payload = _finalize_and_stream_response( user_id, multi_agent_id, session_id, user_input, our_code_start_time, timestamp, streamed_event=format_message(msg) ) yield final_payload multi_agent_tool_object.reset() CustomUserInput().user_input = '' timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') # response = Response( # stream_with_context(generate()), # mimetype="text/event-stream", # ) response = Response(stream_with_context(stream_events(generate())), mimetype="text/event-stream") return response except ValueError as e: return jsonify({"error": "Invalid value provided", "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": "Missing key in state or session", "details": str(e), 'status': 400}) except Exception as e: traceback.print_exc() return jsonify({"error": "An unexpected error occurred", "details": str(e), 'status': 500}) @application.route('/kapture_agent_summarize_ticket_details', methods=['POST']) def summarize_ticket_details(): try: data = request.get_json() if not data: return jsonify({"error": "Invalid or missing JSON payload", 'status': 400}) ticket_details = data.get("ticket_data", {}) summary = ticket_summ_obj.summarize_ticket_details_with_llm(ticket_details) return jsonify({"ticket_summary_details": summary, "status": 200}) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_message, "details": str(e), 'status': 500}) @application.route('/get_date_time_details', methods=["GET"]) def get_current_date_time(): try: current_utc_time = datetime.now(timezone.utc) print("Current UTC DateTime :", current_utc_time) except Exception as e: return {"status": 400, "error": str(e)} return {"Current Date and time": current_utc_time, "status": 200} @application.route('/get_ticket_details', methods=["GET", "POST"]) def get_ticket_details(): try: data = request.get_json() ticket_id = data.get("ticket_id", "").strip() load_dotenv() index_name = os.getenv("USE_CASE_2_TICKET_DETAILS") host_name = os.getenv("OPENSEARCH_ENDPOINT") # print("index name is",index_name) result = {} client = OpenSearch( hosts=[{"host": host_name, "port": os.getenv("OPENSEARCH_PORT", 9200)}], http_auth=(os.getenv("OS_MASTER_USERNAME", ""), os.getenv("OS_MASTER_PASSWORD", ""))) query = { "_source": ["date_time", "subject", "description", "ticket_id", "department"], "size": 1000, "query": { "term": { "ticket_id": ticket_id } } } response = client.search( body=query, index=index_name ) print("response is", response) ticket_details = [] for hit in response['hits']['hits']: ticket_details.append(hit['_source']) return jsonify({ "status": 200, "ticket_details": ticket_details }) except Exception as e: print("welcome message not displayed") return jsonify({ "status": 500, "message":str(e), "ticket_details": [] }) @application.route('/kapture_agent_validate_ticket_time', methods=["POST"]) def validate_ticket_time(): try: # Get time from query param, form or json json_data = request.get_json(silent=True) created_time_str = ( request.args.get("date_time") or request.form.get("date_time") or (json_data.get("date_time") if json_data else None) ) if not created_time_str: return jsonify({"status": 400, "error": "date_time is required"}) ts = created_time_str.strip().replace("Z", "+00:00") if "." in ts: base, _, tz = ts.partition("+") if "." in base: dt_part, fraction = base.split(".") fraction = fraction[:6] base = f"{dt_part}.{fraction}" ts = base + "+" + tz created_time = datetime.fromisoformat(ts) current_time = datetime.now(timezone.utc) hours = (current_time - created_time).total_seconds() / 3600 if 0 <= hours < 8: condition = "within 0 to 8 hours" elif 8 <= hours <= 16: condition = "within 8 to 16 hours" else: condition = "greater than 16 hours" return jsonify({ "status": 200, "time_difference_hours": round(hours, 2), "condition": condition }) except Exception as e: return jsonify({"status": 400, "error": str(e)}) @application.route("/kapture_agent_filter_agents", methods=["POST"]) def filter_agents(): try: data = request.json supervisor_prompt = data.get("prompt", "") ui_agent_name = data.get("supervisor_agent_name", "") top_n = data.get("top_n", 5) hybrid_search = data.get("hybrid_search", False) prompt_keywords = extract_keywords(supervisor_prompt) ui_keywords = extract_keywords(ui_agent_name) agents = get_all_agent_details() agent_names = [a["agentName"] for a in agents] corpus = [extract_keywords(agent) for agent in agent_names] if not hybrid_search: if not supervisor_prompt and not ui_agent_name: return jsonify({"error": "Provide supervisor_prompt or agent_name"}), 400 # Final keyword bag for matching final_keywords = list(set(prompt_keywords + ui_keywords)) # Prepare corpus for BM25 (agent names only) bm25 = BM25Okapi(corpus) agent_scores = [] for idx, agent in enumerate(agents): agent_keywords = extract_keywords(agent["agentName"]) kw_score = keyword_overlap(final_keywords, agent_keywords) bm25_score = bm25.get_scores(final_keywords)[idx] fuzzy = fuzzy_score(ui_agent_name, agent["agentName"]) final_score = (2.0 * kw_score) + bm25_score + (0.8 * fuzzy) agent_scores.append((final_score, agent)) THRESHOLD = 0.7 filtered = [agent for score, agent in agent_scores if score >= THRESHOLD] # Sort by score high → low filtered_sorted = sorted(filtered, key=lambda a: -[ s for s, ag in agent_scores if ag["agent_id"] == a["agent_id"] ][0]) else: combined_keywords = list(set(prompt_keywords + ui_keywords)) bm25 = BM25Okapi(corpus) bm25_scores = bm25.get_scores(combined_keywords) query_text = supervisor_prompt + " " + ui_agent_name query_emb = model.encode(query_text, convert_to_tensor=True) agent_embeddings = model.encode(agent_names, convert_to_tensor=True) semantic_scores = util.cos_sim(query_emb, agent_embeddings)[0] kw_scores = [] for name in agent_names: kw_scores.append(keyword_overlap(combined_keywords, extract_keywords(name))) final_scores = [] for i in range(len(agents)): final_score = ( 0.35 * float(semantic_scores[i]) + 0.55 * bm25_scores[i] + 0.10 * kw_scores[i] ) final_scores.append((final_score, agents[i])) # Filtering threshold for accuracy THRESHOLD = 0.25 filtered = [a for score, a in final_scores if score >= THRESHOLD] # Sort descending by score filtered_sorted = sorted(filtered, key=lambda x: -[ s for s, ag in final_scores if ag["agent_id"] == x["agent_id"] ][0]) if filtered_sorted: filtered_sorted = filtered_sorted[:top_n] return jsonify({ "status":200, "matched_agents": [ {"agent_id": a["agent_id"], "agent_name": a["agentName"]} for a in filtered_sorted ] }) else: return jsonify({ "status":200, "matched_agents": [ ] }) except Exception as e: return jsonify({ "status": 500, "message":f"Error occured with {e}", "matched_agents": [ ] }) @application.route("/kapture_agent_mcp_client_node_tools", methods=["POST"]) async def list_mcp_tools(): data = request.get_json() endpoint = data.get("endpoint") auth = data.get("authentication") llama_tools = [] if auth: if str(auth.get("authType")).lower() == "none": mcp_client = BasicMCPClient(endpoint) mcp_tool_spec = McpToolSpec(client=mcp_client) llama_tools = await mcp_tool_spec.to_tool_list_async() elif str(auth.get("authType")).lower() == "bearer": token = auth.get("token") async with Client(endpoint, auth=token) as client: mcp_tool_spec = McpToolSpec(client=client.session) llama_tools = await mcp_tool_spec.to_tool_list_async() elif str(auth.get("authType")).lower() == "multiple_header": headers = {} for header in auth.get("headers"): value = header.get("value") name = header.get("name") headers.update({name: value}) async with Client(transport=StreamableHttpTransport(endpoint, headers=headers)) as client: mcp_tool_spec = McpToolSpec(client=client.session) llama_tools = await mcp_tool_spec.to_tool_list_async() elif str(auth.get("authType")).lower() == "oauth": oauth = OAuth(mcp_url=endpoint) async with Client(endpoint, auth=oauth) as client: mcp_tool_spec = McpToolSpec(client=client.session) llama_tools = await mcp_tool_spec.to_tool_list_async() all_tools = [ tool.metadata.name for tool in llama_tools ] return jsonify({"tools": all_tools, "Length of tools":len(all_tools), "Endpoint":endpoint}) @application.route('/kapture_agent_creation_by_prompt_automation', methods=['POST']) def create_agent_by_prompt_automation(): final_response = {} try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_email = data.get("user_id", "") if not user_email: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) state = state_store.get(session_id) if state is None: state = { "chat_history": [], "user_input": "", } state_store[session_id] = state user_query = data.get("user_input", "") if not user_query: return jsonify({"error": user_input_required_message, 'status': 400}) answer, response_info = user_input_validation(user_input=user_query) if answer == "ALLOWED": agent_creation_helper_obj = AgentCreationHelper(user_query=user_query, user_email=user_email) agent_creation_response = agent_creation_helper_obj.agent_automation() final_response.update(agent_creation_response) created_agent_id = agent_creation_response.get("agent_creation_response", {}).get("response", {}).get("data", "") topic_creation_helper_obj = TopicCreationHelper(user_query=user_query, user_email=user_email, agent_id=created_agent_id) local_topic_creation_with_action_assign_response = topic_creation_helper_obj.topic_automation() final_response.update(local_topic_creation_with_action_assign_response) return jsonify(final_response) else: return jsonify({"error": response_info, "details": response_info, 'status': 400}) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route("/kapture_agent_extract_sensitive_data", methods=["POST"]) def extract_sensitive_data(): data = request.get_json(silent=True) if not data or "text" not in data: return jsonify({"error": "Missing 'text' field"}), 400 text = data["text"] # spans = pii_engine.detect(text) residual, spans, covered = identify_data_engine.process_document(text) entities = [ { "text": span.text, "label": span.label, "start": span.start, "end": span.end, "confidence": round(span.score, 4), "source": span.source } for span in spans ] # entities = [ # { # "text": span.text, # "label": span.label.value, # "start": span.start, # "end": span.end, # "confidence": round(span.score, 4), # "tier": span.tier, # } # for span in spans # ] # converted_entities = encryption_engine.entities_conversion(entities) # encrypted_text,anonymized_entities,mapping = encryption_engine.anonymize(text, converted_entities) # decrypted_text = encryption_engine.deanonymize(encrypted_text,anonymized_entities) encrypted_text,mapping = identify_data_engine.anonymize_original(text, spans) decrypted_text = identify_data_engine.deanonymize(encrypted_text,mapping) return jsonify({ "count": len(entities), "sensitive_data": entities, "encrypted_text":encrypted_text, "mapping":mapping, "decrypted_text":decrypted_text }) @application.route('/kapture_agent_flow_creation_by_prompt_automation', methods=['POST']) def create_flow_by_prompt_automation(): try: data = request.get_json() if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_email = data.get("user_id", "") if not user_email: return jsonify({"error": user_id_missing_message, "details": user_id_not_provided_message, 'status': 400}) state = state_store.get(session_id) if state is None: state = { "chat_history": [], "user_input": "", } state_store[session_id] = state user_query = data.get("user_input", "") if not user_query: return jsonify({"error": user_input_required_message, 'status': 400}) answer, response_info = user_input_validation(user_input=user_query) if answer == "ALLOWED": flow_creation_helper_obj = FlowCreationHelper(user_query=user_query, user_email=user_email) flow_creation_response = flow_creation_helper_obj.flow_automation() node_selector_response = flow_creation_helper_obj.node_selector() flow_creation_response["steps"] = node_selector_response["steps"] return jsonify( {"data":flow_creation_response ,"success": True }) else: return jsonify({"error": response_info, "details": response_info, 'status': 400}) except ValueError as e: return jsonify({"error": invalid_value_message, "details": str(e), 'status': 400}) except KeyError as e: return jsonify({"error": missing_key_message, "details": str(e), 'status': 400}) except Exception as e: return jsonify({"error": unexpected_error_occurred_message, "details": str(e), 'status': 500}) @application.route("/kapture_agent_filter_actions", methods=["POST"]) def filter_actions(): try: data = request.json if not data: return jsonify({"error": invalid_json_payload_message, 'status': 400}) session_id = data.get("session_id", "") if not session_id: return jsonify({"error": session_id_missing_message, 'status': 400}) user_prompt = data.get("user_input", "") if not user_prompt: return jsonify({"error": user_input_required_message, "details": user_input_not_provided_message, 'status': 400}) hybrid_search = data.get("hybrid_search", False) actions = get_all_actions() top_actions = get_top_relevant_actions( user_prompt=user_prompt, actions=actions, top_n=5, hybrid_search=hybrid_search ) return jsonify({ "status": 200, "matched_actions": [ { "action_id": a.get("id"), "action_name": a.get("actionLabel"), "actionType": a.get("actionType") } for a in top_actions ] }) except Exception as e: return jsonify({ "status": 500, "message": str(e), "matched_actions": [] }) @application.post("/kapture_agent_anomalies/detect") def run_anomaly_detection(): try: payload = request.get_json(silent=True) or {} n_estimators = int(payload.get("n_estimators", 300)) contamination = float(payload.get("contamination", 0.03)) random_state = int(payload.get("random_state", 42)) top_n = int(payload.get("top_n", 20)) top_n_features = int(payload.get("top_n_features", 2)) data_features = payload.get("anomaly_data_features",[ "amount_usd", "miles_earned", "miles_redeemed", "avg_miles_earned_30d", "tx_count_24h", "minutes_since_last_tx", "distance_from_zip_location", "hour_of_day", ]) # results = anamoly_detection_engine.run( # n_estimators=n_estimators, # contamination=contamination, # random_state=random_state, # top_n=top_n, # top_n_features = top_n_features, # data_features = data_features # ) results = execute_anomaly_detection( numeric_features=data_features, contamination=contamination, n_estimators=n_estimators, random_state=random_state, top_n=top_n, top_n_features=top_n_features ) return jsonify( { "results": results.get("results",[]), "results_html":results.get("results_html",'') } ) except Exception as e: return jsonify({"error": str(e)}), 500 @application.route("/kapture_agent/ingest/transaction", methods=["POST"]) def ingest_transaction(): try: data = request.get_json(force=True, silent=False) payload = data.get("payload",{}) recipient_email = data.get("user_id") if not recipient_email: return jsonify({"error":user_id_missing_message , 'status': 400}) if not payload: return jsonify( {"error": payload_missing_message, 'status': 400}) action_details = get_action_detail("6a690e5b-ce90-4173-a5b4-af32d4671e8c") email_html = action_details.get("emailBody", "") smtp_server = action_details.get("smtpServer", "") sender_email = action_details.get("senderEmail", "") subject = action_details.get("subject", "") cc = action_details.get("cc", "") bcc = action_details.get("bcc", "") updated_email = update_email_body(payload, email_html) available_miles = payload.get("available_miles", 0) miles_redeemed = payload.get("miles_redeemed", 0) balance_miles = available_miles - miles_redeemed member_id = payload.get("member_id", "") loyalty_member_id = payload.get("loyalty_member_id", "") if miles_redeemed > 300000: resp = anamoly_detection_engine.insert_transaction_only(payload) result = redeem_points_email_handler(updated_email, smtp_server, smtp_port, sender_email, recipient_email, cc, bcc, subject) return jsonify({ "message": "Upsert successful", "opensearch_result": resp.get("result"), "_id": resp.get("_id"), "_index": resp.get("_index"), "_version": resp.get("_version"), "result": "Email sent successfully" if result else None }), 200 else: update_member = update_member_miles(loyalty_member_id, balance_miles) resp = anamoly_detection_engine.insert_transaction_only(payload) return jsonify({ "message": "Miles redeemed is below the threshold for anomaly detection. No email sent", "details":"Your transaction has been processed and member miles have been updated.", "result": "No email sent" }), 200 except Exception as e: return jsonify({"error": str(e)}), 400 atexit.register(lambda: scheduler.shutdown(wait=False)) if __name__ == "__main__": application.run(host="0.0.0.0", port=5656)