TenantAtlas/app/Jobs/BackfillWorkspaceIdsJob.php
2026-02-14 23:08:26 +01:00

208 lines
7.1 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Models\OperationRun;
use App\Models\Workspace;
use App\Services\Audit\WorkspaceAuditLogger;
use App\Services\OperationRunService;
use App\Support\WorkspaceIsolation\TenantOwnedTables;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\QueryException;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
/**
 * Queued job that fills in missing `workspace_id` values on a single table.
 *
 * Candidate rows are found by joining the table to `tenants` and keeping rows whose own
 * `workspace_id` is NULL while the joined tenant's `workspace_id` equals this job's
 * workspace. Rows are updated in ascending-id batches, and per-table progress is stored
 * in the operation run's `context` under `table_progress.<table>`.
 */
class BackfillWorkspaceIdsJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * @param int      $operationRunId Primary key of the OperationRun that tracks this backfill.
     * @param int      $workspaceId    Workspace id written to matching rows.
     * @param string   $table          Table to backfill; the job is a no-op unless TenantOwnedTables::contains() accepts it.
     * @param int      $batchSize      Maximum number of ids fetched per batch (values below 1 are treated as 1).
     * @param int|null $maxRows        Optional cap on rows updated by one execution of this job; null means no cap.
     * @param int      $resumeFrom     Only rows with an id greater than this are considered (negative values are treated as 0).
     */
    public function __construct(
        public int $operationRunId,
        public int $workspaceId,
        public string $table,
        public int $batchSize = 5000,
        public ?int $maxRows = null,
        public int $resumeFrom = 0,
    ) {}

    /**
     * Run the backfill for the configured table, then let the service try to complete the run.
     *
     * Returns without doing anything when the table is not accepted by
     * TenantOwnedTables::contains(), when the operation run cannot be found, or when the
     * run's `workspace_id` differs from this job's workspace.
     *
     * @throws QueryException Re-thrown after a failure entry and a `failed` count were recorded on the run.
     */
    public function handle(OperationRunService $operationRunService, WorkspaceAuditLogger $workspaceAuditLogger): void
    {
        if (! TenantOwnedTables::contains($this->table)) {
            return;
        }

        $run = OperationRun::query()->find($this->operationRunId);

        if (! $run instanceof OperationRun) {
            return;
        }

        if ((int) $run->workspace_id !== $this->workspaceId) {
            return;
        }

        try {
            if ($run->status === 'queued') {
                $operationRunService->updateRun($run, status: 'running');
            }

            $cursor = $this->backfillInBatches($operationRunService, $run);

            // Only reconcile against the planned total once nothing is left to backfill;
            // a run stopped early by maxRows still has pending rows and is left as-is.
            if ($this->remainingRows() === 0) {
                $this->reconcileTableProgressWithPlannedTotal($operationRunService, $run, $cursor);
            }

            $this->completeRunAndAudit($operationRunService, $workspaceAuditLogger, $run);
        } catch (QueryException $e) {
            $operationRunService->appendFailures($run, [
                [
                    'code' => 'workspace_backfill.query_failed',
                    'message' => $e->getMessage(),
                ],
            ]);
            $operationRunService->incrementSummaryCounts($run, ['failed' => 1]);
            $operationRunService->maybeCompleteBulkRun($run);

            // Re-throw so the queue worker still sees the job as failed.
            throw $e;
        }
    }

    /**
     * Update pending rows batch by batch, recording counts and progress after every batch.
     *
     * The loop ends when no pending rows with an id above the cursor remain, or when the
     * optional maxRows budget has been used up.
     *
     * @return int The highest id handled, or the (non-negative) starting cursor when no batch ran.
     */
    private function backfillInBatches(OperationRunService $operationRunService, OperationRun $run): int
    {
        $processed = 0;
        $cursor = max(0, $this->resumeFrom);
        $batchSize = max(1, $this->batchSize);
        $idColumn = sprintf('%s.id', $this->table);

        while (true) {
            $limit = $batchSize;

            if ($this->maxRows !== null) {
                $remaining = $this->maxRows - $processed;

                // Budget exhausted: stop before issuing another query.
                if ($remaining <= 0) {
                    break;
                }

                // Never fetch more ids than the budget still allows.
                $limit = min($batchSize, $remaining);
            }

            $ids = $this->pendingRowsQuery()
                ->where($idColumn, '>', $cursor)
                ->orderBy($idColumn)
                ->limit($limit)
                ->pluck($idColumn)
                ->map(static fn (mixed $id): int => (int) $id)
                ->all();

            if ($ids === []) {
                break;
            }

            // whereNull is repeated here so rows filled in since the select are not counted or overwritten.
            $updated = DB::table($this->table)
                ->whereIn('id', $ids)
                ->whereNull('workspace_id')
                ->update(['workspace_id' => $this->workspaceId]);

            $processed += $updated;
            $cursor = max($ids);

            if ($updated > 0) {
                $operationRunService->incrementSummaryCounts($run, [
                    'processed' => $updated,
                    'succeeded' => $updated,
                ]);
            }

            // Persisted even when $updated is 0 so the stored cursor still advances.
            $this->persistTableProgress($cursor, $updated);
        }

        return $cursor;
    }

    /**
     * Ask the service to complete the run and write an audit entry if this call completed it.
     *
     * The audit entry is written only when the run's status was not 'completed' before
     * maybeCompleteBulkRun() and is 'completed' afterwards, and the workspace can be loaded.
     */
    private function completeRunAndAudit(
        OperationRunService $operationRunService,
        WorkspaceAuditLogger $workspaceAuditLogger,
        OperationRun $run,
    ): void {
        $run->refresh();
        $statusBeforeCompletion = $run->status;

        $operationRunService->maybeCompleteBulkRun($run);
        $run->refresh();

        if ($statusBeforeCompletion === 'completed' || $run->status !== 'completed') {
            return;
        }

        $workspace = Workspace::query()->find($this->workspaceId);

        if (! $workspace instanceof Workspace) {
            return;
        }

        $workspaceAuditLogger->log(
            workspace: $workspace,
            action: 'workspace_isolation.backfill_workspace_ids.completed',
            context: [
                'operation_run_id' => (int) $run->getKey(),
                'table' => $this->table,
                'outcome' => (string) $run->outcome,
            ],
            status: (string) $run->outcome,
            resourceType: 'operation_run',
            resourceId: (string) $run->getKey(),
        );
    }

    /**
     * Base query for rows of the table that still need a workspace id.
     *
     * Selects rows whose `workspace_id` is NULL and whose tenant (joined on `tenant_id`)
     * has `workspace_id` equal to this job's workspace. Shared by the batch fetch and
     * remainingRows() so both always use the same definition of "pending".
     */
    private function pendingRowsQuery(): \Illuminate\Database\Query\Builder
    {
        return DB::table($this->table)
            ->join('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $this->table))
            ->whereNull(sprintf('%s.workspace_id', $this->table))
            ->where('tenants.workspace_id', $this->workspaceId);
    }

    /**
     * Count the rows of the table that still need a workspace id for this workspace.
     *
     * Unlike the batch fetch, this count ignores the id cursor.
     */
    private function remainingRows(): int
    {
        return (int) $this->pendingRowsQuery()->count();
    }

    /**
     * Credit the run with any gap between the planned and the recorded row count.
     *
     * Reads `table_progress.<table>.target_rows` and `table_progress.<table>.processed`
     * from the refreshed run context. When the target exceeds the processed count, the
     * difference is added to the run's `processed`/`succeeded` summary counts and to the
     * stored table progress. A missing target is read as 0, which makes this a no-op.
     *
     * NOTE(review): `target_rows` is written outside this file — confirm it is keyed by
     * the same table name used here.
     *
     * @param int $lastProcessedId Cursor value stored as `last_processed_id` alongside the adjustment.
     */
    private function reconcileTableProgressWithPlannedTotal(
        OperationRunService $operationRunService,
        OperationRun $run,
        int $lastProcessedId,
    ): void {
        $run->refresh();

        $targetRows = (int) data_get($run->context, sprintf('table_progress.%s.target_rows', $this->table), 0);
        $processedRows = (int) data_get($run->context, sprintf('table_progress.%s.processed', $this->table), 0);
        $remaining = max(0, $targetRows - $processedRows);

        if ($remaining <= 0) {
            return;
        }

        $operationRunService->incrementSummaryCounts($run, [
            'processed' => $remaining,
            'succeeded' => $remaining,
        ]);

        $this->persistTableProgress($lastProcessedId, $remaining);
    }

    /**
     * Store the cursor and add to the processed counter for this table in the run context.
     *
     * Runs inside a transaction and re-reads the run with a row lock (`lockForUpdate`)
     * before modifying `context`. Does nothing when the run no longer exists.
     *
     * @param int $lastProcessedId Value written to `table_progress.<table>.last_processed_id`.
     * @param int $processedDelta  Amount added to `table_progress.<table>.processed`.
     */
    private function persistTableProgress(int $lastProcessedId, int $processedDelta): void
    {
        DB::transaction(function () use ($lastProcessedId, $processedDelta): void {
            $run = OperationRun::query()
                ->whereKey($this->operationRunId)
                ->lockForUpdate()
                ->first();

            if (! $run instanceof OperationRun) {
                return;
            }

            $context = is_array($run->context) ? $run->context : [];

            $tableProgress = data_get($context, sprintf('table_progress.%s', $this->table));

            if (! is_array($tableProgress)) {
                $tableProgress = [];
            }

            $tableProgress['last_processed_id'] = $lastProcessedId;
            $tableProgress['processed'] = (int) ($tableProgress['processed'] ?? 0) + $processedDelta;

            // A malformed (non-array) container would make the nested write below throw;
            // reset it the same way a malformed context or table entry is reset above.
            if (! is_array($context['table_progress'] ?? null)) {
                $context['table_progress'] = [];
            }

            $context['table_progress'][$this->table] = $tableProgress;

            $run->update(['context' => $context]);
        });
    }
}