05 - Create

This section implements the Create method. We'll use the async pattern here - Create returns immediately with a RequestID, and the agent polls Status() until completion.

The Async Pattern

Many infrastructure operations take time. Instead of blocking, plugins can use an async pattern:

┌─────────┐         ┌────────┐         ┌──────────────┐
│  Agent  │         │ Plugin │         │ Infrastructure│
└────┬────┘         └───┬────┘         └──────┬───────┘
     │                  │                     │
     │  Create(req)     │                     │
     │─────────────────>│                     │
     │                  │  Start operation    │
     │                  │────────────────────>│
     │  InProgress +    │                     │
     │  RequestID       │                     │
     │<─────────────────│                     │
     │                  │                     │
     │  Status(reqID)   │                     │
     │─────────────────>│                     │
     │  InProgress      │                     │
     │<─────────────────│                     │
     │                  │                     │
     │  Status(reqID)   │                     │
     │─────────────────>│  Check status       │
     │                  │────────────────────>│
     │  Success +       │  Completed          │
     │  Properties      │<────────────────────│
     │<─────────────────│                     │

The diagram shows three steps:

  1. Create starts the operation and returns InProgress with a RequestID
  2. Agent polls Status with the RequestID until Success or Failure
  3. On Success, the resource exists and properties are returned

When to Use Async

The async pattern is optional for Create, Update, and Delete. If your infrastructure operation completes quickly, you can return Success directly without going through the polling cycle. The agent enforces a timeout on plugin calls, so we recommend using the async pattern for any operation that might take longer than 5 seconds.

For our SFTP plugin, file uploads could take time for large files, so we implement Create as async. Update and Delete are synchronous since they typically complete quickly.

ProgressResult

All mutating operations (Create, Update, Delete) return their results wrapped in a ProgressResult, and Status reports progress in the same structure. This common shape lets the agent handle async operations uniformly:

type ProgressResult struct {
    Operation          Operation          // Which operation (Create, Update, Delete, CheckStatus)
    OperationStatus    OperationStatus    // InProgress, Success, or Failure
    RequestID          string             // Identifier for polling (async operations)
    NativeID           string             // Infrastructure identifier for the resource
    ResourceProperties json.RawMessage    // Current resource state (on success)
    ErrorCode          OperationErrorCode // Categorized error (on failure)
    StatusMessage      string             // Human-readable status or error details
}

The OperationStatus tells the agent what to do next:

Status      Meaning                  Agent Action
----------  -----------------------  -------------------------------
InProgress  Operation still running  Poll Status() again
Success     Operation completed      Update inventory with new state
Failure     Operation failed         Check ErrorCode for details

When an operation fails, set the ErrorCode to help the agent understand the nature of the failure. See 10 - Error Handling for the complete list of error codes and when to use each one.

CreateRequest

The agent sends a CreateRequest when provisioning a new resource:

type CreateRequest struct {
    ResourceType string          // Fully-qualified type, e.g. "SFTP::Files::File"
    Label        string          // User-defined label for this resource
    Properties   json.RawMessage // Resource properties from the forma file
    TargetConfig json.RawMessage // Connection details (parsed in section 03)
}

Test

Create sftp_test.go with the Create test:

// © 2025 Platform Engineering Labs Inc.
//
// SPDX-License-Identifier: FSL-1.1-ALv2

//go:build integration

package main

import (
    "context"
    "encoding/json"
    "os"
    "testing"
    "time"

    "github.com/platform-engineering-labs/formae-plugin-sftp/pkg/asyncsftp"
    "github.com/platform-engineering-labs/formae/pkg/plugin/resource"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// testConfig returns the SFTP connection configuration for tests.
func testConfig() asyncsftp.Config {
    return asyncsftp.Config{
        Host:     "localhost",
        Port:     "2222",
        Username: os.Getenv("SFTP_USERNAME"),
        Password: os.Getenv("SFTP_PASSWORD"),
    }
}

// testTargetConfig returns the target configuration JSON for plugin requests.
func testTargetConfig() json.RawMessage {
    return json.RawMessage(`{"url": "sftp://localhost:2222"}`)
}

// TestCreate verifies that the Plugin.Create method:
// 1. Returns InProgress status with a RequestID (async pattern)
// 2. Eventually completes successfully when polled via Status
// 3. Actually creates the file on the SFTP server
func TestCreate(t *testing.T) {
    if os.Getenv("SFTP_USERNAME") == "" || os.Getenv("SFTP_PASSWORD") == "" {
        t.Skip("SFTP_USERNAME and SFTP_PASSWORD must be set")
    }

    ctx := context.Background()
    plugin := &Plugin{}

    // Test file properties
    filePath := "/upload/test-create.txt"
    fileContent := "Hello from Create test"
    filePermissions := "0644"

    // Build the CreateRequest
    properties := map[string]any{
        "path":        filePath,
        "content":     fileContent,
        "permissions": filePermissions,
    }
    propertiesJSON, err := json.Marshal(properties)
    require.NoError(t, err, "failed to marshal properties")

    req := &resource.CreateRequest{
        ResourceType: "SFTP::Files::File",
        Label:        "test-create",
        Properties:   propertiesJSON,
        TargetConfig: testTargetConfig(),
    }

    // --- Step 1: Call Create and verify InProgress + RequestID ---
    result, err := plugin.Create(ctx, req)
    require.NoError(t, err, "Create should not return error")
    require.NotNil(t, result.ProgressResult, "Create should return ProgressResult")

    assert.Equal(t, resource.OperationStatusInProgress, result.ProgressResult.OperationStatus,
        "Create should return InProgress status")
    assert.NotEmpty(t, result.ProgressResult.RequestID,
        "Create should return RequestID for async operation")

    requestID := result.ProgressResult.RequestID
    t.Logf("Create returned RequestID: %s", requestID)

    // --- Step 2: Poll Status until Success ---
    statusReq := &resource.StatusRequest{
        RequestID:    requestID,
        ResourceType: "SFTP::Files::File",
        TargetConfig: testTargetConfig(),
    }

    require.Eventually(t, func() bool {
        statusResult, err := plugin.Status(ctx, statusReq)
        if err != nil {
            t.Logf("Status error: %v", err)
            return false
        }
        if statusResult.ProgressResult == nil {
            return false
        }
        status := statusResult.ProgressResult.OperationStatus
        t.Logf("Status poll: %s", status)

        if status == resource.OperationStatusFailure {
            t.Errorf("operation failed: %s", statusResult.ProgressResult.StatusMessage)
            return true // Stop polling
        }
        return status == resource.OperationStatusSuccess
    }, 10*time.Second, 100*time.Millisecond, "Create operation should complete successfully")

    // --- Step 3: Verify file exists using asyncsftp ---
    client, err := asyncsftp.NewClient(testConfig())
    require.NoError(t, err, "failed to create asyncsftp client")
    defer client.Close()

    fileInfo, err := client.ReadFile(filePath)
    require.NoError(t, err, "file should exist after Create")

    assert.Equal(t, fileContent, fileInfo.Content, "content should match")
    assert.Equal(t, filePermissions, fileInfo.Permissions, "permissions should match")

    t.Logf("File created successfully: path=%s, size=%d", fileInfo.Path, fileInfo.Size)

    // --- Cleanup ---
    _ = client.StartDelete(filePath)
}

Test Structure

The test follows our testing pattern:

Step     Action                   Verify with
-------  -----------------------  ---------------------------------------
Setup    -                        -
Execute  Plugin.Create()          Assert InProgress + RequestID
Poll     Plugin.Status()          Assert eventually Success
Verify   asyncsftp.ReadFile()     Assert file exists with correct content
Cleanup  asyncsftp.StartDelete()  -

We use a separate asyncsftp client in the test to verify results. This keeps the test isolated from Plugin internals.

Implementation

First, add the FileProperties type and a helper that parses it from the request:

// FileProperties represents the properties of an SFTP file resource.
type FileProperties struct {
    Path        string `json:"path"`
    Content     string `json:"content"`
    Permissions string `json:"permissions"`
    Size        int64  `json:"size,omitempty"`
    ModifiedAt  string `json:"modifiedAt,omitempty"`
}

// parseFileProperties extracts file properties from a JSON request.
func parseFileProperties(data json.RawMessage) (*FileProperties, error) {
    var props FileProperties
    if err := json.Unmarshal(data, &props); err != nil {
        return nil, fmt.Errorf("invalid file properties: %w", err)
    }
    if props.Path == "" {
        return nil, fmt.Errorf("file properties missing 'path'")
    }
    if props.Permissions == "" {
        props.Permissions = "0644" // Default permissions
    }
    return &props, nil
}

Update the Plugin struct to hold a client with lazy initialization:

import (
    "encoding/json"
    "fmt"
    "sync"

    "github.com/platform-engineering-labs/formae-plugin-sftp/pkg/asyncsftp"
)

type Plugin struct {
    mu     sync.Mutex
    client *asyncsftp.Client
}

// getClient returns the SFTP client, creating it if necessary.
func (p *Plugin) getClient(targetConfig json.RawMessage) (*asyncsftp.Client, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.client != nil {
        return p.client, nil
    }

    // Parse target config
    cfg, err := parseTargetConfig(targetConfig)
    if err != nil {
        return nil, err
    }

    // Parse URL to get host and port
    host, port, err := parseURL(cfg.URL)
    if err != nil {
        return nil, err
    }

    // Get credentials from environment
    username, password, err := getCredentials()
    if err != nil {
        return nil, err
    }

    // Create client
    client, err := asyncsftp.NewClient(asyncsftp.Config{
        Host:     host,
        Port:     port,
        Username: username,
        Password: password,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create SFTP client: %w", err)
    }

    p.client = client
    return p.client, nil
}
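
The locking in getClient is what makes lazy initialization safe when several requests arrive at once. The sketch below isolates that pattern; `conn` and `lazyConn` are hypothetical stand-ins for the asyncsftp client and the Plugin struct, and the `dials` counter exists only to make the behavior observable.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for *asyncsftp.Client.
type conn struct{ addr string }

// lazyConn creates its connection on first use and reuses it afterwards.
type lazyConn struct {
	mu    sync.Mutex
	c     *conn
	dials int // number of times a connection was created; demo only
}

// get returns the cached connection, creating it if necessary.
// Holding the mutex for the whole call ensures only one caller dials.
func (l *lazyConn) get(addr string) *conn {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.c != nil {
		return l.c
	}
	l.dials++
	l.c = &conn{addr: addr}
	return l.c
}

func main() {
	var l lazyConn
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.get("localhost:2222")
		}()
	}
	wg.Wait()
	fmt.Println("dials:", l.dials)
}
```

One consequence of this design, which getClient shares, is that the first configuration wins: later calls with a different target reuse the cached connection.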

Now implement Create:

// Create provisions a new resource.
// Returns InProgress with a RequestID - poll Status() for completion.
func (p *Plugin) Create(ctx context.Context, req *resource.CreateRequest) (*resource.CreateResult, error) {
    // Parse file properties from request
    props, err := parseFileProperties(req.Properties)
    if err != nil {
        return &resource.CreateResult{
            ProgressResult: &resource.ProgressResult{
                Operation:       resource.OperationCreate,
                OperationStatus: resource.OperationStatusFailure,
                ErrorCode:       resource.OperationErrorCodeInvalidRequest,
                StatusMessage:   err.Error(),
            },
        }, nil
    }

    // Get SFTP client
    client, err := p.getClient(req.TargetConfig)
    if err != nil {
        return &resource.CreateResult{
            ProgressResult: &resource.ProgressResult{
                Operation:       resource.OperationCreate,
                OperationStatus: resource.OperationStatusFailure,
                ErrorCode:       resource.OperationErrorCodeInternalFailure,
                StatusMessage:   err.Error(),
            },
        }, nil
    }

    // Parse permissions string to os.FileMode
    var perm os.FileMode = 0644
    if props.Permissions != "" {
        _, _ = fmt.Sscanf(props.Permissions, "%o", &perm)
    }

    // Start async upload - returns immediately with operation ID
    requestID := client.StartUpload(props.Path, props.Content, perm)

    return &resource.CreateResult{
        ProgressResult: &resource.ProgressResult{
            Operation:       resource.OperationCreate,
            OperationStatus: resource.OperationStatusInProgress,
            RequestID:       requestID,
            NativeID:        props.Path, // File path is the native identifier
        },
    }, nil
}

Key Points

  1. Parse properties - Extract path, content, permissions from the request
  2. Get client - Lazy initialization reuses the connection
  3. Start async operation - StartUpload returns immediately with an operation ID
  4. Return InProgress - The agent will poll Status() with the RequestID
  5. Set NativeID - The file path uniquely identifies this resource
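
Note that Create ignores the error from fmt.Sscanf, so a permissions string that cannot be scanned as octal silently falls back to 0644. If you prefer to reject such input, a stricter parser could look like the sketch below. `parsePermissions` is a hypothetical helper that is not part of the tutorial code, and limiting values to the 0000-0777 range is an assumption of this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parsePermissions converts an octal string such as "0644" into an
// os.FileMode and reports malformed input to the caller.
func parsePermissions(s string) (os.FileMode, error) {
	if s == "" {
		return 0o644, nil // same default as parseFileProperties
	}
	n, err := strconv.ParseUint(s, 8, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid permissions %q: %w", s, err)
	}
	mode := os.FileMode(n)
	if mode&^os.ModePerm != 0 {
		return 0, fmt.Errorf("invalid permissions %q: only 0000-0777 is allowed", s)
	}
	return mode, nil
}

func main() {
	for _, s := range []string{"0644", "755", "rw-r--r--", "1777"} {
		mode, err := parsePermissions(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q parsed as %04o\n", s, uint32(mode))
	}
}
```

In Create, an error from such a helper would map naturally to a Failure result with OperationErrorCodeInvalidRequest, in the same way as a parseFileProperties error.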

Status Implementation

After Create returns InProgress, the agent polls the Status method to check on the operation's progress.

StatusRequest

type StatusRequest struct {
    RequestID    string          // The RequestID returned from Create/Update/Delete
    ResourceType string          // e.g., "SFTP::Files::File"
    TargetConfig json.RawMessage // Connection details for the target
}

The RequestID links this status check back to the original operation. Your plugin needs to track in-flight operations so it can report their progress.

Implementation

// Status checks the progress of an async operation.
func (p *Plugin) Status(ctx context.Context, req *resource.StatusRequest) (*resource.StatusResult, error) {
    // Client must exist if we have a RequestID from a previous operation.
    // Read it under the mutex, because getClient writes p.client under the same lock.
    p.mu.Lock()
    client := p.client
    p.mu.Unlock()

    if client == nil {
        return &resource.StatusResult{
            ProgressResult: &resource.ProgressResult{
                Operation:       resource.OperationCheckStatus,
                OperationStatus: resource.OperationStatusFailure,
                ErrorCode:       resource.OperationErrorCodeInternalFailure,
                StatusMessage:   "no client available",
            },
        }, nil
    }

    // Get operation status from asyncsftp
    op, err := client.GetStatus(req.RequestID)
    if err != nil {
        return &resource.StatusResult{
            ProgressResult: &resource.ProgressResult{
                Operation:       resource.OperationCheckStatus,
                OperationStatus: resource.OperationStatusFailure,
                ErrorCode:       resource.OperationErrorCodeInternalFailure,
                StatusMessage:   err.Error(),
            },
        }, nil
    }

    // Map asyncsftp state to resource.OperationStatus
    var status resource.OperationStatus
    var errorCode resource.OperationErrorCode
    var resourceProps json.RawMessage

    switch op.State {
    case asyncsftp.StateInProgress:
        status = resource.OperationStatusInProgress
    case asyncsftp.StateCompleted:
        status = resource.OperationStatusSuccess
        // Include resource properties on success
        if op.Result != nil {
            resourceProps, _ = json.Marshal(FileProperties{
                Path:        op.Result.Path,
                Content:     op.Result.Content,
                Permissions: op.Result.Permissions,
                Size:        op.Result.Size,
                ModifiedAt:  op.Result.ModifiedAt.Format("2006-01-02T15:04:05Z07:00"),
            })
        }
    case asyncsftp.StateFailure:
        status = resource.OperationStatusFailure
        errorCode = resource.OperationErrorCodeInternalFailure
    }

    return &resource.StatusResult{
        ProgressResult: &resource.ProgressResult{
            Operation:          resource.OperationCheckStatus,
            OperationStatus:    status,
            RequestID:          req.RequestID,
            NativeID:           op.Path,
            ResourceProperties: resourceProps,
            ErrorCode:          errorCode,
            StatusMessage:      op.Error,
        },
    }, nil
}

State Mapping

The asyncsftp library uses its own state enum, so we translate its values to the formae SDK's OperationStatus type. The mapping is straightforward because both represent the same three states:

asyncsftp State   resource.OperationStatus
----------------  -------------------------
StateInProgress   OperationStatusInProgress
StateCompleted    OperationStatusSuccess
StateFailure      OperationStatusFailure

When the operation completes successfully, we also populate ResourceProperties with the current state of the resource. On failure, we set an appropriate ErrorCode - see 10 - Error Handling for guidance on choosing the right error code.
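
The switch in Status has no default case, so a state outside these three would leave OperationStatus empty. One way to guard against that is to put the mapping in its own function that reports unknown values. The sketch below uses locally defined stand-in types (`sftpState`, `operationStatus`); the underlying types of the real asyncsftp and SDK constants are assumptions here.

```go
package main

import "fmt"

// sftpState is a hypothetical stand-in for the asyncsftp state enum.
type sftpState int

const (
	stateInProgress sftpState = iota
	stateCompleted
	stateFailure
)

// operationStatus is a hypothetical stand-in for resource.OperationStatus.
type operationStatus string

const (
	statusInProgress operationStatus = "InProgress"
	statusSuccess    operationStatus = "Success"
	statusFailure    operationStatus = "Failure"
)

// mapState translates a library state to an operation status.
// Unknown states are reported as Failure together with an error, so the
// caller never returns an empty status to the agent.
func mapState(s sftpState) (operationStatus, error) {
	switch s {
	case stateInProgress:
		return statusInProgress, nil
	case stateCompleted:
		return statusSuccess, nil
	case stateFailure:
		return statusFailure, nil
	default:
		return statusFailure, fmt.Errorf("unknown asyncsftp state %d", s)
	}
}

func main() {
	for _, s := range []sftpState{stateInProgress, stateCompleted, stateFailure, sftpState(99)} {
		status, err := mapState(s)
		fmt.Println(status, err)
	}
}
```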

Verify

To verify that Create works as expected, run the integration test:

SFTP_USERNAME=testuser SFTP_PASSWORD=testpass go test -tags=integration -run TestCreate -v

The test output shows the async flow in action - Create returns immediately with a RequestID, then Status polling eventually returns Success:

=== RUN   TestCreate
    sftp_test.go:96: Create returned RequestID: 30a519b3-0ecd-4a85-8b39-429199ca666a
    sftp_test.go:115: Status poll: InProgress
    sftp_test.go:115: Status poll: Success
    sftp_test.go:135: File created successfully: path=/upload/test-create.txt, size=22
--- PASS: TestCreate (0.52s)
PASS

Summary

You've implemented the async Create pattern:

  1. Create parses properties, starts the upload, returns InProgress + RequestID
  2. Status reports the operation's current state: InProgress, Success, or Failure
  3. The test verifies the full flow and checks the file exists independently

This async pattern is common in infrastructure plugins where operations may take time (creating VMs, databases, etc.).


Next: 06 - Read - Implement file reading with NotFound semantics