Resolve "Rider location caching" #1063

Merged
jankstudio merged 8 commits from 180-rider-location-caching into master 2019-05-28 03:24:23 +00:00
7 changed files with 329 additions and 82 deletions

View file

@ -0,0 +1,46 @@
<?php
namespace App\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use CrEOF\Spatial\PHP\Types\Geometry\Point;
use App\Service\RiderTracker;
class TestRiderTrackerCommand extends Command
{
protected function configure()
{
$this->setName('test:ridertracker')
->setDescription('Test the rider tracker service')
->setHelp('Test the rider tracker service.')
->addArgument('rider_id', InputArgument::REQUIRED, 'Rider ID');
}
public function __construct(RiderTracker $rtracker)
{
$this->rtracker = $rtracker;
parent::__construct();
}
public function execute(InputInterface $input, OutputInterface $output)
{
$rider_id = $input->getArgument('rider_id');
$coordinates = $this->rtracker->getRiderLocation($rider_id);
if ($coordinates != null)
{
echo "Rider Location: " . $coordinates->getLongitude() . "," . $coordinates->getLatitude() . "\n";
}
else
{
echo "Invalid rider id." . "\n";
}
}
}

View file

@ -26,6 +26,7 @@ use App\Service\InvoiceCreator;
use App\Service\RisingTideGateway; use App\Service\RisingTideGateway;
use App\Service\MQTTClient; use App\Service\MQTTClient;
use App\Service\GeofenceTracker; use App\Service\GeofenceTracker;
use App\Service\RiderTracker;
use App\Entity\MobileSession; use App\Entity\MobileSession;
use App\Entity\Customer; use App\Entity\Customer;
@ -1116,7 +1117,7 @@ class APIController extends Controller
return $res->getReturnResponse(); return $res->getReturnResponse();
} }
public function getRiderStatus(Request $req) public function getRiderStatus(Request $req, RiderTracker $rt)
{ {
$required_params = []; $required_params = [];
$em = $this->getDoctrine()->getManager(); $em = $this->getDoctrine()->getManager();
@ -1243,8 +1244,9 @@ class APIController extends Controller
case JOStatus::ASSIGNED: case JOStatus::ASSIGNED:
case JOStatus::IN_TRANSIT: case JOStatus::IN_TRANSIT:
case JOStatus::IN_PROGRESS: case JOStatus::IN_PROGRESS:
$coord = $jo->getHub()->getCoordinates();
$rider = $jo->getRider(); $rider = $jo->getRider();
// get rider coordinates from redis
$coord = $rt->getRiderLocation($rider->getID());
// default image url // default image url
$url_prefix = $req->getSchemeAndHttpHost(); $url_prefix = $req->getSchemeAndHttpHost();

View file

@ -0,0 +1,57 @@
<?php
namespace App\Service;
use App\Entity\Rider;
use Doctrine\ORM\EntityManagerInterface;
use CrEOF\Spatial\PHP\Types\Geometry\Point;
use Predis\Client as PredisClient;
class RiderTracker
{
const RIDER_PREFIX_KEY = 'rider.location';
protected $em;
protected $redis;
public function __construct(EntityManagerInterface $em)
{
$this->em = $em;
// TODO: make it read redis settings from config
// build a service maybe?
$this->redis = new PredisClient();
}
protected function getRiderKey($rider_id)
{
return self::RIDER_PREFIX_KEY . '.' . $rider_id;
}
public function getRiderLocation($rider_id)
{
$coordinates = new Point(0,0);
$key = $this->getRiderKey($rider_id);
// check redis cache for rider information
if (($this->redis->hexists($key, 'longitude')) &&
($this->redis->hexists($key, 'latitude')))
{
$long = $this->redis->hget($key, 'longitude');
$lat = $this->redis->hget($key, 'latitude');
$coordinates = new Point($long, $lat);
}
else
{
$rider = $this->em->getRepository(Rider::class)->find($rider_id);
$coordinates = $rider->getHub()->getCoordinates();
}
return $coordinates;
}
}

View file

@ -0,0 +1,129 @@
#!/usr/bin/env python
import sys, os, time, atexit
from signal import SIGTERM
class Daemon:
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile,'w+').write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile,'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile,'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""

Binary file not shown.

View file

@ -1,106 +1,89 @@
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import ssl import ssl
from threading import Thread from threading import Thread
from daemon import Daemon
import redis import redis
import time import time
import signal import signal
import sys import sys
import os import os
import mysql.connector
import json import json
def mysql_connect(user, password, host, database): class RiderLocationCache(object):
conn = mysql.connector.connect(user=user,
password=password,
host=host,
database=database)
return conn
def init_subscriptions(client, conn): def run(self):
# given mysql connection, get all rider sessions client = mqtt.Client()
query = ("select id from rider_session")
cursor = conn.cursor() client.on_connect = on_connect
cursor.execute(query) # client.on_publish = on_publish
for (id) in cursor: client.on_message = on_message
print "subscribing to rider session %s" % id
client.subscribe('motorider_%s' % id) #client.tls_set(
cursor.close() # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
# tls_version=ssl.PROTOCOL_TLSv1)
#client.tls_set(
# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
# tls_version=ssl.PROTOCOL_TLSv1)
#client.connect("resqaws.jankstudio.com", 8883, 60)
client.connect("localhost", 1883, 60)
#t = Thread(target=getRedis, args=(1,))
#t.start()
#signal.signal(signal.SIGINT, sigint_handler)
client.loop_forever()
def init_subscriptions(client):
#print "subscribing to wildcard"
client.subscribe('#')
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq') init_subscriptions(client)
init_subscriptions(client, conn) #print("Connected with result code "+str(rc))
print("Connected with result code "+str(rc)) # client.subscribe("$SYS/#")
client.subscribe("$SYS/#")
def user_data_set(userdata):
conn = redis.StrictRedis(host='localhost', port=6379, db=0)
return conn
def on_publish(client, userdata, mid): def on_publish(client, userdata, mid):
pass pass
def on_message(client, userdata, message): def on_message(client, userdata, message):
print("message topic=",message.topic[0:10]) redis_conn = user_data_set(userdata)
#print("message topic=", message.topic[0:10])
if message.topic[0:10] != 'motorider_': if message.topic[0:10] != 'motorider_':
return return
#print repr(message) #print repr(message)
# check if json decodable # check if json decodable
res = json.loads(message.payload) res = json.loads(message.payload)
#print res
# get rider session id # get rider session id
sess_id = message.topic[10:] sess_id = message.topic[10:]
# check if it has event # check if it has event
if 'event' not in res: if 'event' not in res:
return return
# check if event is driver_location # check if event is driver_location
if res['event'] != 'driver_location': if res['event'] != 'driver_location':
return return
# update our redis key # save the longitude and latitude
key = 'location_%s' % sess_id # get the rider id from sess_id
print "setting %s" % key rider_key = "rider.location.%s" % sess_id
redis_conn.setex(key, 1600, message.payload) rider_long = str(res['longitude'])
rider_lat = str(res['latitude'])
# set the location
redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})
# update our redis key
def getRedis(i): key = 'location_%s' % sess_id
r = redis.StrictRedis(host='localhost', port=6379, db=0) #print "setting %s" % key
while 1: redis_conn.setex(key, 1600, message.payload)
time.sleep(0)
data = r.brpop("events", 10)
if data:
info = data[1].split('|')
print "Channel: " + info[0] + " message: " + info[1]
client.publish(info[0], info[1])
def sigint_handler(signal, frame):
print 'Interrupted'
sys.exit(0)
client = mqtt.Client()
client.on_connect = on_connect
# client.on_publish = on_publish
client.on_message = on_message
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
#client.tls_set(
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
# tls_version=ssl.PROTOCOL_TLSv1)
client.tls_set(
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1)
client.connect("resqaws.jankstudio.com", 8883, 60)
#t = Thread(target=getRedis, args=(1,))
#t.start()
#signal.signal(signal.SIGINT, sigint_handler)
client.loop_forever()

View file

@ -0,0 +1,30 @@
#!/usr/bin/env python
import sys
import time
from daemon import Daemon
import rider_location_cache
class RiderLocationDaemon(Daemon):
def run(self):
rider_location = rider_location_cache.RiderLocationCache()
rider_location.run()
if __name__ == "__main__":
daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid')
if len(sys.argv) == 2:
if 'start' == sys.argv[1]:
daemon.start()
elif 'stop' == sys.argv[1]:
daemon.stop()
elif 'restart' == sys.argv[1]:
daemon.restart()
elif 'foreground' == sys.argv[1]:
daemon.run()
else:
print "Unknown command"
sys.exit(2)
sys.exit(0)
else:
print "usage: %s start|stop|restart" % sys.argv[0]
sys.exit(2)