Issue #3024: Add script to create rejected_net_usage on MongoDB

Signed-off-by: Giovanni Venâncio de Souza <gvs11ufpr@gmail.com>
parent bb535142
This script reads every row from the rejected_net_usage table in PostgreSQL and builds the documents to be inserted into MongoDB.
[postgresql]
user = user
password = password
dbname = database
host = host
port = port
[mongodb]
host = host
port = port
database = database
collection = collection
[misc]
base_date = 2014-02-05
date_exceptions = 2015-05-07, 2015-05-08
#!/usr/bin/env python
import sys
import psycopg2
import datetime
import ConfigParser
from utils import get_pg_conn, get_mg_conn
# Get config
# NOTE: this runs at import time, so importing this module requires the
# configuration file path to be present as the first command-line argument.
configFile = sys.argv[1]
config = ConfigParser.ConfigParser()
config.read(configFile)
# Create PostgreSQL connection string
# (a string built from the [postgresql] section, not an open connection;
# get_date_info passes it to psycopg2.connect)
pg_conn = get_pg_conn(config.items('postgresql'))
# Get base date to get info about rejected_net_usage
# NOTE(review): base_date is read here but not referenced by the functions
# visible in this module -- confirm whether a date filter is still intended.
base_date = config.get('misc', 'base_date')
def get_date_info(contact_dates, pipe):
    """
    For each day in contact_dates, get all rows
    from rejected_net_usage and send them through pipe.

    contact_dates -- sequence of rows whose first element (day[0]) is the
                     contact date to look up
    pipe          -- connection end used to send results to the parent

    Protocol on pipe:
      * one message per batch of up to 100000 rows (a list of tuples);
      * the string 'day_finished' after the last batch of a day
        (days without rows send nothing at all);
      * the string 'done' once every day has been processed.
    """
    batch_size = 100000
    columns = [
        "contact_date",
        "id_point",
        "id_ent_net",
        "macaddr",
        "collect_date",
        "collect_time",
        "down_bytes",
        "down_packages",
        "up_bytes",
        "up_packages",
        "ip",
        "id_city",
        "city_code",
        "rejected_reason"
    ]
    # The query is the same for every day, so build it once. Column names
    # are fixed identifiers and are interpolated here; the date is left as
    # a bound parameter (%%s -> %s) for psycopg2 to adapt and quote.
    query = (
        """
        SELECT
            %s
        FROM
            rejected_net_usage
        WHERE
            contact_date = %%s
        """
    ) % ','.join(columns)

    # Connect with PostgreSQL to get information about points
    pg_date_conn = psycopg2.connect(pg_conn)
    try:
        day_cur = pg_date_conn.cursor()
        dates_count = len(contact_dates)
        for day in contact_dates:
            print("%d days remaining" % dates_count)
            print("[%s] Getting information from day %s" %
                  (str(datetime.datetime.now())[:19], day[0]))
            dates_count -= 1

            day_cur.execute(query, (day[0],))
            rows = day_cur.fetchmany(batch_size)
            if not rows:
                # Nothing for this day: no batch and no 'day_finished'
                continue
            while rows:
                pipe.send(rows)
                rows = day_cur.fetchmany(batch_size)
            pipe.send('day_finished')
    finally:
        # Close PostgreSQL connection even if a query or send failed
        pg_date_conn.close()

    # When finished, signal parent process
    pipe.send('done')
def insert_on_mongodb(pipe):
    """
    Receive batches of documents from pipe and insert them on MongoDB.

    Every message received is passed to insert_many (unordered) until the
    string 'done' arrives; then the MongoDB connection is closed and
    'done' is sent back through the same pipe.
    """
    # Connect with MongoDB
    client, target_collection = get_mg_conn(config)

    # iter() with a sentinel keeps calling pipe.recv() until it gets 'done'
    for batch in iter(pipe.recv, 'done'):
        target_collection.insert_many(batch, ordered=False)

    # Close MongoDB connection
    client.close()

    # When finished, signal parent process
    pipe.send('done')
#!/usr/bin/env python
import sys
import psycopg2
import ConfigParser
import datetime
from multiprocessing import Process, Pipe
from utils import *
from process_functions import *
# Main script: read rejected_net_usage rows from PostgreSQL, group them into
# one document per (ip | macaddr | id_point, day) and insert them on MongoDB.
#
# Two child processes are used: one reads rows from PostgreSQL
# (get_date_info) and one inserts documents on MongoDB (insert_on_mongodb).
# This process sits in the middle and builds the documents.
if len(sys.argv) != 2:
    print("usage: %s <database.cfg>" % sys.argv[0])
    sys.exit(1)
