From f080268c33973ba3b078a5c7505e7f981cc28782 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 14 Jan 2025 15:57:36 +0100 Subject: [PATCH 01/24] initalize pyris webhook endpoint --- app/domain/data/metrics/transcription_dto.py | 26 +++++++++++++++++++ ...iption_ingestion_pipeline_execution_dto.py | 15 +++++++++++ app/web/routers/webhooks.py | 16 ++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 app/domain/data/metrics/transcription_dto.py create mode 100644 app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py new file mode 100644 index 00000000..2b4b94a7 --- /dev/null +++ b/app/domain/data/metrics/transcription_dto.py @@ -0,0 +1,26 @@ +from typing import List + +from pydantic import BaseModel, Field +from sqlalchemy import Double + + +class TranscriptionSegmentDTO(BaseModel): + start_time: Double = Field(default="", alias="startTime") + end_time: Double = Field(default="", alias="endTime") + text: str = Field(default="", alias="text") + slide_number: int = Field(default=0, alias="slideNumber") + lecture_unit_id: int = Field(default=0, alias="lectureUnitId") + +class TranscriptionDTO(BaseModel): + language: str = Field(default="", alias="language") + segments: List[TranscriptionSegmentDTO] = Field(default=[], alias="segments") + lecture_id: int = Field(alias="lectureId") + +class TranscriptionWebhookDTO(BaseModel): + transcription: TranscriptionDTO = Field(default="", alias="transcription") + lecture_id: int = Field(alias="lectureId") + lecture_name: str = Field(default="", alias="lectureName") + lecture_unit_link: str = Field(default="", alias="lectureUnitLink") + course_id: int = Field(alias="courseId") + course_name: str = Field(default="", alias="courseName") + course_description: str = Field(default="", alias="courseDescription") \ No newline at end of file diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py new file mode 100644 index 00000000..2a3076eb --- /dev/null +++ b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -0,0 +1,15 @@ +from typing import List, Optional + +from pydantic import Field + +from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO +from app.domain.data.metrics.transcription_dto import TranscriptionWebhookDTO +from app.domain.status.stage_dto import StageDTO + + +class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): + transcription_dto: TranscriptionWebhookDTO + settings: Optional[PipelineExecutionSettingsDTO] + initial_stages: Optional[List[StageDTO]] = Field( + default=None, alias="initialStages" + ) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index 739a9bbb..e93535fa 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -2,6 +2,7 @@ from asyncio.log import logger from threading import Thread, Semaphore +from openai.types.audio import Transcription from sentry_sdk import capture_exception from fastapi import APIRouter, status, Depends @@ -14,6 +15,8 @@ from ...domain.ingestion.deletionPipelineExecutionDto import ( LecturesDeletionExecutionDto, ) +from ...domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import \ + TranscriptionIngestionPipelineExecutionDto from 
...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline from ...vector_database.database import VectorDatabase @@ -91,3 +94,16 @@ def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto): """ thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,)) thread.start() + +@router.post( + "/transcriptions/fullIngestion", + status_code=status.HTTP_202_ACCEPTED, + dependencies=[Depends(TokenValidator())], +) +def transcription_ingestion_webhook(dto: TranscriptionIngestionPipelineExecutionDto): + """ + Webhook endpoint to trigger the exercise chat pipeline + """ + # thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,)) + # thread.start() + print(f"transcription ingestion got DTO {dto}") \ No newline at end of file From aa5d9671aceeab93078c23f7fce2b9670fd17157 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 14 Jan 2025 16:04:39 +0100 Subject: [PATCH 02/24] change data type --- app/domain/data/metrics/transcription_dto.py | 5 ++--- app/web/routers/webhooks.py | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index 2b4b94a7..bc217108 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -1,12 +1,11 @@ from typing import List from pydantic import BaseModel, Field -from sqlalchemy import Double class TranscriptionSegmentDTO(BaseModel): - start_time: Double = Field(default="", alias="startTime") - end_time: Double = Field(default="", alias="endTime") + start_time: float = Field(default="", alias="startTime") + end_time: float = Field(default="", alias="endTime") text: str = Field(default="", alias="text") slide_number: int = Field(default=0, alias="slideNumber") lecture_unit_id: int = Field(default=0, alias="lectureUnitId") diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index e93535fa..e1fa73f4 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -2,7 +2,6 @@ from asyncio.log import logger from threading import Thread, Semaphore -from openai.types.audio import Transcription from sentry_sdk import capture_exception from fastapi import APIRouter, status, Depends From 96a7f66aed23f0a625bbcd1fe7dcbedcde20ec73 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Mon, 20 Jan 2025 16:14:35 +0100 Subject: [PATCH 03/24] Minor changes --- app/domain/data/metrics/transcription_dto.py | 9 ++++----- .../transcription_ingestion_pipeline_execution_dto.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index bc217108..3bec2d91 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -13,13 +13,12 @@ class TranscriptionSegmentDTO(BaseModel): class TranscriptionDTO(BaseModel): language: str = Field(default="", alias="language") segments: List[TranscriptionSegmentDTO] = Field(default=[], alias="segments") - lecture_id: int = Field(alias="lectureId") class TranscriptionWebhookDTO(BaseModel): transcription: TranscriptionDTO = Field(default="", alias="transcription") lecture_id: int = Field(alias="lectureId") - lecture_name: str = Field(default="", alias="lectureName") - lecture_unit_link: str = Field(default="", alias="lectureUnitLink") + # lecture_name: str = Field(default="", alias="lectureName") + # lecture_unit_link: str = Field(default="", alias="lectureUnitLink") course_id: int = 
Field(alias="courseId") - course_name: str = Field(default="", alias="courseName") - course_description: str = Field(default="", alias="courseDescription") \ No newline at end of file + # course_name: str = Field(default="", alias="courseName") + # course_description: str = Field(default="", alias="courseDescription") \ No newline at end of file diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py index 2a3076eb..c5db0bda 100644 --- a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py +++ b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -8,7 +8,7 @@ class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): - transcription_dto: TranscriptionWebhookDTO + transcription: TranscriptionWebhookDTO settings: Optional[PipelineExecutionSettingsDTO] initial_stages: Optional[List[StageDTO]] = Field( default=None, alias="initialStages" From 6eafaf411d12ad53c92e7cd960eb4ba420550b72 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Tue, 28 Jan 2025 13:48:29 +0100 Subject: [PATCH 04/24] Add transcription pipeline --- app/domain/data/metrics/transcription_dto.py | 9 +- ...iption_ingestion_pipeline_execution_dto.py | 2 +- app/pipeline/chat/course_chat_pipeline.py | 6 +- .../chat/exercise_chat_agent_pipeline.py | 4 +- app/pipeline/shared/citation_pipeline.py | 3 +- .../transcription_ingestion_pipeline.py | 103 ++++++++++++++++++ .../lecture_transcription_schema.py | 99 +++++++++++++++++ app/web/routers/webhooks.py | 38 ++++++- 8 files changed, 249 insertions(+), 15 deletions(-) create mode 100644 app/pipeline/transcription_ingestion_pipeline.py create mode 100644 app/vector_database/lecture_transcription_schema.py diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index 3bec2d91..c20c27ee 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -10,15 +10,16 @@ class TranscriptionSegmentDTO(BaseModel): slide_number: int = Field(default=0, alias="slideNumber") lecture_unit_id: int = Field(default=0, alias="lectureUnitId") + class TranscriptionDTO(BaseModel): language: str = Field(default="", alias="language") segments: List[TranscriptionSegmentDTO] = Field(default=[], alias="segments") + class TranscriptionWebhookDTO(BaseModel): transcription: TranscriptionDTO = Field(default="", alias="transcription") lecture_id: int = Field(alias="lectureId") - # lecture_name: str = Field(default="", alias="lectureName") - # lecture_unit_link: str = Field(default="", alias="lectureUnitLink") + lecture_name: str = Field(default="", alias="lectureName") course_id: int = Field(alias="courseId") - # course_name: str = Field(default="", alias="courseName") - # course_description: str = Field(default="", alias="courseDescription") \ No newline at end of file + course_name: str = Field(default="", alias="courseName") + course_description: str = Field(default="", alias="courseDescription") diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py index c5db0bda..73bc9cd8 100644 --- a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py +++ 
b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -8,7 +8,7 @@ class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): - transcription: TranscriptionWebhookDTO + transcriptions: List[TranscriptionWebhookDTO] settings: Optional[PipelineExecutionSettingsDTO] initial_stages: Optional[List[StageDTO]] = Field( default=None, alias="initialStages" diff --git a/app/pipeline/chat/course_chat_pipeline.py b/app/pipeline/chat/course_chat_pipeline.py index 9e3306f9..f0904116 100644 --- a/app/pipeline/chat/course_chat_pipeline.py +++ b/app/pipeline/chat/course_chat_pipeline.py @@ -100,14 +100,16 @@ def __init__( requirements=RequirementList( gpt_version_equivalent=4.5, ) - ), completion_args=completion_args + ), + completion_args=completion_args, ) self.llm_small = IrisLangchainChatModel( request_handler=CapabilityRequestHandler( requirements=RequirementList( gpt_version_equivalent=4.25, ) - ), completion_args=completion_args + ), + completion_args=completion_args, ) self.callback = callback diff --git a/app/pipeline/chat/exercise_chat_agent_pipeline.py b/app/pipeline/chat/exercise_chat_agent_pipeline.py index ff9e86da..676f96c6 100644 --- a/app/pipeline/chat/exercise_chat_agent_pipeline.py +++ b/app/pipeline/chat/exercise_chat_agent_pipeline.py @@ -533,7 +533,9 @@ def lecture_content_retrieval() -> str: ] ) - guide_response = (self.prompt | self.llm_small | StrOutputParser()).invoke( + guide_response = ( + self.prompt | self.llm_small | StrOutputParser() + ).invoke( { "response": out, } diff --git a/app/pipeline/shared/citation_pipeline.py b/app/pipeline/shared/citation_pipeline.py index 22e13360..fc71016b 100644 --- a/app/pipeline/shared/citation_pipeline.py +++ b/app/pipeline/shared/citation_pipeline.py @@ -57,7 +57,8 @@ def create_formatted_string(self, paragraphs): paragraph.get(LectureSchema.LECTURE_NAME.value), paragraph.get(LectureSchema.LECTURE_UNIT_NAME.value), paragraph.get(LectureSchema.PAGE_NUMBER.value), - paragraph.get(LectureSchema.LECTURE_UNIT_LINK.value) or "No link available", + paragraph.get(LectureSchema.LECTURE_UNIT_LINK.value) + or "No link available", paragraph.get(LectureSchema.PAGE_TEXT_CONTENT.value), ) formatted_string += lct diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py new file mode 100644 index 00000000..398e4cd1 --- /dev/null +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -0,0 +1,103 @@ +import threading +from typing import Optional, List, Dict, Any + +from weaviate import WeaviateClient + +from asyncio.log import logger + +from app.domain.data.metrics.transcription_dto import ( + TranscriptionWebhookDTO, +) +from app.domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import ( + TranscriptionIngestionPipelineExecutionDto, +) +from app.ingestion.abstract_ingestion import AbstractIngestion +from app.llm import BasicRequestHandler +from app.pipeline import Pipeline +from app.vector_database.lecture_transcription_schema import ( + init_lecture_transcription_schema, + LectureTranscriptionSchema, +) +from app.web.status.ingestion_status_callback import IngestionStatusCallback + +batch_insert_lock = threading.Lock() + + +class TranscriptionIngestionPipeline(AbstractIngestion, Pipeline): + def __init__( + self, + client: WeaviateClient, + dto: Optional[TranscriptionIngestionPipelineExecutionDto], + callback: IngestionStatusCallback, + ) -> None: + super().__init__() + self.client = client + self.dto = 
dto + self.callback = callback + self.collection = init_lecture_transcription_schema(client) + self.llm_embedding = BasicRequestHandler("embedding-small") + + def __call__(self) -> None: + try: + self.callback.in_progress("Chunking transcriptions...") + chunks = self.chunk_data(self.dto.transcriptions) + self.batch_insert(chunks) + self.callback.done("Transcriptions ingested successfully") + + except Exception as e: + print(e) + + def batch_insert(self, chunks): + global batch_insert_lock + with batch_insert_lock: + with self.collection.batch.rate_limit(requests_per_minute=600) as batch: + try: + for ( + index, + chunk, + ) in enumerate(chunks): + embed_chunk = self.llm_embedding.embed( + chunk[LectureTranscriptionSchema.SEGMENT_TEXT.value] + ) + print(f"Embedding chunk {index}") + print(chunk) + batch.add_object(properties=chunk, vector=embed_chunk) + except Exception as e: + logger.error(f"Error embedding lecture transcription chunk: {e}") + self.callback.error( + f"Failed to ingest lecture transcriptions into the database: {e}", + exception=e, + tokens=self.tokens, + ) + + def chunk_data( + self, transcriptions: List[TranscriptionWebhookDTO] + ) -> List[Dict[str, Any]]: + slide_chunks = {} + for transcription in transcriptions: + for segment in transcription.transcription.segments: + slide_key = f"{transcription.lecture_id}_{segment.lecture_unit_id}_{segment.slide_number}" + + if slide_key not in slide_chunks: + chunk = { + LectureTranscriptionSchema.COURSE_ID.value: transcription.course_id, + LectureTranscriptionSchema.COURSE_NAME.value: transcription.course_name, + LectureTranscriptionSchema.LECTURE_ID.value: transcription.lecture_id, + LectureTranscriptionSchema.LECTURE_NAME.value: transcription.lecture_name, + LectureTranscriptionSchema.LANGUAGE.value: transcription.transcription.language, + LectureTranscriptionSchema.SEGMENT_START.value: segment.start_time, + LectureTranscriptionSchema.SEGMENT_END.value: segment.end_time, + LectureTranscriptionSchema.SEGMENT_TEXT.value: segment.text, + LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDES_ID.value: segment.lecture_unit_id, + LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value: segment.slide_number, + } + + slide_chunks[slide_key] = chunk + else: + slide_chunks[slide_key][ + LectureTranscriptionSchema.SEGMENT_TEXT.value + ] += (" " + segment.text) + slide_chunks[slide_key][ + LectureTranscriptionSchema.SEGMENT_END.value + ] = segment.end_time + return list(slide_chunks.values()) diff --git a/app/vector_database/lecture_transcription_schema.py b/app/vector_database/lecture_transcription_schema.py new file mode 100644 index 00000000..ea3cf7b8 --- /dev/null +++ b/app/vector_database/lecture_transcription_schema.py @@ -0,0 +1,99 @@ +from enum import Enum + +from weaviate.classes.config import Property +from weaviate import WeaviateClient +from weaviate.collections import Collection +from weaviate.collections.classes.config import Configure, VectorDistances, DataType + + +class LectureTranscriptionSchema(Enum): + """ + Schema for the lecture transcriptions + """ + + COLLECTION_NAME = "LectureTranscriptions" + COURSE_ID = "course_id" + COURSE_NAME = "course_name" + LECTURE_ID = "lecture_id" + LECTURE_NAME = "lecture_name" + LANGUAGE = "language" + SEGMENT_START = "segment_start" + SEGMENT_END = "segment_end" + SEGMENT_TEXT = "segment_text" + SEGMENT_LECTURE_UNIT_SLIDES_ID = "segment_lecture_unit_slides_id" + SEGMENT_LECTURE_UNIT_SLIDE_NUMBER = "segment_lecture_unit_slide_number" + + +def 
init_lecture_transcription_schema(client: WeaviateClient) -> Collection: + if client.collections.exists(LectureTranscriptionSchema.COLLECTION_NAME.value): + return client.collections.get(LectureTranscriptionSchema.COLLECTION_NAME.value) + + return client.collections.create( + name=LectureTranscriptionSchema.COLLECTION_NAME.value, + vectorizer_config=Configure.Vectorizer.none(), + vector_index_config=Configure.VectorIndex.hnsw( + distance_metric=VectorDistances.COSINE + ), + properties=[ + Property( + name=LectureTranscriptionSchema.COURSE_ID.value, + description="The ID of the course", + data_type=DataType.INT, + index_searable=False, + ), + Property( + name=LectureTranscriptionSchema.COURSE_NAME.value, + description="The name of the course", + data_type=DataType.TEXT, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.LECTURE_ID.value, + description="The ID of the lecture", + data_type=DataType.INT, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.LECTURE_NAME.value, + description="The name of the lecture", + data_type=DataType.TEXT, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.LANGUAGE.value, + description="The language of the text", + data_type=DataType.TEXT, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.SEGMENT_START.value, + description="The start of the segment", + data_type=DataType.NUMBER, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.SEGMENT_END.value, + description="The end of the segment", + data_type=DataType.NUMBER, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.SEGMENT_TEXT.value, + description="The transcription of the segment", + data_type=DataType.TEXT, + index_searchable=True, + ), + Property( + name=LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDES_ID.value, + description="The id of the lecture unit slides of the segment", + data_type=DataType.INT, + index_searchable=False, + ), + Property( + name=LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value, + description="The slide number of the lecture unit of the segment", + data_type=DataType.INT, + index_searchable=False, + ), + ], + ) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index e1fa73f4..5a6d73bd 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -14,9 +14,11 @@ from ...domain.ingestion.deletionPipelineExecutionDto import ( LecturesDeletionExecutionDto, ) -from ...domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import \ - TranscriptionIngestionPipelineExecutionDto +from ...domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import ( + TranscriptionIngestionPipelineExecutionDto, +) from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline +from ...pipeline.transcription_ingestion_pipeline import TranscriptionIngestionPipeline from ...vector_database.database import VectorDatabase router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"]) @@ -69,6 +71,29 @@ def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto): logger.error(traceback.format_exc()) +def run_transcription_ingestion_pipeline_worker( + dto: TranscriptionIngestionPipelineExecutionDto, +): + """ + Run the transcription ingestion pipeline in a separate thread + """ + try: + callback = IngestionStatusCallback( + run_id=dto.settings.authentication_token, + base_url=dto.settings.artemis_base_url, 
+ initial_stages=dto.initial_stages, + ) + db = VectorDatabase() + client = db.get_client() + pipeline = TranscriptionIngestionPipeline( + client=client, dto=dto, callback=callback + ) + pipeline() + except Exception as e: + logger.error(f"Error while deleting lectures: {e}") + logger.error(traceback.format_exc()) + + @router.post( "/lectures/fullIngestion", status_code=status.HTTP_202_ACCEPTED, @@ -94,6 +119,7 @@ def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto): thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,)) thread.start() + @router.post( "/transcriptions/fullIngestion", status_code=status.HTTP_202_ACCEPTED, @@ -101,8 +127,8 @@ def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto): ) def transcription_ingestion_webhook(dto: TranscriptionIngestionPipelineExecutionDto): """ - Webhook endpoint to trigger the exercise chat pipeline + Webhook endpoint to trigger the lecture transcription ingestion pipeline """ - # thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,)) - # thread.start() - print(f"transcription ingestion got DTO {dto}") \ No newline at end of file + print(f"transcription ingestion got DTO {dto}") + thread = Thread(target=run_transcription_ingestion_pipeline_worker, args=(dto,)) + thread.start() From 72350f1637035b6c3ca9bb796190aea086bcc849 Mon Sep 17 00:00:00 2001 From: isabella Date: Mon, 3 Feb 2025 17:15:25 +0100 Subject: [PATCH 05/24] add summarizing and chunking for lecture transcriptions --- app/common/PipelineEnum.py | 1 + .../transcription_ingestion_prompts.py | 6 + .../transcription_ingestion_pipeline.py | 133 ++++++++++++++++-- .../lecture_transcription_schema.py | 7 + app/web/routers/webhooks.py | 5 +- .../transcription_ingestion_callback.py | 54 +++++++ requirements.txt | 2 +- 7 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 app/pipeline/prompts/transcription_ingestion_prompts.py create mode 100644 app/web/status/transcription_ingestion_callback.py diff --git a/app/common/PipelineEnum.py b/app/common/PipelineEnum.py index a3283705..1db3a863 100644 --- a/app/common/PipelineEnum.py +++ b/app/common/PipelineEnum.py @@ -18,4 +18,5 @@ class PipelineEnum(str, Enum): IRIS_FAQ_RETRIEVAL_PIPELINE = "IRIS_FAQ_RETRIEVAL_PIPELINE" IRIS_INCONSISTENCY_CHECK = "IRIS_INCONSISTENCY_CHECK" IRIS_REWRITING_PIPELINE = "IRIS_REWRITING_PIPELINE" + IRIS_VIDEO_TRANSCRIPTION_INGESTION = "IRIS_VIDEO_TRANSCRIPTION_INGESTION" NOT_SET = "NOT_SET" diff --git a/app/pipeline/prompts/transcription_ingestion_prompts.py b/app/pipeline/prompts/transcription_ingestion_prompts.py new file mode 100644 index 00000000..8926a4de --- /dev/null +++ b/app/pipeline/prompts/transcription_ingestion_prompts.py @@ -0,0 +1,6 @@ +def transcription_summary_prompt(lecture_name: str, chunk_content: str): + return f""" + You are a helpful assistant. A snippet of the spoken content of one lecture of the lecture {lecture_name} will be given to you, summarize the information without adding details and return only the summary nothing more. 
+ This is the text you should summarize: + {chunk_content} + """ diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index 398e4cd1..f5e777a4 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -1,34 +1,50 @@ +import os import threading +from functools import reduce +from idlelib.pyparse import trans from typing import Optional, List, Dict, Any +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.runnables import Runnable +from langchain_experimental.text_splitter import SemanticChunker +from langchain_openai import AzureOpenAIEmbeddings +from langchain_text_splitters import RecursiveCharacterTextSplitter from weaviate import WeaviateClient from asyncio.log import logger +from app.common.PipelineEnum import PipelineEnum from app.domain.data.metrics.transcription_dto import ( - TranscriptionWebhookDTO, + TranscriptionWebhookDTO, TranscriptionSegmentDTO, ) from app.domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import ( TranscriptionIngestionPipelineExecutionDto, ) from app.ingestion.abstract_ingestion import AbstractIngestion -from app.llm import BasicRequestHandler +from app.llm import BasicRequestHandler, CapabilityRequestHandler, RequirementList, CompletionArguments +from app.llm.langchain import IrisLangchainChatModel from app.pipeline import Pipeline +from app.pipeline.prompts.transcription_ingestion_prompts import transcription_summary_prompt from app.vector_database.lecture_transcription_schema import ( init_lecture_transcription_schema, LectureTranscriptionSchema, ) from app.web.status.ingestion_status_callback import IngestionStatusCallback +from app.web.status.transcription_ingestion_callback import TranscriptionIngestionStatus batch_insert_lock = threading.Lock() -class TranscriptionIngestionPipeline(AbstractIngestion, Pipeline): +class TranscriptionIngestionPipeline(Pipeline): + llm: IrisLangchainChatModel + pipeline: Runnable + prompt: ChatPromptTemplate def __init__( self, client: WeaviateClient, dto: Optional[TranscriptionIngestionPipelineExecutionDto], - callback: IngestionStatusCallback, + callback: TranscriptionIngestionStatus, ) -> None: super().__init__() self.client = client @@ -37,10 +53,26 @@ def __init__( self.collection = init_lecture_transcription_schema(client) self.llm_embedding = BasicRequestHandler("embedding-small") + request_handler = CapabilityRequestHandler( + requirements=RequirementList( + gpt_version_equivalent=4.5, + context_length=16385, + privacy_compliance=True, + ) + ) + completion_args = CompletionArguments(temperature=0, max_tokens=2000) + self.llm = IrisLangchainChatModel( + request_handler=request_handler, completion_args=completion_args + ) + self.pipeline = self.llm | StrOutputParser() + def __call__(self) -> None: try: self.callback.in_progress("Chunking transcriptions...") - chunks = self.chunk_data(self.dto.transcriptions) + chunks = self.chunk_transcriptions(self.dto.transcriptions) + + chunks = self.summarize_chunks(chunks) + self.batch_insert(chunks) self.callback.done("Transcriptions ingested successfully") @@ -70,11 +102,13 @@ def batch_insert(self, chunks): tokens=self.tokens, ) - def chunk_data( + + def chunk_transcriptions( self, transcriptions: List[TranscriptionWebhookDTO] ) -> List[Dict[str, Any]]: - slide_chunks = {} + chunks = [] for transcription in transcriptions: + slide_chunks 
= {} for segment in transcription.transcription.segments: slide_key = f"{transcription.lecture_id}_{segment.lecture_unit_id}_{segment.slide_number}" @@ -100,4 +134,87 @@ def chunk_data( slide_chunks[slide_key][ LectureTranscriptionSchema.SEGMENT_END.value ] = segment.end_time - return list(slide_chunks.values()) + + for i, segment in enumerate(slide_chunks.values()): + if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200: + chunks.append(segment) + continue + + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=512, chunk_overlap=102 + ) + + semantic_chunks = text_splitter.split_text(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) + + for j, chunk in enumerate(semantic_chunks): + offset_slide_chunk = reduce( + lambda acc, txt: acc + len(txt), + map(lambda seg: seg[LectureTranscriptionSchema.SEGMENT_TEXT.value], list(slide_chunks.values())[:i]), + 0 + ) + + offset_semantic_chunk = reduce( + lambda acc, txt: acc + len(txt), + semantic_chunks[:j], + 0 + ) + + offset_start = offset_slide_chunk + offset_semantic_chunk + offset_end = offset_start + len(chunk) + + start_time = self.get_segment_of_char_position(offset_start, transcription.transcription.segments).start_time + end_time = self.get_segment_of_char_position(offset_end, transcription.transcription.segments).end_time + + chunks.append( + { + **segment, + LectureTranscriptionSchema.SEGMENT_START.value: start_time, + LectureTranscriptionSchema.SEGMENT_END.value: end_time, + LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk, + } + ) + + return chunks + + @staticmethod + def get_segment_of_char_position(char_position: int, segments: List[TranscriptionSegmentDTO]): + offset_lookup_counter = 0 + segment_index = 0 + while offset_lookup_counter < char_position and segment_index < len(segments): + offset_lookup_counter += len(segments[segment_index].text) + segment_index += 1 + + if segment_index >= len(segments): + return segments[-1] + return segments[segment_index] + + def summarize_chunks(self, chunks): + chunks_with_summaries = [] + for chunk in chunks: + print(chunk) + self.prompt = ChatPromptTemplate.from_messages( + [ + ("system", transcription_summary_prompt(chunk[LectureTranscriptionSchema.LECTURE_NAME.value], + chunk[LectureTranscriptionSchema.SEGMENT_TEXT.value])), + ] + ) + prompt_val = self.prompt.format_messages() + self.prompt = ChatPromptTemplate.from_messages(prompt_val) + try: + response = (self.prompt | self.pipeline).invoke({}) + ### summary for chunk ### + print(response) + + # self._append_tokens(self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION) + + chunks_with_summaries.append( + { + **chunk, + LectureTranscriptionSchema.SEGMENT_SUMMARY.value: response + } + ) + except Exception as e: + raise e + return chunks_with_summaries + + diff --git a/app/vector_database/lecture_transcription_schema.py b/app/vector_database/lecture_transcription_schema.py index ea3cf7b8..9685cd09 100644 --- a/app/vector_database/lecture_transcription_schema.py +++ b/app/vector_database/lecture_transcription_schema.py @@ -22,6 +22,7 @@ class LectureTranscriptionSchema(Enum): SEGMENT_TEXT = "segment_text" SEGMENT_LECTURE_UNIT_SLIDES_ID = "segment_lecture_unit_slides_id" SEGMENT_LECTURE_UNIT_SLIDE_NUMBER = "segment_lecture_unit_slide_number" + SEGMENT_SUMMARY = "segment_summary" def init_lecture_transcription_schema(client: WeaviateClient) -> Collection: @@ -95,5 +96,11 @@ def init_lecture_transcription_schema(client: WeaviateClient) -> Collection: data_type=DataType.INT, index_searchable=False, ), 
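                # A sketch of how this collection might be consumed once populated,
                # assuming the Weaviate v4 client API used elsewhere in this series.
                # Because the collection is configured with Vectorizer.none(), a hybrid
                # query needs an explicit vector; the query string and limit below are
                # illustrative, not from this patch:
                #
                #   collection = init_lecture_transcription_schema(client)
                #   embedder = BasicRequestHandler("embedding-small")
                #   results = collection.query.hybrid(
                #       query="how does quicksort partition?",
                #       vector=embedder.embed("how does quicksort partition?"),
                #       query_properties=[
                #           LectureTranscriptionSchema.SEGMENT_TEXT.value,
                #           LectureTranscriptionSchema.SEGMENT_SUMMARY.value,
                #       ],
                #       limit=5,
                #   )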
+ Property( + name=LectureTranscriptionSchema.SEGMENT_SUMMARY.value, + description="The summary of the text of the segment", + data_type=DataType.TEXT, + index_searchable=True, + ), ], ) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index af7344af..777b352a 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -13,6 +13,7 @@ from ..status.faq_ingestion_status_callback import FaqIngestionStatus from ..status.ingestion_status_callback import IngestionStatusCallback from ..status.lecture_deletion_status_callback import LecturesDeletionStatusCallback +from ..status.transcription_ingestion_callback import TranscriptionIngestionStatus from ...domain.ingestion.deletionPipelineExecutionDto import ( LecturesDeletionExecutionDto, FaqDeletionExecutionDto, @@ -83,10 +84,11 @@ def run_transcription_ingestion_pipeline_worker( Run the transcription ingestion pipeline in a separate thread """ try: - callback = IngestionStatusCallback( + callback = TranscriptionIngestionStatus( run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, + lecture_id=dto.transcriptions[0].lecture_id, ) db = VectorDatabase() client = db.get_client() @@ -97,6 +99,7 @@ def run_transcription_ingestion_pipeline_worker( except Exception as e: logger.error(f"Error while deleting lectures: {e}") logger.error(traceback.format_exc()) + def run_faq_update_pipeline_worker(dto: FaqIngestionPipelineExecutionDto): """ Run the exercise chat pipeline in a separate thread diff --git a/app/web/status/transcription_ingestion_callback.py b/app/web/status/transcription_ingestion_callback.py new file mode 100644 index 00000000..38ecff51 --- /dev/null +++ b/app/web/status/transcription_ingestion_callback.py @@ -0,0 +1,54 @@ +from typing import List + +from .status_update import StatusCallback +from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO +from ...domain.status.stage_state_dto import StageStateEnum +from ...domain.status.stage_dto import StageDTO +import logging + +logger = logging.getLogger(__name__) + + +class TranscriptionIngestionStatus(StatusCallback): + """ + Callback class for updating the status of a Faq ingestion Pipeline run. 
+ """ + + def __init__( + self, + run_id: str, + base_url: str, + initial_stages: List[StageDTO] = None, + lecture_id: int = None, + ): + url = ( + f"{base_url}/api/public/pyris/webhooks/ingestion/transcriptions/runs/{run_id}/status" + ) + + current_stage_index = len(initial_stages) if initial_stages else 0 + stages = initial_stages or [] + stages += [ + StageDTO( + weight=10, + state=StageStateEnum.NOT_STARTED, + name="Remove old transcription" + ), + StageDTO( + weight=20, + state=StageStateEnum.NOT_STARTED, + name="Chunk transcription", + ), + StageDTO( + weight=50, + state=StageStateEnum.NOT_STARTED, + name="Summarize transcription", + ), + StageDTO( + weight=20, + state=StageStateEnum.NOT_STARTED, + name="Ingest transcription", + ), + ] + status = IngestionStatusUpdateDTO(stages=stages, id=lecture_id) + stage = stages[current_stage_index] + super().__init__(url, run_id, status, stage, current_stage_index) diff --git a/requirements.txt b/requirements.txt index 81663ea8..3cc93b35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ fastapi==0.115.5 flake8==7.1.1 langchain==0.3.8 ollama==0.3.3 -openai==1.54.4 +openai==1.60.2 pre-commit==4.0.1 psutil==6.1.0 pydantic==2.9.2 From 679dd15bc6cda62521b74e5c30a7826be6daff05 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 10:59:53 +0100 Subject: [PATCH 06/24] add lecture id to transcriptioningestiondto --- .../transcription_ingestion_pipeline_execution_dto.py | 1 + app/web/routers/webhooks.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py index 73bc9cd8..10673f00 100644 --- a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py +++ b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -9,6 +9,7 @@ class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): transcriptions: List[TranscriptionWebhookDTO] + lectureId: int settings: Optional[PipelineExecutionSettingsDTO] initial_stages: Optional[List[StageDTO]] = Field( default=None, alias="initialStages" diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index 777b352a..ee9dc5e6 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -88,7 +88,7 @@ def run_transcription_ingestion_pipeline_worker( run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, - lecture_id=dto.transcriptions[0].lecture_id, + lecture_id=dto.lectureId ) db = VectorDatabase() client = db.get_client() From 1e75c6eddefbb8f9ec60f127f69d0fdd44f44890 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 11:55:54 +0100 Subject: [PATCH 07/24] fix token usage --- app/pipeline/transcription_ingestion_pipeline.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index f5e777a4..dd28b8d2 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -65,6 +65,7 @@ def __init__( request_handler=request_handler, completion_args=completion_args ) self.pipeline = self.llm | StrOutputParser() + self.tokens = [] def __call__(self) -> None: try: @@ -191,7 +192,6 @@ def get_segment_of_char_position(char_position: int, 
segments: List[Transcriptio def summarize_chunks(self, chunks): chunks_with_summaries = [] for chunk in chunks: - print(chunk) self.prompt = ChatPromptTemplate.from_messages( [ ("system", transcription_summary_prompt(chunk[LectureTranscriptionSchema.LECTURE_NAME.value], @@ -202,11 +202,7 @@ def summarize_chunks(self, chunks): self.prompt = ChatPromptTemplate.from_messages(prompt_val) try: response = (self.prompt | self.pipeline).invoke({}) - ### summary for chunk ### - print(response) - - # self._append_tokens(self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION) - + self._append_tokens(self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION) chunks_with_summaries.append( { **chunk, From 92d6c3b1f76fd992887bdf733028632e608f5fd4 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Tue, 4 Feb 2025 13:55:35 +0100 Subject: [PATCH 08/24] Fix chunking --- .../transcription_ingestion_pipeline.py | 106 ++++++++++-------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index f5e777a4..4feb3ab7 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -1,36 +1,37 @@ -import os import threading from functools import reduce -from idlelib.pyparse import trans from typing import Optional, List, Dict, Any from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import Runnable -from langchain_experimental.text_splitter import SemanticChunker -from langchain_openai import AzureOpenAIEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter from weaviate import WeaviateClient from asyncio.log import logger -from app.common.PipelineEnum import PipelineEnum from app.domain.data.metrics.transcription_dto import ( - TranscriptionWebhookDTO, TranscriptionSegmentDTO, + TranscriptionWebhookDTO, + TranscriptionSegmentDTO, ) from app.domain.ingestion.transcription_ingestion.transcription_ingestion_pipeline_execution_dto import ( TranscriptionIngestionPipelineExecutionDto, ) -from app.ingestion.abstract_ingestion import AbstractIngestion -from app.llm import BasicRequestHandler, CapabilityRequestHandler, RequirementList, CompletionArguments +from app.llm import ( + BasicRequestHandler, + CapabilityRequestHandler, + RequirementList, + CompletionArguments, +) from app.llm.langchain import IrisLangchainChatModel from app.pipeline import Pipeline -from app.pipeline.prompts.transcription_ingestion_prompts import transcription_summary_prompt +from app.pipeline.prompts.transcription_ingestion_prompts import ( + transcription_summary_prompt, +) from app.vector_database.lecture_transcription_schema import ( init_lecture_transcription_schema, LectureTranscriptionSchema, ) -from app.web.status.ingestion_status_callback import IngestionStatusCallback from app.web.status.transcription_ingestion_callback import TranscriptionIngestionStatus batch_insert_lock = threading.Lock() @@ -40,6 +41,7 @@ class TranscriptionIngestionPipeline(Pipeline): llm: IrisLangchainChatModel pipeline: Runnable prompt: ChatPromptTemplate + def __init__( self, client: WeaviateClient, @@ -68,31 +70,33 @@ def __init__( def __call__(self) -> None: try: - self.callback.in_progress("Chunking transcriptions...") + self.callback.in_progress("Chunking transcriptions") chunks = self.chunk_transcriptions(self.dto.transcriptions) + 
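            # Each chunk produced by chunk_transcriptions above is a plain dict keyed
            # by the LectureTranscriptionSchema property values; the summary key is
            # only added by summarize_chunks below. Illustrative values, not taken
            # from a real transcription:
            #
            #   {
            #       "course_id": 5, "course_name": "CS 101",
            #       "lecture_id": 7, "lecture_name": "Sorting",
            #       "language": "en",
            #       "segment_start": 12.4, "segment_end": 31.9,
            #       "segment_text": "Quicksort picks a pivot element ...",
            #       "segment_lecture_unit_slides_id": 42,
            #       "segment_lecture_unit_slide_number": 3,
            #   }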
self.callback.in_progress("Summarizing transcriptions") chunks = self.summarize_chunks(chunks) + self.callback.in_progress("Ingesting transcriptions into vector database") self.batch_insert(chunks) self.callback.done("Transcriptions ingested successfully") except Exception as e: - print(e) + logger.error(f"Error processing transcription ingestion pipeline: {e}") + self.callback.error( + f"Error processing transcription ingestion pipeline: {e}", + exception=e, + tokens=self.tokens, + ) def batch_insert(self, chunks): global batch_insert_lock with batch_insert_lock: with self.collection.batch.rate_limit(requests_per_minute=600) as batch: try: - for ( - index, - chunk, - ) in enumerate(chunks): + for chunk in chunks: embed_chunk = self.llm_embedding.embed( chunk[LectureTranscriptionSchema.SEGMENT_TEXT.value] ) - print(f"Embedding chunk {index}") - print(chunk) batch.add_object(properties=chunk, vector=embed_chunk) except Exception as e: logger.error(f"Error embedding lecture transcription chunk: {e}") @@ -102,11 +106,12 @@ def batch_insert(self, chunks): tokens=self.tokens, ) - def chunk_transcriptions( self, transcriptions: List[TranscriptionWebhookDTO] ) -> List[Dict[str, Any]]: + CHUNK_SEPARATOR_CHAR = "\x1F" chunks = [] + for transcription in transcriptions: slide_chunks = {} for segment in transcription.transcription.segments: @@ -130,54 +135,64 @@ def chunk_transcriptions( else: slide_chunks[slide_key][ LectureTranscriptionSchema.SEGMENT_TEXT.value - ] += (" " + segment.text) + ] += (CHUNK_SEPARATOR_CHAR + segment.text) slide_chunks[slide_key][ LectureTranscriptionSchema.SEGMENT_END.value ] = segment.end_time for i, segment in enumerate(slide_chunks.values()): if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200: + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = segment[LectureTranscriptionSchema.SEGMENT_TEXT.value].replace(CHUNK_SEPARATOR_CHAR, " ") chunks.append(segment) continue - text_splitter = RecursiveCharacterTextSplitter( - chunk_size=512, chunk_overlap=102 - ) + text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=0) - semantic_chunks = text_splitter.split_text(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) + semantic_chunks = text_splitter.split_text( + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] + ) for j, chunk in enumerate(semantic_chunks): offset_slide_chunk = reduce( - lambda acc, txt: acc + len(txt), - map(lambda seg: seg[LectureTranscriptionSchema.SEGMENT_TEXT.value], list(slide_chunks.values())[:i]), - 0 + lambda acc, txt: acc + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), + map( + lambda seg: seg[ + LectureTranscriptionSchema.SEGMENT_TEXT.value + ], + list(slide_chunks.values())[:i], + ), + 0, ) - offset_semantic_chunk = reduce( - lambda acc, txt: acc + len(txt), - semantic_chunks[:j], - 0 + offset_semantic_chunk = reduce( + lambda acc, txt: acc + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), semantic_chunks[:j], 0 ) - offset_start = offset_slide_chunk + offset_semantic_chunk - offset_end = offset_start + len(chunk) + offset_start = offset_slide_chunk + offset_semantic_chunk + 1 + offset_end = offset_start + len(chunk.replace(CHUNK_SEPARATOR_CHAR, "")) - start_time = self.get_segment_of_char_position(offset_start, transcription.transcription.segments).start_time - end_time = self.get_segment_of_char_position(offset_end, transcription.transcription.segments).end_time + start_time = self.get_transcription_segment_of_char_position( + offset_start, transcription.transcription.segments + ).start_time + 
end_time = self.get_transcription_segment_of_char_position( + offset_end, transcription.transcription.segments + ).end_time chunks.append( { **segment, LectureTranscriptionSchema.SEGMENT_START.value: start_time, LectureTranscriptionSchema.SEGMENT_END.value: end_time, - LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk, + LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk.replace(CHUNK_SEPARATOR_CHAR, " ").strip(), } ) return chunks @staticmethod - def get_segment_of_char_position(char_position: int, segments: List[TranscriptionSegmentDTO]): + def get_transcription_segment_of_char_position( + char_position: int, segments: List[TranscriptionSegmentDTO] + ): offset_lookup_counter = 0 segment_index = 0 while offset_lookup_counter < char_position and segment_index < len(segments): @@ -191,30 +206,29 @@ def get_segment_of_char_position(char_position: int, segments: List[Transcriptio def summarize_chunks(self, chunks): chunks_with_summaries = [] for chunk in chunks: - print(chunk) self.prompt = ChatPromptTemplate.from_messages( [ - ("system", transcription_summary_prompt(chunk[LectureTranscriptionSchema.LECTURE_NAME.value], - chunk[LectureTranscriptionSchema.SEGMENT_TEXT.value])), + ( + "system", + transcription_summary_prompt( + chunk[LectureTranscriptionSchema.LECTURE_NAME.value], + chunk[LectureTranscriptionSchema.SEGMENT_TEXT.value], + ), + ), ] ) prompt_val = self.prompt.format_messages() self.prompt = ChatPromptTemplate.from_messages(prompt_val) try: response = (self.prompt | self.pipeline).invoke({}) - ### summary for chunk ### - print(response) - # self._append_tokens(self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION) chunks_with_summaries.append( { **chunk, - LectureTranscriptionSchema.SEGMENT_SUMMARY.value: response + LectureTranscriptionSchema.SEGMENT_SUMMARY.value: response, } ) except Exception as e: raise e return chunks_with_summaries - - From 036e31a1c5d9c2490b61d60444d7d7edbf4c1c08 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Tue, 4 Feb 2025 13:58:06 +0100 Subject: [PATCH 09/24] Format code --- .../transcription_ingestion_pipeline.py | 31 ++++++++++++++----- app/web/routers/webhooks.py | 4 ++- .../transcription_ingestion_callback.py | 6 ++-- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index 2cfa536f..c9b66be8 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -10,6 +10,7 @@ from asyncio.log import logger +from app.common.PipelineEnum import PipelineEnum from app.domain.data.metrics.transcription_dto import ( TranscriptionWebhookDTO, TranscriptionSegmentDTO, @@ -112,7 +113,7 @@ def chunk_transcriptions( ) -> List[Dict[str, Any]]: CHUNK_SEPARATOR_CHAR = "\x1F" chunks = [] - + for transcription in transcriptions: slide_chunks = {} for segment in transcription.transcription.segments: @@ -143,11 +144,15 @@ def chunk_transcriptions( for i, segment in enumerate(slide_chunks.values()): if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200: - segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = segment[LectureTranscriptionSchema.SEGMENT_TEXT.value].replace(CHUNK_SEPARATOR_CHAR, " ") + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = segment[ + LectureTranscriptionSchema.SEGMENT_TEXT.value + ].replace(CHUNK_SEPARATOR_CHAR, " ") chunks.append(segment) continue - text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=0) + 
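                    # Worked example of get_transcription_segment_of_char_position,
                    # with made-up numbers: three segments of 40, 35 and 50 characters
                    # covering 0.0-10.0s, 10.0-18.0s and 18.0-30.0s. A lookup at
                    # position 40 walks past the first segment (cumulative total
                    # 40 >= 40) and returns the second one, so start_time is 10.0;
                    # a lookup at position 100 consumes all three segments (running
                    # totals 40, 75, 125) and hits the end-of-list fallback, so
                    # end_time is 30.0. The stored window is therefore [10.0, 30.0].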
text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1024, chunk_overlap=0 + ) semantic_chunks = text_splitter.split_text( segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] @@ -155,7 +160,8 @@ def chunk_transcriptions( for j, chunk in enumerate(semantic_chunks): offset_slide_chunk = reduce( - lambda acc, txt: acc + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), + lambda acc, txt: acc + + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), map( lambda seg: seg[ LectureTranscriptionSchema.SEGMENT_TEXT.value @@ -166,11 +172,16 @@ def chunk_transcriptions( ) offset_semantic_chunk = reduce( - lambda acc, txt: acc + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), semantic_chunks[:j], 0 + lambda acc, txt: acc + + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")), + semantic_chunks[:j], + 0, ) offset_start = offset_slide_chunk + offset_semantic_chunk + 1 - offset_end = offset_start + len(chunk.replace(CHUNK_SEPARATOR_CHAR, "")) + offset_end = offset_start + len( + chunk.replace(CHUNK_SEPARATOR_CHAR, "") + ) start_time = self.get_transcription_segment_of_char_position( offset_start, transcription.transcription.segments @@ -184,7 +195,9 @@ def chunk_transcriptions( **segment, LectureTranscriptionSchema.SEGMENT_START.value: start_time, LectureTranscriptionSchema.SEGMENT_END.value: end_time, - LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk.replace(CHUNK_SEPARATOR_CHAR, " ").strip(), + LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk.replace( + CHUNK_SEPARATOR_CHAR, " " + ).strip(), } ) @@ -222,7 +235,9 @@ def summarize_chunks(self, chunks): self.prompt = ChatPromptTemplate.from_messages(prompt_val) try: response = (self.prompt | self.pipeline).invoke({}) - self._append_tokens(self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION) + self._append_tokens( + self.llm.tokens, PipelineEnum.IRIS_VIDEO_TRANSCRIPTION_INGESTION + ) chunks_with_summaries.append( { **chunk, diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index ee9dc5e6..d5c0761a 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -88,7 +88,7 @@ def run_transcription_ingestion_pipeline_worker( run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, - lecture_id=dto.lectureId + lecture_id=dto.lectureId, ) db = VectorDatabase() client = db.get_client() @@ -100,6 +100,7 @@ def run_transcription_ingestion_pipeline_worker( logger.error(f"Error while deleting lectures: {e}") logger.error(traceback.format_exc()) + def run_faq_update_pipeline_worker(dto: FaqIngestionPipelineExecutionDto): """ Run the exercise chat pipeline in a separate thread @@ -190,6 +191,7 @@ def transcription_ingestion_webhook(dto: TranscriptionIngestionPipelineExecution thread = Thread(target=run_transcription_ingestion_pipeline_worker, args=(dto,)) thread.start() + @router.post( "/faqs", status_code=status.HTTP_202_ACCEPTED, diff --git a/app/web/status/transcription_ingestion_callback.py b/app/web/status/transcription_ingestion_callback.py index 38ecff51..580f0e63 100644 --- a/app/web/status/transcription_ingestion_callback.py +++ b/app/web/status/transcription_ingestion_callback.py @@ -21,9 +21,7 @@ def __init__( initial_stages: List[StageDTO] = None, lecture_id: int = None, ): - url = ( - f"{base_url}/api/public/pyris/webhooks/ingestion/transcriptions/runs/{run_id}/status" - ) + url = f"{base_url}/api/public/pyris/webhooks/ingestion/transcriptions/runs/{run_id}/status" current_stage_index = len(initial_stages) if initial_stages else 0 stages = initial_stages 
or [] @@ -31,7 +29,7 @@ def __init__( StageDTO( weight=10, state=StageStateEnum.NOT_STARTED, - name="Remove old transcription" + name="Remove old transcription", ), StageDTO( weight=20, From b1db0bc56d8a27103158a8a25325311eafe91390 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Tue, 4 Feb 2025 14:14:29 +0100 Subject: [PATCH 10/24] Improve code --- app/domain/data/metrics/transcription_dto.py | 8 ++++---- app/pipeline/transcription_ingestion_pipeline.py | 4 ++-- app/vector_database/lecture_transcription_schema.py | 2 +- app/web/status/transcription_ingestion_callback.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index c20c27ee..d9ac7d5a 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -4,20 +4,20 @@ class TranscriptionSegmentDTO(BaseModel): - start_time: float = Field(default="", alias="startTime") - end_time: float = Field(default="", alias="endTime") + start_time: float = Field(default=0.0, alias="startTime") + end_time: float = Field(default=0.0, alias="endTime") text: str = Field(default="", alias="text") slide_number: int = Field(default=0, alias="slideNumber") lecture_unit_id: int = Field(default=0, alias="lectureUnitId") class TranscriptionDTO(BaseModel): - language: str = Field(default="", alias="language") + language: str = Field(default="en", alias="language") segments: List[TranscriptionSegmentDTO] = Field(default=[], alias="segments") class TranscriptionWebhookDTO(BaseModel): - transcription: TranscriptionDTO = Field(default="", alias="transcription") + transcription: TranscriptionDTO = Field(alias="transcription") lecture_id: int = Field(alias="lectureId") lecture_name: str = Field(default="", alias="lectureName") course_id: int = Field(alias="courseId") diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index c9b66be8..e4a3a2f6 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -76,10 +76,10 @@ def __call__(self) -> None: chunks = self.chunk_transcriptions(self.dto.transcriptions) self.callback.in_progress("Summarizing transcriptions") - chunks = self.summarize_chunks(chunks) + # chunks = self.summarize_chunks(chunks) self.callback.in_progress("Ingesting transcriptions into vector database") - self.batch_insert(chunks) + # self.batch_insert(chunks) self.callback.done("Transcriptions ingested successfully") except Exception as e: diff --git a/app/vector_database/lecture_transcription_schema.py b/app/vector_database/lecture_transcription_schema.py index 9685cd09..b8abd382 100644 --- a/app/vector_database/lecture_transcription_schema.py +++ b/app/vector_database/lecture_transcription_schema.py @@ -40,7 +40,7 @@ def init_lecture_transcription_schema(client: WeaviateClient) -> Collection: name=LectureTranscriptionSchema.COURSE_ID.value, description="The ID of the course", data_type=DataType.INT, - index_searable=False, + index_searchable=False, ), Property( name=LectureTranscriptionSchema.COURSE_NAME.value, diff --git a/app/web/status/transcription_ingestion_callback.py b/app/web/status/transcription_ingestion_callback.py index 580f0e63..72e759a3 100644 --- a/app/web/status/transcription_ingestion_callback.py +++ b/app/web/status/transcription_ingestion_callback.py @@ -11,7 +11,7 @@ class TranscriptionIngestionStatus(StatusCallback): """ - Callback class for updating 
the status of a Faq ingestion Pipeline run. + Callback class for updating the status of a Lecture Transcription ingestion Pipeline run. """ def __init__( From d2c164736c26b9bfbba1b300ccd6708b08576ced Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 14:17:15 +0100 Subject: [PATCH 11/24] add semaphore --- app/web/routers/webhooks.py | 36 ++++++++++--------- .../transcription_ingestion_callback.py | 2 +- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index d5c0761a..5ba06196 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -83,22 +83,26 @@ def run_transcription_ingestion_pipeline_worker( """ Run the transcription ingestion pipeline in a separate thread """ - try: - callback = TranscriptionIngestionStatus( - run_id=dto.settings.authentication_token, - base_url=dto.settings.artemis_base_url, - initial_stages=dto.initial_stages, - lecture_id=dto.lectureId, - ) - db = VectorDatabase() - client = db.get_client() - pipeline = TranscriptionIngestionPipeline( - client=client, dto=dto, callback=callback - ) - pipeline() - except Exception as e: - logger.error(f"Error while deleting lectures: {e}") - logger.error(traceback.format_exc()) + with semaphore: + try: + callback = TranscriptionIngestionStatus( + run_id=dto.settings.authentication_token, + base_url=dto.settings.artemis_base_url, + initial_stages=dto.initial_stages, + lecture_id=dto.lectureId + ) + db = VectorDatabase() + client = db.get_client() + pipeline = TranscriptionIngestionPipeline( + client=client, dto=dto, callback=callback + ) + pipeline() + except Exception as e: + logger.error(f"Error while deleting lectures: {e}") + logger.error(traceback.format_exc()) + capture_exception(e) + finally: + semaphore.release() def run_faq_update_pipeline_worker(dto: FaqIngestionPipelineExecutionDto): diff --git a/app/web/status/transcription_ingestion_callback.py b/app/web/status/transcription_ingestion_callback.py index 72e759a3..a9666aaf 100644 --- a/app/web/status/transcription_ingestion_callback.py +++ b/app/web/status/transcription_ingestion_callback.py @@ -11,7 +11,7 @@ class TranscriptionIngestionStatus(StatusCallback): """ - Callback class for updating the status of a Lecture Transcription ingestion Pipeline run. + Callback class for updating the status of a Transcription ingestion Pipeline run. 
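    The run reports four stages back to Artemis: removing the old transcription,
    chunking, summarizing, and ingesting the chunks into the vector database.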
""" def __init__( From d8b11c914d637e5c1625a4fe185b084075fb497f Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 14:19:39 +0100 Subject: [PATCH 12/24] reformat --- app/web/routers/webhooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index 5ba06196..d14e9e19 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -89,7 +89,7 @@ def run_transcription_ingestion_pipeline_worker( run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, - lecture_id=dto.lectureId + lecture_id=dto.lectureId, ) db = VectorDatabase() client = db.get_client() From b991690951d36ff63063642f3222cc83eec63780 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 14:21:33 +0100 Subject: [PATCH 13/24] minor improvements --- app/pipeline/transcription_ingestion_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index e4a3a2f6..c9b66be8 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -76,10 +76,10 @@ def __call__(self) -> None: chunks = self.chunk_transcriptions(self.dto.transcriptions) self.callback.in_progress("Summarizing transcriptions") - # chunks = self.summarize_chunks(chunks) + chunks = self.summarize_chunks(chunks) self.callback.in_progress("Ingesting transcriptions into vector database") - # self.batch_insert(chunks) + self.batch_insert(chunks) self.callback.done("Transcriptions ingested successfully") except Exception as e: From ffd9c4391f71e79f5f05e8e7d0088127f57574bf Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 4 Feb 2025 18:23:52 +0100 Subject: [PATCH 14/24] process transcription by lecture unit --- .../transcription_ingestion_pipeline_execution_dto.py | 2 +- app/web/routers/webhooks.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py index 10673f00..409f00ee 100644 --- a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py +++ b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -9,7 +9,7 @@ class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): transcriptions: List[TranscriptionWebhookDTO] - lectureId: int + lectureUnitId: int settings: Optional[PipelineExecutionSettingsDTO] initial_stages: Optional[List[StageDTO]] = Field( default=None, alias="initialStages" diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index d14e9e19..487b5aa2 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -89,7 +89,7 @@ def run_transcription_ingestion_pipeline_worker( run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, - lecture_id=dto.lectureId, + lecture_id=dto.lectureUnitId, ) db = VectorDatabase() client = db.get_client() From 7fa57e4e2939b1d16833dea47e344e3de7673f70 Mon Sep 17 00:00:00 2001 From: isabella Date: Mon, 10 Feb 2025 15:19:33 +0100 Subject: [PATCH 15/24] fix linter error --- app/pipeline/prompts/transcription_ingestion_prompts.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/app/pipeline/prompts/transcription_ingestion_prompts.py b/app/pipeline/prompts/transcription_ingestion_prompts.py index 8926a4de..4028c8f1 100644 --- a/app/pipeline/prompts/transcription_ingestion_prompts.py +++ b/app/pipeline/prompts/transcription_ingestion_prompts.py @@ -1,6 +1,8 @@ def transcription_summary_prompt(lecture_name: str, chunk_content: str): return f""" - You are a helpful assistant. A snippet of the spoken content of one lecture of the lecture {lecture_name} will be given to you, summarize the information without adding details and return only the summary nothing more. + You are a helpful assistant. + A snippet of the spoken content of one lecture of the lecture {lecture_name} will be given to you, + summarize the information without adding details and return only the summary nothing more. This is the text you should summarize: {chunk_content} """ From 898327741e7877016b24387dfbb61db69e416408 Mon Sep 17 00:00:00 2001 From: isabella Date: Tue, 11 Feb 2025 13:14:07 +0100 Subject: [PATCH 16/24] integrate feedback --- app/domain/data/metrics/transcription_dto.py | 22 +++++++++---------- .../transcription_ingestion_prompts.py | 9 +++++--- .../transcription_ingestion_pipeline.py | 15 +++++++------ .../lecture_transcription_schema.py | 12 +++++----- app/web/routers/webhooks.py | 2 +- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index d9ac7d5a..3190d0f0 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -4,22 +4,22 @@ class TranscriptionSegmentDTO(BaseModel): - start_time: float = Field(default=0.0, alias="startTime") - end_time: float = Field(default=0.0, alias="endTime") - text: str = Field(default="", alias="text") + start_time: float = Field(..., alias="startTime") + end_time: float = Field(..., alias="endTime") + text: str = Field(..., alias="text") slide_number: int = Field(default=0, alias="slideNumber") - lecture_unit_id: int = Field(default=0, alias="lectureUnitId") + # lecture_unit_id: int = Field(..., alias="lectureUnitId") class TranscriptionDTO(BaseModel): language: str = Field(default="en", alias="language") - segments: List[TranscriptionSegmentDTO] = Field(default=[], alias="segments") + segments: List[TranscriptionSegmentDTO] = Field(..., alias="segments") class TranscriptionWebhookDTO(BaseModel): - transcription: TranscriptionDTO = Field(alias="transcription") - lecture_id: int = Field(alias="lectureId") - lecture_name: str = Field(default="", alias="lectureName") - course_id: int = Field(alias="courseId") - course_name: str = Field(default="", alias="courseName") - course_description: str = Field(default="", alias="courseDescription") + transcription: TranscriptionDTO = Field(..., alias="transcription") + lecture_id: int = Field(..., alias="lectureId") + lecture_name: str = Field(..., alias="lectureName") + course_id: int = Field(..., alias="courseId") + course_name: str = Field(..., alias="courseName") + # course_description: str = Field(..., alias="courseDescription") diff --git a/app/pipeline/prompts/transcription_ingestion_prompts.py b/app/pipeline/prompts/transcription_ingestion_prompts.py index 4028c8f1..c4ab72b0 100644 --- a/app/pipeline/prompts/transcription_ingestion_prompts.py +++ b/app/pipeline/prompts/transcription_ingestion_prompts.py @@ -1,8 +1,11 @@ def transcription_summary_prompt(lecture_name: str, chunk_content: str): return f""" - You are a helpful assistant. 
-    A snippet of the spoken content of one lecture of the lecture {lecture_name} will be given to you,
-    summarize the information without adding details and return only the summary nothing more.
+    You are an excellent tutor with deep expertise in computer science and practical applications, teaching at the university level.
+    A snippet of the spoken content of the lecture {lecture_name} will be given to you.
+    Please accurately follow the instructions below.
+    1. Summarize the information in a clear and accurate manner.
+    2. Do not add additional information.
+    3. Only answer in complete sentences.
     This is the text you should summarize: {chunk_content}
     """

diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py
index c9b66be8..b55f562d 100644
--- a/app/pipeline/transcription_ingestion_pipeline.py
+++ b/app/pipeline/transcription_ingestion_pipeline.py
@@ -26,6 +26,7 @@
 )
 from app.llm.langchain import IrisLangchainChatModel
 from app.pipeline import Pipeline
+from app.pipeline.faq_ingestion_pipeline import batch_update_lock
 from app.pipeline.prompts.transcription_ingestion_prompts import (
     transcription_summary_prompt,
 )
@@ -35,7 +36,7 @@
 )
 from app.web.status.transcription_ingestion_callback import TranscriptionIngestionStatus

-batch_insert_lock = threading.Lock()
+batch_insert_lock = batch_update_lock


 class TranscriptionIngestionPipeline(Pipeline):
@@ -74,7 +75,7 @@ def __call__(self) -> None:
         try:
             self.callback.in_progress("Chunking transcriptions")
-            chunks = self.chunk_transcriptions(self.dto.transcriptions)
-
+            chunks = self.chunk_transcriptions(self.dto.transcriptions)
+            logger.info("chunked data")
             self.callback.in_progress("Summarizing transcriptions")
             chunks = self.summarize_chunks(chunks)

@@ -126,8 +127,8 @@ def chunk_transcriptions(
                         LectureTranscriptionSchema.LECTURE_ID.value: transcription.lecture_id,
                         LectureTranscriptionSchema.LECTURE_NAME.value: transcription.lecture_name,
                         LectureTranscriptionSchema.LANGUAGE.value: transcription.transcription.language,
-                        LectureTranscriptionSchema.SEGMENT_START.value: segment.start_time,
-                        LectureTranscriptionSchema.SEGMENT_END.value: segment.end_time,
+                        LectureTranscriptionSchema.SEGMENT_START_TIME.value: segment.start_time,
+                        LectureTranscriptionSchema.SEGMENT_END_TIME.value: segment.end_time,
                         LectureTranscriptionSchema.SEGMENT_TEXT.value: segment.text,
                         LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDES_ID.value: segment.lecture_unit_id,
                         LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value: segment.slide_number,
@@ -139,7 +140,7 @@ def chunk_transcriptions(
                     LectureTranscriptionSchema.SEGMENT_TEXT.value
                 ] += (CHUNK_SEPARATOR_CHAR + segment.text)
                 slide_chunks[slide_key][
-                    LectureTranscriptionSchema.SEGMENT_END.value
+                    LectureTranscriptionSchema.SEGMENT_END_TIME.value
                 ] = segment.end_time

         for i, segment in enumerate(slide_chunks.values()):
@@ -193,8 +194,8 @@ def chunk_transcriptions(
                 chunks.append(
                     {
                         **segment,
-                        LectureTranscriptionSchema.SEGMENT_START.value: start_time,
-                        LectureTranscriptionSchema.SEGMENT_END.value: end_time,
+                        LectureTranscriptionSchema.SEGMENT_START_TIME.value: start_time,
+                        LectureTranscriptionSchema.SEGMENT_END_TIME.value: end_time,
                         LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk.replace(
                             CHUNK_SEPARATOR_CHAR, " "
                         ).strip(),

diff --git a/app/vector_database/lecture_transcription_schema.py b/app/vector_database/lecture_transcription_schema.py
index b8abd382..bb1215b3 100644
--- a/app/vector_database/lecture_transcription_schema.py
+++ b/app/vector_database/lecture_transcription_schema.py
@@ -17,8 +17,8 @@ class LectureTranscriptionSchema(Enum):
     LECTURE_ID = "lecture_id"
     LECTURE_NAME = "lecture_name"
     LANGUAGE = "language"
-    SEGMENT_START = "segment_start"
-    SEGMENT_END = "segment_end"
+    SEGMENT_START_TIME = "segment_start_time"
+    SEGMENT_END_TIME = "segment_end_time"
     SEGMENT_TEXT = "segment_text"
     SEGMENT_LECTURE_UNIT_SLIDES_ID = "segment_lecture_unit_slides_id"
     SEGMENT_LECTURE_UNIT_SLIDE_NUMBER = "segment_lecture_unit_slide_number"
@@ -67,14 +67,14 @@ def init_lecture_transcription_schema(client: WeaviateClient) -> Collection:
                 index_searchable=False,
             ),
             Property(
-                name=LectureTranscriptionSchema.SEGMENT_START.value,
-                description="The start of the segment",
+                name=LectureTranscriptionSchema.SEGMENT_START_TIME.value,
+                description="The start time of the segment",
                 data_type=DataType.NUMBER,
                 index_searchable=False,
             ),
             Property(
-                name=LectureTranscriptionSchema.SEGMENT_END.value,
-                description="The end of the segment",
+                name=LectureTranscriptionSchema.SEGMENT_END_TIME.value,
+                description="The end time of the segment",
                 data_type=DataType.NUMBER,
                 index_searchable=False,
             ),

diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py
index 487b5aa2..4a9b2e23 100644
--- a/app/web/routers/webhooks.py
+++ b/app/web/routers/webhooks.py
@@ -191,7 +191,7 @@ def transcription_ingestion_webhook(dto: TranscriptionIngestionPipelineExecution
     """
    Webhook endpoint to trigger the lecture transcription ingestion pipeline
    """
-    print(f"transcription ingestion got DTO {dto}")
+    logger.info(f"transcription ingestion got DTO {dto}")
     thread = Thread(target=run_transcription_ingestion_pipeline_worker, args=(dto,))
     thread.start()

From 216cdf73df7f35020eaf1fa34a5015f209398afa Mon Sep 17 00:00:00 2001
From: isabella
Date: Tue, 11 Feb 2025 13:25:59 +0100
Subject: [PATCH 17/24] handle transcriptions per lecture unit

---
 app/domain/data/metrics/transcription_dto.py        | 1 +
 app/pipeline/transcription_ingestion_pipeline.py    | 4 ++--
 app/vector_database/lecture_transcription_schema.py | 6 +++---
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py
index 3190d0f0..472bc1bb 100644
--- a/app/domain/data/metrics/transcription_dto.py
+++ b/app/domain/data/metrics/transcription_dto.py
@@ -22,4 +22,5 @@ class TranscriptionWebhookDTO(BaseModel):
     lecture_name: str = Field(..., alias="lectureName")
     course_id: int = Field(..., alias="courseId")
     course_name: str = Field(..., alias="courseName")
+    lecture_unit_id: int = Field(..., alias="lectureUnitId")
     # course_description: str = Field(..., alias="courseDescription")

diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py
index b55f562d..4155b2d7 100644
--- a/app/pipeline/transcription_ingestion_pipeline.py
+++ b/app/pipeline/transcription_ingestion_pipeline.py
@@ -118,7 +118,7 @@ def chunk_transcriptions(
         for transcription in transcriptions:
             slide_chunks = {}
             for segment in transcription.transcription.segments:
-                slide_key = f"{transcription.lecture_id}_{segment.lecture_unit_id}_{segment.slide_number}"
+                slide_key = f"{transcription.lecture_id}_{transcription.lecture_unit_id}_{segment.slide_number}"

                 if slide_key not in slide_chunks:
                     chunk = {
                         LectureTranscriptionSchema.COURSE_ID.value: transcription.course_id,
                         LectureTranscriptionSchema.COURSE_NAME.value: transcription.course_name,
                         LectureTranscriptionSchema.LECTURE_ID.value: transcription.lecture_id,
                         LectureTranscriptionSchema.LECTURE_NAME.value: transcription.lecture_name,
+                        LectureTranscriptionSchema.LECTURE_UNIT_ID.value: transcription.lecture_unit_id,
                         LectureTranscriptionSchema.LANGUAGE.value: transcription.transcription.language,
                         LectureTranscriptionSchema.SEGMENT_START_TIME.value: segment.start_time,
                         LectureTranscriptionSchema.SEGMENT_END_TIME.value: segment.end_time,
                         LectureTranscriptionSchema.SEGMENT_TEXT.value: segment.text,
-                        LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDES_ID.value: segment.lecture_unit_id,
                         LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value: segment.slide_number,
                     }

diff --git a/app/vector_database/lecture_transcription_schema.py b/app/vector_database/lecture_transcription_schema.py
index bb1215b3..247a1e93 100644
--- a/app/vector_database/lecture_transcription_schema.py
+++ b/app/vector_database/lecture_transcription_schema.py
@@ -16,11 +16,11 @@ class LectureTranscriptionSchema(Enum):
     COURSE_NAME = "course_name"
     LECTURE_ID = "lecture_id"
     LECTURE_NAME = "lecture_name"
+    LECTURE_UNIT_ID = "lecture_unit_id"
     LANGUAGE = "language"
     SEGMENT_START_TIME = "segment_start_time"
     SEGMENT_END_TIME = "segment_end_time"
     SEGMENT_TEXT = "segment_text"
-    SEGMENT_LECTURE_UNIT_SLIDES_ID = "segment_lecture_unit_slides_id"
     SEGMENT_LECTURE_UNIT_SLIDE_NUMBER = "segment_lecture_unit_slide_number"
     SEGMENT_SUMMARY = "segment_summary"
@@ -85,8 +85,8 @@ def init_lecture_transcription_schema(client: WeaviateClient) -> Collection:
                 index_searchable=True,
             ),
             Property(
-                name=LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDES_ID.value,
-                description="The id of the lecture unit slides of the segment",
+                name=LectureTranscriptionSchema.LECTURE_UNIT_ID.value,
+                description="The id of the lecture unit of the transcription",
                 data_type=DataType.INT,
                 index_searchable=False,
             ),

From 5bd5e0d99641a87ea35aa7f427bb93448cf670a8 Mon Sep 17 00:00:00 2001
From: isabella
Date: Tue, 11 Feb 2025 13:28:24 +0100
Subject: [PATCH 18/24] fix linters

---
 app/pipeline/prompts/transcription_ingestion_prompts.py | 3 ++-
 app/pipeline/transcription_ingestion_pipeline.py        | 1 -
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/app/pipeline/prompts/transcription_ingestion_prompts.py b/app/pipeline/prompts/transcription_ingestion_prompts.py
index c4ab72b0..33874fcd 100644
--- a/app/pipeline/prompts/transcription_ingestion_prompts.py
+++ b/app/pipeline/prompts/transcription_ingestion_prompts.py
@@ -1,6 +1,7 @@
 def transcription_summary_prompt(lecture_name: str, chunk_content: str):
     return f"""
-    You are an excellent tutor with deep expertise in computer science and practical applications, teaching at the university level.
+    You are an excellent tutor with deep expertise in computer science and practical applications,
+    teaching at the university level.
     A snippet of the spoken content of the lecture {lecture_name} will be given to you.
     Please accurately follow the instructions below.
     1. Summarize the information in a clear and accurate manner.
     2. Do not add additional information.
     3. Only answer in complete sentences.
diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index 4155b2d7..c842221f 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -1,4 +1,3 @@ -import threading from functools import reduce from typing import Optional, List, Dict, Any From 50363965cbaf33e0a4c70c77df26f2893e4eb335 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Tue, 11 Feb 2025 13:30:47 +0100 Subject: [PATCH 19/24] Add semantic chunking --- app/llm/external/openai_embeddings.py | 29 ++++++-- .../request_handler/basic_request_handler.py | 17 ++++- .../transcription_ingestion_pipeline.py | 73 ++++++++++--------- requirements.txt | 4 +- 4 files changed, 82 insertions(+), 41 deletions(-) diff --git a/app/llm/external/openai_embeddings.py b/app/llm/external/openai_embeddings.py index 1f9106e6..842d03d0 100644 --- a/app/llm/external/openai_embeddings.py +++ b/app/llm/external/openai_embeddings.py @@ -1,13 +1,14 @@ import logging from typing import Literal, Any + +from langchain_experimental.text_splitter import SemanticChunker from openai import ( - OpenAI, APIError, APITimeoutError, RateLimitError, InternalServerError, ) -from openai.lib.azure import AzureOpenAI +from langchain_openai import AzureOpenAIEmbeddings, OpenAIEmbeddings from ...llm.external.model import EmbeddingModel import time @@ -16,7 +17,7 @@ class OpenAIEmbeddingModel(EmbeddingModel): model: str api_key: str - _client: OpenAI + _client: OpenAIEmbeddings def embed(self, text: str) -> list[float]: retries = 5 @@ -44,12 +45,30 @@ def embed(self, text: str) -> list[float]: time.sleep(wait_time) raise Exception(f"Failed to get embedding from OpenAI after {retries} retries.") + def split_text_semantically( + self, + text: str, + breakpoint_threshold_type: Literal[ + "percentile", "standard_deviation", "interquartile", "gradient" + ] = "gradient", + breakpoint_threshold_amount: float = 95.0, + min_chunk_size: int = 512, + ): + chunker = SemanticChunker( + self._client, + breakpoint_threshold_type=breakpoint_threshold_type, + breakpoint_threshold_amount=breakpoint_threshold_amount, + min_chunk_size=min_chunk_size, + ) + + return chunker.split_text(text) + class DirectOpenAIEmbeddingModel(OpenAIEmbeddingModel): type: Literal["openai_embedding"] def model_post_init(self, __context: Any) -> None: - self._client = OpenAI(api_key=self.api_key) + self._client = OpenAIEmbeddings(api_key=self.api_key) def __str__(self): return f"OpenAIEmbedding('{self.model}')" @@ -62,7 +81,7 @@ class AzureOpenAIEmbeddingModel(OpenAIEmbeddingModel): api_version: str def model_post_init(self, __context: Any) -> None: - self._client = AzureOpenAI( + self._client = AzureOpenAIEmbeddings( azure_endpoint=self.endpoint, azure_deployment=self.azure_deployment, api_version=self.api_version, diff --git a/app/llm/request_handler/basic_request_handler.py b/app/llm/request_handler/basic_request_handler.py index ada78da1..a24be822 100644 --- a/app/llm/request_handler/basic_request_handler.py +++ b/app/llm/request_handler/basic_request_handler.py @@ -1,4 +1,4 @@ -from typing import Optional, Sequence, Union, Dict, Any, Type, Callable +from typing import Optional, Sequence, Union, Dict, Any, Type, Callable, Literal from langchain_core.tools import BaseTool from pydantic import ConfigDict @@ -46,6 +46,21 @@ def embed(self, text: str) -> list[float]: llm = self.llm_manager.get_llm_by_id(self.model_id) return llm.embed(text) + def split_text_semantically( + self, + text: str, + 
breakpoint_threshold_type: Literal[
+            "percentile", "standard_deviation", "interquartile", "gradient"
+        ] = "gradient",
+        breakpoint_threshold_amount: float = 95.0,
+        min_chunk_size: int = 512,
+    ):
+        llm = self.llm_manager.get_llm_by_id(self.model_id)
+
+        return llm.split_text_semantically(
+            text, breakpoint_threshold_type, breakpoint_threshold_amount, min_chunk_size
+        )
+
     def bind_tools(
         self,
         tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],

diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py
index c9b66be8..f840bf36 100644
--- a/app/pipeline/transcription_ingestion_pipeline.py
+++ b/app/pipeline/transcription_ingestion_pipeline.py
@@ -5,7 +5,6 @@
 from langchain_core.output_parsers import StrOutputParser
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.runnables import Runnable
-from langchain_text_splitters import RecursiveCharacterTextSplitter
 from weaviate import WeaviateClient
 from asyncio.log import logger

@@ -37,6 +36,8 @@

 batch_insert_lock = threading.Lock()

+CHUNK_SEPARATOR_CHAR = "\x1F"
+

 class TranscriptionIngestionPipeline(Pipeline):
     llm: IrisLangchainChatModel
@@ -80,6 +81,7 @@ def __call__(self) -> None:

         self.callback.in_progress("Ingesting transcriptions into vector database")
         self.batch_insert(chunks)
+
         self.callback.done("Transcriptions ingested successfully")

     except Exception as e:
@@ -111,7 +113,6 @@ def batch_insert(self, chunks):
     def chunk_transcriptions(
         self, transcriptions: List[TranscriptionWebhookDTO]
     ) -> List[Dict[str, Any]]:
-        CHUNK_SEPARATOR_CHAR = "\x1F"
         chunks = []

         for transcription in transcriptions:
@@ -143,44 +144,38 @@
             ] = segment.end_time

         for i, segment in enumerate(slide_chunks.values()):
+            # If the segment is shorter than 1200 characters, we can just add it as is
             if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200:
-                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = segment[
+                # Add the segment to the chunks list and replace the chunk separator character with a space
+                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = self.replace_separator_char(segment[
                     LectureTranscriptionSchema.SEGMENT_TEXT.value
-                ].replace(CHUNK_SEPARATOR_CHAR, " ")
+                ])
                 chunks.append(segment)
                 continue

-            text_splitter = RecursiveCharacterTextSplitter(
-                chunk_size=1024, chunk_overlap=0
+            semantic_chunks = self.llm_embedding.split_text_semantically(
+                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value],
+                breakpoint_threshold_type="gradient",
+                breakpoint_threshold_amount=60.0,
+                min_chunk_size=512,
             )
-            semantic_chunks = text_splitter.split_text(
-                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]
+            # Calculate the offset of the current slide chunk to the start of the transcript
+            offset_slide_chunk = reduce(
+                lambda acc, txt: acc
+                + len(self.remove_separator_char(txt)),
+                map(
+                    lambda seg: seg[
+                        LectureTranscriptionSchema.SEGMENT_TEXT.value
+                    ],
+                    list(slide_chunks.values())[:i],
+                ),
+                0,
             )
-
+            offset_start = offset_slide_chunk
             for j, chunk in enumerate(semantic_chunks):
-                offset_slide_chunk = reduce(
-                    lambda acc, txt: acc
-                    + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")),
-                    map(
-                        lambda seg: seg[
-                            LectureTranscriptionSchema.SEGMENT_TEXT.value
-                        ],
-                        list(slide_chunks.values())[:i],
-                    ),
-                    0,
-                )
-
-                offset_semantic_chunk = reduce(
-                    lambda acc, txt: acc
-                    + len(txt.replace(CHUNK_SEPARATOR_CHAR, "")),
-                    semantic_chunks[:j],
-                    0,
-                )
-
-                offset_start = offset_slide_chunk + offset_semantic_chunk + 1
                 offset_end = offset_start + len(
-                    chunk.replace(CHUNK_SEPARATOR_CHAR, "")
+                    self.remove_separator_char(chunk)
                 )

                 start_time = self.get_transcription_segment_of_char_position(
@@ -195,11 +190,10 @@ def chunk_transcriptions(
                         **segment,
                         LectureTranscriptionSchema.SEGMENT_START.value: start_time,
                         LectureTranscriptionSchema.SEGMENT_END.value: end_time,
-                        LectureTranscriptionSchema.SEGMENT_TEXT.value: chunk.replace(
-                            CHUNK_SEPARATOR_CHAR, " "
-                        ).strip(),
+                        LectureTranscriptionSchema.SEGMENT_TEXT.value: self.cleanup_chunk(self.replace_separator_char(chunk)),
                     }
                 )
+                offset_start = offset_end + 1

         return chunks

@@ -209,7 +203,7 @@ def get_transcription_segment_of_char_position(
     ):
         offset_lookup_counter = 0
         segment_index = 0
-        while offset_lookup_counter < char_position and segment_index < len(segments):
+        while offset_lookup_counter + len(segments[segment_index].text) < char_position and segment_index < len(segments):
             offset_lookup_counter += len(segments[segment_index].text)
             segment_index += 1

@@ -217,6 +211,17 @@
             return segments[-1]
         return segments[segment_index]

+    @staticmethod
+    def cleanup_chunk(text: str):
+        return text.replace("  ", " ").strip()
+
+    @staticmethod
+    def replace_separator_char(text: str, replace_with: str = " ") -> str:
+        return text.replace(CHUNK_SEPARATOR_CHAR, replace_with)
+
+    def remove_separator_char(self, text: str) -> str:
+        return self.replace_separator_char(text, "")
+
     def summarize_chunks(self, chunks):
         chunks_with_summaries = []
         for chunk in chunks:

diff --git a/requirements.txt b/requirements.txt
index 3cc93b35..25aae157 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,4 +18,6 @@ weaviate-client==4.9.3
 langchain-core~=0.3.17
 starlette~=0.41.2
 langsmith~=0.1.142
-langchain-text-splitters~=0.3.2
\ No newline at end of file
+langchain-text-splitters~=0.3.2
+langchain-experimental~=0.3.3
+langchain-openai~=0.3.3
\ No newline at end of file

From 7458423b3254c954e24de30d263e543735731998 Mon Sep 17 00:00:00 2001
From: Sebastian Loose
Date: Tue, 11 Feb 2025 13:34:59 +0100
Subject: [PATCH 20/24] Fix linter errors

---
 .../transcription_ingestion_pipeline.py | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py
index 1dddbf82..cb86b2f3 100644
--- a/app/pipeline/transcription_ingestion_pipeline.py
+++ b/app/pipeline/transcription_ingestion_pipeline.py
@@ -147,9 +147,11 @@ def chunk_transcriptions(
             # If the segment is shorter than 1200 characters, we can just add it as is
             if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200:
                 # Add the segment to the chunks list and replace the chunk separator character with a space
-                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = self.replace_separator_char(segment[
-                    LectureTranscriptionSchema.SEGMENT_TEXT.value
-                ])
+                segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = (
+                    self.replace_separator_char(
+                        segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]
+                    )
+                )
                 chunks.append(segment)
                 continue

@@ -162,21 +164,16 @@ def chunk_transcriptions(
             # Calculate the offset of the current slide chunk to the start of the transcript
             offset_slide_chunk = reduce(
-                lambda acc, txt: acc
-                + len(self.remove_separator_char(txt)),
+                lambda acc, txt: acc + len(self.remove_separator_char(txt)),
                 map(
-                    lambda seg: seg[
-                        LectureTranscriptionSchema.SEGMENT_TEXT.value
-                    ],
+                    lambda seg: seg[LectureTranscriptionSchema.SEGMENT_TEXT.value],
list(slide_chunks.values())[:i], ), 0, ) offset_start = offset_slide_chunk for j, chunk in enumerate(semantic_chunks): - offset_end = offset_start + len( - self.remove_separator_char(chunk) - ) + offset_end = offset_start + len(self.remove_separator_char(chunk)) start_time = self.get_transcription_segment_of_char_position( offset_start, transcription.transcription.segments @@ -190,7 +187,9 @@ def chunk_transcriptions( **segment, LectureTranscriptionSchema.SEGMENT_START_TIME.value: start_time, LectureTranscriptionSchema.SEGMENT_END_TIME.value: end_time, - LectureTranscriptionSchema.SEGMENT_TEXT.value: self.cleanup_chunk(self.replace_separator_char(chunk)), + LectureTranscriptionSchema.SEGMENT_TEXT.value: self.cleanup_chunk( + self.replace_separator_char(chunk) + ), } ) offset_start = offset_end + 1 @@ -203,7 +202,9 @@ def get_transcription_segment_of_char_position( ): offset_lookup_counter = 0 segment_index = 0 - while offset_lookup_counter + len(segments[segment_index].text) < char_position and segment_index < len(segments): + while offset_lookup_counter + len( + segments[segment_index].text + ) < char_position and segment_index < len(segments): offset_lookup_counter += len(segments[segment_index].text) segment_index += 1 From 710fdca276aecc316817c9612fec24fa3133335c Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Mon, 24 Feb 2025 18:07:04 +0100 Subject: [PATCH 21/24] Minor changes --- app/domain/data/metrics/transcription_dto.py | 3 +-- .../transcription_ingestion_pipeline_execution_dto.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/app/domain/data/metrics/transcription_dto.py b/app/domain/data/metrics/transcription_dto.py index 472bc1bb..cba3bd1e 100644 --- a/app/domain/data/metrics/transcription_dto.py +++ b/app/domain/data/metrics/transcription_dto.py @@ -8,7 +8,6 @@ class TranscriptionSegmentDTO(BaseModel): end_time: float = Field(..., alias="endTime") text: str = Field(..., alias="text") slide_number: int = Field(default=0, alias="slideNumber") - # lecture_unit_id: int = Field(..., alias="lectureUnitId") class TranscriptionDTO(BaseModel): @@ -23,4 +22,4 @@ class TranscriptionWebhookDTO(BaseModel): course_id: int = Field(..., alias="courseId") course_name: str = Field(..., alias="courseName") lecture_unit_id: int = Field(..., alias="lectureUnitId") - # course_description: str = Field(..., alias="courseDescription") + course_description: str = Field("", alias="courseDescription") diff --git a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py index 409f00ee..b65d8044 100644 --- a/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py +++ b/app/domain/ingestion/transcription_ingestion/transcription_ingestion_pipeline_execution_dto.py @@ -8,7 +8,7 @@ class TranscriptionIngestionPipelineExecutionDto(PipelineExecutionDTO): - transcriptions: List[TranscriptionWebhookDTO] + transcription: TranscriptionWebhookDTO lectureUnitId: int settings: Optional[PipelineExecutionSettingsDTO] initial_stages: Optional[List[StageDTO]] = Field( From 86245126c05132e79132f994e5378a9a67e03490 Mon Sep 17 00:00:00 2001 From: isabella Date: Fri, 28 Feb 2025 19:26:36 +0100 Subject: [PATCH 22/24] fix endpoint dto --- .../transcription_ingestion_pipeline.py | 159 +++++++++--------- 1 file changed, 79 insertions(+), 80 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py 
b/app/pipeline/transcription_ingestion_pipeline.py index cb86b2f3..9e9f8d55 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -73,13 +73,13 @@ def __init__( def __call__(self) -> None: try: - self.callback.in_progress("Chunking transcriptions") - chunks = self.chunk_transcriptions(self.dto.transcriptions) + self.callback.in_progress("Chunking transcription") + chunks = self.chunk_transcription(self.dto.transcription) logger.info("chunked data") - self.callback.in_progress("Summarizing transcriptions") + self.callback.in_progress("Summarizing transcription") chunks = self.summarize_chunks(chunks) - self.callback.in_progress("Ingesting transcriptions into vector database") + self.callback.in_progress("Ingesting transcription into vector database") self.batch_insert(chunks) self.callback.done("Transcriptions ingested successfully") @@ -110,89 +110,88 @@ def batch_insert(self, chunks): tokens=self.tokens, ) - def chunk_transcriptions( - self, transcriptions: List[TranscriptionWebhookDTO] + def chunk_transcription( + self, transcription: TranscriptionWebhookDTO ) -> List[Dict[str, Any]]: chunks = [] - for transcription in transcriptions: - slide_chunks = {} - for segment in transcription.transcription.segments: - slide_key = f"{transcription.lecture_id}_{transcription.lecture_unit_id}_{segment.slide_number}" - - if slide_key not in slide_chunks: - chunk = { - LectureTranscriptionSchema.COURSE_ID.value: transcription.course_id, - LectureTranscriptionSchema.COURSE_NAME.value: transcription.course_name, - LectureTranscriptionSchema.LECTURE_ID.value: transcription.lecture_id, - LectureTranscriptionSchema.LECTURE_NAME.value: transcription.lecture_name, - LectureTranscriptionSchema.LECTURE_UNIT_ID.value: transcription.lecture_unit_id, - LectureTranscriptionSchema.LANGUAGE.value: transcription.transcription.language, - LectureTranscriptionSchema.SEGMENT_START_TIME.value: segment.start_time, - LectureTranscriptionSchema.SEGMENT_END_TIME.value: segment.end_time, - LectureTranscriptionSchema.SEGMENT_TEXT.value: segment.text, - LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value: segment.slide_number, - } - - slide_chunks[slide_key] = chunk - else: - slide_chunks[slide_key][ - LectureTranscriptionSchema.SEGMENT_TEXT.value - ] += (CHUNK_SEPARATOR_CHAR + segment.text) - slide_chunks[slide_key][ - LectureTranscriptionSchema.SEGMENT_END_TIME.value - ] = segment.end_time - - for i, segment in enumerate(slide_chunks.values()): - # If the segment is shorter than 1200 characters, we can just add it as is - if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200: - # Add the segment to the chunks list and replace the chunk separator character with a space - segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = ( - self.replace_separator_char( - segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] - ) + slide_chunks = {} + for segment in transcription.transcription.segments: + slide_key = f"{transcription.lecture_id}_{transcription.lecture_unit_id}_{segment.slide_number}" + + if slide_key not in slide_chunks: + chunk = { + LectureTranscriptionSchema.COURSE_ID.value: transcription.course_id, + LectureTranscriptionSchema.COURSE_NAME.value: transcription.course_name, + LectureTranscriptionSchema.LECTURE_ID.value: transcription.lecture_id, + LectureTranscriptionSchema.LECTURE_NAME.value: transcription.lecture_name, + LectureTranscriptionSchema.LECTURE_UNIT_ID.value: transcription.lecture_unit_id, + 
LectureTranscriptionSchema.LANGUAGE.value: transcription.transcription.language, + LectureTranscriptionSchema.SEGMENT_START_TIME.value: segment.start_time, + LectureTranscriptionSchema.SEGMENT_END_TIME.value: segment.end_time, + LectureTranscriptionSchema.SEGMENT_TEXT.value: segment.text, + LectureTranscriptionSchema.SEGMENT_LECTURE_UNIT_SLIDE_NUMBER.value: segment.slide_number, + } + + slide_chunks[slide_key] = chunk + else: + slide_chunks[slide_key][ + LectureTranscriptionSchema.SEGMENT_TEXT.value + ] += (CHUNK_SEPARATOR_CHAR + segment.text) + slide_chunks[slide_key][ + LectureTranscriptionSchema.SEGMENT_END_TIME.value + ] = segment.end_time + + for i, segment in enumerate(slide_chunks.values()): + # If the segment is shorter than 1200 characters, we can just add it as is + if len(segment[LectureTranscriptionSchema.SEGMENT_TEXT.value]) < 1200: + # Add the segment to the chunks list and replace the chunk separator character with a space + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] = ( + self.replace_separator_char( + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value] ) - chunks.append(segment) - continue - - semantic_chunks = self.llm_embedding.split_text_semantically( - segment[LectureTranscriptionSchema.SEGMENT_TEXT.value], - breakpoint_threshold_type="gradient", - breakpoint_threshold_amount=60.0, - min_chunk_size=512, ) + chunks.append(segment) + continue + + semantic_chunks = self.llm_embedding.split_text_semantically( + segment[LectureTranscriptionSchema.SEGMENT_TEXT.value], + breakpoint_threshold_type="gradient", + breakpoint_threshold_amount=60.0, + min_chunk_size=512, + ) - # Calculate the offset of the current slide chunk to the start of the transcript - offset_slide_chunk = reduce( - lambda acc, txt: acc + len(self.remove_separator_char(txt)), - map( - lambda seg: seg[LectureTranscriptionSchema.SEGMENT_TEXT.value], - list(slide_chunks.values())[:i], - ), - 0, + # Calculate the offset of the current slide chunk to the start of the transcript + offset_slide_chunk = reduce( + lambda acc, txt: acc + len(self.remove_separator_char(txt)), + map( + lambda seg: seg[LectureTranscriptionSchema.SEGMENT_TEXT.value], + list(slide_chunks.values())[:i], + ), + 0, + ) + offset_start = offset_slide_chunk + for j, chunk in enumerate(semantic_chunks): + offset_end = offset_start + len(self.remove_separator_char(chunk)) + + start_time = self.get_transcription_segment_of_char_position( + offset_start, transcription.transcription.segments + ).start_time + end_time = self.get_transcription_segment_of_char_position( + offset_end, transcription.transcription.segments + ).end_time + + chunks.append( + { + **segment, + LectureTranscriptionSchema.SEGMENT_START_TIME.value: start_time, + LectureTranscriptionSchema.SEGMENT_END_TIME.value: end_time, + LectureTranscriptionSchema.SEGMENT_TEXT.value: self.cleanup_chunk( + self.replace_separator_char(chunk) + ), + } ) - offset_start = offset_slide_chunk - for j, chunk in enumerate(semantic_chunks): - offset_end = offset_start + len(self.remove_separator_char(chunk)) - - start_time = self.get_transcription_segment_of_char_position( - offset_start, transcription.transcription.segments - ).start_time - end_time = self.get_transcription_segment_of_char_position( - offset_end, transcription.transcription.segments - ).end_time - - chunks.append( - { - **segment, - LectureTranscriptionSchema.SEGMENT_START_TIME.value: start_time, - LectureTranscriptionSchema.SEGMENT_END_TIME.value: end_time, - LectureTranscriptionSchema.SEGMENT_TEXT.value: 
self.cleanup_chunk( - self.replace_separator_char(chunk) - ), - } - ) - offset_start = offset_end + 1 + offset_start = offset_end + 1 return chunks From 9ac4624ccf7ffe9e1df09605310362194ecda835 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Fri, 28 Feb 2025 19:39:32 +0100 Subject: [PATCH 23/24] Fix text embedding --- app/llm/external/openai_embeddings.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/app/llm/external/openai_embeddings.py b/app/llm/external/openai_embeddings.py index 842d03d0..cf1b749c 100644 --- a/app/llm/external/openai_embeddings.py +++ b/app/llm/external/openai_embeddings.py @@ -27,12 +27,7 @@ def embed(self, text: str) -> list[float]: for attempt in range(retries): try: - response = self._client.embeddings.create( - model=self.model, - input=text, - encoding_format="float", - ) - return response.data[0].embedding + return self._client.embed_query(text) except ( APIError, APITimeoutError, From 5f34c5a06066b0302977c079b77fd57ff84c1795 Mon Sep 17 00:00:00 2001 From: Sebastian Loose Date: Fri, 28 Feb 2025 20:05:17 +0100 Subject: [PATCH 24/24] Minor improvement --- app/pipeline/transcription_ingestion_pipeline.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/pipeline/transcription_ingestion_pipeline.py b/app/pipeline/transcription_ingestion_pipeline.py index 9e9f8d55..bda58de9 100644 --- a/app/pipeline/transcription_ingestion_pipeline.py +++ b/app/pipeline/transcription_ingestion_pipeline.py @@ -201,9 +201,11 @@ def get_transcription_segment_of_char_position( ): offset_lookup_counter = 0 segment_index = 0 - while offset_lookup_counter + len( - segments[segment_index].text - ) < char_position and segment_index < len(segments): + while ( + segment_index < len(segments) + and offset_lookup_counter + len(segments[segment_index].text) + < char_position + ): offset_lookup_counter += len(segments[segment_index].text) segment_index += 1
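
---

A note on the offset-to-segment lookup that PATCH 24 settles on: it maps a character
position in the concatenated segment texts back to the transcription segment that
contains it, so semantic chunk boundaries can be translated into start and end
timestamps. Below is a minimal, self-contained sketch of that behaviour; the
Segment dataclass and the sample segments are illustrative stand-ins, not code from
this repository, and a non-empty segment list is assumed, as in the patched function.

    from dataclasses import dataclass
    from typing import List


    @dataclass
    class Segment:
        # Simplified stand-in for TranscriptionSegmentDTO: only the fields
        # the lookup needs.
        start_time: float
        end_time: float
        text: str


    def segment_at_char_position(char_position: int, segments: List[Segment]) -> Segment:
        # Walk the segments, accumulating text lengths, until the next segment
        # would pass char_position. The bounds check runs before the index
        # access, mirroring the condition order PATCH 24 introduces.
        offset = 0
        index = 0
        while (
            index < len(segments)
            and offset + len(segments[index].text) < char_position
        ):
            offset += len(segments[index].text)
            index += 1
        if index >= len(segments):
            # Past the last segment: clamp to the final one.
            return segments[-1]
        return segments[index]


    segments = [
        Segment(0.0, 4.2, "Welcome to the lecture. "),
        Segment(4.2, 9.8, "Today we cover semantic chunking. "),
    ]
    # Character 30 falls inside the second segment's text (offsets 24-57),
    # so the lookup returns that segment and with it the 4.2s start time.
    assert segment_at_char_position(30, segments) is segments[1]

With the pre-PATCH 24 condition order, the same call with a char_position beyond the
total text length would index past the end of the list before the bounds check ran.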