
Reference

This part of the project documentation focuses on an information-oriented approach. Use it as a reference for the technical implementation of the podflix project code.

podflix package.

db

A module for managing database connections and operations.

db_factory

SqlAlchemy database interface factory and implementations.

DBInterfaceFactory

Singleton factory class for creating database interface instances.

create classmethod
create(db_path='db.sqlite')

Creates and returns appropriate database interface based on environment settings.

Returns the same instance on subsequent calls.

Parameters:

Name Type Description Default
db_path str | Path

Path to SQLite database file. Only used for SQLite interface.

'db.sqlite'

Returns:

Name Type Description
SqlAlchemyDBInterface SqlAlchemyDBInterface

Singleton instance of appropriate database interface.

Source code in src/podflix/db/db_factory.py
@classmethod
def create(cls, db_path: str | Path = "db.sqlite") -> SqlAlchemyDBInterface:
    """Creates and returns appropriate database interface based on environment settings.

    Returns the same instance on subsequent calls.

    Args:
        db_path: Path to SQLite database file. Only used for SQLite interface.

    Returns:
        SqlAlchemyDBInterface: Singleton instance of appropriate database interface.
    """
    if cls._db_interface is None:
        if env_settings.enable_sqlite_data_layer is True:
            cls._db_interface = SQLiteDBInterface(db_path)

        # NOTE: Implement postgres
        else:
            cls._db_interface = PostgresDBInterface()

    return cls._db_interface
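
A minimal usage sketch; the names come from this reference, and the call mirrors what DatabaseManager does internally when it builds its engine:

from sqlalchemy import create_engine

from podflix.db.db_factory import DBInterfaceFactory

# Returns the SQLite or PostgreSQL interface depending on env_settings.enable_sqlite_data_layer
db_interface = DBInterfaceFactory.create()
engine = create_engine(db_interface.sync_connection())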

PostgresDBInterface

Bases: SqlAlchemyDBInterface

PostgreSQL database interface implementation.

async_connection
async_connection()

Returns the async PostgreSQL connection string.

Returns:

Type Description
str

A string representing the async PostgreSQL connection URL.

Source code in src/podflix/db/db_factory.py
def async_connection(self) -> str:
    """Returns the async PostgreSQL connection string.

    Returns:
        A string representing the async PostgreSQL connection URL.
    """
    return f"postgresql+asyncpg://{self.get_connection_path()}"
get_connection_path
get_connection_path()

Returns the PostgreSQL connection path.

Returns:

Type Description
str

A string containing the PostgreSQL connection details including user, password, host, port and database.

Source code in src/podflix/db/db_factory.py
def get_connection_path(self) -> str:
    """Returns the PostgreSQL connection path.

    Returns:
        A string containing the PostgreSQL connection details including user, password, host, port and database.
    """
    connection_str = os.getenv("DATABASE_URL", None)

    if connection_str is None:
        raise ValueError("DATABASE_URL environment variable not set")

    # Remove protocol prefix if present
    if connection_str.startswith(("postgresql://", "postgres://")):
        connection_str = connection_str.split("://", 1)[1]

    return connection_str
sync_connection
sync_connection()

Returns the sync PostgreSQL connection string.

Returns:

Type Description
str

A string representing the sync PostgreSQL connection URL.

Source code in src/podflix/db/db_factory.py
def sync_connection(self) -> str:
    """Returns the sync PostgreSQL connection string.

    Returns:
        A string representing the sync PostgreSQL connection URL.
    """
    return f"postgresql://{self.get_connection_path()}"

SQLiteDBInterface

SQLiteDBInterface(db_path='db.sqlite')

Bases: SqlAlchemyDBInterface

SQLite database interface implementation.

Source code in src/podflix/db/db_factory.py
def __init__(self, db_path: str | Path = "db.sqlite"):
    self.db_path = Path(db_path)
    self.db_path.parent.mkdir(parents=True, exist_ok=True)
async_connection
async_connection()

Returns the async SQLite connection string.

Returns:

Type Description
str

A string representing the async SQLite connection URL.

Source code in src/podflix/db/db_factory.py
def async_connection(self) -> str:
    """Returns the async SQLite connection string.

    Returns:
        A string representing the async SQLite connection URL.
    """
    return f"sqlite+aiosqlite:///{self.get_connection_path()}"
get_connection_path
get_connection_path()

Returns the SQLite database file path.

Returns:

Type Description
str

A string representing the path to the SQLite database file.

Source code in src/podflix/db/db_factory.py
def get_connection_path(self) -> str:
    """Returns the SQLite database file path.

    Returns:
        A string representing the path to the SQLite database file.
    """
    return str(self.db_path)
sync_connection
sync_connection()

Returns the sync SQLite connection string.

Returns:

Type Description
str

A string representing the sync SQLite connection URL.

Source code in src/podflix/db/db_factory.py
def sync_connection(self) -> str:
    """Returns the sync SQLite connection string.

    Returns:
        A string representing the sync SQLite connection URL.
    """
    return f"sqlite:///{self.get_connection_path()}"

SqlAlchemyDBInterface

Bases: ABC

Abstract base class for database interfaces.

async_connection abstractmethod
async_connection()

Returns the async database connection string.

Returns:

Type Description
str

A string representing the async database connection URL.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def async_connection(self) -> str:
    """Returns the async database connection string.

    Returns:
        A string representing the async database connection URL.
    """
check_db_connection
check_db_connection()

Checks if database connection is valid.

Raises:

Type Description
Exception

If database connection fails, with details about the error.

Source code in src/podflix/db/db_factory.py
def check_db_connection(self) -> None:
    """Checks if database connection is valid.

    Raises:
        Exception: If database connection fails, with details about the error.
    """
    from sqlalchemy import create_engine

    try:
        engine = create_engine(self.sync_connection())
        with engine.connect() as conn:
            conn.execute("SELECT 1")
    except Exception as e:
        db_type = self.__class__.__name__.replace("DBInterface", "")
        raise Exception(f"{db_type} connection error:\n{e}") from e
get_connection_path abstractmethod
get_connection_path()

Returns the database connection path.

Returns:

Type Description
str

A string representing the database connection path.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def get_connection_path(self) -> str:
    """Returns the database connection path.

    Returns:
        A string representing the database connection path.
    """
sync_connection abstractmethod
sync_connection()

Returns the sync database connection string.

Returns:

Type Description
str

A string representing the sync database connection URL.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def sync_connection(self) -> str:
    """Returns the sync database connection string.

    Returns:
        A string representing the sync database connection URL.
    """

db_manager

Base database management functionality.

This module provides database management capabilities including initialization, connection handling, and SQL file execution with retry logic.

Examples:

>>> from podflix.db.db_manager import DatabaseManager
>>> db_manager = DatabaseManager()
>>> db_manager.execute_sql_file(Path("init.sql"), True, "Initialize")

The module contains the following class:

  • DatabaseManager - Handles database operations with retry logic.

DatabaseManager

DatabaseManager(max_retries=5, retry_delay=2)

Manages database operations with retry logic and error handling.

This class provides functionality to manage database operations including reading SQL files, checking table existence, and executing SQL statements with retry logic.

Parameters:

Name Type Description Default
max_retries int

Maximum number of connection retry attempts. Defaults to 5.

5
retry_delay int

Initial delay between retries in seconds. Defaults to 2.

2

Examples:

>>> db_manager = DatabaseManager(max_retries=3, retry_delay=1)
>>> db_manager.execute_sql_file(Path("init.sql"), True, "Initialize")
Source code in src/podflix/db/db_manager.py
def __init__(self, max_retries: int = 5, retry_delay: int = 2):
    self.max_retries = max_retries
    self.retry_delay = retry_delay
    self.engine = sa.create_engine(DBInterfaceFactory.create().sync_connection())
execute_sql_file
execute_sql_file(sql_file, check_exists, operation_name)

Execute SQL statements from a file with retry logic.

Parameters:

Name Type Description Default
sql_file Path

Path to the SQL file to execute.

required
check_exists bool

If True, checks for existing tables before execution.

required
operation_name str

Name of the operation for logging purposes.

required

Raises:

Type Description
OperationalError

If database connection fails after max retries.

Exception

If SQL statement execution fails.

Examples:

>>> db_manager = DatabaseManager()
>>> db_manager.execute_sql_file(
...     Path("init.sql"),
...     check_exists=True,
...     operation_name="Initialize"
... )
Source code in src/podflix/db/db_manager.py
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=2, max=30),
    retry=retry_if_exception_type(OperationalError),
    before_sleep=lambda retry_state: logger.warning(
        f"Database connection attempt {retry_state.attempt_number} failed, retrying..."
    ),
)
def execute_sql_file(self, sql_file: Path, check_exists: bool, operation_name: str):
    """Execute SQL statements from a file with retry logic.

    Args:
        sql_file: Path to the SQL file to execute.
        check_exists: If True, checks for existing tables before execution.
        operation_name: Name of the operation for logging purposes.

    Raises:
        OperationalError: If database connection fails after max retries.
        Exception: If SQL statement execution fails.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> db_manager.execute_sql_file(
        ...     Path("init.sql"),
        ...     check_exists=True,
        ...     operation_name="Initialize"
        ... )
    """
    with self.engine.connect() as conn:
        exists = self.table_exists(conn)
        if check_exists and exists:
            logger.info("Database already initialized, skipping...")
            return

        if not check_exists and not exists:
            logger.info("No tables found in database, nothing to drop...")
            return

        logger.info(f"{operation_name} database...")
        raw_sql_statements = self.read_sql_file(sql_file)

        for stmt in raw_sql_statements:
            try:
                conn.execute(sa.text(stmt))
            except Exception as e:
                logger.error(f"Error executing statement: {e}")
                raise

        conn.commit()
        logger.info(f"Database {operation_name.lower()} completed successfully")
read_sql_file
read_sql_file(file_path)

Read and parse SQL statements from a file.

Parameters:

Name Type Description Default
file_path str | Path

Path to the SQL file to be read.

required

Returns:

Type Description
list[str]

A list of SQL statements parsed from the file.

Examples:

>>> db_manager = DatabaseManager()
>>> statements = db_manager.read_sql_file("init.sql")
>>> len(statements) > 0
True
Source code in src/podflix/db/db_manager.py
def read_sql_file(self, file_path: str | Path) -> list[str]:
    """Read and parse SQL statements from a file.

    Args:
        file_path: Path to the SQL file to be read.

    Returns:
        A list of SQL statements parsed from the file.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> statements = db_manager.read_sql_file("init.sql")
        >>> len(statements) > 0
        True
    """
    with Path(file_path).open("r") as f:
        return [x.strip() for x in f.read().split(";") if x.strip()]
table_exists
table_exists(conn)

Check if users table exists in the database.

Parameters:

Name Type Description Default
conn

SQLAlchemy connection object to use for the query.

required

Returns:

Type Description
bool

True if the users table exists, False otherwise.

Raises:

Type Description
OperationalError

If there's a problem connecting to the database.

ProgrammingError

If there's a problem executing the SQL query.

Examples:

>>> db_manager = DatabaseManager()
>>> with db_manager.engine.connect() as conn:
...     exists = db_manager.table_exists(conn)
>>> isinstance(exists, bool)
True
Source code in src/podflix/db/db_manager.py
def table_exists(self, conn) -> bool:
    """Check if users table exists in the database.

    Args:
        conn: SQLAlchemy connection object to use for the query.

    Returns:
        True if the users table exists, False otherwise.

    Raises:
        OperationalError: If there's a problem connecting to the database.
        ProgrammingError: If there's a problem executing the SQL query.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> with db_manager.engine.connect() as conn:
        ...     exists = db_manager.table_exists(conn)
        >>> isinstance(exists, bool)
        True
    """
    try:
        if env_settings.enable_sqlite_data_layer is True:
            query = """
                SELECT COUNT(*)
                FROM sqlite_master
                WHERE type='table' AND name='users'
            """
        else:  # postgresql
            query = """
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'User'
                );
            """
        result = conn.execute(sa.text(query)).scalar()
        return bool(result)
    except (OperationalError, ProgrammingError) as e:
        logger.error(f"Error checking for tables: {e}")
        return False

init_db

Initialize the database schema using SQL statements from init_db.sql file.

initialize_db

initialize_db(max_retries=5, retry_delay=2)

Initialize the database using SQL statements from init_db.sql file.

Source code in src/podflix/db/init_db.py
def initialize_db(max_retries: int = 5, retry_delay: int = 2):
    """Initialize the database using SQL statements from init_db.sql file."""
    db_manager = DatabaseManager(max_retries, retry_delay)
    db_manager.execute_sql_file(
        Path(__file__).parent / "init_db.sql",
        check_exists=True,
        operation_name="Initializing",
    )
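
A minimal invocation sketch, assuming the database connection settings are already configured via environment variables:

from podflix.db.init_db import initialize_db

initialize_db(max_retries=3, retry_delay=1)  # applies init_db.sql only if the tables do not exist yet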

env_settings

Application configuration for environment variables.

EnvSettings

Bases: BaseSettings

This class loads environment variables, either from the environment or from a .env file, and stores them as class attributes.

Note
  • environment variables will always take priority over values loaded from a dotenv file
  • environment variable names are case-insensitive
  • environment variable type is inferred from the type hint of the class attribute
  • For environment variables that are not set, a default value should be provided

For more info, see the related pydantic docs: https://docs.pydantic.dev/latest/concepts/pydantic_settings
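
A minimal sketch of how such a settings class behaves, assuming pydantic-settings; the fields and defaults below are illustrative (drawn from names used elsewhere in this reference), not the full class definition:

from pydantic_settings import BaseSettings, SettingsConfigDict

class EnvSettingsSketch(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    enable_sqlite_data_layer: bool = True   # type is inferred from the annotation
    enable_openai_api: bool = False
    openai_api_key: str | None = None       # default used when the variable is unset

env_settings = EnvSettingsSketch()  # environment variables take priority over .env values; names are case-insensitive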

validate_model_api_base

validate_model_api_base(value, values)

Validate the model API base URL.

Source code in src/podflix/env_settings.py
@field_validator("model_api_base")
def validate_model_api_base(cls, value, values):
    """Validate the model API base URL."""
    if values.data.get("enable_openai_api") is True:
        logger.debug("When OpenAI API is enabled, `model_api_base` environment is ignored and set to OpenAI API.")

        return "https://api.openai.com"

    return value

validate_openai_key

validate_openai_key(value, values)

Validate the OpenAI API key.

Source code in src/podflix/env_settings.py
@field_validator("openai_api_key")
def validate_openai_key(cls, value, values):
    """Validate the OpenAI API key."""
    if values.data.get("enable_openai_api") is True and value is None:
        message = "OpenAI API key should be set, when enable_openai_api is True."
        logger.error(message)
        raise ValueError(message)

    return value

allowed_values

allowed_values(v, values)

Validate if a value is in a set of allowed values.

Examples:

>>> allowed_values("a", ["a", "b"])
'a'
>>> allowed_values(1, [1, 2, 3])
1

Parameters:

Name Type Description Default
v

The value to validate

required
values

A collection of allowed values

required

Returns:

Type Description

The validated value if it exists in the allowed values

Raises:

Type Description
AssertionError

If the value is not in the allowed values

Source code in src/podflix/env_settings.py
def allowed_values(v, values):
    """Validate if a value is in a set of allowed values.

    Examples:
        >>> allowed_values("a", ["a", "b"])
        'a'
        >>> allowed_values(1, [1, 2, 3])
        1

    Args:
        v: The value to validate
        values: A collection of allowed values

    Returns:
        The validated value if it exists in the allowed values

    Raises:
        AssertionError: If the value is not in the allowed values
    """
    assert v in values
    return v

graph

Module for LLM interaction using LangChain and LangGraph nodes.

mock

Define the mock graph for the Podflix agent.

AgentState

Bases: TypedDict

A dictionary representing the state of the agent.

mock_answer async

mock_answer(state)

Generate a mock response using a random choice from predefined messages.

The function simulates an AI model's response by randomly selecting from a predefined list of messages and returning it as an AIMessage.

Examples:

>>> state = {"messages": [HumanMessage(content="Hello")]}
>>> response = await mock_answer(state)
>>> isinstance(response["messages"][0], AIMessage)
True

Parameters:

Name Type Description Default
state AgentState

A dictionary containing the current conversation state, including: - messages: A sequence of BaseMessage objects representing the conversation history.

required

Returns:

Type Description

A dictionary containing: - messages: A list with a single AIMessage containing the random response.

Source code in src/podflix/graph/mock.py
async def mock_answer(state: AgentState):
    """Generate a mock response using a random choice from predefined messages.

    The function simulates an AI model's response by randomly selecting from a
    predefined list of messages and returning it as an AIMessage.

    Examples:
        >>> state = {"messages": [HumanMessage(content="Hello")]}
        >>> response = await mock_answer(state)
        >>> isinstance(response["messages"][0], AIMessage)
        True

    Args:
        state: A dictionary containing the current conversation state, including:
            - messages: A sequence of BaseMessage objects representing the conversation history.

    Returns:
        A dictionary containing:
            - messages: A list with a single AIMessage containing the random response.
    """
    random_response = random.choice(MOCK_RESPONSES)

    model = get_mock_model(message=random_response)
    _ = await model.ainvoke("mock")

    return {"messages": [AIMessage(random_response)]}

podcast_rag

Define the RAG-based graph for the Podflix agent.

AgentState

Bases: TypedDict

A dictionary representing the state of the agent.

generate async

generate(state)

Generate a response using the retrieved context.

Source code in src/podflix/graph/podcast_rag.py
async def generate(state: AgentState) -> AgentState:
    """Generate a response using the retrieved context."""
    question = state["messages"][-1].content
    context = state["context"]

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "Use the following context to answer the question: {context}"),
            ("human", "{question}"),
        ]
    )

    model = get_chat_model()
    chain = prompt | model | StrOutputParser()

    response = await chain.ainvoke({"context": context, "question": question})

    return {
        "messages": [AIMessage(content=response)],
    }

retrieve async

retrieve(state)

Retrieve relevant context based on the user's question.

Source code in src/podflix/graph/podcast_rag.py
async def retrieve(state: AgentState) -> AgentState:
    """Retrieve relevant context based on the user's question."""
    # TODO: Implement actual retrieval logic
    # This is a placeholder that should be replaced with your vector store retrieval
    # question = state["messages"][-1].content
    # context = f"Retrieved context for: {question}"
    # return {"messages": state["messages"]}
    return {}
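
The graph construction itself is not shown in this reference; below is a hedged sketch of how the retrieve and generate nodes could be wired with LangGraph (node names are illustrative, and retrieve is still a placeholder):

from langgraph.graph import END, START, StateGraph

builder = StateGraph(AgentState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
graph = builder.compile()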

gui

Chainlit UI module.

base_chat

settings_update async

settings_update(settings)

Update settings when changed in UI

Source code in src/podflix/gui/base_chat.py
@cl.on_settings_update
async def settings_update(settings: cl.ChatSettings) -> None:
    """Update settings when changed in UI"""
    cl.user_session.set("settings", OpenAIChatGenerationSettings(**settings))

placeholder

Placeholder file to provide several sample math calculations.

This module allows the user to make mathematical calculations. Adapted from: https://realpython.com/python-project-documentation-with-mkdocs/

Examples:

>>> from podflix import placeholder
>>> placeholder.add(2, 4)
6.0
>>> placeholder.multiply(2.0, 4.0)
8.0
>>> from podflix.placeholder import divide
>>> divide(4.0, 2)
2.0

The module contains the following functions:

  • add(a, b) - Returns the sum of two numbers.
  • subtract(a, b) - Returns the difference of two numbers.
  • multiply(a, b) - Returns the product of two numbers.
  • divide(a, b) - Returns the quotient of two numbers.

add

add(a, b)

Compute and return the sum of two numbers.

Examples:

>>> add(4.0, 2.0)
6.0
>>> add(4, 2)
6.0

Parameters:

Name Type Description Default
a float | int

A number representing the first addend in the addition.

required
b float | int

A number representing the second addend in the addition.

required

Returns:

Type Description
float

A number representing the arithmetic sum result of a and b.

Source code in src/podflix/placeholder.py
def add(a: float | int, b: float | int) -> float:
    """Compute and return the sum of two numbers.

    Examples:
        >>> add(4.0, 2.0)
        6.0
        >>> add(4, 2)
        6.0

    Args:
        a: A number representing the first addend in the addition.
        b: A number representing the second addend in the addition.

    Returns:
        A number representing the arithmetic sum result of `a` and `b`.
    """
    return float(a + b)

divide

divide(a, b)

Compute and return the division of two numbers.

Examples:

>>> divide(4.0, 2.0)
2.0
>>> divide(4, 2)
2.0

Parameters:

Name Type Description Default
a float | int

A number representing the dividend (the number being divided).

required
b float | int

A number representing the divisor (the number to divide by).

required

Returns:

Type Description
float

A number representing the division result of a and b.

Raises:

Type Description
ZeroDivisionError

If b is zero.

Source code in src/podflix/placeholder.py
def divide(a: float | int, b: float | int) -> float:
    """Compute and return the division of two numbers.

    Examples:
        >>> divide(4.0, 2.0)
        2.0
        >>> divide(4, 2)
        2.0

    Args:
        a: A number representing the first divider in the divide.
        b: A number representing the second divider in the divide.

    Returns:
        A number representing the division result of `a` and `b`.

    Raises:
        ZeroDivisionError: If `b` is zero.
    """
    if b == 0:
        raise ZeroDivisionError("division by zero")

    return float(a / b)

multiply

multiply(a, b)

Compute and return the multiplication of two numbers.

Examples:

>>> multiply(4.0, 2.0)
8.0
>>> multiply(4, 2)
8.0

Parameters:

Name Type Description Default
a float | int

A number representing the first factor in the multiplication.

required
b float | int

A number representing the second factor in the multiplication.

required

Returns:

Type Description
float

A number representing the product of a and b.

Source code in src/podflix/placeholder.py
def multiply(a: float | int, b: float | int) -> float:
    """Compute and return the multiplication of two numbers.

    Examples:
        >>> multiply(4.0, 2.0)
        8.0
        >>> multiply(4, 2)
        8.0

    Args:
        a: A number representing the first multiplicator in the multiply.
        b: A number representing the second multiplicator in the multiply.

    Returns:
        A number representing the multiplied result of `a` and `b`.
    """
    return float(a * b)

subtract

subtract(a, b)

Compute and return the subtraction of two numbers.

Examples:

>>> subtract(4.0, 2.0)
2.0
>>> subtract(4, 2)
2.0

Parameters:

Name Type Description Default
a float | int

A number representing the minuend (the number subtracted from).

required
b float | int

A number representing the subtrahend (the number to subtract).

required

Returns:

Type Description
float

A number representing the difference of a and b.

Source code in src/podflix/placeholder.py
def subtract(a: float | int, b: float | int) -> float:
    """Compute and return the substaction of two numbers.

    Examples:
        >>> subtract(4.0, 2.0)
        2.0
        >>> subtract(4, 2)
        2.0

    Args:
        a: A number representing the first substracter in the substract.
        b:  A number representing the second substracter in the substract.

    Returns:
        A number representing the substract result of `a` and `b`.
    """
    return float(a - b)

utils

Utility functions.

chainlit_utils

Utility functions.

data_layer

Utility functions for working with ChainLit data layer.

This module provides utility functions for configuring and working with ChainLit's data layer, including S3 storage integration and SQLAlchemy database connections.

apply_sqlite_data_layer_fixes
apply_sqlite_data_layer_fixes()

Apply necessary fixes for SQLite data layer configuration.

This function applies patches and configurations specific to SQLite data layer when it is enabled in the environment settings.

Examples:

>>> apply_sqlite_data_layer_fixes()  # Applies fixes if SQLite is enabled

Returns:

Type Description

None

Source code in src/podflix/utils/chainlit_utils/data_layer.py
def apply_sqlite_data_layer_fixes():
    """Apply necessary fixes for SQLite data layer configuration.

    This function applies patches and configurations specific to SQLite data layer
    when it is enabled in the environment settings.

    Examples:
        >>> apply_sqlite_data_layer_fixes()  # Applies fixes if SQLite is enabled

    Returns:
        None
    """
    if env_settings.enable_sqlite_data_layer is False:
        return

    from podflix.utils.chainlit_utils.patch_chainlit import custom_resume_thread

    chainlit.socket.resume_thread = custom_resume_thread

    @cl.data_layer
    def data_layer():
        return get_custom_sqlalchemy_data_layer(show_logger=True)
check_s3_credentials
check_s3_credentials(boto_client)

Check if the AWS S3 credentials are valid by attempting to list buckets.

Examples:

>>> s3_client = get_s3_storage_client()
>>> check_s3_credentials(s3_client.client)  # No error if credentials are valid

Parameters:

Name Type Description Default
boto_client client

A boto3 client instance configured for AWS S3.

required

Returns:

Type Description
None

None

Raises:

Type Description
Exception

If connection to AWS S3 fails using provided credentials.

Source code in src/podflix/utils/chainlit_utils/data_layer.py
def check_s3_credentials(boto_client: boto3.client) -> None:
    """Check if the AWS S3 credentials are valid by attempting to list buckets.

    Examples:
        >>> s3_client = get_s3_storage_client()
        >>> check_s3_credentials(s3_client.client)  # No error if credentials are valid

    Args:
        boto_client: A boto3 client instance configured for AWS S3.

    Returns:
        None

    Raises:
        Exception: If connection to AWS S3 fails using provided credentials.
    """
    try:
        boto_client.list_buckets()
        logger.debug("AWS S3 Auth Check Passed")
    except Exception as e:
        logger.error(f"AWS S3 Auth Check Error: {e}")
        raise e
get_custom_sqlalchemy_data_layer
get_custom_sqlalchemy_data_layer(show_logger=False, enable_s3_storage_provider=False)

Create and configure a custom SQLAlchemy data layer instance.

This function sets up a SQLAlchemy data layer with optional S3 storage integration and logging capabilities.

Examples:

>>> data_layer = get_custom_sqlalchemy_data_layer()
>>> isinstance(data_layer, SQLAlchemyDataLayer)
True
>>> data_layer_with_s3 = get_custom_sqlalchemy_data_layer(enable_s3_storage_provider=True)
>>> data_layer_with_logging = get_custom_sqlalchemy_data_layer(show_logger=True)

Parameters:

Name Type Description Default
show_logger bool

Whether to enable SQL query logging. Defaults to False.

False
enable_s3_storage_provider bool

Whether to enable S3 storage integration. Defaults to False.

False

Returns:

Name Type Description
SQLAlchemyDataLayer SQLAlchemyDataLayer

A configured data layer instance.

Raises:

Type Description
ValueError

If S3 storage is enabled but credentials are invalid.

Source code in src/podflix/utils/chainlit_utils/data_layer.py
def get_custom_sqlalchemy_data_layer(
    show_logger: bool = False, enable_s3_storage_provider: bool = False
) -> SQLAlchemyDataLayer:
    """Create and configure a custom SQLAlchemy data layer instance.

    This function sets up a SQLAlchemy data layer with optional S3 storage integration
    and logging capabilities.

    Examples:
        >>> data_layer = get_custom_sqlalchemy_data_layer()
        >>> isinstance(data_layer, SQLAlchemyDataLayer)
        True
        >>> data_layer_with_s3 = get_custom_sqlalchemy_data_layer(enable_s3_storage_provider=True)
        >>> data_layer_with_logging = get_custom_sqlalchemy_data_layer(show_logger=True)

    Args:
        show_logger: Whether to enable SQL query logging. Defaults to False.
        enable_s3_storage_provider: Whether to enable S3 storage integration. Defaults to False.

    Returns:
        SQLAlchemyDataLayer: A configured data layer instance.

    Raises:
        ValueError: If S3 storage is enabled but credentials are invalid.
    """
    if enable_s3_storage_provider is True:
        storage_client = get_s3_storage_client()

        check_s3_credentials(boto_client=storage_client.client)
    else:
        storage_client = None

    return SQLAlchemyDataLayer(
        DBInterfaceFactory.create().async_connection(),
        ssl_require=False,
        show_logger=show_logger,
        storage_provider=storage_client,
    )
get_element_url async
get_element_url(data_layer, thread_id, element_id)

Retrieve the URL for accessing an element's file content.

Examples:

>>> data_layer = get_custom_sqlalchemy_data_layer()
>>> url = await get_element_url(data_layer, "thread123", "element456")
>>> print(url)  # None if not found, URL string if exists

Parameters:

Name Type Description Default
data_layer SQLAlchemyDataLayer

The SQLAlchemy data layer instance to use for retrieval.

required
thread_id str

The unique identifier of the thread containing the element.

required
element_id str

The unique identifier of the element to retrieve.

required

Returns:

Type Description
str | None

str | None: The URL string if the element exists, None otherwise.

Source code in src/podflix/utils/chainlit_utils/data_layer.py
async def get_element_url(
    data_layer: SQLAlchemyDataLayer, thread_id: str, element_id: str
) -> str | None:
    """Retrieve the URL for accessing an element's file content.

    Examples:
        >>> data_layer = get_custom_sqlalchemy_data_layer()
        >>> url = await get_element_url(data_layer, "thread123", "element456")
        >>> print(url)  # None if not found, URL string if exists

    Args:
        data_layer: The SQLAlchemy data layer instance to use for retrieval.
        thread_id: The unique identifier of the thread containing the element.
        element_id: The unique identifier of the element to retrieve.

    Returns:
        str | None: The URL string if the element exists, None otherwise.
    """
    logger.debug(
        f"SQLAlchemy: get_element_url, thread_id={thread_id}, element_id={element_id}"
    )

    element_dict: ElementDict = data_layer.get_element(
        thread_id=thread_id, element_id=element_id
    )

    if element_dict is None:
        return None

    return element_dict.url
get_read_url_of_file async
get_read_url_of_file(thread_id, file_id)

Retrieve the URL for accessing a file in a thread.

Examples:

>>> url = await get_read_url_of_file("thread123", "audio.mp3")
>>> print(url)  # URL string

Parameters:

Name Type Description Default
thread_id str

The unique identifier of the thread containing the file.

required
file_id str

The file id of the file to retrieve.

required

Returns:

Name Type Description
str str

The S3 URL string of the file.

Raises:

Type Description
ValueError

If S3 storage client is not configured in the data layer.

Source code in src/podflix/utils/chainlit_utils/data_layer.py
async def get_read_url_of_file(thread_id: str, file_id: str) -> str:
    """Retrieve the URL for accessing an file in a thread.

    Examples:
        >>> data_layer = ChainlitDataLayer()
        >>> url = await get_read_url_of_file(data_layer, "thread123", "audio.mp3")
        >>> print(url)  # URL string

    Args:
        thread_id: The unique identifier of the thread containing the file.
        file_id: The file id of the file to retrieve.

    Returns:
        str: The S3 URL string of the file.

    Raises:
        ValueError: If S3 storage client is not configured in the data layer.
    """
    cl_data_layer = get_data_layer()

    if cl_data_layer.storage_client is None:
        raise ValueError("S3 storage client not set in data layer.")

    object_key = f"threads/{thread_id}/files/{file_id}"

    return await cl_data_layer.storage_client.get_read_url(object_key=object_key)
get_s3_storage_client
get_s3_storage_client()

Get the S3 storage client configured with environment credentials.

Examples:

>>> client = get_s3_storage_client()
>>> isinstance(client, S3StorageClient)
True

Returns:

Name Type Description
S3StorageClient S3StorageClient

An initialized S3 storage client instance.

Raises:

Type Description
ValueError

If required AWS S3 credentials are not set in environment variables.

Source code in src/podflix/utils/chainlit_utils/data_layer.py
def get_s3_storage_client() -> S3StorageClient:
    """Get the S3 storage client configured with environment credentials.

    Examples:
        >>> client = get_s3_storage_client()
        >>> isinstance(client, S3StorageClient)
        True

    Returns:
        S3StorageClient: An initialized S3 storage client instance.

    Raises:
        ValueError: If required AWS S3 credentials are not set in environment variables.
    """
    BUCKET_NAME = os.getenv("BUCKET_NAME", None)
    APP_AWS_ACCESS_KEY = os.getenv("APP_AWS_ACCESS_KEY", None)
    APP_AWS_SECRET_KEY = os.getenv("APP_AWS_SECRET_KEY", None)
    APP_AWS_REGION = os.getenv("APP_AWS_REGION", None)
    DEV_AWS_ENDPOINT = os.getenv("DEV_AWS_ENDPOINT", None)

    if not all(
        [
            BUCKET_NAME,
            APP_AWS_ACCESS_KEY,
            APP_AWS_SECRET_KEY,
            APP_AWS_REGION,
            DEV_AWS_ENDPOINT,
        ]
    ):
        raise ValueError("AWS S3 credentials not set in environment variables.")

    return S3StorageClient(
        bucket=BUCKET_NAME,
        region_name=APP_AWS_REGION,
        aws_access_key_id=APP_AWS_ACCESS_KEY,
        aws_secret_access_key=APP_AWS_SECRET_KEY,
        endpoint_url=DEV_AWS_ENDPOINT,
    )

general

Utilities for the Chainlit UI.

create_message_history_from_db_thread
create_message_history_from_db_thread(thread)

Create message history from the thread steps.

Examples:

>>> thread = {"steps": [{"type": "user_message", "output": "hello", "createdAt": 1}]}
>>> history = create_message_history_from_db_thread(thread)
>>> len(history.messages)
1

Parameters:

Name Type Description Default
thread ThreadDict

A ThreadDict object containing the conversation thread data.

required

Returns:

Type Description
ChatMessageHistory

A ChatMessageHistory object containing the processed message history.

Source code in src/podflix/utils/chainlit_utils/general.py
def create_message_history_from_db_thread(
    thread: ThreadDict,
) -> ChatMessageHistory:
    """Create message history from the thread steps.

    Examples:
        >>> thread = {"steps": [{"type": "user_message", "output": "hello", "createdAt": 1}]}
        >>> history = create_message_history_from_db_thread(thread)
        >>> len(history.messages)
        1

    Args:
        thread: A ThreadDict object containing the conversation thread data.

    Returns:
        A ChatMessageHistory object containing the processed message history.
    """
    message_history = ChatMessageHistory()

    # TODO: This is a workaround to sort the messages based on createdAt.
    steps_messages = sorted(
        [
            message
            for message in thread["steps"]
            if message["type"] in ["user_message", "assistant_message"]
        ],
        key=lambda x: x["createdAt"],
    )

    for steps_message in steps_messages:
        if steps_message["type"] == "user_message":
            message_history.add_user_message(steps_message["output"])
        elif steps_message["type"] == "assistant_message":
            message_history.add_ai_message(steps_message["output"])
        else:
            logger.warning(f"Unknown message type: {steps_message['type']}")

    return message_history
get_current_chainlit_thread_id
get_current_chainlit_thread_id()

Get the current Chainlit thread ID.

Source code in src/podflix/utils/chainlit_utils/general.py
def get_current_chainlit_thread_id() -> str:
    """Get the current Chainlit thread ID."""
    return cl.context.session.thread_id
set_extra_user_session_params
set_extra_user_session_params(session_id=None, user_id=None, message_history=None)

Set extra user session parameters for the chainlit session.

Examples:

>>> set_extra_user_session_params(session_id="test123")
>>> cl.user_session.get("session_id")
'test123'

Parameters:

Name Type Description Default
session_id str | None

Optional string representing the session ID. If None, a new UUID is generated.

None
user_id str | None

Optional string representing the user ID. If None, gets from current user session.

None
message_history ChatMessageHistory | None

Optional ChatMessageHistory object. If None, creates new empty history.

None

Returns:

Type Description

None

Source code in src/podflix/utils/chainlit_utils/general.py
def set_extra_user_session_params(
    session_id: str | None = None,
    user_id: str | None = None,
    message_history: ChatMessageHistory | None = None,
):
    """Set extra user session parameters for the chainlit session.

    Examples:
        >>> set_extra_user_session_params(session_id="test123")
        >>> cl.user_session.get("session_id")
        'test123'

    Args:
        session_id: Optional string representing the session ID. If None, a new UUID is generated.
        user_id: Optional string representing the user ID. If None, gets from current user session.
        message_history: Optional ChatMessageHistory object. If None, creates new empty history.

    Returns:
        None
    """
    if session_id is None:
        session_id = str(uuid4())

    if user_id is None:
        chainlit_user: Chainlit_User_Type = cl.user_session.get("user")
        user_id = chainlit_user.identifier

    if message_history is None:
        message_history = ChatMessageHistory()

    check_lf_credentials()
    lf_cb_handler = LangfuseCallbackHandler(
        user_id=user_id,
        session_id=session_id,
    )

    cl.user_session.set("session_id", session_id)
    cl.user_session.set("lf_cb_handler", lf_cb_handler)
    cl.user_session.set("message_history", message_history)

    langfuse_session_url = get_lf_session_url(session_id=session_id)

    logger.debug(f"Langfuse Session URL: {langfuse_session_url}")
set_mock_elements async
set_mock_elements()

Set mock elements for the sidebar.

Source code in src/podflix/utils/chainlit_utils/general.py
async def set_mock_elements():
    """Set mock elements for the sidebar."""
    sidebar_mock_elements = [
        cl.Text(content="Here is a side text document", name="text1"),
        cl.Image(
            path=f"{env_settings.library_base_path}/configs/chainlit/public/banner.png",
            name="banner",
        ),
    ]

    await cl.ElementSidebar.set_elements(sidebar_mock_elements)
    await cl.ElementSidebar.set_title("Sidebar Mock Title")
simple_auth_callback
simple_auth_callback(username, password)

Authenticate user with simple username and password check.

Examples:

>>> simple_auth_callback("admin", "admin")
User(identifier="admin", metadata={"role": "admin", "provider": "credentials"})

Parameters:

Name Type Description Default
username str

A string representing the username for authentication.

required
password str

A string representing the password for authentication.

required

Returns:

Type Description
User

A User object if authentication is successful.

Raises:

Type Description
ValueError

If credentials are invalid.

Source code in src/podflix/utils/chainlit_utils/general.py
def simple_auth_callback(username: str, password: str) -> User:
    """Authenticate user with simple username and password check.

    Examples:
        >>> simple_auth_callback("admin", "admin")
        User(identifier="admin", metadata={"role": "admin", "provider": "credentials"})

    Args:
        username: A string representing the username for authentication.
        password: A string representing the password for authentication.

    Returns:
        A User object if authentication is successful.

    Raises:
        ValueError: If credentials are invalid.
    """
    if (username, password) == (
        env_settings.chainlit_user_name,
        env_settings.chainlit_user_password,
    ):
        return cl.User(
            identifier=username, metadata={"role": "admin", "provider": "credentials"}
        )

    raise ValueError("Invalid credentials")

patch_chainlit

custom_resume_thread async
custom_resume_thread(session)

Resume a thread and set the user session parameters.

NOTE: This is a workaround to fix the issue of the chatbot not resuming the thread on sqlite data layer.

Source code in src/podflix/utils/chainlit_utils/patch_chainlit.py
async def custom_resume_thread(session: WebsocketSession):
    """Resume a thread and set the user session parameters.

    NOTE: This is a workaround to fix the issue of the chatbot not resuming the thread
    on sqlite data layer.
    """
    data_layer = get_data_layer()
    if not data_layer or not session.user or not session.thread_id_to_resume:
        return
    thread = await data_layer.get_thread(thread_id=session.thread_id_to_resume)
    if not thread:
        return

    author = thread.get("userIdentifier")
    user_is_author = author == session.user.identifier

    if user_is_author:
        metadata = thread.get("metadata") or {}

        # NOTE: Original code
        # user_sessions[session.id] = metadata.copy()

        # NOTE: Patched code
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except json.JSONDecodeError:
                metadata = {}

        if isinstance(metadata, dict):
            user_sessions[session.id] = metadata.copy()
        else:
            user_sessions[session.id] = {}
        # End of patch

        if chat_profile := metadata.get("chat_profile"):
            session.chat_profile = chat_profile
        if chat_settings := metadata.get("chat_settings"):
            session.chat_settings = chat_settings

        trace_event("thread_resumed")

        return thread

setting_widgets

Utility functions for converting Pydantic models to Chainlit settings.

convert_pydantic_model_to_chainlit_settings
convert_pydantic_model_to_chainlit_settings(model_class, prefix='')

Convert a Pydantic model to Chainlit settings.

Parameters:

Name Type Description Default
model_class Type[BaseModel]

Pydantic model class to convert

required
prefix str

Prefix for widget labels (e.g., 'OpenAI - ')

''

Returns:

Type Description
ChatSettings

cl.ChatSettings instance

Source code in src/podflix/utils/chainlit_utils/setting_widgets.py
def convert_pydantic_model_to_chainlit_settings(
    model_class: Type[BaseModel], prefix: str = ""
) -> cl.ChatSettings:
    """Convert a Pydantic model to Chainlit settings.

    Args:
        model_class: Pydantic model class to convert
        prefix: Prefix for widget labels (e.g., 'OpenAI - ')

    Returns:
        cl.ChatSettings instance
    """
    settings = model_class()
    model_fields = model_class.model_fields
    chat_settings = []

    for field_name, field_info in model_fields.items():
        current_value = getattr(settings, field_name)
        field_type = field_info.annotation
        description = field_info.description
        json_schema_extra = field_info.json_schema_extra or {}
        metadata = field_info.metadata or []

        label = f"{prefix}{field_name.replace('_', ' ').title()}"

        # Default widget mapping
        if field_type is bool:
            chat_settings.append(
                Switch(
                    id=field_name,
                    label=label,
                    initial=current_value,
                    description=description,
                )
            )
        elif field_type in (float, int):
            min_val = None
            max_val = None

            # Extract min/max from metadata
            for constraint in metadata:
                if hasattr(constraint, "ge"):
                    min_val = constraint.ge
                elif hasattr(constraint, "le"):
                    max_val = constraint.le

            # Use defaults if not found in metadata
            min_val = min_val if min_val is not None else -100
            max_val = max_val if max_val is not None else 32000
            step = 0.1 if field_type is float else 1

            chat_settings.append(
                Slider(
                    id=field_name,
                    label=label,
                    initial=current_value,
                    min=min_val,
                    max=max_val,
                    step=step,
                    description=description,
                )
            )
        elif "choices" in json_schema_extra:
            choices = json_schema_extra["choices"]
            if not isinstance(choices[0], str):
                logger.debug(f"Skipping {field_name} due to non-string choices")
                continue

            chat_settings.append(
                Select(
                    id=field_name,
                    label=label,
                    initial_value=current_value,
                    values=choices,
                    description=description,
                )
            )

    return cl.ChatSettings(chat_settings)
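
A hedged usage sketch with a hypothetical settings model; the model below is illustrative, while the project itself passes OpenAIChatGenerationSettings (see get_openai_chat_settings):

from pydantic import BaseModel, Field

class DemoGenerationSettings(BaseModel):
    temperature: float = Field(0.7, ge=0.0, le=2.0, description="Sampling temperature")  # rendered as a Slider
    stream: bool = Field(True, description="Stream tokens as they arrive")  # rendered as a Switch

chat_settings = convert_pydantic_model_to_chainlit_settings(DemoGenerationSettings, prefix="Demo - ")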
get_openai_chat_settings
get_openai_chat_settings()

Get chat settings for OpenAI chat completion parameters.

Source code in src/podflix/utils/chainlit_utils/setting_widgets.py
def get_openai_chat_settings():
    """Get chat settings for OpenAI chat completion parameters."""
    return convert_pydantic_model_to_chainlit_settings(
        model_class=OpenAIChatGenerationSettings
    )

general

General utility functions.

check_env_vars

check_env_vars(env_vars=None)

Checks if the required environment variables are set.

Examples:

>>> check_env_vars(['API_KEY', 'SECRET_KEY'])
None
>>> check_env_vars(None)
None
>>> check_env_vars(['NONEXISTENT_VAR'])
Traceback (most recent call last):
ValueError: Please set NONEXISTENT_VAR env var.

Parameters:

Name Type Description Default
env_vars list[str] | None

List of environment variables to check. Defaults to None.

None

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If any of the environment variables are not set.

Source code in src/podflix/utils/general.py
def check_env_vars(env_vars: list[str] | None = None) -> None:
    """Checks if the required environment variables are set.

    Examples:
        >>> check_env_vars(['API_KEY', 'SECRET_KEY'])
        None
        >>> check_env_vars(None)
        None
        >>> check_env_vars(['NONEXISTENT_VAR'])
        Traceback (most recent call last):
        ValueError: Please set NONEXISTENT_VAR env var.

    Args:
        env_vars: List of environment variables to check. Defaults to None.

    Returns:
        None

    Raises:
        ValueError: If any of the environment variables are not set.
    """
    if env_vars is None:
        return

    for env_var in env_vars:
        if os.getenv(env_var) is None:
            raise ValueError(f"Please set {env_var} env var.")

check_lf_credentials

check_lf_credentials()

Check if the Langfuse credentials are correct by attempting authentication.

Examples:

>>> check_lf_credentials()
None

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If authentication fails with provided Langfuse credentials

Exception

If Langfuse authentication check fails for any other reason

Source code in src/podflix/utils/general.py
def check_lf_credentials() -> None:
    """Check if the Langfuse credentials are correct by attempting authentication.

    Examples:
        >>> check_lf_credentials()
        None

    Returns:
        None

    Raises:
        ValueError: If authentication fails with provided Langfuse credentials
        Exception: If Langfuse authentication check fails for any other reason
    """
    try:
        langfuse_obj = Langfuse()
        lf_check_result = langfuse_obj.auth_check()

        if lf_check_result is False:
            raise Exception("Langfuse Auth Check Failed")

        logger.debug("Langfuse Auth Check Passed")
    except Exception as e:
        logger.error(f"Langfuse Auth Check Error: {e}")
        raise e

get_lf_project_id

get_lf_project_id()

Retrieve the Langfuse project ID from the first available project.

Examples:

>>> get_lf_project_id()
'cm5a4jaff0006r8yk44cvas5a'

Returns:

Name Type Description
str str

The unique identifier of the first Langfuse project.

Raises:

Type Description
Exception

If no projects are found or if there's an error accessing Langfuse API

Source code in src/podflix/utils/general.py
def get_lf_project_id() -> str:
    """Retrieve the Langfuse project ID from the first available project.

    Examples:
        >>> get_lf_project_id()
        'cm5a4jaff0006r8yk44cvas5a'

    Returns:
        str: The unique identifier of the first Langfuse project.

    Raises:
        Exception: If no projects are found or if there's an error accessing Langfuse API
    """
    langfuse_obj = Langfuse()
    projects = langfuse_obj.client.projects.get()
    return projects.data[0].id

get_lf_session_url

get_lf_session_url(session_id)

Construct the full URL for a Langfuse session.

Examples:

>>> get_lf_session_url("123")
'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/sessions/123'

Parameters:

Name Type Description Default
session_id str

The unique identifier of the Langfuse session.

required

Returns:

Name Type Description
str str

The complete URL to access the session in Langfuse UI.

Raises:

Type Description
Exception

If there's an error retrieving the project ID

Source code in src/podflix/utils/general.py
def get_lf_session_url(session_id: str) -> str:
    """Construct the full URL for a Langfuse session.

    Examples:
        >>> get_lf_session_url("123")
        'https://YOUR_LANFUSE_HOST/project/YOUR_PROJECT_ID/sessions/123'

    Args:
        session_id: The unique identifier of the Langfuse session.

    Returns:
        str: The complete URL to access the session in Langfuse UI.

    Raises:
        Exception: If there's an error retrieving the project ID
    """
    langfuse_project_id = get_lf_project_id()

    return f"{env_settings.langfuse_host}/project/{langfuse_project_id}/sessions/{session_id}"

get_lf_traces_url

get_lf_traces_url(langchain_run_id)

Construct the full URL for a Langfuse trace.

Examples:

>>> get_lf_traces_url("123")
'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/traces/123'

Parameters:

Name Type Description Default
langchain_run_id str

The unique identifier of the Langchain run.

required

Returns:

Name Type Description
str str

The complete URL to access the trace in Langfuse UI.

Raises:

Type Description
ValueError

If langchain_run_id is None

Exception

If there's an error retrieving the project ID

Source code in src/podflix/utils/general.py
def get_lf_traces_url(langchain_run_id: str) -> str:
    """Construct the full URL for a Langfuse trace.

    Examples:
        >>> get_lf_traces_url("123")
        'https://YOUR_LANFUSE_HOST/project/YOUR_PROJECT_ID/traces/123'

    Args:
        langchain_run_id: The unique identifier of the Langchain run.

    Returns:
        str: The complete URL to access the trace in Langfuse UI.

    Raises:
        ValueError: If langchain_run_id is None
        Exception: If there's an error retrieving the project ID
    """
    if langchain_run_id is None:
        raise ValueError("langchain_run_id cannot be None")

    langfuse_project_id = get_lf_project_id()

    return f"{env_settings.langfuse_host}/project/{langfuse_project_id}/traces/{langchain_run_id}"

is_module_installed

is_module_installed(module_name, throw_error=False)

Check if the module is installed or not.

Examples:

>>> is_module_installed(module_name="yaml", throw_error=False)
True
>>> is_module_installed(module_name="numpy", throw_error=False)
False
>>> is_module_installed(module_name="numpy", throw_error=True)
Traceback (most recent call last):
ImportError: Module numpy is not installed.

Parameters:

Name Type Description Default
module_name str

Name of the module to be checked.

required
throw_error bool

If True, raises ImportError if module is not installed.

False

Returns:

Type Description
bool

Returns True if module is installed, False otherwise.

Raises:

Type Description
ImportError

If throw_error is True and module is not installed.

Source code in src/podflix/utils/general.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def is_module_installed(module_name: str, throw_error: bool = False) -> bool:
    """Check if the module is installed or not.

    Examples:
        >>> is_module_installed(module_name="yaml", throw_error=False)
        True
        >>> is_module_installed(module_name="numpy", throw_error=False)
        False
        >>> is_module_installed(module_name="numpy", throw_error=True)
        Traceback (most recent call last):
        ImportError: Module numpy is not installed.

    Args:
        module_name: Name of the module to be checked.
        throw_error: If True, raises ImportError if module is not installed.

    Returns:
        Returns True if module is installed, False otherwise.

    Raises:
        ImportError: If throw_error is True and module is not installed.
    """
    try:
        importlib.import_module(module_name)
        return True
    except ImportError as e:
        if throw_error:
            message = f"Module {module_name} is not installed."
            raise ImportError(message) from e
        return False
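
A small sketch of the optional-dependency pattern this helper supports; "soundfile" is only an illustrative module name, not a podflix requirement:

from podflix.utils.general import is_module_installed

# Import an optional module only when it is actually available.
if is_module_installed("soundfile"):
    import soundfile as sf
else:
    sf = None

def read_audio(path):
    if sf is None:
        raise RuntimeError("Install soundfile to read audio files.")
    return sf.read(path)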

graph_runner

Helper class for running the graph.

GraphRunner

GraphRunner(
    graph,
    graph_inputs,
    graph_streamable_node_names,
    lf_cb_handler,
    session_id,
    assistant_message,
)

Helper class for the on_message callback.

Initialize the GraphRunner class.

Parameters:

Name Type Description Default
graph CompiledStateGraph

A CompiledStateGraph instance representing the graph to be executed.

required
graph_inputs dict

A dictionary containing the inputs for the graph.

required
graph_streamable_node_names list[str]

A list of node names that can be streamed.

required
lf_cb_handler CallbackHandler

A LangfuseCallbackHandler instance for tracking.

required
session_id str

A string representing the unique session identifier.

required
assistant_message Message

A chainlit Message instance for displaying responses.

required
Source code in src/podflix/utils/graph_runner.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(  # noqa: PLR0913
    self,
    graph: CompiledStateGraph,
    graph_inputs: dict,
    graph_streamable_node_names: list[str],
    lf_cb_handler: LangfuseCallbackHandler,
    session_id: str,
    assistant_message: cl.Message,
):
    """Initialize the GraphRunner class.

    Args:
        graph: A CompiledStateGraph instance representing the graph to be executed.
        graph_inputs: A dictionary containing the inputs for the graph.
        graph_streamable_node_names: A list of node names that can be streamed.
        lf_cb_handler: A LangfuseCallbackHandler instance for tracking.
        session_id: A string representing the unique session identifier.
        assistant_message: A chainlit Message instance for displaying responses.
    """
    self.graph = graph
    self.graph_inputs = graph_inputs
    self.graph_streamable_node_names = graph_streamable_node_names
    self.lf_cb_handler = lf_cb_handler
    self.session_id = session_id
    self.assistant_message = assistant_message

    self.run_id = None
run_graph async
run_graph()

Execute the graph asynchronously with the configured inputs.

This method sets up the runnable configuration with callbacks and streams the graph events to process the LLM responses.

Examples:

>>> runner = GraphRunner(graph, inputs, nodes, handler, "session1", message)
>>> await runner.run_graph()

Returns:

Type Description

None

Source code in src/podflix/utils/graph_runner.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def run_graph(self):
    """Execute the graph asynchronously with the configured inputs.

    This method sets up the runnable configuration with callbacks and streams
    the graph events to process the LLM responses.

    Examples:
        >>> runner = GraphRunner(graph, inputs, nodes, handler, "session1", message)
        >>> await runner.run_graph()

    Returns:
        None
    """
    graph_runnable_config = RunnableConfig(
        callbacks=[
            self.lf_cb_handler,
            cl.LangchainCallbackHandler(),
        ],
        recursion_limit=10,
        configurable={"session_id": self.session_id},
    )

    async for event in self.graph.astream_events(
        self.graph_inputs,
        config=graph_runnable_config,
        version="v2",
    ):
        await self.stream_llm_response(event)
stream_llm_response async
stream_llm_response(event)

Stream the LLM response to the assistant message.

Process the event data and stream tokens to the assistant message if the event comes from a streamable node.

Examples:

>>> event = {"event": "on_chat_model_stream", "data": {"chunk": chunk}}
>>> await runner.stream_llm_response(event)

Parameters:

Name Type Description Default
event dict

A dictionary containing the event data with the following structure: event (the event type, str), metadata (a dictionary with event metadata), and data (the event payload).

required

Returns:

Type Description

None

Notes

The method updates the assistant_message.content when streaming tokens. It also captures the run_id for Langfuse tracking when the chain ends.

Source code in src/podflix/utils/graph_runner.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def stream_llm_response(self, event: dict):
    """Stream the LLM response to the assistant message.

    Process the event data and stream tokens to the assistant message if the
    event comes from a streamable node.

    Examples:
        >>> event = {"event": "on_chat_model_stream", "data": {"chunk": chunk}}
        >>> await runner.stream_llm_response(event)

    Args:
        event: A dictionary containing the event data with the following structure:
              - event: The type of event (str)
              - metadata: Dictionary with event metadata
              - data: The event payload

    Returns:
        None

    Notes:
        The method updates the assistant_message.content when streaming tokens.
        It also captures the run_id for Langfuse tracking when the chain ends.
    """
    event_kind = event["event"]
    langgraph_node = event["metadata"].get("langgraph_node", None)

    if event_kind == "on_chat_model_stream":
        if langgraph_node not in self.graph_streamable_node_names:
            return

        ai_message_chunk: AIMessageChunk = event["data"]["chunk"]
        ai_message_content = ai_message_chunk.content

        if ai_message_content:
            # NOTE: This automatically updates the assistant_message.content
            await self.assistant_message.stream_token(ai_message_content)

    # TODO: Find out more robust way to get run_id for langfuse
    if event["event"] == "on_chain_end":
        run_id = event.get("run_id")
        self.run_id = run_id
        logger.debug(f"Langfuse Run ID: {run_id}")

hf_related

Hugging Face related utilities.

download_gguf_hf

download_gguf_hf(repo_id, filename, revision='main')

Download a specific GGUF file from Hugging Face hub.

Examples:

>>> download_gguf_hf("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-fp16.gguf")
'/path/to/downloaded/file.gguf'

Parameters:

Name Type Description Default
repo_id str

A string representing the Hugging Face repository ID.

required
filename str

A string representing the name of the GGUF file to download.

required
revision str

A string representing the specific model version to use.

'main'

Returns:

Name Type Description
str str

Local path to the downloaded file.

Source code in src/podflix/utils/hf_related.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def download_gguf_hf(
    repo_id: str,
    filename: str,
    revision: str = "main",
) -> str:
    """Download a specific GGUF file from Hugging Face hub.

    Examples:
        >>> download_gguf_hf("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-fp16.gguf")
        '/path/to/downloaded/file.gguf'

    Args:
        repo_id: A string representing the Hugging Face repository ID.
        filename: A string representing the name of the GGUF file to download.
        revision: A string representing the specific model version to use.

    Returns:
        str: Local path to the downloaded file.
    """
    return hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        revision=revision,
        local_dir=f"{env_settings.library_base_path}/deployment/models/GGUF",
        token=env_settings.hf_token,
    )

download_model_snapshot_hf

download_model_snapshot_hf(
    repo_id="Qwen/Qwen2.5-0.5B-Instruct", revision="main", ignore_patterns=None
)

Download a complete model snapshot from Hugging Face hub.

Examples:

>>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct")
>>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct", ignore_patterns=["*.pt"])

Parameters:

Name Type Description Default
repo_id str

A string representing the Hugging Face repository ID.

'Qwen/Qwen2.5-0.5B-Instruct'
revision str

A string representing the specific model version to use.

'main'
ignore_patterns list[str] | None

A list of patterns to ignore during download.

None

Returns:

Type Description
None

None

Source code in src/podflix/utils/hf_related.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def download_model_snapshot_hf(
    repo_id: str = "Qwen/Qwen2.5-0.5B-Instruct",
    revision: str = "main",
    ignore_patterns: list[str] | None = None,
) -> None:
    """Download a complete model snapshot from Hugging Face hub.

    Examples:
        >>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct")
        >>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct", ignore_patterns=["*.pt"])

    Args:
        repo_id: A string representing the Hugging Face repository ID.
        revision: A string representing the specific model version to use.
        ignore_patterns: A list of patterns to ignore during download.

    Returns:
        None
    """
    local_dir_name = repo_id.split("/")[1]

    if ignore_patterns is None:
        ignore_patterns = ["*.pt"]

    snapshot_download(
        repo_id=repo_id,
        revision=revision,
        local_dir=f"{env_settings.library_base_path}/deployment/models/{local_dir_name}",
        ignore_patterns=ignore_patterns,
        token=env_settings.hf_token,
    )

model

Model utilities.

get_chat_model

get_chat_model(model_name=None, chat_model_kwargs=None)

Create and configure a ChatOpenAI model instance.

Examples:

>>> model = get_chat_model()
>>> isinstance(model, ChatOpenAI)
True
>>> model = get_chat_model("gpt-4o-mini", {"temperature": 0.7})
>>> isinstance(model, ChatOpenAI)
True

Parameters:

Name Type Description Default
model_name str | None

The name of the model to use. If None, uses the default from env_settings.

None
chat_model_kwargs dict[Any] | None

Additional keyword arguments to pass to ChatOpenAI. Defaults to None.

None

Returns:

Type Description
ChatOpenAI

A configured ChatOpenAI model instance ready for use.

Source code in src/podflix/utils/model.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def get_chat_model(
    model_name: str | None = None,
    chat_model_kwargs: dict[Any] | None = None,
) -> ChatOpenAI:
    """Create and configure a ChatOpenAI model instance.

    Examples:
        >>> model = get_chat_model()
        >>> isinstance(model, ChatOpenAI)
        True
        >>> model = get_chat_model("gpt-4o-mini", {"temperature": 0.7})
        >>> isinstance(model, ChatOpenAI)
        True

    Args:
        model_name: The name of the model to use. If None, uses the default from env_settings.
        chat_model_kwargs: Additional keyword arguments to pass to ChatOpenAI. Defaults to None.

    Returns:
        A configured ChatOpenAI model instance ready for use.
    """
    if chat_model_kwargs is None:
        chat_model_kwargs = {}

    openai_api_base = f"{env_settings.model_api_base}/v1"

    if model_name is None:
        model_name = env_settings.model_name

    if env_settings.enable_openai_api is True:
        openai_api_key = env_settings.openai_api_key
    else:
        openai_api_key = "DUMMY_KEY"

    return ChatOpenAI(
        model_name=model_name,
        openai_api_base=openai_api_base,
        openai_api_key=openai_api_key,
        **chat_model_kwargs,
    )
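
A minimal usage sketch, assuming the configured model endpoint (or the OpenAI API) is reachable:

from podflix.utils.model import get_chat_model

llm = get_chat_model("gpt-4o-mini", {"temperature": 0.2})
reply = llm.invoke("Give a one-sentence summary of the episode.")
print(reply.content)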

get_mock_model

get_mock_model(message='MOCK MESSAGE')

Create a mock language model for testing purposes.

Examples:

>>> model = get_mock_model("Test response")
>>> isinstance(model, (FakeListChatModel, StrOutputParser))
True
>>> model = get_mock_model()
>>> isinstance(model, (FakeListChatModel, StrOutputParser))
True

Parameters:

Name Type Description Default
message str

The message to be returned by the mock model. Defaults to "MOCK MESSAGE".

'MOCK MESSAGE'

Returns:

Type Description
FakeListChatModel | StrOutputParser

A chain of FakeListChatModel and StrOutputParser that returns the specified message.

Source code in src/podflix/utils/model.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def get_mock_model(
    message: str = "MOCK MESSAGE",
) -> FakeListChatModel | StrOutputParser:
    """Create a mock language model for testing purposes.

    Examples:
        >>> model = get_mock_model("Test response")
        >>> isinstance(model, (FakeListChatModel, StrOutputParser))
        True
        >>> model = get_mock_model()
        >>> isinstance(model, (FakeListChatModel, StrOutputParser))
        True

    Args:
        message: The message to be returned by the mock model. Defaults to "MOCK MESSAGE".

    Returns:
        A chain of FakeListChatModel and StrOutputParser that returns the specified message.
    """
    model = FakeListChatModel(responses=[message])

    return model | StrOutputParser()
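
Because the helper pipes the fake chat model into a StrOutputParser, invoking the returned runnable yields the configured string directly, which is handy in tests:

from podflix.utils.model import get_mock_model

mock = get_mock_model("Test response")
assert mock.invoke("any prompt") == "Test response"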

transcribe_audio_file

transcribe_audio_file(file, model_name=None, response_format='verbose_json')

Transcribe an audio file using OpenAI's Whisper model.

When using the verbose_json response format, the function returns the transcribed text with optional timestamps. Otherwise, it returns only the transcribed text.

Examples:

>>> with open('audio.mp3', 'rb') as f:
...     transcription = transcribe_audio_file(f)
>>> isinstance(transcription.text, str)
True
>>> transcription = transcribe_audio_file(Path('audio.mp3'))
>>> isinstance(transcription.text, str)
True

Parameters:

Name Type Description Default
file BinaryIO | Path

The audio file to transcribe. Can be a file object or Path.

required
model_name str | None

The name of the Whisper model to use. If None, uses the default from env_settings.

None
response_format AudioResponseFormat

The format of the response to return. Defaults to "verbose_json".

'verbose_json'

Returns:

Type Description
Transcription | TranscriptionVerbose

The transcribed text with optional timestamps from the audio file.

Source code in src/podflix/utils/model.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def transcribe_audio_file(
    file: BinaryIO | Path,
    model_name: str | None = None,
    response_format: AudioResponseFormat = "verbose_json",
) -> Transcription | TranscriptionVerbose:
    """Transcribe an audio file using OpenAI's Whisper model.

    When using the verbose_json response format, the function returns the transcribed text with optional timestamps.
    Otherwise, it returns only the transcribed text.

    Examples:
        >>> with open('audio.mp3', 'rb') as f:
        ...     transcription = transcribe_audio_file(f)
        >>> isinstance(transcription.text, str)
        True
        >>> transcription = transcribe_audio_file(Path('audio.mp3'))
        >>> isinstance(transcription.text, str)
        True

    Args:
        file: The audio file to transcribe. Can be a file object or Path.
        model_name: The name of the Whisper model to use. If None, uses the default from env_settings.
        response_format: The format of the response to return. Defaults to "verbose_json".

    Returns:
        The transcribed text with optional timestamps from the audio file.
    """
    if model_name is None:
        model_name = env_settings.whisper_model_name

    if env_settings.enable_openai_api is True:
        openai_api_key = env_settings.openai_api_key
    else:
        openai_api_key = "DUMMY_KEY"

    client = OpenAI(
        base_url=f"{env_settings.whisper_api_base}/v1", api_key=openai_api_key
    )

    if isinstance(file, Path):
        file = file.open("rb")

    try:
        return client.audio.transcriptions.create(
            model=model_name, file=file, response_format=response_format
        )
    finally:
        if isinstance(file, Path):
            file.close()
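
With the default verbose_json response format, the returned object also carries segment-level timestamps. A minimal sketch of iterating them (attribute names follow the openai-python TranscriptionVerbose model, and the call requires a reachable Whisper-compatible endpoint):

from pathlib import Path

from podflix.utils.model import transcribe_audio_file

transcription = transcribe_audio_file(Path("episode.mp3"))
print(transcription.text)

for segment in transcription.segments or []:
    print(f"[{segment.start:.1f}s - {segment.end:.1f}s] {segment.text}")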

pydantic_models

Pydantic models for Podflix.

OpenAIChatGenerationSettings

Bases: BaseModel

Pydantic model for OpenAI chat settings.

get_available_models

get_available_models()

Return available models based on environment settings.

Source code in src/podflix/utils/pydantic_models.py
 8
 9
10
11
12
def get_available_models():
    """Return available models based on environment settings."""
    if env_settings.enable_openai_api:
        return ["gpt-3.5-turbo", "gpt-4", "gpt-4o-mini"]
    return [env_settings.model_name]

get_default_model

get_default_model()

Return the default model based on environment settings.

Source code in src/podflix/utils/pydantic_models.py
15
16
17
18
19
def get_default_model():
    """Return the default model based on environment settings."""
    return (
        env_settings.model_name if not env_settings.enable_openai_api else "gpt-4o-mini"
    )
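
A hedged sketch of how these helpers could populate a Chainlit chat-settings widget; the widget wiring is an illustrative assumption, and the sketch treats get_available_models and get_default_model as module-level functions, as their unindented source suggests:

import chainlit as cl
from chainlit.input_widget import Select

from podflix.utils.pydantic_models import get_available_models, get_default_model


@cl.on_chat_start
async def setup_chat_settings():
    models = get_available_models()
    await cl.ChatSettings(
        [
            Select(
                id="model_name",
                label="Model",
                values=models,
                initial_index=models.index(get_default_model()),
            )
        ]
    ).send()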