table)) { return; } $run = OperationRun::query()->find($this->operationRunId); if (! $run instanceof OperationRun) { return; } if ((int) $run->workspace_id !== $this->workspaceId) { return; } try { if ($run->status === 'queued') { $operationRunService->updateRun($run, status: 'running'); } $processed = 0; $cursor = max(0, $this->resumeFrom); $batchSize = max(1, $this->batchSize); while (true) { $ids = DB::table($this->table) ->join('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $this->table)) ->whereNull(sprintf('%s.workspace_id', $this->table)) ->where(sprintf('%s.id', $this->table), '>', $cursor) ->where('tenants.workspace_id', $this->workspaceId) ->orderBy(sprintf('%s.id', $this->table)) ->limit($batchSize) ->pluck(sprintf('%s.id', $this->table)) ->map(static fn (mixed $id): int => (int) $id) ->all(); if ($ids === []) { break; } if ($this->maxRows !== null) { $remaining = $this->maxRows - $processed; if ($remaining <= 0) { break; } if (count($ids) > $remaining) { $ids = array_slice($ids, 0, $remaining); } } $updated = DB::table($this->table) ->whereIn('id', $ids) ->whereNull('workspace_id') ->update(['workspace_id' => $this->workspaceId]); $processed += $updated; $cursor = max($ids); if ($updated > 0) { $operationRunService->incrementSummaryCounts($run, [ 'processed' => $updated, 'succeeded' => $updated, ]); } $this->persistTableProgress($cursor, $updated); if ($this->maxRows !== null && $processed >= $this->maxRows) { break; } } if ($this->remainingRows() === 0) { $this->reconcileTableProgressWithPlannedTotal($operationRunService, $run, $cursor); } $run->refresh(); $statusBeforeCompletion = $run->status; $operationRunService->maybeCompleteBulkRun($run); $run->refresh(); if ($statusBeforeCompletion !== 'completed' && $run->status === 'completed') { $workspace = Workspace::query()->find($this->workspaceId); if ($workspace instanceof Workspace) { $workspaceAuditLogger->log( workspace: $workspace, action: 
'workspace_isolation.backfill_workspace_ids.completed', context: [ 'operation_run_id' => (int) $run->getKey(), 'table' => $this->table, 'outcome' => (string) $run->outcome, ], status: (string) $run->outcome, resourceType: 'operation_run', resourceId: (string) $run->getKey(), ); } } } catch (QueryException $e) { $operationRunService->appendFailures($run, [ [ 'code' => 'workspace_backfill.query_failed', 'message' => $e->getMessage(), ], ]); $operationRunService->incrementSummaryCounts($run, ['failed' => 1]); $operationRunService->maybeCompleteBulkRun($run); throw $e; } }

    /**
     * Count the rows of the target table that still need a workspace id.
     *
     * A row is counted when its own `workspace_id` column is NULL and the
     * tenant it joins to (via `tenant_id`) has `workspace_id` equal to
     * `$this->workspaceId`. No cursor or row limit is applied here.
     *
     * @return int Number of matching rows.
     */
    private function remainingRows(): int
    {
        return (int) DB::table($this->table)
            ->join('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $this->table))
            ->whereNull(sprintf('%s.workspace_id', $this->table))
            ->where('tenants.workspace_id', $this->workspaceId)
            ->count();
    }

    /**
     * Top up the run's counters so this table's `processed` value reaches its
     * recorded `target_rows`.
     *
     * Reads `table_progress.<table>.target_rows` and
     * `table_progress.<table>.processed` from the refreshed run context (both
     * default to 0). When the target exceeds the processed value, the
     * difference is added to the run's `processed` and `succeeded` summary
     * counts and to the persisted table progress. Otherwise nothing happens.
     *
     * NOTE(review): presumably this covers rows that were counted when the run
     * was planned but were no longer NULL when this job reached them — confirm
     * against the code that writes `target_rows`.
     *
     * @param OperationRunService $operationRunService Service used to increment the summary counts.
     * @param OperationRun        $run                 Run whose context and counts are reconciled; refreshed first.
     * @param int                 $lastProcessedId     Cursor value passed through to persistTableProgress().
     */
    private function reconcileTableProgressWithPlannedTotal(
        OperationRunService $operationRunService,
        OperationRun $run,
        int $lastProcessedId,
    ): void {
        // Reload so the context read below reflects progress already persisted.
        $run->refresh();

        $targetRows = (int) data_get($run->context, sprintf('table_progress.%s.target_rows', $this->table), 0);
        $processedRows = (int) data_get($run->context, sprintf('table_progress.%s.processed', $this->table), 0);
        // Clamp at zero: processed may already meet or exceed the target.
        $remaining = max(0, $targetRows - $processedRows);

        if ($remaining <= 0) {
            return;
        }

        $operationRunService->incrementSummaryCounts($run, [
            'processed' => $remaining,
            'succeeded' => $remaining,
        ]);

        $this->persistTableProgress($lastProcessedId, $remaining);
    }

    /**
     * Record the cursor position and add to the processed counter for this
     * table inside the run's `context`.
     *
     * Runs in a database transaction: the run row is selected with
     * `lockForUpdate()`, `context['table_progress'][<table>]` is updated, and
     * the whole context is written back. Returns silently if the run row no
     * longer exists.
     *
     * @param int $lastProcessedId Value stored as `last_processed_id`.
     * @param int $processedDelta  Amount added to the stored `processed` counter.
     */
    private function persistTableProgress(int $lastProcessedId, int $processedDelta): void
    {
        DB::transaction(function () use ($lastProcessedId, $processedDelta): void {
            // Lock the row before the read-modify-write of `context`;
            // presumably to keep concurrent writers from overwriting each other.
            $run = OperationRun::query()
                ->whereKey($this->operationRunId)
                ->lockForUpdate()
                ->first();

            if (! $run instanceof OperationRun) {
                return;
            }

            // A non-array context or table entry is treated as empty.
            $context = is_array($run->context) ? $run->context : [];
            $tableProgress = is_array(data_get($context, sprintf('table_progress.%s', $this->table)))
                ? data_get($context, sprintf('table_progress.%s', $this->table))
                : [];

            $tableProgress['last_processed_id'] = $lastProcessedId;
            // The (int) cast applies to the stored value only; the delta is then added.
            $tableProgress['processed'] = (int) ($tableProgress['processed'] ?? 0) + $processedDelta;

            // NOTE(review): the read above uses a data_get() dot path while this
            // write uses a literal key; they would diverge if the table name
            // ever contained a dot — confirm table names are unqualified.
            $context['table_progress'][$this->table] = $tableProgress;

            $run->update(['context' => $context]);
        });
    }
}