Source code for chatlog

# https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports
from __future__ import annotations
import datetime
import os
import json
from pathlib import Path
import logging
from mdvtools.logging_config import get_logger

from typing import List, Optional, Dict, Any, TYPE_CHECKING

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult

from dataclasses import dataclass
# from mdvtools.websocket import log
from flask_socketio import SocketIO

if TYPE_CHECKING:
    from mdvtools.mdvproject import MDVProject


# all getting a bit messy and confusing in here - will likely be further refactored
[docs] logger = get_logger(__name__)
[docs] log = logger.info
@dataclass
[docs] class ChatLogItem: """Represents a chat log entry for a request and response"""
[docs] context: str
[docs] query: str
[docs] prompt_template: str
[docs] response: str
[docs] timestamp: str
[docs] conversation_id: Optional[str] = None
[docs] view_name: Optional[str] = None
[docs] error: Optional[bool] = None
[docs] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" return { "context": self.context, "query": self.query, "prompt_template": self.prompt_template, "response": self.response, "timestamp": self.timestamp, "conversation_id": self.conversation_id, "view_name": self.view_name, "error": self.error, }
@classmethod
[docs] def from_dict(cls, data: Dict[str, Any]) -> 'ChatLogItem': """Create from dictionary (e.g. from JSON)""" return cls( context=data["context"], query=data["query"], prompt_template=data["prompt_template"], response=data["response"], timestamp=data["timestamp"], conversation_id=data.get("conversation_id"), view_name=data.get("view_name"), error=data.get("error"), )
[docs] class ChatLogger: """Handles chat logging functionality""" def __init__(self, log_file_path: str):
[docs] self.log_file_path = Path(log_file_path)
# note - not calling _ensure_log_file_exists here, # it will currently be called when logging or reading logs # this is not necessarily the best design, but should avoid some current issues # and also I don't think we should be so eager to create the log file anyway.
[docs] def _ensure_log_file_exists(self): """Ensure the log file exists and is properly initialized""" if not self.log_file_path.exists(): self.log_file_path.parent.mkdir(parents=True, exist_ok=True) with open(self.log_file_path, 'w') as f: json.dump([], f)
[docs] def log_chat(self, item: ChatLogItem): """Log a chat item to the JSON file""" try: self._ensure_log_file_exists() # Read existing logs with open(self.log_file_path, 'r') as f: logs = json.load(f) # Append new log logs.append(item.to_dict()) # Write back with open(self.log_file_path, 'w') as f: json.dump(logs, f, indent=4) except Exception as e: logging.error(f"Error logging chat: {e}")
[docs] def get_logs(self) -> List[ChatLogItem]: """Get all chat logs""" try: self._ensure_log_file_exists() with open(self.log_file_path, 'r') as f: logs = json.load(f) return [ChatLogItem.from_dict(log) for log in logs] except Exception as e: logging.error(f"Error reading chat logs: {e}") return []
[docs] def get_conversation_logs(self, conversation_id: str) -> List[ChatLogItem]: """Get logs for a specific conversation""" return [log for log in self.get_logs() if log.conversation_id == conversation_id]
[docs] class ChatSocketAPI: """ An instance of this class is created for each chat request. It will instantiate a Logger instance & SocketIOHandler which should be GCed when the request is finished. """ def __init__(self, project: MDVProject, id: str, room: str, conversation_id: str):
[docs] self.project = project
from mdvtools.websocket import socketio if socketio is None: raise ValueError("SocketIO is not initialized")
[docs] self.socketio = socketio
# todo refactor event/room/namespace names log_name = "chat"
[docs] self.progress_name = "chat_progress"
# we need to reevaluate so that 'logging' isn't sent to all clients. logger = logging.Logger(f"{log_name}_{project.id}_{id}") logger.propagate = False # avoid unintentional memory retention logger.setLevel(logging.INFO)
[docs] self.project_namespace = f"/project/{project.id}"
[docs] self.room = room
handler = ChatSocketIOHandler(socketio, log_name, self.project_namespace, id, room) # todo - move more of this to another file, also add a handler to output to a file in project directory # probably don't need to have separate class for ChatSocketIOHandler, we could make this inherit logging.StreamHandler # would that be better, worse, or indifferent? logger.addHandler(handler) log_dir = os.path.join(project.dir, "logs") os.makedirs(log_dir, exist_ok=True) file_handler = logging.FileHandler(os.path.join(project.dir, f"logs/chat__{conversation_id}.log")) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler)
[docs] self.logger = logger
# self.log(f"ChatSocketAPI initialized for request {id} in room {room}") # self.update_chat_progress("ChatSocketAPI initialized", id, 0, 0) # from time import sleep # sleep(1)
[docs] def log(self, msg: str): self.logger.info(msg)
[docs] def update_chat_progress(self, message: str, id: str, progress: int, delta: int): """ Send a message to the chat log and also update the progress bar. Args: message (str): the message to send to the chat log id (str): the id of the associated chat request progress (int): the progress value (%) to update the progress bar with delta (int): the expected cost of the current operation (%) """ # we should descriminate which user to send this to... # which implies that this instance should be associated... # or that we can map the chat request ID to a user ID. # I think simplest way to do this will be to use request.sid, # which implies that the `/chat` endpoint should be socket.io rather than REST. # to = chat_sid_map.get(id, None) # if to is None: # log(f"Chat progress update for {id} but no associated user found, skipping.") # return self.socketio.emit(self.progress_name, { "message": message, "id": id, "progress": progress, "delta": delta }, namespace=self.project_namespace, to=self.room)
[docs] class ChatSocketIOHandler(logging.StreamHandler): def __init__(self, socketio: SocketIO, event_name: str, namespace: str, id: str, room: str): super().__init__() log(f"handler initialized for event: {event_name}")
[docs] self.socketio = socketio
# todo - event_name vs namespace vs room refactor
[docs] self.event_name = event_name
[docs] self.namespace = namespace
[docs] self.id = id
[docs] self.room = room
# def my_function_handler(data): # log(f"{namespace}/{event_name}: {data}") # we could handle a cancel event handler here? # socketio.on_event(event_name, my_function_handler, namespace=namespace)
[docs] def emit(self, record): """ Emit a record - send it via socketio & also print it to the console. subject to change. """ try: msg = self.format(record) log(f"[ {self.event_name} #{self.id} ] {msg}") #!!! to=self.id? self.socketio.emit(self.event_name, msg, namespace=self.namespace, to=self.room) except Exception: self.handleError(record)
[docs] mypath = os.path.dirname(__file__)
# this needs to be reviewed - causing trouble when running in different contexts
[docs] json_keyfile_path = os.path.join(mypath, "../../../chatlog.json")
# Function to ensure the JSON log file exists
[docs] def initialize_json_log(): if not os.path.exists(json_keyfile_path): with open(json_keyfile_path, 'w') as file: json.dump([], file) # Initialize with an empty list
# Function to log data to the JSON file
[docs] def log_to_json(context, prompt, prompt_template, response): # Ensure the log file exists initialize_json_log() # Prepare log entry timestamp = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S') log_entry = { "timestamp": timestamp, "context": context, "prompt": prompt, "prompt_template": prompt_template, "response": response } # Read the existing logs with open(json_keyfile_path, 'r') as file: logs = json.load(file) # Append the new log entry logs.append(log_entry) # Write back the updated logs to the file with open(json_keyfile_path, 'w') as file: json.dump(logs, file, indent=4)
try:
[docs] file = Path(json_keyfile_path).exists()
except Exception as e: logging.error(f"Error checking log file exists: {e}")
[docs] class LangchainLoggingHandler(BaseCallbackHandler): def __init__(self, logger: logging.Logger):
[docs] self.logger = logger
[docs] self.log = logger.info
[docs] def on_chat_model_start( self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs ) -> None: self.log("Chat model started")
[docs] def on_llm_end(self, response: LLMResult, **kwargs) -> None: self.log(f"Chat model ended, response: {response}")
[docs] def on_chain_start( self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs ) -> None: self.log(f"Chain {serialized.get('name')} started")
[docs] def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None: self.log(f"Chain ended, outputs: {outputs}")
[docs] def log_chat_item(project, question, output, prompt_template, response, conversation_id, view_name: str | None, error: bool = False): """ Log a chat interaction to the chat log file. Args: project: The MDVProject instance output: Result of invoke 'from langchain.chains import RetrievalQA' (can be None for errors) prompt_template: The template used for the prompt (can be empty for errors) response: The response generated (error message if error) conversation_id: ID to group messages from the same conversation error: Whether this log is for an error """ # Create a ChatLogger instance for this project chat_file = os.path.join(project.dir, "chat_log.json") chat_logger = ChatLogger(chat_file) if error or output is None: context = "[]" view_name = None prompt_template = prompt_template or "" else: context_information = output['source_documents'] context_information_metadata = [context_information[i].metadata for i in range(len(context_information))] context_information_metadata_url = [context_information_metadata[i]['url'] for i in range(len(context_information_metadata))] context_information_metadata_name = [s for s in context_information_metadata_url] context = str(context_information_metadata_name) chat_item = ChatLogItem( context=context, query=question, prompt_template=prompt_template, response=response, timestamp=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), conversation_id=conversation_id, view_name=view_name, error=error ) chat_logger.log_chat(chat_item)