Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,7 @@ def plan_builder(
# This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly.
backfill_models = modified_model_names or None

plan_execution_time = execution_time or now()
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan_execution_time = execution_time or now() will ignore valid falsy execution_time values (e.g., epoch millis 0) and silently replace them with now(). Use an explicit None check (e.g., execution_time if execution_time is not None else now()) to preserve caller-provided values.

Suggested change
plan_execution_time = execution_time or now()
plan_execution_time = execution_time if execution_time is not None else now()

Copilot uses AI. Check for mistakes.
max_interval_end_per_model = None
default_start, default_end = None, None
if not run:
Expand All @@ -1680,17 +1681,21 @@ def plan_builder(
max_interval_end_per_model,
backfill_models,
modified_model_names,
execution_time or now(),
plan_execution_time,
)

# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values())
max_interval_end_per_model = self._filter_stale_end_overrides(
max_interval_end_per_model,
context_diff.snapshots_by_name,
)

start_override_per_model = self._calculate_start_override_per_model(
min_intervals,
start or default_start,
end or default_end,
execution_time or now(),
plan_execution_time,
backfill_models,
snapshots,
max_interval_end_per_model,
Expand Down Expand Up @@ -3181,6 +3186,20 @@ def _get_max_interval_end_per_model(
).items()
}

@staticmethod
def _filter_stale_end_overrides(
max_interval_end_per_model: t.Dict[str, datetime],
snapshots_by_name: t.Dict[str, Snapshot],
) -> t.Dict[str, datetime]:
# Drop stale interval ends for snapshots whose new versions have no intervals yet. Otherwise the old
# prod end is reused as an end_override, causing missing_intervals() to skip the new snapshot entirely
# when the requested start is newer than that stale end.
return {
model_fqn: end
for model_fqn, end in max_interval_end_per_model.items()
if model_fqn not in snapshots_by_name or snapshots_by_name[model_fqn].intervals
}

@staticmethod
def _get_models_for_interval_end(
snapshots: t.Dict[str, Snapshot], backfill_models: t.Set[str]
Expand Down
80 changes: 79 additions & 1 deletion tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
from sqlmesh.core.plan.definition import Plan
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model
from sqlmesh.core.model import SeedModel, load_sql_based_model, model, SqlModel, Model
from sqlmesh.core.model.common import ParsableSql
from sqlmesh.core.model.cache import OptimizedQueryCache
from sqlmesh.core.renderer import render_statements
Expand Down Expand Up @@ -1223,6 +1223,84 @@ def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable
context.close()


@pytest.mark.slow
def test_seed_model_pr_plan_filters_stale_end_override(
copy_to_temp_path: t.Callable, mocker: MockerFixture
):
path = copy_to_temp_path("examples/sushi")
Comment on lines +1226 to +1230
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description/test plan references test_seed_model_pr_plan_with_stale_prod_intervals, but the added regression test is named test_seed_model_pr_plan_filters_stale_end_override. Consider updating the PR description/test plan (or renaming the test) so the suggested -k selector matches.

Copilot uses AI. Check for mistakes.

with time_machine.travel("2024-06-01 00:00:00 UTC"):
context = Context(paths=path, gateway="duckdb_persistent")
context.plan("prod", no_prompts=True, auto_apply=True)
context.close()

with time_machine.travel("2026-04-13 00:00:00 UTC"):
context = Context(paths=path, gateway="duckdb_persistent")

model = t.cast(SeedModel, context.get_model("sushi.waiter_names").copy())
model.seed.content += "10,Trey\n"
context.upsert_model(model)
context.upsert_model(
load_sql_based_model(
parse(
"""
MODEL (
name sushi.waiter_rollup,
kind FULL,
cron '@daily'
);

SELECT waiter_id, waiter_name, event_date
FROM sushi.waiter_as_customer_by_day
"""
),
default_catalog=context.default_catalog,
)
)

waiter_as_customer_by_day = context.get_snapshot(
"sushi.waiter_as_customer_by_day", raise_if_missing=True
)
orders = context.get_snapshot("sushi.orders", raise_if_missing=True)
original_get_max_interval_end_per_model = context._get_max_interval_end_per_model

def _mocked_max_interval_end_per_model(
snapshots: t.Dict[str, t.Any], backfill_models: t.Optional[t.Set[str]]
) -> t.Dict[str, datetime]:
result = original_get_max_interval_end_per_model(snapshots, backfill_models)
# Keep the overall plan end recent via another affected model while making the old prod end for
# waiter_as_customer_by_day older than the PR start. Without filtering, that stale end_override
# causes the new waiter_as_customer_by_day snapshot to be skipped and waiter_rollup fails when it
# references the missing physical table.
result[waiter_as_customer_by_day.name] = to_datetime("2026-01-01")
result[orders.name] = to_datetime("2026-04-13")
return result

mocker.patch.object(
context,
"_get_max_interval_end_per_model",
side_effect=_mocked_max_interval_end_per_model,
)

plan = context.plan("dev", start="2 months ago", no_prompts=True)
missing_interval_names = {si.snapshot_id.name for si in plan.missing_intervals}

assert plan.user_provided_flags == {"start": "2 months ago"}
assert to_timestamp(plan.start) == to_timestamp("2026-02-13")
assert to_timestamp(plan.end) == to_timestamp("2026-04-13")
assert any("waiter_as_customer_by_day" in name for name in missing_interval_names)
assert any("waiter_rollup" in name for name in missing_interval_names)

context.apply(plan)

environment = context.state_sync.get_environment("dev")
assert environment is not None
promoted_snapshot_names = {snapshot.name for snapshot in environment.promoted_snapshots}
assert any("waiter_as_customer_by_day" in name for name in promoted_snapshot_names)
assert any("waiter_rollup" in name for name in promoted_snapshot_names)
context.close()


@pytest.mark.slow
def test_schema_error_no_default(sushi_context_pre_scheduling) -> None:
context = sushi_context_pre_scheduling
Expand Down
Loading