mirror of
https://github.com/openvk/openvk
synced 2025-01-05 07:09:50 +03:00
83 lines
3.4 KiB
PHP
83 lines
3.4 KiB
PHP
<?php declare(strict_types=1);
|
|
namespace openvk\ServiceAPI;
|
|
use Latte\Engine as TemplatingEngine;
|
|
use RdKafka\{Conf as RDKConf, KafkaConsumer};
|
|
use openvk\Web\Models\Entities\User;
|
|
use openvk\Web\Models\Repositories\{Notifications as N};
|
|
|
|
class Notifications implements Handler
|
|
{
|
|
protected $user;
|
|
protected $notifs;
|
|
|
|
function __construct(?User $user)
|
|
{
|
|
$this->user = $user;
|
|
$this->notifs = new N;
|
|
}
|
|
|
|
function ack(callable $resolve, callable $reject): void
|
|
{
|
|
$this->user->updateNotificationOffset();
|
|
$this->user->save();
|
|
$resolve("OK");
|
|
}
|
|
|
|
function fetch(callable $resolve, callable $reject): void
|
|
{
|
|
$kafkaConf = OPENVK_ROOT_CONF["openvk"]["credentials"]["notificationsBroker"];
|
|
if(!$kafkaConf["enable"]) {
|
|
$reject(1999, "Disabled");
|
|
return;
|
|
}
|
|
|
|
$kafkaConf = $kafkaConf["kafka"];
|
|
$conf = new RDKConf();
|
|
$conf->set("metadata.broker.list", $kafkaConf["addr"] . ":" . $kafkaConf["port"]);
|
|
$conf->set("group.id", "UserFetch-" . $this->user->getId()); # Чтобы уведы приходили только на разные устройства одного чебупелика
|
|
$conf->set("auto.offset.reset", "latest");
|
|
|
|
set_time_limit(30);
|
|
$consumer = new KafkaConsumer($conf);
|
|
$consumer->subscribe([ $kafkaConf["topic"] ]);
|
|
|
|
while(true) {
|
|
$message = $consumer->consume(30*1000);
|
|
switch ($message->err) {
|
|
case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
$descriptor = $message->payload;
|
|
[,$user,] = explode(",", $descriptor);
|
|
if(((int) $user) === $this->user->getId()) {
|
|
$data = (object) [];
|
|
$notification = $this->notifs->fromDescriptor($descriptor, $data);
|
|
if(!$notification) {
|
|
$reject(1982, "Server Error");
|
|
return;
|
|
}
|
|
|
|
$tplDir = __DIR__ . "/../Web/Presenters/templates/components/notifications/";
|
|
$tplId = "$tplDir$data->actionCode/_$data->originModelType" . "_" . $data->targetModelType . "_.xml";
|
|
$latte = new TemplatingEngine;
|
|
$latte->setTempDirectory(CHANDLER_ROOT . "/tmp/cache/templates");
|
|
$latte->addFilter("translate", fn($trId) => tr($trId));
|
|
$resolve([
|
|
"title" => tr("notif_" . $data->actionCode . "_" . $data->originModelType . "_" . $data->targetModelType),
|
|
"body" => trim(preg_replace('%(\s){2,}%', "$1", $latte->renderToString($tplId, ["notification" => $notification]))),
|
|
"ava" => $notification->getModel(1)->getAvatarUrl(),
|
|
"priority" => 1,
|
|
]);
|
|
return;
|
|
}
|
|
|
|
break;
|
|
case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
$reject(1983, "Nothing to report");
|
|
break 2;
|
|
default:
|
|
$reject(1981, "Kafka Error: " . $message->errstr());
|
|
break 2;
|
|
}
|
|
}
|
|
}
|
|
}
|