...
 
Commits (9)
import { View } from "../core/view";
import { Source } from "../core/source";
import { FilterOperator } from "../core/filter";
import { Metric } from "../core/metric";
import { Adapter } from "../core/adapter";
import { DataType, AggregationType } from "../common/types";
import { Client, ConfigOptions } from "elasticsearch";
/** @hidden */
const elasticsearch = require('elasticsearch');
/**
* Adapter which connects with a Elasticsearch database.
*/
interface ViewOperation {
opcode?: number;
values?: any[];
clauses?: any[];
sort?: any[];
origin?: boolean;
operation?: object;
id?: string;
name?: string;
}
export class ElasticsearchAdapter extends Adapter {
/** Information used to connect with a PostgreSQL database. */
private client: Client;
/**
* Creates a new adapter with the database connection configuration.
* @param conf - The information required to create a connection with
* the database.
*/
constructor (conf: ConfigOptions) {
super();
this.client = new elasticsearch.Client(conf);
}
public getQueryFromView(view: View): object {
/**
* Translate a view to a Elasticsearch query.
* @param view - View to be translated.
*/
/** A Elasticsearch query is JSON format, the header always begin with size:0 */
let query = {
size: 0,
query: {
bool: {}
}
};
let filter = {};
let match = [];
let nMatch = [];
/** Query is used to define the filters */
if (view.clauses.length > 0) {
for(let i=0;i<view.clauses[0].filters.length; i++) {
if (view.clauses[0].filters[i].operator == 2) {
filter = {"match": { [view.clauses[0].filters[i].target.name]: view.clauses[0].filters[i].value }};
nMatch.push(filter);
} else {
filter = {"match": { [view.clauses[0].filters[i].target.name]: view.clauses[0].filters[i].value }};
match.push(filter);
}
}
Object.assign(query.query.bool, {must: match});
Object.assign(query.query.bool, {must_not: nMatch});
}
/** Aggregation is used to define the dimention and the aggregation */
let partialQuery = this.translateAggs(view, 0);
Object.assign(query, {aggs: partialQuery});
return query;
}
/**
* Asynchronously reads all data from given view.
* In other words perform a SELECT query.
* @param view - "Location" from all data should be read.
* @param cb - Callback function which contains the data read.
*/
public getDataFromView(view: View, cb: (error: Error, result?: any[]) => void): void {
const query = this.getQueryFromView(view);
let partialView = view.operation;
const index = this.findIndex(partialView);
this.executeQuery(index, view, query, cb);
}
/**
* Asynchronously executes a query and get its result.
* @param query - Query (Elasticsearch DSL format) to be executed.
* @param cb - Callback function which contains the data read.
* @param cb.error - Error information when the method fails.
* @param cb.result - Query result.
*/
private executeQuery(index: string, view: View, query: object, cb: (error: Error, result?: any[]) => void): void {
this.client.search({
index: index.replace(/\"/g, ''),
body: query
},(err, result) => {
let displayResult = this.formateResult(view, [result], 0);
cb(err, displayResult);
})
}
public insertIntoSource(source: Source, data: any[], cb: (err: Error, result?: any[]) => void): void {
}
private getAggregateFunction(aggrType: AggregationType): string {
switch (aggrType) {
case AggregationType.SUM:
return "sum";
case AggregationType.AVG:
return "avg";
case AggregationType.COUNT:
return "value_count";
case AggregationType.MAX:
return "max";
case AggregationType.MIN:
return "min";
default:
return "";
}
}
/** Elasticsearch, unlike the others adapter, need to find its index to do the query,
* so need to use aliasAsName in the config.yaml and run the recursive function
* below to find the index.
*/
private findIndex(operation: ViewOperation): string{
if (operation.values[0].origin == true) {
return operation.values[0].name;
}
let index = this.findIndex(operation.values[0].operation);
return index;
}
private translateAggs(view: View, numDimensions: number): object {
if (numDimensions == view.dimensions.length) {
let func = "";
let aggrName = "";
let partialQuery = {};
for (let i=0; i<view.metrics.length; i++) {
func = this.getAggregateFunction(view.metrics[i].aggregation);
aggrName = view.metrics[i].name;
Object.assign(partialQuery, { [aggrName]: {[func]: {field: view.metrics[i].name}}});
}
return partialQuery;
}
let returnedQuery = this.translateAggs(view, numDimensions+1);
if (view.dimensions.length > 0) {
let group_by = view.dimensions[numDimensions].name;
let dim = view.dimensions[numDimensions].name;
if (view.dimensions[numDimensions].dataType < 3) {
dim = dim;
} else {
dim = dim + ".keyword";
}
let aggregation = {
[group_by]: {
terms: {
field: dim,
size: 100000
},
aggs: {}
}
};
Object.assign(aggregation[group_by], {aggs: returnedQuery});
if (numDimensions == view.dimensions.length - 1 && view.sort.length > 0) {
let aggrName = view.sort[0].name;
Object.assign(aggregation[group_by].terms, {order: {[aggrName] : "asc"}});
}
return aggregation;
}
}
private formateResult(view: View, result: any[], numDimensions: number): any[] {
let dimensionsResult = {};
let back = [];
let resultArray: any[] = [];
if (numDimensions == 0 && view.dimensions.length > 0) {
let firstDim = view.dimensions[0].name;
let resultArray: any[] = [];
for (let i=0; i< result[0].aggregations[firstDim].buckets.length; i++) {
back = this.formateResult(view, [result[0].aggregations[firstDim].buckets[i]], 1);
for (let j=0; j<back.length; j++) {
Object.assign(back[j], {[firstDim]: result[0].aggregations[firstDim].buckets[i].key});
}
resultArray = resultArray.concat(back);
}
return resultArray;
}
if (numDimensions == view.dimensions.length) {
let metricsResult = {};
let findPrefix = result[0];
if (view.dimensions.length == 0) {
findPrefix = result[0].aggregations;
}
for (let i=0; i<view.metrics.length; i++) {
let met = view.metrics[i].name;
Object.assign(metricsResult, {[met]: findPrefix[met].value});
}
return [metricsResult];
}
let nDim = view.dimensions[numDimensions].name;
for (let i=0; i< result[0][nDim].buckets.length; i++) {
back = this.formateResult(view, [result[0][nDim].buckets[i]], numDimensions+1);
for (let j=0; j<back.length; j++) {
Object.assign(back[j], {[nDim]: result[0][nDim].buckets[i].key});
}
resultArray = resultArray.concat(back);
}
return resultArray;
}
}
......@@ -22,8 +22,10 @@ import { Middleware } from "../types";
import { Adapter } from "../../core/adapter";
import { PostgresAdapter } from "../../adapter/postgres";
import { MonetAdapter, MonetConfig } from "../../adapter/monet";
import { ElasticsearchAdapter } from "../../adapter/elasticsearch";
import { PoolConfig } from "pg";
import { Connection } from "../../util/configParser";
import { ConfigOptions } from "elasticsearch";
/**
* Creates a PostgreSQL adapter and middleware that
......@@ -68,3 +70,20 @@ export function MonetMw(config: Connection): Middleware {
};
}
/*Elastic */
export function ElasticsearchMw(config: Connection): Middleware {
let parsedConfig: ConfigOptions = {
host: config.host + ":" + config.port,
maxRetries: 5,
requestTimeout: 60000,
sniffOnStart: true,
};
let adapter: Adapter = new ElasticsearchAdapter(parsedConfig);
return function elasticsearchMiddleware(req, res, next) {
req.adapter = adapter;
next();
};
}
......@@ -52,7 +52,7 @@ const config = ConfigParser.parse(configPath);
// include middlewares
import { EngineMw } from "./api/middlewares/engine";
import { PostgresMw, MonetMw } from "./api/middlewares/adapter";
import { PostgresMw, MonetMw, ElasticsearchMw } from "./api/middlewares/adapter";
import { ErrorMw } from "./api/middlewares/error";
app.use(EngineMw(config));
......@@ -64,6 +64,10 @@ else if (config.adapters[0] === "monet") {
app.use(MonetMw(config.connections[0]));
}
else if (config.adapters[0] === "elasticsearch") {
app.use(ElasticsearchMw(config.connections[0]));
}
else {
console.error("Invalid adapter. Options available are: postgres and monet");
process.exit(1);
......