package example
import (
"context"
"encoding/json"
"errors"
"math/big"
"os"
"path/filepath"
"time"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/currency"
"github.com/formancehq/payments/cmd/connectors/internal/ingestion"
"github.com/formancehq/payments/cmd/connectors/internal/storage"
"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/cmd/connectors/internal/task"
"github.com/formancehq/stack/libs/go-libs/logging"
)
// Name is the provider identifier of the example connector.
const Name = models.ConnectorProviderExample
// Loader is the stateless entry point that describes and builds the example
// connector (see its Name, AllowTasks, ApplyDefaults, Router and Load methods).
type Loader struct{}
// Name reports the provider identifier of the connector built by this loader,
// which is always models.ConnectorProviderExample.
func (l *Loader) Name() models.ConnectorProvider {
	var provider models.ConnectorProvider = models.ConnectorProviderExample
	return provider
}
// AllowTasks reports how many tasks may be scheduled for this connector.
func (l *Loader) AllowTasks() int {
	const allowedTasks = 10
	return allowedTasks
}
// ApplyDefaults returns cfg with its defaults filled in: an empty connector
// name becomes "example"; every other field is returned untouched.
func (l *Loader) ApplyDefaults(cfg Config) Config {
	// A name that is already set wins over the default.
	if cfg.name != "" {
		return cfg
	}

	cfg.name = "example"
	return cfg
}
// Router always returns a nil router; store is not used.
//
// NOTE(review): no import for the mux package is visible in this file's
// import block — confirm it is provided (presumably gorilla/mux).
func (l *Loader) Router(store *storage.Storage) *mux.Router {
	var router *mux.Router // deliberately left nil
	return router
}
// Load builds the example connector from the given logger and configuration
// by delegating to newConnector.
func (l *Loader) Load(logger logging.Logger, config Config) connectors.Connector {
	connector := newConnector(logger, config)
	return connector
}
// NewLoader returns a fresh, empty Loader.
func NewLoader() *Loader {
	return new(Loader)
}
// Config holds the settings of the example connector.
type Config struct {
	// name is the connector name. It is unexported, so encoding/json does
	// not include it in the output of Marshal.
	name string
	// Directory is the path of the directory to watch; it must not be empty.
	Directory string
}

// ConnectorName returns the connector name stored in the configuration.
func (c Config) ConnectorName() string {
	name := c.name
	return name
}

// Validate reports an error when the configuration has no directory set and
// returns nil otherwise.
func (c Config) Validate() error {
	if c.Directory != "" {
		return nil
	}

	return errors.New("missing directory to watch")
}

