Commit 50189272 authored by Eduardo L. Buratti's avatar Eduardo L. Buratti

Add a basic aggregation framework

parent c5745dec
Pipeline #6630 passed with stage
in 29 seconds
......@@ -18,10 +18,15 @@
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
export interface IAggregateData {
metrics: any;
dimensions: any;
}
export class Aggregate {
public metrics: string[];
public dimensions: string[];
private data: any[];
private data: IAggregateData[];
constructor(metrics: string[], dimensions: string[], options?: any) {
this.metrics = metrics;
......@@ -30,11 +35,36 @@ export class Aggregate {
this.data = [];
}
public push(data: any) {
public push(data: IAggregateData) {
this.data.push(data);
}
public truncate() {
this.data = [];
}
public find(query: any) {
let result: any = [];
this.data.forEach((doc: IAggregateData) => {
let match = true;
for (let key in query) {
if (query.hasOwnProperty(key)) {
let value = query[key];
if (doc.dimensions[key] !== value) {
match = false;
break;
}
}
}
if (match) {
result.push(doc);
}
});
return result;
}
}
......@@ -23,9 +23,9 @@ import { expect } from "chai";
import { Server } from "./server";
describe("server class", () => {
const server = new Server();
it("should be able to create and retrieve sources", () => {
const server = new Server();
// create two sources
const source1 = server.source("source1");
const source2 = server.source("source2");
......@@ -45,24 +45,50 @@ describe("server class", () => {
expect(source1).to.be.equal(retrieved);
});
it("should be able to create and retrieve aggregates", () => {
const server = new Server();
// create two aggregates
const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]);
const aggr2 = server.aggregate(["met:two"], ["dim:one", "dim:two"]);
// retrieve the first one
const retrieved = server.aggregate(["met:one"], ["dim:two", "dim:one"]);
// check if aggregates were actually created/retrieved
expect(aggr1).to.be.an("object");
expect(aggr2).to.be.an("object");
expect(retrieved).to.be.an("object");
// check if the two created aggregates are different
expect(aggr1).to.not.be.equal(aggr2);
// check if the retrieved aggregate is the same as the created one
expect(aggr1).to.be.equal(retrieved);
});
it("should be able to create and retrieve transformers", () => {
const server = new Server();
const source1 = server.source("source1");
const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]);
const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]);
// create two transformers
const transformer1 = server.transformer("transformer1", {
source: "source1",
metrics: ["met:one"],
dimensions: ["dim:one"],
extractors: {
metrics: ((doc: any) => null),
dimensions: ((doc: any) => null),
source: source1,
destination: aggr1,
functions: {
map: (doc: any, emit: Function) => { return; },
reduce: (dimensions: any, metrics: any) => { return {}; }
}
});
const transformer2 = server.transformer("transformer2", {
source: "source2",
metrics: ["met:one"],
dimensions: ["dim:one"],
extractors: {
metrics: ((doc: any) => null),
dimensions: ((doc: any) => null),
source: source1,
destination: aggr2,
functions: {
map: (doc: any, emit: Function) => { return; },
reduce: (dimensions: any, metrics: any) => { return {}; }
}
});
......@@ -81,23 +107,39 @@ describe("server class", () => {
expect(transformer1).to.be.equal(retrieved);
});
it("should be able to create and retrieve aggregates", () => {
// create two aggregates
const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]);
const aggr2 = server.aggregate(["met:two"], ["dim:one", "dim:two"]);
it("should fail to create two transformer with name collision", () => {
const server = new Server();
// retrieve the first one
const retrieved = server.aggregate(["met:one"], ["dim:one", "dim:two"]);
const source1 = server.source("source1");
const aggr1 = server.aggregate(["met:one"], ["dim:one", "dim:two"]);
const aggr2 = server.aggregate(["met:one", "met:two"], ["dim:one"]);
server.transformer("transformer1", {
source: source1,
destination: aggr1,
functions: {
map: (doc: any, emit: Function) => { return; },
reduce: (dimensions: any, metrics: any) => { return {}; }
}
});
// check if aggregates were actually created/retrieved
expect(aggr1).to.be.an("object");
expect(aggr2).to.be.an("object");
expect(retrieved).to.be.an("object");
expect(() => {
server.transformer("transformer1", {
source: source1,
destination: aggr2,
functions: {
map: (doc: any, emit: Function) => { return; },
reduce: (dimensions: any, metrics: any) => { return {}; }
}
});
}).to.throw(Error);
});
// check if the two created aggregates are different
expect(aggr1).to.not.be.equal(aggr2);
it("should fail to retrieve a transformer that doesn't exist", () => {
const server = new Server();
// check if the retrieved aggregate is the same as the created one
expect(aggr1).to.be.equal(retrieved);
expect(() => {
server.transformer("transformerX");
}).to.throw(Error);
});
});
......@@ -46,6 +46,19 @@ export class Server {
}
}
public aggregate(metrics: string[], dimensions: string[], options?: any) {
const id = Hash.sha1(metrics.sort(), dimensions.sort());
if (this.aggregates.has(id)) {
return this.aggregates.get(id);
}
else {
const aggregate = new Aggregate(metrics, dimensions, options);
this.aggregates.set(id, aggregate);
return aggregate;
}
}
public transformer(name: string, options?: ITransformerOptions) {
if (typeof options !== "undefined") {
if (this.transformers.has(name)) {
......@@ -64,37 +77,4 @@ export class Server {
return this.transformers.get(name);
}
}
public aggregate(metrics: string[], dimensions: string[], options?: any) {
const id = Hash.sha1(metrics.sort(), dimensions.sort());
if (this.aggregates.has(id)) {
return this.aggregates.get(id);
}
else {
const aggregate = new Aggregate(metrics, dimensions, options);
this.aggregates.set(id, aggregate);
return aggregate;
}
}
public process() {
this.transformers.forEach((transformer: Transformer) => {
const source = this.source(transformer.source);
const aggr = this.aggregate(transformer.metrics,
transformer.dimensions);
source.forEach((doc: any) => {
aggr.push({
metrics: transformer.extractMetrics(doc),
dimensions: transformer.extractDimensions(doc)
});
});
// TODO: stream support
// source.stream()
// .pipe(transformer.stream());
// .pipe(aggregate.stream());
});
}
}
......@@ -22,7 +22,7 @@ export class Source {
public name: string;
private data: any[];
constructor(name: string, options: any) {
constructor(name: string, options?: any) {
this.name = name;
this.data = [];
......@@ -37,4 +37,8 @@ export class Source {
callback(value);
});
}
public truncate() {
this.data = [];
}
}
/*
* Copyright (C) 2015 Centro de Computacao Cientifica e Software Livre
* Departamento de Informatica - Universidade Federal do Parana
*
* This file is part of blendb.
*
* blendb is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* blendb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
import { expect } from "chai";
import { Hash } from "../util/hash";
import { Transformer } from "./transformer";
import { Source } from "./source";
import { Aggregate } from "./aggregate";
describe("transformer class", () => {
const source = new Source("testSource");
const aggregate = new Aggregate(["met:one"], ["dim:one", "dim:two"]);
it("should be able to aggregate data", () => {
source.truncate();
for (let i = 0; i < 1000; i++) {
source.push({
id: i % 10,
seed: Math.random()
});
}
source.push({ id: 10, seed: 5 });
source.push({ id: 10, seed: 5 });
source.push({ id: 10, seed: 5 });
source.push({ id: 10, seed: 5 });
source.push({ id: 10, seed: 5 });
const transformer = new Transformer("processTransformer", {
source: source,
destination: aggregate,
functions: {
map: ((doc: any, emit: Function) => {
emit({
"dim:one": doc.id,
"dim:two": Hash.sha1(doc.seed)
}, {
"met:one": Math.floor(doc.seed * 100000) / 100
});
}),
reduce: ((dimensions: any, metrics: any) => {
let tmp = 0;
metrics.forEach((met: any) => {
tmp += met["met:one"];
});
return {
"met:one": tmp
};
})
}
});
transformer.apply();
let result = aggregate.find({
"dim:one": 10,
"dim:two": Hash.sha1(5)
});
expect(result).to.have.length(1);
expect(result[0]).to.have.property("metrics");
expect(result[0].metrics).to.have.property("met:one");
expect(result[0].metrics["met:one"]).to.be.equal(25000);
});
});
......@@ -18,40 +18,70 @@
* along with blendb. If not, see <http://www.gnu.org/licenses/>.
*/
import { Hash } from "../util/hash";
import { Source } from "./source";
import { Aggregate } from "./aggregate";
export interface ITransformerOptions {
source: string;
metrics: string[];
dimensions: string[];
extractors: {
metrics: (doc: any) => any;
dimensions: (doc: any) => any;
source: Source;
destination: Aggregate;
functions: {
map: (doc: any, emit: Function) => void;
reduce: (dimensions: any, metrics: any) => any;
};
}
export class Transformer {
public name: string;
public source: string;
public metrics: string[];
public dimensions: string[];
private extractors: any;
public source: Source;
public destination: Aggregate;
private functions: any;
constructor(name: string, options: ITransformerOptions) {
this.name = name;
this.source = options.source;
this.metrics = options.metrics;
this.dimensions = options.dimensions;
this.extractors = {
metrics: options.extractors.metrics,
dimensions: options.extractors.dimensions
this.destination = options.destination;
this.functions = {
map: options.functions.map,
reduce: options.functions.reduce
};
}
public extractMetrics(doc: any) {
return this.extractors.metrics(doc);
}
public apply() {
let temp = new Map();
this.destination.truncate();
this.source.forEach((doc: any) => {
let emit = (dimensions: any, metrics: any) => {
let key = Hash.sha1(dimensions);
let current = temp.get(key) || { dimensions, metrics: [] };
temp.set(key, {
dimensions,
metrics: current.metrics.concat([metrics])
});
};
this.functions.map(doc, emit);
});
temp.forEach((value, key) => {
let dimensions = value.dimensions;
let metrics = value.metrics;
public extractDimensions(doc: any) {
return this.extractors.dimensions(doc);
if (metrics.length > 1) {
this.destination.push({
dimensions: dimensions,
metrics: this.functions.reduce(dimensions, metrics)
});
}
else {
this.destination.push({
dimensions: dimensions,
metrics: metrics
});
}
});
}
}
......@@ -31,8 +31,8 @@ describe("hash utility library", () => {
});
it("should generate the same hash for the same input", () => {
let h1 = Hash.sha1("test", { obj: "test" }, ["list", "of", "things"]);
let h2 = Hash.sha1("test", { obj: "test" }, ["list", "of", "things"]);
let h1 = Hash.sha1("test", { obj: "test" }, 43.2, ["list", "of", "things"]);
let h2 = Hash.sha1("test", { obj: "test" }, 43.2, ["list", "of", "things"]);
expect(h1).to.be.a("string");
expect(h2).to.be.a("string");
......@@ -66,4 +66,14 @@ describe("hash utility library", () => {
expect(h2).to.be.a("string");
expect(h1).to.not.be.equal(h2);
});
it("should throw an error for unhashable objects", () => {
expect(() => {
Hash.sha1(
"test",
function (a: number, b: number) { return a + b; },
["of", "list", "things"]
);
}).to.throw(TypeError);
});
});
......@@ -31,6 +31,8 @@ export class Hash {
return obj;
case "object":
return JSON.stringify(obj);
case "number":
return obj.toString();
default:
throw new TypeError(typeof obj +
" cannot be hashed");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment