Skip to content

Commit

Permalink
Copy Files From External Storage To Local Storage for Import
Browse files Browse the repository at this point in the history
  • Loading branch information
wattnpapa committed Dec 30, 2024
1 parent a12430e commit 1363232
Showing 1 changed file with 59 additions and 33 deletions.
92 changes: 59 additions & 33 deletions src/Snapshot.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
namespace Spatie\DbSnapshots;

use Carbon\Carbon;
use Illuminate\Contracts\Filesystem\Factory;
use Illuminate\Filesystem\FilesystemAdapter as Disk;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\LazyCollection;
use Spatie\DbSnapshots\Events\DeletedSnapshot;
use Spatie\DbSnapshots\Events\DeletingSnapshot;
use Spatie\DbSnapshots\Events\LoadedSnapshot;
use Spatie\DbSnapshots\Events\LoadingSnapshot;
use Spatie\TemporaryDirectory\TemporaryDirectory;

class Snapshot
{
Expand All @@ -25,6 +27,8 @@ class Snapshot

public const STREAM_BUFFER_SIZE = 16384;

protected Factory $filesystemFactory;

public function __construct(Disk $disk, string $fileName)
{
$this->disk = $disk;
Expand All @@ -39,6 +43,8 @@ public function __construct(Disk $disk, string $fileName)
}

$this->name = pathinfo($fileName, PATHINFO_FILENAME);

$this->filesystemFactory = app(Factory::class);
}

public function useStream()
Expand Down Expand Up @@ -90,45 +96,65 @@ protected function shouldIgnoreLine(string $line): bool

protected function loadStream(string $connectionName = null)
{
LazyCollection::make(function () {
$stream = $this->compressionExtension === 'gz'
? gzopen($this->disk->path($this->fileName), 'r')
: $this->disk->readStream($this->fileName);

$statement = '';
while (! feof($stream)) {
$chunk = $this->compressionExtension === 'gz'
? gzread($stream, self::STREAM_BUFFER_SIZE)
: fread($stream, self::STREAM_BUFFER_SIZE);
$directory = (new TemporaryDirectory(config('db-snapshots.temporary_directory_path')))->create();

$lines = explode("\n", $chunk);
foreach ($lines as $idx => $line) {
if ($this->shouldIgnoreLine($line)) {
continue;
}
config([
'filesystems.disks.' . self::class => [
'driver' => 'local',
'root' => $directory->path(),
'throw' => false,
]
]);

$statement .= $line;
$localDisk = $this->filesystemFactory->disk(self::class);

// Carry-over the last line to the next chunk since it
// is possible that this chunk finished mid-line right on
// a semi-colon.
if (count($lines) == $idx + 1) {
break;
}
try {
LazyCollection::make(function () use ($localDisk) {
$localDisk->writeStream($this->fileName, $this->disk->readStream($this->fileName));

$stream = $this->compressionExtension === 'gz'
? gzopen($localDisk->path($this->fileName), 'r')
: $localDisk->readStream($this->fileName);

if (substr(trim($statement), -1, 1) === ';') {
yield $statement;
$statement = '';
$statement = '';
while (! feof($stream)) {
$chunk = $this->compressionExtension === 'gz'
? gzread($stream, self::STREAM_BUFFER_SIZE)
: fread($stream, self::STREAM_BUFFER_SIZE);

$lines = explode("\n", $chunk);
foreach ($lines as $idx => $line) {
if ($this->shouldIgnoreLine($line)) {
continue;
}

$statement .= $line;

// Carry-over the last line to the next chunk since it
// is possible that this chunk finished mid-line right on
// a semi-colon.
if (count($lines) == $idx + 1) {
break;
}

if (substr(trim($statement), -1, 1) === ';') {
yield $statement;
$statement = '';
}
}
}
}

if (substr(trim($statement), -1, 1) === ';') {
yield $statement;
}
})->each(function (string $statement) use ($connectionName) {
DB::connection($connectionName)->unprepared($statement);
});

if ($this->compressionExtension === 'gz') {
gzclose($stream);
} else {
fclose($stream);
}
})->each(function (string $statement) use ($connectionName) {
DB::connection($connectionName)->unprepared($statement);
});
} finally {
$directory->delete();
}
}

public function delete(): void
Expand Down

0 comments on commit 1363232

Please sign in to comment.