Files
alfred/tests/test_memory_edge_cases.py

543 lines
17 KiB
Python

"""Edge case tests for the Memory system."""
import json
import os
import pytest
from alfred.infrastructure.persistence import (
EpisodicMemory,
LongTermMemory,
Memory,
ShortTermMemory,
get_memory,
init_memory,
set_memory,
)
from alfred.infrastructure.persistence.context import _memory_ctx
class TestLongTermMemoryEdgeCases:
"""Edge case tests for LongTermMemory."""
def test_config_with_none_value(self):
"""Should handle None values in config."""
ltm = LongTermMemory()
ltm.set_config("key", None)
assert ltm.get_config("key") is None
assert not ltm.has_config("key")
def test_config_with_empty_string(self):
"""Should handle empty string values."""
ltm = LongTermMemory()
ltm.set_config("key", "")
assert ltm.get_config("key") == ""
assert ltm.has_config("key") # Empty string is still a value
def test_config_with_complex_types(self):
"""Should handle complex types in config."""
ltm = LongTermMemory()
ltm.set_config("list", [1, 2, 3])
ltm.set_config("dict", {"nested": {"deep": "value"}})
ltm.set_config("bool", False)
ltm.set_config("int", 0)
assert ltm.get_config("list") == [1, 2, 3]
assert ltm.get_config("dict")["nested"]["deep"] == "value"
assert ltm.get_config("bool") is False
assert ltm.get_config("int") == 0
def test_library_with_missing_imdb_id(self):
"""Should handle media without imdb_id."""
ltm = LongTermMemory()
media = {"title": "No ID Movie"}
ltm.add_to_library("movies", media)
# Should still add (imdb_id will be None)
assert len(ltm.library["movies"]) == 1
def test_library_duplicate_check_with_none_id(self):
"""Should handle duplicate check when imdb_id is None."""
ltm = LongTermMemory()
media1 = {"title": "Movie 1"}
media2 = {"title": "Movie 2"}
ltm.add_to_library("movies", media1)
ltm.add_to_library("movies", media2)
# May dedupe or not depending on implementation
assert len(ltm.library["movies"]) >= 1
def test_from_dict_with_extra_keys(self):
"""Should ignore extra keys in dict."""
data = {
"config": {},
"preferences": {},
"library": {"movies": []},
"following": [],
"extra_key": "should be ignored",
"another_extra": [1, 2, 3],
}
ltm = LongTermMemory.from_dict(data)
assert not hasattr(ltm, "extra_key")
def test_from_dict_with_wrong_types(self):
"""Should handle wrong types gracefully."""
data = {
"config": "not a dict", # Should be dict
"preferences": [], # Should be dict
"library": "wrong", # Should be dict
"following": {}, # Should be list
}
# Should not crash, but behavior may vary
try:
ltm = LongTermMemory.from_dict(data)
# If it doesn't crash, check it has some defaults
assert ltm is not None
except (TypeError, AttributeError):
# This is also acceptable behavior
pass
def test_to_dict_preserves_unicode(self):
"""Should preserve unicode in serialization."""
ltm = LongTermMemory()
ltm.set_config("japanese", "日本語")
ltm.set_config("emoji", "🎬🎥")
ltm.add_to_library("movies", {"title": "Amélie", "imdb_id": "tt1"})
data = ltm.to_dict()
assert data["config"]["japanese"] == "日本語"
assert data["config"]["emoji"] == "🎬🎥"
assert data["library"]["movies"][0]["title"] == "Amélie"
class TestShortTermMemoryEdgeCases:
"""Edge case tests for ShortTermMemory."""
def test_add_message_with_empty_content(self):
"""Should handle empty message content."""
stm = ShortTermMemory()
stm.add_message("user", "")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == ""
def test_add_message_with_very_long_content(self):
"""Should handle very long messages."""
stm = ShortTermMemory()
long_content = "x" * 100000
stm.add_message("user", long_content)
assert len(stm.conversation_history[0]["content"]) == 100000
def test_add_message_with_special_characters(self):
"""Should handle special characters."""
stm = ShortTermMemory()
special = "Line1\nLine2\tTab\r\nWindows\x00Null"
stm.add_message("user", special)
assert stm.conversation_history[0]["content"] == special
def test_max_history_zero(self):
"""Should handle max_history of 0."""
stm = ShortTermMemory()
stm.max_history = 0
stm.add_message("user", "Hello")
# Behavior: either empty or keeps last message
assert len(stm.conversation_history) <= 1
def test_max_history_one(self):
"""Should handle max_history of 1."""
stm = ShortTermMemory()
stm.max_history = 1
stm.add_message("user", "First")
stm.add_message("user", "Second")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == "Second"
def test_get_recent_history_zero(self):
"""Should handle n=0."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(0)
# May return empty or all messages depending on implementation
assert isinstance(recent, list)
def test_get_recent_history_negative(self):
"""Should handle negative n."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(-1)
# Python slicing with negative returns empty or last element
assert isinstance(recent, list)
def test_workflow_with_empty_target(self):
"""Should handle empty workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", {})
assert stm.current_workflow["target"] == {}
def test_workflow_with_none_target(self):
"""Should handle None workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", None)
assert stm.current_workflow["target"] is None
def test_entity_with_none_value(self):
"""Should store None as entity value."""
stm = ShortTermMemory()
stm.set_entity("key", None)
assert stm.get_entity("key") is None
assert "key" in stm.extracted_entities
def test_entity_overwrite(self):
"""Should overwrite existing entity."""
stm = ShortTermMemory()
stm.set_entity("key", "value1")
stm.set_entity("key", "value2")
assert stm.get_entity("key") == "value2"
def test_topic_with_empty_string(self):
"""Should handle empty topic."""
stm = ShortTermMemory()
stm.set_topic("")
assert stm.current_topic == ""
class TestEpisodicMemoryEdgeCases:
"""Edge case tests for EpisodicMemory."""
def test_store_empty_results(self):
"""Should handle empty results list."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [])
assert episodic.last_search_results is not None
assert episodic.last_search_results["results"] == []
def test_store_results_with_none_values(self):
"""Should handle results with None values."""
episodic = EpisodicMemory()
results = [
{"name": None, "seeders": None},
{"name": "Valid", "seeders": 100},
]
episodic.store_search_results("query", results)
assert len(episodic.last_search_results["results"]) == 2
def test_get_result_by_index_after_clear(self):
"""Should return None after clearing results."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
episodic.clear_search_results()
result = episodic.get_result_by_index(1)
assert result is None
def test_get_result_by_very_large_index(self):
"""Should handle very large index."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
result = episodic.get_result_by_index(999999999)
assert result is None
def test_download_with_missing_fields(self):
"""Should handle download with missing fields."""
episodic = EpisodicMemory()
episodic.add_active_download({}) # Empty dict
assert len(episodic.active_downloads) == 1
assert "started_at" in episodic.active_downloads[0]
def test_update_nonexistent_download(self):
"""Should not crash when updating nonexistent download."""
episodic = EpisodicMemory()
# Should not raise
episodic.update_download_progress("nonexistent", 50)
assert episodic.active_downloads == []
def test_complete_nonexistent_download(self):
"""Should return None for nonexistent download."""
episodic = EpisodicMemory()
result = episodic.complete_download("nonexistent", "/path")
assert result is None
def test_error_with_empty_context(self):
"""Should handle error with None context."""
episodic = EpisodicMemory()
episodic.add_error("action", "error", None)
assert episodic.recent_errors[0]["context"] == {}
def test_error_with_very_long_message(self):
"""Should handle very long error messages."""
episodic = EpisodicMemory()
long_error = "x" * 10000
episodic.add_error("action", long_error)
assert len(episodic.recent_errors[0]["error"]) == 10000
def test_pending_question_with_empty_options(self):
"""Should handle question with no options."""
episodic = EpisodicMemory()
episodic.set_pending_question("Question?", [], {})
assert episodic.pending_question["options"] == []
def test_resolve_question_invalid_index(self):
"""Should return None for invalid answer index."""
episodic = EpisodicMemory()
episodic.set_pending_question(
"Question?",
[{"index": 1, "label": "Option"}],
{},
)
result = episodic.resolve_pending_question(999)
assert result is None
assert episodic.pending_question is None # Still cleared
def test_resolve_question_when_none(self):
"""Should handle resolving when no question pending."""
episodic = EpisodicMemory()
result = episodic.resolve_pending_question(1)
assert result is None
def test_background_event_with_empty_data(self):
"""Should handle event with empty data."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
assert episodic.background_events[0]["data"] == {}
def test_get_unread_events_multiple_calls(self):
"""Should return empty on second call."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
first = episodic.get_unread_events()
second = episodic.get_unread_events()
assert len(first) == 1
assert len(second) == 0
def test_max_errors_boundary(self):
"""Should keep exactly max_errors."""
episodic = EpisodicMemory()
episodic.max_errors = 3
for i in range(3):
episodic.add_error("action", f"Error {i}")
assert len(episodic.recent_errors) == 3
episodic.add_error("action", "Error 3")
assert len(episodic.recent_errors) == 3
assert episodic.recent_errors[0]["error"] == "Error 1"
def test_max_events_boundary(self):
"""Should keep exactly max_events."""
episodic = EpisodicMemory()
episodic.max_events = 3
for i in range(5):
episodic.add_background_event("event", {"i": i})
assert len(episodic.background_events) == 3
assert episodic.background_events[0]["data"]["i"] == 2
class TestMemoryEdgeCases:
"""Edge case tests for Memory manager."""
def test_init_with_nonexistent_directory(self, temp_dir):
"""Should create directory if not exists."""
new_dir = temp_dir / "new" / "nested" / "dir"
# Don't create the directory - let Memory do it
Memory(storage_dir=str(new_dir))
assert new_dir.exists()
def test_init_with_readonly_directory(self, temp_dir):
"""Should handle readonly directory gracefully."""
readonly_dir = temp_dir / "readonly"
readonly_dir.mkdir()
# Make readonly (may not work on all systems)
try:
os.chmod(readonly_dir, 0o444)
# This might raise or might work depending on OS
Memory(storage_dir=str(readonly_dir))
except (PermissionError, OSError):
pass # Expected on some systems
finally:
os.chmod(readonly_dir, 0o755)
def test_load_ltm_with_empty_file(self, temp_dir):
"""Should handle empty LTM file."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text("")
memory = Memory(storage_dir=str(temp_dir))
# Should use defaults
assert memory.ltm.config == {}
def test_load_ltm_with_partial_data(self, temp_dir):
"""Should handle partial LTM data."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text('{"config": {"key": "value"}}')
memory = Memory(storage_dir=str(temp_dir))
assert memory.ltm.get_config("key") == "value"
# Other fields should have defaults
assert memory.ltm.library == {"movies": [], "tv_shows": []}
def test_save_with_unicode(self, temp_dir):
"""Should save unicode correctly."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("japanese", "日本語テスト")
memory.save()
# Read back and verify
ltm_file = temp_dir / "ltm.json"
data = json.loads(ltm_file.read_text(encoding="utf-8"))
assert data["config"]["japanese"] == "日本語テスト"
def test_save_preserves_formatting(self, temp_dir):
"""Should save with readable formatting."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.save()
ltm_file = temp_dir / "ltm.json"
content = ltm_file.read_text()
# Should be indented (pretty printed)
assert "\n" in content
def test_concurrent_access_simulation(self, temp_dir):
"""Should handle rapid save/load cycles."""
memory = Memory(storage_dir=str(temp_dir))
for i in range(100):
memory.ltm.set_config(f"key_{i}", f"value_{i}")
memory.save()
# Reload and verify
memory2 = Memory(storage_dir=str(temp_dir))
assert memory2.ltm.get_config("key_99") == "value_99"
def test_clear_session_preserves_ltm(self, temp_dir):
"""Should preserve LTM after clear_session."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("important", "data")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{}])
memory.clear_session()
assert memory.ltm.get_config("important") == "data"
assert memory.stm.conversation_history == []
assert memory.episodic.last_search_results is None
def test_get_context_for_prompt_empty(self, temp_dir):
"""Should handle empty memory state."""
memory = Memory(storage_dir=str(temp_dir))
context = memory.get_context_for_prompt()
assert context["config"] == {}
assert context["last_search"]["query"] is None
assert context["last_search"]["result_count"] == 0
def test_get_full_state_serializable(self, temp_dir):
"""Should return JSON-serializable state."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{"name": "Test"}])
state = memory.get_full_state()
# Should be JSON serializable
json_str = json.dumps(state)
assert json_str is not None
class TestMemoryContextEdgeCases:
"""Edge case tests for memory context."""
def test_multiple_init_calls(self, temp_dir):
"""Should handle multiple init calls."""
_memory_ctx.set(None)
init_memory(str(temp_dir))
mem2 = init_memory(str(temp_dir))
# Second call should replace first
assert get_memory() is mem2
def test_set_memory_with_none(self):
"""Should handle setting None."""
_memory_ctx.set(None)
set_memory(None)
with pytest.raises(RuntimeError):
get_memory()
def test_context_isolation(self, temp_dir):
"""Context should be isolated per context."""
from contextvars import copy_context
_memory_ctx.set(None)
mem1 = init_memory(str(temp_dir))
# Create a copy of context
ctx = copy_context()
# In the copy, memory should still be set
def check_memory():
return get_memory()
result = ctx.run(check_memory)
assert result is mem1