Feedback
Import
Introduction
The Feedier Import System allows developers to bring external data (feedback, reviews, etc.) into the platform in a scalable manner. Each import type is defined by a Factory that produces a Connector, an Ingestion, and a Transformer, and optionally a Synchronizer. The system then uses the Pool Manager to process and store feedback asynchronously.
+---------------------------------+
| Import Manager |
|---------------------------------|
| - Connector (API/OAuth/Token) |
| - Ingestion (fetch raw data) |
| - Transformer (build Feedback) |
+----------------+-----------------+
|
| push(FeedbackData)
v
+---------------------------------+
| Pool Manager |
|---------------------------------|
| - Temporary storage (Redis/SQL) |
| - Batched consumption |
+----------------+-----------------+
|
| consume(FeedbackData)
v
+-------------------+
| Feedback DB |
+-------------------+
Architecture Overview
Import Manager
Coordinates the import flow:
- Loads the factory based on the import type.
- Creates Connector, Ingestion, Transformer.
- Validates credentials (isValid).
- Retrieves raw data (Ingestion).
- Transforms data (Transformer).
- Pushes results to the Pool.
Pool Manager
Queues the transformed feedback. Another job (or command) consumes the queued items in batches to create or update feedback records, so that no single job runs long enough to hit a timeout.
Synchronization Manager
If supported, verifies existing feedback to see if it has changed or been removed on the external source.
Import Manager
When an import is triggered (scheduled or on-demand), the Import Manager:
- Looks up the Factory for the requested import type (registered in feedback.php).
- Calls createConnector(), createIngestion(), and createTransformer().
- Checks if isValid() is true; if not, it aborts.
- Runs the ingestion flow:
  - Yields raw items via fetch().
  - Passes them to the transformer.
- The transformer pushes the finalized data into the Pool.
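The steps above can be sketched in plain PHP. This is only an illustration under stated assumptions: StubConnector, StubIngestion, StubTransformer, InMemoryPool and runImport() are hypothetical stand-ins, not the real classes, and for brevity the pool push happens in runImport() rather than inside the transformer.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for the real Connector, Ingestion, Transformer and Pool.
final class StubConnector
{
    public function __construct(private bool $valid) {}

    public function isValid(): bool
    {
        return $this->valid;
    }
}

final class StubIngestion
{
    public function fetch(): iterable
    {
        yield ['id' => 'r1', 'rating' => 5];
        yield ['id' => 'r2', 'rating' => 3];
    }
}

final class StubTransformer
{
    public function transform(array $raw): array
    {
        return ['source' => 'stub', 'score' => $raw['rating']];
    }
}

final class InMemoryPool
{
    public array $items = [];

    public function push(array $feedbackData): void
    {
        $this->items[] = $feedbackData;
    }
}

// Mirrors the flow: validate, fetch, transform, push. Returns false when aborted.
function runImport(
    StubConnector $connector,
    StubIngestion $ingestion,
    StubTransformer $transformer,
    InMemoryPool $pool
): bool {
    if (!$connector->isValid()) {
        return false; // invalid credentials: nothing is fetched
    }
    foreach ($ingestion->fetch() as $raw) {
        $pool->push($transformer->transform($raw));
    }
    return true;
}

$pool = new InMemoryPool();
$ok = runImport(new StubConnector(true), new StubIngestion(), new StubTransformer(), $pool);
echo $ok ? count($pool->items) . " items pooled\n" : "aborted\n";
```

Because fetch() is a generator, items are transformed one at a time as they are yielded, so the full raw result set never has to be held in memory.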
Factory
A Factory class centralizes all object creation for a specific import type.
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Abstracts\AbstractFactory;
use App\Services\Feedback\Import\Core\Interfaces\ConnectorInterface;
use App\Services\Feedback\Import\Core\Interfaces\IngestionInterface;
use App\Services\Feedback\Import\Core\Interfaces\TransformerInterface;
use App\Services\Feedback\Import\Core\Interfaces\Factory\SupportsSynchronization;
use App\Services\Feedback\Import\Core\Interfaces\SynchronizerInterface;
class TripadvisorFactory extends AbstractFactory implements SupportsSynchronization
{
public function createConnector(): ConnectorInterface
{
return app()->make(TripadvisorConnector::class);
}
public function createIngestion(ConnectorInterface $connector): IngestionInterface
{
return app()->make(TripadvisorIngestion::class, [
'connector' => $connector,
]);
}
public function createTransformer(): TransformerInterface
{
return app()->make(TripadvisorTransformer::class);
}
public function createSynchronizer(ConnectorInterface $connector): SynchronizerInterface
{
return app()->make(TripadvisorSynchronizer::class, [
'connector' => $connector,
'transformer' => $this->createTransformer(),
]);
}
}
Connector
The Connector handles authentication and connectivity:
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Interfaces\ConnectorInterface;
use Illuminate\Http\Client\PendingRequest;
use Illuminate\Support\Facades\Http;
class TripadvisorConnector implements ConnectorInterface
{
public function connect(): PendingRequest
{
// Provide a valid token or any needed headers
return Http::withToken(config('services.tripadvisor.token'));
}
public function isValid(): bool
{
$response = $this->connect()->get('https://api.tripadvisor.com/health');
return $response->successful();
}
}
Ingestion
Simple Ingestion
In the simplest scenario, you only implement a class with a fetch(): iterable method returning all raw items in one go:
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
class TripadvisorIngestion extends AbstractIngestion
{
public function fetch(): iterable
{
$response = $this->getConnector()
->connect()
->get('https://api.tripadvisor.com/v1/reviews');
foreach ($response->json('results', []) as $rawItem) {
yield $rawItem;
}
}
}
All yielded data passes to the transformer in a single job.
Dispatched Ingestion
If your source has millions of entries or needs multiple steps, you can use dispatch() (provided by AbstractIngestion). This spawns separate jobs, each calling a method in your ingestion class:
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
class TripadvisorIngestion extends AbstractIngestion
{
public function fetch(): iterable
{
// This method itself yields nothing,
// but dispatches a sub-method in a separate job
$this->dispatch('fetchLocations');
return [];
}
public function fetchLocations(): void
{
$response = $this->getConnector()->connect()->get('https://api.tripadvisor.com/v1/locations');
foreach ($response->json('data', []) as $location) {
// For each location, dispatch another method that yields reviews
$this->dispatch('fetchReviewsForLocation', $location['id']);
}
}
public function fetchReviewsForLocation(string $locationId): iterable
{
$response = $this->getConnector()->connect()->get(
"https://api.tripadvisor.com/v1/locations/{$locationId}/reviews"
);
yield from $response->json('results', []);
}
}
- Methods returning iterable: their items go to the transformer.
- Methods returning void: no items are transformed, but the method can dispatch further sub-methods.
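To make the iterable/void distinction concrete, here is a minimal synchronous sketch. It assumes nothing about how AbstractIngestion is really implemented: SketchIngestion, its drain() method and DemoIngestion are hypothetical, and the "jobs" run in a simple in-process queue instead of real queued jobs.

```php
<?php
declare(strict_types=1);

// Hypothetical, synchronous stand-in for the dispatch mechanism.
abstract class SketchIngestion
{
    /** Pending "jobs", each stored as [method name, arguments]. */
    private array $queue = [];

    protected function dispatch(string $method, mixed ...$args): void
    {
        $this->queue[] = [$method, $args];
    }

    abstract public function fetch(): iterable;

    /** Runs fetch() and every dispatched method, collecting all yielded items. */
    public function drain(): array
    {
        $items = [];
        foreach ($this->fetch() as $item) {
            $items[] = $item;
        }
        while ($job = array_shift($this->queue)) {
            [$method, $args] = $job;
            $result = $this->{$method}(...$args);
            // void methods return null, so only iterable results contribute items
            if (is_iterable($result)) {
                foreach ($result as $item) {
                    $items[] = $item;
                }
            }
        }
        return $items;
    }
}

final class DemoIngestion extends SketchIngestion
{
    public function fetch(): iterable
    {
        $this->dispatch('fetchLocations');
        return [];
    }

    public function fetchLocations(): void
    {
        foreach (['loc-1', 'loc-2'] as $id) {
            $this->dispatch('fetchReviewsForLocation', $id);
        }
    }

    public function fetchReviewsForLocation(string $locationId): iterable
    {
        yield ['location' => $locationId, 'id' => "{$locationId}-review"];
    }
}

$items = (new DemoIngestion())->drain();
echo count($items), " items reached the transformer\n";
```

In this sketch fetchLocations() contributes no items itself; it only fans out into one fetchReviewsForLocation() call per location, and those calls produce the items.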
Starts From
These optional interfaces allow you to resume imports so you don’t re-ingest older data. Often, when you detect data that’s older than your stored timestamp, you can stop iterating entirely rather than just skipping one item. This avoids fetching unnecessary reviews.
SupportsStartFrom
Defines a single method, setStartFrom(?Carbon $date), for a global resume point.
SupportsStartFromPerEntity
Lets you set and store multiple timestamps, one per entity (e.g., per location or account).
- setStartFromForEntity(string $entityId, ?Carbon $date)
- getProcessedEntities(): array
Example: SupportsStartFrom
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
use App\Services\Feedback\Import\Core\Interfaces\Ingestion\SupportsStartFrom;
use Carbon\Carbon;
class TripadvisorIngestionGlobal extends AbstractIngestion implements SupportsStartFrom
{
protected ?Carbon $startFrom = null;
public function setStartFrom(?Carbon $date): void
{
// Called by the Import Manager before fetch() is executed
$this->startFrom = $date;
}
public function fetch(): iterable
{
$response = $this->getConnector()
->connect()
->get('https://api.tripadvisor.com/v1/reviews');
// Suppose the results are sorted from newest to oldest
foreach ($response->json('results', []) as $rawItem) {
$createdAt = Carbon::parse($rawItem['created_at'] ?? now());
if ($this->startFrom && $createdAt->lt($this->startFrom)) {
// Once we reach any item older than our startFrom date,
// we stop the entire loop (no need to process further)
break;
}
yield $rawItem;
}
}
}
Example: SupportsStartFromPerEntity
namespace App\Services\Feedback\Import\Connectors\Tripadvisor;
use App\Services\Feedback\Import\Core\Abstracts\AbstractIngestion;
use App\Services\Feedback\Import\Core\Interfaces\Ingestion\SupportsStartFromPerEntity;
use Carbon\Carbon;
class TripadvisorIngestionPerEntity extends AbstractIngestion implements SupportsStartFromPerEntity
{
protected array $startFromByEntity = [];
protected array $processedEntities = [];
public function setStartFromForEntity(string $entityId, ?Carbon $date): void
{
// Called by the Import Manager before fetch() is executed
$this->startFromByEntity[$entityId] = $date;
}
public function getProcessedEntities(): array
{
// Called by the Import Manager after ingestion finishes
// to update timestamps in the import configuration
return $this->processedEntities;
}
public function fetch(): iterable
{
// Dispatch sub-jobs for each location
$this->dispatch('fetchLocations');
return [];
}
// Public, like the other dispatched methods, so the separate job can call it
public function fetchLocations(): void
{
// In reality, you'd call an endpoint to list all location IDs
$locationIds = ['loc-1', 'loc-2', 'loc-3'];
foreach ($locationIds as $id) {
$this->dispatch('fetchReviewsForLocation', $id);
}
}
public function fetchReviewsForLocation(string $locationId): iterable
{
$startFrom = $this->startFromByEntity[$locationId] ?? null;
$response = $this->getConnector()->connect()->get(
"https://api.tripadvisor.com/v1/locations/{$locationId}/reviews"
);
// Suppose these are sorted from newest to oldest
foreach ($response->json('results', []) as $rawItem) {
$createdAt = Carbon::parse($rawItem['created_at'] ?? now());
if ($startFrom && $createdAt->lt($startFrom)) {
// We found an older item; we can stop this location's loop here
break;
}
// Update the "latest" timestamp processed for this entity
if (!isset($this->processedEntities[$locationId])
|| $createdAt->greaterThan(Carbon::createFromTimestamp($this->processedEntities[$locationId]))) {
$this->processedEntities[$locationId] = $createdAt->timestamp;
}
yield $rawItem;
}
}
}
Transformer
A Transformer converts each raw item into a FeedbackData object. The simplest pattern is:
public function transform(mixed $raw): FeedbackData
{
return FeedbackData::from([
'carrierId' => $this->getImport()->carrier_id,
'source' => 'tripadvisor',
'attributes' => [],
'answers' => [],
// ...
]);
}
SupportsMapping
By implementing SupportsMapping, you can define a mappers() array of closures:
- If you return an empty array ([]), the system tries to apply default logic (auto-mapping).
- If you define closures, these override or add custom fields.
Example:
// 1) Automatic (empty array)
public function mappers(): array
{
return [];
}
// 2) Manual
public function mappers(): array
{
return [
'score' => fn($raw) => $raw['rating'] ?? 0,
'comment' => fn($raw) => $raw['comment'] ?? '',
];
}
SupportsUniqueness
To avoid importing the same feedback multiple times, you can implement SupportsUniqueness:
public function getUniqueFingerprint(mixed $raw): string
{
// Access the key directly: a missing id must raise an error,
// not fall back to a shared or random fingerprint
return 'tripadvisor:' . $raw['id'];
}
If two items have the same fingerprint, the system recognizes them as duplicates.
/!\ WARNING: Never use data_get() in this method. When the key is missing it returns null instead of raising an error, so every such item ends up with the same fingerprint: the first feedback is imported and all the following ones are silently ignored as duplicates. We prefer to receive a Sentry alert when the unique fingerprint of a feedback cannot be determined rather than import it anyway.
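As an illustration of how fingerprints can drive duplicate detection, here is a small standalone sketch. The fingerprint() and deduplicate() functions are hypothetical and do not reflect how the platform stores or compares fingerprints; the sketch only shows the principle, including failing loudly when no identifier is available.

```php
<?php
declare(strict_types=1);

// Hypothetical fingerprint function in the spirit of getUniqueFingerprint().
function fingerprint(array $raw): string
{
    if (!array_key_exists('id', $raw)) {
        // Fail loudly rather than inventing or sharing an identifier.
        throw new InvalidArgumentException('Raw item has no id; cannot build a fingerprint.');
    }
    return 'tripadvisor:' . $raw['id'];
}

/** Keeps the first item seen for each fingerprint. */
function deduplicate(iterable $rawItems): array
{
    $seen = [];
    foreach ($rawItems as $raw) {
        $seen[fingerprint($raw)] ??= $raw;
    }
    return $seen;
}

$unique = deduplicate([
    ['id' => 'a1', 'rating' => 5],
    ['id' => 'a1', 'rating' => 4], // same fingerprint as the first item
    ['id' => 'b2', 'rating' => 3],
]);
echo implode(', ', array_keys($unique)), "\n";
```

If fingerprint() returned a constant prefix for every item without an id, all of those items would collapse into a single entry here, which is the silent failure the warning describes.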
Pool Manager
After transformation, the import pipeline does not insert feedback directly into the database. Instead, each FeedbackData is pushed to the pool. Another job or command periodically consumes these entries in batches (for example, 50 items at a time) and processes them into actual feedback records.
Why a pool?
- Large-scale imports can produce hundreds of thousands or millions of records.
- Processing them in a separate step ensures the ingestion job never runs too long.
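A rough sketch of batched consumption, under the assumption of a simple in-memory queue: SketchPool, take() and consume() are hypothetical names, and the real pool is backed by temporary storage rather than a PHP array.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory pool used only for illustration.
final class SketchPool
{
    private array $items = [];

    public function push(array $feedbackData): void
    {
        $this->items[] = $feedbackData;
    }

    /** Removes and returns up to $batchSize items from the front of the pool. */
    public function take(int $batchSize): array
    {
        return array_splice($this->items, 0, $batchSize);
    }
}

/** Drains the pool batch by batch and returns the number of batches processed. */
function consume(SketchPool $pool, int $batchSize, callable $store): int
{
    $batches = 0;
    while (($batch = $pool->take($batchSize)) !== []) {
        $store($batch); // in a real system, each batch could be its own job
        $batches++;
    }
    return $batches;
}

$pool = new SketchPool();
for ($i = 1; $i <= 120; $i++) {
    $pool->push(['id' => $i]);
}

$stored = 0;
$batches = consume($pool, 50, function (array $batch) use (&$stored): void {
    $stored += count($batch);
});
echo "{$stored} items stored in {$batches} batches\n";
```

With 120 pooled items and a batch size of 50, the consumer handles batches of 50, 50 and 20, so the work per step stays bounded regardless of how large the import was.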
Synchronization Manager
If the factory implements SupportsSynchronization, you can create a Synchronizer to update or delete previously imported feedback.
public function synchronize(Feedback $feedback): void
{
$response = $this->getConnector()
->connect()
->get("https://api.tripadvisor.com/v1/reviews/{$feedback->fingerprint}");
if ($response->notFound()) {
$this->delete($feedback);
} else {
$this->update($feedback, $response->json());
}
}
Here, $feedback->fingerprint is typically the unique identifier stored when the item was first imported.
Configuration
To register your import, update config/feedback.php. For example:
'import' => [
'types' => [
[
'type' => 'tripadvisor',
'factory' => \App\Services\Feedback\Import\Connectors\Tripadvisor\TripadvisorFactory::class,
'trigger' => \App\Services\Feedback\Import\Core\Enums\TriggerTypeEnum::SCHEDULED,
'recurrences' => [
'execution' => '5 * * * *', // run each hour at minute 5
'synchronization' => '30 * * * *', // sync each hour at minute 30
],
],
],
],
Trigger Types
- SCHEDULED: Cron-based.
- ON_DEMAND: Only runs when explicitly triggered.
- ONE_TIME: Runs once, then never repeats.
Commands and Scheduling
Typically, three commands are scheduled in app/Console/Kernel.php:
- feedback:import:dispatch-all: Runs due imports (creates new executions).
- feedback:import:sync-all: Executes synchronization jobs for imports that support it.
- feedback:pool:consume: Consumes pooled feedback in manageable batches.
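A scheduling sketch for these three commands might look like the following. The frequencies and the use of withoutOverlapping() are assumptions chosen for illustration, not the project's actual settings; only the command names come from this document.

```php
<?php

namespace App\Console;

use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    protected function schedule(Schedule $schedule): void
    {
        // Assumed frequencies: each command decides internally what is due,
        // so it is polled every minute in this sketch.
        $schedule->command('feedback:import:dispatch-all')
            ->everyMinute()
            ->withoutOverlapping();

        $schedule->command('feedback:import:sync-all')
            ->everyMinute()
            ->withoutOverlapping();

        $schedule->command('feedback:pool:consume')
            ->everyMinute()
            ->withoutOverlapping();
    }
}
```

Running the commands frequently and letting the recurrences in config/feedback.php decide what is due keeps the cron expressions in one place; withoutOverlapping() prevents a slow run from stacking up behind itself.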
How to Run an Import
1. Start an Import (using Import Manager)
If you want to run an import directly in your code:
use App\Models\FeedbackImportExecution;
use App\Services\Feedback\Import\ImportManager;
use App\Services\Feedback\Import\Core\Registry\ImportFactoryRegistry;
$execution = FeedbackImportExecution::findOrFail($executionId); // an existing execution record
// For example, obtain the registry from the application container
$factoryRegistry = app()->make(ImportFactoryRegistry::class);
// Instantiate the manager and run the import
$importManager = new ImportManager($factoryRegistry);
$importManager->run($execution);
This will:
- Validate the connector (isValid()).
- Create ingestion and transformer instances.
- Fetch data, transform it, and push FeedbackData objects to the pool.
2. Start an Import (using command)
To trigger all eligible imports via Artisan:
php artisan feedback:import:dispatch-all
This command:
- Scans all configured imports in config/feedback.php.
- Checks whether each import is due (based on trigger and recurrences).
- Creates a FeedbackImportExecution and queues a job to run the import.
3. Execute the Feedback in the Pool
The transformed feedback stored in the pool isn’t persisted until a separate command processes it:
php artisan feedback:pool:consume
This command:
- Finds pooled feedback items in batches (e.g., 50 at a time).
- Inserts each batch into the database via a background job, so that no single job runs for long.
4. Execute the Synchronizer
If the factory supports synchronization (i.e., implements SupportsSynchronization), run:
php artisan feedback:import:sync-all
This checks previously imported feedback against the external source:
- If the external review no longer exists, it’s marked or removed locally.
- If new data is found, it updates the existing feedback record accordingly.
Key Takeaways
- Factory: Provides Connector, Ingestion, Transformer (+ optional Synchronizer).
- Connector: Must implement connect() and isValid().
- Ingestion: Yields data via fetch(); can dispatch sub-methods for large imports.
- Transformer: Builds FeedbackData; optionally uses mappers() and uniqueness checks.
- Pool Manager: Separates ingestion from actual DB insertion for scalability.
- Synchronization: Keeps existing feedback updated or removes it if no longer valid.