Commit f8434262 authored by Rafael Ravedutti's avatar Rafael Ravedutti

AGILE#326: Citizen upload using sidekiq and redis

Signed-off-by: Rafael Ravedutti's avatarrrlmachado <rrlmachado@inf.ufpr.br>
parent c4f9abde
Pipeline #15225 failed with stage
in 1 minute and 28 seconds
AGENDADOR_SECRET_KEY_BASE=6e58e1f0e806642a44a504672e22665606100838a0c80c8b3b6766f45f58c36818937256ca0ee01879a3f1873bf3b3069e0aae21a83cca3e06402f4ab8fdb3fd
AGENDADOR_API_DB_USER=agendador
AGENDADOR_API_DB_PASSWORD=123mudar
AGENDADOR_REDIS_PASSWORD=123mudar
AGENDADOR_RAILS_ENV=test
......@@ -41,6 +41,7 @@ RUN gem install rails -v 5.0.0 && \
RUN echo "#! /bin/bash" > /exec.sh &&\
echo "rm -f /app/tmp/pids/server.pid && " >> /exec.sh && \
echo "RAILS_ENV=development /app/bin/rake agendador:setup" >> /exec.sh && \
echo "RAILS_ENV=development /app/bin/bundle exec sidekiq -C config/sidekiq.yml >> log/sidekiq.log &" >> /exec.sh && \
echo "RAILS_ENV=development /app/bin/bundle exec rails s -p 3000 -b '0.0.0.0'" >> /exec.sh && \
chmod +x /exec.sh
......
......@@ -39,6 +39,9 @@ gem 'rack-attack', '~> 5.0.1'
# Use Rack CORS for handling Cross-Origin Resource Sharing (CORS), making cross-origin AJAX possible
gem 'rack-cors', '~> 0.4.1'
# Use Sidekiq for background jobs
gem 'sidekiq'
# Get Brazilian address by zipcode, directly from Correios database.
gem 'correios-cep', '~> 0.6.4'
......
module Api::V1
class CitizensController < ApplicationController
class CitizensController < ApplicationController
include Authenticable
include HasPolicies
require 'csv'
......@@ -48,21 +48,20 @@ module Api::V1
if path.nil?
render json: {
errors: ["User #{params[:id]} does have a picture."]
errors: ["User #{params[:id]} does not have a picture."]
}, status: 404
else
if not params[:size].nil?
path.sub!('original', params[:size])
end
send_file path,
type: @citizen.avatar_content_type,
send_file path,
type: @citizen.avatar_content_type,
disposition: 'inline'
end
end
end
# GET /citizen/1/schedule_options
def schedule_options
@citizen = Citizen.find_by(cpf: params[:cpf])
......@@ -155,94 +154,86 @@ module Api::V1
render json: @citizen.complete_info_response, status: :created
else
render json: {
errors: error_message
errors: error_message
}, status: 422
end
end
# POST /citizens/upload
def upload
citizens = params["data"]
# GET /citizens/upload_log/1
def get_upload_log
# Find uploads for current citizen
@upload = CitizenUpload.find(params[:upload_id])
columns = [:name, :cpf, :rg, :birth_date, :cep,
:address_number, :address_complement,
:phone1, :phone2, :email, :pcd, :note, :active]
if @upload.nil?
render json: {
errors: ["Upload task #{params[:upload_id]} does not exist."]
}, status: 404
complete = [:name, :cpf, :rg, :birth_date, :cep, :address_street,
:address_number, :neighborhood, :address_complement, :city_id,
:phone1, :phone2, :email, :pcd, :note, :active]
else
# Upload log path
path = @upload.log.path
account_columns = [:uid, :provider, :encrypted_password]
# If log not found, displays not found message
if path.nil?
render json: {
errors: ["Log not found for current task."]
}, status: 404
line_number = 1
errors = Hash.new
to_create = Array.new
account_to_create = Array.new
# Otherwise, send file
else
send_file path,
type: @upload.log_content_type,
disposition: 'inline'
end
end
end
citizens.each do |c|
upload_params = Hash[columns.zip(c)]
citizen = Citizen.new(upload_params)
# GET /citizens/upload
def get_uploads
# Current citizen id
citizen_id = current_user[0][:id]
account = Account.new({
uid: citizen.cpf,
provider: "cpf"
})
# Find uploads for current citizen
@uploads = CitizenUpload.where(citizen_id: citizen_id)
.order("created_at DESC")
account.password = citizen.birth_date.strftime('%d%m%y')
# Citizen remaining info is added when .valid? method is called
if citizen.valid? and account.valid?
# Add valid citizen with complete info to to_create array
inst = [
citizen.name,
citizen.cpf,
citizen.rg,
citizen.birth_date,
citizen.cep,
citizen.address_street,
citizen.address_number,
citizen.neighborhood,
citizen.address_complement,
citizen.city_id,
citizen.phone1,
citizen.phone2,
citizen.email,
citizen.pcd,
citizen.note,
citizen.active
]
acc_inst = [
account.uid,
account.provider,
account.encrypted_password
]
to_create.append(inst)
account_to_create.append(acc_inst)
else
errors[line_number.to_s] = citizen.errors.to_hash
end
# Render uploads in JSON format
render json: @uploads
end
line_number += 1
end
# POST /citizens/upload
def upload
# Current citizen id
citizen_id = current_user[0][:id]
# Data must be defined in the parameters
if params.has_key?(:data) and params[:data].present?
# Number of citizens to upload
upload_size = params[:data].size
# Create upload object
upload_object = CitizenUpload.new({
citizen_id: citizen_id,
status: 0, # ready to start
amount: upload_size,
progress: 0.0
})
Citizen.transaction do
Citizen.import complete, to_create, validate: true
end
# Save upload object in the database
upload_object.save()
Account.transaction do
Account.import account_columns, account_to_create, validate: true
end
# Create sidekiq job for uploading the citizens
CitizenUploadWorker.perform_async(
upload_object.id, upload_size, params[:data])
if errors.size == 0
render json: {
errors: ["Citizens uploaded successfully."]
errors: ["Citizens scheduled to be imported!"]
}, status: 201
else
render json: errors.as_json, status: 422
render json: {
errors: ["Undefined citizens to import."]
}, status: 404
end
end
......@@ -280,7 +271,7 @@ module Api::V1
return
end
# Deactivate citizen, this will keep the citizen in the database, but
# Deactivate citizen, this will keep the citizen in the database, but
# it will not be displayed in future requests
@citizen.active = false
......@@ -303,7 +294,7 @@ module Api::V1
end
end
# Only allow a trusted parameter "white list" through.
def citizen_params
params.require(:citizen).permit(
......
......@@ -12,7 +12,7 @@ module Api::V1
sector_ids = sectors.map { |row| row["id"] }
# ======================== Service Types ========================
service_types_resp = ServiceType.where(sector_id: sector_ids, active: true)
.as_json(only: [:description, :id, :sector_id])
......@@ -33,7 +33,7 @@ module Api::V1
st_ids[i.id.to_s] = i.service_type_ids
end
for i in service_places_resp
for i in service_places_resp
i["service_types"] = st_ids[i["id"].to_s]
end
......@@ -49,7 +49,7 @@ module Api::V1
response[:service_place] = service_places_resp
response[:situation] = situations
render json: response.as_json
render json: response.as_json
end
# GET /forms/citizen_index
......@@ -246,7 +246,7 @@ module Api::V1
response[:service_places] = ServicePlace.all.as_json(only: [:id, :name])
response[:permissions] = roles.as_json
response[:permissions] = roles.as_json
when "adm_prefeitura"
response[:occupations] = Occupation.where(city_hall_id: city_hall.id)
......@@ -255,16 +255,16 @@ module Api::V1
response[:service_places] = ServicePlace.where(city_hall_id: city_hall.id)
.as_json(only: [:id, :name])
response[:permissions] = roles.as_json[1..-1]
response[:permissions] = roles.as_json[1..-1]
when "adm_local"
response[:occupations] = Occupation.where(city_hall_id: city_hall.id)
.as_json(only: [:id, :name])
response[:service_places] = ProfessionalsServicePlace.find(current_user[1])
.service_place.as_json(only: [:id, :name])
.service_place.as_json(only: [:id, :name])
response[:permissions] = roles.as_json[1..-2]
response[:permissions] = roles.as_json[1..-2]
else
render json: {
......@@ -272,7 +272,7 @@ module Api::V1
}, status: 403
return
end
render json: response.as_json
end
......@@ -327,7 +327,7 @@ module Api::V1
response[:service_places] = ServicePlace.all_active.local_city_hall(city_hall.id)
.as_json(only: [:id, :name])
else
render json: {
......@@ -355,7 +355,7 @@ module Api::V1
when "adm_c3sl"
# Every service place
response[:service_places] = ServicePlace.all_active.as_json(
only: [:id, :name]
only: [:id, :name, :city_hall_id]
)
# Every professional
......@@ -392,7 +392,7 @@ module Api::V1
when "adm_local"
# Current service place
service_places = [ service_place ]
service_places = [ service_place ]
sp_ids = service_place.id
response[:service_places] = service_places.as_json(
only: [:id, :name]
......@@ -418,7 +418,7 @@ module Api::V1
}, status: 403
return
end
render json: response.as_json
end
......@@ -480,7 +480,7 @@ module Api::V1
when "adm_local"
# Current service place
service_places = [ service_place ]
service_places = [ service_place ]
sp_ids = service_place.id
response[:service_places] = service_places.as_json(
only: [:id, :name]
......@@ -506,7 +506,7 @@ module Api::V1
}, status: 403
return
end
render json: response.as_json
end
......@@ -615,7 +615,7 @@ module Api::V1
when "adm_local"
# Current service place
service_places = [ service_place ]
service_places = [ service_place ]
sp_ids = service_place.id
response[:service_places] = service_places.as_json(
only: [:id, :name]
......@@ -643,10 +643,10 @@ module Api::V1
}, status: 403
return
end
render json: response.as_json
end
# GET /forms/schedule_per_type_index
def schedule_per_type_index
citizen = current_user[0]
......
class CitizenUpload < ApplicationRecord
belongs_to :citizen
# Specify location where the log should be stored (default is public)
has_attached_file :log,
path: "/data/citizen_upload/:id/log.csv"
# Validates format of logs
validates_attachment_content_type :log,
:content_type => ["text/plain", "text/csv"]
end
class CitizenUploadSerializer < ActiveModel::Serializer
attributes :id,
:citizen_id,
:amount,
:status,
:status_string,
:progress,
:created_at,
:updated_at
def status_string
# Check if status is "Ready to start"
if object.status == 0
return "Ready to start"
# Check if status is "In progress"
elsif object.status == 1
return "In progress"
# Check if status is "Completed"
elsif object.status == 2
return "Completed"
# Check if status is "Completed with errors"
elsif object.status == 3
return "Completed with errors"
end
return "Undefined"
end
end
class CitizenUploadWorker
include Sidekiq::Worker
sidekiq_options :queue => :citizens_upload
def perform(upload_id, upload_size, citizens)
# Batch size for upload
batch_size = 100
# Batch counter
batch_counter = 0
# Columns for citizens
columns = [:name, :cpf, :rg, :birth_date, :cep,
:address_number, :address_complement,
:phone1, :phone2, :email, :pcd, :note, :active]
# Complete list of columns for citizens
complete = [:name, :cpf, :rg, :birth_date, :cep, :address_street,
:address_number, :neighborhood, :address_complement, :city_id,
:phone1, :phone2, :email, :pcd, :note, :active]
# Columns for accounts
account_columns = [:uid, :provider, :encrypted_password]
# Line number starts with one
line_number = 1
# Hash with errors
errors = Hash.new
# Buffer containing users to create
to_create = Array.new
# Buffer containing accounts to create
account_to_create = Array.new
# Update task status to in progress
CitizenUpload.update(
upload_id,
status: 1 # in progress
)
# Go through each citizen in the list
citizens.each do |c|
# Parameters for current line
upload_params = Hash[columns.zip(c)]
# Create citizen object with defined parameters
citizen = Citizen.new(upload_params)
# Create account object with defined parameters
account = Account.new({
uid: citizen.cpf,
provider: "cpf"
})
# Create default password for current citizen
account.password = citizen.birth_date.strftime('%d%m%y')
# Citizen remaining info is added when .valid? method is called
if citizen.valid? and account.valid?
# Add valid citizen with complete info to to_create array
inst = [
citizen.name,
citizen.cpf,
citizen.rg,
citizen.birth_date,
citizen.cep,
citizen.address_street,
citizen.address_number,
citizen.neighborhood,
citizen.address_complement,
citizen.city_id,
citizen.phone1,
citizen.phone2,
citizen.email,
citizen.pcd,
citizen.note,
citizen.active
]
# Add valid account with complete info to to_create array
acc_inst = [
account.uid,
account.provider,
account.encrypted_password
]
# Insert current citizen data to buffer of citizens to create
to_create.append(inst)
# Insert current account data to buffer of accounts to create
account_to_create.append(acc_inst)
else
# If there was an error, store it in the errors hash
errors[line_number.to_s] = citizen.errors.to_hash
end
# Increase bath counter
batch_counter += 1
# If already reach batch size limit
if batch_counter >= batch_size
# Import citizens of current batch to database
Citizen.transaction do
Citizen.import complete, to_create, validate: true
end
# Import accounts of current batch to database
Account.transaction do
Account.import account_columns, account_to_create, validate: true
end
# Update task progress
CitizenUpload.update(
upload_id,
progress: ((line_number - 1).to_f / upload_size.to_f) * 100.0
)
# Reset buffer containing citizens to create
to_create = Array.new
# Reset buffer containing accounts to create
account_to_create = Array.new
# Reset batch counter
batch_counter = 0
end
# Increase line number
line_number += 1
end
# If batch isn't empty
if batch_counter > 0
# Import remaining citizens to database
Citizen.transaction do
Citizen.import complete, to_create, validate: true
end
# Import remaining accounts to database
Account.transaction do
Account.import account_columns, account_to_create, validate: true
end
end
# New status to update
new_status = 2 # completed with no errors
# If there were errors, change status to completed with errors
if errors.size > 0
new_status = 3 # completed with errors
end
# Update upload object progress
CitizenUpload.update(
upload_id,
status: new_status,
progress: 100.0
)
# Initialize log content buffer
log_content = StringIO.new("Line,Error Message\n")
# Go through each error and write it in the log file
errors.each do |line, message|
log_content.puts "%d,%s\n" % [line, message]
end
# Create upload object to save log
upload_object = CitizenUpload.find(upload_id)
upload_object.log = log_content
upload_object.save
end
end
require 'sidekiq'
Sidekiq.configure_server do |config|
config.redis = {
url: 'redis://agendador-redis:6379',
password: ENV.fetch("AGENDADOR_REDIS_PASSWORD") { "123mudar" }
}
end
Sidekiq.configure_client do |config|
config.redis = {
url: 'redis://agendador-redis:6379',
password: ENV.fetch("AGENDADOR_REDIS_PASSWORD") { "123mudar" }
}
end
Rails.application.routes.draw do
devise_for :accounts
# require 'sidekiq/web'
# mount Sidekiq::Web => '/sidekiq'
scope module: 'api' do
namespace :v1 do
mount_devise_token_auth_for 'Account', at: 'auth', controllers: {
......@@ -11,6 +14,9 @@ Rails.application.routes.draw do
get "accounts/self" => "accounts#index"
get "citizens/schedule_options" => "citizens#schedule_options"
get "citizens/upload" => "citizens#get_uploads"
get "citizens/upload_log/:upload_id" => "citizens#get_upload_log"
post "citizens/upload" => "citizens#upload"
resources :citizens do
resources :dependants
......@@ -19,8 +25,6 @@ Rails.application.routes.draw do
end
end
post "citizens/upload" => "citizens#upload"
resources :schedules do
member do
put 'confirm'
......@@ -39,13 +43,13 @@ Rails.application.routes.draw do
resources :shifts
resources :solicitations, only: [:create, :index, :show]
resources :notifications
resources :notifications
resources :resources
resources :resource_bookings
resources :resource_types
resources :resource_shifts
post "validate_cep" => "cep#validate"
get "forms/schedule_history" => "forms#schedule_history"
......
:concurrency: 5
:queues:
- citizens_upload
class CreateCitizenUploads < ActiveRecord::Migration[5.0]
def change
create_table :citizen_uploads do |t|
t.references :citizen, foreign_key: true
t.integer :amount
t.float :progress
t.timestamps
end
end
end
class AddColumnsToCitizenUpload < ActiveRecord::Migration[5.0]
def change
add_column :citizen_uploads, :status, :integer
end
end
class AddAttachmentLogToCitizenUploads < ActiveRecord::Migration
def self.up
change_table :citizen_uploads do |t|
t.attachment :log
end
end
def self.down
remove_attachment :citizen_uploads, :log
end
end
......@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171109134344) do
ActiveRecord::Schema.define(version: 20180413130918) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -76,6 +76,20 @@ ActiveRecord::Schema.define(version: 20171109134344) do
t.index ["state_id"], name: "index_cities_on_state_id", using: :btree
end
create_table "citizen_uploads", force: :cascade do |t|
t.integer "citizen_id"
t.integer "amount"
t.float "progress"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "status"
t.string "log_file_name"
t.string "log_content_type"
t.integer "log_file_size"
t.datetime "log_updated_at"
t.index ["citizen_id"], name: "index_citizen_uploads_on_citizen_id", using: :btree
end
create_table "citizens", force: :cascade do |t|
t.date "birth_date", null: false
t.string "name", null: false
......@@ -353,4 +367,5 @@ ActiveRecord::Schema.define(version: 20171109134344) do
t.datetime "updated_at", null: false
end
add_foreign_key "citizen_uploads", "citizens"
end
......@@ -10,15 +10,28 @@ services:
networks:
backend:
agendador-redis:
image: redis:3.2
command: redis-server --requirepass ${AGENDADOR_REDIS_PASSWORD}
ports:
- '6379:6379'
volumes:
- '/redis/data'
networks:
backend:
agendador-backend:
build: .