208 lines
7.1 KiB
PHP
208 lines
7.1 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Jobs;
|
|
|
|
use App\Models\OperationRun;
|
|
use App\Models\Workspace;
|
|
use App\Services\Audit\WorkspaceAuditLogger;
|
|
use App\Services\OperationRunService;
|
|
use App\Support\WorkspaceIsolation\TenantOwnedTables;
|
|
use Illuminate\Bus\Queueable;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Database\QueryException;
|
|
use Illuminate\Foundation\Bus\Dispatchable;
|
|
use Illuminate\Queue\InteractsWithQueue;
|
|
use Illuminate\Queue\SerializesModels;
|
|
use Illuminate\Support\Facades\DB;
|
|
|
|
/**
 * Queued job that backfills the `workspace_id` column of one table for a single
 * workspace, walking the table in ascending-id batches and recording progress
 * on the owning OperationRun.
 *
 * A row is "pending" when its own `workspace_id` is NULL and the tenant it
 * references through `tenant_id` belongs to the target workspace.
 */
class BackfillWorkspaceIdsJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * @param  int  $operationRunId  Primary key of the OperationRun that tracks this backfill.
     * @param  int  $workspaceId  Workspace id written to pending rows; must match the run's workspace.
     * @param  string  $table  Table to backfill; the job is a no-op unless TenantOwnedTables contains it.
     * @param  int  $batchSize  Rows selected per batch (values below 1 are treated as 1).
     * @param  int|null  $maxRows  Upper bound on rows updated by this job instance; null means unbounded.
     * @param  int  $resumeFrom  Only rows with an id greater than this are considered (negatives are treated as 0).
     */
    public function __construct(
        public int $operationRunId,
        public int $workspaceId,
        public string $table,
        public int $batchSize = 5000,
        public ?int $maxRows = null,
        public int $resumeFrom = 0,
    ) {}

    /**
     * Run the backfill for the configured table and workspace.
     *
     * Returns without doing anything when the table is not known to
     * TenantOwnedTables, the run cannot be found, or the run belongs to a
     * different workspace.
     *
     * @throws QueryException Rethrown after the failure has been recorded on the run.
     */
    public function handle(OperationRunService $operationRunService, WorkspaceAuditLogger $workspaceAuditLogger): void
    {
        // The table name is interpolated into column references below, so it is
        // checked against TenantOwnedTables before any query is built.
        if (! TenantOwnedTables::contains($this->table)) {
            return;
        }

        $run = OperationRun::query()->find($this->operationRunId);

        if (! $run instanceof OperationRun) {
            return;
        }

        if ((int) $run->workspace_id !== $this->workspaceId) {
            return;
        }

        try {
            if ($run->status === 'queued') {
                $operationRunService->updateRun($run, status: 'running');
            }

            $cursor = $this->backfillBatches($operationRunService, $run);

            // Only reconcile once nothing is pending for this table/workspace.
            if ($this->remainingRows() === 0) {
                $this->reconcileTableProgressWithPlannedTotal($operationRunService, $run, $cursor);
            }

            $this->completeRunAndAudit($operationRunService, $workspaceAuditLogger, $run);
        } catch (QueryException $e) {
            $operationRunService->appendFailures($run, [
                [
                    'code' => 'workspace_backfill.query_failed',
                    'message' => $e->getMessage(),
                ],
            ]);

            $operationRunService->incrementSummaryCounts($run, ['failed' => 1]);
            $operationRunService->maybeCompleteBulkRun($run);

            throw $e;
        }
    }

    /**
     * Update pending rows batch by batch until none are left or the `maxRows`
     * budget is spent, recording counts and per-table progress after each batch.
     *
     * @return int Highest row id handled by this job, or the starting cursor when no batch ran.
     */
    private function backfillBatches(OperationRunService $operationRunService, OperationRun $run): int
    {
        $cursor = max(0, $this->resumeFrom);
        $batchSize = max(1, $this->batchSize);
        $processed = 0;

        // A non-positive budget can never update a row, so skip the SELECT entirely.
        if ($this->maxRows !== null && $this->maxRows <= 0) {
            return $cursor;
        }

        $idColumn = sprintf('%s.id', $this->table);

        while (true) {
            $ids = $this->pendingRowsQuery()
                ->where($idColumn, '>', $cursor)
                ->orderBy($idColumn)
                ->limit($batchSize)
                ->pluck($idColumn)
                ->map(static fn (mixed $id): int => (int) $id)
                ->all();

            if ($ids === []) {
                break;
            }

            if ($this->maxRows !== null) {
                // The budget is strictly positive here: guaranteed by the guard
                // above on the first pass and by the check at the bottom of the
                // loop on every later pass.
                $ids = array_slice($ids, 0, $this->maxRows - $processed);
            }

            // Re-check NULL in the UPDATE so rows filled in since the SELECT are
            // neither overwritten nor counted.
            $updated = DB::table($this->table)
                ->whereIn('id', $ids)
                ->whereNull('workspace_id')
                ->update(['workspace_id' => $this->workspaceId]);

            $processed += $updated;
            $cursor = max($ids);

            if ($updated > 0) {
                $operationRunService->incrementSummaryCounts($run, [
                    'processed' => $updated,
                    'succeeded' => $updated,
                ]);
            }

            // Persist the cursor even when nothing was updated so the recorded
            // position still advances past this batch.
            $this->persistTableProgress($cursor, $updated);

            if ($this->maxRows !== null && $processed >= $this->maxRows) {
                break;
            }
        }

        return $cursor;
    }

    /**
     * Let the run service decide whether the run is complete and, if this call
     * is the one that moved it to `completed`, write a workspace audit entry.
     */
    private function completeRunAndAudit(
        OperationRunService $operationRunService,
        WorkspaceAuditLogger $workspaceAuditLogger,
        OperationRun $run,
    ): void {
        $run->refresh();
        $statusBeforeCompletion = $run->status;

        $operationRunService->maybeCompleteBulkRun($run);
        $run->refresh();

        // Audit only the transition into `completed`, not an already-completed run.
        if ($statusBeforeCompletion === 'completed' || $run->status !== 'completed') {
            return;
        }

        $workspace = Workspace::query()->find($this->workspaceId);

        if (! $workspace instanceof Workspace) {
            return;
        }

        $workspaceAuditLogger->log(
            workspace: $workspace,
            action: 'workspace_isolation.backfill_workspace_ids.completed',
            context: [
                'operation_run_id' => (int) $run->getKey(),
                'table' => $this->table,
                'outcome' => (string) $run->outcome,
            ],
            status: (string) $run->outcome,
            resourceType: 'operation_run',
            resourceId: (string) $run->getKey(),
        );
    }

    /**
     * Base query for rows still awaiting backfill: rows of the configured table
     * whose `workspace_id` is NULL and whose tenant belongs to the target workspace.
     *
     * A fresh builder is returned on every call so callers can add their own
     * constraints without affecting each other.
     */
    private function pendingRowsQuery(): \Illuminate\Database\Query\Builder
    {
        return DB::table($this->table)
            ->join('tenants', 'tenants.id', '=', sprintf('%s.tenant_id', $this->table))
            ->whereNull(sprintf('%s.workspace_id', $this->table))
            ->where('tenants.workspace_id', $this->workspaceId);
    }

    /**
     * Count the rows still pending for this table and workspace, ignoring the cursor.
     */
    private function remainingRows(): int
    {
        return (int) $this->pendingRowsQuery()->count();
    }

    /**
     * Bring the recorded progress for this table up to its planned total.
     *
     * Reads `table_progress.<table>.target_rows` and `.processed` from the run
     * context; when the target exceeds what was recorded, the difference is
     * credited as processed/succeeded and added to the table's progress entry.
     *
     * @param  int  $lastProcessedId  Row id stored as the table's `last_processed_id`.
     */
    private function reconcileTableProgressWithPlannedTotal(
        OperationRunService $operationRunService,
        OperationRun $run,
        int $lastProcessedId,
    ): void {
        $run->refresh();

        $targetRows = (int) data_get($run->context, sprintf('table_progress.%s.target_rows', $this->table), 0);
        $processedRows = (int) data_get($run->context, sprintf('table_progress.%s.processed', $this->table), 0);
        $shortfall = $targetRows - $processedRows;

        if ($shortfall <= 0) {
            return;
        }

        $operationRunService->incrementSummaryCounts($run, [
            'processed' => $shortfall,
            'succeeded' => $shortfall,
        ]);

        $this->persistTableProgress($lastProcessedId, $shortfall);
    }

    /**
     * Store the cursor and add to the processed counter for this table inside
     * the run's `context`, under `table_progress.<table>`.
     *
     * The run row is re-read with a row lock inside a transaction so the
     * read-modify-write of the context happens on the latest stored value.
     *
     * @param  int  $lastProcessedId  Value written to `last_processed_id`.
     * @param  int  $processedDelta  Amount added to the table's `processed` counter.
     */
    private function persistTableProgress(int $lastProcessedId, int $processedDelta): void
    {
        DB::transaction(function () use ($lastProcessedId, $processedDelta): void {
            $run = OperationRun::query()
                ->whereKey($this->operationRunId)
                ->lockForUpdate()
                ->first();

            if (! $run instanceof OperationRun) {
                return;
            }

            $context = is_array($run->context) ? $run->context : [];

            $tableProgress = data_get($context, sprintf('table_progress.%s', $this->table));

            if (! is_array($tableProgress)) {
                $tableProgress = [];
            }

            $tableProgress['last_processed_id'] = $lastProcessedId;
            $tableProgress['processed'] = (int) ($tableProgress['processed'] ?? 0) + $processedDelta;

            $context['table_progress'][$this->table] = $tableProgress;

            $run->update(['context' => $context]);
        });
    }
}
|