redis = $redis_client->getRedisClient(); $this->key = $key; } public function __destruct() { // $this->mclient->disconnect(); } public function publish($channel, $message) { // $this->mclient->publish($channel, $message); $data = $channel . '|' . $message; $this->redis->lpush($this->key, $data); } public function sendEvent(JobOrder $job_order, $payload) { error_log('sending mqtt event: '); error_log(print_r($payload, true)); $sessions = $job_order->getCustomer()->getMobileSessions(); if (count($sessions) == 0) { error_log("no sessions to send mqtt event to"); return; } $channels = []; // send to every customer session foreach ($sessions as $sess) { $phone_num = $sess->getPhoneNumber(); $channel = self::PREFIX . $phone_num; // gather channels, so we only send once $channels[$channel] = json_encode($payload); } foreach ($channels as $channel => $json_payload) { $this->publish($channel, $json_payload); // error_log('sent to ' . $channel); } } public function sendRiderEvent(JobOrder $job_order, $payload) { // check if a rider is available $rider = $job_order->getRider(); if ($rider == null) return; // check if rider has sessions $sessions = $rider->getSessions(); if (count($sessions) == 0) return; // send to every rider session foreach ($sessions as $sess) { $sess_id = $sess->getID(); $channel = self::RIDER_PREFIX . $sess_id; $this->publish($channel, json_encode($payload)); } } }