Skip to content

Source Code Reference

This page contains the complete API reference for Tramlines, automatically generated from the source code.

Core Components

CLI Module

tramlines.cli

Tramlines CLI Entrypoint.

This module contains the command-line interface for running the Tramlines proxy. It handles argument parsing, configuration loading, and server initialization.

Classes

Functions

app() -> None

CLI entrypoint for running the Tramlines proxy server.

Source code in src/tramlines/cli.py
def app() -> None:  # pragma: no cover
    """CLI entrypoint for running the Tramlines proxy server."""

    available_policies = _discover_policies()

    parser = argparse.ArgumentParser(
        description="Tramlines MCP Proxy (GuardedFastMCPProxy)"
    )
    parser.add_argument(
        "--config-path",
        type=str,
        help="Path to JSON configuration file (alternative to MCP_CONFIG env var)",
    )
    parser.add_argument(
        "--policy-path",
        type=str,
        default="",
        help="Path to a custom Python policy file (*.py)",
    )
    parser.add_argument(
        "--use-policy",
        nargs="*",
        default=[],
        help=f"Names of built-in policies to use (e.g., {' '.join(available_policies.keys())})",
    )
    parser.add_argument(
        "--list-policies",
        action="store_true",
        help="List all available built-in policies and exit",
    )
    parser.add_argument(
        "--disable-tools",
        nargs="*",
        default=[],
        help="List of tool names to disable at discovery",
    )
    args = parser.parse_args()

    if args.list_policies:
        _list_policies(available_policies)

    # Load MCP configuration from file or environment
    mcp_config = _load_mcp_config(args.config_path)
    logger.info(f"CONFIG_LOADED | MCP Config: {mcp_config}")

    # Show disabled tools info
    if args.disable_tools:
        print(
            f"🚫 Disabling {len(args.disable_tools)} tools: {args.disable_tools}",
            file=sys.stderr,
        )

    # Load and combine selected policies
    policy = _combine_policies(
        custom_policy_path=args.policy_path,
        use_policies=args.use_policy,
        available_policies=available_policies,
    )

    # Create the guarded proxy directly
    proxy = create_guarded_proxy(
        mcp_config=mcp_config, policy=policy, disabled_tools=args.disable_tools
    )

    print("🚀 Tramlines Proxy Ready", file=sys.stderr)
    print("   Transport: stdio", file=sys.stderr)
    proxy.run(transport="stdio")

Logger

tramlines.logger

Middleware

tramlines.middleware

Classes

SessionManager(max_calls_per_session: int = 30, cleanup_hours: int = 24)

Manages session-based call histories with automatic cleanup.

Source code in src/tramlines/middleware.py
def __init__(self, max_calls_per_session: int = 30, cleanup_hours: int = 24):
    self.max_calls_per_session = max_calls_per_session
    self.cleanup_interval = timedelta(hours=cleanup_hours)
    self.histories: Dict[str, CallHistory] = {}
    self.last_cleanup = datetime.now()
Functions
get_session_id() -> str

Get session ID from FastMCP context with fallbacks.

Source code in src/tramlines/middleware.py
def get_session_id(self) -> str:
    """Get session ID from FastMCP context with fallbacks."""
    try:
        context = get_context()
        return context.session_id or context.client_id or "default_session"
    except RuntimeError:
        return "fallback_session"
get_history(session_id: str) -> CallHistory

Get or create call history for session.

Source code in src/tramlines/middleware.py
def get_history(self, session_id: str) -> CallHistory:
    """Get or create call history for session."""
    if session_id not in self.histories:
        self.histories[session_id] = CallHistory(
            max_calls=self.max_calls_per_session
        )
        logger.debug(
            f"SESSION_CREATE | session_id={session_id} | Creating new call history"
        )
    return self.histories[session_id]
cleanup_stale_sessions() -> None

Remove sessions inactive beyond cleanup interval.

Source code in src/tramlines/middleware.py
def cleanup_stale_sessions(self) -> None:
    """Remove sessions inactive beyond cleanup interval."""
    if datetime.now() - self.last_cleanup < self.cleanup_interval:
        return

    cutoff = datetime.now() - self.cleanup_interval
    stale = [
        sid
        for sid, hist in self.histories.items()
        if not hist.calls or hist.calls[-1].timestamp < cutoff
    ]

    for session_id in stale:
        del self.histories[session_id]

    self.last_cleanup = datetime.now()
