Merge branch '180-rider-location-caching' into 'master'
Resolve "Rider location caching" Closes #180 See merge request jankstudio/resq!252
This commit is contained in:
commit
740110b42f
7 changed files with 329 additions and 82 deletions
46
src/Command/TestRiderTrackerCommand.php
Normal file
46
src/Command/TestRiderTrackerCommand.php
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
<?php
|
||||
|
||||
namespace App\Command;
|
||||
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
use CrEOF\Spatial\PHP\Types\Geometry\Point;
|
||||
|
||||
use App\Service\RiderTracker;
|
||||
|
||||
class TestRiderTrackerCommand extends Command
|
||||
{
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('test:ridertracker')
|
||||
->setDescription('Test the rider tracker service')
|
||||
->setHelp('Test the rider tracker service.')
|
||||
->addArgument('rider_id', InputArgument::REQUIRED, 'Rider ID');
|
||||
}
|
||||
|
||||
public function __construct(RiderTracker $rtracker)
|
||||
{
|
||||
$this->rtracker = $rtracker;
|
||||
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$rider_id = $input->getArgument('rider_id');
|
||||
|
||||
$coordinates = $this->rtracker->getRiderLocation($rider_id);
|
||||
|
||||
if ($coordinates != null)
|
||||
{
|
||||
echo "Rider Location: " . $coordinates->getLongitude() . "," . $coordinates->getLatitude() . "\n";
|
||||
}
|
||||
else
|
||||
{
|
||||
echo "Invalid rider id." . "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -26,6 +26,7 @@ use App\Service\InvoiceCreator;
|
|||
use App\Service\RisingTideGateway;
|
||||
use App\Service\MQTTClient;
|
||||
use App\Service\GeofenceTracker;
|
||||
use App\Service\RiderTracker;
|
||||
|
||||
use App\Entity\MobileSession;
|
||||
use App\Entity\Customer;
|
||||
|
|
@ -1116,7 +1117,7 @@ class APIController extends Controller
|
|||
return $res->getReturnResponse();
|
||||
}
|
||||
|
||||
public function getRiderStatus(Request $req)
|
||||
public function getRiderStatus(Request $req, RiderTracker $rt)
|
||||
{
|
||||
$required_params = [];
|
||||
$em = $this->getDoctrine()->getManager();
|
||||
|
|
@ -1243,8 +1244,9 @@ class APIController extends Controller
|
|||
case JOStatus::ASSIGNED:
|
||||
case JOStatus::IN_TRANSIT:
|
||||
case JOStatus::IN_PROGRESS:
|
||||
$coord = $jo->getHub()->getCoordinates();
|
||||
$rider = $jo->getRider();
|
||||
// get rider coordinates from redis
|
||||
$coord = $rt->getRiderLocation($rider->getID());
|
||||
|
||||
// default image url
|
||||
$url_prefix = $req->getSchemeAndHttpHost();
|
||||
|
|
|
|||
57
src/Service/RiderTracker.php
Normal file
57
src/Service/RiderTracker.php
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
<?php
|
||||
|
||||
namespace App\Service;
|
||||
|
||||
use App\Entity\Rider;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
|
||||
use CrEOF\Spatial\PHP\Types\Geometry\Point;
|
||||
|
||||
use Predis\Client as PredisClient;
|
||||
|
||||
class RiderTracker
|
||||
{
|
||||
const RIDER_PREFIX_KEY = 'rider.location';
|
||||
|
||||
protected $em;
|
||||
protected $redis;
|
||||
|
||||
public function __construct(EntityManagerInterface $em)
|
||||
{
|
||||
$this->em = $em;
|
||||
|
||||
// TODO: make it read redis settings from config
|
||||
// build a service maybe?
|
||||
$this->redis = new PredisClient();
|
||||
}
|
||||
|
||||
protected function getRiderKey($rider_id)
|
||||
{
|
||||
return self::RIDER_PREFIX_KEY . '.' . $rider_id;
|
||||
}
|
||||
|
||||
public function getRiderLocation($rider_id)
|
||||
{
|
||||
$coordinates = new Point(0,0);
|
||||
$key = $this->getRiderKey($rider_id);
|
||||
|
||||
// check redis cache for rider information
|
||||
if (($this->redis->hexists($key, 'longitude')) &&
|
||||
($this->redis->hexists($key, 'latitude')))
|
||||
{
|
||||
$long = $this->redis->hget($key, 'longitude');
|
||||
$lat = $this->redis->hget($key, 'latitude');
|
||||
|
||||
$coordinates = new Point($long, $lat);
|
||||
}
|
||||
else
|
||||
{
|
||||
$rider = $this->em->getRepository(Rider::class)->find($rider_id);
|
||||
$coordinates = $rider->getHub()->getCoordinates();
|
||||
}
|
||||
|
||||
return $coordinates;
|
||||
|
||||
}
|
||||
}
|
||||
129
utils/rider_location_cache/daemon.py
Normal file
129
utils/rider_location_cache/daemon.py
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import sys, os, time, atexit
|
||||
from signal import SIGTERM
|
||||
|
||||
class Daemon:
|
||||
"""
|
||||
A generic daemon class.
|
||||
|
||||
Usage: subclass the Daemon class and override the run() method
|
||||
"""
|
||||
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
|
||||
self.stdin = stdin
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
self.pidfile = pidfile
|
||||
|
||||
def daemonize(self):
|
||||
"""
|
||||
do the UNIX double-fork magic, see Stevens' "Advanced
|
||||
Programming in the UNIX Environment" for details (ISBN 0201563177)
|
||||
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
|
||||
"""
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
# exit first parent
|
||||
sys.exit(0)
|
||||
except OSError, e:
|
||||
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
|
||||
sys.exit(1)
|
||||
|
||||
# decouple from parent environment
|
||||
os.chdir("/")
|
||||
os.setsid()
|
||||
os.umask(0)
|
||||
|
||||
# do second fork
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid > 0:
|
||||
# exit from second parent
|
||||
sys.exit(0)
|
||||
except OSError, e:
|
||||
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
|
||||
sys.exit(1)
|
||||
|
||||
# redirect standard file descriptors
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
si = file(self.stdin, 'r')
|
||||
so = file(self.stdout, 'a+')
|
||||
se = file(self.stderr, 'a+', 0)
|
||||
os.dup2(si.fileno(), sys.stdin.fileno())
|
||||
os.dup2(so.fileno(), sys.stdout.fileno())
|
||||
os.dup2(se.fileno(), sys.stderr.fileno())
|
||||
|
||||
# write pidfile
|
||||
atexit.register(self.delpid)
|
||||
pid = str(os.getpid())
|
||||
file(self.pidfile,'w+').write("%s\n" % pid)
|
||||
|
||||
def delpid(self):
|
||||
os.remove(self.pidfile)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the daemon
|
||||
"""
|
||||
# Check for a pidfile to see if the daemon already runs
|
||||
try:
|
||||
pf = file(self.pidfile,'r')
|
||||
pid = int(pf.read().strip())
|
||||
pf.close()
|
||||
except IOError:
|
||||
pid = None
|
||||
|
||||
if pid:
|
||||
message = "pidfile %s already exist. Daemon already running?\n"
|
||||
sys.stderr.write(message % self.pidfile)
|
||||
sys.exit(1)
|
||||
|
||||
# Start the daemon
|
||||
self.daemonize()
|
||||
self.run()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop the daemon
|
||||
"""
|
||||
# Get the pid from the pidfile
|
||||
try:
|
||||
pf = file(self.pidfile,'r')
|
||||
pid = int(pf.read().strip())
|
||||
pf.close()
|
||||
except IOError:
|
||||
pid = None
|
||||
|
||||
if not pid:
|
||||
message = "pidfile %s does not exist. Daemon not running?\n"
|
||||
sys.stderr.write(message % self.pidfile)
|
||||
return # not an error in a restart
|
||||
|
||||
# Try killing the daemon process
|
||||
try:
|
||||
while 1:
|
||||
os.kill(pid, SIGTERM)
|
||||
time.sleep(0.1)
|
||||
except OSError, err:
|
||||
err = str(err)
|
||||
if err.find("No such process") > 0:
|
||||
if os.path.exists(self.pidfile):
|
||||
os.remove(self.pidfile)
|
||||
else:
|
||||
print str(err)
|
||||
sys.exit(1)
|
||||
|
||||
def restart(self):
|
||||
"""
|
||||
Restart the daemon
|
||||
"""
|
||||
self.stop()
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
You should override this method when you subclass Daemon. It will be called after the process has been
|
||||
daemonized by start() or restart().
|
||||
"""
|
||||
BIN
utils/rider_location_cache/daemon.pyc
Normal file
BIN
utils/rider_location_cache/daemon.pyc
Normal file
Binary file not shown.
|
|
@ -1,106 +1,89 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
import ssl
|
||||
from threading import Thread
|
||||
from daemon import Daemon
|
||||
import redis
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
import mysql.connector
|
||||
import json
|
||||
|
||||
def mysql_connect(user, password, host, database):
|
||||
conn = mysql.connector.connect(user=user,
|
||||
password=password,
|
||||
host=host,
|
||||
database=database)
|
||||
return conn
|
||||
class RiderLocationCache(object):
|
||||
|
||||
def init_subscriptions(client, conn):
|
||||
# given mysql connection, get all rider sessions
|
||||
query = ("select id from rider_session")
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(query)
|
||||
for (id) in cursor:
|
||||
print "subscribing to rider session %s" % id
|
||||
client.subscribe('motorider_%s' % id)
|
||||
cursor.close()
|
||||
def run(self):
|
||||
client = mqtt.Client()
|
||||
|
||||
client.on_connect = on_connect
|
||||
# client.on_publish = on_publish
|
||||
client.on_message = on_message
|
||||
|
||||
#client.tls_set(
|
||||
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
||||
#client.tls_set(
|
||||
# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
||||
#client.connect("resqaws.jankstudio.com", 8883, 60)
|
||||
client.connect("localhost", 1883, 60)
|
||||
|
||||
#t = Thread(target=getRedis, args=(1,))
|
||||
|
||||
#t.start()
|
||||
|
||||
#signal.signal(signal.SIGINT, sigint_handler)
|
||||
client.loop_forever()
|
||||
|
||||
def init_subscriptions(client):
|
||||
#print "subscribing to wildcard"
|
||||
client.subscribe('#')
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq')
|
||||
init_subscriptions(client, conn)
|
||||
print("Connected with result code "+str(rc))
|
||||
client.subscribe("$SYS/#")
|
||||
init_subscriptions(client)
|
||||
#print("Connected with result code "+str(rc))
|
||||
# client.subscribe("$SYS/#")
|
||||
|
||||
def user_data_set(userdata):
|
||||
conn = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
return conn
|
||||
|
||||
def on_publish(client, userdata, mid):
|
||||
pass
|
||||
pass
|
||||
|
||||
def on_message(client, userdata, message):
|
||||
print("message topic=",message.topic[0:10])
|
||||
redis_conn = user_data_set(userdata)
|
||||
#print("message topic=", message.topic[0:10])
|
||||
|
||||
if message.topic[0:10] != 'motorider_':
|
||||
return
|
||||
#print repr(message)
|
||||
if message.topic[0:10] != 'motorider_':
|
||||
return
|
||||
#print repr(message)
|
||||
|
||||
# check if json decodable
|
||||
res = json.loads(message.payload)
|
||||
# check if json decodable
|
||||
res = json.loads(message.payload)
|
||||
#print res
|
||||
|
||||
# get rider session id
|
||||
sess_id = message.topic[10:]
|
||||
# get rider session id
|
||||
sess_id = message.topic[10:]
|
||||
|
||||
# check if it has event
|
||||
if 'event' not in res:
|
||||
return
|
||||
# check if it has event
|
||||
if 'event' not in res:
|
||||
return
|
||||
|
||||
# check if event is driver_location
|
||||
if res['event'] != 'driver_location':
|
||||
return
|
||||
# check if event is driver_location
|
||||
if res['event'] != 'driver_location':
|
||||
return
|
||||
|
||||
# update our redis key
|
||||
key = 'location_%s' % sess_id
|
||||
print "setting %s" % key
|
||||
redis_conn.setex(key, 1600, message.payload)
|
||||
# save the longitude and latitude
|
||||
# get the rider id from sess_id
|
||||
rider_key = "rider.location.%s" % sess_id
|
||||
rider_long = str(res['longitude'])
|
||||
rider_lat = str(res['latitude'])
|
||||
|
||||
# set the location
|
||||
redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})
|
||||
|
||||
|
||||
def getRedis(i):
|
||||
r = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
while 1:
|
||||
time.sleep(0)
|
||||
data = r.brpop("events", 10)
|
||||
if data:
|
||||
info = data[1].split('|')
|
||||
print "Channel: " + info[0] + " message: " + info[1]
|
||||
client.publish(info[0], info[1])
|
||||
|
||||
|
||||
def sigint_handler(signal, frame):
|
||||
print 'Interrupted'
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
# client.on_publish = on_publish
|
||||
client.on_message = on_message
|
||||
|
||||
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
|
||||
|
||||
#client.tls_set(
|
||||
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
||||
client.tls_set(
|
||||
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
tls_version=ssl.PROTOCOL_TLSv1)
|
||||
client.connect("resqaws.jankstudio.com", 8883, 60)
|
||||
|
||||
|
||||
#t = Thread(target=getRedis, args=(1,))
|
||||
|
||||
#t.start()
|
||||
|
||||
#signal.signal(signal.SIGINT, sigint_handler)
|
||||
client.loop_forever()
|
||||
# update our redis key
|
||||
key = 'location_%s' % sess_id
|
||||
#print "setting %s" % key
|
||||
redis_conn.setex(key, 1600, message.payload)
|
||||
|
||||
|
|
|
|||
30
utils/rider_location_cache/rider_location_daemon.py
Normal file
30
utils/rider_location_cache/rider_location_daemon.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import sys
|
||||
import time
|
||||
from daemon import Daemon
|
||||
import rider_location_cache
|
||||
|
||||
class RiderLocationDaemon(Daemon):
|
||||
def run(self):
|
||||
rider_location = rider_location_cache.RiderLocationCache()
|
||||
rider_location.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid')
|
||||
if len(sys.argv) == 2:
|
||||
if 'start' == sys.argv[1]:
|
||||
daemon.start()
|
||||
elif 'stop' == sys.argv[1]:
|
||||
daemon.stop()
|
||||
elif 'restart' == sys.argv[1]:
|
||||
daemon.restart()
|
||||
elif 'foreground' == sys.argv[1]:
|
||||
daemon.run()
|
||||
else:
|
||||
print "Unknown command"
|
||||
sys.exit(2)
|
||||
sys.exit(0)
|
||||
else:
|
||||
print "usage: %s start|stop|restart" % sys.argv[0]
|
||||
sys.exit(2)
|
||||
Loading…
Reference in a new issue