-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathreactphp-report.php
116 lines (90 loc) · 4.55 KB
/
reactphp-report.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
<?php
//https://www.youtube.com/watch?v=dRroOoQRVIs&t=3163s
//Creditos: Sergio Sianuco Leon
require 'vendor/autoload.php';
//crear event-loop
$loop = \React\EventLoop\Factory::create();
//ejecutar algo cada 1 sg
$loop->addPeriodicTimer(5, function() use ($loop){
//conectar a la base de datos
//Get a new connection to query jobs
//utilizar factory MySQL (para eso se instalo react/mysql)
$factory = new \React\MySQL\Factory($loop);
$uri= 'root:123456@localhost/test_db?timeout=0.5';
$connectionJob = $factory->createLazyConnection($uri);
/*
//consulta de prueba
$connectionJob->query('SELECT * FROM employees')->then(
function (\React\MySQL\QueryResult $command) {
echo count($command->resultRows) . ' row(s) in set' . PHP_EOL;
},
function (Exception $error) {
echo 'Error: ' . $error->getMessage() . PHP_EOL;
}
);
*/
//Get unreserved jobs
$connectionJob->query('SELECT * FROM reports WHERE created_at IS NULL LIMIT 1')->then( //query-1
function(\React\MySQL\QueryResult $command) use($connectionJob, $loop, $factory){
//Do nothing if there is no jobs
if(count($command->resultRows) === 0){
//'no hay jobs';
return;
}
//Get job
$resultRow = $command->resultRows[0];
//Reserve job
$connectionJob
->query('UPDATE reports SET created_at = '.\Carbon\Carbon::now()->getTimestamp().' WHERE id ='.$resultRow['id'])
->then(function() use($connectionJob, $resultRow, $loop){ //then 1
$tableName = $resultRow['table_name'];
//Get a new connection to query report
$factory = new \React\MySQL\Factory($loop);
$uri = 'root:123456@localhost/test_db?timeout=0.5';
$connectionReport = $factory->createLazyConnection($uri);
//Perform an async query and stream the rows of the result set
$stream = $connectionReport->queryStream('SELECT * FROM '. $tableName);
echo "Inicio informe ".$tableName. PHP_EOL;
//Create a stream for file
$fileName = tempnam(__DIR__, "report_".$tableName."_");
//agregar extension
rename($fileName, $fileName .= '.csv');
echo 'filename: '.$fileName. PHP_EOL;;
$fileStream = new \React\Stream\WritableResourceStream(fopen($fileName, 'w'), $loop, 1);
//Create a Stream to transform ResultSet to CSV line
$transformStream = new \React\Stream\ThroughStream(function($data){
return implode(',', array_values($data)).PHP_EOL;
});
//write to file
$stream->pipe($transformStream)->pipe($fileStream); //conectar streams el de bd y el del archivo
$command = $tableName;
//When stream ends, close file, close connections
$stream->on('end', function() use ($connectionJob, $connectionReport, $fileStream, $resultRow, $tableName) {
echo "Fin informe ".$tableName. PHP_EOL;
//close file
$fileStream->end();
//close connection to query report
$connectionReport->quit();
//delete job
$connectionJob
->query("DELETE FROM `test_db`.`reports` WHERE (`id` ='{$resultRow['id']}')")
->then(
function() use($connectionJob){
//close connection to query jobs
$connectionJob->quit();
},
function (Exception $error){
echo 'Error '.$error->getMessage(). PHP_EOL;
});
});
}, function (Exception $error){ //}); //end then 1
echo 'Error '.$error->getMessage(). PHP_EOL;
});
},
function (Exception $error) {
echo 'Error: ' . $error->getMessage() . PHP_EOL;
}
); //end-query-1
});
$loop->run();
?>