stats() -> dict

Get session statistics.

Source code in src/tramlines/middleware.py
def stats(self) -> dict:
    """Get session statistics."""
    return {
        "active_sessions": len(self.histories),
        "total_calls": sum(len(h.calls) for h in self.histories.values()),
        "max_calls_per_session": self.max_calls_per_session,
    }

GuardRailMiddleware(policy: Policy | None = None, disabled_tools: list[str] | None = None, **kwargs)

Bases: Middleware

Unified middleware that handles security policies, performance monitoring, and call history tracking in a single clean implementation.

Now with session-based call history management for multi-user support.

Source code in src/tramlines/middleware.py
def __init__(
    self,
    policy: Policy | None = None,
    disabled_tools: list[str] | None = None,
    **kwargs,
):
    self.policy = policy
    self.disabled_tools = set(disabled_tools or [])
    self.sessions = SessionManager(**kwargs)
Functions
on_call_tool(context: MiddlewareContext[mt.CallToolRequestParams], call_next) -> mt.CallToolResult async

Handle tool call with security and tracking.

Source code in src/tramlines/middleware.py
async def on_call_tool(
    self, context: MiddlewareContext[mt.CallToolRequestParams], call_next
) -> mt.CallToolResult:
    """Handle tool call with security and tracking."""
    session_id = self.sessions.get_session_id()
    history = self.sessions.get_history(session_id)
    self.sessions.cleanup_stale_sessions()

    tool_call = ToolCall(
        name=context.message.name, arguments=context.message.arguments or {}
    )
    history.add_call(tool_call)

    # Step 1: Pre-execution guardrail evaluation (only if policy exists)
    if self.policy:
        result = evaluate_call(self.policy, history)

        if result.is_blocked:
            tool_call.status = CallStatus.BLOCK
            raise ToolError(f"Tool blocked by policy: {result.message}")

    # Step 2: Execute the tool
    start_time = time.time()
    try:
        call_result = await call_next(context)
    except Exception:
        tool_call.status = CallStatus.BLOCK
        raise
    finally:
        tool_call.execution_duration = round((time.time() - start_time) * 1000, 3)

    # Step 3: If all checks passed, mark as allowed and return result
    tool_call.status = CallStatus.ALLOW
    return call_result  # type: ignore[no-any-return]
on_list_tools(context: MiddlewareContext[mt.ListToolsRequest], call_next) async

Filter disabled tools.

Source code in src/tramlines/middleware.py
async def on_list_tools(
    self, context: MiddlewareContext[mt.ListToolsRequest], call_next
):
    """Filter disabled tools."""
    result = await call_next(context)
    return (
        [tool for tool in result if tool.name not in self.disabled_tools]
        if self.disabled_tools
        else result
    )
get_session_stats() -> dict

Get session statistics.

Source code in src/tramlines/middleware.py
def get_session_stats(self) -> dict:
    """Get session statistics."""
    return self.sessions.stats()

Functions

Proxy

tramlines.proxy

Classes

Functions

create_guarded_proxy(mcp_config: dict[str, Any], policy: Policy | None = None, disabled_tools: list[str] = []) -> FastMCP

Create a FastMCP proxy with unified security middleware.

Parameters:

Name Type Description Default
mcp_config dict[str, Any]

MCP server configuration

required
policy Policy | None

Optional security policy to enforce

None
disabled_tools list[str]

List of tools to disable

[]

Returns:

Type Description
FastMCP

FastMCP proxy server with security middleware applied

Source code in src/tramlines/proxy.py
def create_guarded_proxy(
    mcp_config: dict[str, Any],
    policy: Policy | None = None,
    disabled_tools: list[str] = [],
) -> FastMCP:
    """
    Create a FastMCP proxy with unified security middleware.

    Args:
        mcp_config: MCP server configuration
        policy: Optional security policy to enforce
        disabled_tools: List of tools to disable

    Returns:
        FastMCP proxy server with security middleware applied
    """
    client: Client = Client(mcp_config)

    # Create the base proxy
    proxy = FastMCP.as_proxy(client)

    # Add single unified middleware
    guard_rail_middleware = GuardRailMiddleware(
        policy=policy, disabled_tools=disabled_tools
    )
    proxy.add_middleware(guard_rail_middleware)

    # Log initialization
    logger.info("GUARD_PROXY_INIT | Initializing unified middleware proxy")
    logger.info(
        f"GUARD_PROXY_INIT | MCP servers: {len(mcp_config.get('mcpServers', {}))}"
    )
    logger.info(f"GUARD_PROXY_INIT | Guard policy loaded: {policy is not None}")
    logger.info(f"GUARD_PROXY_INIT | Disabled tools: {len(disabled_tools)}")

    return proxy

Session

tramlines.session

Classes

CallStatus

Bases: Enum

The final status of a tool call after evaluation.

ToolCall(name: str, arguments: dict[str, Any], timestamp: datetime = datetime.now(), status: CallStatus = CallStatus.ALLOW, execution_duration: float | None = None) dataclass

Tool call data for policy evaluation and history tracking.

CallHistory(max_calls: int = 100)

Session call history with automatic size management.

Source code in src/tramlines/session.py
def __init__(self, max_calls: int = 100) -> None:
    self.calls: list[ToolCall] = []
    self._max_calls = max_calls
Functions
add_call(call: ToolCall) -> None

Add tool call to history with automatic cleanup.

Source code in src/tramlines/session.py
def add_call(self, call: ToolCall) -> None:
    """Add tool call to history with automatic cleanup."""
    self.calls.append(call)
    if len(self.calls) > self._max_calls:
        self.calls = self.calls[-self._max_calls :]

Guardrail System

DSL Context

tramlines.guardrail.dsl.context

Classes

DSL Evaluator

tramlines.guardrail.dsl.evaluator

Classes

EvaluationResult(action_type: ActionType, violated_rule: str | None = None, message: str | None = None) dataclass

Result of a guardrail policy evaluation for a single tool call.

Attributes
is_allowed: bool property

Check if the action is allowed.

is_blocked: bool property

Check if the action blocks the tool call.

Functions

load_policy_from_file(policy_file: str) -> Policy

Load a security policy from a Python file.

The policy file should define a module-level variable named 'policy' that contains a Policy instance.

Parameters:

Name Type Description Default
policy_file str

Path to the Python policy file

required

Returns:

Name Type Description
Policy Policy

The loaded policy object

Raises:

Type Description
FileNotFoundError

If the policy file doesn't exist

ValueError

If the policy file doesn't contain a valid policy

ImportError

If the policy file has import errors

