Files
alfred/tests/test_memory_edge_cases.py
2025-12-06 23:55:21 +01:00

547 lines
17 KiB
Python

"""Edge case tests for the Memory system."""
import pytest
import json
import os
from pathlib import Path
from datetime import datetime
from unittest.mock import patch, mock_open
from infrastructure.persistence import (
Memory,
LongTermMemory,
ShortTermMemory,
EpisodicMemory,
init_memory,
get_memory,
set_memory,
)
from infrastructure.persistence.context import _memory_ctx
class TestLongTermMemoryEdgeCases:
"""Edge case tests for LongTermMemory."""
def test_config_with_none_value(self):
"""Should handle None values in config."""
ltm = LongTermMemory()
ltm.set_config("key", None)
assert ltm.get_config("key") is None
assert not ltm.has_config("key")
def test_config_with_empty_string(self):
"""Should handle empty string values."""
ltm = LongTermMemory()
ltm.set_config("key", "")
assert ltm.get_config("key") == ""
assert ltm.has_config("key") # Empty string is still a value
def test_config_with_complex_types(self):
"""Should handle complex types in config."""
ltm = LongTermMemory()
ltm.set_config("list", [1, 2, 3])
ltm.set_config("dict", {"nested": {"deep": "value"}})
ltm.set_config("bool", False)
ltm.set_config("int", 0)
assert ltm.get_config("list") == [1, 2, 3]
assert ltm.get_config("dict")["nested"]["deep"] == "value"
assert ltm.get_config("bool") is False
assert ltm.get_config("int") == 0
def test_library_with_missing_imdb_id(self):
"""Should handle media without imdb_id."""
ltm = LongTermMemory()
media = {"title": "No ID Movie"}
ltm.add_to_library("movies", media)
# Should still add (imdb_id will be None)
assert len(ltm.library["movies"]) == 1
def test_library_duplicate_check_with_none_id(self):
"""Should handle duplicate check when imdb_id is None."""
ltm = LongTermMemory()
media1 = {"title": "Movie 1"}
media2 = {"title": "Movie 2"}
ltm.add_to_library("movies", media1)
ltm.add_to_library("movies", media2)
# May dedupe or not depending on implementation
assert len(ltm.library["movies"]) >= 1
def test_from_dict_with_extra_keys(self):
"""Should ignore extra keys in dict."""
data = {
"config": {},
"preferences": {},
"library": {"movies": []},
"following": [],
"extra_key": "should be ignored",
"another_extra": [1, 2, 3],
}
ltm = LongTermMemory.from_dict(data)
assert not hasattr(ltm, "extra_key")
def test_from_dict_with_wrong_types(self):
"""Should handle wrong types gracefully."""
data = {
"config": "not a dict", # Should be dict
"preferences": [], # Should be dict
"library": "wrong", # Should be dict
"following": {}, # Should be list
}
# Should not crash, but behavior may vary
try:
ltm = LongTermMemory.from_dict(data)
# If it doesn't crash, check it has some defaults
assert ltm is not None
except (TypeError, AttributeError):
# This is also acceptable behavior
pass
def test_to_dict_preserves_unicode(self):
"""Should preserve unicode in serialization."""
ltm = LongTermMemory()
ltm.set_config("japanese", "日本語")
ltm.set_config("emoji", "🎬🎥")
ltm.add_to_library("movies", {"title": "Amélie", "imdb_id": "tt1"})
data = ltm.to_dict()
assert data["config"]["japanese"] == "日本語"
assert data["config"]["emoji"] == "🎬🎥"
assert data["library"]["movies"][0]["title"] == "Amélie"
class TestShortTermMemoryEdgeCases:
"""Edge case tests for ShortTermMemory."""
def test_add_message_with_empty_content(self):
"""Should handle empty message content."""
stm = ShortTermMemory()
stm.add_message("user", "")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == ""
def test_add_message_with_very_long_content(self):
"""Should handle very long messages."""
stm = ShortTermMemory()
long_content = "x" * 100000
stm.add_message("user", long_content)
assert len(stm.conversation_history[0]["content"]) == 100000
def test_add_message_with_special_characters(self):
"""Should handle special characters."""
stm = ShortTermMemory()
special = "Line1\nLine2\tTab\r\nWindows\x00Null"
stm.add_message("user", special)
assert stm.conversation_history[0]["content"] == special
def test_max_history_zero(self):
"""Should handle max_history of 0."""
stm = ShortTermMemory()
stm.max_history = 0
stm.add_message("user", "Hello")
# Behavior: either empty or keeps last message
assert len(stm.conversation_history) <= 1
def test_max_history_one(self):
"""Should handle max_history of 1."""
stm = ShortTermMemory()
stm.max_history = 1
stm.add_message("user", "First")
stm.add_message("user", "Second")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == "Second"
def test_get_recent_history_zero(self):
"""Should handle n=0."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(0)
# May return empty or all messages depending on implementation
assert isinstance(recent, list)
def test_get_recent_history_negative(self):
"""Should handle negative n."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(-1)
# Python slicing with negative returns empty or last element
assert isinstance(recent, list)
def test_workflow_with_empty_target(self):
"""Should handle empty workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", {})
assert stm.current_workflow["target"] == {}
def test_workflow_with_none_target(self):
"""Should handle None workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", None)
assert stm.current_workflow["target"] is None
def test_entity_with_none_value(self):
"""Should store None as entity value."""
stm = ShortTermMemory()
stm.set_entity("key", None)
assert stm.get_entity("key") is None
assert "key" in stm.extracted_entities
def test_entity_overwrite(self):
"""Should overwrite existing entity."""
stm = ShortTermMemory()
stm.set_entity("key", "value1")
stm.set_entity("key", "value2")
assert stm.get_entity("key") == "value2"
def test_topic_with_empty_string(self):
"""Should handle empty topic."""
stm = ShortTermMemory()
stm.set_topic("")
assert stm.current_topic == ""
class TestEpisodicMemoryEdgeCases:
"""Edge case tests for EpisodicMemory."""
def test_store_empty_results(self):
"""Should handle empty results list."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [])
assert episodic.last_search_results is not None
assert episodic.last_search_results["results"] == []
def test_store_results_with_none_values(self):
"""Should handle results with None values."""
episodic = EpisodicMemory()
results = [
{"name": None, "seeders": None},
{"name": "Valid", "seeders": 100},
]
episodic.store_search_results("query", results)
assert len(episodic.last_search_results["results"]) == 2
def test_get_result_by_index_after_clear(self):
"""Should return None after clearing results."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
episodic.clear_search_results()
result = episodic.get_result_by_index(1)
assert result is None
def test_get_result_by_very_large_index(self):
"""Should handle very large index."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
result = episodic.get_result_by_index(999999999)
assert result is None
def test_download_with_missing_fields(self):
"""Should handle download with missing fields."""
episodic = EpisodicMemory()
episodic.add_active_download({}) # Empty dict
assert len(episodic.active_downloads) == 1
assert "started_at" in episodic.active_downloads[0]
def test_update_nonexistent_download(self):
"""Should not crash when updating nonexistent download."""
episodic = EpisodicMemory()
# Should not raise
episodic.update_download_progress("nonexistent", 50)
assert episodic.active_downloads == []
def test_complete_nonexistent_download(self):
"""Should return None for nonexistent download."""
episodic = EpisodicMemory()
result = episodic.complete_download("nonexistent", "/path")
assert result is None
def test_error_with_empty_context(self):
"""Should handle error with None context."""
episodic = EpisodicMemory()
episodic.add_error("action", "error", None)
assert episodic.recent_errors[0]["context"] == {}
def test_error_with_very_long_message(self):
"""Should handle very long error messages."""
episodic = EpisodicMemory()
long_error = "x" * 10000
episodic.add_error("action", long_error)
assert len(episodic.recent_errors[0]["error"]) == 10000
def test_pending_question_with_empty_options(self):
"""Should handle question with no options."""
episodic = EpisodicMemory()
episodic.set_pending_question("Question?", [], {})
assert episodic.pending_question["options"] == []
def test_resolve_question_invalid_index(self):
"""Should return None for invalid answer index."""
episodic = EpisodicMemory()
episodic.set_pending_question(
"Question?",
[{"index": 1, "label": "Option"}],
{},
)
result = episodic.resolve_pending_question(999)
assert result is None
assert episodic.pending_question is None # Still cleared
def test_resolve_question_when_none(self):
"""Should handle resolving when no question pending."""
episodic = EpisodicMemory()
result = episodic.resolve_pending_question(1)
assert result is None
def test_background_event_with_empty_data(self):
"""Should handle event with empty data."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
assert episodic.background_events[0]["data"] == {}
def test_get_unread_events_multiple_calls(self):
"""Should return empty on second call."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
first = episodic.get_unread_events()
second = episodic.get_unread_events()
assert len(first) == 1
assert len(second) == 0
def test_max_errors_boundary(self):
"""Should keep exactly max_errors."""
episodic = EpisodicMemory()
episodic.max_errors = 3
for i in range(3):
episodic.add_error("action", f"Error {i}")
assert len(episodic.recent_errors) == 3
episodic.add_error("action", "Error 3")
assert len(episodic.recent_errors) == 3
assert episodic.recent_errors[0]["error"] == "Error 1"
def test_max_events_boundary(self):
"""Should keep exactly max_events."""
episodic = EpisodicMemory()
episodic.max_events = 3
for i in range(5):
episodic.add_background_event("event", {"i": i})
assert len(episodic.background_events) == 3
assert episodic.background_events[0]["data"]["i"] == 2
class TestMemoryEdgeCases:
"""Edge case tests for Memory manager."""
def test_init_with_nonexistent_directory(self, temp_dir):
"""Should create directory if not exists."""
new_dir = temp_dir / "new" / "nested" / "dir"
# Create parent directories first
new_dir.mkdir(parents=True, exist_ok=True)
memory = Memory(storage_dir=str(new_dir))
assert new_dir.exists()
def test_init_with_readonly_directory(self, temp_dir):
"""Should handle readonly directory gracefully."""
readonly_dir = temp_dir / "readonly"
readonly_dir.mkdir()
# Make readonly (may not work on all systems)
try:
os.chmod(readonly_dir, 0o444)
# This might raise or might work depending on OS
memory = Memory(storage_dir=str(readonly_dir))
except (PermissionError, OSError):
pass # Expected on some systems
finally:
os.chmod(readonly_dir, 0o755)
def test_load_ltm_with_empty_file(self, temp_dir):
"""Should handle empty LTM file."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text("")
memory = Memory(storage_dir=str(temp_dir))
# Should use defaults
assert memory.ltm.config == {}
def test_load_ltm_with_partial_data(self, temp_dir):
"""Should handle partial LTM data."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text('{"config": {"key": "value"}}')
memory = Memory(storage_dir=str(temp_dir))
assert memory.ltm.get_config("key") == "value"
# Other fields should have defaults
assert memory.ltm.library == {"movies": [], "tv_shows": []}
def test_save_with_unicode(self, temp_dir):
"""Should save unicode correctly."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("japanese", "日本語テスト")
memory.save()
# Read back and verify
ltm_file = temp_dir / "ltm.json"
data = json.loads(ltm_file.read_text(encoding="utf-8"))
assert data["config"]["japanese"] == "日本語テスト"
def test_save_preserves_formatting(self, temp_dir):
"""Should save with readable formatting."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.save()
ltm_file = temp_dir / "ltm.json"
content = ltm_file.read_text()
# Should be indented (pretty printed)
assert "\n" in content
def test_concurrent_access_simulation(self, temp_dir):
"""Should handle rapid save/load cycles."""
memory = Memory(storage_dir=str(temp_dir))
for i in range(100):
memory.ltm.set_config(f"key_{i}", f"value_{i}")
memory.save()
# Reload and verify
memory2 = Memory(storage_dir=str(temp_dir))
assert memory2.ltm.get_config("key_99") == "value_99"
def test_clear_session_preserves_ltm(self, temp_dir):
"""Should preserve LTM after clear_session."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("important", "data")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{}])
memory.clear_session()
assert memory.ltm.get_config("important") == "data"
assert memory.stm.conversation_history == []
assert memory.episodic.last_search_results is None
def test_get_context_for_prompt_empty(self, temp_dir):
"""Should handle empty memory state."""
memory = Memory(storage_dir=str(temp_dir))
context = memory.get_context_for_prompt()
assert context["config"] == {}
assert context["last_search"]["query"] is None
assert context["last_search"]["result_count"] == 0
def test_get_full_state_serializable(self, temp_dir):
"""Should return JSON-serializable state."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{"name": "Test"}])
state = memory.get_full_state()
# Should be JSON serializable
json_str = json.dumps(state)
assert json_str is not None
class TestMemoryContextEdgeCases:
"""Edge case tests for memory context."""
def test_multiple_init_calls(self, temp_dir):
"""Should handle multiple init calls."""
_memory_ctx.set(None)
mem1 = init_memory(str(temp_dir))
mem2 = init_memory(str(temp_dir))
# Second call should replace first
assert get_memory() is mem2
def test_set_memory_with_none(self):
"""Should handle setting None."""
_memory_ctx.set(None)
set_memory(None)
with pytest.raises(RuntimeError):
get_memory()
def test_context_isolation(self, temp_dir):
"""Context should be isolated per context."""
import asyncio
from contextvars import copy_context
_memory_ctx.set(None)
mem1 = init_memory(str(temp_dir))
# Create a copy of context
ctx = copy_context()
# In the copy, memory should still be set
def check_memory():
return get_memory()
result = ctx.run(check_memory)
assert result is mem1