# Technical Implementation Notes: Backend Architecture Pivot

**Feature**: 005-backend-arch-pivot
**Created**: 2025-12-09
**Purpose**: Detailed implementation guidance for developers (not part of business specification)

---

## BullMQ Setup

### Installation

```bash
npm install bullmq ioredis @azure/identity
```

### Redis Connection

**File**: `lib/queue/redis.ts`

```typescript
import IORedis from 'ioredis';
import { env } from '@/lib/env.mjs';

export const redisConnection = new IORedis(env.REDIS_URL, {
  maxRetriesPerRequest: null, // BullMQ requirement
});
```

### Queue Definition

**File**: `lib/queue/syncQueue.ts`

```typescript
import { Queue } from 'bullmq';
import { redisConnection } from './redis';

export const syncQueue = new Queue('intune-sync-queue', {
  connection: redisConnection,
});
```

---

## Worker Implementation

### Worker Entry Point

**File**: `worker/index.ts`

```typescript
import { Worker } from 'bullmq';
import { redisConnection } from '@/lib/queue/redis';
import { syncPolicies } from './jobs/syncPolicies';

const worker = new Worker(
  'intune-sync-queue',
  async (job) => {
    console.log(`Processing job ${job.id} for tenant ${job.data.tenantId}`);
    await syncPolicies(job.data.tenantId);
  },
  {
    connection: redisConnection,
    concurrency: 1, // Sequential processing
  }
);

worker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

// `job` can be undefined here, hence the optional chaining
worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed:`, err);
});

console.log('Worker started, listening on intune-sync-queue...');
```

### Package.json Script

```json
{
  "scripts": {
    "worker:start": "tsx watch worker/index.ts"
  }
}
```

---

## Sync Logic Architecture

### Main Function

**File**: `worker/jobs/syncPolicies.ts`

```typescript
export async function syncPolicies(tenantId: string) {
  // 1. Get Access Token
  const token = await getGraphAccessToken(tenantId);

  // 2. Fetch all policy types
  const policies = await fetchAllPolicies(token);

  // 3. Parse & Flatten
  const flattenedSettings = policies.flatMap(policy =>
    parsePolicySettings(policy)
  );

  // 4. Upsert to Database
  await upsertPolicySettings(tenantId, flattenedSettings);
}
```

### Authentication (Client Credentials)

```typescript
import { ClientSecretCredential } from '@azure/identity';
import { env } from '@/lib/env.mjs';

// The client credentials flow needs a concrete tenant (GUID or domain);
// 'common' is not accepted for app-only tokens.
// Assumption: `tenantId` is the tenant's Entra ID (Azure AD) tenant ID.
async function getGraphAccessToken(tenantId: string): Promise<string> {
  const credential = new ClientSecretCredential(
    tenantId,
    env.AZURE_AD_CLIENT_ID,
    env.AZURE_AD_CLIENT_SECRET
  );

  const token = await credential.getToken('https://graph.microsoft.com/.default');
  return token.token;
}
```

### Pagination Handling

```typescript
async function fetchWithPagination<T>(url: string, token: string): Promise<T[]> {
  let results: T[] = [];
  let nextLink: string | undefined = url;

  while (nextLink) {
    const response = await fetch(nextLink, {
      headers: { Authorization: `Bearer ${token}` },
    });
    // Fail loudly instead of reading `value` from an error payload
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const data: { value: T[]; '@odata.nextLink'?: string } = await response.json();
    results = results.concat(data.value);
    // Graph omits @odata.nextLink on the last page, which ends the loop
    nextLink = data['@odata.nextLink'];
  }

  return results;
}
```

### Graph API Endpoints

```typescript
const GRAPH_ENDPOINTS = {
  deviceConfigurations: 'https://graph.microsoft.com/v1.0/deviceManagement/deviceConfigurations',
  compliancePolicies: 'https://graph.microsoft.com/v1.0/deviceManagement/deviceCompliancePolicies',
  // Settings Catalog and intents are exposed under /beta, not /v1.0.
  // `settings` is a navigation property of a configuration policy, so it has
  // to be requested explicitly before parseSettingsCatalog can read it.
  configurationPolicies: 'https://graph.microsoft.com/beta/deviceManagement/configurationPolicies',
  intents: 'https://graph.microsoft.com/beta/deviceManagement/intents',
};

async function fetchAllPolicies(token: string) {
  const [configs, compliance, configPolicies, intents] = await Promise.all([
    fetchWithPagination(GRAPH_ENDPOINTS.deviceConfigurations, token),
    fetchWithPagination(GRAPH_ENDPOINTS.compliancePolicies, token),
    fetchWithPagination(GRAPH_ENDPOINTS.configurationPolicies, token),
    fetchWithPagination(GRAPH_ENDPOINTS.intents, token),
  ]);

  return [...configs, ...compliance, ...configPolicies, ...intents];
}
```

---

## Flattening Strategy

### Settings Catalog (Most Complex)

```typescript
// Shape shared by all parsers; field names follow defaultEmptySetting below
interface FlattenedSetting {
  policyId: string;
  policyName: string;
  policyType: string;
  settingName: string;
  settingValue: string;
  settingValueType: string;
  path?: string;
}

function parseSettingsCatalog(policy: any): FlattenedSetting[] {
  if (!policy.settings?.length) return [defaultEmptySetting(policy)];

  return policy.settings.map((setting: any) => {
    const settingId = setting.settingInstance.settingDefinitionId;
    const value = extractValue(setting.settingInstance);

    return {
      // Policy fields are required by the database upsert
      policyId: policy.id,
      // Settings Catalog policies expose `name` rather than `displayName`
      policyName: policy.name ?? policy.displayName,
      policyType: detectPolicyType(policy['@odata.type']),
      settingName: humanizeSettingId(settingId),
      settingValue: JSON.stringify(value),
      settingValueType: typeof value,
    };
  });
}

function extractValue(settingInstance: any): any {
  // Handle different value types
  if (settingInstance.simpleSettingValue) {
    return settingInstance.simpleSettingValue.value;
  }
  if (settingInstance.choiceSettingValue) {
    return settingInstance.choiceSettingValue.value;
  }
  if (settingInstance.groupSettingCollectionValue) {
    // An array of group values, each with its own child setting instances
    return settingInstance.groupSettingCollectionValue.flatMap((group: any) =>
      (group.children ?? []).map((child: any) => extractValue(child))
    );
  }
  return null;
}
```

### OMA-URI

```typescript
function parseOmaUri(policy: any): FlattenedSetting[] {
  if (!policy.omaSettings?.length) return [defaultEmptySetting(policy)];

  return policy.omaSettings.map((oma: any) => ({
    policyId: policy.id,
    policyName: policy.displayName,
    policyType: detectPolicyType(policy['@odata.type']),
    settingName: oma.omaUri,
    settingValue: String(oma.value ?? ''),
    settingValueType: oma.valueType || 'string',
  }));
}
```

### Humanizer

```typescript
function humanizeSettingId(id: string): string {
  return id
    .replace(/^device_vendor_msft_policy_config_/i, '')
    .replace(/_/g, ' ')
    .replace(/\b\w/g, c => c.toUpperCase());
}
```

### Default Empty Setting

```typescript
function defaultEmptySetting(policy: any): FlattenedSetting {
  return {
    policyId: policy.id,
    policyName: policy.displayName ?? policy.name,
    policyType: detectPolicyType(policy['@odata.type']),
    settingName: '(No settings configured)',
    settingValue: '',
    settingValueType: 'empty',
    path: '',
  };
}
```

### Policy Type Detection

```typescript
function detectPolicyType(odataType: string | undefined): string {
  const typeMap: Record<string, string> = {
    '#microsoft.graph.deviceManagementConfigurationPolicy': 'configurationPolicy',
    '#microsoft.graph.windows10CustomConfiguration': 'deviceConfiguration',
    '#microsoft.graph.windows10EndpointProtectionConfiguration': 'endpointSecurity',
    '#microsoft.graph.deviceCompliancePolicy': 'compliancePolicy',
    '#microsoft.graph.windowsUpdateForBusinessConfiguration': 'windowsUpdateForBusiness',
    '#microsoft.graph.iosCustomConfiguration': 'deviceConfiguration',
    '#microsoft.graph.androidManagedAppProtection': 'appConfiguration',
  };

  // Missing or unmapped @odata.type values fall back to 'unknown'
  return typeMap[odataType ?? ''] ?? 'unknown';
}
```

---

## Database Upsert

**File**: `worker/jobs/upsertPolicySettings.ts`

```typescript
import { db } from '@/lib/db';
import { policySettings } from '@/lib/db/schema/policySettings';
import { sql } from 'drizzle-orm';

export async function upsertPolicySettings(
  tenantId: string,
  settings: FlattenedSetting[]
) {
  // Nothing to write; an empty values list is not a valid insert
  if (settings.length === 0) return;

  const records = settings.map(s => ({
    tenantId,
    graphPolicyId: s.policyId,
    policyName: s.policyName,
    policyType: s.policyType,
    settingName: s.settingName,
    settingValue: s.settingValue,
    settingValueType: s.settingValueType,
    lastSyncedAt: new Date(),
  }));

  // Batch insert with conflict resolution
  await db.insert(policySettings)
    .values(records)
    .onConflictDoUpdate({
      target: [
        policySettings.tenantId,
        policySettings.graphPolicyId,
        policySettings.settingName
      ],
      set: {
        policyName: sql`EXCLUDED.policy_name`,
        policyType: sql`EXCLUDED.policy_type`,
        settingValue: sql`EXCLUDED.setting_value`,
        settingValueType: sql`EXCLUDED.setting_value_type`,
        lastSyncedAt: sql`EXCLUDED.last_synced_at`,
      },
    });
}
```

---

## Frontend Integration

### Server Action Update

**File**: `lib/actions/policySettings.ts`

**Before** (n8n Webhook):

```typescript
const response = await fetch(env.N8N_SYNC_WEBHOOK_URL, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ tenantId }),
});
```

**After** (BullMQ Job):

```typescript
import { syncQueue } from '@/lib/queue/syncQueue';

export async function triggerPolicySync(tenantId: string) {
  const job = await syncQueue.add('sync-tenant', {
    tenantId,
    // Job data is stored as JSON, so pass the timestamp as a string
    triggeredAt: new Date().toISOString(),
  });

  return {
    success: true,
    jobId: job.id,
    message: 'Sync job created successfully'
  };
}
```

---

## Environment Variables

### .env Changes

**Add**:

```bash
REDIS_URL=redis://localhost:6379
```

**Remove**:

```bash
# POLICY_API_SECRET=... (DELETE)
# N8N_SYNC_WEBHOOK_URL=... (DELETE)
```

### lib/env.mjs Updates

```typescript
import { createEnv } from "@t3-oss/env-nextjs";
import { z } from "zod";

export const env = createEnv({
  server: {
    DATABASE_URL: z.string().url(),
    NEXTAUTH_SECRET: z.string().min(1),
    NEXTAUTH_URL: z.string().url(),
    AZURE_AD_CLIENT_ID: z.string().min(1),
    AZURE_AD_CLIENT_SECRET: z.string().min(1),
    REDIS_URL: z.string().url(), // ADD THIS
    // REMOVE: POLICY_API_SECRET, N8N_SYNC_WEBHOOK_URL
  },
  client: {},
  runtimeEnv: {
    DATABASE_URL: process.env.DATABASE_URL,
    NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET,
    NEXTAUTH_URL: process.env.NEXTAUTH_URL,
    AZURE_AD_CLIENT_ID: process.env.AZURE_AD_CLIENT_ID,
    AZURE_AD_CLIENT_SECRET: process.env.AZURE_AD_CLIENT_SECRET,
    REDIS_URL: process.env.REDIS_URL, // ADD THIS
  },
});
```

---

## Retry & Error Handling

### Exponential Backoff

```typescript
async function fetchWithRetry<T>(
  url: string,
  token: string,
  maxRetries = 3
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${token}` },
      });

      if (response.status === 429) {
        // Rate limit - exponential backoff (1s, 2s, 4s, ...)
        lastError = new Error('HTTP 429: Too Many Requests');
        const delay = Math.pow(2, attempt) * 1000;
        await new Promise(resolve => setTimeout(resolve, delay));
        continue;
      }

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      return await response.json();
    } catch (error) {
      lastError = error as Error;

      // Don't retry on auth errors
      if (error instanceof Error && error.message.includes('401')) {
        throw error;
      }

      // Exponential backoff for transient errors
      if (attempt < maxRetries - 1) {
        const delay = Math.pow(2, attempt) * 1000;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }

  throw lastError || new Error('Max retries exceeded');
}
```

---

## Docker Compose Setup (Optional)

**File**: `docker-compose.yml`

```yaml
version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      - '6379:6379'
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:
```

Start Redis:

```bash
docker-compose up -d redis
```

---

## Production Deployment

### Worker as Systemd Service

**File**: `/etc/systemd/system/tenantpilot-worker.service`

`ExecStart` runs `worker/index.js`, so this unit assumes the TypeScript worker has been compiled to JavaScript (with the `@/` path aliases resolved) before deployment.

```ini
[Unit]
Description=TenantPilot Policy Sync Worker
After=network.target redis.service

[Service]
Type=simple
User=www-data
WorkingDirectory=/var/www/tenantpilot
ExecStart=/usr/bin/node /var/www/tenantpilot/worker/index.js
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
```

Enable & Start:

```bash
sudo systemctl enable tenantpilot-worker
sudo systemctl start tenantpilot-worker
sudo systemctl status tenantpilot-worker
```

---

## Testing Strategy

### Unit Tests

```typescript
import { describe, it, expect } from 'vitest';
import { humanizeSettingId } from './humanizer';

describe('humanizeSettingId', () => {
  it('removes device_vendor_msft_policy_config prefix', () => {
    const result = humanizeSettingId('device_vendor_msft_policy_config_wifi_allowwifihotspotreporting');
    expect(result).toBe('Wifi Allowwifihotspotreporting');
  });
});
```

### Integration Tests

```typescript
import { describe, it, expect } from 'vitest';
import { eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { policySettings } from '@/lib/db/schema/policySettings';
import { syncPolicies } from '@/worker/jobs/syncPolicies';

describe('syncPolicies', () => {
  it('fetches and stores policies for tenant', async () => {
    const testTenantId = 'test-tenant-123';
    await syncPolicies(testTenantId);

    const settings = await db.query.policySettings.findMany({
      where: eq(policySettings.tenantId, testTenantId),
    });

    expect(settings.length).toBeGreaterThan(0);
  });
});
```

---

## Monitoring & Logging

### Structured Logging

```typescript
import winston from 'winston';

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'worker-error.log', level: 'error' }),
    new winston.transports.File({ filename: 'worker-combined.log' }),
  ],
});

// In worker:
logger.info('Job started', { jobId: job.id, tenantId: job.data.tenantId });
// In the 'failed' handler `job` may be undefined
logger.error('Job failed', { jobId: job?.id, error: err.message });
```

### Health Check Endpoint

**File**: `app/api/worker-health/route.ts`

```typescript
import { syncQueue } from '@/lib/queue/syncQueue';

export async function GET() {
  try {
    const jobCounts = await syncQueue.getJobCounts();

    return Response.json({
      status: 'healthy',
      queue: jobCounts,
    });
  } catch (error) {
    return Response.json(
      { status: 'unhealthy', error: (error as Error).message },
      { status: 500 }
    );
  }
}
```

---

## Migration Checklist

- [ ] Install dependencies (`bullmq`, `ioredis`, `@azure/identity`)
- [ ] Add `REDIS_URL` to `.env`
- [ ] Create `lib/queue/redis.ts` and `lib/queue/syncQueue.ts`
- [ ] Create `worker/index.ts` with BullMQ Worker
- [ ] Implement `worker/jobs/syncPolicies.ts` with full logic
- [ ] Update `lib/actions/policySettings.ts` → replace n8n webhook with BullMQ
- [ ] Remove `app/api/policy-settings/route.ts`
- [ ] Remove `app/api/admin/tenants/route.ts`
- [ ] Remove `POLICY_API_SECRET` from `.env` and `lib/env.mjs`
- [ ] Remove `N8N_SYNC_WEBHOOK_URL` from `.env` and `lib/env.mjs`
- [ ] Add `worker:start` script to `package.json`
- [ ] Test locally: Start Redis, Start Worker, Trigger Sync from UI
- [ ] Deploy Worker as background service (PM2/Systemd/Docker)
- [ ] Verify end-to-end: Job creation → Worker processing → Database updates
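
---

## Appendix: Honoring `Retry-After` (Sketch)

Microsoft Graph typically sends a `Retry-After` header with throttled (429) responses, whereas the `fetchWithRetry` helper under Retry & Error Handling always waits `2^attempt` seconds. The following is a minimal sketch of a delay helper that prefers the server's hint and falls back to exponential backoff. `retryDelayMs`, `BASE_DELAY_MS`, and `MAX_DELAY_MS` are hypothetical names introduced here, and the 60-second cap is an assumption rather than a value from these notes.

```typescript
// Hypothetical helper, not part of the original implementation notes.
const BASE_DELAY_MS = 1000;   // matches the 1s base used by fetchWithRetry
const MAX_DELAY_MS = 60_000;  // assumed upper bound for any single wait

/**
 * Computes how long to wait before the next attempt.
 * @param attempt    zero-based attempt counter
 * @param retryAfter raw Retry-After header value (delta-seconds or HTTP-date), or null
 * @param now        current time in ms, injectable for tests
 */
function retryDelayMs(
  attempt: number,
  retryAfter: string | null,
  now: number = Date.now()
): number {
  if (retryAfter !== null) {
    // Form 1: delta-seconds, e.g. "5"
    const seconds = Number(retryAfter);
    if (retryAfter.trim() !== '' && Number.isFinite(seconds) && seconds >= 0) {
      return Math.min(seconds * 1000, MAX_DELAY_MS);
    }
    // Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:10 GMT"
    const date = Date.parse(retryAfter);
    if (!Number.isNaN(date)) {
      return Math.min(Math.max(date - now, 0), MAX_DELAY_MS);
    }
  }
  // No usable header: exponential backoff (1s, 2s, 4s, ...), capped
  return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

// Possible use inside the 429 branch of a retry loop:
//   const delay = retryDelayMs(attempt, response.headers.get('Retry-After'));
//   await new Promise(resolve => setTimeout(resolve, delay));
```

Keeping the calculation in a pure function, with the clock passed in, makes the throttling behavior unit-testable without real network calls or timers.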