summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarius Peter <dev@marius-peter.com>2026-07-03 16:40:17 +0200
committerMarius Peter <dev@marius-peter.com>2026-07-03 16:40:17 +0200
commit239de739d538966ccb900eb391759b5e14b8a217 (patch)
tree54c160a19f1ce0ccc029493f4bfda0a765e01fe1
parentb8881057b64c6d0eedcfa2b07c138908ef2b6f76 (diff)
Add role for daq-node.
-rwxr-xr-xroles/daq-node/bin/fapg-daq-node35
-rwxr-xr-xroles/daq-node/deploy190
-rw-r--r--roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm362
-rw-r--r--roles/daq-node/t/01-probe.t100
4 files changed, 687 insertions, 0 deletions
diff --git a/roles/daq-node/bin/fapg-daq-node b/roles/daq-node/bin/fapg-daq-node
new file mode 100755
index 0000000..55a6d59
--- /dev/null
+++ b/roles/daq-node/bin/fapg-daq-node
@@ -0,0 +1,35 @@
+#!/usr/bin/env perl
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+use FindBin;
+use lib "${FindBin::Bin}/../lib/perl5";
+
+use FAPG::DAQ::EZO;
+
+=head1 NAME
+
+fapg-daq - read sensor data and send over MQTT
+
+=cut
+
+my $probe = FAPG::DAQ::EZO::identify_probe();
+
+print "Found probe "
+ . $probe->type
+ . " on "
+ . $probe->device
+ . ".\n";
+
+while (1) {
+ my $reading = $probe->get_reading();
+
+ print "Reading: " . $reading->value . " " . $reading->unit . "\n";
+
+ $reading->publish_mqtt();
+
+ sleep 5;
+}
diff --git a/roles/daq-node/deploy b/roles/daq-node/deploy
new file mode 100755
index 0000000..1fa1a83
--- /dev/null
+++ b/roles/daq-node/deploy
@@ -0,0 +1,190 @@
+#!/usr/bin/env bash
+
+set -eEuo pipefail
+
+readonly REPO_DIR="/opt/fapg/fapg-daq"
+readonly NODE_BIN="${REPO_DIR}/roles/sensor-node/bin/fapg-daq-node"
+
+readonly SERVICE_NAME="fapg-daq-node.service"
+readonly SERVICE_FILE="/etc/systemd/system/${SERVICE_NAME}"
+
+readonly ENV_DIR="/etc/fapg-daq"
+readonly ENV_FILE="${ENV_DIR}/daq.conf"
+
+readonly STATE_DIR="/var/lib/fapg-daq"
+readonly RUN_DIR="/run/fapg-daq"
+
+readonly RUN_USER='fapg-daq'
+readonly RUN_GROUP='fapg-daq'
+
+readonly SERIAL_DEVICE='/dev/ttyUSB0'
+readonly READ_INTERVAL=5
+readonly MQTT_HOST='fapg-daq-five-01'
+readonly MQTT_PORT='1883'
+readonly MQTT_USERNAME='fapg_zero'
+MQTT_PASSWORD=''
+
+function die() {
+ echo "ERROR: $*" >&2
+ exit 1
+}
+
+function require_root() {
+ if [[ "$(id -u)" -ne 0 ]]; then
+ die "$0 requires superuser privileges."
+ fi
+}
+
+function install_os_dependencies() {
+ local dependencies='perl libnet-mqtt-simple-perl libdevice-serialport-perl'
+ local missing_dependencies=''
+
+ for dep in ${dependencies}; do
+ if dpkg --get-selections "${dep}" | grep -w install > /dev/null 2>&1; then
+ echo "${dep} is already installed."
+ else
+ echo "${dep} is not installed."
+ missing_dependencies+=" ${dep}"
+ fi
+ done
+
+ if [[ -n "${missing_dependencies}" ]]; then
+ apt-get install -y ${missing_dependencies}
+ fi
+
+ echo "Installed all OS dependencies."
+}
+
+# function install_perl_dependencies() {
+# local dependencies="Net::MQTT::Simple"
+# for dep in ${dependencies}; do
+# if ! perl -M"${dep}" -e '1' > /dev/null 2>&1; then
+# cpanm "${dep}"
+# fi
+# done
+
+# echo "Installed all Perl dependencies."
+# }
+
+create_runtime_user() {
+ if ! getent group "${RUN_GROUP}" > /dev/null; then
+ addgroup --system "${RUN_GROUP}"
+ fi
+
+ if ! id "${RUN_USER}" > /dev/null 2>&1; then
+ adduser \
+ --system \
+ --ingroup "${RUN_GROUP}" \
+ --home "${STATE_DIR}" \
+ --no-create-home \
+ --disabled-login \
+ "${RUN_USER}"
+ fi
+
+ # USB serial devices are normally accessible to members of dialout.
+ usermod -aG dialout "${RUN_USER}"
+
+ echo "Created runtime user ${RUN_USER}, group ${RUN_GROUP}."
+}
+
+check_repo_layout() {
+ [[ -d "${REPO_DIR}" ]] || die "Repo not found at ${REPO_DIR}."
+ [[ -x "${NODE_BIN}" ]] || die "${NODE_BIN} does not exist or is not executable."
+
+ mkdir -p "${STATE_DIR}" "${RUN_DIR}"
+ chown "${RUN_USER}:${RUN_GROUP}" "${STATE_DIR}" "${RUN_DIR}"
+
+ echo "Verified layout of repo at ${REPO_DIR}."
+}
+
+function write_env_file() {
+ mkdir -p "${ENV_DIR}"
+
+ # TODO: print diff of existing and proposed env file
+ if [[ -f "${ENV_FILE}" ]]; then
+ echo "${ENV_FILE} already exists, remove or edit manually if necessary."
+ return
+ fi
+
+ if [[ -z "${MQTT_PASSWORD:-}" ]]; then
+ read -rsp "MQTT password for user '${MQTT_USERNAME}': " MQTT_PASSWORD
+ fi
+
+ [[ -n "${MQTT_PASSWORD:-}" ]] || die "MQTT_PASSWORD is required."
+
+ # TODO: unsafe!
+ cat > "${ENV_FILE}" <<EOF
+MQTT_SIMPLE_ALLOW_INSECURE_LOGIN=1
+
+SERIAL_DEVICE="${SERIAL_DEVICE}"
+READ_INTERVAL=${READ_INTERVAL}
+MQTT_HOST="${MQTT_HOST}"
+MQTT_PORT="${MQTT_PORT}"
+MQTT_USERNAME="${MQTT_USERNAME}"
+MQTT_PASSWORD="${MQTT_PASSWORD}"
+EOF
+
+ chown root:"${RUN_GROUP}" "${ENV_FILE}"
+ chmod 0640 "${ENV_FILE}"
+}
+
+function write_systemd_service() {
+ cat > "${SERVICE_FILE}" <<EOF
+[Unit]
+Description=FAPG DAQ sensor node
+Wants=network-online.target
+After=network-online.target
+
+[Service]
+Type=simple
+User=${RUN_USER}
+Group=${RUN_GROUP}
+SupplementaryGroups=dialout
+
+WorkingDirectory=${REPO_DIR}
+EnvironmentFile=${ENV_FILE}
+
+ExecStart=${NODE_BIN}
+
+Restart=always
+RestartSec=5
+
+# Basic hardening. Do not enable PrivateDevices=true because this service needs /dev/ttyUSB0.
+NoNewPrivileges=true
+ProtectHome=true
+ProtectSystem=strict
+ReadWritePaths=${STATE_DIR} ${RUN_DIR}
+
+[Install]
+WantedBy=multi-user.target
+EOF
+
+ chmod 0644 "$SERVICE_FILE"
+}
+
+function enable_service() {
+ systemctl daemon-reload
+ systemctl enable --now "${SERVICE_NAME}"
+}
+
+show_status() {
+ cat <<"EOF"
+Deployment complete.
+Current service state:
+EOF
+ systemctl --no-pager --full status "${SERVICE_NAME}"
+}
+
+function main() {
+ require_root
+ install_os_dependencies
+ # install_perl_dependencies
+ create_runtime_user
+ check_repo_layout
+ write_env_file
+ write_systemd_service
+ enable_service
+ show_status
+}
+
+main "$@"
diff --git a/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm
new file mode 100644
index 0000000..83466d0
--- /dev/null
+++ b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm
@@ -0,0 +1,362 @@
+package FAPG::DAQ::EZO;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+use Device::SerialPort;
+use Time::HiRes qw(time sleep);
+
+my %SUPPORTED_PROBES = (
+ ph => { unit => 'pH' },
+ do => { unit => 'mg/L' },
+ orp => { unit => 'mV' },
+ ec => { unit => 'uS/cm' },
+);
+
+sub identify_probe {
+ my $device = _find_usb_device();
+ my $port = _open_serial_port($device);
+
+ # Stop continuous mode if enabled.
+ # This makes later "R" commands deterministic.
+ _drain_serial($port, 0.25);
+ _write_ezo_command($port, 'C,0');
+ sleep 0.4;
+ _drain_serial($port, 0.25);
+
+ my @lines = _ezo_command($port, 'I', 2.0);
+
+ my ($raw_type, $firmware, $raw_reply);
+
+ for my $line (@lines) {
+ if ($line =~ /^\?I,([^,\r\n]+),([^,\r\n]+)/i) {
+ $raw_type = $1;
+ $firmware = $2;
+ $raw_reply = $line;
+ last;
+ }
+ }
+
+ die "Could not identify EZO probe.\n"
+ unless defined $raw_type;
+
+ my $type = _normalize_probe_type($raw_type);
+
+ die "Unsupported EZO probe type '$raw_type'.\n"
+ unless exists $SUPPORTED_PROBES{$type};
+
+ return FAPG::DAQ::EZO::Probe->new(
+ device => $device,
+ port => $port,
+ type => $type,
+ raw_type => $raw_type,
+ firmware => $firmware,
+ unit => $SUPPORTED_PROBES{$type}->{unit},
+ raw_reply => $raw_reply,
+ );
+}
+
+sub _find_usb_device {
+ return $ENV{EZO_SERIAL_DEVICE}
+ if defined $ENV{EZO_SERIAL_DEVICE} && length $ENV{EZO_SERIAL_DEVICE};
+
+ my @devices = grep { /UART|FTDI|Atlas/i } glob '/dev/serial/by-id/*';
+
+ die "Found no EZO USB serial device under /dev/serial/by-id/.\n"
+ unless @devices;
+
+ die "Found multiple USB serial devices. Set EZO_SERIAL_DEVICE explicitly.\n"
+ if @devices > 1;
+
+ return $devices[0];
+}
+
+sub _open_serial_port {
+ my ($device) = @_;
+
+ my $port = Device::SerialPort->new($device)
+ or die "Cannot open serial port $device: $!";
+
+ $port->baudrate(9600);
+ $port->databits(8);
+ $port->parity('none');
+ $port->stopbits(1);
+ $port->handshake('none');
+
+ $port->read_const_time(100);
+ $port->read_char_time(0);
+
+ $port->write_settings
+ or die "Cannot apply serial settings to $device.\n";
+
+ return $port;
+}
+
+sub _ezo_command {
+ my ($port, $command, $timeout) = @_;
+
+ _drain_serial($port, 0.05);
+ _write_ezo_command($port, $command);
+
+ return _read_ezo_lines($port, $timeout);
+}
+
+sub _write_ezo_command {
+ my ($port, $command) = @_;
+
+ my $payload = "$command\r";
+ my $written = $port->write($payload);
+
+ die "Could not write EZO command '$command'.\n"
+ unless defined $written && $written == length($payload);
+
+ return 1;
+}
+
+sub _read_ezo_lines {
+ my ($port, $timeout) = @_;
+
+ my $deadline = time + $timeout;
+ my $buffer = '';
+ my @lines;
+
+ while (time < $deadline) {
+ my ($count, $chunk) = $port->read(128);
+
+ if ($count && defined $chunk) {
+ $buffer .= $chunk;
+
+ while ($buffer =~ s/^([^\r]*)\r//) {
+ my $line = $1;
+ $line =~ s/^\s+//;
+ $line =~ s/\s+$//;
+
+ push @lines, $line if length $line;
+ }
+ }
+
+ sleep 0.02;
+ }
+
+ return @lines;
+}
+
+sub _drain_serial {
+ my ($port, $seconds) = @_;
+
+ my $deadline = time + $seconds;
+
+ while (time < $deadline) {
+ my ($count, undef) = $port->read(255);
+ sleep($count ? 0.01 : 0.05);
+ }
+}
+
+sub _normalize_probe_type {
+ my ($raw_type) = @_;
+
+ my $type = lc $raw_type;
+ $type =~ s/[^a-z0-9]//g;
+
+ return $type;
+}
+
+sub _parse_first_number {
+ my ($line) = @_;
+
+ my ($first_field) = split /,/, $line;
+
+ return undef unless defined $first_field;
+
+ $first_field =~ s/^\s+//;
+ $first_field =~ s/\s+$//;
+
+ return undef
+ unless $first_field =~ /^([+-]?(?:\d+(?:\.\d*)?|\.\d+))$/;
+
+ return 0 + $1;
+}
+
+1;
+
+package FAPG::DAQ::EZO::Probe;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+sub new {
+ my ($class, %args) = @_;
+ return bless \%args, $class;
+}
+
+sub device {
+ my ($self) = @_;
+ return $self->{device};
+}
+
+sub port {
+ my ($self) = @_;
+ return $self->{port};
+}
+
+sub type {
+ my ($self) = @_;
+ return $self->{type};
+}
+
+sub raw_type {
+ my ($self) = @_;
+ return $self->{raw_type};
+}
+
+sub firmware {
+ my ($self) = @_;
+ return $self->{firmware};
+}
+
+sub unit {
+ my ($self) = @_;
+ return $self->{unit};
+}
+
+sub get_reading {
+ my ($self) = @_;
+
+ my @lines = FAPG::DAQ::EZO::_ezo_command($self->port, 'R', 3.0);
+
+ for my $line (@lines) {
+ die "EZO probe returned error while reading: $line\n"
+ if $line =~ /^\*ER/i;
+
+ next if $line =~ /^\*/;
+ next if $line =~ /^\?/;
+
+ my $value = FAPG::DAQ::EZO::_parse_first_number($line);
+
+ next unless defined $value;
+
+ return FAPG::DAQ::EZO::Reading->new(
+ probe => $self->type,
+ value => $value,
+ unit => $self->unit,
+ device => $self->device,
+ raw_reply => $line,
+ );
+ }
+
+ die "Could not parse EZO reading from response: " . join(' | ', @lines) . "\n";
+}
+
+1;
+
+package FAPG::DAQ::EZO::Reading;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+use JSON::PP qw(encode_json);
+use Net::MQTT::Simple;
+use POSIX qw(strftime);
+use Sys::Hostname qw(hostname);
+
+sub new {
+ my ($class, %args) = @_;
+
+ $args{timestamp} //= _utc_timestamp();
+ $args{node} //= $ENV{DAQ_NODE} || hostname();
+
+ return bless \%args, $class;
+}
+
+sub probe {
+ my ($self) = @_;
+ return $self->{probe};
+}
+
+sub value {
+ my ($self) = @_;
+ return $self->{value};
+}
+
+sub unit {
+ my ($self) = @_;
+ return $self->{unit};
+}
+
+sub node {
+ my ($self) = @_;
+ return $self->{node};
+}
+
+sub timestamp {
+ my ($self) = @_;
+ return $self->{timestamp};
+}
+
+sub topic {
+ my ($self) = @_;
+
+ return join '/',
+ 'fapg',
+ 'daq',
+ $self->probe,
+ $self->node,
+ 'reading';
+}
+
+sub as_hash {
+ my ($self) = @_;
+
+ return {
+ timestamp => $self->timestamp,
+ probe => $self->probe,
+ value => $self->value,
+ unit => $self->unit,
+ node => $self->node,
+ };
+}
+
+sub as_json {
+ my ($self) = @_;
+ return encode_json($self->as_hash);
+}
+
+sub publish_mqtt {
+ my ($self) = @_;
+
+ my $host = $ENV{MQTT_HOST} // 'fapg-daq-five-01';
+ my $port = $ENV{MQTT_PORT} // 1883;
+
+ my $server = "$host:$port";
+
+ my $mqtt = Net::MQTT::Simple->new($server);
+
+ my $username = exists $ENV{MQTT_USERNAME}
+ ? $ENV{MQTT_USERNAME}
+ : 'fapg_zero';
+
+ if (defined $username && length $username) {
+ my $password = $ENV{MQTT_PASSWORD};
+
+ die "MQTT_USERNAME is set but MQTT_PASSWORD is missing.\n"
+ unless defined $password && length $password;
+
+ $mqtt->login($username, $password);
+ }
+
+ $mqtt->publish($self->topic => $self->as_json);
+
+ return 1;
+}
+
+sub _utc_timestamp {
+ return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime);
+}
+
+1;
diff --git a/roles/daq-node/t/01-probe.t b/roles/daq-node/t/01-probe.t
new file mode 100644
index 0000000..8a84da3
--- /dev/null
+++ b/roles/daq-node/t/01-probe.t
@@ -0,0 +1,100 @@
+#!/usr/bin/env perl
+# -*- mode: perl-ts; -*-
+
+use strict;
+use warnings;
+
+use Test::More tests => 3;
+use Time::HiRes qw(usleep);
+
+# Basic Atlas Scientific EZO probe smoke test.
+#
+# Run on a daq-node:
+# prove -v t/probe.t
+#
+# Optional overrides:
+# FAPG_DAQ_SERIAL_PORT=/dev/ttyUSB1 FAPG_DAQ_SERIAL_BAUD=9600 prove -v t/probe.t
+
+my $port = $ENV{FAPG_DAQ_SERIAL_PORT} // $ARGV[0] // '/dev/ttyUSB0';
+my $baud = $ENV{FAPG_DAQ_SERIAL_BAUD} // $ARGV[1] // 9600;
+
+my $serial_module_ok = eval {
+ require Device::SerialPort;
+ Device::SerialPort->import;
+ 1;
+};
+
+BAIL_OUT("Device::SerialPort is required: $@") unless $serial_module_ok;
+BAIL_OUT("Serial port does not exist: $port") unless -e $port;
+
+my $serial = Device::SerialPort->new($port)
+ or BAIL_OUT("Cannot open serial port: $port");
+
+$serial->baudrate($baud) or BAIL_OUT("Cannot set baudrate to $baud");
+$serial->databits(8) or BAIL_OUT('Cannot set databits to 8');
+$serial->parity('none') or BAIL_OUT('Cannot set parity to none');
+$serial->stopbits(1) or BAIL_OUT('Cannot set stopbits to 1');
+
+$serial->read_char_time(0);
+$serial->read_const_time(100);
+
+note("Testing EZO probe on $port at $baud baud");
+
+my $info = ezo_command($serial, 'I', 500_000);
+note("I => " . printable($info));
+
+ok($info ne '', 'probe is reachable');
+like($info, qr/(?:\?I,|EZO|PH|EC|DO|ORP)/i, 'probe is identifiable');
+
+my $reading = ezo_command($serial, 'R', 1_500_000);
+note("R => " . printable($reading));
+
+like(
+ $reading,
+ qr/OK/,
+ 'probe returns a single reading'
+);
+
+sub ezo_command {
+ my ($serial, $command, $wait_us) = @_;
+
+ drain_serial($serial);
+
+ my $written = $serial->write("$command\r");
+ return '' unless defined $written && $written > 0;
+
+ usleep($wait_us);
+
+ my $reply = '';
+ while (1) {
+ my ($count, $buffer) = $serial->read(255);
+ last unless $count;
+ $reply .= $buffer;
+ }
+
+ return clean_reply($reply);
+}
+
+sub drain_serial {
+ my ($serial) = @_;
+
+ while (1) {
+ my ($count, undef) = $serial->read(255);
+ last unless $count;
+ }
+}
+
+sub clean_reply {
+ my ($reply) = @_;
+
+ $reply =~ s/\r/\n/g;
+ $reply =~ s/\n+/\n/g;
+ $reply =~ s/^\n|\n$//g;
+
+ return $reply;
+}
+
+sub printable {
+ my ($value) = @_;
+ return $value eq '' ? '<no response>' : $value;
+}
Copyright 2019--2026 Marius PETER