Source code in src/tramlines/guardrail/dsl/evaluator.py
def load_policy_from_file(policy_file: str) -> Policy:
    """
    Load a security policy from a Python file.

    The policy file should define a module-level variable named 'policy'
    that contains a Policy instance.

    Args:
        policy_file: Path to the Python policy file

    Returns:
        Policy: The loaded policy object

    Raises:
        FileNotFoundError: If the policy file doesn't exist
        ValueError: If the policy file doesn't contain a valid policy
        ImportError: If the policy file has import errors
    """
    policy_path = Path(policy_file)

    if not policy_path.exists():
        raise FileNotFoundError(f"Policy file not found: {policy_path}")

    if not policy_path.suffix == ".py":
        raise ValueError(f"Policy file must have .py extension: {policy_path}")

    # Generate a unique module name to avoid conflicts
    module_name = f"policy_module_{policy_path.stem}_{id(policy_path)}"

    try:
        # Load the module dynamically
        spec = importlib.util.spec_from_file_location(module_name, policy_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load module spec from {policy_path}")

        module = importlib.util.module_from_spec(spec)

        # Add to sys.modules temporarily to support relative imports
        sys.modules[module_name] = module

        try:
            spec.loader.exec_module(module)
        finally:
            # Clean up sys.modules
            if module_name in sys.modules:
                del sys.modules[module_name]

        # Extract the policy object
        if not hasattr(module, "policy"):
            raise ValueError(
                f"Policy file {policy_path} must define a module-level variable named 'policy'"
            )

        policy = getattr(module, "policy")

        # Validate the policy object
        if not isinstance(policy, Policy):
            raise ValueError(
                f"The 'policy' variable in {policy_path} must be an instance of Policy, "
                f"got {type(policy)}"
            )

        logger.info(
            f"POLICY_LOAD | Successfully loaded policy '{policy.name}' "
            f"with {len(policy.rules)} rules from {policy_path}"
        )

        return policy

    except Exception as e:
        logger.error(
            f"POLICY_LOAD_ERROR | Failed to load policy from {policy_path}: {e}"
        )
        raise

evaluate_call(policy: Policy, history: CallHistory) -> EvaluationResult

Evaluates guardrail rules for a given tool call.

Source code in src/tramlines/guardrail/dsl/evaluator.py
def evaluate_call(policy: Policy, history: CallHistory) -> EvaluationResult:
    """Evaluates guardrail rules for a given tool call."""
    if not history:
        raise ValueError("Call history cannot be empty.")

    call = history[-1]

    for rule in policy.rules:
        try:
            if rule.condition(call, history):
                if rule.action_type == ActionType.BLOCK:
                    # Block actions are final
                    return EvaluationResult(
                        action_type=ActionType.BLOCK,
                        violated_rule=rule.name,
                        message=rule.message,
                    )
                elif rule.action_type == ActionType.ALLOW:
                    # Allow actions stop processing for this phase
                    return EvaluationResult(action_type=ActionType.ALLOW)

        except Exception as e:
            logger.error(f"GUARDRAIL_ERROR | Error evaluating rule '{rule.name}': {e}")
            # Decide on a default behavior for errors, e.g., fail-safe (block)
            # For now, we'll log and continue, which is fail-open
            continue

    # If no rule was triggered, default to allow
    return EvaluationResult(action_type=ActionType.ALLOW)

DSL Predicates

tramlines.guardrail.dsl.predicates

Classes

BasePredicate

Bases: Predicate, ABC

Base implementation for all predicates with logical operators.

ValueBuilder(extractor: Callable[[ToolCall, CallHistory], T | None])

Bases: Generic[T]

Creates predicates when comparison operators are used.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(self, extractor: Callable[[ToolCall, CallHistory], T | None]):
    self._extractor = extractor

StringValueBuilder(extractor: Callable[[ToolCall, CallHistory], T | None])

Bases: ValueBuilder[str]

Value builder with string-specific methods.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(self, extractor: Callable[[ToolCall, CallHistory], T | None]):
    self._extractor = extractor

ComparisonPredicate(extractor: Callable[[ToolCall, CallHistory], T | None], comparison: Callable[[T, Any], bool], target: Any)

Bases: BasePredicate, Generic[T]

Predicate for comparing extracted values.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(
    self,
    extractor: Callable[[ToolCall, CallHistory], T | None],
    comparison: Callable[[T, Any], bool],
    target: Any,
):
    self._extractor = extractor
    self._comparison = comparison
    self._target = target

HistoryQueryBuilder(pattern: str | Pattern[str])

Simplified history query builder.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(self, pattern: str | Pattern[str]):
    self._pattern = re.compile(pattern) if isinstance(pattern, str) else pattern
    self._condition: Predicate | None = None
Functions
where(condition: Predicate) -> HistoryQueryBuilder

Filter historical calls with a condition.

Source code in src/tramlines/guardrail/dsl/predicates.py
def where(self, condition: Predicate) -> HistoryQueryBuilder:
    """Filter historical calls with a condition."""
    self._condition = condition
    return self
exists() -> Predicate

Check if any matching calls exist.

Source code in src/tramlines/guardrail/dsl/predicates.py
def exists(self) -> Predicate:
    """Check if any matching calls exist."""
    return HistoryExistsPredicate(self._pattern, self._condition)
count(within: str | None = None) -> ValueBuilder[int]

Count matching calls, optionally within a time window.

Source code in src/tramlines/guardrail/dsl/predicates.py
def count(self, within: str | None = None) -> ValueBuilder[int]:
    """Count matching calls, optionally within a time window."""
    time_delta = _parse_time_window(within) if within else None

    def counter(call: ToolCall, history: CallHistory) -> int:
        cutoff = datetime.now() - time_delta if time_delta else None
        count = 0
        for past_call in history.calls:
            # Check time window if applicable
            if cutoff and (not past_call.timestamp or past_call.timestamp < cutoff):
                continue

            # Check name pattern
            if self._pattern.search(past_call.name):
                # Check extra condition if applicable
                if self._condition is None or self._condition(past_call, history):
                    count += 1
        return count

    return ValueBuilder(counter)
last() -> HistoricalCallBuilder

Get the most recent matching call.

Source code in src/tramlines/guardrail/dsl/predicates.py
def last(self) -> HistoricalCallBuilder:
    """Get the most recent matching call."""
    return HistoricalCallBuilder(self._pattern, self._condition, reverse=True)
first() -> HistoricalCallBuilder

Get the oldest matching call.

Source code in src/tramlines/guardrail/dsl/predicates.py
def first(self) -> HistoricalCallBuilder:
    """Get the oldest matching call."""
    return HistoricalCallBuilder(self._pattern, self._condition, reverse=False)

HistoryExistsPredicate(pattern: Pattern[str], condition: Predicate | None)

Bases: BasePredicate

Check if historical calls matching a pattern exist.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(self, pattern: Pattern[str], condition: Predicate | None):
    self._pattern = pattern
    self._condition = condition

HistoricalCallBuilder(pattern: Pattern[str], condition: Predicate | None, reverse: bool)

A builder that provides access to a specific historical call's properties.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(
    self, pattern: Pattern[str], condition: Predicate | None, reverse: bool
):
    self._pattern = pattern
    self._condition = condition
    self._reverse = reverse
