616 lines
14 KiB
Markdown
616 lines
14 KiB
Markdown
# Technical Implementation Notes: Backend Architecture Pivot
|
|
|
|
**Feature**: 005-backend-arch-pivot
|
|
**Created**: 2025-12-09
|
|
**Purpose**: Detailed implementation guidance for developers (not part of business specification)
|
|
|
|
---
|
|
|
|
## BullMQ Setup
|
|
|
|
### Installation
|
|
|
|
```bash
|
|
npm install bullmq ioredis @azure/identity
|
|
```
|
|
|
|
### Redis Connection
|
|
|
|
**File**: `lib/queue/redis.ts`
|
|
|
|
```typescript
|
|
import IORedis from 'ioredis';
|
|
import { env } from '@/lib/env.mjs';
|
|
|
|
// Options for the ioredis client created below.
const redisOptions = {
  maxRetriesPerRequest: null, // BullMQ requirement
};

/** ioredis client built from `env.REDIS_URL`. */
export const redisConnection = new IORedis(env.REDIS_URL, redisOptions);
|
|
```
|
|
|
|
### Queue Definition
|
|
|
|
**File**: `lib/queue/syncQueue.ts`
|
|
|
|
```typescript
|
|
import { Queue } from 'bullmq';
|
|
import { redisConnection } from './redis';
|
|
|
|
// Queue name; the worker process must subscribe to the same string.
const SYNC_QUEUE_NAME = 'intune-sync-queue';

/** BullMQ queue handle, backed by the Redis connection imported above. */
export const syncQueue = new Queue(SYNC_QUEUE_NAME, {
  connection: redisConnection,
});
|
|
```
|
|
|
|
---
|
|
|
|
## Worker Implementation
|
|
|
|
### Worker Entry Point
|
|
|
|
**File**: `worker/index.ts`
|
|
|
|
```typescript
|
|
import { Worker } from 'bullmq';
|
|
import { redisConnection } from '@/lib/queue/redis';
|
|
import { syncPolicies } from './jobs/syncPolicies';
|
|
|
|
/**
 * BullMQ worker for the `intune-sync-queue` queue.
 *
 * Each job is expected to carry `{ tenantId: string }`; the processor runs
 * `syncPolicies` for that tenant. `concurrency: 1` means jobs are processed
 * one at a time by this process.
 */
const worker = new Worker(
  'intune-sync-queue',
  async (job) => {
    // Validate the payload up front so a malformed job fails with a clear
    // message instead of an obscure error deep inside the sync.
    const tenantId: unknown = job.data?.tenantId;
    if (typeof tenantId !== 'string' || tenantId.length === 0) {
      throw new Error(`Job ${job.id} has no tenantId in its payload`);
    }

    console.log(`Processing job ${job.id} for tenant ${tenantId}`);
    await syncPolicies(tenantId);
  },
  {
    connection: redisConnection,
    concurrency: 1, // Sequential processing
  }
);

worker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed:`, err);
});

// Without an 'error' listener, an emitted error (e.g. a dropped Redis
// connection) surfaces as an unhandled EventEmitter error.
worker.on('error', (err) => {
  console.error('Worker error:', err);
});

/**
 * Stops the worker before the process exits.
 * `worker.close()` is awaited so the worker can shut down cleanly first.
 */
async function shutdown(signal: string): Promise<void> {
  console.log(`Received ${signal}, closing worker...`);
  try {
    await worker.close();
  } catch (err) {
    console.error('Error while closing worker:', err);
    process.exitCode = 1;
  } finally {
    process.exit();
  }
}

process.on('SIGINT', () => void shutdown('SIGINT'));
process.on('SIGTERM', () => void shutdown('SIGTERM'));

console.log('Worker started, listening on intune-sync-queue...');
|
|
```
|
|
|
|
### Package.json Script
|
|
|
|
```json
|
|
{
|
|
"scripts": {
|
|
"worker:start": "tsx watch worker/index.ts"
|
|
}
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Sync Logic Architecture
|
|
|
|
### Main Function
|
|
|
|
**File**: `worker/jobs/syncPolicies.ts`
|
|
|
|
```typescript
|
|
/**
 * Runs one policy sync: obtains a Graph access token, downloads all policies,
 * flattens every policy into setting rows and upserts those rows.
 *
 * @param tenantId - Tenant the flattened settings are stored under.
 */
export async function syncPolicies(tenantId: string) {
  // Step 1: access token.
  // NOTE(review): tenantId is not forwarded to the token request here —
  // confirm the token is issued for the tenant being synced.
  const accessToken = await getGraphAccessToken();

  // Step 2: every policy type, fetched with that token.
  const allPolicies = await fetchAllPolicies(accessToken);

  // Step 3: one policy may expand into many flattened setting rows.
  const toSettingRows = (policy: (typeof allPolicies)[number]) =>
    parsePolicySettings(policy);
  const settingRows = allPolicies.flatMap(toSettingRows);

  // Step 4: persist.
  await upsertPolicySettings(tenantId, settingRows);
}
|
|
```
|
|
|
|
### Authentication (Client Credentials)
|
|
|
|
```typescript
|
|
import { ClientSecretCredential } from '@azure/identity';
|
|
|
|
/**
 * Acquires an app-only Microsoft Graph access token via the client-credentials
 * flow, using `env.AZURE_AD_CLIENT_ID` / `env.AZURE_AD_CLIENT_SECRET`.
 *
 * @param tenantId - Directory (tenant) ID or domain to request the token from.
 *   Defaults to `'common'` so existing zero-argument callers keep working.
 *   NOTE(review): app-only tokens are issued per tenant, so callers syncing a
 *   specific tenant should pass that tenant's ID — confirm `'common'` is ever
 *   valid for this app registration.
 * @returns The raw bearer token string.
 */
async function getGraphAccessToken(tenantId: string = 'common'): Promise<string> {
  const credential = new ClientSecretCredential(
    tenantId,
    env.AZURE_AD_CLIENT_ID,
    env.AZURE_AD_CLIENT_SECRET
  );

  // `.default` requests the application permissions granted to the app.
  const accessToken = await credential.getToken('https://graph.microsoft.com/.default');
  return accessToken.token;
}
|
|
```
|
|
|
|
### Pagination Handling
|
|
|
|
```typescript
|
|
/**
 * Fetches every page of an OData collection by following `@odata.nextLink`.
 *
 * @param url - URL of the first page.
 * @param token - Bearer token sent in the `Authorization` header.
 * @returns All items from the `value` arrays of every page, in order.
 * @throws Error (`HTTP <status>: <statusText> ...`) when any page responds
 *   with a non-2xx status, instead of silently treating the error body as an
 *   empty page.
 */
async function fetchWithPagination<T>(url: string, token: string): Promise<T[]> {
  const results: T[] = [];
  let nextLink: string | undefined = url;

  while (nextLink) {
    const response = await fetch(nextLink, {
      headers: { Authorization: `Bearer ${token}` }
    });

    if (!response.ok) {
      throw new Error(
        `HTTP ${response.status}: ${response.statusText} while fetching ${nextLink}`
      );
    }

    const page = (await response.json()) as {
      value?: T[];
      '@odata.nextLink'?: string;
    };

    // A page without a `value` array contributes nothing (previously it
    // appended a stray `undefined` entry).
    if (Array.isArray(page.value)) {
      for (const item of page.value) {
        results.push(item);
      }
    }

    nextLink = page['@odata.nextLink'];
  }

  return results;
}
|
|
```
|
|
|
|
### Graph API Endpoints
|
|
|
|
```typescript
|
|
const GRAPH_V1_DEVICE_MANAGEMENT = 'https://graph.microsoft.com/v1.0/deviceManagement';
const GRAPH_BETA_DEVICE_MANAGEMENT = 'https://graph.microsoft.com/beta/deviceManagement';

/**
 * Collection URLs for the policy types that are synced.
 *
 * Settings Catalog (`configurationPolicies`) and Endpoint Security templates
 * (`intents`) are documented under the Graph beta endpoint, so they use the
 * beta base URL; the other two collections use v1.0.
 *
 * NOTE(review): listing `configurationPolicies` may not include each policy's
 * `settings` unless they are expanded or fetched per policy — confirm against
 * the Graph reference before relying on `policy.settings`.
 */
const GRAPH_ENDPOINTS = {
  deviceConfigurations: `${GRAPH_V1_DEVICE_MANAGEMENT}/deviceConfigurations`,
  compliancePolicies: `${GRAPH_V1_DEVICE_MANAGEMENT}/deviceCompliancePolicies`,
  configurationPolicies: `${GRAPH_BETA_DEVICE_MANAGEMENT}/configurationPolicies`,
  intents: `${GRAPH_BETA_DEVICE_MANAGEMENT}/intents`,
};
|
|
|
|
/**
 * Downloads all four policy collections concurrently and returns them as one
 * flat list (device configurations, compliance policies, configuration
 * policies, intents — in that order).
 *
 * @param token - Bearer token passed through to every request.
 */
async function fetchAllPolicies(token: string) {
  const collectionUrls = [
    GRAPH_ENDPOINTS.deviceConfigurations,
    GRAPH_ENDPOINTS.compliancePolicies,
    GRAPH_ENDPOINTS.configurationPolicies,
    GRAPH_ENDPOINTS.intents,
  ];

  // All requests are started before any is awaited; a single rejection
  // rejects the whole batch.
  const perCollection = await Promise.all(
    collectionUrls.map((collectionUrl) => fetchWithPagination(collectionUrl, token))
  );

  return perCollection.flat();
}
|
|
```
|
|
|
|
---
|
|
|
|
## Flattening Strategy
|
|
|
|
### Settings Catalog (Most Complex)
|
|
|
|
```typescript
|
|
/**
 * Flattens a Settings Catalog policy into one row per configured setting.
 *
 * Every row carries the owning policy's identity (`policyId`, `policyName`,
 * `policyType`), matching the shape produced by `defaultEmptySetting`, so the
 * rows can be keyed by policy when they are stored.
 *
 * @param policy - Raw policy object; `policy.settings` is expected to be an
 *   array of `{ settingInstance }` entries.
 * @returns One row per setting, or a single placeholder row when the policy
 *   has no settings (missing, non-array, or empty `settings`).
 */
function parseSettingsCatalog(policy: any): FlattenedSetting[] {
  const settings = policy.settings;
  if (!Array.isArray(settings) || settings.length === 0) {
    return [defaultEmptySetting(policy)];
  }

  const policyType = detectPolicyType(policy['@odata.type']);
  // Settings Catalog policies expose `name`; fall back to it when
  // `displayName` is absent.
  const policyName = policy.displayName ?? policy.name;

  return settings.map((setting: any) => {
    const settingId: string = setting.settingInstance.settingDefinitionId;
    const value = extractValue(setting.settingInstance);

    return {
      policyId: policy.id,
      policyName,
      policyType,
      settingName: humanizeSettingId(settingId),
      settingValue: JSON.stringify(value),
      settingValueType: typeof value,
      // NOTE(review): assumes `path` should hold the raw setting definition
      // ID — confirm against the FlattenedSetting definition.
      path: settingId,
    };
  });
}
|
|
|
|
/**
 * Extracts the configured value from a Settings Catalog setting instance.
 *
 * Supported shapes:
 * - `simpleSettingValue` / `choiceSettingValue` → the inner `value`
 * - `simpleSettingCollectionValue` / `choiceSettingCollectionValue` → array of
 *   the inner `value`s
 * - `groupSettingCollectionValue` as an array of `{ children }` groups → one
 *   array of extracted child values per group
 * - `groupSettingCollectionValue` / `groupSettingValue` as a single
 *   `{ children }` object → array of extracted child values
 *
 * @param settingInstance - Raw setting instance object (may be null/undefined).
 * @returns The extracted value, or `null` when no known value field is present.
 */
function extractValue(settingInstance: any): any {
  if (settingInstance == null) {
    return null;
  }

  if (settingInstance.simpleSettingValue) {
    return settingInstance.simpleSettingValue.value;
  }

  if (settingInstance.choiceSettingValue) {
    return settingInstance.choiceSettingValue.value;
  }

  const valueCollection =
    settingInstance.simpleSettingCollectionValue ??
    settingInstance.choiceSettingCollectionValue;
  if (Array.isArray(valueCollection)) {
    return valueCollection.map((entry: any) => entry?.value);
  }

  // Recursively extract the children of one group value.
  const extractChildren = (group: any): any[] =>
    Array.isArray(group?.children)
      ? group.children.map((child: any) => extractValue(child))
      : [];

  const groupCollection = settingInstance.groupSettingCollectionValue;
  if (Array.isArray(groupCollection)) {
    // Graph returns a list of groups, each with its own `children`.
    return groupCollection.map(extractChildren);
  }
  if (groupCollection) {
    // Single `{ children }` object: keep the flat result of the old code path.
    return extractChildren(groupCollection);
  }

  if (settingInstance.groupSettingValue) {
    return extractChildren(settingInstance.groupSettingValue);
  }

  return null;
}
|
|
```
|
|
|
|
### OMA-URI
|
|
|
|
```typescript
|
|
/**
 * Flattens a custom (OMA-URI) configuration policy into one row per OMA
 * setting.
 *
 * Every row carries the owning policy's identity (`policyId`, `policyName`,
 * `policyType`), matching the shape produced by `defaultEmptySetting`.
 *
 * @param policy - Raw policy object; `policy.omaSettings` is expected to be an
 *   array of entries with `omaUri` and `value`.
 * @returns One row per OMA setting, or a single placeholder row when the
 *   policy has no OMA settings (missing, non-array, or empty `omaSettings`).
 */
function parseOmaUri(policy: any): FlattenedSetting[] {
  const omaSettings = policy.omaSettings;
  if (!Array.isArray(omaSettings) || omaSettings.length === 0) {
    return [defaultEmptySetting(policy)];
  }

  const policyType = detectPolicyType(policy['@odata.type']);

  return omaSettings.map((oma: any) => ({
    policyId: policy.id,
    policyName: policy.displayName,
    policyType,
    settingName: oma.omaUri,
    settingValue: oma.value,
    // NOTE(review): confirm the payload actually carries `valueType`; the
    // value kind may instead be encoded in the entry's `@odata.type`.
    settingValueType: oma.valueType || 'string',
    // NOTE(review): assumes `path` should hold the OMA-URI — confirm against
    // the FlattenedSetting definition.
    path: oma.omaUri,
  }));
}
|
|
```
|
|
|
|
### Humanizer
|
|
|
|
```typescript
|
|
/**
 * Turns a setting definition ID into a display label: drops the leading
 * `device_vendor_msft_policy_config_` prefix (case-insensitive), converts
 * underscores to spaces, and upper-cases the first character of each word.
 *
 * @param id - Raw setting definition ID.
 * @returns The humanized label.
 */
function humanizeSettingId(id: string): string {
  const withoutPrefix = id.replace(/^device_vendor_msft_policy_config_/i, '');
  const spaced = withoutPrefix.replace(/_/g, ' ');
  const capitalizeInitial = (initial: string): string => initial.toUpperCase();

  return spaced.replace(/\b\w/g, capitalizeInitial);
}
|
|
```
|
|
|
|
### Default Empty Setting
|
|
|
|
```typescript
|
|
/**
 * Builds the placeholder row emitted for a policy that has no settings.
 *
 * @param policy - Raw policy object; `id`, `displayName` and `@odata.type`
 *   are read from it.
 * @returns A row with an empty value and the `'(No settings configured)'`
 *   setting name.
 */
function defaultEmptySetting(policy: any): FlattenedSetting {
  const { id, displayName } = policy;
  const resolvedType = detectPolicyType(policy['@odata.type']);

  const placeholder: FlattenedSetting = {
    policyId: id,
    policyName: displayName,
    policyType: resolvedType,
    settingName: '(No settings configured)',
    settingValue: '',
    settingValueType: 'empty',
    path: '',
  };

  return placeholder;
}
|
|
```
|
|
|
|
### Policy Type Detection
|
|
|
|
```typescript
|
|
// Built once at module load. A Map is used instead of a plain object so that
// inherited keys such as 'toString' or 'constructor' can never match.
const POLICY_TYPE_BY_ODATA_TYPE: ReadonlyMap<string, string> = new Map([
  ['#microsoft.graph.deviceManagementConfigurationPolicy', 'configurationPolicy'],
  ['#microsoft.graph.windows10CustomConfiguration', 'deviceConfiguration'],
  ['#microsoft.graph.windows10EndpointProtectionConfiguration', 'endpointSecurity'],
  ['#microsoft.graph.deviceCompliancePolicy', 'compliancePolicy'],
  ['#microsoft.graph.windowsUpdateForBusinessConfiguration', 'windowsUpdateForBusiness'],
  ['#microsoft.graph.iosCustomConfiguration', 'deviceConfiguration'],
  ['#microsoft.graph.androidManagedAppProtection', 'appConfiguration'],
]);

/**
 * Maps a Graph `@odata.type` string to the internal policy type label.
 *
 * @param odataType - Value of the policy's `@odata.type` property.
 * @returns The mapped label, or `'unknown'` for any type not in the table
 *   (including a missing/undefined value).
 */
function detectPolicyType(odataType: string): string {
  return POLICY_TYPE_BY_ODATA_TYPE.get(odataType) ?? 'unknown';
}
|
|
```
|
|
|
|
---
|
|
|
|
## Database Upsert
|
|
|
|
**File**: `worker/jobs/upsertPolicySettings.ts`
|
|
|
|
```typescript
|
|
import { db } from '@/lib/db';
|
|
import { policySettings } from '@/lib/db/schema/policySettings';
|
|
import { sql } from 'drizzle-orm';
|
|
|
|
// Rows per INSERT statement. Each row binds 8 parameters, so this stays well
// below PostgreSQL's per-statement bind-parameter limit.
const UPSERT_BATCH_SIZE = 1000;

/**
 * Inserts or updates flattened policy settings for one tenant.
 *
 * Rows are keyed by (tenantId, graphPolicyId, settingName); on conflict the
 * name, type, value, value type and sync timestamp are overwritten with the
 * incoming values.
 *
 * @param tenantId - Tenant the rows belong to.
 * @param settings - Flattened settings to store. An empty list is a no-op.
 */
export async function upsertPolicySettings(
  tenantId: string,
  settings: FlattenedSetting[]
) {
  // Inserting zero rows is rejected by the query builder; nothing to do.
  if (settings.length === 0) {
    return;
  }

  // One timestamp for the whole sync run so all rows agree.
  const syncedAt = new Date();

  const toRecord = (s: FlattenedSetting) => ({
    tenantId,
    graphPolicyId: s.policyId,
    policyName: s.policyName,
    policyType: s.policyType,
    settingName: s.settingName,
    settingValue: s.settingValue,
    settingValueType: s.settingValueType,
    lastSyncedAt: syncedAt,
  });

  // Collapse rows that share a conflict key: a single INSERT ... ON CONFLICT
  // DO UPDATE cannot touch the same row twice. The last occurrence wins.
  const recordsByKey = new Map<string, ReturnType<typeof toRecord>>();
  for (const s of settings) {
    recordsByKey.set(`${s.policyId}\u0000${s.settingName}`, toRecord(s));
  }
  const records = [...recordsByKey.values()];

  // Batches run inside one transaction so the sync stays all-or-nothing, as it
  // was with a single statement.
  await db.transaction(async (tx) => {
    for (let start = 0; start < records.length; start += UPSERT_BATCH_SIZE) {
      const batch = records.slice(start, start + UPSERT_BATCH_SIZE);

      await tx.insert(policySettings)
        .values(batch)
        .onConflictDoUpdate({
          target: [
            policySettings.tenantId,
            policySettings.graphPolicyId,
            policySettings.settingName
          ],
          set: {
            policyName: sql`EXCLUDED.policy_name`,
            policyType: sql`EXCLUDED.policy_type`,
            settingValue: sql`EXCLUDED.setting_value`,
            settingValueType: sql`EXCLUDED.setting_value_type`,
            lastSyncedAt: sql`EXCLUDED.last_synced_at`,
          },
        });
    }
  });
}
|
|
```
|
|
|
|
---
|
|
|
|
## Frontend Integration
|
|
|
|
### Server Action Update
|
|
|
|
**File**: `lib/actions/policySettings.ts`
|
|
|
|
**Before** (n8n Webhook):
|
|
```typescript
|
|
const response = await fetch(env.N8N_SYNC_WEBHOOK_URL, {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify({ tenantId }),
|
|
});
|
|
```
|
|
|
|
**After** (BullMQ Job):
|
|
```typescript
|
|
import { syncQueue } from '@/lib/queue/syncQueue';
|
|
|
|
/**
 * Enqueues a `sync-tenant` job on the sync queue for the given tenant.
 *
 * @param tenantId - Tenant to sync; stored in the job payload together with
 *   the time the job was triggered.
 * @returns A success result containing the ID of the created job.
 */
export async function triggerPolicySync(tenantId: string) {
  const payload = {
    tenantId,
    triggeredAt: new Date(),
  };

  const { id: jobId } = await syncQueue.add('sync-tenant', payload);

  return {
    success: true,
    jobId,
    message: 'Sync job created successfully'
  };
}
|
|
```
|
|
|
|
---
|
|
|
|
## Environment Variables
|
|
|
|
### .env Changes
|
|
|
|
**Add**:
|
|
```bash
|
|
REDIS_URL=redis://localhost:6379
|
|
```
|
|
|
|
**Remove**:
|
|
```bash
|
|
# POLICY_API_SECRET=... (DELETE)
|
|
# N8N_SYNC_WEBHOOK_URL=... (DELETE)
|
|
```
|
|
|
|
### lib/env.mjs Updates
|
|
|
|
```typescript
|
|
import { createEnv } from "@t3-oss/env-nextjs";
|
|
import { z } from "zod";
|
|
|
|
export const env = createEnv({
|
|
server: {
|
|
DATABASE_URL: z.string().url(),
|
|
NEXTAUTH_SECRET: z.string().min(1),
|
|
NEXTAUTH_URL: z.string().url(),
|
|
AZURE_AD_CLIENT_ID: z.string().min(1),
|
|
AZURE_AD_CLIENT_SECRET: z.string().min(1),
|
|
REDIS_URL: z.string().url(), // ADD THIS
|
|
// REMOVE: POLICY_API_SECRET, N8N_SYNC_WEBHOOK_URL
|
|
},
|
|
client: {},
|
|
runtimeEnv: {
|
|
DATABASE_URL: process.env.DATABASE_URL,
|
|
NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET,
|
|
NEXTAUTH_URL: process.env.NEXTAUTH_URL,
|
|
AZURE_AD_CLIENT_ID: process.env.AZURE_AD_CLIENT_ID,
|
|
AZURE_AD_CLIENT_SECRET: process.env.AZURE_AD_CLIENT_SECRET,
|
|
REDIS_URL: process.env.REDIS_URL, // ADD THIS
|
|
},
|
|
});
|
|
```
|
|
|
|
---
|
|
|
|
## Retry & Error Handling
|
|
|
|
### Exponential Backoff
|
|
|
|
```typescript
|
|
/**
 * GETs a JSON resource with a bearer token, retrying transient failures.
 *
 * Retried: network errors, body-parse errors, HTTP 429 and HTTP 5xx.
 * Not retried: every other non-2xx status (401, 403, 404, ...) — repeating
 * those requests cannot succeed, so they fail immediately.
 *
 * The wait between attempts is `2^attempt` seconds, unless the response
 * carries a numeric `Retry-After` header, which then takes precedence. No
 * wait happens after the final attempt.
 *
 * @param url - Resource URL.
 * @param token - Bearer token sent in the `Authorization` header.
 * @param maxRetries - Maximum number of attempts (default 3).
 * @returns The parsed JSON body.
 * @throws The last error seen, or `Error('Max retries exceeded')` when no
 *   attempt was made.
 */
async function fetchWithRetry<T>(
  url: string,
  token: string,
  maxRetries = 3
): Promise<T> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const isLastAttempt = attempt === maxRetries - 1;
    let delayMs = Math.pow(2, attempt) * 1000;
    let retryable = true;

    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${token}` }
      });

      if (response.ok) {
        return (await response.json()) as T;
      }

      retryable = response.status === 429 || response.status >= 500;

      if (retryable) {
        // Prefer the server's own hint. Only the delta-seconds form is
        // handled; an HTTP-date value falls back to exponential backoff.
        const retryAfter = response.headers.get('Retry-After');
        const retryAfterSeconds = retryAfter ? Number(retryAfter) : NaN;
        if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds >= 0) {
          delayMs = retryAfterSeconds * 1000;
        }
      }

      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error));

      if (!retryable) {
        throw lastError;
      }
    }

    if (!isLastAttempt) {
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
  }

  throw lastError ?? new Error('Max retries exceeded');
}
|
|
```
|
|
|
|
---
|
|
|
|
## Docker Compose Setup (Optional)
|
|
|
|
**File**: `docker-compose.yml`
|
|
|
|
```yaml
|
|
version: '3.8'
|
|
|
|
services:
|
|
redis:
|
|
image: redis:alpine
|
|
ports:
|
|
- '6379:6379'
|
|
volumes:
|
|
- redis-data:/data
|
|
restart: unless-stopped
|
|
|
|
volumes:
|
|
redis-data:
|
|
```
|
|
|
|
Start Redis:
|
|
```bash
|
|
docker-compose up -d redis
|
|
```
|
|
|
|
---
|
|
|
|
## Production Deployment
|
|
|
|
### Worker as Systemd Service
|
|
|
|
**File**: `/etc/systemd/system/tenantpilot-worker.service`
|
|
|
|
```ini
|
|
[Unit]
|
|
Description=TenantPilot Policy Sync Worker
|
|
After=network.target redis.service
|
|
|
|
[Service]
|
|
Type=simple
|
|
User=www-data
|
|
WorkingDirectory=/var/www/tenantpilot
|
|
ExecStart=/usr/bin/node /var/www/tenantpilot/worker/index.js
|
|
Restart=on-failure
|
|
RestartSec=10
|
|
StandardOutput=journal
|
|
StandardError=journal
|
|
|
|
[Install]
|
|
WantedBy=multi-user.target
|
|
```
|
|
|
|
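Enable & Start (the unit above runs the compiled `worker/index.js`, so build the TypeScript worker to JavaScript first):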
|
|
```bash
|
|
sudo systemctl enable tenantpilot-worker
|
|
sudo systemctl start tenantpilot-worker
|
|
sudo systemctl status tenantpilot-worker
|
|
```
|
|
|
|
---
|
|
|
|
## Testing Strategy
|
|
|
|
### Unit Tests
|
|
|
|
```typescript
|
|
import { describe, it, expect, vi } from 'vitest';
|
|
import { humanizeSettingId } from './humanizer';
|
|
|
|
describe('humanizeSettingId', () => {
|
|
it('removes device_vendor_msft_policy_config prefix', () => {
|
|
const result = humanizeSettingId('device_vendor_msft_policy_config_wifi_allowwifihotspotreporting');
|
|
expect(result).toBe('Wifi Allowwifihotspotreporting');
|
|
});
|
|
});
|
|
```
|
|
|
|
### Integration Tests
|
|
|
|
```typescript
|
|
describe('syncPolicies', () => {
|
|
it('fetches and stores policies for tenant', async () => {
|
|
const testTenantId = 'test-tenant-123';
|
|
|
|
await syncPolicies(testTenantId);
|
|
|
|
const settings = await db.query.policySettings.findMany({
|
|
where: eq(policySettings.tenantId, testTenantId),
|
|
});
|
|
|
|
expect(settings.length).toBeGreaterThan(0);
|
|
});
|
|
});
|
|
```
|
|
|
|
---
|
|
|
|
## Monitoring & Logging
|
|
|
|
### Structured Logging
|
|
|
|
```typescript
|
|
import winston from 'winston';
|
|
|
|
const logger = winston.createLogger({
|
|
level: 'info',
|
|
format: winston.format.json(),
|
|
transports: [
|
|
new winston.transports.File({ filename: 'worker-error.log', level: 'error' }),
|
|
new winston.transports.File({ filename: 'worker-combined.log' }),
|
|
],
|
|
});
|
|
|
|
// In worker:
|
|
logger.info('Job started', { jobId: job.id, tenantId: job.data.tenantId });
|
|
logger.error('Job failed', { jobId: job.id, error: err.message });
|
|
```
|
|
|
|
### Health Check Endpoint
|
|
|
|
**File**: `app/api/worker-health/route.ts`
|
|
|
|
```typescript
|
|
import { syncQueue } from '@/lib/queue/syncQueue';
|
|
|
|
/**
 * Health check for the sync queue.
 *
 * Reports `healthy` together with the queue's job counts when the counts can
 * be read, otherwise `unhealthy` with HTTP 500 and the error message. This
 * only checks that the job counts can be read; it does not check whether a
 * worker process is running.
 */
export async function GET() {
  try {
    const jobCounts = await syncQueue.getJobCounts();

    return Response.json({
      status: 'healthy',
      queue: jobCounts,
    });
  } catch (error: unknown) {
    // Narrow instead of asserting: a thrown value is not guaranteed to be an
    // Error instance.
    const message = error instanceof Error ? error.message : String(error);

    return Response.json(
      { status: 'unhealthy', error: message },
      { status: 500 }
    );
  }
}
|
|
```
|
|
|
|
---
|
|
|
|
## Migration Checklist
|
|
|
|
- [ ] Install dependencies (`bullmq`, `ioredis`, `@azure/identity`)
|
|
- [ ] Add `REDIS_URL` to `.env`
|
|
- [ ] Create `lib/queue/redis.ts` and `lib/queue/syncQueue.ts`
|
|
- [ ] Create `worker/index.ts` with BullMQ Worker
|
|
- [ ] Implement `worker/jobs/syncPolicies.ts` with full logic
|
|
- [ ] Update `lib/actions/policySettings.ts` → replace n8n webhook with BullMQ
|
|
- [ ] Remove `app/api/policy-settings/route.ts`
|
|
- [ ] Remove `app/api/admin/tenants/route.ts`
|
|
- [ ] Remove `POLICY_API_SECRET` from `.env` and `lib/env.mjs`
|
|
- [ ] Remove `N8N_SYNC_WEBHOOK_URL` from `.env` and `lib/env.mjs`
|
|
- [ ] Add `worker:start` script to `package.json`
|
|
- [ ] Test locally: Start Redis, Start Worker, Trigger Sync from UI
|
|
- [ ] Deploy Worker as background service (PM2/Systemd/Docker)
|
|
- [ ] Verify end-to-end: Job creation → Worker processing → Database updates
|