
Reference

This part of the project documentation focuses on an information-oriented approach. Use it as a reference for the technical implementation of the podflix project code.

podflix package.

db

A module for managing database connections and operations.

db_factory

SqlAlchemy database interface factory and implementations.

DBInterfaceFactory

Singleton factory class for creating database interface instances.

create classmethod
create(db_path='db.sqlite')

Creates and returns appropriate database interface based on environment settings.

Returns the same instance on subsequent calls.

Parameters:

Name Type Description Default
db_path str | Path

Path to SQLite database file. Only used for SQLite interface.

'db.sqlite'

Returns:

Name Type Description
SqlAlchemyDBInterface SqlAlchemyDBInterface

Singleton instance of appropriate database interface.

Source code in src/podflix/db/db_factory.py
@classmethod
def create(cls, db_path: str | Path = "db.sqlite") -> SqlAlchemyDBInterface:
    """Creates and returns appropriate database interface based on environment settings.

    Returns the same instance on subsequent calls.

    Args:
        db_path: Path to SQLite database file. Only used for SQLite interface.

    Returns:
        SqlAlchemyDBInterface: Singleton instance of appropriate database interface.
    """
    if cls._db_interface is None:
        if env_settings.sqlalchemy_db_type == "sqlite":
            cls._db_interface = SQLiteDBInterface(db_path)
        elif env_settings.sqlalchemy_db_type == "postgres":
            cls._db_interface = PostgresDBInterface()
        else:
            raise ValueError(
                f"Invalid database type: {env_settings.sqlalchemy_db_type}. "
                "Must be either 'sqlite' or 'postgres'"
            )
    return cls._db_interface
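
A minimal usage sketch (assuming the environment settings select the SQLite backend): the factory returns the same interface instance on every call, and the interface exposes both sync and async connection URLs.

from podflix.db.db_factory import DBInterfaceFactory

# Assumes env_settings.sqlalchemy_db_type == "sqlite"; db_path is illustrative.
db_interface = DBInterfaceFactory.create("data/db.sqlite")

print(db_interface.sync_connection())   # sqlite:///data/db.sqlite
print(db_interface.async_connection())  # sqlite+aiosqlite:///data/db.sqlite

# Subsequent calls return the same singleton instance.
assert DBInterfaceFactory.create() is db_interface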

PostgresDBInterface

Bases: SqlAlchemyDBInterface

PostgreSQL database interface implementation.

async_connection
async_connection()

Returns the async PostgreSQL connection string.

Returns:

Type Description
str

A string representing the async PostgreSQL connection URL.

Source code in src/podflix/db/db_factory.py
def async_connection(self) -> str:
    """Returns the async PostgreSQL connection string.

    Returns:
        A string representing the async PostgreSQL connection URL.
    """
    return f"postgresql+asyncpg://{self.get_connection_path()}"
get_connection_path
get_connection_path()

Returns the PostgreSQL connection path.

Returns:

Type Description
str

A string containing the PostgreSQL connection details including user, password, host, port and database.

Source code in src/podflix/db/db_factory.py
def get_connection_path(self) -> str:
    """Returns the PostgreSQL connection path.

    Returns:
        A string containing the PostgreSQL connection details including user, password, host, port and database.
    """
    return (
        f"{env_settings.postgres_user}:"
        f"{env_settings.postgres_password}@"
        f"{env_settings.postgres_host}:"
        f"{env_settings.postgres_port}/"
        f"{env_settings.postgres_db}"
    )
sync_connection
sync_connection()

Returns the sync PostgreSQL connection string.

Returns:

Type Description
str

A string representing the sync PostgreSQL connection URL.

Source code in src/podflix/db/db_factory.py
def sync_connection(self) -> str:
    """Returns the sync PostgreSQL connection string.

    Returns:
        A string representing the sync PostgreSQL connection URL.
    """
    return f"postgresql://{self.get_connection_path()}"

SQLiteDBInterface

SQLiteDBInterface(db_path='db.sqlite')

Bases: SqlAlchemyDBInterface

SQLite database interface implementation.

Source code in src/podflix/db/db_factory.py
def __init__(self, db_path: str | Path = "db.sqlite"):
    self.db_path = Path(db_path)
    self.db_path.parent.mkdir(parents=True, exist_ok=True)
async_connection
async_connection()

Returns the async SQLite connection string.

Returns:

Type Description
str

A string representing the async SQLite connection URL.

Source code in src/podflix/db/db_factory.py
def async_connection(self) -> str:
    """Returns the async SQLite connection string.

    Returns:
        A string representing the async SQLite connection URL.
    """
    return f"sqlite+aiosqlite:///{self.get_connection_path()}"
get_connection_path
get_connection_path()

Returns the SQLite database file path.

Returns:

Type Description
str

A string representing the path to the SQLite database file.

Source code in src/podflix/db/db_factory.py
def get_connection_path(self) -> str:
    """Returns the SQLite database file path.

    Returns:
        A string representing the path to the SQLite database file.
    """
    return str(self.db_path)
sync_connection
sync_connection()

Returns the sync SQLite connection string.

Returns:

Type Description
str

A string representing the sync SQLite connection URL.

Source code in src/podflix/db/db_factory.py
def sync_connection(self) -> str:
    """Returns the sync SQLite connection string.

    Returns:
        A string representing the sync SQLite connection URL.
    """
    return f"sqlite:///{self.get_connection_path()}"

SqlAlchemyDBInterface

Bases: ABC

Abstract base class for database interfaces.

async_connection abstractmethod
async_connection()

Returns the async database connection string.

Returns:

Type Description
str

A string representing the async database connection URL.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def async_connection(self) -> str:
    """Returns the async database connection string.

    Returns:
        A string representing the async database connection URL.
    """
check_db_connection
check_db_connection()

Checks if database connection is valid.

Raises:

Type Description
Exception

If database connection fails, with details about the error.

Source code in src/podflix/db/db_factory.py
def check_db_connection(self) -> None:
    """Checks if database connection is valid.

    Raises:
        Exception: If database connection fails, with details about the error.
    """
    from sqlalchemy import create_engine

    try:
        engine = create_engine(self.sync_connection())
        with engine.connect() as conn:
            conn.execute("SELECT 1")
    except Exception as e:
        db_type = self.__class__.__name__.replace("DBInterface", "")
        raise Exception(f"{db_type} connection error:\n{e}") from e
get_connection_path abstractmethod
get_connection_path()

Returns the database connection path.

Returns:

Type Description
str

A string representing the database connection path.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def get_connection_path(self) -> str:
    """Returns the database connection path.

    Returns:
        A string representing the database connection path.
    """
sync_connection abstractmethod
sync_connection()

Returns the sync database connection string.

Returns:

Type Description
str

A string representing the sync database connection URL.

Source code in src/podflix/db/db_factory.py
@abstractmethod
def sync_connection(self) -> str:
    """Returns the sync database connection string.

    Returns:
        A string representing the sync database connection URL.
    """

db_manager

Base database management functionality.

This module provides database management capabilities including initialization, connection handling, and SQL file execution with retry logic.

Examples:

>>> from podflix.db.db_manager import DatabaseManager
>>> db_manager = DatabaseManager()
>>> db_manager.execute_sql_file(Path("init.sql"), True, "Initialize")

The module contains the following class:

  • DatabaseManager - Handles database operations with retry logic.

DatabaseManager

DatabaseManager(max_retries=5, retry_delay=2)

Manages database operations with retry logic and error handling.

This class provides functionality to manage database operations including reading SQL files, checking table existence, and executing SQL statements with retry logic.

Parameters:

Name Type Description Default
max_retries int

Maximum number of connection retry attempts. Defaults to 5.

5
retry_delay int

Initial delay between retries in seconds. Defaults to 2.

2

Examples:

>>> db_manager = DatabaseManager(max_retries=3, retry_delay=1)
>>> db_manager.execute_sql_file(Path("init.sql"), True, "Initialize")
Source code in src/podflix/db/db_manager.py
def __init__(self, max_retries: int = 5, retry_delay: int = 2):
    self.max_retries = max_retries
    self.retry_delay = retry_delay
    self.engine = sa.create_engine(DBInterfaceFactory.create().sync_connection())
execute_sql_file
execute_sql_file(sql_file, check_exists, operation_name)

Execute SQL statements from a file with retry logic.

Parameters:

Name Type Description Default
sql_file Path

Path to the SQL file to execute.

required
check_exists bool

If True, checks for existing tables before execution.

required
operation_name str

Name of the operation for logging purposes.

required

Raises:

Type Description
OperationalError

If database connection fails after max retries.

Exception

If SQL statement execution fails.

Examples:

>>> db_manager = DatabaseManager()
>>> db_manager.execute_sql_file(
...     Path("init.sql"),
...     check_exists=True,
...     operation_name="Initialize"
... )
Source code in src/podflix/db/db_manager.py
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=2, max=30),
    retry=retry_if_exception_type(OperationalError),
    before_sleep=lambda retry_state: logger.warning(
        f"Database connection attempt {retry_state.attempt_number} failed, retrying..."
    ),
)
def execute_sql_file(self, sql_file: Path, check_exists: bool, operation_name: str):
    """Execute SQL statements from a file with retry logic.

    Args:
        sql_file: Path to the SQL file to execute.
        check_exists: If True, checks for existing tables before execution.
        operation_name: Name of the operation for logging purposes.

    Raises:
        OperationalError: If database connection fails after max retries.
        Exception: If SQL statement execution fails.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> db_manager.execute_sql_file(
        ...     Path("init.sql"),
        ...     check_exists=True,
        ...     operation_name="Initialize"
        ... )
    """
    with self.engine.connect() as conn:
        exists = self.table_exists(conn)
        if check_exists and exists:
            logger.info("Database already initialized, skipping...")
            return

        if not check_exists and not exists:
            logger.info("No tables found in database, nothing to drop...")
            return

        logger.info(f"{operation_name} database...")
        raw_sql_statements = self.read_sql_file(sql_file)

        for stmt in raw_sql_statements:
            try:
                conn.execute(sa.text(stmt))
            except Exception as e:
                logger.error(f"Error executing statement: {e}")
                raise

        conn.commit()
        logger.info(f"Database {operation_name.lower()} completed successfully")
read_sql_file
read_sql_file(file_path)

Read and parse SQL statements from a file.

Parameters:

Name Type Description Default
file_path str | Path

Path to the SQL file to be read.

required

Returns:

Type Description
list[str]

A list of SQL statements parsed from the file.

Examples:

>>> db_manager = DatabaseManager()
>>> statements = db_manager.read_sql_file("init.sql")
>>> len(statements) > 0
True
Source code in src/podflix/db/db_manager.py
def read_sql_file(self, file_path: str | Path) -> list[str]:
    """Read and parse SQL statements from a file.

    Args:
        file_path: Path to the SQL file to be read.

    Returns:
        A list of SQL statements parsed from the file.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> statements = db_manager.read_sql_file("init.sql")
        >>> len(statements) > 0
        True
    """
    with Path(file_path).open("r") as f:
        return [x.strip() for x in f.read().split(";") if x.strip()]
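
The parser simply splits the file content on ";" and strips whitespace, so it works for straightforward DDL scripts but would mis-split statements containing literal semicolons (for example inside string values). A small sketch of the same parsing logic on an in-memory string (the SQL here is illustrative, not the contents of init_db.sql):

raw_sql = """
CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, name TEXT);
CREATE TABLE IF NOT EXISTS threads (id TEXT PRIMARY KEY, user_id TEXT);
"""

# Same split-and-strip logic as read_sql_file, applied to a string instead of a file.
statements = [s.strip() for s in raw_sql.split(";") if s.strip()]
# statements[0] == "CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, name TEXT)"
# statements[1] == "CREATE TABLE IF NOT EXISTS threads (id TEXT PRIMARY KEY, user_id TEXT)"
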
table_exists
table_exists(conn)

Check if users table exists in the database.

Parameters:

Name Type Description Default
conn

SQLAlchemy connection object to use for the query.

required

Returns:

Type Description
bool

True if the users table exists, False otherwise.

Raises:

Type Description
OperationalError

If there's a problem connecting to the database.

ProgrammingError

If there's a problem executing the SQL query.

Examples:

>>> db_manager = DatabaseManager()
>>> with db_manager.engine.connect() as conn:
...     exists = db_manager.table_exists(conn)
>>> isinstance(exists, bool)
True
Source code in src/podflix/db/db_manager.py
def table_exists(self, conn) -> bool:
    """Check if users table exists in the database.

    Args:
        conn: SQLAlchemy connection object to use for the query.

    Returns:
        True if the users table exists, False otherwise.

    Raises:
        OperationalError: If there's a problem connecting to the database.
        ProgrammingError: If there's a problem executing the SQL query.

    Examples:
        >>> db_manager = DatabaseManager()
        >>> with db_manager.engine.connect() as conn:
        ...     exists = db_manager.table_exists(conn)
        >>> isinstance(exists, bool)
        True
    """
    try:
        if env_settings.sqlalchemy_db_type == "sqlite":
            query = """
                SELECT COUNT(*)
                FROM sqlite_master
                WHERE type='table' AND name='users'
            """
        else:  # postgresql
            query = """
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'users'
                );
            """
        result = conn.execute(sa.text(query)).scalar()
        return bool(result)
    except (OperationalError, ProgrammingError) as e:
        logger.error(f"Error checking for tables: {e}")
        return False

drop_db

Drop all database tables using SQL statements from drop_db.sql file.

drop_db

drop_db(max_retries=5, retry_delay=2)

Drop all database tables using SQL statements from drop_db.sql file.

Source code in src/podflix/db/drop_db.py
def drop_db(max_retries: int = 5, retry_delay: int = 2):
    """Drop all database tables using SQL statements from drop_db.sql file."""
    db_manager = DatabaseManager(max_retries, retry_delay)
    db_manager.execute_sql_file(
        Path(__file__).parent / "drop_db.sql",
        check_exists=False,
        operation_name="Dropping",
    )

init_db

Initialize the database schema using SQL statements from init_db.sql file.

initialize_db

initialize_db(max_retries=5, retry_delay=2)

Initialize the database using SQL statements from init_db.sql file.

Source code in src/podflix/db/init_db.py
def initialize_db(max_retries: int = 5, retry_delay: int = 2):
    """Initialize the database using SQL statements from init_db.sql file."""
    db_manager = DatabaseManager(max_retries, retry_delay)
    db_manager.execute_sql_file(
        Path(__file__).parent / "init_db.sql",
        check_exists=True,
        operation_name="Initializing",
    )
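
A minimal sketch of calling both entry points from a script, assuming the database environment settings are already configured:

from podflix.db.drop_db import drop_db
from podflix.db.init_db import initialize_db

# Create the schema if it does not exist yet (skipped when the users table is found).
initialize_db(max_retries=3, retry_delay=1)

# Later, drop all tables defined in drop_db.sql.
drop_db()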

s3_storage_client

Module to interact with Amazon S3 compatible storage provider.

S3CompatibleStorageClient

S3CompatibleStorageClient(bucket=None)

Bases: BaseStorageClient

Class to enable Amazon S3 compatible storage provider.

This class provides functionality to interact with Amazon S3 compatible storage services. It handles bucket creation, file uploads, and basic S3 operations.

Examples:

>>> storage_client = S3CompatibleStorageClient("my-bucket")
>>> await storage_client.upload_file("test.txt", "Hello World", "text/plain")
{'object_key': 'test.txt', 'url': 'https://s3.example.com/my-bucket/test.txt'}

Attributes:

Name Type Description
bucket str

The name of the S3 bucket to use

client

The boto3 S3 client instance

Initialize the S3 compatible storage client.

Parameters:

Name Type Description Default
bucket str | None

Name of the S3 bucket. If None, uses the value from env_settings.

None

Raises:

Type Description
Exception

If initialization of the S3 client fails

Source code in src/podflix/db/s3_storage_client.py
def __init__(self, bucket: str | None = None):
    """Initialize the S3 compatible storage client.

    Args:
        bucket: Name of the S3 bucket. If None, uses the value from env_settings.

    Raises:
        Exception: If initialization of the S3 client fails
    """
    try:
        if bucket is None:
            bucket = env_settings.aws_s3_bucket_name

        self.bucket = bucket
        self.client = get_boto_client()
        # Check if bucket exists, if not create it
        try:
            self.client.head_bucket(Bucket=self.bucket)
        except self.client.exceptions.ClientError:
            logger.info(f"Creating bucket: {self.bucket}")
            self.client.create_bucket(
                Bucket=self.bucket,
                CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
            )

        logger.debug("S3CompatibleStorageClient initialized")
    except Exception as e:
        logger.error(f"S3CompatibleStorageClient initialization error: {e}")
get_read_url async
get_read_url(object_key)

Compute and return the full URL for an object in S3 storage.

Examples:

>>> url = await storage_client.get_read_url("test.txt")
>>> print(url)
'https://s3.example.com/my-bucket/test.txt'

Parameters:

Name Type Description Default
object_key str

A string representing the key (path) of the object in the bucket.

required

Returns:

Type Description
str

A string representing the full URL to access the object.

Source code in src/podflix/db/s3_storage_client.py
async def get_read_url(self, object_key: str) -> str:
    """Compute and return the full URL for an object in S3 storage.

    Examples:
        >>> url = await storage_client.get_read_url("test.txt")
        >>> print(url)
        'https://s3.example.com/my-bucket/test.txt'

    Args:
        object_key: A string representing the key (path) of the object in the bucket.

    Returns:
        A string representing the full URL to access the object.
    """
    return f"{env_settings.aws_s3_endpoint_url}/{self.bucket}/{object_key}"
read_file async
read_file(object_key)

Read a file from the S3 compatible storage.

Examples:

>>> content = await storage_client.read_file("test.txt")
>>> print(content)
'Hello World'

Parameters:

Name Type Description Default
object_key str

A string representing the key (path) of the object in the bucket.

required

Returns:

Type Description
Union[str, None]

The content of the file as a string, or None if the operation fails.

Raises:

Type Description
Exception

If the read operation fails.

Source code in src/podflix/db/s3_storage_client.py
async def read_file(self, object_key: str) -> Union[str, None]:
    """Read a file from the S3 compatible storage.

    Examples:
        >>> content = await storage_client.read_file("test.txt")
        >>> print(content)
        'Hello World'

    Args:
        object_key: A string representing the key (path) of the object in the bucket.

    Returns:
        The content of the file as a string, or None if the operation fails.

    Raises:
        Exception: If the read operation fails.
    """
    try:
        response = self.client.get_object(Bucket=self.bucket, Key=object_key)
        return response["Body"].read().decode("utf-8")
    except Exception as e:
        logger.error(f"S3CompatibleStorageClient, read_file error: {e}")
        return None
upload_file async
upload_file(object_key, data, mime='application/octet-stream', overwrite=True)

Upload a file to the S3 compatible storage.

Examples:

>>> result = await storage_client.upload_file(
...     "test.txt",
...     "Hello World",
...     "text/plain"
... )
>>> print(result)
{'object_key': 'test.txt', 'url': 'https://s3.example.com/bucket/test.txt'}

Parameters:

Name Type Description Default
object_key str

A string representing the key (path) where the object will be stored.

required
data Union[bytes, str]

The file content to upload (can be bytes or string).

required
mime str

A string representing the MIME type of the file.

'application/octet-stream'
overwrite bool

A boolean indicating whether to overwrite existing files.

True

Returns:

Type Description
Dict[str, Any]

A dictionary containing the object_key and url of the uploaded file.

Dict[str, Any]

Returns empty dict if upload fails.

Raises:

Type Description
Exception

If the upload operation fails.

Source code in src/podflix/db/s3_storage_client.py
async def upload_file(
    self,
    object_key: str,
    data: Union[bytes, str],
    mime: str = "application/octet-stream",
    overwrite: bool = True,
) -> Dict[str, Any]:
    """Upload a file to the S3 compatible storage.

    Examples:
        >>> result = await storage_client.upload_file(
        ...     "test.txt",
        ...     "Hello World",
        ...     "text/plain"
        ... )
        >>> print(result)
        {'object_key': 'test.txt', 'url': 'https://s3.example.com/bucket/test.txt'}

    Args:
        object_key: A string representing the key (path) where the object will be stored.
        data: The file content to upload (can be bytes or string).
        mime: A string representing the MIME type of the file.
        overwrite: A boolean indicating whether to overwrite existing files.

    Returns:
        A dictionary containing the object_key and url of the uploaded file.
        Returns empty dict if upload fails.

    Raises:
        Exception: If the upload operation fails.
    """
    try:
        self.client.put_object(
            Bucket=self.bucket, Key=object_key, Body=data, ContentType=mime
        )
        url = await self.get_read_url(object_key)
        return {"object_key": object_key, "url": url}
    except Exception as e:
        logger.error(f"S3CompatibleStorageClient, upload_file error: {e}")
        return {}
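
Because upload_file and read_file are coroutines, they are awaited; a minimal usage sketch, assuming the S3 endpoint, credentials, and bucket name are configured via the environment settings:

import asyncio

from podflix.db.s3_storage_client import S3CompatibleStorageClient


async def main() -> None:
    storage_client = S3CompatibleStorageClient()  # bucket name taken from env_settings

    result = await storage_client.upload_file(
        "notes/hello.txt", "Hello World", mime="text/plain"
    )
    print(result)  # {'object_key': 'notes/hello.txt', 'url': '.../notes/hello.txt'}

    content = await storage_client.read_file("notes/hello.txt")
    print(content)  # Hello World


asyncio.run(main())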

get_boto_client

get_boto_client()

Create and return a boto3 S3 client with configured credentials.

Examples:

>>> client = get_boto_client()
>>> client.list_buckets()
{'Buckets': [...], 'Owner': {...}}

Returns:

Type Description

A configured boto3 S3 client instance with the specified endpoint and credentials.

Raises:

Type Description
ClientError

If there are issues with credentials or configuration.

Source code in src/podflix/db/s3_storage_client.py
def get_boto_client():
    """Create and return a boto3 S3 client with configured credentials.

    Examples:
        >>> client = get_boto_client()
        >>> client.list_buckets()
        {'Buckets': [...], 'Owner': {...}}

    Returns:
        A configured boto3 S3 client instance with the specified endpoint and credentials.

    Raises:
        botocore.exceptions.ClientError: If there are issues with credentials or configuration.
    """
    return boto3.client(
        "s3",
        endpoint_url=env_settings.aws_s3_endpoint_url,
        aws_access_key_id=env_settings.aws_access_key_id,
        aws_secret_access_key=env_settings.aws_secret_access_key,
        region_name=env_settings.aws_region_name,
        config=Config(signature_version="s3v4"),
        verify=True,  # Set False to skip SSL verification for local development
    )

env_settings

Application configuration for environment variables.

EnvSettings

Bases: BaseSettings

This class is used to load environment variables.

They are loaded either from the environment or from a .env file and stored as class attributes.

Note
  • environment variables will always take priority over values loaded from a dotenv file
  • environment variable names are case-insensitive
  • environment variable type is inferred from the type hint of the class attribute
  • For environment variables that are not set, a default value should be provided

For more info, see the related pydantic docs: https://docs.pydantic.dev/latest/concepts/pydantic_settings
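
A minimal, illustrative sketch of the pattern (the field names below are examples, not the actual podflix settings): a pydantic-settings class reads values case-insensitively from the environment or a .env file, and the type hints drive conversion and defaults.

from pydantic_settings import BaseSettings, SettingsConfigDict


class ExampleSettings(BaseSettings):
    """Illustrative only; see podflix.env_settings.EnvSettings for the real fields."""

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    sqlalchemy_db_type: str = "sqlite"  # read from SQLALCHEMY_DB_TYPE when set
    enable_openai_api: bool = False     # "true"/"1" in the environment becomes True


settings = ExampleSettings()
print(settings.sqlalchemy_db_type)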

validate_model_api_base

validate_model_api_base(value, values)

Validate the model API base URL.

Source code in src/podflix/env_settings.py
@field_validator("model_api_base")
def validate_model_api_base(cls, value, values):
    """Validate the model API base URL."""
    if values.data.get("enable_openai_api") is True:
        logger.debug("When OpenAI API is enabled, `model_api_base` environment is ignored and set to OpenAI API.")

        return "https://api.openai.com"

    return value

validate_openai_key

validate_openai_key(value, values)

Validate the OpenAI API key.

Source code in src/podflix/env_settings.py
@field_validator("openai_api_key")
def validate_openai_key(cls, value, values):
    """Validate the OpenAI API key."""
    if values.data.get("enable_openai_api") is True and value is None:
        message = "OpenAI API key should be set, when enable_openai_api is True."
        logger.error(message)
        raise ValueError(message)

    return value

allowed_values

allowed_values(v, values)

Validate if a value is in a set of allowed values.

Examples:

>>> allowed_values("a", ["a", "b"])
'a'
>>> allowed_values(1, [1, 2, 3])
1

Parameters:

Name Type Description Default
v

The value to validate

required
values

A collection of allowed values

required

Returns:

Type Description

The validated value if it exists in the allowed values

Raises:

Type Description
AssertionError

If the value is not in the allowed values

Source code in src/podflix/env_settings.py
def allowed_values(v, values):
    """Validate if a value is in a set of allowed values.

    Examples:
        >>> allowed_values("a", ["a", "b"])
        'a'
        >>> allowed_values(1, [1, 2, 3])
        1

    Args:
        v: The value to validate
        values: A collection of allowed values

    Returns:
        The validated value if it exists in the allowed values

    Raises:
        AssertionError: If the value is not in the allowed values
    """
    assert v in values
    return v

graph

Module for LLM interaction using LangChain and LangGraph nodes.

mock

Define the mock graph for the Podflix agent.

AgentState

Bases: TypedDict

A dictionary representing the state of the agent.

mock_answer async

mock_answer(state)

Generate a mock response using a random choice from predefined messages.

The function simulates an AI model's response by randomly selecting from a predefined list of messages and returning it as an AIMessage.

Examples:

>>> state = {"messages": [HumanMessage(content="Hello")]}
>>> response = await mock_answer(state)
>>> isinstance(response["messages"][0], AIMessage)
True

Parameters:

Name Type Description Default
state AgentState

A dictionary containing the current conversation state, including: - messages: A sequence of BaseMessage objects representing the conversation history.

required

Returns:

Type Description

A dictionary containing: - messages: A list with a single AIMessage containing the random response.

Source code in src/podflix/graph/mock.py
async def mock_answer(state: AgentState):
    """Generate a mock response using a random choice from predefined messages.

    The function simulates an AI model's response by randomly selecting from a
    predefined list of messages and returning it as an AIMessage.

    Examples:
        >>> state = {"messages": [HumanMessage(content="Hello")]}
        >>> response = await mock_answer(state)
        >>> isinstance(response["messages"][0], AIMessage)
        True

    Args:
        state: A dictionary containing the current conversation state, including:
            - messages: A sequence of BaseMessage objects representing the conversation history.

    Returns:
        A dictionary containing:
            - messages: A list with a single AIMessage containing the random response.
    """
    random_response = random.choice(MOCK_RESPONSES)

    model = get_mock_model(message=random_response)
    _ = await model.ainvoke("mock")

    return {"messages": [AIMessage(random_response)]}
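
A hedged sketch of how a node like this could be wired into a LangGraph state graph; the compiled graph that podflix.graph.mock actually exposes may be configured differently.

from langchain_core.messages import HumanMessage
from langgraph.graph import END, StateGraph

from podflix.graph.mock import AgentState, mock_answer

# Illustrative wiring only.
builder = StateGraph(AgentState)
builder.add_node("mock_answer", mock_answer)
builder.set_entry_point("mock_answer")
builder.add_edge("mock_answer", END)
graph = builder.compile()

# Inside an async context:
# result = await graph.ainvoke({"messages": [HumanMessage(content="Hello")]})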

podcast_rag

Define the RAG-based graph for the Podflix agent.

AgentState

Bases: TypedDict

A dictionary representing the state of the agent.

generate async

generate(state)

Generate a response using the retrieved context.

Source code in src/podflix/graph/podcast_rag.py
async def generate(state: AgentState) -> AgentState:
    """Generate a response using the retrieved context."""
    question = state["messages"][-1].content
    context = state["context"]

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "Use the following context to answer the question: {context}"),
            ("human", "{question}"),
        ]
    )

    model = get_chat_model()
    chain = prompt | model | StrOutputParser()

    response = await chain.ainvoke({"context": context, "question": question})

    return {
        "messages": [AIMessage(content=response)],
    }

retrieve async

retrieve(state)

Retrieve relevant context based on the user's question.

Source code in src/podflix/graph/podcast_rag.py
async def retrieve(state: AgentState) -> AgentState:
    """Retrieve relevant context based on the user's question."""
    # TODO: Implement actual retrieval logic
    # This is a placeholder that should be replaced with your vector store retrieval
    # question = state["messages"][-1].content
    # context = f"Retrieved context for: {question}"
    # return {"messages": state["messages"]}
    return {}
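
A hedged sketch of wiring these nodes into a retrieve-then-generate graph; the module's own compiled graph may differ, and since retrieve is still a placeholder, a context value must be supplied in the inputs for generate to work.

from langgraph.graph import END, StateGraph

from podflix.graph.podcast_rag import AgentState, generate, retrieve

# Illustrative wiring only.
builder = StateGraph(AgentState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.set_entry_point("retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_graph = builder.compile()

# Since retrieve currently returns {}, pass a "context" value in the graph inputs,
# e.g. {"messages": [...], "context": "episode transcript excerpt"}.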

gui

Chainlit UI module.

placeholder

Placeholder file to provide several sample math calculations.

This module allows the user to make mathematical calculations. Adapted from: https://realpython.com/python-project-documentation-with-mkdocs/

Examples:

>>> from podflix import placeholder
>>> placeholder.add(2, 4)
6.0
>>> placeholder.multiply(2.0, 4.0)
8.0
>>> from podflix.placeholder import divide
>>> divide(4.0, 2)
2.0

The module contains the following functions:

  • add(a, b) - Returns the sum of two numbers.
  • subtract(a, b) - Returns the difference of two numbers.
  • multiply(a, b) - Returns the product of two numbers.
  • divide(a, b) - Returns the quotient of two numbers.

add

add(a, b)

Compute and return the sum of two numbers.

Examples:

>>> add(4.0, 2.0)
6.0
>>> add(4, 2)
6.0

Parameters:

Name Type Description Default
a float | int

A number representing the first addend in the addition.

required
b float | int

A number representing the second addend in the addition.

required

Returns:

Type Description
float

A number representing the arithmetic sum result of a and b.

Source code in src/podflix/placeholder.py
def add(a: float | int, b: float | int) -> float:
    """Compute and return the sum of two numbers.

    Examples:
        >>> add(4.0, 2.0)
        6.0
        >>> add(4, 2)
        6.0

    Args:
        a: A number representing the first addend in the addition.
        b: A number representing the second addend in the addition.

    Returns:
        A number representing the arithmetic sum result of `a` and `b`.
    """
    return float(a + b)

divide

divide(a, b)

Compute and return the division of two numbers.

Examples:

>>> divide(4.0, 2.0)
2.0
>>> divide(4, 2)
2.0

Parameters:

Name Type Description Default
a float | int

A number representing the dividend in the division.

required
b float | int

A number representing the divisor in the division.

required

Returns:

Type Description
float

A number representing the division result of a and b.

Raises:

Type Description
ZeroDivisionError

If b is zero.

Source code in src/podflix/placeholder.py
def divide(a: float | int, b: float | int) -> float:
    """Compute and return the division of two numbers.

    Examples:
        >>> divide(4.0, 2.0)
        2.0
        >>> divide(4, 2)
        2.0

    Args:
        a: A number representing the dividend in the division.
        b: A number representing the divisor in the division.

    Returns:
        A number representing the division result of `a` and `b`.

    Raises:
        ZeroDivisionError: If `b` is zero.
    """
    if b == 0:
        raise ZeroDivisionError("division by zero")

    return float(a / b)

multiply

multiply(a, b)

Compute and return the multiplication of two numbers.

Examples:

>>> multiply(4.0, 2.0)
8.0
>>> multiply(4, 2)
8.0

Parameters:

Name Type Description Default
a float | int

A number representing the first factor in the multiplication.

required
b float | int

A number representing the second factor in the multiplication.

required

Returns:

Type Description
float

A number representing the product of a and b.

Source code in src/podflix/placeholder.py
def multiply(a: float | int, b: float | int) -> float:
    """Compute and return the multiplication of two numbers.

    Examples:
        >>> multiply(4.0, 2.0)
        8.0
        >>> multiply(4, 2)
        8.0

    Args:
        a: A number representing the first factor in the multiplication.
        b: A number representing the second factor in the multiplication.

    Returns:
        A number representing the product of `a` and `b`.
    """
    return float(a * b)

subtract

subtract(a, b)

Compute and return the subtraction of two numbers.

Examples:

>>> subtract(4.0, 2.0)
2.0
>>> subtract(4, 2)
2.0

Parameters:

Name Type Description Default
a float | int

A number representing the minuend in the subtraction.

required
b float | int

A number representing the subtrahend in the subtraction.

required

Returns:

Type Description
float

A number representing the subtraction result of a and b.

Source code in src/podflix/placeholder.py
def subtract(a: float | int, b: float | int) -> float:
    """Compute and return the subtraction of two numbers.

    Examples:
        >>> subtract(4.0, 2.0)
        2.0
        >>> subtract(4, 2)
        2.0

    Args:
        a: A number representing the minuend in the subtraction.
        b: A number representing the subtrahend in the subtraction.

    Returns:
        A number representing the subtraction result of `a` and `b`.
    """
    return float(a - b)

utils

Utility functions.

chainlit_ui

Utilities for the Chainlit UI.

StarterQuestion dataclass

StarterQuestion(label, message=None, icon=None)

Dataclass for starter questions.

__post_init__
__post_init__()

Post initialization method for the StarterQuestion class.

Source code in src/podflix/utils/chainlit_ui.py
def __post_init__(self):
    """Post initialization method for the StarterQuestion class."""
    if self.message is None:
        self.message = self.label

    if self.icon is None:
        self.icon = "🚀"
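
A small sketch showing the defaults that __post_init__ fills in:

from podflix.utils.chainlit_ui import StarterQuestion

starter = StarterQuestion(label="Summarize this episode")

print(starter.message)  # "Summarize this episode" (falls back to the label)
print(starter.icon)     # "🚀" (default icon)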

create_message_history_from_db_thread

create_message_history_from_db_thread(thread)

Create message history from the thread steps.

Examples:

>>> thread = {"steps": [{"type": "user_message", "output": "hello", "createdAt": 1}]}
>>> history = create_message_history_from_db_thread(thread)
>>> len(history.messages)
1

Parameters:

Name Type Description Default
thread ThreadDict

A ThreadDict object containing the conversation thread data.

required

Returns:

Type Description
ChatMessageHistory

A ChatMessageHistory object containing the processed message history.

Source code in src/podflix/utils/chainlit_ui.py
def create_message_history_from_db_thread(
    thread: ThreadDict,
) -> ChatMessageHistory:
    """Create message history from the thread steps.

    Examples:
        >>> thread = {"steps": [{"type": "user_message", "output": "hello", "createdAt": 1}]}
        >>> history = create_message_history_from_db_thread(thread)
        >>> len(history.messages)
        1

    Args:
        thread: A ThreadDict object containing the conversation thread data.

    Returns:
        A ChatMessageHistory object containing the processed message history.
    """
    message_history = ChatMessageHistory()

    # TODO: This is a workaround to sort the messages based on createdAt.
    steps_messages = sorted(
        [
            message
            for message in thread["steps"]
            if message["type"] in ["user_message", "assistant_message"]
        ],
        key=lambda x: x["createdAt"],
    )

    for steps_message in steps_messages:
        if steps_message["type"] == "user_message":
            message_history.add_user_message(steps_message["output"])
        elif steps_message["type"] == "assistant_message":
            message_history.add_ai_message(steps_message["output"])

    return message_history

create_starter_questions_from_list

create_starter_questions_from_list(starters)

Create starter questions from the list of mock starters.

Examples:

>>> starters = [StarterQuestion(label="Test", message="Test message")]
>>> questions = create_starter_questions_from_list(starters)
>>> len(questions)
1

Parameters:

Name Type Description Default
starters list[StarterQuestion]

A list of StarterQuestion objects to convert.

required

Returns:

Type Description
list[Starter]

A list of chainlit Starter objects.

Source code in src/podflix/utils/chainlit_ui.py
def create_starter_questions_from_list(
    starters: list[StarterQuestion],
) -> list[cl.Starter]:
    """Create starter questions from the list of mock starters.

    Examples:
        >>> starters = [StarterQuestion(label="Test", message="Test message")]
        >>> questions = create_starter_questions_from_list(starters)
        >>> len(questions)
        1

    Args:
        starters: A list of StarterQuestion objects to convert.

    Returns:
        A list of chainlit Starter objects.
    """
    return [
        cl.Starter(
            label=starter.label,
            message=starter.message,
            icon=starter.icon,
        )
        for starter in starters
    ]

get_element_url async

get_element_url(data_layer, thread_id, element_id)

Get the URL for accessing an element's file content.

Parameters:

Name Type Description Default
data_layer SQLAlchemyDataLayer

SQLAlchemyDataLayer instance

required
thread_id str

ID of the thread containing the element

required
element_id str

ID of the element

required

Returns:

Type Description
str | None

URL string if found, None if element doesn't exist

Source code in src/podflix/utils/chainlit_ui.py
async def get_element_url(
    data_layer: SQLAlchemyDataLayer, thread_id: str, element_id: str
) -> str | None:
    """Get the URL for accessing an element's file content.

    Args:
        data_layer: SQLAlchemyDataLayer instance
        thread_id: ID of the thread containing the element
        element_id: ID of the element

    Returns:
        URL string if found, None if element doesn't exist
    """
    logger.debug(
        f"SQLAlchemy: get_element_url, thread_id={thread_id}, element_id={element_id}"
    )

    element_dict: ElementDict = data_layer.get_element(
        thread_id=thread_id, element_id=element_id
    )

    if element_dict is None:
        return None

    return element_dict.url

get_sqlalchemy_data_layer

get_sqlalchemy_data_layer(show_logger=False)

Create and return a SQLAlchemy data layer instance.

Examples:

>>> data_layer = get_sqlalchemy_data_layer()
>>> isinstance(data_layer, SQLAlchemyDataLayer)
True

Parameters:

Name Type Description Default
show_logger bool

A boolean indicating whether to show SQL logging information.

False

Returns:

Type Description
SQLAlchemyDataLayer

A SQLAlchemyDataLayer instance configured with database connection and storage provider.

Source code in src/podflix/utils/chainlit_ui.py
def get_sqlalchemy_data_layer(show_logger: bool = False) -> SQLAlchemyDataLayer:
    """Create and return a SQLAlchemy data layer instance.

    Examples:
        >>> data_layer = get_sqlalchemy_data_layer()
        >>> isinstance(data_layer, SQLAlchemyDataLayer)
        True

    Args:
        show_logger: A boolean indicating whether to show SQL logging information.

    Returns:
        A SQLAlchemyDataLayer instance configured with database connection and storage provider.
    """
    return SQLAlchemyDataLayer(
        DBInterfaceFactory.create().async_connection(),
        ssl_require=False,
        show_logger=show_logger,
        storage_provider=S3CompatibleStorageClient(),
    )

set_extra_user_session_params

set_extra_user_session_params(session_id=None, user_id=None, message_history=None)

Set extra user session parameters for the chainlit session.

Examples:

>>> set_extra_user_session_params(session_id="test123")
>>> cl.user_session.get("session_id")
'test123'

Parameters:

Name Type Description Default
session_id str | None

Optional string representing the session ID. If None, a new UUID is generated.

None
user_id str | None

Optional string representing the user ID. If None, gets from current user session.

None
message_history ChatMessageHistory | None

Optional ChatMessageHistory object. If None, creates new empty history.

None

Returns:

Type Description

None

Source code in src/podflix/utils/chainlit_ui.py
def set_extra_user_session_params(
    session_id: str | None = None,
    user_id: str | None = None,
    message_history: ChatMessageHistory | None = None,
):
    """Set extra user session parameters for the chainlit session.

    Examples:
        >>> set_extra_user_session_params(session_id="test123")
        >>> cl.user_session.get("session_id")
        'test123'

    Args:
        session_id: Optional string representing the session ID. If None, a new UUID is generated.
        user_id: Optional string representing the user ID. If None, gets from current user session.
        message_history: Optional ChatMessageHistory object. If None, creates new empty history.

    Returns:
        None
    """
    if session_id is None:
        session_id = str(uuid4())

    if user_id is None:
        chainlit_user: Chainlit_User_Type = cl.user_session.get("user")
        user_id = chainlit_user.identifier

    if message_history is None:
        message_history = ChatMessageHistory()

    check_lf_credentials()
    lf_cb_handler = LangfuseCallbackHandler(
        user_id=user_id,
        session_id=session_id,
    )

    cl.user_session.set("session_id", session_id)
    cl.user_session.set("lf_cb_handler", lf_cb_handler)
    cl.user_session.set("message_history", message_history)

    langfuse_session_url = get_lf_session_url(session_id=session_id)

    logger.debug(f"Langfuse Session URL: {langfuse_session_url}")

simple_auth_callback

simple_auth_callback(username, password)

Authenticate user with simple username and password check.

Examples:

>>> simple_auth_callback("admin", "admin")
User(identifier="admin", metadata={"role": "admin", "provider": "credentials"})

Parameters:

Name Type Description Default
username str

A string representing the username for authentication.

required
password str

A string representing the password for authentication.

required

Returns:

Type Description
User

A User object if authentication is successful.

Raises:

Type Description
ValueError

If credentials are invalid.

Source code in src/podflix/utils/chainlit_ui.py
def simple_auth_callback(username: str, password: str) -> User:
    """Authenticate user with simple username and password check.

    Examples:
        >>> simple_auth_callback("admin", "admin")
        User(identifier="admin", metadata={"role": "admin", "provider": "credentials"})

    Args:
        username: A string representing the username for authentication.
        password: A string representing the password for authentication.

    Returns:
        A User object if authentication is successful.

    Raises:
        ValueError: If credentials are invalid.
    """
    if (username, password) == (
        env_settings.chainlit_user_name,
        env_settings.chainlit_user_password,
    ):
        return cl.User(
            identifier=username, metadata={"role": "admin", "provider": "credentials"}
        )

    raise ValueError("Invalid credentials")
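
In a Chainlit app this check is typically hooked up through the @cl.password_auth_callback decorator; a minimal sketch, assuming the Chainlit username and password are configured in the environment settings:

import chainlit as cl

from podflix.utils.chainlit_ui import simple_auth_callback


@cl.password_auth_callback
def auth_callback(username: str, password: str) -> cl.User:
    # Delegates to the simple username/password check shown above.
    return simple_auth_callback(username, password)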

general

General utility functions.

check_env_vars

check_env_vars(env_vars=None)

Checks if the required environment variables are set.

Examples:

>>> check_env_vars(['API_KEY', 'SECRET_KEY'])
None
>>> check_env_vars(None)
None
>>> check_env_vars(['NONEXISTENT_VAR'])
Traceback (most recent call last):
ValueError: Please set NONEXISTENT_VAR env var.

Parameters:

Name Type Description Default
env_vars list[str] | None

List of environment variables to check. Defaults to None.

None

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If any of the environment variables are not set.

Source code in src/podflix/utils/general.py
def check_env_vars(env_vars: list[str] | None = None) -> None:
    """Checks if the required environment variables are set.

    Examples:
        >>> check_env_vars(['API_KEY', 'SECRET_KEY'])
        None
        >>> check_env_vars(None)
        None
        >>> check_env_vars(['NONEXISTENT_VAR'])
        Traceback (most recent call last):
        ValueError: Please set NONEXISTENT_VAR env var.

    Args:
        env_vars: List of environment variables to check. Defaults to None.

    Returns:
        None

    Raises:
        ValueError: If any of the environment variables are not set.
    """
    if env_vars is None:
        return

    for env_var in env_vars:
        if os.getenv(env_var) is None:
            raise ValueError(f"Please set {env_var} env var.")

check_lf_credentials

check_lf_credentials()

Check if the Langfuse credentials are correct.

Examples:

>>> check_lf_credentials()
None

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If we can't connect to Langfuse using the given credentials.

Source code in src/podflix/utils/general.py
def check_lf_credentials() -> None:
    """Check if the Langfuse credentials are correct.

    Examples:
        >>> check_lf_credentials()
        None

    Returns:
        None

    Raises:
        ValueError: If we can't connect to Langfuse using the given credentials.
    """
    try:
        langfuse_obj = Langfuse()
        lf_check_result = langfuse_obj.auth_check()

        if lf_check_result is False:
            raise Exception("Langfuse Auth Check Failed")

        logger.debug("Langfuse Auth Check Passed")
    except Exception as e:
        logger.error(f"Langfuse Auth Check Error: {e}")
        raise e

get_lf_project_id

get_lf_project_id()

Get the Langfuse project ID.

Examples:

>>> get_lf_project_id()
'cm5a4jaff0006r8yk44cvas5a'

Returns:

Type Description
str

Langfuse project ID.

Source code in src/podflix/utils/general.py
def get_lf_project_id() -> str:
    """Get the Langfuse project ID.

    Examples:
        >>> get_lf_project_id()
        'cm5a4jaff0006r8yk44cvas5a'

    Returns:
        Langfuse project ID.
    """
    langfuse_obj = Langfuse()
    projects = langfuse_obj.client.projects.get()
    return projects.data[0].id

get_lf_session_url

get_lf_session_url(session_id)

Get the Langfuse session URL.

Examples:

>>> get_lf_session_url("123")
'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/sessions/123'

Parameters:

Name Type Description Default
session_id str

Session ID.

required

Returns:

Type Description
str

Langfuse session URL.

Source code in src/podflix/utils/general.py
def get_lf_session_url(session_id: str) -> str:
    """Get the Langfuse session URL.

    Examples:
        >>> get_lf_session_url("123")
        'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/sessions/123'

    Args:
        session_id: Session ID.

    Returns:
        Langfuse session URL.
    """
    langfuse_project_id = get_lf_project_id()

    return f"{env_settings.langfuse_host}/project/{langfuse_project_id}/sessions/{session_id}"

get_lf_traces_url

get_lf_traces_url(langchain_run_id)

Get the Langfuse traces URL.

Examples:

>>> get_lf_traces_url("123")
'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/traces/123'

Parameters:

Name Type Description Default
langchain_run_id str

Langchain run ID.

required

Returns:

Type Description
str

Langfuse traces URL.

Source code in src/podflix/utils/general.py
def get_lf_traces_url(langchain_run_id: str) -> str:
    """Get the Langfuse traces URL.

    Examples:
        >>> get_lf_traces_url("123")
        'https://YOUR_LANGFUSE_HOST/project/YOUR_PROJECT_ID/traces/123'

    Args:
        langchain_run_id: Langchain run ID.

    Returns:
        Langfuse traces URL.
    """
    if langchain_run_id is None:
        raise ValueError("langchain_run_id cannot be None")

    langfuse_project_id = get_lf_project_id()

    return f"{env_settings.langfuse_host}/project/{langfuse_project_id}/traces/{langchain_run_id}"

is_module_installed

is_module_installed(module_name, throw_error=False)

Check if the module is installed or not.

Examples:

>>> is_module_installed(module_name="yaml", throw_error=False)
True
>>> is_module_installed(module_name="numpy", throw_error=False)
False
>>> is_module_installed(module_name="numpy", throw_error=True)
Traceback (most recent call last):
ImportError: Module numpy is not installed.

Parameters:

Name Type Description Default
module_name str

Name of the module to be checked.

required
throw_error bool

If True, raises ImportError if module is not installed.

False

Returns:

Type Description
bool

Returns True if module is installed, False otherwise.

Raises:

Type Description
ImportError

If throw_error is True and module is not installed.

Source code in src/podflix/utils/general.py
def is_module_installed(module_name: str, throw_error: bool = False) -> bool:
    """Check if the module is installed or not.

    Examples:
        >>> is_module_installed(module_name="yaml", throw_error=False)
        True
        >>> is_module_installed(module_name="numpy", throw_error=False)
        False
        >>> is_module_installed(module_name="numpy", throw_error=True)
        Traceback (most recent call last):
        ImportError: Module numpy is not installed.

    Args:
        module_name: Name of the module to be checked.
        throw_error: If True, raises ImportError if module is not installed.

    Returns:
        Returns True if module is installed, False otherwise.

    Raises:
        ImportError: If throw_error is True and module is not installed.
    """
    try:
        importlib.import_module(module_name)
        return True
    except ImportError as e:
        if throw_error:
            message = f"Module {module_name} is not installed."
            raise ImportError(message) from e
        return False

graph_runner

Helper class for running the graph.

GraphRunner

GraphRunner(
    graph,
    graph_inputs,
    graph_streamable_node_names,
    lf_cb_handler,
    session_id,
    assistant_message,
)

Helper class for on_message callback.

Initialize the GraphRunner class.

Parameters:

Name Type Description Default
graph CompiledStateGraph

A CompiledStateGraph instance representing the graph to be executed.

required
graph_inputs dict

A dictionary containing the inputs for the graph.

required
graph_streamable_node_names list[str]

A list of node names that can be streamed.

required
lf_cb_handler CallbackHandler

A LangfuseCallbackHandler instance for tracking.

required
session_id str

A string representing the unique session identifier.

required
assistant_message Message

A chainlit Message instance for displaying responses.

required
Source code in src/podflix/utils/graph_runner.py
def __init__(  # noqa: PLR0913
    self,
    graph: CompiledStateGraph,
    graph_inputs: dict,
    graph_streamable_node_names: list[str],
    lf_cb_handler: LangfuseCallbackHandler,
    session_id: str,
    assistant_message: cl.Message,
):
    """Initialize the GraphRunner class.

    Args:
        graph: A CompiledStateGraph instance representing the graph to be executed.
        graph_inputs: A dictionary containing the inputs for the graph.
        graph_streamable_node_names: A list of node names that can be streamed.
        lf_cb_handler: A LangfuseCallbackHandler instance for tracking.
        session_id: A string representing the unique session identifier.
        assistant_message: A chainlit Message instance for displaying responses.
    """
    self.graph = graph
    self.graph_inputs = graph_inputs
    self.graph_streamable_node_names = graph_streamable_node_names
    self.lf_cb_handler = lf_cb_handler
    self.session_id = session_id
    self.assistant_message = assistant_message

    self.run_id = None
run_graph async
run_graph()

Execute the graph asynchronously with the configured inputs.

This method sets up the runnable configuration with callbacks and streams the graph events to process the LLM responses.

Examples:

>>> runner = GraphRunner(graph, inputs, nodes, handler, "session1", message)
>>> await runner.run_graph()

Returns:

Type Description

None

Source code in src/podflix/utils/graph_runner.py
async def run_graph(self):
    """Execute the graph asynchronously with the configured inputs.

    This method sets up the runnable configuration with callbacks and streams
    the graph events to process the LLM responses.

    Examples:
        >>> runner = GraphRunner(graph, inputs, nodes, handler, "session1", message)
        >>> await runner.run_graph()

    Returns:
        None
    """
    graph_runnable_config = RunnableConfig(
        callbacks=[
            self.lf_cb_handler,
            cl.LangchainCallbackHandler(),
        ],
        recursion_limit=10,
        configurable={"session_id": self.session_id},
    )

    async for event in self.graph.astream_events(
        self.graph_inputs,
        config=graph_runnable_config,
        version="v2",
    ):
        await self.stream_llm_response(event)
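
In practice, GraphRunner is constructed once per user message inside a Chainlit on_message handler and run against a previously compiled graph. The sketch below is illustrative only: the compiled_graph object, the "chat" node name, the input schema, and the Langfuse callback import path are assumptions, not part of podflix.

import chainlit as cl
from langfuse.callback import CallbackHandler as LangfuseCallbackHandler

from podflix.utils.graph_runner import GraphRunner


@cl.on_message
async def on_message(message: cl.Message):
    assistant_message = cl.Message(content="")
    await assistant_message.send()

    runner = GraphRunner(
        graph=compiled_graph,  # assumed: a CompiledStateGraph built at startup
        graph_inputs={"messages": [("user", message.content)]},  # assumed input schema
        graph_streamable_node_names=["chat"],  # assumed streamable node name
        lf_cb_handler=LangfuseCallbackHandler(),
        session_id=cl.user_session.get("id"),
        assistant_message=assistant_message,
    )
    await runner.run_graph()
    await assistant_message.update()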
stream_llm_response async
stream_llm_response(event)

Stream the LLM response to the assistant message.

Process the event data and stream tokens to the assistant message if the event comes from a streamable node.

Examples:

>>> event = {"event": "on_chat_model_stream", "data": {"chunk": chunk}}
>>> await runner.stream_llm_response(event)

Parameters:

Name Type Description Default
event dict

A dictionary containing the event data with the following structure:

- event: The type of event (str)
- metadata: Dictionary with event metadata
- data: The event payload

required

Returns:

Type Description

None

Notes

The method updates the assistant_message.content when streaming tokens. It also captures the run_id for Langfuse tracking when the chain ends.

Source code in src/podflix/utils/graph_runner.py
async def stream_llm_response(self, event: dict):
    """Stream the LLM response to the assistant message.

    Process the event data and stream tokens to the assistant message if the
    event comes from a streamable node.

    Examples:
        >>> event = {"event": "on_chat_model_stream", "data": {"chunk": chunk}}
        >>> await runner.stream_llm_response(event)

    Args:
        event: A dictionary containing the event data with the following structure:
              - event: The type of event (str)
              - metadata: Dictionary with event metadata
              - data: The event payload

    Returns:
        None

    Notes:
        The method updates the assistant_message.content when streaming tokens.
        It also captures the run_id for Langfuse tracking when the chain ends.
    """
    event_kind = event["event"]
    langgraph_node = event["metadata"].get("langgraph_node", None)

    if event_kind == "on_chat_model_stream":
        if langgraph_node not in self.graph_streamable_node_names:
            return

        ai_message_chunk: AIMessageChunk = event["data"]["chunk"]
        ai_message_content = ai_message_chunk.content

        if ai_message_content:
            # NOTE: This automatically updates the assistant_message.content
            await self.assistant_message.stream_token(ai_message_content)

    # TODO: Find a more robust way to get the run_id for Langfuse
    if event_kind == "on_chain_end":
        run_id = event.get("run_id")
        self.run_id = run_id
        logger.debug(f"Langfuse Run ID: {run_id}")
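
For reference, the two event kinds handled above can be exercised with hand-built events. A minimal sketch, assuming an existing runner instance, an async context, and that AIMessageChunk is importable from langchain_core.messages:

from langchain_core.messages import AIMessageChunk

stream_event = {
    "event": "on_chat_model_stream",
    "metadata": {"langgraph_node": "chat"},  # must appear in graph_streamable_node_names
    "data": {"chunk": AIMessageChunk(content="Hello")},
}
await runner.stream_llm_response(stream_event)  # streams "Hello" to the assistant message

end_event = {"event": "on_chain_end", "metadata": {}, "data": {}, "run_id": "run-123"}
await runner.stream_llm_response(end_event)  # captures run_id for Langfuse tracking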

hf_related

Hugging Face related utilities.

download_gguf_hf

download_gguf_hf(repo_id, filename, revision='main')

Download a specific GGUF file from Hugging Face hub.

Examples:

>>> download_gguf_hf("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-fp16.gguf")
'/path/to/downloaded/file.gguf'

Parameters:

Name Type Description Default
repo_id str

A string representing the Hugging Face repository ID.

required
filename str

A string representing the name of the GGUF file to download.

required
revision str

A string representing the specific model version to use.

'main'

Returns:

Name Type Description
str str

Local path to the downloaded file.

Source code in src/podflix/utils/hf_related.py
def download_gguf_hf(
    repo_id: str,
    filename: str,
    revision: str = "main",
) -> str:
    """Download a specific GGUF file from Hugging Face hub.

    Examples:
        >>> download_gguf_hf("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-fp16.gguf")
        '/path/to/downloaded/file.gguf'

    Args:
        repo_id: A string representing the Hugging Face repository ID.
        filename: A string representing the name of the GGUF file to download.
        revision: A string representing the specific model version to use.

    Returns:
        str: Local path to the downloaded file.
    """
    return hf_hub_download(
        repo_id=repo_id,
        filename=filename,
        revision=revision,
        local_dir=f"{env_settings.library_base_path}/deployment/models/GGUF",
        token=env_settings.hf_token,
    )

download_model_snapshot_hf

download_model_snapshot_hf(
    repo_id="Qwen/Qwen2.5-0.5B-Instruct", revision="main", ignore_patterns=None
)

Download a complete model snapshot from Hugging Face hub.

Examples:

>>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct")
>>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct", ignore_patterns=["*.pt"])

Parameters:

Name Type Description Default
repo_id str

A string representing the Hugging Face repository ID.

'Qwen/Qwen2.5-0.5B-Instruct'
revision str

A string representing the specific model version to use.

'main'
ignore_patterns list[str] | None

A list of patterns to ignore during download.

None

Returns:

Type Description
None

None

Source code in src/podflix/utils/hf_related.py
def download_model_snapshot_hf(
    repo_id: str = "Qwen/Qwen2.5-0.5B-Instruct",
    revision: str = "main",
    ignore_patterns: list[str] | None = None,
) -> None:
    """Download a complete model snapshot from Hugging Face hub.

    Examples:
        >>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct")
        >>> download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct", ignore_patterns=["*.pt"])

    Args:
        repo_id: A string representing the Hugging Face repository ID.
        revision: A string representing the specific model version to use.
        ignore_patterns: A list of patterns to ignore during download.

    Returns:
        None
    """
    local_dir_name = repo_id.split("/")[1]

    if ignore_patterns is None:
        ignore_patterns = ["*.pt"]

    snapshot_download(
        repo_id=repo_id,
        revision=revision,
        local_dir=f"{env_settings.library_base_path}/deployment/models/{local_dir_name}",
        ignore_patterns=ignore_patterns,
        token=env_settings.hf_token,
    )
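
The two helpers can be combined in a simple provisioning script. A minimal sketch, reusing the repository and file names from the docstring examples above; download targets are resolved under env_settings.library_base_path:

from podflix.utils.hf_related import download_gguf_hf, download_model_snapshot_hf

# Full snapshot of the default instruct model, skipping PyTorch checkpoints.
download_model_snapshot_hf("Qwen/Qwen2.5-0.5B-Instruct", ignore_patterns=["*.pt"])

# Single GGUF file for llama.cpp-style runtimes; returns the local file path.
gguf_path = download_gguf_hf(
    "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-fp16.gguf"
)
print(gguf_path)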

model

Model utilities.

get_chat_model

get_chat_model(model_name=None, chat_model_kwargs=None)

Create and configure a ChatOpenAI model instance.

Examples:

>>> model = get_chat_model()
>>> isinstance(model, ChatOpenAI)
True
>>> model = get_chat_model("gpt-4o-mini", {"temperature": 0.7})
>>> isinstance(model, ChatOpenAI)
True

Parameters:

Name Type Description Default
model_name str | None

The name of the model to use. If None, uses the default from env_settings.

None
chat_model_kwargs dict[Any] | None

Additional keyword arguments to pass to ChatOpenAI. Defaults to None.

None

Returns:

Type Description
ChatOpenAI

A configured ChatOpenAI model instance ready for use.

Source code in src/podflix/utils/model.py
def get_chat_model(
    model_name: str | None = None,
    chat_model_kwargs: dict[Any] | None = None,
) -> ChatOpenAI:
    """Create and configure a ChatOpenAI model instance.

    Examples:
        >>> model = get_chat_model()
        >>> isinstance(model, ChatOpenAI)
        True
        >>> model = get_chat_model("gpt-4o-mini", {"temperature": 0.7})
        >>> isinstance(model, ChatOpenAI)
        True

    Args:
        model_name: The name of the model to use. If None, uses the default from env_settings.
        chat_model_kwargs: Additional keyword arguments to pass to ChatOpenAI. Defaults to None.

    Returns:
        A configured ChatOpenAI model instance ready for use.
    """
    if chat_model_kwargs is None:
        chat_model_kwargs = {}

    openai_api_base = f"{env_settings.model_api_base}/v1"

    if model_name is None:
        model_name = env_settings.model_name

    if env_settings.enable_openai_api is True:
        openai_api_key = env_settings.openai_api_key
    else:
        openai_api_key = "DUMMY_KEY"

    return ChatOpenAI(
        model_name=model_name,
        openai_api_base=openai_api_base,
        openai_api_key=openai_api_key,
        **chat_model_kwargs,
    )
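
A minimal usage sketch, assuming the model API configured through env_settings is reachable; the prompt and keyword arguments are illustrative:

from podflix.utils.model import get_chat_model

chat_model = get_chat_model(chat_model_kwargs={"temperature": 0.2})
reply = chat_model.invoke("Summarize this podcast episode in one sentence.")
print(reply.content)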

get_mock_model

get_mock_model(message='MOCK MESSAGE')

Create a mock language model for testing purposes.

Examples:

>>> model = get_mock_model("Test response")
>>> isinstance(model, (FakeListChatModel, StrOutputParser))
True
>>> model = get_mock_model()
>>> isinstance(model, (FakeListChatModel, StrOutputParser))
True

Parameters:

Name Type Description Default
message str

The message to be returned by the mock model. Defaults to "MOCK MESSAGE".

'MOCK MESSAGE'

Returns:

Type Description
FakeListChatModel | StrOutputParser

A chain of FakeListChatModel and StrOutputParser that returns the specified message.

Source code in src/podflix/utils/model.py
def get_mock_model(
    message: str = "MOCK MESSAGE",
) -> FakeListChatModel | StrOutputParser:
    """Create a mock language model for testing purposes.

    Examples:
        >>> model = get_mock_model("Test response")
        >>> isinstance(model, (FakeListChatModel, StrOutputParser))
        True
        >>> model = get_mock_model()
        >>> isinstance(model, (FakeListChatModel, StrOutputParser))
        True

    Args:
        message: The message to be returned by the mock model. Defaults to "MOCK MESSAGE".

    Returns:
        A chain of FakeListChatModel and StrOutputParser that returns the specified message.
    """
    model = FakeListChatModel(responses=[message])

    return model | StrOutputParser()
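
Because the chain ends in StrOutputParser, invoking the mock returns a plain string, which makes it convenient for tests that should not call a real model. A minimal sketch:

from podflix.utils.model import get_mock_model

mock_model = get_mock_model("Canned answer")
assert mock_model.invoke("any prompt") == "Canned answer"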

transcribe_audio_file

transcribe_audio_file(file, model_name=None, response_format='verbose_json')

Transcribe an audio file using OpenAI's Whisper model.

When using the verbose_json response format, the function returns the transcribed text with optional timestamps. Otherwise, it returns only the transcribed text.

Examples:

>>> with open('audio.mp3', 'rb') as f:
...     transcription = transcribe_audio_file(f)
>>> isinstance(transcription.text, str)
True
>>> transcription = transcribe_audio_file(Path('audio.mp3'))
>>> isinstance(transcription.text, str)
True

Parameters:

Name Type Description Default
file BinaryIO | Path

The audio file to transcribe. Can be a file object or Path.

required
model_name str | None

The name of the Whisper model to use. If None, uses the default from env_settings.

None
response_format AudioResponseFormat

The format of the response to return. Defaults to "verbose_json".

'verbose_json'

Returns:

Type Description
Transcription | TranscriptionVerbose

The transcribed text with optional timestamps from the audio file.

Source code in src/podflix/utils/model.py
def transcribe_audio_file(
    file: BinaryIO | Path,
    model_name: str | None = None,
    response_format: AudioResponseFormat = "verbose_json",
) -> Transcription | TranscriptionVerbose:
    """Transcribe an audio file using OpenAI's Whisper model.

    When using the verbose_json response format, the function returns the transcribed text with optional timestamps.
    Otherwise, it returns only the transcribed text.

    Examples:
        >>> with open('audio.mp3', 'rb') as f:
        ...     transcription = transcribe_audio_file(f)
        >>> isinstance(transcription.text, str)
        True
        >>> transcription = transcribe_audio_file(Path('audio.mp3'))
        >>> isinstance(transcription.text, str)
        True

    Args:
        file: The audio file to transcribe. Can be a file object or Path.
        model_name: The name of the Whisper model to use. If None, uses the default from env_settings.
        response_format: The format of the response to return. Defaults to "verbose_json".

    Returns:
        The transcribed text with optional timestamps from the audio file.
    """
    if model_name is None:
        model_name = env_settings.whisper_model_name

    if env_settings.enable_openai_api is True:
        openai_api_key = env_settings.openai_api_key
    else:
        openai_api_key = "DUMMY_KEY"

    client = OpenAI(
        base_url=f"{env_settings.whisper_api_base}/v1", api_key=openai_api_key
    )

    # If a Path is given, open it here and remember to close it afterwards.
    # (Checking `isinstance(file, Path)` in the finally block would never be
    # true, because `file` has already been rebound to the open handle.)
    opened_here = isinstance(file, Path)
    if opened_here:
        file = file.open("rb")

    try:
        return client.audio.transcriptions.create(
            model=model_name, file=file, response_format=response_format
        )
    finally:
        if opened_here:
            file.close()
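
A minimal usage sketch: the file path is illustrative, and the segments attribute is only populated for verbose_json responses (it may be None depending on the backend):

from pathlib import Path

from podflix.utils.model import transcribe_audio_file

transcription = transcribe_audio_file(Path("episode.mp3"))
print(transcription.text)

# verbose_json responses also carry per-segment timing information.
for segment in transcription.segments or []:
    print(f"[{segment.start:.1f}s - {segment.end:.1f}s] {segment.text}")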