14 KiB
14 KiB
Technical Implementation Notes: Backend Architecture Pivot
Feature: 005-backend-arch-pivot
Created: 2025-12-09
Purpose: Detailed implementation guidance for developers (not part of business specification)
BullMQ Setup
Installation
npm install bullmq ioredis
Redis Connection
File: lib/queue/redis.ts
import IORedis from 'ioredis';
import { env } from '@/lib/env.mjs';
/**
 * Options for the shared ioredis client.
 */
const redisOptions = {
  // BullMQ requirement
  maxRetriesPerRequest: null,
};

/**
 * Single ioredis connection, built from `env.REDIS_URL`, that the queue and
 * the worker both import.
 */
export const redisConnection = new IORedis(env.REDIS_URL, redisOptions);
Queue Definition
File: lib/queue/syncQueue.ts
import { Queue } from 'bullmq';
import { redisConnection } from './redis';
/** Name of the BullMQ queue; the worker subscribes to the same name. */
const SYNC_QUEUE_NAME = 'intune-sync-queue';

/**
 * Producer-side queue handle used to enqueue sync jobs over the shared
 * Redis connection.
 */
export const syncQueue = new Queue(SYNC_QUEUE_NAME, { connection: redisConnection });
Worker Implementation
Worker Entry Point
File: worker/index.ts
import { Worker } from 'bullmq';
import { redisConnection } from '@/lib/queue/redis';
import { syncPolicies } from './jobs/syncPolicies';
/**
 * BullMQ worker that consumes `intune-sync-queue` and runs one policy sync
 * per job, using the tenant id carried in the job payload.
 */
const worker = new Worker(
  'intune-sync-queue',
  async (job) => {
    console.log(`Processing job ${job.id} for tenant ${job.data.tenantId}`);
    await syncPolicies(job.data.tenantId);
  },
  {
    connection: redisConnection,
    concurrency: 1, // Sequential processing
  }
);

worker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, err) => {
  // `job` may be undefined here, hence the optional chaining.
  console.error(`Job ${job?.id} failed:`, err);
});

// Connection-level problems are emitted as 'error'; an EventEmitter without
// an 'error' listener throws, which would take the whole process down.
worker.on('error', (err) => {
  console.error('Worker error:', err);
});

/**
 * Stops the worker cleanly: `worker.close()` waits for the job in progress
 * before resolving, so a service stop does not cut a sync in half.
 */
async function shutdown(signal: string): Promise<void> {
  console.log(`Received ${signal}, closing worker...`);
  try {
    await worker.close();
    process.exit(0);
  } catch (err) {
    console.error('Error while closing worker:', err);
    process.exit(1);
  }
}

process.once('SIGTERM', () => void shutdown('SIGTERM'));
process.once('SIGINT', () => void shutdown('SIGINT'));

console.log('Worker started, listening on intune-sync-queue...');
Package.json Script
{
"scripts": {
"worker:start": "tsx watch worker/index.ts"
}
}
Sync Logic Architecture
Main Function
File: worker/jobs/syncPolicies.ts
/**
 * Runs one full policy sync for a tenant: acquire a Graph token, download
 * every policy, flatten each policy into setting rows, then upsert the rows.
 *
 * @param tenantId - Tenant the resulting rows are stored under.
 */
export async function syncPolicies(tenantId: string) {
  // Step 1: access token for Microsoft Graph
  const accessToken = await getGraphAccessToken();

  // Step 2: download every policy type
  const rawPolicies = await fetchAllPolicies(accessToken);

  // Step 3: one policy can expand into many setting rows
  const settingRows = rawPolicies.flatMap((rawPolicy) => parsePolicySettings(rawPolicy));

  // Step 4: persist
  await upsertPolicySettings(tenantId, settingRows);
}
Authentication (Client Credentials)
import { ClientSecretCredential } from '@azure/identity';
/**
 * Acquires an app-only Microsoft Graph access token via the client
 * credentials flow.
 *
 * @param tenantId - Directory (tenant) ID or domain to authenticate against.
 *   Defaults to `'common'` so existing zero-argument callers behave as before.
 *   NOTE(review): the client credentials flow is normally issued against a
 *   specific tenant; callers that know the tenant should pass it — confirm
 *   that `'common'` works for this app registration.
 * @returns The raw bearer token string.
 */
