Update tools

parent 2020d098
Pipeline #18664 failed with stage
in 9 seconds
[run]
include=database/*
......@@ -2,4 +2,6 @@ env/
__pycache__
*.pyc
pairing/
.cache
.coverage
.vscode
image: ubuntu:17.10
stages:
- test
run_tests:
stage: test
script:
- apt-get update && apt-get install -y python3 python3-pip
- python3 -m pip install -r requirements.txt
- python3 -m coverage run -m py.test && python3 -m coverage report
tags:
- regular
- ubuntu
......@@ -357,7 +357,7 @@ max-locals=15
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
max-public-methods=25
# Maximum number of return / yield for function / method body
max-returns=6
......
# HOTMapper #
This repository implements the HOTMapper, a tool that allows the user to manage his historical data using a mapping protocol
This respository was created in order to make available the HOTMapper, a tool that allows the user to manage his historical data using a mapping protocol for demonstration purposes for the EDBT 2019.
## Data ##
......@@ -8,6 +8,8 @@ The dataset "Matrícula" can be found at the link: [INEP](http://portal.inep.gov
The dataset "Local Oferta" can be found in the same link, but at the section "Censo da Educação Superior". Additionaly for increase the convenience, all data from "Local Oferta" is in the directory open_data.
**NOTE**: It's important that you ta
## Requirements ##
* Python 3 (It's recommended that you use a virtual environment, such as virtualenv)
......
# Administrador de base de dados SimCAQ/SMPPIR #
Esse repositório implementa a classe DatabaseTable e funções para verificar pareamento entre
diferentes anos inseridos no banco de dados. A ferramenta é desenvolvida em Python 3, e usa
como base arquivos de mapeamento em formato CSV.
Para a utilização a partir da linha de comando, a CLI manage.py pode ser utilizada sem
que se invoque manualmente as funções a partir da linha de comando Python.
## Requisitos ##
O utilitário foi desenvolvido em Python 3 usando a biblioteca SQLAlchemy com vistas ao banco
de dados MonetDB. Versões futuras podem ter modificações visando a compatibilidade com outros
bancos de dados, aproveitando as capacidades da biblioteca base.
Para a instalação dos requisitos conforme usados durante o desenvolvimento, o arquivo
requirements.txt pode ser usado como base (Recomenda-se o uso de um ambiente virtual).
```bash
(env) $ pip install -r requirements.txt
```
A CLI depende do módulo manage.py. Demais dependências serão listadas a seguir.
### Requisitos para a interface com a base de dados ###
* pymonetdb
* SQLAlchemy
* sqlalchemy-monetdb
### Requisitos para geração de pareamentos ###
* numpy
* pandas
* xlrd
* XlsxWriter
## Interface de linha de comando ##
A invocação da CLI utiliza o padrão do pacote manage.py, que é:
```bash
$ python manage.py [commando] [argumentos posicionais] [argumentos opcionais com valor]
```
Os comandos já implementados são:
* create: Cria a tabela conforme definido no protocolo de mapeamento.
```bash
$ python manage.py create <nome da tabela>
```
O único argumento usado é o nome da tabela. O script procurará por um protocolo de
mapeamento com o mesmo nome para a busca do esquema das colunas.
* insert: insere um arquivo de dados em formato CSV ou similar em uma tabela existente.
```bash
$ python manage.py insert <caminho para o arquivo> <nome da tabela> <ano> [--sep separador] [--null valor_nulo]
```
O caminho para o arquivo deve ser absoluto. A tabela utilizada deve existir e estar
sincronizada com o protocolo de mapeamento correspondente. O separador padrão utilizado
é ponto e vírgula (';'); caso outros separadores sejam utilizados pelo arquivo fonte,
devem ser especificados com --sep (por exemplo --sep \\| para pipe). O valor nulo padrão
é string vazia. Caso outro valor seja usado, deve ser especificado com --null.
* drop: derruba uma tabela do banco de dados.
```bash
$ python manage.py drop <nome da tabela>
```
O comando não contorna chaves estrangeiras que apontem para a tabela, e o banco de dados
pode retornar um erro caso exista alguma.
* remap: sincroniza uma tabela com o protocolo de mapeamento.
```bash
$ python manage.py remap <nome da tabela>
```
Esse comando deve ser utilizado sempre que um protocolo de mapeamento for atualizado.
O remapeamento permite a criação de novas colunas, derrubada de colunas existentes,
renomeamento de colunas e mudança de tipo. Dependendo do tamanho da tabela, o uso de
memória primária pode ser intenso.
* generate_pairing_report: gera relatórios de pareamento para comparação de dados ano
a ano.
```bash
$ python manage.py generate_pairing_report [--output xlsx|csv]
```
Os relatórios são criados na pasta pairing. Caso o formato não seja especificado,
csv será utilizado (um arquivo será criado para cada tabela). Caso xlsx seja o formato
utilizado, um arquivo será criado com todas as tabelas separadas em diferentes planilhas.
* generate_backup: Cria/Atualiza o arquivo monitorado para o backup.
```bash
$ python manage.py generate_backup
```
O arquivo é criado ou atualizado na máquina onde o banco de dados da produção está,
o procedimento de backup da equipe de infraestrutura o monitora para realizar o procedimento.
# Script para criar o Banco de Dados e adicionar entradas #
O script auto.sh pode ser utilizado para criar as tabelas base do banco de dados, como
também as tabelas dos protocolos de mapeamento e inserir dados nelas. O objetivo do script
é facilitar a criação do banco para os desenvolvedores.
O script apresenta um texto de ajuda quando é executado sem parâmetros. Para o funcionamento
correto é necessário seguir o padrão dos parâmetros para cada comando.
1. Observação: É importante verificar o arquivo de configurações para verificar o nome do
banco que será utilizado pelo script.
2. Observação: A execução dos comandos da biblioteca não são interrompidos por erros.
Exemplo de execução:
```bash
$ ./auto.sh all testdb /home/username/Documents/c3sl/datafiles/ 2016 2016
```
#!/bin/bash
# ---------------------------------------------------------------------------------------#
# Esse script tem como objetivo facilitar a criação do banco de dados do projeto SIMCAQ,
# conforme a necessidade dos desenvolvedores. O código é livre para modificações contanto
# que os que utilizam o script sejam notificados das mudanças decorrentes.
# ---------------------------------------------------------------------------------------#
# ---------------------------------------------------------------------------------------#
# Função para criar as tabelas que são consideradas bases para o banco de dados
# ---------------------------------------------------------------------------------------#
fBase ()
{
mclient -d $1 base/regiao.sql
mclient -d $1 base/estado.sql
mclient -d $1 base/municipio.sql
mclient -d $1 base/siope_uf.sql
mclient -d $1 base/siope_mun.sql
mclient -d $1 base/siope_mun_seed.sql
mclient -d $1 base/instituicao_superior.sql
mclient -d $1 base/formacao_superior.sql
mclient -d $1 base/formacao_superior_seed.sql
mclient -d $1 base/ibge_pib.sql
}
# ---------------------------------------------------------------------------------------#
# ---------------------------------------------------------------------------------------#
# Função para criar as tabelas a partir dos protocolos de mapeamento
# ---------------------------------------------------------------------------------------#
fCreate ()
{
./manage.py create escola
./manage.py create turma
./manage.py create matricula
./manage.py create docente
}
# ---------------------------------------------------------------------------------------#
# ---------------------------------------------------------------------------------------#
# Função para inserir dados nas tabelas criadas a partir dos protocolos de mapeamento
# ---------------------------------------------------------------------------------------#
fInsert()
{
local alpha="$2"
while [ "$alpha" -le "$3" ]; do
./manage.py insert $1${alpha}_ESCOLAS.CSV escola $alpha --sep=\|
./manage.py insert $1${alpha}_TURMAS.CSV turma $alpha --sep=\|
./manage.py insert $1${alpha}_DOCENTES_CO.CSV docente $alpha --sep=\|
./manage.py insert $1${alpha}_DOCENTES_NORTE.CSV docente $alpha --sep=\|
./manage.py insert $1${alpha}_DOCENTES_NORDESTE.CSV docente $alpha --sep=\|
./manage.py insert $1${alpha}_DOCENTES_SUDESTE.CSV docente $alpha --sep=\|
./manage.py insert $1${alpha}_DOCENTES_SUL.CSV docente $alpha --sep=\|
./manage.py insert $1${alpha}_MATRICULA_CO.CSV matricula $alpha --sep=\|
./manage.py insert $1${alpha}_MATRICULA_NORTE.CSV matricula $alpha --sep=\|
./manage.py insert $1${alpha}_MATRICULA_NORDESTE.CSV matricula $alpha --sep=\|
./manage.py insert $1${alpha}_MATRICULA_SUDESTE.CSV matricula $alpha --sep=\|
./manage.py insert $1${alpha}_MATRICULA_SUL.CSV matricula $alpha --sep=\|
alpha=$(($alpha + 1))
done
}
# ---------------------------------------------------------------------------------------#
# ---------------------------------------------------------------------------------------#
# Retorna uma ajuda caso não haja parâmetros de entrada
# ---------------------------------------------------------------------------------------#
if [ ! $1 ]; then
printf "\n# WARNING: Don't forget to check the settings file for the database name.\n"
printf "\n# This script has 4 commands:\n"
printf "# 1. all: execute all commands to create the database and insert data.\n"
printf "# 2. base: execute the commands to create de base tables.\n"
printf "# 3. create: execute the commands to create the tables.\n"
printf "# 4. insert: execute the commands to insert data to tables.\n\n"
printf "# Estructure of commands:\n"
printf "# 1. ./auto.sh all [database_name] [path_to_files] [initial_year]"
printf " [final_year]\n"
printf "# 2. ./auto.sh base [database_name]\n"
printf "# 3. ./auto.sh create\n"
printf "# 4. ./auto.sh insert [path_to_files] [initial_year] [final_year]\n\n"
exit 0;
fi
# ---------------------------------------------------------------------------------------#
# ---------------------------------------------------------------------------------------#
# Execução do script conforme os comandos passados
# ---------------------------------------------------------------------------------------#
source ./env/bin/activate
if [ $? = 0 ]; then
printf "\n# Environment activated!\n"
if [ "$1" = 'all' ]; then
if [ $2 ] && [ $3 ] && [ $4 ] && [ $5 ]; then
printf "\n# Initializing the creation of base tables (may need database"
printf " password)...\n"
sleep 1
fBase "$2"
printf "\n# Initializing the creation of mapping tables...\n"
sleep 1
fCreate
printf "\n# Initializing the insertion of data, this may take a while...\n"
sleep 2
fInsert "$3" "$4" "$5"
sleep 1
else
printf "# ERROR: Missing parameters!\n"
exit -1;
fi
elif [ "$1" = 'base' ]; then
if [ $2 ]; then
printf "\n# Initializing the creation of base tables (may need database"
printf " password)...\n"
sleep 1
fBase "$2"
sleep 1
else
printf "# ERROR: Missing parameters!\n"
exit -1;
fi
elif [ "$1" = 'create' ]; then
printf "\n# Initializing the creation of tables...\n"
sleep 1
fCreate
sleep 1
elif [ "$1" = 'insert' ]; then
if [ $2 ] && [ $3 ] && [ $4 ]; then
printf "\n# Initializing the insertion of data, this may take a while...\n"
sleep 2
fInsert "$2" "$3" "$4"
sleep 1
else
printf "# ERROR: Missing parameters!\n"
exit -1;
fi
else
printf "\n# ERROR: Missing parameters!\n"
deactivate
printf "\n# Environment deactivated!\n"
printf "# Terminating...\n"
sleep 1
exit -1;
fi
deactivate
printf "\n# Environment deactivated!\n"
printf "\n# All done! Terminating...\n"
sleep 1
exit 0;
else
printf "# ERROR: can't find the directory for environment!\n"
exit -1;
fi
'''Database manipulation actions - these can be used as models for other modules.'''
import logging
from sqlalchemy import create_engine, MetaData
from database.database_table import gen_data_table, gen_temporary, copy_to_temporary
from mapping_functions import generate_pairing_xlsx, generate_pairing_csv
from os import chdir
from datetime import datetime
from database.base import MissingTableError
from database.database_table import gen_data_table, copy_tabbed_to_csv
import settings
ENGINE = create_engine(settings.DATABASE_URI, echo=settings.ECHO)
META = MetaData(bind=ENGINE)
......@@ -14,32 +14,36 @@ logging.basicConfig(format = settings.LOGGING_FORMAT)
database_table_logger = logging.getLogger('database.database_table')
database_table_logger.setLevel(settings.LOGGING_LEVEL)
protocol_logger = logging.getLogger('database.protocol')
protocol_logger.setLevel(settings.LOGGING_LEVEL)
sqlalchemy_logger = logging.getLogger('sqlalchemy.engine')
sqlalchemy_logger.setLevel(settings.LOGGING_LEVEL)
def temporary_data(connection, file_name, table, year, offset=2, sep=';', null=''):
header = open(file_name, encoding="ISO-8859-9").readline()
header = header.split(sep)
columns = table.mount_original_columns(header, year)
ttable = gen_temporary('t_' + table.name, META, *columns)
table.set_temporary_primary_keys(ttable, year)
def temporary_data(connection, file_name, table, year, offset=2,
delimiters=[';', '\\n', '"'], null=''):
header = open(file_name, encoding="ISO-8859-9").readline().strip()
header = header.split(delimiters[0])
ttable = table.get_temporary(header, year)
ttable.create(bind=connection)
copy_to_temporary(connection, file_name, ttable, offset, sep, null)
return ttable
table.populate_temporary(ttable, file_name, header, year, delimiters, null, offset, bind=connection)
table.apply_derivatives(ttable, ttable.columns.keys(), year, bind=connection)
return ttable
def insert(file_name, table, year, offset=2, sep=';', null=''):
def insert(file_name, table, year, offset=2, delimiters=[';', '\\n', '"'], null='', notifybackup=None):
'''Inserts contents of csv in file_name in table using year as index for mapping'''
table = gen_data_table(table, META)
table.map_from_database()
if not table.exists():
raise MissingTableError(table.name)
with ENGINE.connect() as connection:
trans = connection.begin()
ttable = temporary_data(connection, file_name, table, year, offset, sep, null)
table.insert_from_temporary(connection, ttable, year)
ttable = temporary_data(connection, file_name, table, year, offset, delimiters, null)
table.insert_from_temporary(ttable, bind=connection)
trans.commit()
......@@ -47,7 +51,12 @@ def create(table):
'''Creates table from mapping_protocol metadata'''
table = gen_data_table(table, META)
table.create()
with ENGINE.connect() as connection:
trans = connection.begin()
table.create(bind=connection)
table.set_source(bind=connection)
table.create_mapping_table(bind=connection)
trans.commit()
def drop(table):
'''Drops table'''
......@@ -58,31 +67,55 @@ def drop(table):
def remap(table):
'''Applies change made in mapping protocols to database'''
table = gen_data_table(table, META)
table.map_from_database()
table.remap()
def generate_pairing_report(output='csv'):
'''Generates the pairing report for a given table'''
if output == 'csv':
generate_pairing_csv(ENGINE)
elif output == 'xlsx':
generate_pairing_xlsx(ENGINE)
else:
print('Unsuported output type "{}"'.format(output))
def update_from_file(csv_file, table, year, columns=None, target_list=None,
offset=2, sep=';', null=''):
def csv_from_tabbed(table_name, input_file, output_file, year, sep=';'):
table = gen_data_table(table_name, META)
protocol = table.get_protocol()
column_names, column_mappings = protocol.get_tabbed_mapping(year)
copy_tabbed_to_csv(input_file, column_mappings, settings.CHUNK_SIZE, output_file,
column_names=column_names, sep=sep)
def update_from_file(file_name, table, year, columns=None, target_list=None,
offset=2, delimiters=[';', '\\n', '"'], null=''):
'''Updates table columns from an input csv file'''
table = gen_data_table(table, META)
table.map_from_database()
if not table.exists():
raise MissingTableError(table.name)
if columns is None:
columns = []
columns = columns + table.columns_from_targets(target_list)
with ENGINE.connect() as connection:
trans = connection.begin()
ttable = temporary_data(connection, csv_file, table, year, offset, sep, null)
table.update_from_temporary(connection, ttable, year, columns)
ttable = temporary_data(connection, file_name, table, year, offset, delimiters, null)
table.update_from_temporary(ttable, columns, bind=connection)
trans.commit()
def run_aggregations(table, year):
'''
Runs aggregation queries from protocol
'''
table = gen_data_table(table, META)
table.map_from_database()
with ENGINE.connect() as connection:
trans = connection.begin()
table.run_aggregations(year, bind=connection)
trans.commit()
def generate_backup():
'''Create/Recriate file monitored by backup script in production'''
chdir(settings.BACKUP_FOLDER)
f = open(settings.BACKUP_FILE,"w")
f.write(str(datetime.now()))
f.close()
'''Module containing base declarations'''
class DatabaseError(Exception):
'''Base class for errors in database manipulation'''
pass
class DatabaseColumnError(DatabaseError):
'''This exception should be raised if the program tries to access a columns
that doesn't belong to a table object'''
def __init__(self, column_name):
self.column_name = self.message = column_name
super().__init__(column_name)
class DatabaseMappingError(DatabaseError):
'''This exception should be raised if some table mapping can't be done'''
pass
class MissingProtocolError(DatabaseError):
'''This exception should be raised if the program tries to use methods that
requires a protocol while there is none loaded'''
pass
class MissingForeignKeyError(DatabaseError):
'''This exception should be raised if an expected foreign key is not found.'''
def __init__(self, referred_table=None):
self.referred_table = referred_table
super().__init__(referred_table)
class MissingTableError(DatabaseError):
'''This exception should be raised if an expected table doesn't exist.'''
def __init__(self, table=None):
self.table = table
super().__init__(table)
class ProtocolError(Exception):
'''Base class for errors in protocols'''
pass
class InvalidTargetError(ProtocolError):
'''This exception should be raised if calls to a protocol require an invalid
target - either wrong syntax or non existing target'''
def __init__(self, target_name):
self.target = self.message = target_name
super().__init__(target_name)
class DuplicateColumnNameError(ProtocolError):
'''This exception should be raised if a column name is repeated throughout
the protocol'''
def __init__(self, column_name):
self.name = self.message = column_name
super().__init__(column_name)
class CircularReferenceError(ProtocolError):
'''
This exception should be raised if a derivative variable or group of variables
imply on a circular dependency tree.
'''
def __init__(self, target_name):
self.target_name = target_name
super().__init__(target_name)
This diff is collapsed.
......@@ -3,18 +3,24 @@ Names comonly used:
- original columns: columns as they are named in the original database;
- target columns: columns as named internaly in project;
- dbcolumns: columns as named in database.'''
import logging
import pandas as pd
from database.base import InvalidTargetError, DuplicateColumnNameError
logger = logging.getLogger(__name__)
standard_columns = {
'description': 'Novo Rótulo',
'target_name': 'Var.Lab',
'standard_name': 'Rot.Padrão',
'database_name': 'Nome Banco',
'data_type': 'Tipo de Dado'
'data_type': 'Tipo de Dado',
'temporary_column': 'Coluna temporária'
}
class Protocol():
class Protocol(object):
''' Protocol for table translation'''
def __init__(self, in_file=None, columns=None):
self._dataframe = None
......@@ -73,23 +79,69 @@ class Protocol():
if not indexes:
return None
if len(indexes) > 1:
return None
raise DuplicateColumnNameError(name)
return self._remaped[indexes[0]]
def get_temporary_columns(self, year):
dataframe = self._dataframe
indexes = dataframe[dataframe[standard_columns['temporary_column']] == 1].index.tolist()
df = dataframe.iloc[indexes][[
standard_columns['database_name'],
standard_columns['data_type'],
year]
]
return [l[1].tolist() for l in df.iterrows()]
def dbcolumn_from_target(self, name):
'''Gets database column from a target column name. Ouput is a list
with the column name, type and comment contents.
with the column name and type contents.
Input example: **{'name': 'CEBMA015N0'}
output could look like ['cor_raca_id', 'TINYINT', 'Cor/raça', 'TP_COR_RACA'] '''
output could look like ['cor_raca_id', 'TINYINT'] '''
indexes = self._dataframe[self._remaped == name].index.tolist()
if not indexes or len(indexes) > 1:
return [None, None, None, None]
comment = self._dataframe[standard_columns['description']][indexes[0]].strip()
standard = self._dataframe[standard_columns['standard_name']][indexes[0]].strip()
if len(indexes) > 1:
indexes = None
if indexes:
try:
is_temp = self._dataframe[standard_columns['temporary_column']]
is_temp = is_temp[indexes[0]]
except KeyError:
logger.warning("Protocol doesn't have temporary identifier")
is_temp = None
if not indexes:
raise InvalidTargetError(name)
column_name = self._dataframe[standard_columns['database_name']][indexes[0]].strip()
column_type = self._dataframe[standard_columns['data_type']][indexes[0]].strip()
return [column_name, column_type, comment, standard]
if not column_name or not column_type:
raise InvalidTargetError(name)
return [column_name, column_type]
def get_comment(self, target):
indexes = self._dataframe[self._remaped == target].index.tolist()
if len(indexes) > 1:
indexes = None
if indexes:
try:
is_temp = self._dataframe[standard_columns['temporary_column']]
is_temp = is_temp[indexes[0]]
except KeyError:
logger.warning("Protocol doesn't have temporary identifier")
is_temp = None
if not indexes or is_temp:
raise InvalidTargetError(target)
comment = self._dataframe[standard_columns['description']][indexes[0]]
return comment
def get_tabbed_mapping(self, year):
column_names = list(self._dataframe[self._dataframe['p0' + year] != ''][year])
column_mappings = [list(c) for _, c in
self._dataframe[['p0' + year, 'pf' + year]].iterrows() if bool(c[0])]
return column_names, column_mappings
def remap_from_protocol(self, new_protocol, column_list, reference_year='2015'):
'''Method to update a mapping protocol from another file'''
......
......@@ -20,6 +20,16 @@ def compile_double(element, compiler, **kwargs):
by default'''
return 'DOUBLE'
@compiles(TINYINT, 'mysql')
def compile_tinyint(element, compiler, **kwargs):
return 'SMALLINT'
@compiles(DOUBLE_PRECISION, 'mysql')
def compile_double(element, compiler, **kwargs):
'''Translation for double - not sure if implemented in sqlalchemy_monetdb
by default'''
return 'FLOAT'
def get_type(in_string):
'''Returns a remapped type object for a given type string'''
in_string = in_string.lower()
......
'''Generates schema in mysql dialect. Useful for documentation'''
from sqlalchemy import create_engine, MetaData, inspect
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable
from database.database_table import DatabaseTable
import settings
engine = create_engine(settings.DATABASE_URI, echo=settings.ECHO)
meta = MetaData(bind=engine)
insp = inspect(engine)
table_list = insp.get_table_names()
for table_name in table_list:
if table_name in [t.name for t in meta.sorted_tables]:
continue
table = DatabaseTable(table_name, meta)
table.map_from_database()
to_output = [table for table in meta.sorted_tables if not table.name.startswith('mapping_')]
for table in to_output:
print(CreateTable(table).compile(dialect=mysql.dialect()).string, end=';')
......@@ -4,14 +4,14 @@ from manager import Manager
import database.actions
manager = Manager()
@manager.command
def insert(csv_file, table, year, sep=';', null=''):
def insert(csv_file, table, year, sep=';', null='',notifybackup=None):
'''Inserts file in table using a year as index'''
database.actions.insert(csv_file, table, year, sep=sep, null=null)
database.actions.insert(csv_file, table, year, delimiters=[sep, '\\n', '"'], null=null)
if notifybackup:
database.actions.generate_backup()
@manager.command
def create(table):
'''Creates table using mapping protocols'''
......@@ -24,14 +24,9 @@ def drop(table):
@manager.command
def remap(table):
'''TODO'''
'''Restructures a table to match the mapping protocol.'''
database.actions.remap(table)
@manager.command
def generate_pairing_report(output='csv'):
'''In progress'''
database.actions.generate_pairing_report(output)
@manager.command
def update_from_file(csv_file, table, year, columns=None, target_list=None, offset=2, sep=';',
null=''):
......@@ -40,7 +35,25 @@ def update_from_file(csv_file, table, year, columns=None, target_list=None, offs
if target_list:
target_list = target_list.split(',')
database.actions.update_from_file(csv_file, table, year, columns=columns,
target_list=target_list, offset=offset, sep=sep, null=null)
target_list=target_list, offset=offset,
delimiters=[sep, '\\n', '"'], null=null)
@manager.command
def csv_from_tabbed(table_name, input_file, output_file, year, sep=';'):
database.actions.csv_from_tabbed(table_name, input_file, output_file, year, sep=';')