// Marshal encodes the configuration as JSON.
func (c Config) Marshal() ([]byte, error) {
	encoded, err := json.Marshal(c)
	return encoded, err
}
// Connector is the example connector. Its tasks read JSON payment files from
// the directory named in cfg and hand them to the ingester (see Resolve).
type Connector struct {
// logger is the connector-scoped logger set by newConnector.
logger logging.Logger
// cfg is the current configuration; UpdateConfig replaces it.
cfg Config
}
// InitiatePayment is not supported by the example connector: it ignores its
// arguments and always returns connectors.ErrNotImplemented.
func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.TransferInitiation) error {
	unsupported := connectors.ErrNotImplemented
	return unsupported
}
// CreateExternalBankAccount is not supported by the example connector: it
// ignores its arguments and always returns connectors.ErrNotImplemented.
func (c *Connector) CreateExternalBankAccount(ctx task.ConnectorContext, bankAccount *models.BankAccount) error {
	unsupported := connectors.ErrNotImplemented
	return unsupported
}
// ReversePayment is not supported by the example connector: it ignores its
// arguments and always returns connectors.ErrNotImplemented.
func (c *Connector) ReversePayment(ctx task.ConnectorContext, transferReversal *models.TransferReversal) error {
	unsupported := connectors.ErrNotImplemented
	return unsupported
}
// SupportedCurrenciesAndDecimals returns currency.ISO4217Currencies as the
// connector's currency table. The map is returned as is, not copied.
func (c *Connector) SupportedCurrenciesAndDecimals() map[string]int {
	supported := currency.ISO4217Currencies
	return supported
}
// UpdateConfig replaces the connector configuration with config.
//
// config must hold a Config value; any other dynamic type (including
// *Config) is rejected with connectors.ErrInvalidConfig and the current
// configuration is left unchanged.
func (c *Connector) UpdateConfig(ctx task.ConnectorContext, config models.ConnectorConfigObject) error {
	switch updated := config.(type) {
	case Config:
		c.cfg = updated
		return nil
	default:
		return connectors.ErrInvalidConfig
	}
}
// TaskDescriptor identifies a task of this connector: either the literal
// "directory" (the directory listing task scheduled by Install) or the name
// of a file inside the configured directory (see Resolve).
type TaskDescriptor string
// Install schedules the "directory" task, which Resolve maps to the directory
// listing task. The task is scheduled to run periodically with a duration of
// ten seconds and is always restarted.
//
// It returns the error from encoding the descriptor or from the scheduler.
func (c *Connector) Install(ctx task.ConnectorContext) error {
	descriptor, err := models.EncodeTaskDescriptor(TaskDescriptor("directory"))
	if err != nil {
		return err
	}

	options := models.TaskSchedulerOptions{
		ScheduleOption: models.OPTIONS_RUN_PERIODICALLY,
		Duration:       10 * time.Second,
		RestartOption:  models.OPTIONS_RESTART_ALWAYS,
	}

	return ctx.Scheduler().Schedule(ctx.Context(), descriptor, options)
}
// Uninstall does nothing and always returns nil.
func (c *Connector) Uninstall(ctx context.Context) error {
	var err error // never assigned: there is nothing to tear down here
	return err
}
// Resolve maps a task descriptor to the function that executes it.
//
// The descriptor "directory" resolves to a task that lists the configured
// directory and schedules one task per non-directory entry, using the entry
// name as the descriptor. Any other descriptor is treated as a file name
// inside the configured directory: the resolved task decodes that file as a
// single JSON payment and hands it to the ingester.
//
// The two returned functions have different parameter lists.
//
// Resolve panics when the descriptor cannot be decoded.
//
// NOTE(review): a file literally named "directory" encodes to the same
// descriptor as the listing task, so it resolves to the listing task and is
// never ingested.
func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
	taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor)
	if err != nil {
		panic(err)
	}

	if taskDescriptor == "directory" {
		return func(
			ctx context.Context,
			logger logging.Logger,
			scheduler task.Scheduler,
		) error {
			logger.Infof("Opening directory '%s'...", c.cfg.Directory)

			entries, err := os.ReadDir(c.cfg.Directory)
			if err != nil {
				return err
			}

			logger.Infof("Found %d files", len(entries))

			for _, entry := range entries {
				// A sub-directory can be opened but never decoded as a
				// payment, so a task for it would fail on every run.
				if entry.IsDir() {
					continue
				}

				fileDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor(entry.Name()))
				if err != nil {
					return err
				}

				if err := scheduler.Schedule(
					ctx,
					fileDescriptor,
					models.TaskSchedulerOptions{
						ScheduleOption: models.OPTIONS_RUN_NOW,
						RestartOption:  models.OPTIONS_RESTART_IF_NOT_ACTIVE,
					},
				); err != nil {
					return err
				}
			}

			return nil
		}
	}

	return func(
		ctx context.Context,
		connectorID models.ConnectorID,
		logger logging.Logger,
		ingester ingestion.Ingester,
	) error {
		file, err := os.Open(filepath.Join(c.cfg.Directory, string(taskDescriptor)))
		if err != nil {
			return err
		}
		// Release the file descriptor on every return path. The file is only
		// read, so a Close error carries nothing worth reporting.
		defer file.Close()

		// JsonPayment mirrors the on-disk JSON layout of a payment file.
		type JsonPayment struct {
			CreatedAt time.Time `json:"created_at"`
			Reference string    `json:"reference"`
			Amount    *big.Int  `json:"amount"`
			Type      string    `json:"type"`
			Status    string    `json:"status"`
			Scheme    string    `json:"scheme"`
			Asset     string    `json:"asset"`
		}

		jsonPayment := &JsonPayment{}
		if err := json.NewDecoder(file).Decode(jsonPayment); err != nil {
			return err
		}

		return ingester.IngestPayments(ctx, ingestion.PaymentBatch{
			{
				Payment: &models.Payment{
					ID: models.PaymentID{
						PaymentReference: models.PaymentReference{
							Reference: jsonPayment.Reference,
							Type:      models.PaymentType(jsonPayment.Type),
						},
						ConnectorID: connectorID,
					},
					CreatedAt:   jsonPayment.CreatedAt,
					Reference:   jsonPayment.Reference,
					Amount:      jsonPayment.Amount,
					ConnectorID: connectorID,
					Type:        models.PaymentType(jsonPayment.Type),
					Status:      models.PaymentStatus(jsonPayment.Status),
					Scheme:      models.PaymentScheme(jsonPayment.Scheme),
					Asset:       models.Asset(jsonPayment.Asset),
				},
			},
		})
	}
}
// Compile-time check that *Connector implements connectors.Connector.
var _ connectors.Connector = (*Connector)(nil)
// newConnector builds a Connector that uses cfg as its configuration and a
// logger derived from logger with the field "component" set to "connector".
func newConnector(logger logging.Logger, cfg Config) *Connector {
	fields := map[string]any{"component": "connector"}

	connector := new(Connector)
	connector.logger = logger.WithFields(fields)
	connector.cfg = cfg

	return connector
}