async function getGraphAccessToken(tenantId: string = 'common'): Promise<string> {
  const credential = new ClientSecretCredential(
    tenantId,
    env.AZURE_AD_CLIENT_ID,
    env.AZURE_AD_CLIENT_SECRET
  );
  // `.default` requests the application permissions already granted to the app.
  const token = await credential.getToken('https://graph.microsoft.com/.default');
  return token.token;
}
Pagination Handling
/**
 * Fetches every page of an OData collection by following `@odata.nextLink`
 * until the server stops returning one.
 *
 * @param url - URL of the first page.
 * @param token - Bearer token sent on every request.
 * @returns The concatenated `value` arrays of all pages, in page order.
 * @throws Error `HTTP <status>: <statusText>` when any page responds with a
 *   non-2xx status, so an error body is never mistaken for an empty page.
 */
async function fetchWithPagination<T>(url: string, token: string): Promise<T[]> {
  const results: T[] = [];
  let nextLink: string | undefined = url;

  while (nextLink) {
    const response = await fetch(nextLink, {
      headers: { Authorization: `Bearer ${token}` },
    });

    // Error bodies carry no `value`; without this check the failure would be
    // silently folded into the result set.
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText} (${nextLink})`);
    }

    const data: { value?: T[]; '@odata.nextLink'?: string } = await response.json();

    // Guard against a missing `value` so no `undefined` entry is appended.
    if (Array.isArray(data.value)) {
      results.push(...data.value);
    }

    nextLink = data['@odata.nextLink'];
  }

  return results;
}
Graph API Endpoints
/**
 * Microsoft Graph collection URLs for every policy family the sync reads.
 *
 * `configurationPolicies` (Settings Catalog) and `intents` are published only
 * under the `/beta` endpoint, so they cannot be addressed via `/v1.0`.
 */
const GRAPH_ENDPOINTS = {
  deviceConfigurations: 'https://graph.microsoft.com/v1.0/deviceManagement/deviceConfigurations',
  compliancePolicies: 'https://graph.microsoft.com/v1.0/deviceManagement/deviceCompliancePolicies',
  configurationPolicies: 'https://graph.microsoft.com/beta/deviceManagement/configurationPolicies',
  intents: 'https://graph.microsoft.com/beta/deviceManagement/intents',
};
/**
 * Downloads all four policy collections concurrently and returns them as one
 * flat list, ordered: device configurations, compliance policies,
 * configuration policies, intents.
 *
 * @param token - Bearer token forwarded to every paginated request.
 */
async function fetchAllPolicies(token: string) {
  const endpointUrls = [
    GRAPH_ENDPOINTS.deviceConfigurations,
    GRAPH_ENDPOINTS.compliancePolicies,
    GRAPH_ENDPOINTS.configurationPolicies,
    GRAPH_ENDPOINTS.intents,
  ];

  // Requests start in the order above; Promise.all keeps results in that order.
  const collections = await Promise.all(
    endpointUrls.map((endpointUrl) => fetchWithPagination(endpointUrl, token))
  );

  return collections.flat();
}
Flattening Strategy
Settings Catalog (Most Complex)
/**
 * Flattens a Settings Catalog policy into one row per configured setting.
 *
 * Every row carries the owning policy's id, name and type, because the
 * database upsert keys rows on the policy id. A policy with no settings
 * (missing or empty `settings`) yields a single placeholder row so it still
 * appears in the results.
 *
 * @param policy - Raw policy object; expected to expose `id`, `displayName`,
 *   `@odata.type` and an optional `settings` array.
 * @returns One `FlattenedSetting` per setting, or the placeholder row.
 */
function parseSettingsCatalog(policy: any): FlattenedSetting[] {
  const settings: any[] = Array.isArray(policy.settings) ? policy.settings : [];
  if (settings.length === 0) return [defaultEmptySetting(policy)];

  // Identical for every row of this policy, so resolve it once.
  const policyType = detectPolicyType(policy['@odata.type']);

  return settings.map((setting: any) => {
    const settingId: string = setting.settingInstance.settingDefinitionId;
    const value = extractValue(setting.settingInstance);
    return {
      policyId: policy.id,
      policyName: policy.displayName,
      policyType,
      settingName: humanizeSettingId(settingId),
      settingValue: JSON.stringify(value),
      settingValueType: typeof value,
      // Keep the raw definition id so the humanized name can be traced back.
      path: settingId,
    };
  });
}
/**
 * Extracts the configured value from a Settings Catalog setting instance.
 *
 * Supported shapes:
 * - `simpleSettingValue` / `choiceSettingValue` → the scalar `value`
 * - `simpleSettingCollectionValue` / `choiceSettingCollectionValue` → array of `value`s
 * - `groupSettingValue` → array of the group's child values
 * - `groupSettingCollectionValue` → one array of child values per group
 *   (an object with `children` instead of an array is still accepted and
 *   yields a flat array of child values)
 *
 * @param settingInstance - Raw setting instance; may be null or undefined.
 * @returns The extracted value, or `null` when no known value field is present.
 */
function extractValue(settingInstance: any): any {
  if (settingInstance == null) return null;

  // Recursively resolves the children of a single group value.
  const childValues = (group: any): any[] =>
    Array.isArray(group?.children)
      ? group.children.map((child: any) => extractValue(child))
      : [];

  if (settingInstance.simpleSettingValue) {
    return settingInstance.simpleSettingValue.value;
  }
  if (settingInstance.choiceSettingValue) {
    return settingInstance.choiceSettingValue.value;
  }
  if (Array.isArray(settingInstance.simpleSettingCollectionValue)) {
    return settingInstance.simpleSettingCollectionValue.map((entry: any) => entry?.value);
  }
  if (Array.isArray(settingInstance.choiceSettingCollectionValue)) {
    return settingInstance.choiceSettingCollectionValue.map((entry: any) => entry?.value);
  }
  if (settingInstance.groupSettingValue) {
    return childValues(settingInstance.groupSettingValue);
  }

  const groups = settingInstance.groupSettingCollectionValue;
  if (groups) {
    // The collection is an array of groups, each with its own `children`.
    return Array.isArray(groups)
      ? groups.map((group: any) => childValues(group))
      : childValues(groups);
  }

  return null;
}
OMA-URI
/**
 * Flattens a custom (OMA-URI) configuration policy into one row per OMA
 * setting.
 *
 * Every row carries the owning policy's id, name and type, because the
 * database upsert keys rows on the policy id. A policy with no OMA settings
 * (missing or empty `omaSettings`) yields a single placeholder row.
 *
 * @param policy - Raw policy object; expected to expose `id`, `displayName`,
 *   `@odata.type` and an optional `omaSettings` array.
 * @returns One `FlattenedSetting` per OMA setting, or the placeholder row.
 */
function parseOmaUri(policy: any): FlattenedSetting[] {
  const omaSettings: any[] = Array.isArray(policy.omaSettings) ? policy.omaSettings : [];
  if (omaSettings.length === 0) return [defaultEmptySetting(policy)];

  // Identical for every row of this policy, so resolve it once.
  const policyType = detectPolicyType(policy['@odata.type']);

  return omaSettings.map((oma: any) => ({
    policyId: policy.id,
    policyName: policy.displayName,
    policyType,
    settingName: oma.omaUri,
    settingValue: oma.value,
    settingValueType: oma.valueType || 'string',
    // The OMA-URI is itself the setting's path.
    path: oma.omaUri,
  }));
}
Humanizer
/**
 * Converts a raw setting definition id into a display label.
 *
 * Strips a leading `device_vendor_msft_policy_config_` (any letter case),
 * replaces underscores with spaces and upper-cases the first character of
 * each word.
 *
 * @param id - Raw setting definition id.
 * @returns The human-readable label.
 */
function humanizeSettingId(id: string): string {
  const withoutPrefix = id.replace(/^device_vendor_msft_policy_config_/i, '');
  const spaced = withoutPrefix.replace(/_/g, ' ');
  const capitalize = (firstChar: string): string => firstChar.toUpperCase();
  return spaced.replace(/\b\w/g, capitalize);
}
Default Empty Setting
/**
 * Builds the placeholder row used for a policy that has no settings, so the
 * policy is still represented by one row.
 *
 * @param policy - Raw policy object providing `id`, `displayName` and
 *   `@odata.type`.
 * @returns A row whose setting fields mark the policy as unconfigured.
 */
function defaultEmptySetting(policy: any): FlattenedSetting {
  const { id, displayName } = policy;
  const policyType = detectPolicyType(policy['@odata.type']);

  const placeholder: FlattenedSetting = {
    policyId: id,
    policyName: displayName,
    policyType,
    settingName: '(No settings configured)',
    settingValue: '',
    settingValueType: 'empty',
    path: '',
  };
  return placeholder;
}
Policy Type Detection
/**
 * Lookup table from Graph `@odata.type` to the internal policy type label.
 * A Map is used so that only the listed keys can ever match; a plain object
 * would also resolve inherited names such as `constructor` or `toString`.
 */
const POLICY_TYPE_BY_ODATA_TYPE: ReadonlyMap<string, string> = new Map([
  ['#microsoft.graph.deviceManagementConfigurationPolicy', 'configurationPolicy'],
  ['#microsoft.graph.windows10CustomConfiguration', 'deviceConfiguration'],
  ['#microsoft.graph.windows10EndpointProtectionConfiguration', 'endpointSecurity'],
  ['#microsoft.graph.deviceCompliancePolicy', 'compliancePolicy'],
  ['#microsoft.graph.windowsUpdateForBusinessConfiguration', 'windowsUpdateForBusiness'],
  ['#microsoft.graph.iosCustomConfiguration', 'deviceConfiguration'],
  ['#microsoft.graph.androidManagedAppProtection', 'appConfiguration'],
]);

/**
 * Maps a Graph `@odata.type` string to the internal policy type label.
 *
 * @param odataType - Value of the policy's `@odata.type` property.
 * @returns The mapped label, or `'unknown'` for any unlisted (or missing) type.
 */
function detectPolicyType(odataType: string): string {
  return POLICY_TYPE_BY_ODATA_TYPE.get(odataType) ?? 'unknown';
}
Database Upsert
File: worker/jobs/upsertPolicySettings.ts
import { db } from '@/lib/db';
import { policySettings } from '@/lib/db/schema/policySettings';
import { sql } from 'drizzle-orm';
/**
 * Inserts the flattened settings of one tenant, updating rows that already
 * exist for the same (tenant, policy, setting name) key.
 *
 * @param tenantId - Tenant the rows belong to.
 * @param settings - Flattened setting rows; an empty list is a no-op.
 */
async function upsertPolicySettings(
  tenantId: string,
  settings: FlattenedSetting[]
) {
  // Drizzle's `values()` throws on an empty array, so a sync that found
  // nothing must not reach the insert at all.
  if (settings.length === 0) return;

  // One timestamp for the whole batch so all rows of a sync run agree.
  const syncedAt = new Date();

  const records = settings.map((s) => ({
    tenantId,
    graphPolicyId: s.policyId,
    policyName: s.policyName,
    policyType: s.policyType,
    settingName: s.settingName,
    settingValue: s.settingValue,
    settingValueType: s.settingValueType,
    lastSyncedAt: syncedAt,
  }));

  // Batch insert with conflict resolution
  await db.insert(policySettings)
    .values(records)
    .onConflictDoUpdate({
      target: [
        policySettings.tenantId,
        policySettings.graphPolicyId,
        policySettings.settingName,
      ],
      // EXCLUDED refers to the row that was proposed for insertion.
      set: {
        policyName: sql`EXCLUDED.policy_name`,
        policyType: sql`EXCLUDED.policy_type`,
        settingValue: sql`EXCLUDED.setting_value`,
        settingValueType: sql`EXCLUDED.setting_value_type`,
        lastSyncedAt: sql`EXCLUDED.last_synced_at`,
      },
    });
}
Frontend Integration
Server Action Update
File: lib/actions/policySettings.ts
Before (n8n Webhook):
// Legacy trigger: POST the tenant id as JSON to the n8n webhook.
const webhookUrl = env.N8N_SYNC_WEBHOOK_URL;
const webhookBody = JSON.stringify({ tenantId });
const response = await fetch(webhookUrl, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: webhookBody,
});
After (BullMQ Job):
import { syncQueue } from '@/lib/queue/syncQueue';
/**
 * Enqueues a `sync-tenant` job for the given tenant on the sync queue.
 *
 * @param tenantId - Tenant to synchronise.
 * @returns A success flag, the id of the created job and a status message.
 */
export async function triggerPolicySync(tenantId: string) {
  const payload = {
    tenantId,
    triggeredAt: new Date(),
  };

  const { id: jobId } = await syncQueue.add('sync-tenant', payload);

  return {
    success: true,
    jobId,
    message: 'Sync job created successfully',
  };
}
Environment Variables
.env Changes
Add:
REDIS_URL=redis://localhost:6379
Remove:
# POLICY_API_SECRET=... (DELETE)
# N8N_SYNC_WEBHOOK_URL=... (DELETE)
lib/env.mjs Updates
import { createEnv } from "@t3-oss/env-nextjs";
import { z } from "zod";
/**
 * Validated environment configuration built with `createEnv`.
 * `server` declares a zod schema per variable; `runtimeEnv` supplies the
 * matching `process.env` value for each declared variable.
 */
export const env = createEnv({
// Server-side variables and their validation rules.
server: {
DATABASE_URL: z.string().url(),
NEXTAUTH_SECRET: z.string().min(1),
NEXTAUTH_URL: z.string().url(),
AZURE_AD_CLIENT_ID: z.string().min(1),
AZURE_AD_CLIENT_SECRET: z.string().min(1),
REDIS_URL: z.string().url(), // ADD THIS
// REMOVE: POLICY_API_SECRET, N8N_SYNC_WEBHOOK_URL
},
// No client-exposed variables are declared.
client: {},
// Every key declared under `server` above has an entry here.
runtimeEnv: {
DATABASE_URL: process.env.DATABASE_URL,
NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET,
NEXTAUTH_URL: process.env.NEXTAUTH_URL,
AZURE_AD_CLIENT_ID: process.env.AZURE_AD_CLIENT_ID,
AZURE_AD_CLIENT_SECRET: process.env.AZURE_AD_CLIENT_SECRET,
REDIS_URL: process.env.REDIS_URL, // ADD THIS
},
});
Retry & Error Handling
Exponential Backoff
/**
 * GETs a JSON resource with a bearer token, retrying transient failures.
 *
 * Retried: network errors, body-parse errors, HTTP 408, 429 and 5xx.
 * Not retried: every other non-2xx status (401, 403, 404, ...), because
 * repeating the same request cannot change the outcome.
 *
 * The wait between attempts is exponential (1s, 2s, 4s, ...) unless the
 * server sends a numeric `Retry-After` header, which takes precedence.
 * No wait is performed after the final attempt.
 *
 * @param url - Resource to fetch.
 * @param token - Bearer token for the `Authorization` header.
 * @param maxRetries - Total number of attempts (default 3).
 * @returns The parsed JSON body.
 * @throws Error `HTTP <status>: <statusText>` for the last failed response,
 *   the last network error, or `Max retries exceeded` if no attempt was made.
 */
async function fetchWithRetry<T>(
  url: string,
  token: string,
  maxRetries = 3
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    // Default wait before the next attempt: exponential backoff.
    let delayMs = Math.pow(2, attempt) * 1000;

    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${token}` },
      });

      if (response.ok) {
        return (await response.json()) as T;
      }

      lastError = new Error(`HTTP ${response.status}: ${response.statusText}`);

      const retryable =
        response.status === 408 || response.status === 429 || response.status >= 500;
      if (!retryable) {
        // Decide on the status code itself rather than on message text.
        break;
      }

      // Honour the server's throttling hint when it is given in seconds.
      const retryAfter = response.headers.get('Retry-After');
      const retryAfterSeconds =
        retryAfter === null || retryAfter.trim() === '' ? NaN : Number(retryAfter);
      if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds >= 0) {
        delayMs = retryAfterSeconds * 1000;
      }
    } catch (error) {
      // Network failure or invalid JSON body: treat as transient.
      lastError = error instanceof Error ? error : new Error(String(error));
    }

    if (attempt < maxRetries - 1) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }

  throw lastError ?? new Error('Max retries exceeded');
}
Docker Compose Setup (Optional)
File: docker-compose.yml
# Redis service for local use; data is kept in the named volume `redis-data`.
version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      # Matches the port used in REDIS_URL=redis://localhost:6379
      - '6379:6379'
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:
Start Redis:
docker-compose up -d redis
Production Deployment
Worker as Systemd Service
File: /etc/systemd/system/tenantpilot-worker.service
# systemd unit that runs the policy sync worker as a background service.
[Unit]
Description=TenantPilot Policy Sync Worker
# NOTE(review): assumes Redis runs on this host under the unit name
# redis.service — confirm the actual unit name for the target distribution.
After=network.target redis.service
[Service]
Type=simple
User=www-data
WorkingDirectory=/var/www/tenantpilot
# NOTE(review): this runs worker/index.js, while the worker entry point is
# written as worker/index.ts — presumably a compiled build output; confirm
# the build step produces this file.
ExecStart=/usr/bin/node /var/www/tenantpilot/worker/index.js
# Restart only after a failure, waiting 10 seconds between attempts.
Restart=on-failure
RestartSec=10
# Send both output streams to the journal.
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Enable & Start:
sudo systemctl enable tenantpilot-worker
sudo systemctl start tenantpilot-worker
sudo systemctl status tenantpilot-worker
Testing Strategy
Unit Tests
import { describe, it, expect, vi } from 'vitest';
import { humanizeSettingId } from './humanizer';
// Unit test: the vendor prefix is stripped and the remainder is title-cased.
describe('humanizeSettingId', () => {
  it('removes device_vendor_msft_policy_config prefix', () => {
    const rawId = 'device_vendor_msft_policy_config_wifi_allowwifihotspotreporting';
    const expectedLabel = 'Wifi Allowwifihotspotreporting';

    expect(humanizeSettingId(rawId)).toBe(expectedLabel);
  });
});
Integration Tests
// Integration test: after a sync, at least one row exists for the tenant.
describe('syncPolicies', () => {
  it('fetches and stores policies for tenant', async () => {
    const tenantUnderTest = 'test-tenant-123';

    await syncPolicies(tenantUnderTest);

    const tenantFilter = eq(policySettings.tenantId, tenantUnderTest);
    const storedRows = await db.query.policySettings.findMany({ where: tenantFilter });

    expect(storedRows.length).toBeGreaterThan(0);
  });
});
Monitoring & Logging
Structured Logging
import winston from 'winston';
// JSON-formatted logger: 'error' entries go to worker-error.log, and
// everything at 'info' and above goes to worker-combined.log.
const jsonFormat = winston.format.json();
const errorFileTransport = new winston.transports.File({
  filename: 'worker-error.log',
  level: 'error',
});
const combinedFileTransport = new winston.transports.File({
  filename: 'worker-combined.log',
});

