tenantpilot/worker/jobs/graphFetch.ts

78 lines
2.2 KiB
TypeScript

import getGraphAccessToken from './graphAuth';
import { withRetry, isTransientError } from '../utils/retry';
type GraphRecord = Record<string, unknown>;
/**
* Fetch a Graph endpoint with pagination support for @odata.nextLink
* Returns an array of items aggregated across pages.
*/
export async function fetchWithPagination(
endpoint: string,
token: string,
baseUrl = 'https://graph.microsoft.com/v1.0'
): Promise<GraphRecord[]> {
const results: GraphRecord[] = [];
// Normalize URL
let url = endpoint.startsWith('http') ? endpoint : `${baseUrl}${endpoint.startsWith('/') ? '' : '/'}${endpoint}`;
while (url) {
const res = await withRetry(
async () => {
const response = await fetch(url, {
headers: {
Authorization: `Bearer ${token}`,
Accept: 'application/json',
},
});
// Handle rate limiting (429)
if (response.status === 429) {
const retryAfter = response.headers.get('Retry-After');
const delay = retryAfter ? parseInt(retryAfter, 10) * 1000 : 60000;
await new Promise((resolve) => setTimeout(resolve, delay));
throw new Error(`429 Rate limit exceeded, retrying after ${delay}ms`);
}
if (!response.ok) {
const txt = await response.text();
const error = new Error(`Graph fetch failed: ${response.status} ${response.statusText} - ${txt}`);
throw error;
}
return response;
},
{
maxAttempts: 3,
initialDelayMs: 1000,
shouldRetry: (error) => isTransientError(error),
}
);
const json = await res.json();
if (Array.isArray(json.value)) {
results.push(...json.value);
} else if (json.value !== undefined) {
// Some endpoints may return a single value
results.push(json.value as GraphRecord);
}
const next = json['@odata.nextLink'];
if (next) url = next;
else break;
}
return results;
}
/**
* Convenience function: obtains a Graph token and fetches pages for the given endpoint.
*/
export async function fetchFromGraph(endpoint: string) {
const token = await getGraphAccessToken();
return fetchWithPagination(endpoint, token);
}
export default fetchFromGraph;