slotKey = $slotKey ?: now()->startOfMinute()->toDateTimeString(); } /** * @return array */ public function middleware(): array { return [ (new WithoutOverlapping(self::OPERATION_TYPE)) ->expireAfter(self::LOCK_TTL_SECONDS) ->dontRelease(), ]; } /** * Execute the job. */ public function handle( AdapterRunReconciler $reconciler, OperationRunService $operationRunService, ): void { $workspace = $this->resolveWorkspace(); if (! $workspace instanceof Workspace) { Log::warning('ReconcileAdapterRunsJob skipped because no workspace was found.'); return; } $operationRun = $operationRunService->ensureWorkspaceRunWithIdentity( workspace: $workspace, type: self::OPERATION_TYPE, identityInputs: [ 'job' => self::OPERATION_TYPE, 'slot' => (string) $this->slotKey, ], context: [ 'job' => self::OPERATION_TYPE, 'slot' => (string) $this->slotKey, 'trigger' => 'schedule', ], ); if ($operationRun->status !== OperationRunStatus::Completed->value) { $operationRunService->updateRun($operationRun, OperationRunStatus::Running->value); } try { $result = $this->reconcile($reconciler); $candidates = (int) ($result['candidates'] ?? 0); $reconciled = (int) ($result['reconciled'] ?? 0); $skipped = (int) ($result['skipped'] ?? 0); $operationRunService->updateRun( $operationRun, status: OperationRunStatus::Completed->value, outcome: OperationRunOutcome::Succeeded->value, summaryCounts: [ 'total' => $candidates, 'processed' => $reconciled + $skipped, 'failed' => 0, ], ); Log::info('ReconcileAdapterRunsJob completed', [ 'operation_run_id' => (int) $operationRun->getKey(), 'candidates' => $candidates, 'reconciled' => $reconciled, 'skipped' => $skipped, ]); } catch (Throwable $e) { $safeMessage = RunFailureSanitizer::sanitizeMessage($e->getMessage()); $operationRunService->updateRun( $operationRun, status: OperationRunStatus::Completed->value, outcome: OperationRunOutcome::Failed->value, summaryCounts: [ 'total' => 0, 'processed' => 0, 'failed' => 1, ], failures: [[ 'code' => 'ops.reconcile_adapter_runs.failed', 'reason_code' => RunFailureSanitizer::normalizeReasonCode($e->getMessage()), 'message' => $safeMessage !== '' ? $safeMessage : 'Adapter run reconciliation failed.', ]], ); Log::warning('ReconcileAdapterRunsJob failed', [ 'operation_run_id' => (int) $operationRun->getKey(), 'error' => $safeMessage !== '' ? $safeMessage : 'Adapter run reconciliation failed.', ]); throw $e; } } /** * @return array{candidates:int, reconciled:int, skipped:int} */ protected function reconcile(AdapterRunReconciler $reconciler): array { return $reconciler->reconcile([ 'older_than_minutes' => 60, 'limit' => 50, 'dry_run' => false, ]); } private function resolveWorkspace(): ?Workspace { if (is_int($this->workspaceId) && $this->workspaceId > 0) { return Workspace::query()->find($this->workspaceId); } return Workspace::query() ->orderBy('id') ->first(); } }