redis = $redis_client->getRedisClient();
        $this->key = $key;
    }

    /**
     * Intentionally a no-op: nothing is disconnected or released on destruction.
     */
    public function __destruct()
    {
    }

    /**
     * Queue a message for a channel.
     *
     * The channel and message are joined as "<channel>|<message>" and pushed
     * onto the head of the Redis list stored under $this->key.
     * NOTE(review): presumably a separate consumer pops this list and does the
     * actual delivery -- confirm against the consumer side.
     *
     * @param string $channel channel the message is addressed to
     * @param string $message message body (sendEvent passes a JSON string)
     */
    public function publish($channel, $message)
    {
        $data = $channel . '|' . $message;
        $this->redis->lpush($this->key, $data);
    }

    /**
     * Send an event payload to every mobile session of a job order's customer.
     *
     * The job order id is added to the payload under 'jo_id', the payload is
     * JSON-encoded once, and the result is published to one channel per
     * distinct session phone number (self::PREFIX . phone number). If the
     * customer has no customer user or no sessions, or the payload cannot be
     * encoded, the problem is logged and nothing is published.
     *
     * @param JobOrder $job_order job order whose customer receives the event
     * @param array    $payload   event data; 'jo_id' is set/overwritten here
     */
    public function sendEvent(JobOrder $job_order, $payload)
    {
        // get all v2 sessions
        $sessions = [];
        $cust_user = $job_order->getCustomer()->getCustomerUser();
        if (!empty($cust_user)) {
            $sessions = $cust_user->getMobileSessions();
        }

        if (empty($sessions)) {
            error_log("no sessions to send mqtt event to");
            return;
        }

        // attach jo id to the payload; it is identical for every session,
        // so build and encode it once instead of once per session
        $payload['jo_id'] = $job_order->getID();
        $json_payload = json_encode($payload);
        if ($json_payload === false) {
            // json_encode() returns false on failure; publishing that would
            // queue "<channel>|" with an empty message
            error_log('unable to encode mqtt event payload: ' . json_last_error_msg());
            return;
        }

        // gather channels keyed by name, so each channel is sent to only once
        $channels = [];
        foreach ($sessions as $sess) {
            $channels[self::PREFIX . $sess->getPhoneNumber()] = true;
        }

        foreach (array_keys($channels) as $channel) {
            $this->publish($channel, $json_payload);
        }
    }
}