Implements Spec 096 ops polish bundle: - Persist durable OperationRun.summary_counts for assignment fetch/restore (final attempt wins) - Server-side dedupe for assignment jobs (15-minute cooldown + non-canonical skip) - Track ReconcileAdapterRunsJob via workspace-scoped OperationRun + stable failure codes + overlap prevention - Seed DX: ensure seeded tenants use UUID v4 external_id and seed satisfies workspace_id NOT NULL constraints Verification (local / evidence-based): - `vendor/bin/sail artisan test --compact tests/Feature/Operations/AssignmentRunSummaryCountsTest.php tests/Feature/Operations/AssignmentJobDedupeTest.php tests/Feature/Operations/ReconcileAdapterRunsJobTrackingTest.php tests/Feature/Seed/PoliciesSeederExternalIdTest.php` - `vendor/bin/sail bin pint --dirty` Spec artifacts included under `specs/096-ops-polish-assignment-dedupe-system-tracking/` (spec/plan/tasks/checklists). Co-authored-by: Ahmed Darrazi <ahmed.darrazi@live.de> Reviewed-on: #115
362 lines
13 KiB
PHP
362 lines
13 KiB
PHP
<?php
|
|
|
|
namespace App\Jobs;
|
|
|
|
use App\Models\BackupItem;
|
|
use App\Models\OperationRun;
|
|
use App\Models\Tenant;
|
|
use App\Models\User;
|
|
use App\Services\AssignmentBackupService;
|
|
use App\Services\OperationRunService;
|
|
use App\Support\OperationRunOutcome;
|
|
use App\Support\OperationRunStatus;
|
|
use App\Support\OpsUx\AssignmentJobFingerprint;
|
|
use App\Support\OpsUx\RunFailureSanitizer;
|
|
use App\Support\Providers\ProviderReasonCodes;
|
|
use Illuminate\Bus\Queueable;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Foundation\Bus\Dispatchable;
|
|
use Illuminate\Queue\InteractsWithQueue;
|
|
use Illuminate\Queue\SerializesModels;
|
|
use Illuminate\Support\Facades\Cache;
|
|
use Illuminate\Support\Facades\Log;
|
|
use RuntimeException;
|
|
|
|
/**
 * Queued job that enriches a BackupItem with its policy assignments and records
 * the attempt on an OperationRun of type "assignments.fetch".
 *
 * Duplicate work is suppressed in three places inside handle(): a completed
 * canonical run inside the cooldown window short-circuits the job, a job whose
 * run is not the canonical one for its identity is skipped, and a cache lock
 * keyed by the execution identity prevents overlapping executions.
 */
class FetchAssignmentsJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private const string OPERATION_TYPE = 'assignments.fetch';

    /** Failure code written on every failed run produced by this job. */
    private const string FAILURE_CODE = 'assignments.fetch_failed';

    /** Message stored when no usable failure message is available. */
    private const string DEFAULT_FAILURE_MESSAGE = 'Assignments fetch failed.';

    private const int DEDUPE_COOLDOWN_MINUTES = 15;

    private const int EXECUTION_LOCK_TTL_SECONDS = 900;

    public ?OperationRun $operationRun = null;

    /**
     * The number of times the job may be attempted.
     */
    public int $tries = 1;

    /**
     * The number of seconds to wait before retrying the job.
     */
    public int $backoff = 0;

    /**
     * Create a new job instance.
     *
     * @param  array  $policyPayload  Forwarded unchanged to AssignmentBackupService::enrichWithAssignments().
     * @param  OperationRun|null  $operationRun  Run to track this execution on; when null, handle() resolves one itself.
     */
    public function __construct(
        public int $backupItemId,
        public string $tenantExternalId,
        public string $policyExternalId,
        public array $policyPayload,
        ?OperationRun $operationRun = null,
    ) {
        $this->operationRun = $operationRun;
    }

    /**
     * Ensure an OperationRun for the backup item's fetch identity and queue the
     * job only when that run was newly created.
     *
     * When the service hands back an existing run (`wasRecentlyCreated` is
     * false) nothing is dispatched and the existing run is returned.
     *
     * @throws RuntimeException When the backup item has no Tenant relation.
     */
    public static function dispatchTracked(
        BackupItem $backupItem,
        array $policyPayload,
        ?User $initiator = null,
    ): OperationRun {
        $tenant = $backupItem->tenant;

        if (! $tenant instanceof Tenant) {
            throw new RuntimeException('BackupItem tenant context is required to dispatch assignment fetch job.');
        }

        /** @var OperationRunService $operationRunService */
        $operationRunService = app(OperationRunService::class);

        $backupItemId = (int) $backupItem->getKey();
        $tenantExternalId = (string) ($tenant->external_id ?? '');
        $policyExternalId = (string) $backupItem->policy_identifier;

        $run = $operationRunService->ensureRunWithIdentityCooldown(
            tenant: $tenant,
            type: self::OPERATION_TYPE,
            identityInputs: self::operationRunIdentityInputs(
                tenantId: (int) $tenant->getKey(),
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
            ),
            context: self::operationRunContext($backupItem),
            cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
            initiator: $initiator,
        );

        if ($run->wasRecentlyCreated) {
            dispatch(new self(
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
                policyPayload: $policyPayload,
                operationRun: $run,
            ));
        }

        return $run;
    }

    /**
     * Execute the job.
     *
     * Exceptions raised while processing are logged and recorded on the run as
     * a failure; they are not rethrown.
     */
    public function handle(
        AssignmentBackupService $assignmentBackupService,
        OperationRunService $operationRunService,
    ): void {
        $backupItem = BackupItem::query()
            ->with('tenant')
            ->find($this->backupItemId);

        if (! $backupItem instanceof BackupItem) {
            Log::warning('FetchAssignmentsJob: BackupItem not found', [
                'backup_item_id' => $this->backupItemId,
            ]);

            // Without this the run handed over by dispatchTracked() would never
            // leave its non-terminal state.
            $this->failTrackedRun($operationRunService, 'Backup item not found.');

            return;
        }

        $tenant = $backupItem->tenant;

        if (! $tenant instanceof Tenant) {
            Log::warning('FetchAssignmentsJob: Tenant not found for BackupItem', [
                'backup_item_id' => $this->backupItemId,
            ]);

            $this->failTrackedRun($operationRunService, 'Tenant not found for backup item.');

            return;
        }

        $fingerprint = AssignmentJobFingerprint::forFetch(
            backupItemId: (int) $backupItem->getKey(),
            tenantExternalId: $this->tenantExternalId,
            policyExternalId: $this->policyExternalId,
        );

        $identityInputs = self::operationRunIdentityInputs(
            tenantId: (int) $tenant->getKey(),
            backupItemId: (int) $backupItem->getKey(),
            tenantExternalId: $this->tenantExternalId,
            policyExternalId: $this->policyExternalId,
        );

        // Jobs constructed without a run (not via dispatchTracked()) resolve one here.
        if (! $this->operationRun instanceof OperationRun) {
            $this->operationRun = $operationRunService->ensureRunWithIdentityCooldown(
                tenant: $tenant,
                type: self::OPERATION_TYPE,
                identityInputs: $identityInputs,
                context: self::operationRunContext($backupItem),
                cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
            );
        }

        $canonicalRun = $operationRunService->findCanonicalRunWithIdentity(
            tenant: $tenant,
            type: self::OPERATION_TYPE,
            identityInputs: $identityInputs,
            cooldownMinutes: self::DEDUPE_COOLDOWN_MINUTES,
        );

        if ($canonicalRun instanceof OperationRun && $canonicalRun->status === OperationRunStatus::Completed->value) {
            Log::info('FetchAssignmentsJob: Skipping because identity is in cooldown window', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $canonicalRun->getKey(),
            ]);

            return;
        }

        if ($canonicalRun instanceof OperationRun
            && $this->operationRun instanceof OperationRun
            && (int) $canonicalRun->getKey() !== (int) $this->operationRun->getKey()
        ) {
            Log::info('FetchAssignmentsJob: Skipping non-canonical execution', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'canonical_run_id' => (int) $canonicalRun->getKey(),
                'job_run_id' => (int) $this->operationRun->getKey(),
            ]);

            return;
        }

        $run = $this->operationRun instanceof OperationRun ? $this->operationRun : $canonicalRun;

        if (! $run instanceof OperationRun) {
            return;
        }

        $executionIdentityKey = AssignmentJobFingerprint::executionIdentityKey(
            jobType: self::OPERATION_TYPE,
            tenantId: (int) $tenant->getKey(),
            fingerprint: $fingerprint,
            operationRunId: (int) $run->getKey(),
        );

        $lock = Cache::lock('assignment-job:'.$executionIdentityKey, self::EXECUTION_LOCK_TTL_SECONDS);

        if (! $lock->get()) {
            Log::info('FetchAssignmentsJob: Overlap prevented for duplicate execution', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'execution_identity' => $executionIdentityKey,
            ]);

            return;
        }

        try {
            if ($run->status !== OperationRunStatus::Completed->value) {
                $operationRunService->updateRun($run, OperationRunStatus::Running->value);
            }

            // Only Settings Catalog policies are enriched; any other type
            // completes the run as succeeded without calling the service.
            if ($backupItem->policy_type !== 'settingsCatalogPolicy') {
                Log::info('FetchAssignmentsJob: Skipping non-Settings Catalog policy', [
                    'backup_item_id' => (int) $backupItem->getKey(),
                    'policy_type' => (string) $backupItem->policy_type,
                ]);

                $this->completeRun(
                    operationRunService: $operationRunService,
                    run: $run,
                    fetchFailed: false,
                );

                return;
            }

            $assignmentBackupService->enrichWithAssignments(
                backupItem: $backupItem,
                tenant: $tenant,
                policyType: (string) $backupItem->policy_type,
                policyId: (string) $backupItem->policy_identifier,
                policyPayload: $this->policyPayload,
                includeAssignments: true,
            );

            // The outcome is read back from the item's metadata after the service call.
            $backupItem->refresh();
            $metadata = is_array($backupItem->metadata) ? $backupItem->metadata : [];

            $fetchFailed = (bool) ($metadata['assignments_fetch_failed'] ?? false);
            $failures = [];

            if ($fetchFailed) {
                $reasonCandidate = $metadata['assignments_fetch_error_code']
                    ?? $metadata['assignments_fetch_error']
                    ?? ProviderReasonCodes::UnknownError;

                $failures[] = [
                    'code' => self::FAILURE_CODE,
                    'reason_code' => RunFailureSanitizer::normalizeReasonCode(
                        $this->normalizeReasonCandidate($reasonCandidate),
                    ),
                    // Sanitized the same way as exception messages below.
                    'message' => $this->safeFailureMessage($metadata['assignments_fetch_error'] ?? null),
                ];
            }

            $this->completeRun(
                operationRunService: $operationRunService,
                run: $run,
                fetchFailed: $fetchFailed,
                failures: $failures,
            );

            Log::info('FetchAssignmentsJob: Successfully processed', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'assignment_count' => $backupItem->getAssignmentCountAttribute(),
                'fetch_failed' => $fetchFailed,
            ]);
        } catch (\Throwable $throwable) {
            Log::error('FetchAssignmentsJob: Failed to enrich BackupItem', [
                'backup_item_id' => (int) $backupItem->getKey(),
                'operation_run_id' => (int) $run->getKey(),
                'error' => $throwable->getMessage(),
            ]);

            $this->completeRun(
                operationRunService: $operationRunService,
                run: $run,
                fetchFailed: true,
                failures: [[
                    'code' => self::FAILURE_CODE,
                    'reason_code' => RunFailureSanitizer::normalizeReasonCode($throwable->getMessage()),
                    'message' => $this->safeFailureMessage($throwable->getMessage()),
                ]],
            );
        } finally {
            $lock->release();
        }
    }

    /**
     * Build the identity inputs used to look up or create the OperationRun.
     *
     * @return array<string, int|string>
     */
    private static function operationRunIdentityInputs(
        int $tenantId,
        int $backupItemId,
        string $tenantExternalId,
        string $policyExternalId,
    ): array {
        return [
            'tenant_id' => $tenantId,
            'job_type' => self::OPERATION_TYPE,
            'fingerprint' => AssignmentJobFingerprint::forFetch(
                backupItemId: $backupItemId,
                tenantExternalId: $tenantExternalId,
                policyExternalId: $policyExternalId,
            ),
        ];
    }

    /**
     * Build the context payload stored on the OperationRun.
     *
     * Non-numeric `backup_set_id` / `policy_id` values are stored as null.
     *
     * @return array<string, int|string|null>
     */
    private static function operationRunContext(BackupItem $backupItem): array
    {
        return [
            'backup_set_id' => is_numeric($backupItem->backup_set_id) ? (int) $backupItem->backup_set_id : null,
            'backup_item_id' => (int) $backupItem->getKey(),
            'policy_id' => is_numeric($backupItem->policy_id) ? (int) $backupItem->policy_id : null,
            'policy_identifier' => (string) $backupItem->policy_identifier,
        ];
    }

    /**
     * Mark the run completed with a succeeded or failed outcome and a
     * single-item summary (total is always 1).
     *
     * @param  array<int, array{code:string, reason_code:string, message:string}>  $failures
     */
    private function completeRun(
        OperationRunService $operationRunService,
        OperationRun $run,
        bool $fetchFailed,
        array $failures = [],
    ): void {
        $operationRunService->updateRun(
            $run,
            status: OperationRunStatus::Completed->value,
            outcome: $fetchFailed ? OperationRunOutcome::Failed->value : OperationRunOutcome::Succeeded->value,
            summaryCounts: [
                'total' => 1,
                'processed' => $fetchFailed ? 0 : 1,
                'failed' => $fetchFailed ? 1 : 0,
            ],
            failures: $failures,
        );
    }

    /**
     * Fail the run this job was constructed with, if there is one and it is not
     * already completed.
     *
     * Used by the early exits in handle() that happen before a run would
     * otherwise be resolved or updated.
     */
    private function failTrackedRun(OperationRunService $operationRunService, string $message): void
    {
        $run = $this->operationRun;

        if (! $run instanceof OperationRun) {
            return;
        }

        // Leave an already completed run untouched.
        if ($run->status === OperationRunStatus::Completed->value) {
            return;
        }

        $this->completeRun(
            operationRunService: $operationRunService,
            run: $run,
            fetchFailed: true,
            failures: [[
                'code' => self::FAILURE_CODE,
                'reason_code' => RunFailureSanitizer::normalizeReasonCode(ProviderReasonCodes::UnknownError),
                'message' => $message,
            ]],
        );
    }

    /**
     * Return a sanitized failure message, falling back to the default message
     * when the input is not scalar or sanitizes to an empty string.
     */
    private function safeFailureMessage(mixed $rawMessage): string
    {
        $sanitized = is_scalar($rawMessage)
            ? RunFailureSanitizer::sanitizeMessage((string) $rawMessage)
            : '';

        return $sanitized !== '' ? $sanitized : self::DEFAULT_FAILURE_MESSAGE;
    }

    /**
     * Return the trimmed candidate when it is a non-blank string, otherwise the
     * unknown-error reason code.
     */
    private function normalizeReasonCandidate(mixed $reasonCandidate): string
    {
        if (is_string($reasonCandidate) && trim($reasonCandidate) !== '') {
            return trim($reasonCandidate);
        }

        return ProviderReasonCodes::UnknownError;
    }
}
|