Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,19 @@ pkg_check_modules(CGRAPH IMPORTED_TARGET libcgraph>=2.30)
pkg_check_modules(GVC IMPORTED_TARGET libgvc>=2.30)
pkg_check_modules(LIBUSB IMPORTED_TARGET libusb-1.0>=1.0.23)
pkg_check_modules(NANOMSG IMPORTED_TARGET nanomsg)
pkg_check_modules(ARROW IMPORTED_TARGET arrow)
pkg_check_modules(PARQUET IMPORTED_TARGET parquet)
pkg_check_modules(RESTCLIENTCPP restclient-cpp)

if(NOT NANOMSG_FOUND)
pkg_check_modules(NANOMSG IMPORTED_TARGET libnanomsg>=1.0.0)
endif()

if(ARROW_FOUND AND PARQUET_FOUND AND RESTCLIENTCPP_FOUND)
set(DELTALIBS_FOUND ON)
else()
set(DELTALIBS_FOUND OFF)
endif()

if (REDISPP_FOUND)
file(READ "${REDISPP_INCLUDEDIR}/sw/redis++/tls.h" CONTENTS)
Expand Down Expand Up @@ -180,6 +189,7 @@ cmake_dependent_option(WITH_SRC "Build executables"
cmake_dependent_option(WITH_TESTS "Run tests" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF)
cmake_dependent_option(WITH_TOOLS "Build auxilary tools" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF)
cmake_dependent_option(WITH_WEB "Build with internal webserver" "${WITH_DEFAULTS}" "LIBWEBSOCKETS_FOUND" OFF)
cmake_dependent_option(WITH_DELTASHARING "Build with delta sharing active" "${WITH_DEFAULTS}" "DELTALIBS_FOUND" OFF)

cmake_dependent_option(WITH_NODE_AMQP "Build with amqp node-type" "${WITH_DEFAULTS}" "RABBITMQ_C_FOUND" OFF)
cmake_dependent_option(WITH_NODE_CAN "Build with can node-type" "${WITH_DEFAULTS}" "" OFF)
Expand Down Expand Up @@ -214,6 +224,7 @@ cmake_dependent_option(WITH_NODE_ULDAQ "Build with uldaq node-type"
cmake_dependent_option(WITH_NODE_WEBRTC "Build with webrtc node-type" "${WITH_DEFAULTS}" "WITH_WEB; LibDataChannel_FOUND" OFF)
cmake_dependent_option(WITH_NODE_WEBSOCKET "Build with websocket node-type" "${WITH_DEFAULTS}" "WITH_WEB" OFF)
cmake_dependent_option(WITH_NODE_ZEROMQ "Build with zeromq node-type" "${WITH_DEFAULTS}" "LIBZMQ_FOUND; NOT WITHOUT_GPL" OFF)
cmake_dependent_option(WITH_NODE_DELTASHARING "Build with delta_sharing node-type" "${WITH_DEFAULTS}" "WITH_DELTASHARING" OFF)

# Set a default for the build type
if("${CMAKE_BUILD_TYPE}" STREQUAL "")
Expand Down Expand Up @@ -324,6 +335,7 @@ add_feature_info(NODE_ULDAQ WITH_NODE_ULDAQ "Build with
add_feature_info(NODE_WEBRTC WITH_NODE_WEBRTC "Build with webrtc node-type")
add_feature_info(NODE_WEBSOCKET WITH_NODE_WEBSOCKET "Build with websocket node-type")
add_feature_info(NODE_ZEROMQ WITH_NODE_ZEROMQ "Build with zeromq node-type")
add_feature_info(NODE_DELTASHARING WITH_NODE_DELTASHARING "Build with delta-sharing node-type")

if(TOPLEVEL_PROJECT)
feature_summary(WHAT ALL VAR FEATURES)
Expand Down
23 changes: 23 additions & 0 deletions etc/examples/nodes/delta_sharing.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
# SPDX-License-Identifier: Apache-2.0
nodes = {
delta_reader = {
type = "delta_sharing"
profile_path = "."
cache_dir = "."
table_path = "open-datasets.share#delta_sharing.default.COVID_19_NYT",
op = "read"
batch_size = 10
},
file1 = {
type = "file"
uri = "."
format = "json"
}
}
paths = (
{
in = "delta_reader"
out = "file1"
}
)
73 changes: 73 additions & 0 deletions include/villas/nodes/delta_sharing/delta_sharing.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* Node type: Delta Sharing.
*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

#include <jansson.h>

#include <villas/nodes/delta_sharing/delta_sharing_client.hpp>
#include <villas/nodes/delta_sharing/protocol.hpp>

namespace arrow {
class Table;
}

namespace villas {
namespace node {

// Forward declarations
class NodeCompat;

struct delta_sharing {
// Configuration
std::string profile_path;
std::string cache_dir;
std::string table_path;
size_t batch_size;
std::string schema;
std::string share;
std::string table;

// Client and state
std::shared_ptr<DeltaSharing::DeltaSharingClient> client;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Schema>>
schemas;
std::shared_ptr<arrow::Table> table_ptr;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Table>>
tables;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Share>>
shares;

enum class TableOp { TABLE_NOOP, TABLE_READ, TABLE_WRITE } table_op;
};

char *deltaSharing_print(NodeCompat *n);

int deltaSharing_parse(NodeCompat *n, json_t *json);

int deltaSharing_start(NodeCompat *n);

int deltaSharing_stop(NodeCompat *n);

int deltaSharing_init(NodeCompat *n);

int deltaSharing_destroy(NodeCompat *n);

int deltaSharing_poll_fds(NodeCompat *n, int fds[]);

int deltaSharing_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt);

int deltaSharing_write(NodeCompat *n, struct Sample *const smps[],
unsigned cnt);

} // namespace node
} // namespace villas
51 changes: 51 additions & 0 deletions include/villas/nodes/delta_sharing/delta_sharing_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/* Node type: Delta Sharing.
*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <iostream>
#include <optional>

#include <arrow/table.h>

#include <villas/nodes/delta_sharing/delta_sharing_rest_client.hpp>

namespace DeltaSharing {

struct DeltaSharingClient {
public:
DeltaSharingClient(const std::string &filename,
std::optional<std::string> cacheLocation);
std::shared_ptr<arrow::Table> LoadAsArrowTable(std::string &url);
std::shared_ptr<arrow::Table> ReadTableFromCache(std::string &url);
const std::shared_ptr<std::vector<DeltaSharingProtocol::Share>>
ListShares(int maxResult, const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Schema>>
ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Table>>
ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Table>>
ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::File>>
ListFilesInTable(const DeltaSharingProtocol::Table &) const;
const DeltaSharingProtocol::Metadata
QueryTableMetadata(const DeltaSharingProtocol::Table &table) const;
const int GetNumberOfThreads() { return this->maxThreads; };
void PopulateCache(const std::string &url) {
this->restClient.PopulateCache(url, this->cacheLocation);
};

protected:
private:
DeltaSharingRestClient restClient;
std::string cacheLocation;
int maxThreads;
};
}; // namespace DeltaSharing
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* Node type: Delta Sharing.
*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <iostream>
#include <list>

#include <nlohmann/json.hpp>
#include <restclient-cpp/connection.h>
#include <restclient-cpp/restclient.h>

#include <villas/nodes/delta_sharing/protocol.hpp>

using json = nlohmann::json;

namespace DeltaSharing {
struct DeltaSharingRestClient {
public:
DeltaSharingRestClient(const std::string &filename);
~DeltaSharingRestClient();
const std::shared_ptr<std::vector<DeltaSharingProtocol::Share>>
ListShares(int maxResult, const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Schema>>
ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Table>>
ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::Table>>
ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult,
const std::string &pageToken) const;
const std::shared_ptr<std::vector<DeltaSharingProtocol::File>>
ListFilesInTable(const DeltaSharingProtocol::Table &) const;
const DeltaSharingProtocol::Metadata
QueryTableMetadata(const DeltaSharingProtocol::Table &table) const;
const DeltaSharingProtocol::DeltaSharingProfile &GetProfile() const;
RestClient::Response get(std::string url);
void PopulateCache(const std::string &url, const std::string &cacheLocation);
const bool shouldRetry(RestClient::Response &response) const;

protected:
json ReadFromFile(const std::string &filename);

private:
DeltaSharingProtocol::DeltaSharingProfile profile;
static const std::string user_agent;
};
}; // namespace DeltaSharing
29 changes: 29 additions & 0 deletions include/villas/nodes/delta_sharing/functions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Node type: Delta Sharing.
*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include <arrow/table.h>

#include <villas/nodes/delta_sharing/delta_sharing_client.hpp>

namespace DeltaSharing {

const std::vector<std::string> ParseURL(const std::string &path);
std::shared_ptr<DeltaSharingClient>
NewDeltaSharingClient(std::string profile,
std::optional<std::string> cacheLocation);
const std::shared_ptr<arrow::Table> LoadAsArrowTable(std::string path,
int fileno);
}; // namespace DeltaSharing

// namespace DeltaSharing
Loading