When building conversational AI agents, one of the key requirements is maintaining context across multiple interactions. While agent functions are designed to be stateless, Agent Stack provides mechanisms for accessing and managing conversation history across interactions with an agent. This context memory system enables the conversation continuity that users expect from AI assistants.
History Management Control
| Operation | Purpose |
|---|---|
| `await context.store(input)` | Stores the current user message in conversation history; storage must be explicitly requested |
| `await context.store(response)` | Stores the agent's response in conversation history; storage must be explicitly requested |
| `context: RunContext` | Function parameter that provides a `RunContext` instance for storing and accessing the conversation history |
| `context_store=PlatformContextStore()` | Configures the server to use the platform's persistent context store, maintaining conversation history across agent restarts |
Simple History Access Example
Here’s an example agent that maintains conversation history and counts the number of interactions:
```python
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import os

from a2a.types import Message
from a2a.utils.message import get_message_text
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext

server = Server()


@server.agent()
async def basic_history_example(input: Message, context: RunContext):
    """Agent that demonstrates conversation history access"""
    # The `context` parameter provides access to the conversation history,
    # used to store and load previous messages.

    # Store the current message in the context store
    await context.store(input)

    # Get the current user message
    current_message = get_message_text(input)
    print(f"Current message: {current_message}")

    # Load all messages from conversation history (including current message)
    history = [
        message
        async for message in context.load_history()
        if isinstance(message, Message) and message.parts
    ]

    # Filter and process the conversation history
    print(f"Found {len(history)} messages in conversation (including current)")

    # Your agent logic here - you can now reference all messages in the conversation
    message = AgentMessage(text=f"Hello! I can see we have {len(history)} messages in our conversation.")
    yield message

    # Store the message in the context store
    await context.store(message)


def run():
    server.run(host=os.getenv("HOST", "127.0.0.1"), port=int(os.getenv("PORT", 8000)))


if __name__ == "__main__":
    run()
```
Steps
- Access conversation history: Use `RunContext` to set up an instance of the conversation history to store and load previous messages.
- Store incoming messages: Use `await context.store(input)` to store the current user message in the conversation history.
- Filter and process history: Retrieve the conversation history with `load_history()` and filter to get the messages relevant to your agent's logic.
- Store agent responses: Use `await context.store(response)` to store your agent's responses for future conversation context.
Streaming with Buffered History Example
Use this pattern when you want to stream partial outputs to users while keeping one clean assistant message in conversation history.
This pattern is use-case specific, and you may opt for a combination of this and the previous approach.
```python
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import asyncio
import os

from a2a.types import Message
from a2a.utils.message import get_message_text
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext
from agentstack_sdk.server.store.platform_context_store import PlatformContextStore

server = Server()


async def example_tool() -> str:
    await asyncio.sleep(0.1)  # doing some agent work
    return "tool result"


async def history_counter(history: list[Message]) -> str:
    """Create a concise conversation-state summary."""
    await asyncio.sleep(0.1)  # doing some agent work
    user_count = sum(1 for item in history if item.role.value == "user")
    agent_count = sum(1 for item in history if item.role.value == "agent")
    history_count = len(history)
    return f"total={history_count}, user={user_count}, agent={agent_count}"


@server.agent()
async def streaming_agent_w_single_history_write_example(input: Message, context: RunContext):
    """
    Stream partial answers, execute tools, and persist one finalized assistant message.

    See other examples for an actual implementation of a multi-turn conversation agent with tool use.
    """
    # Store the user input as the first persisted item for this turn.
    await context.store(data=input)
    history = [
        message
        async for message in context.load_history()
        if isinstance(message, Message) and message.parts
    ]
    current_message = get_message_text(input)

    # Stream user-facing partial output as each step completes. This simulates
    # an agent that produces intermediate outputs throughout its turn which are
    # immediately useful to the user and so are sent to them right away.
    buffered_parts: list[str] = []
    try:
        part_1 = f"Received input: '{current_message}'"
        buffered_parts.append(part_1)
        yield AgentMessage(text=part_1)

        tool_result = await example_tool()
        part_2 = f"Tool call completed with result: '{tool_result}'"
        buffered_parts.append(part_2)
        yield AgentMessage(text=part_2)

        if len(history) > 3:
            raise ValueError("History is too long!")

        history_summary = await history_counter(history)
        history_part = (
            "History message counts including last user message, "
            f"not including any of the current agent output: {history_summary}"
        )
        buffered_parts.append(history_part)
        yield AgentMessage(text=history_part)
    except Exception as e:
        error_part = f"Error during execution: {e!s}"
        buffered_parts.append(error_part)
        yield AgentMessage(text=error_part)
    finally:
        # IMPORTANT: Persist only once, after streaming finishes.
        #
        # The finally block ensures the aggregated response is always persisted,
        # at least up to the point of failure. This does not need to be the
        # go-to approach in all cases; sometimes the partial outputs are of no
        # value and should not be stored at all.
        #
        # Why not store each chunk?
        # - On each `context.store()` call, PlatformContextStore saves the
        #   message as a distinct history item.
        # - Storing per chunk would fragment one assistant turn into many
        #   partial messages.
        # - A single aggregated write keeps replay, memory, and history
        #   semantics clean.
        aggregated_response = AgentMessage(text="\n".join(buffered_parts))
        yield "Final result check:\n" + str(aggregated_response.text)
        await context.store(data=aggregated_response)


def run():
    server.run(
        host=os.getenv("HOST", "127.0.0.1"),
        port=int(os.getenv("PORT", "8000")),
        context_store=PlatformContextStore(),
    )


if __name__ == "__main__":
    run()
```
When to use buffering
- Use simple yield + store when your agent emits a single final response.
- Use stream + buffer + single store when your agent emits multiple partial chunks which are streamed to the user.
- With `PlatformContextStore`, each `context.store()` call creates a persisted history item, so buffering prevents chunk-level history fragmentation.
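To see why buffering matters, here is a minimal, self-contained sketch. `FakeStore` is a hypothetical stand-in for a persistent store such as `PlatformContextStore`; it mimics only the behavior that matters here: each `store()` call persists one distinct history item.

```python
import asyncio


# FakeStore is a hypothetical stand-in for a persistent context store such as
# PlatformContextStore: each store() call persists one distinct history item.
class FakeStore:
    def __init__(self):
        self.items: list[str] = []

    async def store(self, data: str):
        self.items.append(data)


async def per_chunk(store: FakeStore, chunks: list[str]):
    # Storing every chunk fragments one assistant turn into many history items.
    for chunk in chunks:
        await store.store(chunk)


async def buffered(store: FakeStore, chunks: list[str]):
    # Buffer the chunks and persist a single aggregated message.
    await store.store("\n".join(chunks))


chunks = ["part 1", "part 2", "part 3"]
fragmented, clean = FakeStore(), FakeStore()
asyncio.run(per_chunk(fragmented, chunks))
asyncio.run(buffered(clean, chunks))

print(len(fragmented.items))  # 3 history items for one assistant turn
print(len(clean.items))       # 1 aggregated history item
```

With per-chunk storage, a later `load_history()` call would replay one assistant turn as three separate messages; the buffered write keeps it as one.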
Advanced BeeAI Framework Example
Here’s a sophisticated example using the BeeAI Framework to build a multi-turn chat agent that leverages conversation history and LLM capabilities:
```python
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import os
from typing import Annotated

from a2a.types import Message, Role
from a2a.utils.message import get_message_text
from agentstack_sdk.a2a.extensions import (
    LLMServiceExtensionServer,
    LLMServiceExtensionSpec,
)
from agentstack_sdk.a2a.types import AgentMessage
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext
from agentstack_sdk.server.store.platform_context_store import PlatformContextStore
from beeai_framework.adapters.agentstack.backend.chat import AgentStackChatModel
from beeai_framework.agents.requirement import RequirementAgent
from beeai_framework.agents.requirement.requirements.conditional import (
    ConditionalRequirement,
)
from beeai_framework.backend import AssistantMessage, UserMessage
from beeai_framework.tools.think import ThinkTool

server = Server()

FrameworkMessage = UserMessage | AssistantMessage


def to_framework_message(message: Message) -> FrameworkMessage:
    """Convert A2A Message to BeeAI Framework Message format"""
    message_text = "".join(part.root.text for part in message.parts if part.root.kind == "text")
    if message.role == Role.agent:
        return AssistantMessage(message_text)
    elif message.role == Role.user:
        return UserMessage(message_text)
    else:
        raise ValueError(f"Invalid message role: {message.role}")


@server.agent()
async def advanced_history_example(
    input: Message,
    context: RunContext,
    llm: Annotated[LLMServiceExtensionServer, LLMServiceExtensionSpec.single_demand()],
):
    """Multi-turn chat agent with conversation memory and LLM integration"""
    await context.store(input)

    # Load conversation history
    history = [
        message
        async for message in context.load_history()
        if isinstance(message, Message) and message.parts
    ]

    # Initialize BeeAI Framework LLM client
    llm_client = AgentStackChatModel(tool_choice_support={"none", "auto"})
    llm_client.set_context(llm)

    # Create a RequirementAgent with conversation memory
    agent = RequirementAgent(
        name="Agent",
        llm=llm_client,
        role="helpful assistant",
        instructions="You are a helpful assistant that is supposed to remember the user's name. Ask them for their name and remember it.",
        tools=[ThinkTool()],
        requirements=[ConditionalRequirement(ThinkTool, force_at_step=1)],
        save_intermediate_steps=False,
        middlewares=[],
    )

    # Load conversation history into agent memory
    await agent.memory.add_many(to_framework_message(item) for item in history)

    # Process the current message and generate the response
    async for event, meta in agent.run(get_message_text(input)):
        if meta.name == "success" and event.state.steps:
            step = event.state.steps[-1]
            if not step.tool:
                continue
            tool_name = step.tool.name
            if tool_name == "final_answer":
                response = AgentMessage(text=step.input["response"])
                yield response
                await context.store(response)


def run():
    server.run(
        host=os.getenv("HOST", "127.0.0.1"),
        port=int(os.getenv("PORT", "8000")),
        context_store=PlatformContextStore(),  # Enable persistent storage
    )


if __name__ == "__main__":
    run()
```
This advanced example demonstrates several key concepts:
- LLM Integration: Uses the platform's LLM service extension to get model access
- Framework Integration: Leverages the BeeAI Framework for sophisticated agent capabilities
- Memory Management: Converts conversation history to framework format and loads it into agent memory
- Tool Usage: Includes thinking tools and conditional requirements for better reasoning
- Persistent Storage: Uses `PlatformContextStore` for conversation persistence
Using Context History
Persistent Storage Example
By default, conversation history is stored in memory and is lost when the agent process restarts. For production applications, you’ll want to use persistent context storage to maintain conversation history across agent restarts. The PlatformContextStore automatically handles conversation persistence, ensuring that users can continue their conversations even after agent restarts or deployments.
```python
import os

from agentstack_sdk.server import Server
from agentstack_sdk.server.store.platform_context_store import PlatformContextStore

server = Server()


def run():
    server.run(
        host=os.getenv("HOST", "127.0.0.1"),
        port=int(os.getenv("PORT", "8000")),
        context_store=PlatformContextStore(),
    )
```
History Contents
The context.load_history() method returns an async iterator containing all items in the conversation, including the current message. This can include:
- A2A Messages: Both user and assistant messages from the conversation, including the current A2A message
- Artifacts: Any files, documents, or other artifacts shared during the conversation
For multi-turn conversations, you’ll primarily work with A2A messages, which include:
- User messages: Messages sent by the user
- Assistant messages: Previous responses from your agent
The history includes the current message, so if you want only previous messages, you may need to filter out the last message or use the current message separately.
The history iterator returns all message types. Always filter messages using isinstance(message, Message) to ensure you’re working with the correct message format.
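A minimal sketch of this filtering, using hypothetical stand-in types in place of the real `a2a.types.Message` and artifact items, and assuming history is ordered oldest-to-newest with the current message last:

```python
from dataclasses import dataclass


# Hypothetical stand-ins: in real code these are a2a.types.Message and the
# artifact items returned by context.load_history().
@dataclass
class Message:
    role: str
    text: str


@dataclass
class Artifact:
    name: str


items = [
    Message("user", "hi"),
    Message("agent", "hello!"),
    Artifact("report.pdf"),
    Message("user", "what did I just send?"),  # the current message
]

# Keep only messages, mirroring the isinstance(message, Message) filter.
messages = [item for item in items if isinstance(item, Message)]

# The history includes the current message, so split it off when you only
# want previous turns.
*previous, current = messages
print(len(previous), current.text)  # 2 what did I just send?
```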
Editing and Removing Messages from History
Sometimes you may need to edit a previous message in a conversation or remove messages that are no longer relevant.
The Agent Stack provides a mechanism to delete history items from a specific point onward, allowing you to effectively “rewind” the conversation and replace a message with an edited version.
Possible use cases include editing a previous message, clearing irrelevant exchanges, or removing messages that resulted from processing errors.
Here’s an example of a function for editing a user message in a conversation using the context API. This assumes you know the context message id, which can be obtained as an id field of an object returned by RunContext.load_history(load_history_items=True), Context.list_history or Context.list_all_history.
```python
import uuid
from typing import Any
from uuid import UUID

from a2a.types import Message, Part, Role, TextPart
from agentstack_sdk.server.context import RunContext


async def edit_message_in_context(
    run_context: RunContext,
    id: UUID,
    new_text: str,
    metadata: dict[str, Any] | None = None,
):
    # Step 1: Delete history from this message onwards (inclusive)
    await run_context.delete_history_from_id(from_id=id)

    # Step 2: Create the corrected message
    corrected_message = Message(
        message_id=str(uuid.uuid4()),
        parts=[Part(TextPart(text=new_text))],
        role=Role.user,
        kind="message",
        metadata=metadata,
    )

    # Step 3: Store the corrected message
    await run_context.store(data=corrected_message)
```
When you delete history from a specific message onwards, all messages created after that point (including the message itself) are removed. This effectively creates a new conversation branch starting from the message before the deleted one.
This operation is permanent. Once messages are deleted, they cannot be recovered. Consider informing users about this operation or implementing a confirmation step for important conversations.
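A sketch of how you might locate the id to pass to a helper like `edit_message_in_context` above. `HistoryItem` is a hypothetical stand-in for the history-item objects returned by `RunContext.load_history(load_history_items=True)`; the real objects carry more fields than shown here.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


# Hypothetical stand-in for a history item; real items expose an `id` field
# alongside the stored message content.
@dataclass
class HistoryItem:
    id: UUID
    text: str


items = [
    HistoryItem(id=uuid4(), text="What is the capitol of France?"),  # typo to fix
    HistoryItem(id=uuid4(), text="Did you mean the capital? It is Paris."),
]

# Find the message to correct and grab its id; this is the value you would
# pass as `id` to edit_message_in_context, along with the corrected text.
target = next(item for item in items if "capitol" in item.text)
print(isinstance(target.id, UUID))  # True
```

Deleting from `target.id` onwards would also remove the agent's reply, after which the corrected message becomes the new end of the conversation.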
Message Storage Guidelines
Since messages are not automatically stored, you need to explicitly call context.store() for any message you want to be available in future interactions. Here are the key guidelines:
Store Request Example
```python
@server.agent()
async def my_agent(input: Message, context: RunContext):
    # Store the incoming user message immediately
    await context.store(input)

    # Process the message and generate the response
    response = AgentMessage(text="Your response here")
    yield response

    # Store the agent's response after yielding
    await context.store(response)
```
What to Store
Store all of the messages you may want to use later. If you don’t store messages, they won’t be available in context.load_history() for future interactions, causing your agent to lose conversation context. Stored material can include:
- User messages: Always store incoming user messages to maintain conversation context
- Agent responses: Store your agent’s responses so they’re available for future reference
- Important artifacts: Store any files, documents, or other artifacts that should persist
Storage Best Practices
- Store early: Store user messages at the beginning of your agent function
- Store after yielding: Store agent responses after yielding them to the user
- Be selective: Only store messages that are relevant for future conversation context
- Handle errors: Consider what happens if storage fails; your agent should still function
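A minimal sketch of the "handle errors" guideline. `flaky_store` is a hypothetical stand-in for `context.store()`, and the exception type is illustrative; the point is that a storage failure should not prevent the agent from responding in the current turn.

```python
import asyncio


async def flaky_store(data: str):
    # Hypothetical stand-in for context.store(); here it always fails.
    raise ConnectionError("context store unavailable")


async def respond(user_text: str) -> str:
    response = f"Echo: {user_text}"
    try:
        await flaky_store(response)
    except Exception as exc:
        # Log and continue: the user still gets a response this turn, even
        # though it will be missing from future load_history() calls.
        print(f"warning: failed to persist response: {exc}")
    return response


print(asyncio.run(respond("hello")))  # Echo: hello
```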