Attributes
name: StringValueBuilder property

Get the name of the historical call.

Functions
arg(key: str) -> StringValueBuilder

Get an argument from the historical call.

Source code in src/tramlines/guardrail/dsl/predicates.py
def arg(self, key: str) -> StringValueBuilder:
    """Get an argument from the historical call."""

    def extractor(call: ToolCall, hist: CallHistory) -> str | None:
        match = self._find_matching_call(call, hist)
        if match:
            return match.arguments.get(key)
        return None

    return StringValueBuilder(extractor)

CustomPredicate(func: Callable[[ToolCall, CallHistory], bool])

Bases: BasePredicate

A wrapper for a raw Python function to be used as a predicate.

Source code in src/tramlines/guardrail/dsl/predicates.py
def __init__(self, func: Callable[[ToolCall, CallHistory], bool]):
    self._func = func

Functions

custom(func: Callable[[ToolCall, CallHistory], bool]) -> Predicate

Provides a clean escape hatch to use a raw Python function for complex logic that cannot be expressed by the declarative DSL.

The function must have the signature: my_function(call: ToolCall, history: CallHistory) -> bool

Parameters:

Name Type Description Default
func Callable[[ToolCall, CallHistory], bool]

The Python function to wrap in a predicate.

required

Returns:

Type Description
Predicate

A Predicate instance that can be used in a .when() clause.

Source code in src/tramlines/guardrail/dsl/predicates.py
def custom(func: Callable[[ToolCall, CallHistory], bool]) -> Predicate:
    """
    Provides a clean escape hatch to use a raw Python function for complex logic
    that cannot be expressed by the declarative DSL.

    The function must have the signature:
    `my_function(call: ToolCall, history: CallHistory) -> bool`

    Args:
        func: The Python function to wrap in a predicate.

    Returns:
        A Predicate instance that can be used in a .when() clause.
    """
    return CustomPredicate(func)

DSL Rules

tramlines.guardrail.dsl.rules

Classes

RuleBuilder(name: str)

A builder for creating a Rule object in a fluent, step-by-step manner.

Source code in src/tramlines/guardrail/dsl/rules.py
def __init__(self, name: str):
    self._name = name
    self._condition: Predicate | None = None
Functions
when(condition: Predicate) -> RuleBuilder

Sets the condition (a Predicate) for this rule to trigger.

Parameters:

Name Type Description Default
condition Predicate

The predicate that defines the logic of the rule.

required

Returns:

Type Description
RuleBuilder

The RuleBuilder instance for chaining.

Source code in src/tramlines/guardrail/dsl/rules.py
def when(self, condition: Predicate) -> RuleBuilder:
    """
    Sets the condition (a Predicate) for this rule to trigger.

    Args:
        condition: The predicate that defines the logic of the rule.

    Returns:
        The RuleBuilder instance for chaining.
    """
    self._condition = condition
    return self
block(message: str) -> Rule

Finalizes the rule with a BLOCK action. If the condition is met, the tool call will be blocked.

Source code in src/tramlines/guardrail/dsl/rules.py
def block(self, message: str) -> Rule:
    """
    Finalizes the rule with a BLOCK action.
    If the condition is met, the tool call will be blocked.
    """
    return Rule(
        name=self._name,
        condition=self._ensure_condition(),
        action_type=ActionType.BLOCK,
        message=message,
    )
