TenantAtlas/app/Jobs/FetchAssignmentsJob.php
ahmido 03127a670b Spec 096: Ops polish (assignment summaries + dedupe + reconcile tracking + seed DX) (#115)
Implements Spec 096 ops polish bundle:

- Persist durable OperationRun.summary_counts for assignment fetch/restore (final attempt wins)
- Server-side dedupe for assignment jobs (15-minute cooldown + non-canonical skip)
- Track ReconcileAdapterRunsJob via workspace-scoped OperationRun + stable failure codes + overlap prevention
- Seed DX: ensure seeded tenants use UUID v4 external_id and seed satisfies workspace_id NOT NULL constraints

Verification (local / evidence-based):
- `vendor/bin/sail artisan test --compact tests/Feature/Operations/AssignmentRunSummaryCountsTest.php tests/Feature/Operations/AssignmentJobDedupeTest.php tests/Feature/Operations/ReconcileAdapterRunsJobTrackingTest.php tests/Feature/Seed/PoliciesSeederExternalIdTest.php`
- `vendor/bin/sail bin pint --dirty`

Spec artifacts included under `specs/096-ops-polish-assignment-dedupe-system-tracking/` (spec/plan/tasks/checklists).

Co-authored-by: Ahmed Darrazi <ahmed.darrazi@live.de>
Reviewed-on: #115
2026-02-15 20:49:38 +00:00

362 lines
13 KiB
PHP

<?php
namespace App\Jobs;
use App\Models\BackupItem;
use App\Models\OperationRun;
use App\Models\Tenant;
use App\Models\User;
use App\Services\AssignmentBackupService;
use App\Services\OperationRunService;
use App\Support\OperationRunOutcome;
use App\Support\OperationRunStatus;
use App\Support\OpsUx\AssignmentJobFingerprint;
use App\Support\OpsUx\RunFailureSanitizer;
use App\Support\Providers\ProviderReasonCodes;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use RuntimeException;
/**
 * Queued job that enriches a BackupItem with its policy assignments and records
 * the result on an OperationRun of type "assignments.fetch".
 *
 * Use {@see self::dispatchTracked()} to obtain a run via
 * OperationRunService::ensureRunWithIdentityCooldown() and queue the job only when
 * that run was newly created. When executed, the job skips work if the canonical
 * run for the same identity is already completed or is a different run than the
 * one attached to this job, and it holds a cache lock for the duration of the
 * execution to prevent overlapping duplicates.
 */
class FetchAssignmentsJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Operation type used for the OperationRun and inside the identity inputs. */
    private const string OPERATION_TYPE = 'assignments.fetch';

    /** Cooldown window, in minutes, handed to the run service for identity lookups. */
    private const int DEDUPE_COOLDOWN_MINUTES = 15;

    /** Time-to-live, in seconds, of the cache lock held while the job executes. */
    private const int EXECUTION_LOCK_TTL_SECONDS = 900;

    /** Failure code stored on the run whenever the fetch does not succeed. */
    private const string FAILURE_CODE = 'assignments.fetch_failed';

    /** Fallback failure message when no usable message is available. */
    private const string DEFAULT_FAILURE_MESSAGE = 'Assignments fetch failed.';

    /**
     * The run tracking this execution; resolved lazily in handle() when null.
     */
    public ?OperationRun $operationRun = null;

    /**
     * The number of times the job may be attempted.
     */
    public int $tries = 1;

    /**
     * The number of seconds to wait before retrying the job.
     */
    public int $backoff = 0;

    /**
     * Create a new job instance.
     *
     * @param  int  $backupItemId  Primary key of the BackupItem to enrich.
     * @param  string  $tenantExternalId  Tenant external id; part of the job fingerprint.
     * @param  string  $policyExternalId  Policy external id; part of the job fingerprint.
     * @param  array  $policyPayload  Payload passed through to AssignmentBackupService::enrichWithAssignments().
     * @param  OperationRun|null  $operationRun  Run to track this execution, if one already exists.
     */
    public function __construct(
        public int $backupItemId,
        public string $tenantExternalId,
        public string $policyExternalId,
        public array $policyPayload,
        ?OperationRun $operationRun = null,
    ) {
        $this->operationRun = $operationRun;
    }

    /**
     * Obtain the OperationRun for this backup item's identity and queue the job
     * only when that run was newly created.
     *
     * @param  BackupItem  $backupItem  Item whose assignments should be fetched; must have a tenant.
     * @param  array  $policyPayload  Payload forwarded to the queued job.
     * @param  User|null  $initiator  Optional initiator forwarded to the run service.
     * @return OperationRun The run returned by the run service (new or pre-existing).
     *
     * @throws RuntimeException When the backup item has no Tenant.
     */
    public static function dispatchTracked(
        BackupItem $backupItem,
        array $policyPayload,
        ?User $initiator = null,
    ): OperationRun {
        $tenant = $backupItem->tenant;

        if (! $tenant instanceof Tenant) {
            throw new RuntimeException('BackupItem tenant context is required to dispatch assignment fetch job.');
        }

        /** @var OperationRunService $operationRunService */
        $operationRunService = app(OperationRunService::class);

        $backupItemId = (int) $backupItem->getKey();
        $tenantExternalId = (string) ($tenant->external_id ?? '');
        $policyExternalId = (string) $backupItem->policy_identifier;

        $run = $operationRunService->ensureRunWithIdentityCooldown(
            tenant: $tenant,
            type: self::OPERATION_TYPE,
            identityInputs: self::operationRunIdentityInputs(
                tenantId: (int) $tenant->getKey(),
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
            ),
            context: self::operationRunContext($backupItem),
            cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
            initiator: $initiator,
        );

        // A run that was not just created is treated as a duplicate request: no new job is queued.
        if ($run->wasRecentlyCreated) {
            dispatch(new self(
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
                policyPayload: $policyPayload,
                operationRun: $run,
            ));
        }

        return $run;
    }

    /**
     * Execute the job.
     *
     * Flow: load the backup item and tenant, resolve the tracking run, apply the
     * dedupe checks (completed canonical run / non-canonical run), take the
     * execution lock, enrich the backup item, and persist the outcome on the run.
     * Any Throwable raised while enriching is logged and recorded as a failed run
     * instead of being rethrown.
     */
    public function handle(
        AssignmentBackupService $assignmentBackupService,
        OperationRunService $operationRunService,
    ): void {
        $backupItem = BackupItem::query()
            ->with('tenant')
            ->find($this->backupItemId);

        if (! $backupItem instanceof BackupItem) {
            Log::warning('FetchAssignmentsJob: BackupItem not found', [
                'backup_item_id' => $this->backupItemId,
            ]);

            // Do not leave an attached run without a terminal state.
            $this->failTrackedRun($operationRunService, 'Backup item not found.');

            return;
        }

        $tenant = $backupItem->tenant;

        if (! $tenant instanceof Tenant) {
            Log::warning('FetchAssignmentsJob: Tenant not found for BackupItem', [
                'backup_item_id' => $this->backupItemId,
            ]);

            // Do not leave an attached run without a terminal state.
            $this->failTrackedRun($operationRunService, 'Tenant not found for backup item.');

            return;
        }

        $fingerprint = AssignmentJobFingerprint::forFetch(
            backupItemId: (int) $backupItem->getKey(),
            tenantExternalId: $this->tenantExternalId,
            policyExternalId: $this->policyExternalId,
        );

        $identityInputs = self::operationRunIdentityInputs(
            tenantId: (int) $tenant->getKey(),
            backupItemId: (int) $backupItem->getKey(),
            tenantExternalId: $this->tenantExternalId,
            policyExternalId: $this->policyExternalId,
        );

        // Jobs constructed without a run resolve one here.
        if (! $this->operationRun instanceof OperationRun) {
            $this->operationRun = $operationRunService->ensureRunWithIdentityCooldown(
                tenant: $tenant,
                type: self::OPERATION_TYPE,
                identityInputs: $identityInputs,
                context: self::operationRunContext($backupItem),
                cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
            );
        }

        $canonicalRun = $operationRunService->findCanonicalRunWithIdentity(
            tenant: $tenant,
            type: self::OPERATION_TYPE,
            identityInputs: $identityInputs,
            cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
        );

        if ($canonicalRun instanceof OperationRun && $canonicalRun->status === OperationRunStatus::Completed->value) {
            Log::info('FetchAssignmentsJob: Skipping because identity is in cooldown window', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $canonicalRun->getKey(),
            ]);

            return;
        }

        if ($canonicalRun instanceof OperationRun
            && $this->operationRun instanceof OperationRun
            && (int) $canonicalRun->getKey() !== (int) $this->operationRun->getKey()
        ) {
            Log::info('FetchAssignmentsJob: Skipping non-canonical execution', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'canonical_run_id' => (int) $canonicalRun->getKey(),
                'job_run_id' => (int) $this->operationRun->getKey(),
            ]);

            return;
        }

        $run = $this->operationRun instanceof OperationRun ? $this->operationRun : $canonicalRun;

        if (! $run instanceof OperationRun) {
            return;
        }

        $executionIdentityKey = AssignmentJobFingerprint::executionIdentityKey(
            jobType: self::OPERATION_TYPE,
            tenantId: (int) $tenant->getKey(),
            fingerprint: $fingerprint,
            operationRunId: (int) $run->getKey(),
        );

        $lock = Cache::lock('assignment-job:'.$executionIdentityKey, self::EXECUTION_LOCK_TTL_SECONDS);

        // Non-blocking acquire: a concurrent execution of the same identity wins.
        if (! $lock->get()) {
            Log::info('FetchAssignmentsJob: Overlap prevented for duplicate execution', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'execution_identity' => $executionIdentityKey,
            ]);

            return;
        }

        try {
            if ($run->status !== OperationRunStatus::Completed->value) {
                $operationRunService->updateRun($run, OperationRunStatus::Running->value);
            }

            // Only Settings Catalog policies are enriched; other types complete as succeeded.
            if ($backupItem->policy_type !== 'settingsCatalogPolicy') {
                Log::info('FetchAssignmentsJob: Skipping non-Settings Catalog policy', [
                    'backup_item_id' => (int) $backupItem->getKey(),
                    'policy_type' => (string) $backupItem->policy_type,
                ]);

                $this->completeRun(
                    operationRunService: $operationRunService,
                    run: $run,
                    fetchFailed: false,
                );

                return;
            }

            $assignmentBackupService->enrichWithAssignments(
                backupItem: $backupItem,
                tenant: $tenant,
                policyType: (string) $backupItem->policy_type,
                policyId: (string) $backupItem->policy_identifier,
                policyPayload: $this->policyPayload,
                includeAssignments: true,
            );

            // Re-read the item: the outcome is derived from the metadata flags read below.
            $backupItem->refresh();

            $metadata = is_array($backupItem->metadata) ? $backupItem->metadata : [];
            $fetchFailed = (bool) ($metadata['assignments_fetch_failed'] ?? false);

            $this->completeRun(
                operationRunService: $operationRunService,
                run: $run,
                fetchFailed: $fetchFailed,
                failures: $fetchFailed ? [$this->failureFromMetadata($metadata)] : [],
            );

            Log::info('FetchAssignmentsJob: Successfully processed', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'assignment_count' => $backupItem->getAssignmentCountAttribute(),
                'fetch_failed' => $fetchFailed,
            ]);
        } catch (\Throwable $throwable) {
            Log::error('FetchAssignmentsJob: Failed to enrich BackupItem', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'error' => $throwable->getMessage(),
            ]);

            $safeMessage = RunFailureSanitizer::sanitizeMessage($throwable->getMessage());

            $operationRunService->updateRun(
                $run,
                status: OperationRunStatus::Completed->value,
                outcome: OperationRunOutcome::Failed->value,
                summaryCounts: [
                    'total' => 1,
                    'processed' => 0,
                    'failed' => 1,
                ],
                failures: [[
                    'code' => self::FAILURE_CODE,
                    'reason_code' => RunFailureSanitizer::normalizeReasonCode($throwable->getMessage()),
                    'message' => $safeMessage !== '' ? $safeMessage : self::DEFAULT_FAILURE_MESSAGE,
                ]],
            );
        } finally {
            // Always free the lock, including on the early return and the failure path.
            $lock->release();
        }
    }

    /**
     * Build the identity inputs used to find or create the OperationRun.
     *
     * @return array<string, int|string>
     */
    private static function operationRunIdentityInputs(
        int $tenantId,
        int $backupItemId,
        string $tenantExternalId,
        string $policyExternalId,
    ): array {
        return [
            'tenant_id' => $tenantId,
            'job_type' => self::OPERATION_TYPE,
            'fingerprint' => AssignmentJobFingerprint::forFetch(
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
            ),
        ];
    }

    /**
     * Build the context array stored with the OperationRun. Non-numeric
     * backup_set_id / policy_id values are recorded as null.
     *
     * @return array<string, int|string|null>
     */
    private static function operationRunContext(BackupItem $backupItem): array
    {
        return [
            'backup_set_id' => is_numeric($backupItem->backup_set_id) ? (int) $backupItem->backup_set_id : null,
            'backup_item_id' => (int) $backupItem->getKey(),
            'policy_id' => is_numeric($backupItem->policy_id) ? (int) $backupItem->policy_id : null,
            'policy_identifier' => (string) $backupItem->policy_identifier,
        ];
    }

    /**
     * Mark the run as completed with summary counts for a single item.
     *
     * @param  bool  $fetchFailed  True records outcome "failed" (0 processed, 1 failed); false records "succeeded".
     * @param  array<int, array{code:string, reason_code:string, message:string}>  $failures
     */
    private function completeRun(
        OperationRunService $operationRunService,
        OperationRun $run,
        bool $fetchFailed,
        array $failures = [],
    ): void {
        $operationRunService->updateRun(
            $run,
            status: OperationRunStatus::Completed->value,
            outcome: $fetchFailed ? OperationRunOutcome::Failed->value : OperationRunOutcome::Succeeded->value,
            summaryCounts: [
                'total' => 1,
                'processed' => $fetchFailed ? 0 : 1,
                'failed' => $fetchFailed ? 1 : 0,
            ],
            failures: $failures,
        );
    }

    /**
     * Best-effort: mark the run attached to this job as failed when the job
     * cannot proceed (missing backup item or tenant).
     *
     * Does nothing when no run is attached or the run is already completed.
     * Errors while updating the run are logged rather than rethrown, so the
     * early-exit paths keep returning quietly.
     *
     * @param  string  $message  Static, non-sensitive failure message stored on the run.
     */
    private function failTrackedRun(OperationRunService $operationRunService, string $message): void
    {
        $run = $this->operationRun;

        if (! $run instanceof OperationRun) {
            return;
        }

        if ($run->status === OperationRunStatus::Completed->value) {
            return;
        }

        try {
            $this->completeRun(
                operationRunService: $operationRunService,
                run: $run,
                fetchFailed: true,
                failures: [[
                    'code' => self::FAILURE_CODE,
                    'reason_code' => RunFailureSanitizer::normalizeReasonCode(ProviderReasonCodes::UnknownError),
                    'message' => $message,
                ]],
            );
        } catch (\Throwable $throwable) {
            Log::error('FetchAssignmentsJob: Failed to mark run as failed', [
                'backup_item_id' => $this->backupItemId,
                'operation_run_id' => (int) $run->getKey(),
                'error' => $throwable->getMessage(),
            ]);
        }
    }

    /**
     * Translate the backup item's failure metadata into a run failure entry.
     *
     * The reason code prefers "assignments_fetch_error_code", then
     * "assignments_fetch_error", then the unknown-error code. The message is
     * sanitized the same way as exception messages and falls back to a default
     * when it is missing, non-scalar, or empty after sanitizing.
     *
     * @param  array<array-key, mixed>  $metadata
     * @return array{code:string, reason_code:string, message:string}
     */
    private function failureFromMetadata(array $metadata): array
    {
        $reasonCandidate = $metadata['assignments_fetch_error_code']
            ?? $metadata['assignments_fetch_error']
            ?? ProviderReasonCodes::UnknownError;

        $rawMessage = $metadata['assignments_fetch_error'] ?? null;

        // Guard against arrays/objects in metadata; a bare (string) cast would yield "Array".
        $safeMessage = is_scalar($rawMessage)
            ? RunFailureSanitizer::sanitizeMessage((string) $rawMessage)
            : '';

        return [
            'code' => self::FAILURE_CODE,
            'reason_code' => RunFailureSanitizer::normalizeReasonCode(
                $this->normalizeReasonCandidate($reasonCandidate),
            ),
            'message' => $safeMessage !== '' ? $safeMessage : self::DEFAULT_FAILURE_MESSAGE,
        ];
    }

    /**
     * Reduce an arbitrary metadata value to a trimmed, non-empty reason string,
     * falling back to the unknown-error code for anything else.
     */
    private function normalizeReasonCandidate(mixed $reasonCandidate): string
    {
        if (is_string($reasonCandidate) && trim($reasonCandidate) !== '') {
            return trim($reasonCandidate);
        }

        return ProviderReasonCodes::UnknownError;
    }
}