$selectionPayload */ public function syncNow(Tenant $tenant, array $selectionPayload): InventorySyncRun { $computed = $this->normalizeAndHashSelection($selectionPayload); $normalizedSelection = $computed['selection']; $selectionHash = $computed['selection_hash']; $now = CarbonImmutable::now('UTC'); $globalSlot = $this->concurrencyLimiter->acquireGlobalSlot(); if (! $globalSlot instanceof Lock) { return $this->createSkippedRun( tenant: $tenant, selectionHash: $selectionHash, selectionPayload: $normalizedSelection, now: $now, errorCode: 'concurrency_limit_global', ); } $tenantSlot = $this->concurrencyLimiter->acquireTenantSlot((int) $tenant->id); if (! $tenantSlot instanceof Lock) { $globalSlot->release(); return $this->createSkippedRun( tenant: $tenant, selectionHash: $selectionHash, selectionPayload: $normalizedSelection, now: $now, errorCode: 'concurrency_limit_tenant', ); } $selectionLock = Cache::lock($this->selectionLockKey($tenant, $selectionHash), 900); if (! $selectionLock->get()) { $tenantSlot->release(); $globalSlot->release(); return $this->createSkippedRun( tenant: $tenant, selectionHash: $selectionHash, selectionPayload: $normalizedSelection, now: $now, errorCode: 'lock_contended', ); } $run = InventorySyncRun::query()->create([ 'tenant_id' => $tenant->getKey(), 'user_id' => null, 'selection_hash' => $selectionHash, 'selection_payload' => $normalizedSelection, 'status' => InventorySyncRun::STATUS_RUNNING, 'had_errors' => false, 'error_codes' => [], 'error_context' => null, 'started_at' => $now, 'finished_at' => null, 'items_observed_count' => 0, 'items_upserted_count' => 0, 'errors_count' => 0, ]); try { return $this->executeRun($run, $tenant, $normalizedSelection); } finally { $selectionLock->release(); $tenantSlot->release(); $globalSlot->release(); } } /** * @return array{policy_types: list, categories: list, include_foundations: bool, include_dependencies: bool} */ public function defaultSelectionPayload(): array { return [ 'policy_types' => $this->policyTypeResolver->supportedPolicyTypes(), 'categories' => [], 'include_foundations' => true, 'include_dependencies' => true, ]; } /** * @param array $selectionPayload * @return array{selection: array{policy_types: list, categories: list, include_foundations: bool, include_dependencies: bool}, selection_hash: string} */ public function normalizeAndHashSelection(array $selectionPayload): array { $normalizedSelection = $this->selectionHasher->normalize($selectionPayload); $normalizedSelection['policy_types'] = $this->policyTypeResolver->filterRuntime($normalizedSelection['policy_types']); $selectionHash = $this->selectionHasher->hash($normalizedSelection); return [ 'selection' => $normalizedSelection, 'selection_hash' => $selectionHash, ]; } /** * Creates a pending run record attributed to the initiating user so the run remains observable even if queue workers are down. * * @param array $selectionPayload */ public function createPendingRunForUser(Tenant $tenant, User $user, array $selectionPayload): InventorySyncRun { $computed = $this->normalizeAndHashSelection($selectionPayload); return InventorySyncRun::query()->create([ 'tenant_id' => $tenant->getKey(), 'user_id' => $user->getKey(), 'selection_hash' => $computed['selection_hash'], 'selection_payload' => $computed['selection'], 'status' => InventorySyncRun::STATUS_PENDING, 'had_errors' => false, 'error_codes' => [], 'error_context' => null, 'started_at' => null, 'finished_at' => null, 'items_observed_count' => 0, 'items_upserted_count' => 0, 'errors_count' => 0, ]); } /** * Executes an existing pending run under locks/concurrency, updating that run to running/skipped/terminal. */ /** * @param null|callable(string $policyType, bool $success, ?string $errorCode): void $onPolicyTypeProcessed */ public function executePendingRun(InventorySyncRun $run, Tenant $tenant, ?callable $onPolicyTypeProcessed = null): InventorySyncRun { $computed = $this->normalizeAndHashSelection($run->selection_payload ?? []); $normalizedSelection = $computed['selection']; $selectionHash = $computed['selection_hash']; $now = CarbonImmutable::now('UTC'); $run->update([ 'tenant_id' => $tenant->getKey(), 'selection_hash' => $selectionHash, 'selection_payload' => $normalizedSelection, 'status' => InventorySyncRun::STATUS_RUNNING, 'had_errors' => false, 'error_codes' => [], 'error_context' => null, 'started_at' => $now, 'finished_at' => null, 'items_observed_count' => 0, 'items_upserted_count' => 0, 'errors_count' => 0, ]); $globalSlot = $this->concurrencyLimiter->acquireGlobalSlot(); if (! $globalSlot instanceof Lock) { return $this->markExistingRunSkipped( run: $run, now: $now, errorCode: 'concurrency_limit_global', ); } $tenantSlot = $this->concurrencyLimiter->acquireTenantSlot((int) $tenant->id); if (! $tenantSlot instanceof Lock) { $globalSlot->release(); return $this->markExistingRunSkipped( run: $run, now: $now, errorCode: 'concurrency_limit_tenant', ); } $selectionLock = Cache::lock($this->selectionLockKey($tenant, $selectionHash), 900); if (! $selectionLock->get()) { $tenantSlot->release(); $globalSlot->release(); return $this->markExistingRunSkipped( run: $run, now: $now, errorCode: 'lock_contended', ); } try { return $this->executeRun($run, $tenant, $normalizedSelection, $onPolicyTypeProcessed); } finally { $selectionLock->release(); $tenantSlot->release(); $globalSlot->release(); } } /** * @param array{policy_types: list, categories: list, include_foundations: bool, include_dependencies: bool} $normalizedSelection */ /** * @param array{policy_types: list, categories: list, include_foundations: bool, include_dependencies: bool} $normalizedSelection * @param null|callable(string $policyType, bool $success, ?string $errorCode): void $onPolicyTypeProcessed */ private function executeRun(InventorySyncRun $run, Tenant $tenant, array $normalizedSelection, ?callable $onPolicyTypeProcessed = null): InventorySyncRun { $observed = 0; $upserted = 0; $errors = 0; $errorCodes = []; $hadErrors = false; $warnings = []; try { $typesConfig = $this->supportedTypeConfigByType(); foreach ($normalizedSelection['policy_types'] as $policyType) { $typeConfig = $typesConfig[$policyType] ?? null; if (! is_array($typeConfig)) { $hadErrors = true; $errors++; $errorCodes[] = 'unsupported_type'; $onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, false, 'unsupported_type'); continue; } $response = $this->listPoliciesWithRetry($policyType, [ 'tenant' => $tenant->tenant_id ?? $tenant->external_id, 'client_id' => $tenant->app_client_id, 'client_secret' => $tenant->app_client_secret, 'platform' => $typeConfig['platform'] ?? null, 'filter' => $typeConfig['filter'] ?? null, ]); if ($response->failed()) { $hadErrors = true; $errors++; $errorCode = $this->mapGraphFailureToErrorCode($response); $errorCodes[] = $errorCode; $onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, false, $errorCode); continue; } foreach ($response->data as $policyData) { if (! is_array($policyData)) { continue; } if ($this->shouldSkipPolicyForSelectedType($policyType, $policyData)) { continue; } $externalId = $policyData['id'] ?? $policyData['external_id'] ?? null; if (! is_string($externalId) || $externalId === '') { continue; } $observed++; $includeDeps = (bool) ($normalizedSelection['include_dependencies'] ?? true); if ($includeDeps && $this->shouldHydrateAssignments($policyType)) { $existingAssignments = $policyData['assignments'] ?? null; if (! is_array($existingAssignments)) { $hydratedAssignments = $this->fetchAssignmentsForPolicyType($policyType, $tenant, $externalId, $warnings); if (is_array($hydratedAssignments)) { $policyData['assignments'] = $hydratedAssignments; } } } $displayName = $policyData['displayName'] ?? $policyData['name'] ?? null; $displayName = is_string($displayName) ? $displayName : null; $scopeTagIds = $policyData['roleScopeTagIds'] ?? null; $assignmentTargetCount = null; $assignments = $policyData['assignments'] ?? null; if (is_array($assignments)) { $assignmentTargetCount = count($assignments); } $meta = $this->metaSanitizer->sanitize([ 'odata_type' => $policyData['@odata.type'] ?? $policyData['@OData.Type'] ?? null, 'etag' => $policyData['@odata.etag'] ?? null, 'scope_tag_ids' => is_array($scopeTagIds) ? $scopeTagIds : null, 'assignment_target_count' => $assignmentTargetCount, 'warnings' => [], ]); $item = InventoryItem::query()->updateOrCreate( [ 'tenant_id' => $tenant->getKey(), 'policy_type' => $policyType, 'external_id' => $externalId, ], [ 'display_name' => $displayName, 'category' => $typeConfig['category'] ?? null, 'platform' => $typeConfig['platform'] ?? null, 'meta_jsonb' => $meta, 'last_seen_at' => now(), 'last_seen_run_id' => $run->getKey(), ] ); $upserted++; // Extract dependencies if requested in selection if ($includeDeps) { $warnings = array_merge( $warnings, app(\App\Services\Inventory\DependencyExtractionService::class) ->extractForPolicyData($item, $policyData) ); } } $onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, true, null); } $status = $hadErrors ? InventorySyncRun::STATUS_PARTIAL : InventorySyncRun::STATUS_SUCCESS; $run->update([ 'status' => $status, 'had_errors' => $hadErrors, 'error_codes' => array_values(array_unique($errorCodes)), 'error_context' => [ 'warnings' => array_values($warnings), ], 'items_observed_count' => $observed, 'items_upserted_count' => $upserted, 'errors_count' => $errors, 'finished_at' => CarbonImmutable::now('UTC'), ]); return $run->refresh(); } catch (Throwable $throwable) { $errorContext = $this->safeErrorContext($throwable); $errorContext['warnings'] = array_values($warnings); $run->update([ 'status' => InventorySyncRun::STATUS_FAILED, 'had_errors' => true, 'error_codes' => ['unexpected_exception'], 'error_context' => $errorContext, 'items_observed_count' => $observed, 'items_upserted_count' => $upserted, 'errors_count' => $errors + 1, 'finished_at' => CarbonImmutable::now('UTC'), ]); return $run->refresh(); } } private function shouldSkipPolicyForSelectedType(string $selectedPolicyType, array $policyData): bool { $configurationPolicyTypes = ['settingsCatalogPolicy', 'endpointSecurityPolicy', 'securityBaselinePolicy']; if (! in_array($selectedPolicyType, $configurationPolicyTypes, true)) { return false; } return $this->resolveConfigurationPolicyType($policyData) !== $selectedPolicyType; } private function shouldHydrateAssignments(string $policyType): bool { return in_array($policyType, ['settingsCatalogPolicy', 'endpointSecurityPolicy', 'securityBaselinePolicy'], true); } /** * @param array> $warnings * @return null|array */ private function fetchAssignmentsForPolicyType(string $policyType, Tenant $tenant, string $externalId, array &$warnings): ?array { $pathTemplate = config("graph_contracts.types.{$policyType}.assignments_list_path"); if (! is_string($pathTemplate) || $pathTemplate === '') { return null; } $path = str_replace('{id}', $externalId, $pathTemplate); $options = [ 'tenant' => $tenant->tenant_id ?? $tenant->external_id, 'client_id' => $tenant->app_client_id, 'client_secret' => $tenant->app_client_secret, ]; $maxAttempts = 3; for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) { $response = $this->graphClient->request('GET', $path, $options); if (! $response->failed()) { $data = $response->data; if (is_array($data) && array_key_exists('value', $data) && is_array($data['value'])) { return $data['value']; } if (is_array($data)) { return $data; } return null; } $status = (int) ($response->status ?? 0); if (! in_array($status, [429, 503], true)) { break; } } $warning = [ 'type' => 'assignments_fetch_failed', 'policy_id' => $externalId, 'policy_type' => $policyType, 'reason' => 'graph_assignments_list_failed', ]; $warnings[] = $warning; Log::info('Failed to fetch policy assignments', $warning); return null; } private function resolveConfigurationPolicyType(array $policyData): string { $templateReference = $policyData['templateReference'] ?? null; $templateFamily = null; if (is_array($templateReference)) { $templateFamily = $templateReference['templateFamily'] ?? null; } if (is_string($templateFamily) && strcasecmp(trim($templateFamily), 'securityBaseline') === 0) { return 'securityBaselinePolicy'; } if ($this->isEndpointSecurityConfigurationPolicy($policyData, $templateFamily)) { return 'endpointSecurityPolicy'; } return 'settingsCatalogPolicy'; } private function isEndpointSecurityConfigurationPolicy(array $policyData, ?string $templateFamily): bool { $technologies = $policyData['technologies'] ?? null; if (is_string($technologies) && strcasecmp(trim($technologies), 'endpointSecurity') === 0) { return true; } if (is_array($technologies)) { foreach ($technologies as $technology) { if (is_string($technology) && strcasecmp(trim($technology), 'endpointSecurity') === 0) { return true; } } } return is_string($templateFamily) && str_starts_with(strtolower(trim($templateFamily)), 'endpointsecurity'); } /** * @return array> */ private function supportedTypeConfigByType(): array { /** @var array> $supported */ $supported = config('tenantpilot.supported_policy_types', []); $byType = []; foreach ($supported as $config) { $type = $config['type'] ?? null; if (is_string($type) && $type !== '') { $byType[$type] = $config; } } return $byType; } private function selectionLockKey(Tenant $tenant, string $selectionHash): string { return sprintf('inventory_sync:tenant:%s:selection:%s', (string) $tenant->getKey(), $selectionHash); } /** * @param array $selectionPayload */ private function createSkippedRun( Tenant $tenant, string $selectionHash, array $selectionPayload, CarbonImmutable $now, string $errorCode, ): InventorySyncRun { return InventorySyncRun::query()->create([ 'tenant_id' => $tenant->getKey(), 'user_id' => null, 'selection_hash' => $selectionHash, 'selection_payload' => $selectionPayload, 'status' => InventorySyncRun::STATUS_SKIPPED, 'had_errors' => true, 'error_codes' => [$errorCode], 'error_context' => null, 'started_at' => $now, 'finished_at' => $now, 'items_observed_count' => 0, 'items_upserted_count' => 0, 'errors_count' => 0, ]); } private function markExistingRunSkipped(InventorySyncRun $run, CarbonImmutable $now, string $errorCode): InventorySyncRun { $run->update([ 'status' => InventorySyncRun::STATUS_SKIPPED, 'had_errors' => true, 'error_codes' => [$errorCode], 'error_context' => null, 'started_at' => $run->started_at ?? $now, 'finished_at' => $now, 'items_observed_count' => 0, 'items_upserted_count' => 0, 'errors_count' => 0, ]); return $run->refresh(); } private function mapGraphFailureToErrorCode(GraphResponse $response): string { $status = (int) ($response->status ?? 0); return match ($status) { 403 => 'graph_forbidden', 429 => 'graph_throttled', 503 => 'graph_transient', default => 'graph_transient', }; } private function listPoliciesWithRetry(string $policyType, array $options): GraphResponse { $maxAttempts = 3; for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) { $response = $this->graphClient->listPolicies($policyType, $options); if (! $response->failed()) { return $response; } $status = (int) ($response->status ?? 0); if (! in_array($status, [429, 503], true)) { return $response; } if ($attempt >= $maxAttempts) { return $response; } $baseMs = 250 * (2 ** ($attempt - 1)); $jitterMs = random_int(0, 250); usleep(($baseMs + $jitterMs) * 1000); } return new GraphResponse(false, [], null, ['error' => ['code' => 'unexpected_exception', 'message' => 'retry loop failed']]); } /** * @return array */ private function safeErrorContext(Throwable $throwable): array { $message = $throwable->getMessage(); $message = preg_replace('/Bearer\s+[A-Za-z0-9\-\._~\+\/]+=*/', 'Bearer [REDACTED]', (string) $message); $message = mb_substr((string) $message, 0, 500); return [ 'exception_class' => get_class($throwable), 'message' => $message, ]; } }