allow() -> Rule

Finalizes the rule with an ALLOW action. If the condition is met, evaluation of other rules in the same phase stops.

Source code in src/tramlines/guardrail/dsl/rules.py
def allow(self) -> Rule:
    """
    Finalizes the rule with an ALLOW action.
    If the condition is met, evaluation of other rules in the same phase stops.
    """
    return Rule(
        name=self._name,
        condition=self._ensure_condition(),
        action_type=ActionType.ALLOW,
    )

Functions

rule(name: str) -> RuleBuilder

The entry point for creating a new security rule.

Parameters:

Name Type Description Default
name str

A descriptive name for the rule, used for logging.

required

Returns:

Type Description
RuleBuilder

A RuleBuilder instance to define the rule's condition and action.

Source code in src/tramlines/guardrail/dsl/rules.py
def rule(name: str) -> RuleBuilder:
    """
    The entry point for creating a new security rule.

    Args:
        name: A descriptive name for the rule, used for logging.

    Returns:
        A RuleBuilder instance to define the rule's condition and action.
    """
    return RuleBuilder(name)

DSL Testing

tramlines.guardrail.dsl.testing

Classes

Functions

assert_allowed(result: EvaluationResult)

Asserts that a tool call was allowed. Raises an AssertionError with a descriptive message if blocked.

Source code in src/tramlines/guardrail/dsl/testing.py
def assert_allowed(result: EvaluationResult):
    """
    Asserts that a tool call was allowed.
    Raises an AssertionError with a descriptive message if blocked.
    """
    if result.is_blocked:
        raise AssertionError(
            f"Expected call to be ALLOWED, but it was BLOCKED by rule "
            f"'{result.violated_rule}': {result.message}"
        )

assert_blocked(result: EvaluationResult, by_rule: str | None = None) -> None

Asserts that a tool call was blocked.

Parameters:

Name Type Description Default
result EvaluationResult

The EvaluationResult from a test run.

required
by_rule str | None

If provided, also asserts that the block was triggered by the rule with this specific name.

None

Raises:

Type Description
AssertionError

If the call was allowed or blocked by a different rule.

Source code in src/tramlines/guardrail/dsl/testing.py
def assert_blocked(result: EvaluationResult, by_rule: str | None = None) -> None:
    """
    Asserts that a tool call was blocked.

    Args:
        result: The EvaluationResult from a test run.
        by_rule: If provided, also asserts that the block was triggered by the
                 rule with this specific name.

    Raises:
        AssertionError: If the call was allowed or blocked by a different rule.
    """
    if result.is_allowed:
        raise AssertionError("Expected call to be BLOCKED, but it was ALLOWED.")

    if by_rule and result.violated_rule != by_rule:
        raise AssertionError(
            f"Expected block by rule '{by_rule}', but was blocked by "
            f"'{result.violated_rule}' instead."
        )

simulate_calls(policy_under_test: Policy, calls: list[ToolCall]) -> EvaluationResult

Simulates a sequence of tool calls against a policy and returns the final result.

This function simulates what happens in a real session - each call is evaluated as it arrives, with full knowledge of previous calls. If any call is blocked, that result is returned immediately.

Parameters:

Name Type Description Default
policy_under_test Policy

The policy to test against.

required
calls list[ToolCall]

List of tool calls to simulate in sequence.

required

Returns:

Type Description
EvaluationResult

The final EvaluationResult - either the first blocked call or the last call's result.

Source code in src/tramlines/guardrail/dsl/testing.py
def simulate_calls(
    policy_under_test: Policy, calls: list[ToolCall]
) -> EvaluationResult:
    """
    Simulates a sequence of tool calls against a policy and returns the final result.

    This function simulates what happens in a real session - each call is evaluated
    as it arrives, with full knowledge of previous calls. If any call is blocked,
    that result is returned immediately.

    Args:
        policy_under_test: The policy to test against.
        calls: List of tool calls to simulate in sequence.

    Returns:
        The final EvaluationResult - either the first blocked call or the last call's result.
    """
    history = CallHistory()

    for call in calls:
        history.add_call(call)
        result = evaluate_call(policy_under_test, history)

        # If this call gets blocked, return immediately
        if result.is_blocked:
            return result

    # If we made it through all calls, return the final result
    return result

DSL Types

