TenantAtlas/app/Services/Inventory/InventorySyncService.php
ahmido 9c56a2349a feat/047-inventory-foundations-nodes (#51)
Adds Inventory Sync toggle include_foundations (default true) + persistence tests
Adds Coverage “Dependencies” column (/—) derived deterministically from graph_contracts (no Graph calls)
Spec/tasks/checklists updated + tasks ticked off

Co-authored-by: Ahmed Darrazi <ahmeddarrazi@adsmac.local>
Reviewed-on: #51
2026-01-10 20:47:29 +00:00

658 lines
24 KiB
PHP

<?php
namespace App\Services\Inventory;
use App\Models\InventoryItem;
use App\Models\InventorySyncRun;
use App\Models\Tenant;
use App\Models\User;
use App\Services\BackupScheduling\PolicyTypeResolver;
use App\Services\Graph\GraphClientInterface;
use App\Services\Graph\GraphResponse;
use Carbon\CarbonImmutable;
use Illuminate\Contracts\Cache\Lock;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
use Throwable;
class InventorySyncService
{
public function __construct(
private readonly GraphClientInterface $graphClient,
private readonly PolicyTypeResolver $policyTypeResolver,
private readonly InventorySelectionHasher $selectionHasher,
private readonly InventoryMetaSanitizer $metaSanitizer,
private readonly InventoryConcurrencyLimiter $concurrencyLimiter,
) {}
/**
* Runs an inventory sync inline (no queue), enforcing locks/concurrency and creating an observable run record.
*
* @param array<string, mixed> $selectionPayload
*/
public function syncNow(Tenant $tenant, array $selectionPayload): InventorySyncRun
{
$computed = $this->normalizeAndHashSelection($selectionPayload);
$normalizedSelection = $computed['selection'];
$selectionHash = $computed['selection_hash'];
$now = CarbonImmutable::now('UTC');
$globalSlot = $this->concurrencyLimiter->acquireGlobalSlot();
if (! $globalSlot instanceof Lock) {
return $this->createSkippedRun(
tenant: $tenant,
selectionHash: $selectionHash,
selectionPayload: $normalizedSelection,
now: $now,
errorCode: 'concurrency_limit_global',
);
}
$tenantSlot = $this->concurrencyLimiter->acquireTenantSlot((int) $tenant->id);
if (! $tenantSlot instanceof Lock) {
$globalSlot->release();
return $this->createSkippedRun(
tenant: $tenant,
selectionHash: $selectionHash,
selectionPayload: $normalizedSelection,
now: $now,
errorCode: 'concurrency_limit_tenant',
);
}
$selectionLock = Cache::lock($this->selectionLockKey($tenant, $selectionHash), 900);
if (! $selectionLock->get()) {
$tenantSlot->release();
$globalSlot->release();
return $this->createSkippedRun(
tenant: $tenant,
selectionHash: $selectionHash,
selectionPayload: $normalizedSelection,
now: $now,
errorCode: 'lock_contended',
);
}
$run = InventorySyncRun::query()->create([
'tenant_id' => $tenant->getKey(),
'user_id' => null,
'selection_hash' => $selectionHash,
'selection_payload' => $normalizedSelection,
'status' => InventorySyncRun::STATUS_RUNNING,
'had_errors' => false,
'error_codes' => [],
'error_context' => null,
'started_at' => $now,
'finished_at' => null,
'items_observed_count' => 0,
'items_upserted_count' => 0,
'errors_count' => 0,
]);
try {
return $this->executeRun($run, $tenant, $normalizedSelection);
} finally {
$selectionLock->release();
$tenantSlot->release();
$globalSlot->release();
}
}
/**
* @return array{policy_types: list<string>, categories: list<string>, include_foundations: bool, include_dependencies: bool}
*/
public function defaultSelectionPayload(): array
{
return [
'policy_types' => $this->policyTypeResolver->supportedPolicyTypes(),
'categories' => [],
'include_foundations' => true,
'include_dependencies' => true,
];
}
/**
* @param array<string, mixed> $selectionPayload
* @return array{selection: array{policy_types: list<string>, categories: list<string>, include_foundations: bool, include_dependencies: bool}, selection_hash: string}
*/
public function normalizeAndHashSelection(array $selectionPayload): array
{
$normalizedSelection = $this->selectionHasher->normalize($selectionPayload);
$normalizedSelection['policy_types'] = $this->policyTypeResolver->filterRuntime($normalizedSelection['policy_types']);
$selectionHash = $this->selectionHasher->hash($normalizedSelection);
return [
'selection' => $normalizedSelection,
'selection_hash' => $selectionHash,
];
}
/**
* Creates a pending run record attributed to the initiating user so the run remains observable even if queue workers are down.
*
* @param array<string, mixed> $selectionPayload
*/
public function createPendingRunForUser(Tenant $tenant, User $user, array $selectionPayload): InventorySyncRun
{
$computed = $this->normalizeAndHashSelection($selectionPayload);
return InventorySyncRun::query()->create([
'tenant_id' => $tenant->getKey(),
'user_id' => $user->getKey(),
'selection_hash' => $computed['selection_hash'],
'selection_payload' => $computed['selection'],
'status' => InventorySyncRun::STATUS_PENDING,
'had_errors' => false,
'error_codes' => [],
'error_context' => null,
'started_at' => null,
'finished_at' => null,
'items_observed_count' => 0,
'items_upserted_count' => 0,
'errors_count' => 0,
]);
}
/**
* Executes an existing pending run under locks/concurrency, updating that run to running/skipped/terminal.
*/
/**
* @param null|callable(string $policyType, bool $success, ?string $errorCode): void $onPolicyTypeProcessed
*/
public function executePendingRun(InventorySyncRun $run, Tenant $tenant, ?callable $onPolicyTypeProcessed = null): InventorySyncRun
{
$computed = $this->normalizeAndHashSelection($run->selection_payload ?? []);
$normalizedSelection = $computed['selection'];
$selectionHash = $computed['selection_hash'];
$now = CarbonImmutable::now('UTC');
$run->update([
'tenant_id' => $tenant->getKey(),
'selection_hash' => $selectionHash,
'selection_payload' => $normalizedSelection,
'status' => InventorySyncRun::STATUS_RUNNING,
'had_errors' => false,
'error_codes' => [],
'error_context' => null,
'started_at' => $now,
'finished_at' => null,
'items_observed_count' => 0,
'items_upserted_count' => 0,
'errors_count' => 0,
]);
$globalSlot = $this->concurrencyLimiter->acquireGlobalSlot();
if (! $globalSlot instanceof Lock) {
return $this->markExistingRunSkipped(
run: $run,
now: $now,
errorCode: 'concurrency_limit_global',
);
}
$tenantSlot = $this->concurrencyLimiter->acquireTenantSlot((int) $tenant->id);
if (! $tenantSlot instanceof Lock) {
$globalSlot->release();
return $this->markExistingRunSkipped(
run: $run,
now: $now,
errorCode: 'concurrency_limit_tenant',
);
}
$selectionLock = Cache::lock($this->selectionLockKey($tenant, $selectionHash), 900);
if (! $selectionLock->get()) {
$tenantSlot->release();
$globalSlot->release();
return $this->markExistingRunSkipped(
run: $run,
now: $now,
errorCode: 'lock_contended',
);
}
try {
return $this->executeRun($run, $tenant, $normalizedSelection, $onPolicyTypeProcessed);
} finally {
$selectionLock->release();
$tenantSlot->release();
$globalSlot->release();
}
}
/**
* @param array{policy_types: list<string>, categories: list<string>, include_foundations: bool, include_dependencies: bool} $normalizedSelection
*/
/**
* @param array{policy_types: list<string>, categories: list<string>, include_foundations: bool, include_dependencies: bool} $normalizedSelection
* @param null|callable(string $policyType, bool $success, ?string $errorCode): void $onPolicyTypeProcessed
*/
private function executeRun(InventorySyncRun $run, Tenant $tenant, array $normalizedSelection, ?callable $onPolicyTypeProcessed = null): InventorySyncRun
{
$observed = 0;
$upserted = 0;
$errors = 0;
$errorCodes = [];
$hadErrors = false;
$warnings = [];
try {
$typesConfig = $this->supportedTypeConfigByType();
$policyTypes = $normalizedSelection['policy_types'] ?? [];
$foundationTypes = $this->foundationTypes();
if ((bool) ($normalizedSelection['include_foundations'] ?? false)) {
$policyTypes = array_values(array_unique(array_merge($policyTypes, $foundationTypes)));
} else {
$policyTypes = array_values(array_diff($policyTypes, $foundationTypes));
}
foreach ($policyTypes as $policyType) {
$typeConfig = $typesConfig[$policyType] ?? null;
if (! is_array($typeConfig)) {
$hadErrors = true;
$errors++;
$errorCodes[] = 'unsupported_type';
$onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, false, 'unsupported_type');
continue;
}
$response = $this->listPoliciesWithRetry($policyType, [
'tenant' => $tenant->tenant_id ?? $tenant->external_id,
'client_id' => $tenant->app_client_id,
'client_secret' => $tenant->app_client_secret,
'platform' => $typeConfig['platform'] ?? null,
'filter' => $typeConfig['filter'] ?? null,
]);
if ($response->failed()) {
$hadErrors = true;
$errors++;
$errorCode = $this->mapGraphFailureToErrorCode($response);
$errorCodes[] = $errorCode;
$onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, false, $errorCode);
continue;
}
foreach ($response->data as $policyData) {
if (! is_array($policyData)) {
continue;
}
if ($this->shouldSkipPolicyForSelectedType($policyType, $policyData)) {
continue;
}
$externalId = $policyData['id'] ?? $policyData['external_id'] ?? null;
if (! is_string($externalId) || $externalId === '') {
continue;
}
$observed++;
$includeDeps = (bool) ($normalizedSelection['include_dependencies'] ?? true);
if ($includeDeps && $this->shouldHydrateAssignments($policyType)) {
$existingAssignments = $policyData['assignments'] ?? null;
if (! is_array($existingAssignments) || count($existingAssignments) === 0) {
$hydratedAssignments = $this->fetchAssignmentsForPolicyType($policyType, $tenant, $externalId, $warnings);
if (is_array($hydratedAssignments)) {
$policyData['assignments'] = $hydratedAssignments;
}
}
}
$displayName = $policyData['displayName'] ?? $policyData['name'] ?? null;
$displayName = is_string($displayName) ? $displayName : null;
$scopeTagIds = $policyData['roleScopeTagIds'] ?? null;
$assignmentTargetCount = null;
$assignments = $policyData['assignments'] ?? null;
if (is_array($assignments)) {
$assignmentTargetCount = count($assignments);
}
$meta = $this->metaSanitizer->sanitize([
'odata_type' => $policyData['@odata.type'] ?? $policyData['@OData.Type'] ?? null,
'etag' => $policyData['@odata.etag'] ?? null,
'scope_tag_ids' => is_array($scopeTagIds) ? $scopeTagIds : null,
'assignment_target_count' => $assignmentTargetCount,
'warnings' => [],
]);
$item = InventoryItem::query()->updateOrCreate(
[
'tenant_id' => $tenant->getKey(),
'policy_type' => $policyType,
'external_id' => $externalId,
],
[
'display_name' => $displayName,
'category' => $typeConfig['category'] ?? null,
'platform' => $typeConfig['platform'] ?? null,
'meta_jsonb' => $meta,
'last_seen_at' => now(),
'last_seen_run_id' => $run->getKey(),
]
);
$upserted++;
// Extract dependencies if requested in selection
if ($includeDeps) {
$warnings = array_merge(
$warnings,
app(\App\Services\Inventory\DependencyExtractionService::class)
->extractForPolicyData($item, $policyData)
);
}
}
$onPolicyTypeProcessed && $onPolicyTypeProcessed($policyType, true, null);
}
$status = $hadErrors ? InventorySyncRun::STATUS_PARTIAL : InventorySyncRun::STATUS_SUCCESS;
$run->update([
'status' => $status,
'had_errors' => $hadErrors,
'error_codes' => array_values(array_unique($errorCodes)),
'error_context' => [
'warnings' => array_values($warnings),
],
'items_observed_count' => $observed,
'items_upserted_count' => $upserted,
'errors_count' => $errors,
'finished_at' => CarbonImmutable::now('UTC'),
]);
return $run->refresh();
} catch (Throwable $throwable) {
$errorContext = $this->safeErrorContext($throwable);
$errorContext['warnings'] = array_values($warnings);
$run->update([
'status' => InventorySyncRun::STATUS_FAILED,
'had_errors' => true,
'error_codes' => ['unexpected_exception'],
'error_context' => $errorContext,
'items_observed_count' => $observed,
'items_upserted_count' => $upserted,
'errors_count' => $errors + 1,
'finished_at' => CarbonImmutable::now('UTC'),
]);
return $run->refresh();
}
}
private function shouldSkipPolicyForSelectedType(string $selectedPolicyType, array $policyData): bool
{
$configurationPolicyTypes = ['settingsCatalogPolicy', 'endpointSecurityPolicy', 'securityBaselinePolicy'];
if (! in_array($selectedPolicyType, $configurationPolicyTypes, true)) {
return false;
}
return $this->resolveConfigurationPolicyType($policyData) !== $selectedPolicyType;
}
private function shouldHydrateAssignments(string $policyType): bool
{
return in_array($policyType, ['settingsCatalogPolicy', 'endpointSecurityPolicy', 'securityBaselinePolicy'], true);
}
/**
* @param array<int, array<string, mixed>> $warnings
* @return null|array<int, mixed>
*/
private function fetchAssignmentsForPolicyType(string $policyType, Tenant $tenant, string $externalId, array &$warnings): ?array
{
$pathTemplate = config("graph_contracts.types.{$policyType}.assignments_list_path");
if (! is_string($pathTemplate) || $pathTemplate === '') {
return null;
}
$path = str_replace('{id}', $externalId, $pathTemplate);
$options = [
'tenant' => $tenant->tenant_id ?? $tenant->external_id,
'client_id' => $tenant->app_client_id,
'client_secret' => $tenant->app_client_secret,
];
$maxAttempts = 3;
for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
$response = $this->graphClient->request('GET', $path, $options);
if (! $response->failed()) {
$data = $response->data;
if (is_array($data) && array_key_exists('value', $data) && is_array($data['value'])) {
return $data['value'];
}
if (is_array($data)) {
return $data;
}
return null;
}
$status = (int) ($response->status ?? 0);
if (! in_array($status, [429, 503], true)) {
break;
}
}
$warning = [
'type' => 'assignments_fetch_failed',
'policy_id' => $externalId,
'policy_type' => $policyType,
'reason' => 'graph_assignments_list_failed',
];
$warnings[] = $warning;
Log::info('Failed to fetch policy assignments', $warning);
return null;
}
private function resolveConfigurationPolicyType(array $policyData): string
{
$templateReference = $policyData['templateReference'] ?? null;
$templateFamily = null;
if (is_array($templateReference)) {
$templateFamily = $templateReference['templateFamily'] ?? null;
}
if (is_string($templateFamily) && strcasecmp(trim($templateFamily), 'securityBaseline') === 0) {
return 'securityBaselinePolicy';
}
if ($this->isEndpointSecurityConfigurationPolicy($policyData, $templateFamily)) {
return 'endpointSecurityPolicy';
}
return 'settingsCatalogPolicy';
}
private function isEndpointSecurityConfigurationPolicy(array $policyData, ?string $templateFamily): bool
{
$technologies = $policyData['technologies'] ?? null;
if (is_string($technologies) && strcasecmp(trim($technologies), 'endpointSecurity') === 0) {
return true;
}
if (is_array($technologies)) {
foreach ($technologies as $technology) {
if (is_string($technology) && strcasecmp(trim($technology), 'endpointSecurity') === 0) {
return true;
}
}
}
return is_string($templateFamily) && str_starts_with(strtolower(trim($templateFamily)), 'endpointsecurity');
}
/**
* @return array<string, array<string, mixed>>
*/
private function supportedTypeConfigByType(): array
{
/** @var array<int, array<string, mixed>> $supported */
$supported = config('tenantpilot.supported_policy_types', []);
/** @var array<int, array<string, mixed>> $foundations */
$foundations = config('tenantpilot.foundation_types', []);
$all = array_merge(
is_array($supported) ? $supported : [],
is_array($foundations) ? $foundations : [],
);
$byType = [];
foreach ($all as $config) {
$type = $config['type'] ?? null;
if (is_string($type) && $type !== '') {
$byType[$type] = $config;
}
}
return $byType;
}
/**
* @return list<string>
*/
private function foundationTypes(): array
{
$types = config('tenantpilot.foundation_types', []);
if (! is_array($types)) {
return [];
}
return collect($types)
->map(fn (array $row) => $row['type'] ?? null)
->filter(fn ($type) => is_string($type) && $type !== '')
->values()
->all();
}
private function selectionLockKey(Tenant $tenant, string $selectionHash): string
{
return sprintf('inventory_sync:tenant:%s:selection:%s', (string) $tenant->getKey(), $selectionHash);
}
/**
* @param array<string, mixed> $selectionPayload
*/
private function createSkippedRun(
Tenant $tenant,
string $selectionHash,
array $selectionPayload,
CarbonImmutable $now,
string $errorCode,
): InventorySyncRun {
return InventorySyncRun::query()->create([
'tenant_id' => $tenant->getKey(),
'user_id' => null,
'selection_hash' => $selectionHash,
'selection_payload' => $selectionPayload,
'status' => InventorySyncRun::STATUS_SKIPPED,
'had_errors' => true,
'error_codes' => [$errorCode],
'error_context' => null,
'started_at' => $now,
'finished_at' => $now,
'items_observed_count' => 0,
'items_upserted_count' => 0,
'errors_count' => 0,
]);
}
private function markExistingRunSkipped(InventorySyncRun $run, CarbonImmutable $now, string $errorCode): InventorySyncRun
{
$run->update([
'status' => InventorySyncRun::STATUS_SKIPPED,
'had_errors' => true,
'error_codes' => [$errorCode],
'error_context' => null,
'started_at' => $run->started_at ?? $now,
'finished_at' => $now,
'items_observed_count' => 0,
'items_upserted_count' => 0,
'errors_count' => 0,
]);
return $run->refresh();
}
private function mapGraphFailureToErrorCode(GraphResponse $response): string
{
$status = (int) ($response->status ?? 0);
return match ($status) {
403 => 'graph_forbidden',
429 => 'graph_throttled',
503 => 'graph_transient',
default => 'graph_transient',
};
}
private function listPoliciesWithRetry(string $policyType, array $options): GraphResponse
{
$maxAttempts = 3;
for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
$response = $this->graphClient->listPolicies($policyType, $options);
if (! $response->failed()) {
return $response;
}
$status = (int) ($response->status ?? 0);
if (! in_array($status, [429, 503], true)) {
return $response;
}
if ($attempt >= $maxAttempts) {
return $response;
}
$baseMs = 250 * (2 ** ($attempt - 1));
$jitterMs = random_int(0, 250);
usleep(($baseMs + $jitterMs) * 1000);
}
return new GraphResponse(false, [], null, ['error' => ['code' => 'unexpected_exception', 'message' => 'retry loop failed']]);
}
/**
* @return array<string, mixed>
*/
private function safeErrorContext(Throwable $throwable): array
{
$message = $throwable->getMessage();
$message = preg_replace('/Bearer\s+[A-Za-z0-9\-\._~\+\/]+=*/', 'Bearer [REDACTED]', (string) $message);
$message = mb_substr((string) $message, 0, 500);
return [
'exception_class' => get_class($throwable),
'message' => $message,
];
}
}