configFile = sys.argv[1]

# Get PostgreSQL config
config = ConfigParser.ConfigParser()
config.read(configFile)

# Connect with PostgreSQL
pg_conn = psycopg2.connect(get_pg_conn(config.items('postgresql')))
pg_cur = pg_conn.cursor()

# Get exceptions dates that need to be reinserted.
# Blank entries (empty option, stray or trailing comma) are ignored.
date_exceptions = [
    item.strip()
    for item in config.get('misc', 'date_exceptions').split(',')
    if item.strip()
]

# Remove exception dates
if date_exceptions:
    mg_conn, rejected_net_usage = get_mg_conn(config)
    try:
        for date in date_exceptions:
            dt = datetime.datetime(*[int(i) for i in date.split('-')])
            rejected_net_usage.delete_many({'metadata.contact': {"$eq": dt}})
    finally:
        mg_conn.close()

# Get contact dates
# The execute() call must run before fetchall(); without it the cursor has
# no result set and fetchall() raises.
# NOTE(review): every contact date is selected here; confirm whether the
# query should be restricted (e.g. by the 'base_date' option in [misc]).
pg_cur.execute("SELECT DISTINCT contact_date FROM rejected_net_usage")
contact_dates = pg_cur.fetchall()

# This connection is not needed anymore (the reader process opens its own)
pg_cur.close()
pg_conn.close()

# Create Pipes
parent_date_pipe, date_pipe = Pipe()
parent_mongo_pipe, mongo_pipe = Pipe()

# Create Process
pg_reader = Process(target=get_date_info, args=(contact_dates, date_pipe))
mg_insert = Process(target=insert_on_mongodb, args=(mongo_pipe,))
pg_reader.start()
mg_insert.start()

# Documents of the day being processed, indexed by '<collect_by>_<date>'
docs = {}
while True:
    day_rows = parent_date_pipe.recv()

    if day_rows == 'day_finished':
        # All batches of the day were received: ship its documents
        parent_mongo_pipe.send(list(docs.values()))
        docs = {}
        continue

    if day_rows == 'done':
        break

    date = str(day_rows[0][0])
    print("[%s] Processing information from day %s" %
          (str(datetime.datetime.now())[:19], date))

    # Process all rows from a day
    for r_row in day_rows:
        row = {
            'contact_date': create_date(r_row[0]),
            'id_point': r_row[1],
            'id_ent_net': r_row[2],
            'macaddr': r_row[3],
            'collect_date': create_date(r_row[4]),
            'collect_time': r_row[5],
            'down_bytes': r_row[6],
            'down_packages': r_row[7],
            'up_bytes': r_row[8],
            'up_packages': r_row[9],
            'ip': r_row[10],
            'id_city': r_row[11],
            'city_code': r_row[12],
            'rejected_reason': r_row[13]
        }

        # Group by ip, falling back to macaddr and then to id_point
        collect_by = row['ip'] or row['macaddr'] or row['id_point']
        if not collect_by:
            # No identifier at all: there is nothing to group this row by,
            # and it must not be attached to the previous row's document.
            print("[%s] Skipping row without ip, macaddr or id_point "
                  "from day %s" %
                  (str(datetime.datetime.now())[:19], date))
            continue

        # %-formatting also accepts non-string identifiers (e.g. id_point)
        key = '%s_%s' % (collect_by, date)

        # Create document with metadata
        if key not in docs:
            docs[key] = create_document(row)

        # Update document with info received
        update_document(docs, key, row)

parent_mongo_pipe.send('done')

# Kill process
pg_reader.terminate()

# Wait for MongoDB insertions to finish, then kill the process
parent_mongo_pipe.recv()
mg_insert.terminate()
sys.exit(0)
#!/usr/bin/env python
import datetime
from pymongo import MongoClient
def get_pg_conn(config):
    """
    Build a PostgreSQL connection string.

    config is an iterable of (name, value) string pairs; each pair is
    rendered as 'name=value' followed by one space, so a non-empty result
    ends with a trailing space.
    """
    return ''.join('='.join(pair) + " " for pair in config)