tramlines.guardrail.dsl.types

Classes

ActionType

Bases: Enum

The type of action a rule can take.

Predicate

Bases: Protocol

A protocol defining the structure of a predicate. It's a callable that evaluates a condition against a tool call and its history.

Rule(name: str, condition: Predicate, action_type: ActionType, message: str | None = None) dataclass

A single, immutable security rule. It consists of a name, a condition (predicate), and the action to take.

Policy(name: str, rules: List[Rule] = list(), description: str | None = None) dataclass

A collection of rules that are evaluated in order.

Extensions

Encoding Detector

tramlines.guardrail.extensions.encoding_detector

Encoding Detection Extension

Detects potentially encoded or obfuscated content that could be used to bypass security checks.

Functions

detect_encoding(text: str) -> bool

Detects suspicious encoding or obfuscation in text.

Parameters:

Name Type Description Default
text str

String to analyze for encoding patterns

required

Returns:

Name Type Description
bool bool

True if encoding detected, False if safe

Source code in src/tramlines/guardrail/extensions/encoding_detector.py
def detect_encoding(text: str) -> bool:
    """
    Detects suspicious encoding or obfuscation in text.

    Args:
        text: String to analyze for encoding patterns

    Returns:
        bool: True if encoding detected, False if safe
    """
    if not text or not text.strip():
        return False

    content = str(text).strip()

    # Check for base64 patterns - but be more specific
    # Must be longer and not contain common words/domains
    base64_pattern = r"[A-Za-z0-9+/]{16,}={0,2}"
    base64_matches = re.findall(base64_pattern, content)
    if base64_matches:
        # Filter out common false positives like domain names, URLs
        for match in base64_matches:
            # Skip if it looks like a domain or URL component
            if any(
                word in match.lower()
                for word in ["com", "org", "net", "example", "api", "http", "www"]
            ):
                continue
            # Skip short matches that might be legitimate tokens
            if len(match) < 20:
                continue
            return True

    # Check for hex encoding - require multiple consecutive hex sequences
    hex_patterns = [
        r"\\x[0-9A-Fa-f]{2}",  # \x hex escapes
        r"%[0-9A-Fa-f]{2}",  # URL-style hex
        r"&#x[0-9A-Fa-f]+;",  # HTML hex entities
    ]
    for pattern in hex_patterns:
        matches = re.findall(pattern, content)
        # Only flag if there are multiple consecutive hex sequences
        if len(matches) >= 3:
            return True

    # Check for excessive URL encoding (more than 8 encoded chars)
    url_encoded = re.findall(r"%[0-9A-Fa-f]{2}", content)
    if len(url_encoded) >= 8:
        return True

    # Check for unicode escape sequences (multiple required)
    unicode_matches = re.findall(r"\\u[0-9A-Fa-f]{4}", content)
    if len(unicode_matches) >= 3:
        return True

    # Check for excessive non-ASCII characters (but allow normal international text)
    non_ascii_count = sum(1 for char in content if ord(char) > 127)
    if len(content) > 20 and non_ascii_count / len(content) > 0.5:
        return True

    return False

PII Detector

tramlines.guardrail.extensions.pii_detector

PII Detection Extension

Uses Microsoft Presidio for detecting personally identifiable information (PII). Provides comprehensive detection of emails, phone numbers, credit cards, SSNs, and more.

Functions

detect_pii(text: str) -> bool

Detects personally identifiable information in text.

Parameters:

Name Type Description Default
text str

String to analyze for PII

required

Returns:

Name Type Description
bool bool

True if PII detected, False if safe

Source code in src/tramlines/guardrail/extensions/pii_detector.py
def detect_pii(text: str) -> bool:
    """
    Detects personally identifiable information in text.

    Args:
        text: String to analyze for PII

    Returns:
        bool: True if PII detected, False if safe
    """
    if not text or not text.strip():
        return False

    # Return False if analyzer couldn't be initialized
    if _analyzer is None:
        return False

    try:
        results = _analyzer.analyze(
            text=text,
            entities=[
                "PHONE_NUMBER",
                "EMAIL_ADDRESS",
                "CREDIT_CARD",
                "IBAN_CODE",
                "IP_ADDRESS",
                "PERSON",
                "LOCATION",
                "ORGANIZATION",
                "US_SSN",
                "US_DRIVER_LICENSE",
                "US_PASSPORT",
                "US_BANK_NUMBER",
                "DATE_TIME",
                "MEDICAL_LICENSE",
                "URL",
                "CRYPTO",
            ],
            language="en",
        )

        return len(results) > 0
    except Exception:
        return False

