resolveTables(); if ($tables === []) { return self::FAILURE; } $batchSize = max(1, (int) $this->option('batch-size')); $resumeFrom = max(0, (int) $this->option('resume-from')); $maxRows = $this->normalizeMaxRows(); $dryRun = (bool) $this->option('dry-run'); $lock = Cache::lock('tenantpilot:backfill-workspace-ids', 900); if (! $lock->get()) { $this->error('Another workspace backfill is already running.'); return self::FAILURE; } try { $tableStats = $this->collectTableStats($tables); $this->table( ['Table', 'Missing workspace_id', 'Unresolvable tenant mapping', 'Sample row ids'], array_map(static function (array $stats): array { return [ $stats['table'], $stats['missing'], $stats['unresolvable'], $stats['sample_ids'] === [] ? '-' : implode(',', $stats['sample_ids']), ]; }, $tableStats), ); $unresolvable = array_values(array_filter($tableStats, static fn (array $stats): bool => $stats['unresolvable'] > 0)); if ($unresolvable !== []) { foreach ($unresolvable as $stats) { $this->error(sprintf( 'Unresolvable tenant->workspace mapping in %s (%d rows). Sample ids: %s', $stats['table'], $stats['unresolvable'], $stats['sample_ids'] === [] ? '-' : implode(',', $stats['sample_ids']), )); } return self::FAILURE; } if ($dryRun) { $this->info('Dry-run complete. No changes written.'); return self::SUCCESS; } $workspaceWorkloads = $this->collectWorkspaceWorkloads($tables, $maxRows); if ($workspaceWorkloads === []) { $this->info('No rows require workspace_id backfill.'); return self::SUCCESS; } $dispatchedJobs = 0; foreach ($workspaceWorkloads as $workspaceId => $workload) { $workspace = Workspace::query()->find($workspaceId); if (! 
$workspace instanceof Workspace) { continue; } $run = $operationRunService->ensureWorkspaceRunWithIdentity( workspace: $workspace, type: 'workspace_isolation_backfill_workspace_ids', identityInputs: [ 'tables' => array_keys($workload['tables']), ], context: [ 'source' => 'tenantpilot:backfill-workspace-ids', 'workspace_id' => (int) $workspace->getKey(), 'batch_size' => $batchSize, 'max_rows' => $maxRows, 'resume_from' => $resumeFrom, 'tables' => array_keys($workload['tables']), ], ); if (! $run->wasRecentlyCreated) { $this->line(sprintf( 'Workspace %d already has an active backfill run (#%d).', (int) $workspace->getKey(), (int) $run->getKey(), )); continue; } $tableProgress = []; foreach ($workload['tables'] as $table => $count) { $tableProgress[$table] = [ 'target_rows' => (int) $count, 'processed' => 0, 'last_processed_id' => $resumeFrom, ]; } $context = is_array($run->context) ? $run->context : []; $context['table_progress'] = $tableProgress; $run->update([ 'context' => $context, 'summary_counts' => [ 'total' => (int) $workload['total'], 'processed' => 0, 'succeeded' => 0, 'failed' => 0, ], ]); $operationRunService->updateRun($run, status: 'running'); $workspaceAuditLogger->log( workspace: $workspace, action: 'workspace_isolation.backfill_workspace_ids.started', context: [ 'operation_run_id' => (int) $run->getKey(), 'tables' => array_keys($workload['tables']), 'planned_rows' => (int) $workload['total'], 'batch_size' => $batchSize, ], status: 'success', resourceType: 'operation_run', resourceId: (string) $run->getKey(), ); $workspaceJobs = 0; foreach ($workload['tables'] as $table => $tableRows) { if ($tableRows <= 0) { continue; } BackfillWorkspaceIdsJob::dispatch( operationRunId: (int) $run->getKey(), workspaceId: (int) $workspace->getKey(), table: $table, batchSize: $batchSize, maxRows: $maxRows, resumeFrom: $resumeFrom, ); $workspaceJobs++; $dispatchedJobs++; } $workspaceAuditLogger->log( workspace: $workspace, action: 
'workspace_isolation.backfill_workspace_ids.dispatched',
                    context: [
                        'operation_run_id' => (int) $run->getKey(),
                        'jobs_dispatched' => $workspaceJobs,
                        'tables' => array_keys($workload['tables']),
                    ],
                    status: 'success',
                    resourceType: 'operation_run',
                    resourceId: (string) $run->getKey(),
                );

                $this->line(sprintf(
                    'Workspace %d run #%d queued (%d job(s)).',
                    (int) $workspace->getKey(),
                    (int) $run->getKey(),
                    $workspaceJobs,
                ));
            }

            $this->info(sprintf('Backfill jobs dispatched: %d', $dispatchedJobs));

            return self::SUCCESS;
        } finally {
            $lock->release();
        }
    }

    /**
     * Resolve the tables to process from the `--table` option.
     *
     * - Option absent or blank: every table from TenantOwnedTables::all().
     * - Option names a table TenantOwnedTables::contains(): that single table.
     * - Otherwise: prints an error and returns an empty array.
     *
     * @return array<int, string> Table names (assumes TenantOwnedTables::all()
     *                            yields table-name strings — confirm against that class).
     */
    private function resolveTables(): array
    {
        $selectedTable = $this->option('table');

        if (! is_string($selectedTable) || trim($selectedTable) === '') {
            return TenantOwnedTables::all();
        }

        $selectedTable = trim($selectedTable);

        if (! TenantOwnedTables::contains($selectedTable)) {
            $this->error(sprintf('Unknown tenant-owned table: %s', $selectedTable));

            return [];
        }

        return [$selectedTable];
    }

    /**
     * Read the `--max-rows` option as an optional positive row cap.
     *
     * @return int|null The cap, or null when the option is non-numeric or not
     *                  greater than zero.
     */
    private function normalizeMaxRows(): ?int
    {
        $maxRows = $this->option('max-rows');

        if (! is_numeric($maxRows)) {
            return null;
        }

        $maxRows = (int) $maxRows;

        return $maxRows > 0 ? $maxRows : null;
    }

    /**
     * Gather per-table counts of rows that still lack a workspace_id.
     *
     * For each table this reports how many rows have a NULL workspace_id
     * ("missing"), how many of those cannot be mapped to a workspace through
     * the tenants table ("unresolvable"), and up to five ids of such
     * unresolvable rows, ordered by id.
     *
     * @param array<int, string> $tables
     * @return array<int, array{table: string, missing: int, unresolvable: int, sample_ids: array<int, int>}>
     */
    private function collectTableStats(array $tables): array
    {
        $stats = [];

        foreach ($tables as $table) {
            $missing = (int) DB::table($table)->whereNull('workspace_id')->count();

            $unresolvable = (int) $this->unresolvableRowsQuery($table)->count();

            // With zero unresolvable rows there is nothing to sample, so the
            // second query is skipped.
            $sampleIds = $unresolvable === 0
                ? []
                : $this->unresolvableRowsQuery($table)
                    ->orderBy(sprintf('%s.id', $table))
                    ->limit(5)
                    ->pluck(sprintf('%s.id', $table))
                    ->map(static fn (mixed $id): int => (int) $id)
                    ->values()
                    ->all();

            $stats[] = [
                'table' => $table,
                'missing' => $missing,
                'unresolvable' => $unresolvable,
                'sample_ids' => $sampleIds,
            ];
        }

        return $stats;
    }

    /**
     * Build the base query for rows of $table that have no workspace_id and
     * whose tenant is either absent from the tenants table or has a NULL
     * workspace_id itself.
     *
     * A fresh builder is returned on every call so callers can add ordering,
     * limits or aggregates without affecting each other.
     */
    private function unresolvableRowsQuery(string $table): \Illuminate\Database\Query\Builder
    {
        return DB::table($table)
            ->leftJoin('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $table))
            ->whereNull(sprintf('%s.workspace_id', $table))
            ->where(static function ($query): void {
                $query->whereNull('tenants.id')
                    ->orWhereNull('tenants.workspace_id');
            });
    }

    /**
     * Group the rows needing a workspace_id by the workspace their tenant
     * belongs to.
     *
     * The result is keyed by workspace id. Each entry holds the per-table row
     * counts and their sum. When $maxRows is not null, each per-table count is
     * capped at $maxRows before being added to the total.
     *
     * NOTE(review): the counts do not take the `--resume-from` option into
     * account (it is not passed to this method), so they may overstate the
     * remaining work on a resumed run — confirm whether that is intended.
     *
     * @param array<int, string> $tables
     * @return array<int, array{total: int, tables: array<string, int>}>
     */
    private function collectWorkspaceWorkloads(array $tables, ?int $maxRows): array
    {
        $workloads = [];

        foreach ($tables as $table) {
            // Inner join: only rows whose tenant exists and has a workspace.
            $rows = DB::table($table)
                ->join('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $table))
                ->whereNull(sprintf('%s.workspace_id', $table))
                ->whereNotNull('tenants.workspace_id')
                ->selectRaw('tenants.workspace_id as workspace_id, COUNT(*) as row_count')
                ->groupBy('tenants.workspace_id')
                ->get();

            foreach ($rows as $row) {
                $workspaceId = (int) $row->workspace_id;

                if ($workspaceId <= 0) {
                    continue;
                }

                $rowCount = (int) $row->row_count;

                if ($maxRows !== null) {
                    $rowCount = min($rowCount, $maxRows);
                }

                if ($rowCount <= 0) {
                    continue;
                }

                $workloads[$workspaceId] ??= [
                    'total' => 0,
                    'tables' => [],
                ];

                $workloads[$workspaceId]['tables'][$table] = $rowCount;
                $workloads[$workspaceId]['total'] += $rowCount;
            }
        }

        return $workloads;
    }
}