# Data Flow & Integration Patterns

> Version: 1.0.0 Last Updated: 2025-12-31 Status: Active

This document describes how data flows through the Encore Health OS Platform, from user requests to database queries to cross-module communication.

***

## Request Lifecycle

### 1. User Authentication Flow

```
User Login
  → Supabase Auth (/auth/v1/token)
  → JWT Token Generated
  → Token Stored in Client (httpOnly cookie)
  → All Requests Include Token in Authorization Header
```

**Key Points:**

* JWT carries the user ID (in the standard `sub` claim) plus `email` and `role` claims
* Token valid for 1 hour (configurable)
* Refresh token used for silent renewal
* `auth.uid()` function extracts user\_id in RLS policies
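
The one-hour lifetime and silent renewal imply that something must decide when a token is close enough to expiry to renew. The Supabase client library normally handles this itself; the sketch below only illustrates the decision. It assumes the standard `exp` claim (seconds since the Unix epoch) and a 60-second safety margin. `shouldRefresh` and the margin are illustrative names and values, not platform settings.

```typescript
// Hypothetical helper: decide whether an access token should be renewed.
// `expSeconds` is the JWT `exp` claim (seconds since the Unix epoch).
function shouldRefresh(
  expSeconds: number,
  nowMs: number,
  marginSeconds: number = 60 // assumed safety margin, not a platform setting
): boolean {
  const remainingSeconds = expSeconds - Math.floor(nowMs / 1000);
  return remainingSeconds <= marginSeconds;
}

// A token issued at t=0 with a 1-hour lifetime expires at t=3600s.
const tokenExp = 3600;
const tenMinutesIn = shouldRefresh(tokenExp, 10 * 60 * 1000);
const nearExpiry = shouldRefresh(tokenExp, (59 * 60 + 30) * 1000);
```

Here `tenMinutesIn` is `false` (50 minutes remain) and `nearExpiry` is `true` (30 seconds remain, inside the margin).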

### 2. Organization Context Setup

```
App Load
  → OrganizationProvider Mounts
  → Fetch User Organizations (RLS-filtered)
  → Load User Roles
  → Set Current Organization
  → Fetch Sites for Org
  → Context Available to All Components
```

**Context Flow:**

```typescript theme={null}
// OrganizationContext provides:
{
  currentOrganization: Organization | null,
  currentSite: Site | null,
  organizations: Organization[],
  sites: Site[],
  switchOrganization: (orgId: string) => void,
  switchSite: (siteId: string | null) => void
}
```
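
The setup steps (set the current organization, then fetch its sites) can be modeled as a pure state transition. This is a sketch under stated assumptions: the `Organization` and `Site` shapes are minimal stand-ins, and clearing `currentSite` on an organization switch is assumed behavior rather than something this document confirms.

```typescript
// Minimal stand-ins for the platform types (the real types have more fields).
interface Organization { id: string; name: string }
interface Site { id: string; organization_id: string; name: string }

interface OrgState {
  currentOrganization: Organization | null;
  currentSite: Site | null;
  organizations: Organization[];
  sites: Site[];
}

// Assumed behavior: switching organization clears the selected site, because
// a site from the previous organization is no longer valid.
function applyOrganizationSwitch(state: OrgState, orgId: string, allSites: Site[]): OrgState {
  const next = state.organizations.find((o) => o.id === orgId);
  if (!next) return state; // org not in the RLS-filtered list: no change
  return {
    ...state,
    currentOrganization: next,
    currentSite: null,
    sites: allSites.filter((s) => s.organization_id === orgId),
  };
}

const orgA: Organization = { id: 'org-a', name: 'Org A' };
const orgB: Organization = { id: 'org-b', name: 'Org B' };
const allSites: Site[] = [
  { id: 'site-1', organization_id: 'org-a', name: 'North' },
  { id: 'site-2', organization_id: 'org-b', name: 'South' },
];
const initialState: OrgState = {
  currentOrganization: orgA,
  currentSite: allSites[0],
  organizations: [orgA, orgB],
  sites: [allSites[0]],
};
const switchedState = applyOrganizationSwitch(initialState, 'org-b', allSites);
```

Keeping the transition pure makes it easy to test independently of the React provider that would call it.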

### 3. Data Query with RLS

```
User Action (e.g., "Load Forms")
  → React Query Hook (useFormList)
  → Supabase Client Request
  → Database RLS Policies Execute:
    - Extract user_id from JWT (auth.uid())
    - Check organization membership
    - Check role permissions
    - Filter rows by organization_id
  → Return Filtered Data
  → React Query Caches Result
  → UI Renders
```

**RLS Execution Example:**

```sql theme={null}
-- User queries: SELECT * FROM fw_forms
-- RLS rewrites to:
SELECT * FROM fw_forms
WHERE organization_id IN (
  SELECT organization_id 
  FROM pf_user_roles 
  WHERE user_id = auth.uid()
)
-- User only sees their org's forms
```

***

## Automation Flow (FW-03)

### Form Submission Triggers Automation

```
User Submits Form
  → INSERT into fw_form_submissions
  → Database Trigger: trigger_automation_on_submission()
    → Extract submission data
    → Find matching automation rules (RLS-filtered)
    → Publish to pg_notify('automation_trigger')
      → Event payload: {
          trigger_type: 'form_submitted',
          submission_id: uuid,
          form_id: uuid,
          organization_id: uuid,
          site_id: uuid,
          submitted_by: uuid,
          submission_data: jsonb
        }
  → Edge Function: automation-executor
    → Receives event via pg_notify listener
    → Fetch automation rule (RLS-filtered)
    → Evaluate conditions:
      - Resolve dynamic values {{submission.field_name}}
      - Compare using operators (equals, contains, greater_than, etc.)
      - Support complex AND/OR logic
    → If condition met:
      → Execute actions in order:
        1. send_email → Org email provider (Entra or Gmail API)
        2. send_notification → send_notification() RPC
        3. update_record → Supabase query (RLS-enforced)
        4. call_webhook → HTTPS POST (validated)
    → Log execution to fw_automation_logs
      → Status: success | failed
      → Execution time (ms)
      → Action results
      → Error details (if failed)
```

**Key Security Features:**

* Trigger scoped to organization\_id and site\_id
* RLS enforced on all database queries
* Email sender validation
* HTTPS-only webhooks
* Dynamic value resolution prevents code injection
* All executions audited

***

## Event Flow (Event Infrastructure)

### Domain Event Publishing

```
Database State Change
  → HR: Credential expires
    → UPDATE hr_employee_credentials SET status = 'expired'
    → Database Trigger: publish_credential_expired()
      → Call publish_domain_event('credential_expired', payload)
        → pg_notify('domain_events', event_json)
          → Event payload: {
              event_type: 'credential_expired',
              employee_id: uuid,
              credential_id: uuid,
              credential_type_id: uuid,
              expiration_date: date,
              organization_id: uuid,
              timestamp: timestamptz
            }
  → Edge Function: event-consumer
    → Receives event via pg_notify listener
    → Log to pf_audit_logs:
      - module: 'hr'
      - action: 'domain_event'
      - table_name: 'hr_employee_credentials'
      - new_values: event payload
    → Future Handlers (present in code but commented out):
      - Send notification to employee and manager
      - Create alert in compliance dashboard
      - Update credential status indicators
```

**Idempotency (R7):** Event-consumer and automation-executor handlers MUST use `event_id` or `execution_id` as idempotency keys: record each processed ID and skip it if it reappears on retry. See EVENT\_CONTRACTS § Idempotency and Retry and EDGE\_FUNCTIONS.md § automation-executor.
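
A minimal sketch of the idempotency rule, assuming each event carries an `event_id`. The in-memory `Set` stands in for whatever persistent store the real handlers use; the class and field names are hypothetical.

```typescript
interface IncomingEvent {
  event_id: string;
  event_type: string;
}

// In-memory stand-in for a persistent processed-IDs store (assumption:
// production code would persist IDs so that retries survive restarts).
class IdempotentHandler {
  private processed = new Set<string>();
  private handle: (event: IncomingEvent) => void;

  constructor(handle: (event: IncomingEvent) => void) {
    this.handle = handle;
  }

  // Returns true if the event was processed, false if skipped as a duplicate.
  process(event: IncomingEvent): boolean {
    if (this.processed.has(event.event_id)) return false;
    this.handle(event);
    // Record the ID only after the handler succeeds, so a thrown error
    // leaves the event eligible for retry.
    this.processed.add(event.event_id);
    return true;
  }
}

let handledCount = 0;
const idempotentHandler = new IdempotentHandler(() => {
  handledCount += 1;
});
const firstDelivery = idempotentHandler.process({ event_id: 'evt-1', event_type: 'credential_expired' });
const retryDelivery = idempotentHandler.process({ event_id: 'evt-1', event_type: 'credential_expired' });
```

The retry is skipped, so `handledCount` stays at 1 even though the event was delivered twice.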

**Event Types Published:**

1. `credential_expired` - Employee credential has expired
2. `credential_verified` - Credential verification completed
3. `onboarding_completed` - Employee onboarding finished
4. `offboarding_completed` - Employee offboarding finished
5. `form_submitted` - Form submission created (triggers automations)
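
A consumer can reject unknown or malformed payloads before dispatching them. The sketch below encodes the five event types listed above as a literal union and validates a raw `pg_notify` payload string. `parseDomainEvent` and `isDomainEventType` are illustrative names, not existing platform functions.

```typescript
// The five published event types, as a literal union.
const DOMAIN_EVENT_TYPES = [
  'credential_expired',
  'credential_verified',
  'onboarding_completed',
  'offboarding_completed',
  'form_submitted',
] as const;

type DomainEventType = (typeof DOMAIN_EVENT_TYPES)[number];

function isDomainEventType(value: unknown): value is DomainEventType {
  return (
    typeof value === 'string' &&
    (DOMAIN_EVENT_TYPES as readonly string[]).includes(value)
  );
}

type ParsedEvent = { event_type: DomainEventType; [key: string]: unknown };

// Parse a raw notification payload; returns null for malformed JSON,
// non-object payloads, or unknown event types.
function parseDomainEvent(raw: string): ParsedEvent | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const fields = parsed as Record<string, unknown>;
  const eventType = fields.event_type;
  if (!isDomainEventType(eventType)) return null;
  return { ...fields, event_type: eventType };
}

const validEvent = parseDomainEvent('{"event_type":"credential_expired","employee_id":"e-1"}');
const unknownEvent = parseDomainEvent('{"event_type":"something_else"}');
const malformedEvent = parseDomainEvent('not json');
```

Only `validEvent` is non-null; the other two inputs are rejected without throwing.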

**Event Flow Diagram:**

```
┌─────────────────┐       pg_notify        ┌──────────────────┐
│ Database Trigger│ ───────────────────────>│ Event Consumer   │
│ (HR, FW cores)  │   ('domain_events')    │ Edge Function    │
└─────────────────┘                         └──────────────────┘
                                                     │
                                                     ▼
                                            ┌──────────────────┐
                                            │ pf_audit_logs    │
                                            │ (Permanent Log)  │
                                            └──────────────────┘
                                                     │
                                                     ▼
                                            ┌──────────────────┐
                                            │ Future Handlers  │
                                            │ - Notifications  │
                                            │ - Automations    │
                                            │ - Webhooks       │
                                            └──────────────────┘
```

***

## Notification Flow (PF-10)

### Creating and Delivering Notifications

```
Application Event (e.g., automation executes)
  → Call send_notification() RPC
    → Function Parameters:
      - _user_id: uuid
      - _type: string (e.g., 'form_submitted', 'credential_expiring')
      - _channel: 'email' | 'in_app' | 'sms'
      - _title: string
      - _body: string
      - _action_url: string (optional)
    → Check User Preferences:
      - Query pf_notification_preferences
      - Respect quiet hours (8 PM - 8 AM)
      - Check if user opted out of this type
    → If Preferences Allow:
      → Insert to pf_notifications:
        - status = 'pending' (for email/SMS)
        - status = 'sent' (for in_app)
        - created_at = now()
      → If channel = 'email':
        → Status set to 'pending'
        → Cron Job (every 5 minutes):
          → Edge Function: send-pending-notifications
            → Fetch up to 100 pending email notifications
            → For each notification:
              - Lookup user email via pf_profiles
              - Generate HTML email from template
              - Send via org email provider (Entra or Gmail)
              - Update status to 'sent' or 'failed'
              - Set sent_at timestamp
              - Log failure_reason if error
              - Increment retry_count if transient error
    → Return notification_id (or NULL if blocked by preferences)
```
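
The preference check can be sketched as a pure function. The quiet-hours window (8 PM to 8 AM) wraps midnight, which is the part that is easy to get wrong. This sketch assumes quiet hours block delivery rather than defer it, and the `NotificationPreferences` shape is hypothetical; the real columns in `pf_notification_preferences` may differ.

```typescript
// Hypothetical preference shape for illustration.
interface NotificationPreferences {
  optedOutTypes: string[];
  quietStartHour: number; // 20 = 8 PM
  quietEndHour: number; // 8 = 8 AM
}

// True when `hour` (0-23, user's local time) falls inside the quiet window.
// When start > end the window wraps midnight (e.g. 20 -> 8).
function isQuietHour(hour: number, start: number, end: number): boolean {
  return start <= end
    ? hour >= start && hour < end
    : hour >= start || hour < end;
}

function shouldDeliver(
  type: string,
  localHour: number,
  prefs: NotificationPreferences
): boolean {
  if (prefs.optedOutTypes.includes(type)) return false;
  return !isQuietHour(localHour, prefs.quietStartHour, prefs.quietEndHour);
}

const samplePrefs: NotificationPreferences = {
  optedOutTypes: ['marketing'],
  quietStartHour: 20,
  quietEndHour: 8,
};
const lateEvening = shouldDeliver('form_submitted', 22, samplePrefs);
const midday = shouldDeliver('form_submitted', 12, samplePrefs);
const optedOut = shouldDeliver('marketing', 12, samplePrefs);
```

`lateEvening` and `optedOut` are `false`; `midday` is `true`.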

**Email Delivery Cron Job:**

```sql theme={null}
-- Runs every 5 minutes
SELECT cron.schedule(
  'send-pending-notifications',
  '*/5 * * * *',
  $$
  SELECT net.http_post(
    url := 'https://<project-ref>.supabase.co/functions/v1/send-pending-notifications',
    headers := jsonb_build_object(
      'Authorization', 'Bearer <service-role-key>',
      'Content-Type', 'application/json'
    )
  );
  $$
);
```

**In-App Notification Realtime:**

```
Notification Created (status = 'sent', channel = 'in_app')
  → Realtime Subscription Fires
    → Frontend: useNotifications() hook
      → React Query invalidates cache
      → Re-fetch notifications list
      → Badge count updates
      → NotificationCenter dropdown updates
  → User Clicks Notification
    → Navigate to action_url (if provided)
    → Mark as read mutation
      → UPDATE pf_notifications SET read_at = now()
      → Badge count decrements
```

***

## Cross-Module Communication

### Scenario: RH Core Needs Form Submission

**WITHOUT Platform Integration (❌ Violates Constitution):**

```tsx theme={null}
// WRONG: Direct dependency on FW core
import { FormRenderer } from '@/cores/fw/components/FormRenderer';
```

**WITH Platform Integration (✅ Correct):**

```tsx theme={null}
// RIGHT: Use platform integration layer
import { FormEmbed } from '@/platform/forms';

export function ResidentIntake() {
  return (
    <FormEmbed 
      formId="resident-intake-v2"
      onSuccess={(submission) => {
        // Create resident from submission data
        createResident(submission.data);
      }}
    />
  );
}
```

### Data Flow: Platform Forms Integration

```
RH Component
  → <FormEmbed formId="intake" />
  → Platform Forms Layer (PF-08)
    → useFormDefinition() hook
      → Supabase Query to fw_forms
      → RLS applies (org + permissions)
    → FormRenderer component
      → Renders fields from definition
    → useFormSubmission() hook
      → Validates with Zod schema
      → Calls edge function validate-form-submission
      → Inserts to fw_form_submissions
      → Triggers automation rules (FW-03)
  → onSuccess callback back to RH
  → RH creates resident record
```

**Integration Contract:**

```typescript theme={null}
// Platform Forms exposes minimal API
interface FormEmbedProps {
  formId: string;
  initialData?: Record<string, any>;
  onSuccess?: (submission: FormSubmission) => void;
  onError?: (error: Error) => void;
}

// RH uses without knowing FW internals
<FormEmbed formId="intake" onSuccess={handleAdmission} />
```

***

## Notification Flow (PF-10): Sending and Receiving

### Sending a Notification

```
Trigger Event (e.g., form submitted)
  → Core Code Calls send_notification()
    → Database Function Executes:
      - Check user preferences
      - Respect quiet hours
      - Substitute template variables
      - Insert to pf_notifications
      - Emit pg_notify('notification_delivery')
  → Edge Function Listens to pg_notify
    → Process email/SMS delivery
    → Update notification status
  → Frontend Realtime Listener
    → Badge count updates
    → Notification dropdown refreshes
```

**Code Example:**

```typescript theme={null}
// In FW core: Form submitted
await supabase.rpc('send_notification', {
  _user_id: submission.submitted_by,
  _type: 'form_submitted',
  _channel: 'in_app',
  _title: 'Form Submitted',
  _body: `Form "${form.name}" submitted successfully`,
  _action_url: `/fw/submissions/${submission.id}`
});
```

### Receiving a Notification

```
User Opens App
  → NotificationCenter Mounts
  → useNotifications() Hook
    → Subscribe to Realtime Changes
      → Requires one-time setup: ALTER PUBLICATION supabase_realtime ADD TABLE pf_notifications
    → Query Recent Notifications (RLS-filtered)
  → New Notification Arrives
    → Realtime Event Fires
    → React Query Invalidates Cache
    → UI Re-renders with New Notification
  → User Clicks Notification
    → Mark as Read Mutation
    → Badge Count Decrements
```

**Realtime Setup:**

```typescript theme={null}
// Realtime subscription
const channel = supabase
  .channel('notifications')
  .on('postgres_changes', {
    event: '*',
    schema: 'public',
    table: 'pf_notifications',
    filter: `user_id=eq.${userId}`
  }, () => {
    // Any change to this user's notifications invalidates the cached list
    queryClient.invalidateQueries({ queryKey: ['notifications'] });
  })
  .subscribe();
```

***

## Document Management Flow (PF-11)

### Upload Document

```
User Selects File
  → DocumentUpload Component
  → useDocumentUpload() Hook
    → Generate unique file path
      → {org_id}/{category}/{uuid}.{ext}
    → Upload to Supabase Storage
      → Storage RLS checks permissions
    → Insert Metadata to pf_documents
      → Database RLS applies
      → Audit log entry created
    → Create First Version (pf_document_versions)
  → Success Callback
    → Refresh Document List
```

**Upload Flow:**

```typescript theme={null}
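import { v4 as uuidv4 } from 'uuid';

// File upload. `supabase`, `orgId`, `userId`, and the metadata fields
// (title, description, category, tags) come from the enclosing hook scope.
const uploadDocument = async (file: File) => {
  // 1. Generate path: {org_id}/{category}/{uuid}.{ext}
  const ext = file.name.split('.').pop();
  const path = `${orgId}/${category}/${uuidv4()}.${ext}`;

  // 2. Upload to storage (storage policies check permissions)
  const { error: uploadError } = await supabase.storage
    .from('documents')
    .upload(path, file);
  if (uploadError) throw uploadError;

  // 3. Create metadata record
  const { data: doc, error: insertError } = await supabase
    .from('pf_documents')
    .insert({
      title, description, category, tags,
      storage_path: path,
      file_size_bytes: file.size,
      mime_type: file.type,
      author: userId,
      organization_id: orgId
    })
    .select()
    .single();
  if (insertError) throw insertError;

  // 4. Create first version
  const { error: versionError } = await supabase
    .from('pf_document_versions')
    .insert({
      document_id: doc.id,
      storage_path: path,
      version_major: 1,
      version_minor: 0,
      changed_by: userId
    });
  if (versionError) throw versionError;
};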
```
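
Path generation is the one step of the upload flow that can be isolated and tested without storage. A minimal sketch, assuming the `{org_id}/{category}/{uuid}.{ext}` layout above: `buildStoragePath` is a hypothetical helper, the UUID is passed in to keep the function pure, and lowercasing the extension is a design choice made for this example.

```typescript
// Build {org_id}/{category}/{uuid}.{ext}. The UUID is injected so the
// function stays deterministic; callers would supply a freshly generated one.
function buildStoragePath(
  orgId: string,
  category: string,
  fileName: string,
  uuid: string
): string {
  const dot = fileName.lastIndexOf('.');
  // No dot, a leading dot (".env"), or a trailing dot ("file.") means
  // there is no usable extension.
  const ext =
    dot > 0 && dot < fileName.length - 1
      ? fileName.slice(dot + 1).toLowerCase()
      : '';
  const base = `${orgId}/${category}/${uuid}`;
  return ext ? `${base}.${ext}` : base;
}

const pdfPath = buildStoragePath('org-1', 'policies', 'Handbook.PDF', 'abc');
const noExtPath = buildStoragePath('org-1', 'policies', 'README', 'abc');
```

`pdfPath` is `org-1/policies/abc.pdf`, and `noExtPath` is `org-1/policies/abc` with no trailing dot.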

### Download Document

```
User Clicks Download
  → useDocumentDownload() Hook
    → Call Supabase Storage createSignedUrl()
      → RLS checks document permissions
      → Generate 1-hour signed URL
    → Open URL in New Tab
      → Browser downloads file
    → Track Download Event
      → Insert to pf_audit_logs
```

**Download Flow:**

```typescript theme={null}
// Generate a signed URL valid for 1 hour
const { data, error } = await supabase.storage
  .from('documents')
  .createSignedUrl(document.storage_path, 3600);
if (error || !data) throw error ?? new Error('Could not create signed URL');

// Open in new tab
window.open(data.signedUrl, '_blank');

// Log download
await supabase.rpc('log_audit_event', {
  action: 'download',
  module: 'pf',
  table_name: 'pf_documents',
  record_id: document.id
});
```

***

## Reporting Flow (PF-12)

### Execute Report

```
User Runs Report
  → ReportViewer Component
  → useReportExecution() Hook
    → Check Cache (15min TTL)
    → If Cache Miss:
      → Call execute-report Edge Function
        → Fetch report definition (RLS-filtered)
        → Inject parameters into SQL template
        → Execute query with timeout (30s)
        → Apply RLS on results
        → Cache result
      → Return Data
    → If Cache Hit:
      → Return Cached Data
  → Display in Table
    → Paginate (100 rows/page)
    → Apply client-side filters/sort
```
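
The cache check in this flow can be sketched as a small TTL cache keyed by report ID and parameters. This is an illustration under assumptions: the document does not say where the cache lives or how keys are built, so `TtlCache` and `reportCacheKey` are hypothetical. The clock is passed in so expiry is testable.

```typescript
interface CacheEntry<T> {
  value: T;
  storedAt: number;
}

class TtlCache<T> {
  private entries = new Map<string, CacheEntry<T>>();
  private ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns the cached value, or undefined on a miss or an expired entry.
  get(key: string, nowMs: number): T | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (nowMs - entry.storedAt >= this.ttlMs) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T, nowMs: number): void {
    this.entries.set(key, { value, storedAt: nowMs });
  }
}

// Key on report ID plus sorted parameters, so different date ranges do not
// collide and parameter order does not matter.
function reportCacheKey(reportId: string, params: Record<string, string>): string {
  const sorted = Object.keys(params)
    .sort()
    .map((k) => [k, params[k]]);
  return `${reportId}:${JSON.stringify(sorted)}`;
}

const FIFTEEN_MINUTES = 15 * 60 * 1000;
const reportCache = new TtlCache<number[]>(FIFTEEN_MINUTES);
const cacheKey = reportCacheKey('census', { start_date: '2025-01-01', end_date: '2025-01-31' });
reportCache.set(cacheKey, [45, 30], 0);
const hitAt14Min = reportCache.get(cacheKey, 14 * 60 * 1000);
const missAt15Min = reportCache.get(cacheKey, FIFTEEN_MINUTES);
```

The lookup at 14 minutes returns the stored rows; the lookup at 15 minutes is a miss and would trigger a call to the edge function.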

**Report Execution:**

```typescript theme={null}
// Execute report
const { data } = await supabase.functions.invoke('execute-report', {
  body: {
    report_id: reportId,
    params: { start_date: '2025-01-01', end_date: '2025-01-31' }
  }
});

// Example of the result structure returned in `data`
const exampleResult = {
  columns: [
    { key: 'name', label: 'Resident Name' },
    { key: 'days', label: 'Days in Care' }
  ],
  rows: [
    { name: 'John Doe', days: 45 },
    { name: 'Jane Smith', days: 30 }
  ],
  total_rows: 2,
  execution_time_ms: 234
};
```

### Visual Query Builder

```
User Builds Query
  → TableSelector (choose table)
  → ColumnSelector (choose columns)
  → FilterBuilder (add WHERE clauses)
  → JoinBuilder (add JOINs)
  → SQLPreview (show generated SQL)
  → Execute (call edge function)
```
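
The step from builder selections to generated SQL can be sketched as follows. This is a minimal illustration, not the platform's generator: the `QuerySpec` shape is assumed, joins are omitted, identifiers are checked against a conservative pattern, and filter values are emitted as numbered placeholders instead of being concatenated into the SQL text.

```typescript
type FilterOp = '=' | '>' | '<' | 'LIKE';

interface Filter {
  column: string;
  op: FilterOp;
  value: string | number;
}

// Hypothetical shape of the builder's output (joins omitted for brevity).
interface QuerySpec {
  table: string;
  columns: string[];
  filters: Filter[];
}

const IDENTIFIER = /^[a-z_][a-z0-9_]*$/;
const ALLOWED_OPS: readonly string[] = ['=', '>', '<', 'LIKE'];

function assertIdentifier(name: string): string {
  if (!IDENTIFIER.test(name)) throw new Error(`Invalid identifier: ${name}`);
  return name;
}

function buildSelect(spec: QuerySpec): { sql: string; params: (string | number)[] } {
  const table = assertIdentifier(spec.table);
  const columns =
    spec.columns.length > 0
      ? spec.columns.map((c) => assertIdentifier(c)).join(', ')
      : '*';
  const params: (string | number)[] = [];
  const clauses = spec.filters.map((f) => {
    const column = assertIdentifier(f.column);
    if (!ALLOWED_OPS.includes(f.op)) throw new Error(`Invalid operator: ${f.op}`);
    params.push(f.value);
    return `${column} ${f.op} $${params.length}`; // value bound as $1, $2, ...
  });
  const where = clauses.length > 0 ? ` WHERE ${clauses.join(' AND ')}` : '';
  return { sql: `SELECT ${columns} FROM ${table}${where}`, params };
}

const builtQuery = buildSelect({
  table: 'fw_forms',
  columns: ['status', 'created_at'],
  filters: [
    { column: 'status', op: '=', value: 'published' },
    { column: 'created_at', op: '>', value: '2025-01-01' },
  ],
});
```

For this input, `builtQuery.sql` is `SELECT status, created_at FROM fw_forms WHERE status = $1 AND created_at > $2` and `builtQuery.params` is `['published', '2025-01-01']`.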

***

## Audit Logging Flow (PF-04)

### Automatic Logging (Trigger-Based)

```
User Updates Record
  → Database Trigger Fires
    → log_audit_event() function
      → Capture OLD and NEW values
      → Extract user_id from auth.uid()
      → Get organization_id from record
      → Insert to pf_audit_logs
  → Commit Transaction
  → Audit Entry Permanent (immutable)
```

**Trigger Example:**

```sql theme={null}
CREATE TRIGGER audit_fw_forms
AFTER INSERT OR UPDATE OR DELETE ON fw_forms
FOR EACH ROW
EXECUTE FUNCTION log_audit_event();

-- Function captures (shown for an UPDATE; NEW is NULL on DELETE and
-- OLD is NULL on INSERT):
-- {
--   user_id: auth.uid(),
--   organization_id: NEW.organization_id,
--   module: 'fw',
--   action: 'update',
--   table_name: 'fw_forms',
--   record_id: NEW.id,
--   old_values: to_jsonb(OLD),
--   new_values: to_jsonb(NEW)
-- }
```

### Manual Logging (Function-Based)

```
Application Code
  → Call send_notification()
    → Manually log to pf_audit_logs
      → Include custom event details
  → Insert via Edge Function (service role)
```

**Manual Audit Example:**

```typescript theme={null}
// In edge function (service role bypasses RLS)
await supabaseAdmin
  .from('pf_audit_logs')
  .insert({
    user_id: userId,
    organization_id: orgId,
    module: 'rh',
    action: 'resident_admitted',
    table_name: 'rh_residents',
    record_id: residentId,
    new_values: { phase: 'intake', bed_id: bedId }
  });
```

***

## Multi-Tenant Data Isolation

### How RLS Enforces Isolation

```sql theme={null}
-- Every table with org data has this pattern:
CREATE POLICY "org_isolation"
ON table_name
FOR SELECT
TO authenticated
USING (
  organization_id IN (
    SELECT organization_id 
    FROM pf_user_roles 
    WHERE user_id = auth.uid()
  )
);
```

### Validation Flow

```
User Query: SELECT * FROM fw_forms
  → Postgres Planner
  → Check RLS Policies for fw_forms
  → Rewrite Query:
    SELECT * FROM fw_forms
    WHERE organization_id IN (
      SELECT organization_id FROM pf_user_roles WHERE user_id = 'user-uuid'
    )
  → Execute Rewritten Query
  → Return Filtered Results
```

### Cross-Org Protection

```
Malicious Request: SELECT * FROM fw_forms WHERE organization_id = 'other-org-id'
  → RLS Policy Still Applies
  → Query Becomes:
    SELECT * FROM fw_forms 
    WHERE organization_id = 'other-org-id'
    AND organization_id IN ('current-user-orgs')
  → Result: Empty Set (intersection is empty)
  → No Data Leakage
```

**Security Test:**

```typescript theme={null}
// Org1 user tries to access Org2 data
const org1Client = createAuthenticatedClient(org1AdminToken);
const { data } = await org1Client
  .from('fw_forms')
  .select('*')
  .eq('organization_id', org2Id); // Malicious query

// Result: data = [] (RLS blocks it)
expect(data).toHaveLength(0);
```
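
The same intersection logic can be modeled in memory, which helps when reasoning about why the explicit filter cannot widen access. This is a simulation of the policy's effect, not how Postgres evaluates it; the row shapes are reduced to the columns the policy uses.

```typescript
interface FormRow {
  id: string;
  organization_id: string;
}

interface UserRoleRow {
  user_id: string;
  organization_id: string;
}

// Model of the org_isolation policy: a row is visible only if the user
// holds a role in the row's organization.
function visibleRows(rows: FormRow[], roles: UserRoleRow[], userId: string): FormRow[] {
  const userOrgs = new Set(
    roles.filter((r) => r.user_id === userId).map((r) => r.organization_id)
  );
  return rows.filter((row) => userOrgs.has(row.organization_id));
}

// The caller's own WHERE clause is ANDed with the policy, never ORed.
function queryByOrg(
  rows: FormRow[],
  roles: UserRoleRow[],
  userId: string,
  requestedOrgId: string
): FormRow[] {
  return visibleRows(rows, roles, userId).filter(
    (row) => row.organization_id === requestedOrgId
  );
}

const formRows: FormRow[] = [
  { id: 'form-1', organization_id: 'org-1' },
  { id: 'form-2', organization_id: 'org-2' },
];
const roleRows: UserRoleRow[] = [{ user_id: 'user-1', organization_id: 'org-1' }];

const ownOrgResult = queryByOrg(formRows, roleRows, 'user-1', 'org-1');
const otherOrgResult = queryByOrg(formRows, roleRows, 'user-1', 'org-2');
```

`ownOrgResult` contains `form-1`; `otherOrgResult` is empty because `org-2` is not in the user's set of organizations.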

***

## Performance Optimization

### Caching Strategy

```
Client Request
  → React Query Cache Check (staleTime: 5min)
  → If Fresh: Return from Cache
  → If Stale:
    → Background Refetch
    → Return Stale Data Immediately
    → Update UI When Fresh Data Arrives
```

**Cache Configuration:**

```typescript theme={null}
const { data } = useQuery({
  queryKey: ['forms', orgId],
  queryFn: () => fetchForms(orgId),
  staleTime: 5 * 60 * 1000, // 5 minutes
  cacheTime: 30 * 60 * 1000, // 30 minutes (this option is named gcTime in TanStack Query v5)
  refetchOnWindowFocus: true
});
```
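
The fresh/stale branching in the caching strategy reduces to a comparison against `staleTime`. The sketch below is a simplified model of that decision, not React Query's internals; `decideCacheAction` and its return labels are illustrative.

```typescript
type CacheAction = 'fetch' | 'serve-fresh' | 'serve-stale-and-refetch';

// `dataUpdatedAt` is the time the cached data was last fetched, or null when
// nothing is cached yet. All times are in milliseconds.
function decideCacheAction(
  dataUpdatedAt: number | null,
  nowMs: number,
  staleTimeMs: number
): CacheAction {
  if (dataUpdatedAt === null) return 'fetch';
  return nowMs - dataUpdatedAt < staleTimeMs
    ? 'serve-fresh'
    : 'serve-stale-and-refetch';
}

const FIVE_MINUTES = 5 * 60 * 1000;
const emptyCacheAction = decideCacheAction(null, 0, FIVE_MINUTES);
const freshAction = decideCacheAction(0, 4 * 60 * 1000, FIVE_MINUTES);
const staleAction = decideCacheAction(0, FIVE_MINUTES, FIVE_MINUTES);
```

Data fetched four minutes ago is served as-is; at five minutes it is still served immediately, but a background refetch is warranted.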

### Database Query Optimization

```
Slow Query Detected (> 1s)
  → Check Indexes:
    - All foreign keys indexed?
    - Full-text search using GIN index?
    - organization_id indexed?
  → Check RLS Policies:
    - Using indexed columns in subqueries?
    - Avoiding per-row function calls in the USING clause?
  → Check Data Volume:
    - Need partitioning by organization?
    - Need archival of old data?
```

**Index Strategy:**

```sql theme={null}
-- Foreign keys (required)
CREATE INDEX idx_forms_org ON fw_forms(organization_id);
CREATE INDEX idx_forms_site ON fw_forms(site_id);

-- Common queries
CREATE INDEX idx_forms_status ON fw_forms(status);
CREATE INDEX idx_forms_created ON fw_forms(created_at DESC);

-- Full-text search
CREATE INDEX idx_documents_search ON pf_documents USING GIN(search_vector);

-- Composite indexes for multi-column queries
CREATE INDEX idx_forms_org_status ON fw_forms(organization_id, status);
```

***

## Integration Patterns Summary

### Pattern 1: Platform Integration Layer

**Use Case:** Cross-cutting capabilities (forms, notifications, documents)

**Example:** RH core uses forms without depending on FW

```tsx theme={null}
import { FormEmbed } from '@/platform/forms'; // ✅ Correct

<FormEmbed formId="intake" onSuccess={handleAdmission} />
```

### Pattern 2: Event-Based Integration

**Use Case:** Asynchronous workflows, loose coupling

**Example:** Automation engine reacts to form submissions

```sql theme={null}
-- Trigger emits event
PERFORM pg_notify('automation_trigger', json_build_object(
  'trigger_type', 'form_submitted',
  'submission_id', NEW.id
)::text);

-- Edge function listens and processes
```

### Pattern 3: API Contracts

**Use Case:** Synchronous request-response

**Example:** RH requests census data from FA

```typescript theme={null}
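// Define contract
interface CensusRequest {
  org_id: string;
  site_id: string;
  date: string;
}

// Response shape is owned by the providing core; left opaque here
type CensusData = Record<string, unknown>;

// Implement as edge function (signature only; body omitted)
export declare function getCensus(req: CensusRequest): Promise<CensusData>;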
```
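
Because a TypeScript interface is erased at runtime, a contract like this is usually paired with a runtime check at the function boundary. A minimal sketch, assuming `date` is an ISO calendar date (`YYYY-MM-DD`); that format and the `parseCensusRequest` name are assumptions, not part of the documented contract.

```typescript
interface CensusRequest {
  org_id: string;
  site_id: string;
  date: string;
}

// Validate untrusted input against the contract; returns null when a field
// is missing, empty, or of the wrong type.
function parseCensusRequest(input: unknown): CensusRequest | null {
  if (typeof input !== 'object' || input === null) return null;
  const { org_id, site_id, date } = input as Record<string, unknown>;
  if (typeof org_id !== 'string' || org_id === '') return null;
  if (typeof site_id !== 'string' || site_id === '') return null;
  // Assumed format: YYYY-MM-DD
  if (typeof date !== 'string' || !/^\d{4}-\d{2}-\d{2}$/.test(date)) return null;
  return { org_id, site_id, date };
}

const validRequest = parseCensusRequest({ org_id: 'org-1', site_id: 'site-1', date: '2025-01-31' });
const missingSite = parseCensusRequest({ org_id: 'org-1', date: '2025-01-31' });
const badDate = parseCensusRequest({ org_id: 'org-1', site_id: 'site-1', date: '01/31/2025' });
```

Only `validRequest` is non-null; the other two inputs fail validation before any query would run.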

***

See `/constitution.md` Section 5.6 for performance targets and Section 1.3 for integration pattern details.

***

**Last Updated:** 2025-11-24\
**Maintained by:** Platform Foundation Team