def get_mg_conn(config):
    """
    Return a MongoDB connection and collection

    config is a parsed configuration exposing get/getint; the options
    'host', 'port' (integer), 'database' and 'collection' are read from
    its [mongodb] section.

    Returns a two-item list: [client, collection].
    """
    mg_host = config.get('mongodb', 'host')
    mg_port = config.getint('mongodb', 'port')
    mg_database = config.get('mongodb', 'database')
    mg_collection = config.get('mongodb', 'collection')

    # Connect with MongoDB
    mg_conn = MongoClient(mg_host, mg_port)

    # Get database, then the collection inside it
    collection = mg_conn[mg_database][mg_collection]

    return [mg_conn, collection]
def interval_to_minutes(interval):
    """
    Turn a five-minute slot index into its minute label.

    The index is multiplied by 5 and rendered as text; a one-character
    result gets a leading '0' (0 -> '00', 1 -> '05', 2 -> '10').
    """
    minutes = str(interval * 5)
    if len(minutes) != 1:
        return minutes
    return '0' + minutes
def interval_to_hour(interval):
    """
    Turn an hour index into its label for the hourly document.

    The value is rendered as text; a one-character result gets a leading
    '0' (3 -> '03', 15 -> '15').
    """
    label = str(interval)
    if len(label) != 1:
        return label
    return '0' + label
def round_time(dt):
    """
    Round a time down to the previous multiple of five minutes.

    Only the hour and minute of dt are used; the returned datetime.time
    has no seconds or microseconds.
    """
    floored_minute = dt.minute - dt.minute % 5
    return datetime.time(dt.hour, floored_minute)
def create_date(date):
    """
    Convert a date into a datetime.datetime.

    PyMongo doesn't support datetime.date instances, so the first six
    fields of the value's time tuple (year to second) are used to build a
    datetime.datetime instead.
    """
    year, month, day, hour, minute, second = date.timetuple()[:6]
    return datetime.datetime(year, month, day, hour, minute, second)
def create_five_min_document():
    """
    Create the empty document for one hour.

    The result maps each of the twelve five-minute labels produced by
    interval_to_minutes to None.
    """
    slot_labels = [interval_to_minutes(slot) for slot in range(12)]
    return dict.fromkeys(slot_labels)
def create_info_doc(row):
    """
    Build the entry stored in a five-minute slot.

    row is a mapping with the keys 'rejected_reason', 'city_code',
    'down_bytes', 'down_packages', 'up_bytes' and 'up_packages'; the
    result holds the rejected reason, the city code and the net usage
    counters under 'net'.
    """
    net_usage = {
        "dn_B": row['down_bytes'],
        "dn_pkg": row['down_packages'],
        "up_B": row['up_bytes'],
        "up_pkg": row['up_packages']
    }
    info = {"reason": row['rejected_reason'], "city_code": row['city_code']}
    info["net"] = net_usage
    return info
def create_document(row):
    """
    Create the empty daily document for one row.

    The document has one entry per hour label (from interval_to_hour over
    0..23), each holding a fresh five-minute document, plus a 'metadata'
    entry copied from row.
    """
    metadata = {
        'contact': row['contact_date'],
        'collect': row['collect_date'],
        'id_point': row['id_point'],
        "id_ent_net": row['id_ent_net'],
        'id_city': row['id_city'],
        "ip": row['ip'],
        "macaddr": row['macaddr']
    }

    document = {}
    for hour_index in range(24):
        document[interval_to_hour(hour_index)] = create_five_min_document()
    document['metadata'] = metadata
    return document
def update_document(docs, key, row):
    """
    Append the net usage and rejected reason of row to docs[key].

    The entry is stored in the hour / five-minute slot taken from
    row['collect_time']; docs[key] is modified in place.
    """
    collect_time = row['collect_time']

    # Some points have collect time outside 'every five min pattern', round them
    if collect_time.minute % 5 == 0:
        stamp = str(collect_time)[:5]
    else:
        stamp = str(round_time(collect_time))[:5]

    # stamp looks like 'HH:MM'
    hour, minute = stamp[:2], stamp[3:5]

    hour_slots = docs[key][hour]
    if not hour_slots[minute]:
        # Slot not used yet: start its list of entries
        hour_slots[minute] = []
    hour_slots[minute].append(create_info_doc(row))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment