Feedback

Import

Introduction

The Feedier Import System allows developers to bring external data (feedback, reviews, etc.) into the platform in a scalable manner. Each import type is defined by a Factory that produces a Connector, an Ingestion, and a Transformer, and optionally a Synchronizer. The system then uses the Pool Manager to process and store feedback asynchronously.

                  +---------------------------------+
                  |        Import Manager           |
                  |---------------------------------|
                  | - Connector (API/OAuth/Token)   |
                  | - Ingestion (fetch raw data)    |
                  | - Transformer (build Feedback)  |
                  +----------------+----------------+
                                   |
                                   |  push(FeedbackData)
                                   v
                  +---------------------------------+
                  |         Pool Manager            |
                  |---------------------------------|
                  | - Temporary storage (Redis/SQL) |
                  | - Batched consumption           |
                  +----------------+----------------+
                                   |
                                   |  consume(FeedbackData)
                                   v
                           +-------------------+
                           |    Feedback DB    |
                           +-------------------+

Architecture Overview

  1. Import Manager
    Coordinates the import flow:

    • Loads the factory based on the import type.
    • Creates Connector, Ingestion, Transformer.
    • Validates credentials (isValid).
    • Retrieves raw data (Ingestion).
    • Transforms data (Transformer).
    • Pushes results to the Pool.
  2. Pool Manager
    Queues the transformed feedback items. A separate job (or command) consumes them in batches to create or update feedback records, so no single job runs long enough to hit a timeout.

  3. Synchronization Manager
    If supported, verifies existing feedback to see if it has changed or been removed on the external source.


Import Manager

When an import is triggered (scheduled or on-demand), the Import Manager:

  1. Looks up the Factory for the requested import type (registered in config/feedback.php).
  2. Calls createConnector(), createIngestion(), and createTransformer().
  3. Calls isValid() on the connector; if it returns false, the import aborts.
  4. Runs the ingestion flow:
    • Yields raw items via fetch().
    • Passes them to the transformer.
  5. The transformer pushes the finalized data into the Pool.
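
The steps above can be sketched as a small, framework-free loop. This is only an illustration: the Sketch* interfaces, SketchPool, and runImportSketch() are hypothetical stand-ins rather than the real Feedier classes, and the sketch pushes to the pool from the loop itself for brevity.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; the real interfaces and their signatures may differ.
interface SketchConnector { public function isValid(): bool; }
interface SketchIngestion { public function fetch(): iterable; }
interface SketchTransformer { public function transform(array $raw): array; }

final class SketchPool
{
    /** @var list<array> */
    public array $items = [];

    public function push(array $feedbackData): void
    {
        $this->items[] = $feedbackData;
    }
}

/** Validates, fetches, transforms, and pushes; returns the number of items pushed. */
function runImportSketch(
    SketchConnector $connector,
    SketchIngestion $ingestion,
    SketchTransformer $transformer,
    SketchPool $pool
): int {
    if (!$connector->isValid()) {
        throw new RuntimeException('Connector is not valid; import aborted.');
    }

    $pushed = 0;
    foreach ($ingestion->fetch() as $raw) {
        $pool->push($transformer->transform($raw));
        $pushed++;
    }

    return $pushed;
}

// Usage with throwaway implementations.
$connector = new class implements SketchConnector {
    public function isValid(): bool { return true; }
};
$ingestion = new class implements SketchIngestion {
    public function fetch(): iterable
    {
        yield ['id' => 1, 'rating' => 5];
        yield ['id' => 2, 'rating' => 3];
    }
};
$transformer = new class implements SketchTransformer {
    public function transform(array $raw): array
    {
        return ['source' => 'demo', 'score' => $raw['rating']];
    }
};

$pool = new SketchPool();
$pushedCount = runImportSketch($connector, $ingestion, $transformer, $pool);
```

Because fetch() is consumed lazily, each raw item is transformed and pushed before the next one is requested, which keeps memory use flat even for long result sets.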

Factory

A Factory class centralizes all object creation for a specific import type.

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Abstracts\AbstractFactory;
use App\Services\Feedback\Import\Core\Interfaces\ConnectorInterface;
use App\Services\Feedback\Import\Core\Interfaces\IngestionInterface;
use App\Services\Feedback\Import\Core\Interfaces\TransformerInterface;
use App\Services\Feedback\Import\Core\Interfaces\Factory\SupportsSynchronization;
use App\Services\Feedback\Import\Core\Interfaces\SynchronizerInterface;

class TripadvisorFactory extends AbstractFactory implements SupportsSynchronization
{
    public function createConnector(): ConnectorInterface
    {
        return app()->make(TripadvisorConnector::class);
    }

    public function createIngestion(ConnectorInterface $connector): IngestionInterface
    {
        return app()->make(TripadvisorIngestion::class, [
            'connector' => $connector,
        ]);
    }

    public function createTransformer(): TransformerInterface
    {
        return app()->make(TripadvisorTransformer::class);
    }

    public function createSynchronizer(ConnectorInterface $connector): SynchronizerInterface
    {
        return app()->make(TripadvisorSynchronizer::class, [
            'connector'   => $connector,
            'transformer' => $this->createTransformer(),
        ]);
    }
}

Connector

The Connector handles authentication and connectivity:

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Interfaces\ConnectorInterface;
use Illuminate\Http\Client\PendingRequest;
use Illuminate\Support\Facades\Http;

class TripadvisorConnector implements ConnectorInterface
{
    public function connect(): PendingRequest
    {
        // Provide a valid token or any needed headers
        return Http::withToken(config('services.tripadvisor.token'));
    }

    public function isValid(): bool
    {
        $response = $this->connect()->get('https://api.tripadvisor.com/health');
        return $response->successful();
    }
}

Ingestion

Simple Ingestion

In the simplest scenario, you only implement a class with a fetch(): iterable method returning all raw items in one go:

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;

class TripadvisorIngestion extends AbstractIngestion
{
    public function fetch(): iterable
    {
        $response = $this->getConnector()
            ->connect()
            ->get('https://api.tripadvisor.com/v1/reviews');

        foreach ($response->json('results', []) as $rawItem) {
            yield $rawItem;
        }
    }
}

All yielded data passes to the transformer in a single job.

Dispatched Ingestion

If your source has millions of entries or needs multiple steps, you can use dispatch() (provided by AbstractIngestion). This spawns separate jobs, each calling a method in your ingestion class:

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;

class TripadvisorIngestion extends AbstractIngestion
{
    public function fetch(): iterable
    {
        // This method itself yields nothing,
        // but dispatches a sub-method in a separate job
        $this->dispatch('fetchLocations');
        return [];
    }

    public function fetchLocations(): void
    {
        $response = $this->getConnector()->connect()->get('https://api.tripadvisor.com/v1/locations');

        foreach ($response->json('data', []) as $location) {
            // For each location, dispatch another method that yields reviews
            $this->dispatch('fetchReviewsForLocation', $location['id']);
        }
    }

    public function fetchReviewsForLocation(string $locationId): iterable
    {
        $response = $this->getConnector()->connect()->get(
            "https://api.tripadvisor.com/v1/locations/{$locationId}/reviews"
        );

        yield from $response->json('results', []);
    }
}

  • Methods returning iterable: their items are passed to the transformer.
  • Methods returning void: nothing is passed to the transformer, but the method can dispatch further sub-methods.
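
To make the iterable/void distinction concrete, here is a toy, in-process model of dispatching. It is a sketch under assumptions: SketchDispatchingIngestion and its run() and collect() methods are hypothetical, and everything runs synchronously in one process, whereas the real dispatch() spawns separate jobs.

```php
<?php
declare(strict_types=1);

// Toy stand-in for AbstractIngestion: dispatched calls are queued in memory
// and executed one after another instead of as separate jobs.
abstract class SketchDispatchingIngestion
{
    /** @var list<array{0: string, 1: array}> pending [method, arguments] pairs */
    private array $pending = [];

    protected function dispatch(string $method, mixed ...$args): void
    {
        $this->pending[] = [$method, $args];
    }

    abstract public function fetch(): iterable;

    /** Runs fetch() and every dispatched method, yielding each raw item produced. */
    public function run(): iterable
    {
        yield from $this->collect($this->fetch());

        while ($this->pending !== []) {
            [$method, $args] = array_shift($this->pending);
            yield from $this->collect($this->{$method}(...$args));
        }
    }

    private function collect(mixed $result): iterable
    {
        // A void method evaluates to null here, so it contributes no items.
        if (is_iterable($result)) {
            foreach ($result as $item) {
                yield $item;
            }
        }
    }
}

final class DemoIngestion extends SketchDispatchingIngestion
{
    public function fetch(): iterable
    {
        $this->dispatch('fetchLocations');
        return [];
    }

    public function fetchLocations(): void
    {
        foreach (['loc-1', 'loc-2'] as $id) {
            $this->dispatch('fetchReviewsForLocation', $id);
        }
    }

    public function fetchReviewsForLocation(string $locationId): iterable
    {
        yield ['location' => $locationId, 'rating' => 5];
    }
}

// Pass false so keys from the nested generators do not overwrite each other.
$items = iterator_to_array((new DemoIngestion())->run(), false);
```

In this model the dispatched methods are invoked from outside the concrete class, which is why they are declared public.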

Starts From

These optional interfaces allow you to resume imports so you don’t re-ingest older data. Often, when you detect data that’s older than your stored timestamp, you can stop iterating entirely rather than just skipping one item. This avoids fetching unnecessary reviews.

  • SupportsStartFrom
    Defines a single method setStartFrom(?Carbon $date) for a global resume point.

  • SupportsStartFromPerEntity
    Lets you set and store multiple timestamps, one per entity (e.g., per location or account).

    • setStartFromForEntity(string $entityId, ?Carbon $date)
    • getProcessedEntities(): array

Example: SupportsStartFrom

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
use App\Services\Feedback\Import\Core\Interfaces\Ingestion\SupportsStartFrom;
use Carbon\Carbon;

class TripadvisorIngestionGlobal extends AbstractIngestion implements SupportsStartFrom
{
    protected ?Carbon $startFrom = null;

    public function setStartFrom(?Carbon $date): void
    {
        // Called by the Import Manager before fetch() is executed
        $this->startFrom = $date;
    }

    public function fetch(): iterable
    {
        $response = $this->getConnector()
            ->connect()
            ->get('https://api.tripadvisor.com/v1/reviews');

        // Suppose the results are sorted from newest to oldest
        foreach ($response->json('results', []) as $rawItem) {
            $createdAt = Carbon::parse($rawItem['created_at'] ?? now());

            if ($this->startFrom && $createdAt->lt($this->startFrom)) {
                // Once we reach any item older than our startFrom date,
                // we stop the entire loop (no need to process further)
                break;
            }

            yield $rawItem;
        }
    }
}

Example: SupportsStartFromPerEntity

namespace App\Services\Feedback\Import\Connectors\Tripadvisor;

use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
use App\Services\Feedback\Import\Core\Interfaces\Ingestion\SupportsStartFromPerEntity;
use Carbon\Carbon;

class TripadvisorIngestionPerEntity extends AbstractIngestion implements SupportsStartFromPerEntity
{
    protected array $startFromByEntity = [];
    protected array $processedEntities = [];

    public function setStartFromForEntity(string $entityId, ?Carbon $date): void
    {
        // Called by the Import Manager before fetch() is executed
        $this->startFromByEntity[$entityId] = $date;
    }

    public function getProcessedEntities(): array
    {
        // Called by the Import Manager after ingestion finishes
        // to update timestamps in the import configuration
        return $this->processedEntities;
    }

    public function fetch(): iterable
    {
        // Dispatch sub-jobs for each location
        $this->dispatch('fetchLocations');
        return [];
    }

    // Public so the dispatched job can call it (as in the Dispatched Ingestion example)
    public function fetchLocations(): void
    {
        // In reality, you'd call an endpoint to list all location IDs
        $locationIds = ['loc-1', 'loc-2', 'loc-3'];
        foreach ($locationIds as $id) {
            $this->dispatch('fetchReviewsForLocation', $id);
        }
    }

    public function fetchReviewsForLocation(string $locationId): iterable
    {
        $startFrom = $this->startFromByEntity[$locationId] ?? null;

        $response = $this->getConnector()->connect()->get(
            "https://api.tripadvisor.com/v1/locations/{$locationId}/reviews"
        );

        // Suppose these are sorted from newest to oldest
        foreach ($response->json('results', []) as $rawItem) {
            $createdAt = Carbon::parse($rawItem['created_at'] ?? now());

            if ($startFrom && $createdAt->lt($startFrom)) {
                // We found an older item; we can stop this location's loop here
                break;
            }

            // Update the "latest" timestamp processed for this entity
            if (!isset($this->processedEntities[$locationId])
                || $createdAt->greaterThan(Carbon::createFromTimestamp($this->processedEntities[$locationId]))) {
                $this->processedEntities[$locationId] = $createdAt->timestamp;
            }

            yield $rawItem;
        }
    }
}

Transformer

A Transformer transforms raw items into a FeedbackData object. The simplest pattern is:

public function transform(mixed $raw): FeedbackData
{
    return FeedbackData::from([
        'carrierId'  => $this->getImport()->carrier_id,
        'source'     => 'tripadvisor',
        'attributes' => [],
        'answers'    => [],
        // ...
    ]);
}

SupportsMapping

By implementing SupportsMapping, you can define a mappers() array of closures:

  • If you return an empty array ([]), the system tries to apply default logic (auto-mapping).
  • If you define closures, these override or add custom fields.

Example:

// 1) Automatic (empty array)
public function mappers(): array
{
    return [];
}

// 2) Manual
public function mappers(): array
{
    return [
        'score'   => fn($raw) => $raw['rating'] ?? 0,
        'comment' => fn($raw) => $raw['comment'] ?? '',
    ];
}
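
As a rough illustration of how a mappers() array could be applied, the sketch below runs each closure against one raw item. applyMappersSketch() is a hypothetical helper, not part of the import system, and it does not model the auto-mapping used for an empty array.

```php
<?php
declare(strict_types=1);

/**
 * Applies each mapper closure to the raw item and returns field => value pairs.
 *
 * @param array<string, callable(array): mixed> $mappers
 */
function applyMappersSketch(array $mappers, array $raw): array
{
    $fields = [];
    foreach ($mappers as $field => $mapper) {
        $fields[$field] = $mapper($raw);
    }

    return $fields;
}

$mappers = [
    'score'   => fn (array $raw) => $raw['rating'] ?? 0,
    'comment' => fn (array $raw) => $raw['comment'] ?? '',
];

// The raw item has no "comment", so the closure's default applies.
$fields = applyMappersSketch($mappers, ['rating' => 4]);
// $fields is ['score' => 4, 'comment' => '']
```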

SupportsUniqueness

To avoid importing the same feedback multiple times, you can implement SupportsUniqueness:

public function getUniqueFingerprint(mixed $raw): string
{
    // Access the key directly: a missing id must raise an error, not fall back silently
    return 'tripadvisor:' . $raw['id'];
}

If two items have the same fingerprint, the system recognizes them as duplicates.

/!\ WARNING: Never use data_get() (or any other null-tolerant lookup or fallback) in this method. A missing identifier then resolves to null, so items can end up sharing one fingerprint: the first feedback is imported and all the following ones are silently ignored as duplicates. We prefer to receive a Sentry error when the unique fingerprint of a feedback cannot be determined rather than import it anyway.
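
The failure mode described in the warning can be reproduced in isolation. The sketch below is illustrative only: both functions are hypothetical, and the lenient one imitates the null-on-missing behaviour of data_get() with the ?? operator so that it runs without Laravel.

```php
<?php
declare(strict_types=1);

// Lenient lookup, similar in spirit to data_get(): a missing key quietly becomes null.
function lenientFingerprintSketch(array $raw): string
{
    return 'tripadvisor:' . ($raw['id'] ?? null);
}

// Strict lookup: a missing id raises an exception instead of producing a fingerprint.
function strictFingerprintSketch(array $raw): string
{
    if (!isset($raw['id'])) {
        throw new UnexpectedValueException('Cannot build a fingerprint: raw item has no id.');
    }

    return 'tripadvisor:' . $raw['id'];
}

// Two distinct reviews that are both missing their "id".
$broken = [['rating' => 5], ['rating' => 1]];

$lenient = array_map('lenientFingerprintSketch', $broken);
// Both fingerprints are "tripadvisor:", so the second review looks like a duplicate.
$collides = $lenient[0] === $lenient[1];

try {
    strictFingerprintSketch($broken[0]);
    $strictThrew = false;
} catch (UnexpectedValueException $e) {
    $strictThrew = true; // this is the error you would want reported
}
```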

Pool Manager

After transformation, the import pipeline does not insert feedback directly into the database. Instead, each FeedbackData is pushed to the pool. Another job or command periodically consumes these entries in batches (for example, 50 items at a time) and processes them into actual feedback records.

Why a pool?

  • Large-scale imports can produce hundreds of thousands or millions of records.
  • Processing them in a separate step ensures the ingestion job never runs too long.
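
A minimal in-memory sketch of the push/consume split follows. InMemoryPoolSketch is a hypothetical stand-in for the real pool's temporary storage (Redis or SQL), and the batch size of 50 simply mirrors the example figure above.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for the pool; not the real Pool Manager API.
final class InMemoryPoolSketch
{
    /** @var list<array> */
    private array $queue = [];

    public function push(array $feedbackData): void
    {
        $this->queue[] = $feedbackData;
    }

    /** Removes and returns up to $batchSize items, oldest first. */
    public function consume(int $batchSize = 50): array
    {
        return array_splice($this->queue, 0, $batchSize);
    }

    public function size(): int
    {
        return count($this->queue);
    }
}

$pool = new InMemoryPoolSketch();
for ($i = 1; $i <= 120; $i++) {
    $pool->push(['fingerprint' => "demo:{$i}"]);
}

$batchSizes = [];
while ($pool->size() > 0) {
    $batch = $pool->consume(50); // each batch would be handled by its own short job
    $batchSizes[] = count($batch);
}
// $batchSizes is [50, 50, 20]
```

The ingestion side only ever appends, so its run time depends on the source API, not on how long it takes to write feedback records.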

Synchronization Manager

If the factory implements SupportsSynchronization, you can create a Synchronizer to update or delete previously imported feedback.

public function synchronize(Feedback $feedback): void
{
    $response = $this->getConnector()
        ->connect()
        ->get("https://api.tripadvisor.com/v1/reviews/{$feedback->fingerprint}");

    if ($response->notFound()) {
        $this->delete($feedback);
    } else {
        $this->update($feedback, $response->json());
    }
}

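Here, $feedback->fingerprint is typically the unique identifier stored when the item was first imported. If the stored fingerprint carries a source prefix (as in the tripadvisor: example under SupportsUniqueness), strip the prefix before building the request URL.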
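
The SupportsUniqueness example builds fingerprints with a tripadvisor: prefix. If that prefixed form is what gets stored, the external ID has to be recovered before calling the API. The helper below is a hypothetical sketch of that step, not an existing method of the import system.

```php
<?php
declare(strict_types=1);

/** Returns the external ID by removing the source prefix, if the prefix is present. */
function externalIdFromFingerprintSketch(string $fingerprint, string $prefix = 'tripadvisor:'): string
{
    return str_starts_with($fingerprint, $prefix)
        ? substr($fingerprint, strlen($prefix))
        : $fingerprint;
}

$externalId = externalIdFromFingerprintSketch('tripadvisor:123');
// $externalId is '123'
```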

Configuration

To register your import, update config/feedback.php. For example:

'import' => [
    'types' => [
        [
            'type'        => 'tripadvisor',
            'factory'     => \App\Services\Feedback\Import\Connectors\Tripadvisor\TripadvisorFactory::class,
            'trigger'     => \App\Services\Feedback\Import\Core\Enums\TriggerTypeEnum::SCHEDULED,
            'recurrences' => [
                'execution'       => '5 * * * *',    // run each hour at minute 5
                'synchronization' => '30 * * * *',   // sync each hour at minute 30
            ],
        ],
    ],
],

Trigger Types

  • SCHEDULED: Cron-based.
  • ON_DEMAND: Only runs when explicitly triggered.
  • ONE_TIME: Runs once, then never repeats.
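
One way to read these three trigger types is as a rule for whether the scheduler should start a new execution. The sketch below is an interpretation, not the actual dispatch logic: TriggerTypeSketch and shouldDispatchSketch() are hypothetical, the cron evaluation is reduced to a boolean, and it requires PHP 8.1 or later for enums.

```php
<?php
declare(strict_types=1);

// Hypothetical mirror of TriggerTypeEnum, used only for this illustration.
enum TriggerTypeSketch
{
    case SCHEDULED;
    case ON_DEMAND;
    case ONE_TIME;
}

/**
 * Decides whether an automatic dispatch should create a new execution.
 *
 * @param bool $cronIsDue      whether the import's cron expression matches now
 * @param int  $pastExecutions how many executions already exist for this import
 */
function shouldDispatchSketch(TriggerTypeSketch $trigger, bool $cronIsDue, int $pastExecutions): bool
{
    return match ($trigger) {
        TriggerTypeSketch::SCHEDULED => $cronIsDue,
        TriggerTypeSketch::ON_DEMAND => false, // started only by an explicit trigger
        TriggerTypeSketch::ONE_TIME  => $pastExecutions === 0,
    };
}

$scheduledDue  = shouldDispatchSketch(TriggerTypeSketch::SCHEDULED, true, 3);
$onDemand      = shouldDispatchSketch(TriggerTypeSketch::ON_DEMAND, true, 0);
$oneTimeRepeat = shouldDispatchSketch(TriggerTypeSketch::ONE_TIME, true, 1);
```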

Commands and Scheduling

Typically, three commands are scheduled in app/Console/Kernel.php:

  1. feedback:import:dispatch-all – Runs due imports (creates new executions).
  2. feedback:import:sync-all – Executes synchronization jobs for imports that support it.
  3. feedback:pool:consume – Consumes pooled feedback in manageable batches.

How to Run an Import

1. Start an Import (using Import Manager)

If you want to run an import directly in your code:

use App\Models\FeedbackImportExecution;
use App\Services\Feedback\Import\ImportManager;
use App\Services\Feedback\Import\Core\Registry\ImportFactoryRegistry;

$execution = FeedbackImportExecution::findOrFail($executionId); // an existing execution record

// For example, obtain the registry from the application container
$factoryRegistry = app()->make(ImportFactoryRegistry::class);

// Instantiate the manager and run the import
$importManager = new ImportManager($factoryRegistry);
$importManager->run($execution);

This will:

  1. Validate the connector (isValid()).
  2. Create ingestion and transformer instances.
  3. Fetch data, transform it, and push FeedbackData objects to the pool.

2. Start an Import (using command)

To trigger all eligible imports via Artisan:

php artisan feedback:import:dispatch-all

This command:

  • Scans all configured imports in config/feedback.php.
  • Checks whether each import is due (based on trigger and recurrences).
  • Creates a FeedbackImportExecution and queues a job to run the import.

3. Consume the Feedback in the Pool

The transformed feedback stored in the pool isn’t persisted until a separate command processes it:

php artisan feedback:pool:consume

This command:

  • Finds pooled feedback items in batches (e.g., 50 at a time).
  • Inserts them into the database via a background job, so that no single job runs for too long.

4. Execute the Synchronizer

If the factory supports synchronization (i.e., implements SupportsSynchronization), run:

php artisan feedback:import:sync-all

This checks previously imported feedback against the external source:

  • If the external review no longer exists, it’s marked or removed locally.
  • If new data is found, it updates the existing feedback record accordingly.

Key Takeaways

  • Factory: Provides Connector, Ingestion, Transformer (+ optional Synchronizer).
  • Connector: Must implement connect() and isValid().
  • Ingestion: Yields data via fetch(); can dispatch sub-methods for large imports.
  • Transformer: Builds FeedbackData; optionally uses mappers() and uniqueness checks.
  • Pool Manager: Separates ingestion from actual DB insertion for scalability.
  • Synchronization: Keeps existing feedback updated or removes it if no longer valid.