Prompt Detector

tramlines.guardrail.extensions.prompt_detector

Prompt Injection Detection Extension

Uses LlamaFirewall's PromptGuard for detecting jailbreak attempts and prompt injections.

Functions

detect_prompt(text: str) -> bool

Detects prompt injection attacks in text.

Parameters:

Name Type Description Default
text str

String to analyze for prompt injection

required

Returns:

Name Type Description
bool bool

True if prompt injection detected, False if safe

Source code in src/tramlines/guardrail/extensions/prompt_detector.py
def detect_prompt(text: str) -> bool:
    """
    Detects prompt injection attacks in text.

    Args:
        text: String to analyze for prompt injection

    Returns:
        bool: True if prompt injection detected, False if safe
    """
    if not text or not text.strip():
        return False

    # Return False if firewall couldn't be initialized
    if _firewall is None:
        return False

    try:
        message = UserMessage(content=text)
        result = _firewall.scan(message)

        return result.decision == LlamaDecision.BLOCK  # type: ignore[no-any-return]

    except Exception:
        return False

Regex Detector

tramlines.guardrail.extensions.regex_detector

Regex Threat Detection Extension

Uses LlamaFirewall's RegexScanner for pattern-based threat detection.

Functions

detect_regex(text: str) -> bool

Detect potential regex-based threats in text using LlamaFirewall's RegexScanner.

Parameters:

Name Type Description Default
text str

The text to analyze

required

Returns:

Type Description
bool

True if potential threats are detected, False otherwise

Source code in src/tramlines/guardrail/extensions/regex_detector.py
def detect_regex(text: str) -> bool:
    """
    Detect potential regex-based threats in text using LlamaFirewall's RegexScanner.

    Args:
        text: The text to analyze

    Returns:
        True if potential threats are detected, False otherwise
    """
    if not text or not text.strip():
        return False

    if _scanner is None:
        # Gracefully handle missing dependencies
        return False

    try:
        # Create a user message and scan it using asyncio.run() as in official example
        message = UserMessage(text)
        result = asyncio.run(_scanner.scan(message))

        # Return True if scanner decided to block
        return bool(result.decision == LlamaDecision.BLOCK)
    except Exception:
        # Return False on any scanning errors to avoid blocking valid content
        return False

Security Policies

Block PII in Tool Args

tramlines.guardrail.policies.block_pii_in_tool_args

Generic PII Detection Guardrail

This policy scans all string arguments in every tool call for Personally Identifiable Information (PII) and blocks the call if any is found.

Classes

Functions

Block Regex Patterns

tramlines.guardrail.policies.block_regex_patterns

Known Pattern Detection Guardrail

This policy scans all string arguments in every tool call for known malicious or sensitive patterns using a regex scanner and blocks the call if any are found.

Classes

Functions

GitHub Enforce Single Repo

tramlines.guardrail.policies.github_enforce_single_repo

GitHub Single-Repository-Per-Session Policy

This policy restricts tool access to a single GitHub repository per session. Once a GitHub tool accesses a specific owner/repo combination, all subsequent GitHub operations in that session must use the same owner/repo or be blocked.

Classes

Functions

Heroku Enforce Single App

tramlines.guardrail.policies.heroku_enforce_single_app

Heroku Single-App-Per-Session Policy

This policy restricts tool access to a single Heroku app per session. Once a Heroku tool accesses a specific app ID or name, all subsequent Heroku operations in that session must use the same app or be blocked.

Classes

Functions

Linear Enforce Single Team

tramlines.guardrail.policies.linear_enforce_single_team

Linear Single-Team-Per-Session Policy

This policy restricts tool access to a single Linear team per session. Once a Linear tool accesses a specific team ID, all subsequent Linear operations in that session must use the same team ID or be blocked.

Classes

Functions

Notion Enforce Single Page

tramlines.guardrail.policies.notion_enforce_single_page

Notion Single-Page-Per-Session Policy

This policy restricts tool access to a single Notion page per session. Once a Notion tool accesses a specific page ID, all subsequent Notion operations in that session must use the same page ID or be blocked.

Classes

Functions