2
0
Files
gitcaddy-server/modules/plugins/external.go
logikonline 174d18db22 refactor(plugins): migrate external plugin manager to grpc/connect
Replace manual HTTP/JSON RPC implementation with generated gRPC/Connect client, providing type-safe plugin communication.

Code Generation:
- Generate plugin.pb.go and pluginv1connect/plugin.connect.go from plugin.proto
- Add generate-plugin-proto Makefile target
- Delete hand-written types.go (replaced by generated code)

ExternalPluginManager Refactoring:
- Replace httpClient with pluginv1connect.PluginServiceClient
- Use h2c (cleartext HTTP/2) transport for gRPC without TLS
- Replace all manual callRPC/callRPCWithContext calls with typed Connect methods
- Remove JSON serialization/deserialization code
- Simplify error handling with native gRPC status codes

Benefits:
- Type safety: compile-time verification of request/response types
- Protocol compatibility: standard gRPC wire format
- Reduced code: ~100 lines of manual RPC code removed
- Better errors: structured gRPC status codes instead of string parsing
- Matches existing Actions runner pattern (Connect RPC over HTTP/2)

This completes the plugin framework migration to production-grade RPC transport.
2026-02-13 01:45:29 -05:00

339 lines
8.4 KiB
Go

// Copyright 2026 MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT
package plugins
import (
"context"
"crypto/tls"
"fmt"
"maps"
"net"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"
"connectrpc.com/connect"
"golang.org/x/net/http2"
"code.gitcaddy.com/server/v3/modules/graceful"
"code.gitcaddy.com/server/v3/modules/log"
pluginv1 "code.gitcaddy.com/server/v3/modules/plugins/pluginv1"
"code.gitcaddy.com/server/v3/modules/plugins/pluginv1/pluginv1connect"
)
// PluginStatus is the lifecycle state the manager records for an external
// plugin.
type PluginStatus string

// Lifecycle states assigned to a ManagedPlugin by the manager.
const (
	// PluginStatusStarting is the state a plugin is registered with, before
	// it has been initialized.
	PluginStatusStarting = PluginStatus("starting")
	// PluginStatusOnline is set once the plugin has been initialized
	// successfully.
	PluginStatusOnline = PluginStatus("online")
	// PluginStatusOffline is set when the plugin has been shut down.
	PluginStatusOffline = PluginStatus("offline")
	// PluginStatusError is set when starting or initializing the plugin
	// failed.
	PluginStatusError = PluginStatus("error")
)
// ManagedPlugin tracks the state of a single external plugin: its
// configuration, the process handle (when the manager launched the binary
// itself), its lifecycle status, the manifest it reported, and the Connect
// client used to talk to it.
type ManagedPlugin struct {
	// config is the configuration entry this plugin was registered from.
	config *ExternalPluginConfig
	// process is the handle of the launched plugin binary. In this file it is
	// only assigned by startManagedPlugin, so it is nil for plugins the
	// manager did not start.
	process *os.Process
	// status is the current lifecycle state; read through Status().
	status PluginStatus
	// lastSeen is stamped when the plugin is marked online.
	lastSeen time.Time
	// manifest is the manifest returned by the plugin's Initialize RPC; nil
	// until initialization succeeds. Read through Manifest().
	manifest *pluginv1.PluginManifest
	// failCount is not read or written in the visible part of this file.
	// NOTE(review): presumably a consecutive-failure counter for health
	// checks — confirm against the code that uses it.
	failCount int
	// client is the generated Connect client bound to the plugin's address.
	client pluginv1connect.PluginServiceClient
	// mu is taken by the Status and Manifest accessors and by the manager
	// when it inspects status/manifest or stores the manifest.
	mu sync.RWMutex
}
// ExternalPluginManager manages external plugins (both managed and external mode).
// It owns the registry of plugins keyed by name and a cancellable context
// that bounds the RPCs it issues.
type ExternalPluginManager struct {
	// mu guards the plugins map.
	mu sync.RWMutex
	// plugins maps a plugin name to its tracked state.
	plugins map[string]*ManagedPlugin
	// config is the plugin configuration the manager was created with; its
	// ExternalPlugins entries are iterated by StartAll.
	config *Config
	// ctx is the parent context for RPCs issued by the manager; it is
	// cancelled by StopAll via cancel.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
}
// globalExternalManager holds the most recently constructed manager. It is
// assigned by NewExternalPluginManager and is nil until that has been called.
var globalExternalManager *ExternalPluginManager

// GetExternalManager returns the process-wide external plugin manager, i.e.
// the instance most recently created by NewExternalPluginManager. The result
// is nil when no manager has been created yet.
func GetExternalManager() *ExternalPluginManager {
	return globalExternalManager
}
// NewExternalPluginManager builds a manager for the given configuration with
// an empty plugin registry and a fresh cancellable context, and installs it
// as the global manager returned by GetExternalManager.
func NewExternalPluginManager(config *Config) *ExternalPluginManager {
	m := &ExternalPluginManager{
		plugins: make(map[string]*ManagedPlugin),
		config:  config,
	}
	m.ctx, m.cancel = context.WithCancel(context.Background())

	// Each call replaces the previously published global instance.
	globalExternalManager = m
	return m
}
// StartAll walks every configured external plugin, skipping disabled entries
// and entries without an address. For each remaining entry it registers a
// ManagedPlugin with a gRPC-over-h2c Connect client, launches the binary when
// the entry is in managed mode, and then calls the plugin's Initialize RPC.
//
// Failures are logged and recorded as PluginStatusError on the affected
// plugin; they never abort the loop. The returned error is always nil.
//
// The manager lock is held for the entire call, including process start-up
// and the Initialize RPC of every plugin.
func (m *ExternalPluginManager) StartAll() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	for name, cfg := range m.config.ExternalPlugins {
		if !cfg.Enabled {
			log.Info("External plugin %s is disabled, skipping", name)
			continue
		}
		if cfg.Address == "" {
			log.Error("External plugin %s has no address configured", name)
			continue
		}

		// A bare host:port is treated as a cleartext HTTP endpoint.
		baseURL := cfg.Address
		hasScheme := strings.HasPrefix(baseURL, "http://") || strings.HasPrefix(baseURL, "https://")
		if !hasScheme {
			baseURL = "http://" + baseURL
		}

		plugin := &ManagedPlugin{
			config: cfg,
			status: PluginStatusStarting,
			client: pluginv1connect.NewPluginServiceClient(
				newH2CClient(cfg.HealthTimeout),
				baseURL,
				connect.WithGRPC(),
			),
		}
		// Register before starting so that failed plugins stay visible with
		// an error status.
		m.plugins[name] = plugin

		if cfg.IsManaged() {
			if startErr := m.startManagedPlugin(plugin); startErr != nil {
				log.Error("Failed to start managed plugin %s: %v", name, startErr)
				plugin.status = PluginStatusError
				continue
			}
		}

		// The plugin only counts as online once Initialize has succeeded.
		if initErr := m.initializePlugin(plugin); initErr != nil {
			log.Error("Failed to initialize external plugin %s: %v", name, initErr)
			plugin.status = PluginStatusError
			continue
		}

		plugin.status = PluginStatusOnline
		plugin.lastSeen = time.Now()
		log.Info("External plugin %s is online (managed=%v)", name, cfg.IsManaged())
	}
	return nil
}
// StopAll shuts down every registered external plugin.
//
// It first cancels the manager context, which aborts event dispatches and
// proxied HTTP calls that are still running on that context (and lets
// HandleHTTP release its read lock). Then, under the manager lock, it asks
// each plugin to shut down via shutdownPlugin, sends an interrupt to the
// process if the manager launched one (falling back to Kill when the signal
// cannot be delivered), and marks the plugin offline.
func (m *ExternalPluginManager) StopAll() {
	m.cancel()

	m.mu.Lock()
	defer m.mu.Unlock()

	for name, mp := range m.plugins {
		log.Info("Shutting down external plugin: %s", name)

		// Send shutdown request via Connect RPC
		m.shutdownPlugin(mp)

		// Kill managed process
		if mp.process != nil {
			if err := mp.process.Signal(os.Interrupt); err != nil {
				log.Warn("Failed to send interrupt to plugin %s, killing: %v", name, err)
				// Best effort: the process may already have exited.
				_ = mp.process.Kill()
			}
		}

		// Status() reads this field under mp.mu from callers that obtained
		// the plugin earlier through GetPlugin/AllPlugins, so the write must
		// hold the same lock to avoid a data race.
		mp.mu.Lock()
		mp.status = PluginStatusOffline
		mp.mu.Unlock()
	}
}
// GetPlugin looks up a registered external plugin by name. It returns nil
// when no plugin is registered under that name.
func (m *ExternalPluginManager) GetPlugin(name string) *ManagedPlugin {
	m.mu.RLock()
	found := m.plugins[name]
	m.mu.RUnlock()
	return found
}
// AllPlugins returns a snapshot of the plugin registry. The returned map is
// a fresh copy that the caller may modify freely; the *ManagedPlugin values
// are shared with the manager.
func (m *ExternalPluginManager) AllPlugins() map[string]*ManagedPlugin {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := make(map[string]*ManagedPlugin, len(m.plugins))
	for name, mp := range m.plugins {
		snapshot[name] = mp
	}
	return snapshot
}
// OnEvent fans an event out to every online plugin whose manifest subscribes
// to the event's type, or to "*". Delivery is fire-and-forget: each plugin is
// called in its own goroutine with a 30-second timeout derived from the
// manager context, and failures are only logged.
func (m *ExternalPluginManager) OnEvent(event *pluginv1.PluginEvent) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// wants reports whether p is online, has a manifest, and subscribes to
	// this event. It holds p's read lock only while inspecting its state.
	wants := func(p *ManagedPlugin) bool {
		p.mu.RLock()
		defer p.mu.RUnlock()

		if p.status != PluginStatusOnline || p.manifest == nil {
			return false
		}
		for _, sub := range p.manifest.SubscribedEvents {
			if sub == event.EventType || sub == "*" {
				return true
			}
		}
		return false
	}

	// deliver sends the event to a single plugin and logs any failure.
	deliver := func(pluginName string, p *ManagedPlugin) {
		ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
		defer cancel()

		resp, err := p.client.OnEvent(ctx, connect.NewRequest(event))
		if err != nil {
			log.Error("Failed to dispatch event %s to plugin %s: %v", event.EventType, pluginName, err)
			return
		}
		if resp.Msg.Error != "" {
			log.Error("Plugin %s returned error for event %s: %s", pluginName, event.EventType, resp.Msg.Error)
		}
	}

	for name, mp := range m.plugins {
		if !wants(mp) {
			continue
		}
		// Dispatch in the background so one slow plugin cannot hold up the
		// caller or the other plugins.
		go deliver(name, mp)
	}
}
// HandleHTTP forwards an HTTP request to the first online plugin (in map
// iteration order) whose manifest declares a route with the same method and
// a path that is a prefix of the requested path. The call is made through
// the plugin's HandleHTTP RPC with a 30-second timeout derived from the
// manager context.
//
// It returns the plugin's response, an error wrapping the RPC failure, or an
// error stating that no plugin handles the request. The manager's read lock
// is held for the duration of the call, including the RPC.
func (m *ExternalPluginManager) HandleHTTP(method, path string, headers map[string]string, body []byte) (*pluginv1.HTTPResponse, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// claims reports whether p is online and declares a route covering this
	// request. p's read lock is released before the RPC is issued.
	claims := func(p *ManagedPlugin) bool {
		p.mu.RLock()
		defer p.mu.RUnlock()

		if p.status != PluginStatusOnline || p.manifest == nil {
			return false
		}
		for _, route := range p.manifest.Routes {
			if route.Method == method && strings.HasPrefix(path, route.Path) {
				return true
			}
		}
		return false
	}

	for name, mp := range m.plugins {
		if !claims(mp) {
			continue
		}

		ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
		defer cancel()

		req := connect.NewRequest(&pluginv1.HTTPRequest{
			Method:  method,
			Path:    path,
			Headers: headers,
			Body:    body,
		})
		resp, err := mp.client.HandleHTTP(ctx, req)
		if err != nil {
			return nil, fmt.Errorf("plugin %s HandleHTTP failed: %w", name, err)
		}
		return resp.Msg, nil
	}

	return nil, fmt.Errorf("no plugin handles %s %s", method, path)
}
// Status reports the plugin's current lifecycle state, read under the
// plugin's read lock.
func (mp *ManagedPlugin) Status() PluginStatus {
	mp.mu.RLock()
	current := mp.status
	mp.mu.RUnlock()
	return current
}
// Manifest returns the manifest the plugin reported during initialization,
// read under the plugin's read lock. It is nil until initialization has
// succeeded.
func (mp *ManagedPlugin) Manifest() *pluginv1.PluginManifest {
	mp.mu.RLock()
	manifest := mp.manifest
	mp.mu.RUnlock()
	return manifest
}
// --- Internal methods ---
// startManagedPlugin launches the plugin binary named in mp.config, with
// mp.config.Args split on whitespace as its arguments and the server's
// stdout/stderr as its output streams. On success the process handle is
// stored in mp.process, an interrupt is registered with the graceful manager
// for server shutdown, and the call pauses for two seconds to give the
// process time to come up.
//
// It returns an error only when the binary could not be started.
func (m *ExternalPluginManager) startManagedPlugin(mp *ManagedPlugin) error {
	binary := mp.config.Binary
	cmd := exec.Command(binary, strings.Fields(mp.config.Args)...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start binary %s: %w", binary, err)
	}
	mp.process = cmd.Process

	// A started command must be waited on, otherwise an exited child is never
	// reaped and its resources are not released. Wait in the background so a
	// plugin that exits (or crashes) is cleaned up and the exit is visible in
	// the log. Signalling the handle after this returns is safe; it simply
	// reports that the process has already finished.
	go func() {
		if err := cmd.Wait(); err != nil {
			log.Warn("Managed plugin binary %s exited: %v", binary, err)
		}
	}()

	// Register with graceful manager for proper shutdown
	graceful.GetManager().RunAtShutdown(m.ctx, func() {
		if mp.process != nil {
			// Best effort: the process may already be gone.
			_ = mp.process.Signal(os.Interrupt)
		}
	})

	// Wait a bit for the process to start
	time.Sleep(2 * time.Second)
	return nil
}
// initializePlugin calls the plugin's Initialize RPC on the manager context,
// announcing server version "3.0.0" and an empty config map. When the plugin
// reports success, the manifest from the response is stored on mp under its
// write lock.
//
// It returns an error when the RPC itself fails or when the plugin answers
// with Success == false.
func (m *ExternalPluginManager) initializePlugin(mp *ManagedPlugin) error {
	req := connect.NewRequest(&pluginv1.InitializeRequest{
		ServerVersion: "3.0.0",
		Config:        map[string]string{},
	})

	resp, err := mp.client.Initialize(m.ctx, req)
	if err != nil {
		return fmt.Errorf("plugin Initialize RPC failed: %w", err)
	}

	reply := resp.Msg
	if !reply.Success {
		return fmt.Errorf("plugin initialization failed: %s", reply.Error)
	}

	mp.mu.Lock()
	defer mp.mu.Unlock()
	mp.manifest = reply.Manifest
	return nil
}
// shutdownPlugin sends the Shutdown RPC with reason "server shutdown" to the
// plugin. It is best effort: a failure is logged as a warning and otherwise
// ignored.
//
// The call deliberately does not run on m.ctx. StopAll cancels m.ctx before
// invoking this method, so an RPC issued on that context would fail
// immediately and the plugin would never receive the request. A fresh context
// with its own timeout is used instead, which also keeps an unresponsive
// plugin from stalling shutdown indefinitely.
func (m *ExternalPluginManager) shutdownPlugin(mp *ManagedPlugin) {
	const shutdownRPCTimeout = 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), shutdownRPCTimeout)
	defer cancel()

	_, err := mp.client.Shutdown(ctx, connect.NewRequest(&pluginv1.ShutdownRequest{
		Reason: "server shutdown",
	}))
	if err != nil {
		log.Warn("Plugin shutdown call failed: %v", err)
	}
}
// newH2CClient returns an HTTP client that speaks cleartext HTTP/2 (h2c), for
// talking to gRPC services without TLS. timeout becomes the client's overall
// per-request timeout.
//
// The transport's TLS dial hook is replaced by a plain TCP dial that ignores
// the supplied TLS configuration, so every connection made through this
// client is unencrypted.
func newH2CClient(timeout time.Duration) *http.Client {
	// dialPlain stands in for the TLS dialer and opens an ordinary TCP
	// connection instead.
	dialPlain := func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
		dialer := net.Dialer{}
		return dialer.DialContext(ctx, network, addr)
	}

	transport := &http2.Transport{
		AllowHTTP:      true,
		DialTLSContext: dialPlain,
	}

	return &http.Client{
		Timeout:   timeout,
		Transport: transport,
	}
}