src/Services/ElasticsearchLogstashHandlerImp.php line 13

Open in your IDE?
  1. <?php
  2. namespace App\Services ;
  3. use Monolog\Formatter\FormatterInterface;
  4. use Monolog\Formatter\LogstashFormatter;
  5. use Monolog\Handler\FormattableHandlerTrait;
  6. use Monolog\Handler\ProcessableHandlerTrait;
  7. use Monolog\Logger;
  8. use Symfony\Bridge\Monolog\Handler\ElasticsearchLogstashHandler as MonologHandler;
  9. use Symfony\Component\HttpClient\HttpClient;
  10. use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
  11. use Symfony\Contracts\HttpClient\HttpClientInterface;
  12. class ElasticsearchLogstashHandlerImp extends MonologHandler
  13. {
  14.     use FormattableHandlerTrait;
  15.     use ProcessableHandlerTrait;
  16.     private $endpoint;
  17.     private $index;
  18.     private $client;
  19.     private $responses;
  20.     /**
  21.      * @param string|int $level The minimum logging level at which this handler will be triggered
  22.      */
  23.     public function __construct(string $endpoint 'http://127.0.0.1:9200'string $index 'monolog'HttpClientInterface $client null$level Logger::DEBUGbool $bubble true)
  24.     {
  25.         if (!interface_exists(HttpClientInterface::class)) {
  26.             throw new \LogicException(sprintf('The "%s" handler needs an HTTP client. Try running "composer require symfony/http-client".'__CLASS__));
  27.         }
  28.         parent::__construct($endpoint$index$client$level$bubble);
  29.         $this->endpoint $endpoint;
  30.         $this->index $index;
  31.         $this->client $client ?: HttpClient::create(['timeout' => 1]);
  32.         $this->responses = new \SplObjectStorage();
  33.     }
  34.     public function setIndex(?string $index) {
  35.         if ($index)
  36.             $this->index $index;
  37.     }
  38.     public function handle(array $record): bool
  39.     {
  40.         if (!$this->isHandling($record)) {
  41.             return false;
  42.         }
  43.         $this->sendToElasticsearch([$record]);
  44.         return !$this->bubble;
  45.     }
  46.     public function handleBatch(array $records): void
  47.     {
  48.         $records array_filter($records, [$this'isHandling']);
  49.         if ($records) {
  50.             $this->sendToElasticsearch($records);
  51.         }
  52.     }
  53.     protected function getDefaultFormatter(): FormatterInterface
  54.     {
  55.         // Monolog 1.X
  56.         if (\defined(LogstashFormatter::class.'::V1')) {
  57.             return new LogstashFormatter('application'nullnull'ctxt_'LogstashFormatter::V1);
  58.         }
  59.         // Monolog 2.X
  60.         return new LogstashFormatter('application');
  61.     }
  62.     private function sendToElasticsearch(array $records)
  63.     {
  64.         $formatter $this->getFormatter();
  65.         $body '';
  66.         foreach ($records as $record) {
  67.             foreach ($this->processors as $processor) {
  68.                 $record $processor($record);
  69.             }
  70.             $body .= json_encode([
  71.                 'index' => [
  72.                     '_index' => $this->index,
  73.                     '_type' => '_doc',
  74.                 ],
  75.             ]);
  76.             $body .= "\n";
  77.             $body .= $formatter->format($record);
  78.             $body .= "\n";
  79.         }
  80.         $response $this->client->request('POST'$this->endpoint.'/_bulk', [
  81.             'body' => $body,
  82.             'headers' => [
  83.                 'Content-Type' => 'application/json',
  84.                 'partner'=>$_ENV['PARTENER_ELASTICSEACH'],
  85.                 'appkey'=>$_ENV['APP_KEY_ELASTICSEACH'],
  86.             ],
  87.         ]);
  88.         $this->responses->attach($response);
  89.         $this->wait(false);
  90.     }
  91.     public function __destruct()
  92.     {
  93.         $this->wait(true);
  94.     }
  95.     private function wait(bool $blocking)
  96.     {
  97.         foreach ($this->client->stream($this->responses$blocking null 0.0) as $response => $chunk) {
  98.             try {
  99.                 if ($chunk->isTimeout() && !$blocking) {
  100.                     continue;
  101.                 }
  102.                 if (!$chunk->isFirst() && !$chunk->isLast()) {
  103.                     continue;
  104.                 }
  105.                 if ($chunk->isLast()) {
  106.                     $this->responses->detach($response);
  107.                 }
  108.             } catch (ExceptionInterface $e) {
  109.                 $this->responses->detach($response);
  110.                 error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
  111.             }
  112.         }
  113.     }
  114. }