const logger = winston.createLogger({
  level: 'info',
  format: jsonFormat,
  transports: [errorFileTransport, combinedFileTransport],
});

// Usage inside the worker's job handlers:
logger.info('Job started', { jobId: job.id, tenantId: job.data.tenantId });
logger.error('Job failed', { jobId: job.id, error: err.message });
Health Check Endpoint
File: app/api/worker-health/route.ts
import { syncQueue } from '@/lib/queue/syncQueue';
/**
 * Health endpoint: reports the sync queue's job counts, or HTTP 500 with the
 * error message when the counts cannot be read.
 */
export async function GET() {
  try {
    const queue = await syncQueue.getJobCounts();
    const healthyBody = { status: 'healthy', queue };
    return Response.json(healthyBody);
  } catch (error) {
    const { message } = error as Error;
    const unhealthyBody = { status: 'unhealthy', error: message };
    return Response.json(unhealthyBody, { status: 500 });
  }
}
Migration Checklist
- Install dependencies (`bullmq`, `ioredis`, `@azure/identity`)
- Add `REDIS_URL` to `.env`
- Create `lib/queue/redis.ts` and `lib/queue/syncQueue.ts`
- Create `worker/index.ts` with BullMQ Worker
- Implement `worker/jobs/syncPolicies.ts` with full logic
- Update `lib/actions/policySettings.ts` → replace n8n webhook with BullMQ
- Remove `app/api/policy-settings/route.ts`
- Remove `app/api/admin/tenants/route.ts`
- Remove `POLICY_API_SECRET` from `.env` and `lib/env.mjs`
- Remove `N8N_SYNC_WEBHOOK_URL` from `.env` and `lib/env.mjs`
- Add `worker:start` script to `package.json`
- Test locally: Start Redis, Start Worker, Trigger Sync from UI
- Deploy Worker as background service (PM2/Systemd/Docker)
- Verify end-to-end: Job creation → Worker processing → Database updates