Harden SyncPoliciesJob type input parsing + fail fast when supported types are empty/mismatched. Pass supported policy types from Tenant sync action and add regression tests. Co-authored-by: Ahmed Darrazi <ahmeddarrazi@adsmac.fritz.box> Reviewed-on: #75
606 lines
19 KiB
PHP
606 lines
19 KiB
PHP
<?php
|
|
|
|
namespace App\Services;
|
|
|
|
use App\Models\OperationRun;
|
|
use App\Models\Tenant;
|
|
use App\Models\User;
|
|
use App\Notifications\OperationRunCompleted as OperationRunCompletedNotification;
|
|
use App\Notifications\OperationRunQueued as OperationRunQueuedNotification;
|
|
use App\Services\Operations\BulkIdempotencyFingerprint;
|
|
use App\Support\OperationRunOutcome;
|
|
use App\Support\OperationRunStatus;
|
|
use App\Support\OpsUx\BulkRunContext;
|
|
use App\Support\OpsUx\RunFailureSanitizer;
|
|
use App\Support\OpsUx\SummaryCountsNormalizer;
|
|
use Illuminate\Database\QueryException;
|
|
use Illuminate\Support\Facades\DB;
|
|
use InvalidArgumentException;
|
|
use ReflectionFunction;
|
|
use ReflectionMethod;
|
|
use Throwable;
|
|
|
|
class OperationRunService
|
|
{
|
|
/**
 * Report whether a run has been waiting in the queue for too long.
 *
 * A run is stale only when its status is still "queued", it has no
 * started_at timestamp, and its created_at timestamp is at or before
 * now() minus $thresholdMinutes. A run without created_at is never stale.
 */
public function isStaleQueuedRun(OperationRun $run, int $thresholdMinutes = 5): bool
{
    $waitingInQueue = $run->status === OperationRunStatus::Queued->value
        && $run->started_at === null
        && $run->created_at !== null;

    if (! $waitingInQueue) {
        return false;
    }

    $cutoff = now()->subMinutes($thresholdMinutes);

    return $run->created_at->lte($cutoff);
}
|
|
|
|
/**
 * Close a run as completed/failed with a single "run.stale_queued" failure entry.
 *
 * @param string $message Text stored as the failure message.
 */
public function failStaleQueuedRun(OperationRun $run, string $message = 'Run was queued but never started.'): OperationRun
{
    $staleFailure = [
        'code' => 'run.stale_queued',
        'message' => $message,
    ];

    $terminalStatus = OperationRunStatus::Completed->value;
    $failedOutcome = OperationRunOutcome::Failed->value;

    return $this->updateRun(
        $run,
        status: $terminalStatus,
        outcome: $failedOutcome,
        failures: [$staleFailure],
    );
}
|
|
|
|
/**
 * Return the active (not completed) run for this tenant/type/inputs, creating a queued one if none exists.
 *
 * The same $inputs array is used both as the identity that gets hashed for
 * deduplication and as the stored run context. This is exactly
 * ensureRunWithIdentity() with identityInputs and context set to $inputs, so
 * the lookup, creation and unique-violation handling live in one place.
 *
 * @param array $inputs Values that identify the run and are stored as its context.
 *
 * @throws QueryException When creation fails and no active run with the same identity is found.
 */
public function ensureRun(
    Tenant $tenant,
    string $type,
    array $inputs,
    ?User $initiator = null
): OperationRun {
    return $this->ensureRunWithIdentity(
        tenant: $tenant,
        type: $type,
        identityInputs: $inputs,
        context: $inputs,
        initiator: $initiator,
    );
}
|
|
|
|
/**
 * Find or create the active run for an identity, storing a separate context payload.
 *
 * The identity hash is computed from the tenant id, the type and $identityInputs.
 * A run with that hash whose status is not "completed" is returned as-is;
 * otherwise a new queued/pending run is created with $context as its context.
 *
 * @param array $identityInputs Values that are hashed into run_identity_hash.
 * @param array $context Payload stored in the run's context column.
 *
 * @throws QueryException When creation fails and no queued/running run with the same hash is found.
 */
public function ensureRunWithIdentity(
    Tenant $tenant,
    string $type,
    array $identityInputs,
    array $context,
    ?User $initiator = null
): OperationRun {
    $identityHash = $this->calculateHash($tenant->id, $type, $identityInputs);

    // Fast path: reuse a run with this identity that has not completed yet.
    // The status filter is meant to match the partial unique index.
    $activeRun = OperationRun::query()
        ->where('tenant_id', $tenant->id)
        ->where('run_identity_hash', $identityHash)
        ->whereIn('status', OperationRunStatus::values())
        ->where('status', '!=', OperationRunStatus::Completed->value)
        ->first();

    if ($activeRun) {
        return $activeRun;
    }

    // Slow path: create the run; the partial unique index makes this race-safe.
    try {
        return OperationRun::create([
            'tenant_id' => $tenant->id,
            'user_id' => $initiator?->id,
            'initiator_name' => $initiator?->name ?? 'System',
            'type' => $type,
            'status' => OperationRunStatus::Queued->value,
            'outcome' => OperationRunOutcome::Pending->value,
            'run_identity_hash' => $identityHash,
            'context' => $context,
        ]);
    } catch (QueryException $violation) {
        // Unique violation (active-run dedupe):
        // - PostgreSQL: 23505
        // - SQLite (tests): 23000 (generic integrity violation; message indicates UNIQUE constraint failed)
        $sqlState = $violation->errorInfo[0] ?? null;

        if (in_array($sqlState, ['23505', '23000'], true)) {
            // A concurrent request may have created the run first; return that one.
            $raceWinner = OperationRun::query()
                ->where('tenant_id', $tenant->id)
                ->where('run_identity_hash', $identityHash)
                ->whereIn('status', [OperationRunStatus::Queued->value, OperationRunStatus::Running->value])
                ->first();

            if ($raceWinner) {
                return $raceWinner;
            }
        }

        throw $violation;
    }
}
|
|
|
|
/**
 * Standard entry point for enqueueing a bulk operation.
 *
 * Normalizes the target scope, builds the bulk context (operation type,
 * target scope, selection, idempotency fingerprint) on top of $extraContext,
 * and dedupes active runs on target scope + selection identity. The
 * dispatcher is invoked only when the run was newly created by this call.
 *
 * @param array{entra_tenant_id?: mixed, directory_context_id?: mixed} $targetScope
 * @param array{kind: string, ids_hash?: string, query_hash?: string} $selectionIdentity
 * @param array<string, mixed> $extraContext
 *
 * @throws InvalidArgumentException When target_scope names an entra_tenant_id other than the tenant's.
 */
public function enqueueBulkOperation(
    Tenant $tenant,
    string $type,
    array $targetScope,
    array $selectionIdentity,
    callable $dispatcher,
    ?User $initiator = null,
    array $extraContext = [],
    bool $emitQueuedNotification = true
): OperationRun {
    $scope = BulkRunContext::normalizeTargetScope($targetScope);

    $scopedTenantId = $scope['entra_tenant_id'] ?? null;
    $hasScopedTenant = is_string($scopedTenantId) && $scopedTenantId !== '';

    if ($hasScopedTenant && $scopedTenantId !== (string) $tenant->graphTenantId()) {
        throw new InvalidArgumentException('Bulk enqueue target_scope entra_tenant_id must match the current tenant.');
    }

    /** @var BulkIdempotencyFingerprint $fingerprintBuilder */
    $fingerprintBuilder = app(BulkIdempotencyFingerprint::class);

    // Canonical keys come second so they win over same-named keys in $extraContext.
    $canonicalContext = [
        'operation' => [
            'type' => $type,
        ],
        'target_scope' => $scope,
        'selection' => $selectionIdentity,
        'idempotency' => [
            'fingerprint' => $fingerprintBuilder->build($type, $scope, $selectionIdentity),
        ],
    ];

    $identity = [
        'target_scope' => $scope,
        'selection' => $selectionIdentity,
    ];

    $run = $this->ensureRunWithIdentity(
        tenant: $tenant,
        type: $type,
        identityInputs: $identity,
        context: array_merge($extraContext, $canonicalContext),
        initiator: $initiator,
    );

    if (! $run->wasRecentlyCreated) {
        // An active run with the same identity already exists; do not dispatch again.
        return $run;
    }

    $this->dispatchOrFail($run, $dispatcher, emitQueuedNotification: $emitQueuedNotification);

    return $run;
}
|
|
|
|
/**
 * Persist a status change (and optionally outcome, summary counts and failures) on a run.
 *
 * Sets started_at the first time the status becomes "running" and
 * completed_at the first time it becomes "completed". When the run moves
 * into "completed" from another status and has a user, that user receives
 * an OperationRunCompleted notification.
 *
 * @param array $summaryCounts Stored (sanitized) only when non-empty.
 * @param array $failures Stored (sanitized) only when non-empty.
 *
 * @throws InvalidArgumentException When $status or $outcome is not a known enum value.
 */
public function updateRun(
    OperationRun $run,
    string $status,
    ?string $outcome = null,
    array $summaryCounts = [],
    array $failures = []
): OperationRun {
    $statusBefore = (string) $run->status;
    $completed = OperationRunStatus::Completed->value;

    if (! in_array($status, OperationRunStatus::values(), true)) {
        throw new InvalidArgumentException('Invalid OperationRun status: '.$status);
    }

    if ($outcome !== null && ! in_array($outcome, OperationRunOutcome::values(), true)) {
        throw new InvalidArgumentException('Invalid OperationRun outcome: '.$outcome);
    }

    // Reserved/future: MUST NOT be produced by feature 054.
    if ($outcome === OperationRunOutcome::Cancelled->value) {
        $outcome = OperationRunOutcome::Failed->value;
        $failures[] = [
            'code' => 'run.cancelled',
            'message' => 'Run cancelled (reserved outcome mapped to failed).',
        ];
    }

    $changes = ['status' => $status];

    if ($outcome) {
        $changes['outcome'] = $outcome;
    }

    if ($summaryCounts !== []) {
        $changes['summary_counts'] = $this->sanitizeSummaryCounts($summaryCounts);
    }

    if ($failures !== []) {
        $changes['failure_summary'] = $this->sanitizeFailures($failures);
    }

    // Timestamps are written once and never overwritten by later updates.
    if ($status === OperationRunStatus::Running->value && $run->started_at === null) {
        $changes['started_at'] = now();
    }

    if ($status === $completed && $run->completed_at === null) {
        $changes['completed_at'] = now();
    }

    $run->update($changes);
    $run->refresh();

    $justCompleted = $statusBefore !== $completed && $run->status === $completed;

    if ($justCompleted && $run->user instanceof User) {
        $run->user->notify(new OperationRunCompletedNotification($run));
    }

    return $run;
}
|
|
|
|
/**
 * Increment whitelisted summary_counts keys for a run.
 *
 * Uses a transaction + row lock to prevent lost updates when multiple workers
 * update counts concurrently.
 *
 * @param array<string, mixed> $delta Increments per key; sanitized before use.
 *
 * @return OperationRun The freshly reloaded run, or the given $run unchanged when
 *                      the sanitized delta is empty or the row no longer exists.
 */
public function incrementSummaryCounts(OperationRun $run, array $delta): OperationRun
{
    $delta = $this->sanitizeSummaryCounts($delta);

    if ($delta === []) {
        return $run;
    }

    /** @var OperationRun|null $updated */
    $updated = DB::transaction(function () use ($run, $delta): ?OperationRun {
        $locked = OperationRun::query()
            ->whereKey($run->getKey())
            ->lockForUpdate()
            ->first();

        if (! $locked instanceof OperationRun) {
            // Row disappeared between the caller loading it and us locking it.
            return null;
        }

        $current = is_array($locked->summary_counts ?? null) ? $locked->summary_counts : [];
        $current = SummaryCountsNormalizer::normalize($current);

        foreach ($delta as $key => $value) {
            $current[$key] = ($current[$key] ?? 0) + $value;
        }

        $locked->summary_counts = $current;
        $locked->save();

        return $locked;
    });

    if ($updated === null) {
        // Do not refresh(): it re-reads the row and fails when the row is gone,
        // which would defeat the missing-row guard above.
        return $run;
    }

    $updated->refresh();

    return $updated;
}
|
|
|
|
/**
 * Dispatch a queued operation safely.
 *
 * If dispatch fails synchronously (misconfiguration, serialization errors, etc.),
 * the OperationRun is marked terminal failed so we do not leave a misleading queued run behind,
 * and the original exception is rethrown.
 *
 * The queued notification is sent only after the dispatcher returned, and outside
 * the failure handling: an error while notifying must not mark an already
 * dispatched run as "dispatch.failed".
 *
 * @param callable $dispatcher Invoked with the run when it accepts at least one parameter.
 * @param bool $emitQueuedNotification Whether to notify the run's user that the run was queued.
 *
 * @throws Throwable Whatever the dispatcher (or the notification) throws.
 */
public function dispatchOrFail(OperationRun $run, callable $dispatcher, bool $emitQueuedNotification = true): void
{
    try {
        $this->invokeDispatcher($dispatcher, $run);
    } catch (Throwable $e) {
        $this->updateRun(
            $run,
            status: OperationRunStatus::Completed->value,
            outcome: OperationRunOutcome::Failed->value,
            failures: [
                [
                    'code' => 'dispatch.failed',
                    'message' => $e->getMessage(),
                ],
            ],
        );

        throw $e;
    }

    if ($emitQueuedNotification && $run->wasRecentlyCreated && $run->user instanceof User) {
        $run->user->notify(new OperationRunQueuedNotification($run));
    }
}
|
|
|
|
/**
 * Append failure entries to failure_summary (sanitized + bounded) without overwriting existing.
 *
 * Runs inside a transaction with a row lock. Existing entries come first and
 * the merged list is cut to its first 50 entries.
 *
 * @param array<int, array{code?: mixed, reason_code?: mixed, message?: mixed}> $failures
 *
 * @return OperationRun The freshly reloaded run, or the given $run unchanged when
 *                      there is nothing to append or the row no longer exists.
 */
public function appendFailures(OperationRun $run, array $failures): OperationRun
{
    $failures = $this->sanitizeFailures($failures);

    if ($failures === []) {
        return $run;
    }

    /** @var OperationRun|null $updated */
    $updated = DB::transaction(function () use ($run, $failures): ?OperationRun {
        $locked = OperationRun::query()
            ->whereKey($run->getKey())
            ->lockForUpdate()
            ->first();

        if (! $locked instanceof OperationRun) {
            // Row disappeared between the caller loading it and us locking it.
            return null;
        }

        $current = is_array($locked->failure_summary ?? null) ? $locked->failure_summary : [];
        $current = $this->sanitizeFailures($current);

        // Prevent runaway payloads.
        $merged = array_slice(array_merge($current, $failures), 0, 50);

        $locked->failure_summary = $merged;
        $locked->save();

        return $locked;
    });

    if ($updated === null) {
        // Do not refresh(): it re-reads the row and fails when the row is gone,
        // which would defeat the missing-row guard above.
        return $run;
    }

    $updated->refresh();

    return $updated;
}
|
|
|
|
/**
 * Complete a bulk run once its summary_counts show that all work was processed.
 *
 * Nothing changes while total is not positive or processed is below total.
 * Otherwise the outcome is derived from the failed count: failed when it is
 * at least total, partially succeeded when it is positive, succeeded otherwise.
 */
public function maybeCompleteBulkRun(OperationRun $run): OperationRun
{
    $completed = OperationRunStatus::Completed->value;

    $run->refresh();

    if ($run->status === $completed) {
        return $run;
    }

    $result = DB::transaction(function () use ($run, $completed): OperationRun {
        $row = OperationRun::query()
            ->whereKey($run->getKey())
            ->lockForUpdate()
            ->first();

        if (! $row instanceof OperationRun) {
            return $run;
        }

        // Re-check under the lock: another worker may have completed the run meanwhile.
        if ($row->status === $completed) {
            return $row;
        }

        $tally = SummaryCountsNormalizer::normalize(
            is_array($row->summary_counts ?? null) ? $row->summary_counts : []
        );

        $total = (int) ($tally['total'] ?? 0);
        $processed = (int) ($tally['processed'] ?? 0);
        $failed = (int) ($tally['failed'] ?? 0);

        $allProcessed = $total > 0 && $processed >= $total;

        if (! $allProcessed) {
            return $row;
        }

        $outcome = match (true) {
            $failed >= $total => OperationRunOutcome::Failed->value,
            $failed > 0 => OperationRunOutcome::PartiallySucceeded->value,
            default => OperationRunOutcome::Succeeded->value,
        };

        return $this->updateRun($row, status: $completed, outcome: $outcome);
    });

    $result->refresh();

    return $result;
}
|
|
|
|
/**
 * Close a run as completed/failed, recording the exception message under "exception.unhandled".
 */
public function failRun(OperationRun $run, Throwable $e): OperationRun
{
    $unhandled = [
        'code' => 'exception.unhandled',
        'message' => $e->getMessage(),
    ];

    $terminalStatus = OperationRunStatus::Completed->value;
    $failedOutcome = OperationRunOutcome::Failed->value;

    return $this->updateRun(
        $run,
        status: $terminalStatus,
        outcome: $failedOutcome,
        failures: [$unhandled],
    );
}
|
|
|
|
/**
 * Call the dispatcher, passing the run when the dispatcher declares at least one parameter.
 *
 * The callable is first converted to a Closure so that every callable form
 * (closure, invokable object, [object|class, method] array, "Class::method"
 * string, plain function name) is inspected the same way.
 *
 * @param callable $dispatcher Callback that enqueues the work for $run.
 */
private function invokeDispatcher(callable $dispatcher, OperationRun $run): void
{
    $callback = \Closure::fromCallable($dispatcher);
    $signature = new ReflectionFunction($callback);

    if ($signature->getNumberOfParameters() >= 1) {
        $callback($run);

        return;
    }

    $callback();
}
|
|
|
|
/**
 * Build the run identity hash: sha256 over "<tenantId>|<type>|<normalized inputs as JSON>".
 *
 * @throws \JsonException When the normalized inputs cannot be JSON-encoded.
 */
protected function calculateHash(int $tenantId, string $type, array $inputs): string
{
    $encodingFlags = JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE;
    $canonicalJson = json_encode($this->normalizeInputs($inputs), $encodingFlags);

    $identity = implode('|', [$tenantId, $type, $canonicalJson]);

    return hash('sha256', $identity);
}
|
|
|
|
/**
 * Bring inputs into a canonical order so that equal inputs always hash the same.
 *
 * - Lists: each element is normalized, then the elements are ordered by their JSON text.
 * - Other arrays: entries are ordered by key and nested arrays are normalized.
 */
protected function normalizeInputs(array $value): array
{
    if (! $this->isListArray($value)) {
        ksort($value);

        return array_map(
            fn ($entry) => is_array($entry) ? $this->normalizeInputs($entry) : $entry,
            $value,
        );
    }

    $normalized = [];

    foreach ($value as $entry) {
        $normalized[] = is_array($entry) ? $this->normalizeInputs($entry) : $entry;
    }

    $flags = JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE;

    // usort() reindexes from 0, so the result is already a list.
    usort($normalized, static fn ($left, $right): int => strcmp(
        (string) json_encode($left, $flags),
        (string) json_encode($right, $flags),
    ));

    return $normalized;
}
|
|
|
|
/**
 * Tell whether the array is a list, i.e. its keys are exactly 0..n-1 in order.
 *
 * An empty array counts as a list.
 */
protected function isListArray(array $array): bool
{
    // Built-in since PHP 8.1; avoids building the temporary key and range arrays.
    return array_is_list($array);
}
|
|
|
|
/**
 * Convert raw failure entries into a reindexed list of code / reason_code / message entries.
 *
 * Missing code becomes 'unknown', missing reason_code falls back to the raw
 * code, missing message becomes ''. Each value is cast to string and then
 * passed through its sanitizer.
 *
 * @param array<int, array{code?: mixed, reason_code?: mixed, message?: mixed}> $failures
 * @return array<int, array{code: string, reason_code: string, message: string}>
 */
protected function sanitizeFailures(array $failures): array
{
    $sanitizeEntry = function ($entry): array {
        $rawCode = (string) ($entry['code'] ?? 'unknown');
        $rawReason = (string) ($entry['reason_code'] ?? $rawCode);
        $rawMessage = (string) ($entry['message'] ?? '');

        return [
            'code' => $this->sanitizeFailureCode($rawCode),
            'reason_code' => RunFailureSanitizer::normalizeReasonCode($rawReason),
            'message' => $this->sanitizeMessage($rawMessage),
        ];
    };

    // array_map keeps the input keys; array_values restores 0..n-1 indexing.
    return array_values(array_map($sanitizeEntry, $failures));
}
|
|
|
|
/**
 * Sanitize a failure code by delegating to RunFailureSanitizer::sanitizeCode().
 */
protected function sanitizeFailureCode(string $code): string
{
    $cleanCode = RunFailureSanitizer::sanitizeCode($code);

    return $cleanCode;
}
|
|
|
|
/**
 * Sanitize a failure message by delegating to RunFailureSanitizer::sanitizeMessage().
 */
protected function sanitizeMessage(string $message): string
{
    $cleanMessage = RunFailureSanitizer::sanitizeMessage($message);

    return $cleanMessage;
}
|
|
|
|
/**
 * Normalize summary counts by delegating to SummaryCountsNormalizer::normalize().
 *
 * @param array<string, mixed> $summaryCounts
 * @return array<string, int>
 */
protected function sanitizeSummaryCounts(array $summaryCounts): array
{
    $normalizedCounts = SummaryCountsNormalizer::normalize($summaryCounts);

    return $normalizedCounts;
}
|
|
}
|