calculateHash($tenant->id, $type, $inputs); // Idempotency Check (Fast Path) // We check specific status to match the partial unique index $existing = OperationRun::query() ->where('tenant_id', $tenant->id) ->where('run_identity_hash', $hash) ->whereIn('status', OperationRunStatus::values()) ->where('status', '!=', OperationRunStatus::Completed->value) ->first(); if ($existing) { return $existing; } // Create new run (race-safe via partial unique index) try { return OperationRun::create([ 'tenant_id' => $tenant->id, 'user_id' => $initiator?->id, 'initiator_name' => $initiator?->name ?? 'System', 'type' => $type, 'status' => OperationRunStatus::Queued->value, 'outcome' => OperationRunOutcome::Pending->value, 'run_identity_hash' => $hash, 'context' => $inputs, ]); } catch (QueryException $e) { // Unique violation (active-run dedupe): // - PostgreSQL: 23505 // - SQLite (tests): 23000 (generic integrity violation; message indicates UNIQUE constraint failed) if (! in_array(($e->errorInfo[0] ?? null), ['23505', '23000'], true)) { throw $e; } $existing = OperationRun::query() ->where('tenant_id', $tenant->id) ->where('run_identity_hash', $hash) ->whereIn('status', [OperationRunStatus::Queued->value, OperationRunStatus::Running->value]) ->first(); if ($existing) { return $existing; } throw $e; } } public function ensureRunWithIdentity( Tenant $tenant, string $type, array $identityInputs, array $context, ?User $initiator = null ): OperationRun { $hash = $this->calculateHash($tenant->id, $type, $identityInputs); // Idempotency Check (Fast Path) // We check specific status to match the partial unique index $existing = OperationRun::query() ->where('tenant_id', $tenant->id) ->where('run_identity_hash', $hash) ->whereIn('status', OperationRunStatus::values()) ->where('status', '!=', OperationRunStatus::Completed->value) ->first(); if ($existing) { return $existing; } // Create new run (race-safe via partial unique index) try { return OperationRun::create([ 'tenant_id' => $tenant->id, 'user_id' => $initiator?->id, 'initiator_name' => $initiator?->name ?? 'System', 'type' => $type, 'status' => OperationRunStatus::Queued->value, 'outcome' => OperationRunOutcome::Pending->value, 'run_identity_hash' => $hash, 'context' => $context, ]); } catch (QueryException $e) { // Unique violation (active-run dedupe): // - PostgreSQL: 23505 // - SQLite (tests): 23000 (generic integrity violation; message indicates UNIQUE constraint failed) if (! in_array(($e->errorInfo[0] ?? null), ['23505', '23000'], true)) { throw $e; } $existing = OperationRun::query() ->where('tenant_id', $tenant->id) ->where('run_identity_hash', $hash) ->whereIn('status', [OperationRunStatus::Queued->value, OperationRunStatus::Running->value]) ->first(); if ($existing) { return $existing; } throw $e; } } /** * Standardized enqueue helper for bulk operations. * * Builds the canonical bulk context contract and ensures active-run dedupe * is based on target scope + selection identity. * * @param array{entra_tenant_id?: mixed, directory_context_id?: mixed} $targetScope * @param array{kind: string, ids_hash?: string, query_hash?: string} $selectionIdentity * @param array $extraContext */ public function enqueueBulkOperation( Tenant $tenant, string $type, array $targetScope, array $selectionIdentity, callable $dispatcher, ?User $initiator = null, array $extraContext = [], bool $emitQueuedNotification = true ): OperationRun { $targetScope = BulkRunContext::normalizeTargetScope($targetScope); $entraTenantId = $targetScope['entra_tenant_id'] ?? null; if (is_string($entraTenantId) && $entraTenantId !== '' && $entraTenantId !== (string) $tenant->graphTenantId()) { throw new InvalidArgumentException('Bulk enqueue target_scope entra_tenant_id must match the current tenant.'); } /** @var BulkIdempotencyFingerprint $fingerprints */ $fingerprints = app(BulkIdempotencyFingerprint::class); $fingerprint = $fingerprints->build($type, $targetScope, $selectionIdentity); $context = array_merge($extraContext, [ 'operation' => [ 'type' => $type, ], 'target_scope' => $targetScope, 'selection' => $selectionIdentity, 'idempotency' => [ 'fingerprint' => $fingerprint, ], ]); $run = $this->ensureRunWithIdentity( tenant: $tenant, type: $type, identityInputs: [ 'target_scope' => $targetScope, 'selection' => $selectionIdentity, ], context: $context, initiator: $initiator, ); if ($run->wasRecentlyCreated) { $this->dispatchOrFail($run, $dispatcher, emitQueuedNotification: $emitQueuedNotification); } return $run; } public function updateRun( OperationRun $run, string $status, ?string $outcome = null, array $summaryCounts = [], array $failures = [] ): OperationRun { $previousStatus = (string) $run->status; if (! in_array($status, OperationRunStatus::values(), true)) { throw new InvalidArgumentException('Invalid OperationRun status: '.$status); } if ($outcome !== null) { if (! in_array($outcome, OperationRunOutcome::values(), true)) { throw new InvalidArgumentException('Invalid OperationRun outcome: '.$outcome); } // Reserved/future: MUST NOT be produced by feature 054. if ($outcome === OperationRunOutcome::Cancelled->value) { $outcome = OperationRunOutcome::Failed->value; $failures[] = [ 'code' => 'run.cancelled', 'message' => 'Run cancelled (reserved outcome mapped to failed).', ]; } } $updateData = [ 'status' => $status, ]; if ($outcome) { $updateData['outcome'] = $outcome; } if (! empty($summaryCounts)) { $updateData['summary_counts'] = $this->sanitizeSummaryCounts($summaryCounts); } if (! empty($failures)) { $updateData['failure_summary'] = $this->sanitizeFailures($failures); } if ($status === OperationRunStatus::Running->value && is_null($run->started_at)) { $updateData['started_at'] = now(); } if ($status === OperationRunStatus::Completed->value && is_null($run->completed_at)) { $updateData['completed_at'] = now(); } $run->update($updateData); $run->refresh(); if ($previousStatus !== OperationRunStatus::Completed->value && $run->status === OperationRunStatus::Completed->value && $run->user instanceof User ) { $run->user->notify(new OperationRunCompletedNotification($run)); } return $run; } /** * Increment whitelisted summary_counts keys for a run. * * Uses a transaction + row lock to prevent lost updates when multiple workers * update counts concurrently. * * @param array $delta */ public function incrementSummaryCounts(OperationRun $run, array $delta): OperationRun { $delta = $this->sanitizeSummaryCounts($delta); if ($delta === []) { return $run; } /** @var OperationRun $updated */ $updated = DB::transaction(function () use ($run, $delta): OperationRun { $locked = OperationRun::query() ->whereKey($run->getKey()) ->lockForUpdate() ->first(); if (! $locked instanceof OperationRun) { return $run; } $current = is_array($locked->summary_counts ?? null) ? $locked->summary_counts : []; $current = SummaryCountsNormalizer::normalize($current); foreach ($delta as $key => $value) { $current[$key] = ($current[$key] ?? 0) + $value; } $locked->summary_counts = $current; $locked->save(); return $locked; }); $updated->refresh(); return $updated; } /** * Dispatch a queued operation safely. * * If dispatch fails synchronously (misconfiguration, serialization errors, etc.), * the OperationRun is marked terminal failed so we do not leave a misleading queued run behind. */ public function dispatchOrFail(OperationRun $run, callable $dispatcher, bool $emitQueuedNotification = true): void { try { $this->invokeDispatcher($dispatcher, $run); if ($emitQueuedNotification && $run->wasRecentlyCreated && $run->user instanceof User) { $run->user->notify(new OperationRunQueuedNotification($run)); } } catch (Throwable $e) { $this->updateRun( $run, status: OperationRunStatus::Completed->value, outcome: OperationRunOutcome::Failed->value, failures: [ [ 'code' => 'dispatch.failed', 'message' => $e->getMessage(), ], ], ); throw $e; } } /** * Append failure entries to failure_summary (sanitized + bounded) without overwriting existing. * * @param array $failures */ public function appendFailures(OperationRun $run, array $failures): OperationRun { $failures = $this->sanitizeFailures($failures); if ($failures === []) { return $run; } /** @var OperationRun $updated */ $updated = DB::transaction(function () use ($run, $failures): OperationRun { $locked = OperationRun::query() ->whereKey($run->getKey()) ->lockForUpdate() ->first(); if (! $locked instanceof OperationRun) { return $run; } $current = is_array($locked->failure_summary ?? null) ? $locked->failure_summary : []; $current = $this->sanitizeFailures($current); $merged = array_merge($current, $failures); // Prevent runaway payloads. $merged = array_slice($merged, 0, 50); $locked->failure_summary = $merged; $locked->save(); return $locked; }); $updated->refresh(); return $updated; } /** * Mark a bulk run as completed if summary_counts indicate all work is processed. */ public function maybeCompleteBulkRun(OperationRun $run): OperationRun { $run->refresh(); if ($run->status === OperationRunStatus::Completed->value) { return $run; } $updated = DB::transaction(function () use ($run): OperationRun { $locked = OperationRun::query() ->whereKey($run->getKey()) ->lockForUpdate() ->first(); if (! $locked instanceof OperationRun) { return $run; } if ($locked->status === OperationRunStatus::Completed->value) { return $locked; } $counts = is_array($locked->summary_counts ?? null) ? $locked->summary_counts : []; $counts = SummaryCountsNormalizer::normalize($counts); $total = (int) ($counts['total'] ?? 0); $processed = (int) ($counts['processed'] ?? 0); $failed = (int) ($counts['failed'] ?? 0); if ($total <= 0 || $processed < $total) { return $locked; } $outcome = OperationRunOutcome::Succeeded->value; if ($failed > 0 && $failed < $total) { $outcome = OperationRunOutcome::PartiallySucceeded->value; } if ($failed >= $total) { $outcome = OperationRunOutcome::Failed->value; } return $this->updateRun( $locked, status: OperationRunStatus::Completed->value, outcome: $outcome, ); }); $updated->refresh(); return $updated; } public function failRun(OperationRun $run, Throwable $e): OperationRun { return $this->updateRun( $run, status: OperationRunStatus::Completed->value, outcome: OperationRunOutcome::Failed->value, failures: [ [ 'code' => 'exception.unhandled', 'message' => $e->getMessage(), ], ] ); } private function invokeDispatcher(callable $dispatcher, OperationRun $run): void { $ref = null; if (is_array($dispatcher) && count($dispatcher) === 2) { $ref = new ReflectionMethod($dispatcher[0], (string) $dispatcher[1]); } elseif (is_string($dispatcher) && str_contains($dispatcher, '::')) { [$class, $method] = explode('::', $dispatcher, 2); $ref = new ReflectionMethod($class, $method); } elseif ($dispatcher instanceof \Closure) { $ref = new ReflectionFunction($dispatcher); } elseif (is_object($dispatcher) && method_exists($dispatcher, '__invoke')) { $ref = new ReflectionMethod($dispatcher, '__invoke'); } if ($ref && $ref->getNumberOfParameters() >= 1) { $dispatcher($run); return; } $dispatcher(); } protected function calculateHash(int $tenantId, string $type, array $inputs): string { $normalizedInputs = $this->normalizeInputs($inputs); $json = json_encode($normalizedInputs, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); return hash('sha256', $tenantId.'|'.$type.'|'.$json); } /** * Normalize inputs for stable identity hashing. * * - Associative arrays: sorted by key. * - Lists: elements normalized and then sorted by a stable JSON representation. */ protected function normalizeInputs(array $value): array { if ($this->isListArray($value)) { $items = array_map(function ($item) { return is_array($item) ? $this->normalizeInputs($item) : $item; }, $value); usort($items, function ($a, $b): int { $aJson = json_encode($a, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $bJson = json_encode($b, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); return strcmp((string) $aJson, (string) $bJson); }); return array_values($items); } ksort($value); foreach ($value as $key => $item) { if (is_array($item)) { $value[$key] = $this->normalizeInputs($item); } } return $value; } protected function isListArray(array $array): bool { if ($array === []) { return true; } return array_keys($array) === range(0, count($array) - 1); } /** * @param array $failures * @return array */ protected function sanitizeFailures(array $failures): array { $sanitized = []; foreach ($failures as $failure) { $code = (string) ($failure['code'] ?? 'unknown'); $message = (string) ($failure['message'] ?? ''); $sanitized[] = [ 'code' => $this->sanitizeFailureCode($code), 'message' => $this->sanitizeMessage($message), ]; } return $sanitized; } protected function sanitizeFailureCode(string $code): string { return RunFailureSanitizer::sanitizeCode($code); } protected function sanitizeMessage(string $message): string { return RunFailureSanitizer::sanitizeMessage($message); } /** * @param array $summaryCounts * @return array */ protected function sanitizeSummaryCounts(array $summaryCounts): array { return SummaryCountsNormalizer::normalize($summaryCounts); } }