diff options
| -rwxr-xr-x | roles/daq-node/bin/fapg-daq-node | 35 | ||||
| -rwxr-xr-x | roles/daq-node/deploy | 190 | ||||
| -rw-r--r-- | roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm | 362 | ||||
| -rw-r--r-- | roles/daq-node/t/01-probe.t | 100 |
4 files changed, 687 insertions, 0 deletions
diff --git a/roles/daq-node/bin/fapg-daq-node b/roles/daq-node/bin/fapg-daq-node new file mode 100755 index 0000000..55a6d59 --- /dev/null +++ b/roles/daq-node/bin/fapg-daq-node @@ -0,0 +1,35 @@ +#!/usr/bin/env perl +# -*- mode: perl-ts; -*- + +use 5.40.1; +use strict; +use warnings; + +use FindBin; +use lib "${FindBin::Bin}/../lib/perl5"; + +use FAPG::DAQ::EZO; + +=head1 NAME + +fapg-daq - read sensor data and send over MQTT + +=cut + +my $probe = FAPG::DAQ::EZO::identify_probe(); + +print "Found probe " + . $probe->type + . " on " + . $probe->device + . ".\n"; + +while (1) { + my $reading = $probe->get_reading(); + + print "Reading: " . $reading->value . " " . $reading->unit . "\n"; + + $reading->publish_mqtt(); + + sleep 5; +} diff --git a/roles/daq-node/deploy b/roles/daq-node/deploy new file mode 100755 index 0000000..1fa1a83 --- /dev/null +++ b/roles/daq-node/deploy @@ -0,0 +1,190 @@ +#!/usr/bin/env bash + +set -eEuo pipefail + +readonly REPO_DIR="/opt/fapg/fapg-daq" +readonly NODE_BIN="${REPO_DIR}/roles/sensor-node/bin/fapg-daq-node" + +readonly SERVICE_NAME="fapg-daq-node.service" +readonly SERVICE_FILE="/etc/systemd/system/${SERVICE_NAME}" + +readonly ENV_DIR="/etc/fapg-daq" +readonly ENV_FILE="${ENV_DIR}/daq.conf" + +readonly STATE_DIR="/var/lib/fapg-daq" +readonly RUN_DIR="/run/fapg-daq" + +readonly RUN_USER='fapg-daq' +readonly RUN_GROUP='fapg-daq' + +readonly SERIAL_DEVICE='/dev/ttyUSB0' +readonly READ_INTERVAL=5 +readonly MQTT_HOST='fapg-daq-five-01' +readonly MQTT_PORT='1883' +readonly MQTT_USERNAME='fapg_zero' +MQTT_PASSWORD='' + +function die() { + echo "ERROR: $*" >&2 + exit 1 +} + +function require_root() { + if [[ "$(id -u)" -ne 0 ]]; then + die "$0 requires superuser privileges." + fi +} + +function install_os_dependencies() { + local dependencies='perl libnet-mqtt-simple-perl libdevice-serialport-perl' + local missing_dependencies='' + + for dep in ${dependencies}; do + if dpkg --get-selections "${dep}" | grep -w install > /dev/null 2>&1; then + echo "${dep} is already installed." + else + echo "${dep} is not installed." + missing_dependencies+=" ${dep}" + fi + done + + if [[ -n "${missing_dependencies}" ]]; then + apt-get install -y ${missing_dependencies} + fi + + echo "Installed all OS dependencies." +} + +# function install_perl_dependencies() { +# local dependencies="Net::MQTT::Simple" +# for dep in ${dependencies}; do +# if ! perl -M"${dep}" -e '1' > /dev/null 2>&1; then +# cpanm "${dep}" +# fi +# done + +# echo "Installed all Perl dependencies." +# } + +create_runtime_user() { + if ! getent group "${RUN_GROUP}" > /dev/null; then + addgroup --system "${RUN_GROUP}" + fi + + if ! id "${RUN_USER}" > /dev/null 2>&1; then + adduser \ + --system \ + --ingroup "${RUN_GROUP}" \ + --home "${STATE_DIR}" \ + --no-create-home \ + --disabled-login \ + "${RUN_USER}" + fi + + # USB serial devices are normally accessible to members of dialout. + usermod -aG dialout "${RUN_USER}" + + echo "Created runtime user ${RUN_USER}, group ${RUN_GROUP}." +} + +check_repo_layout() { + [[ -d "${REPO_DIR}" ]] || die "Repo not found at ${REPO_DIR}." + [[ -x "${NODE_BIN}" ]] || die "${NODE_BIN} does not exist or is not executable." + + mkdir -p "${STATE_DIR}" "${RUN_DIR}" + chown "${RUN_USER}:${RUN_GROUP}" "${STATE_DIR}" "${RUN_DIR}" + + echo "Verified layout of repo at ${REPO_DIR}." +} + +function write_env_file() { + mkdir -p "${ENV_DIR}" + + # TODO: print diff of existing and proposed env file + if [[ -f "${ENV_FILE}" ]]; then + echo "${ENV_FILE} already exists, remove or edit manually if necessary." + return + fi + + if [[ -z "${MQTT_PASSWORD:-}" ]]; then + read -rsp "MQTT password for user '${MQTT_USERNAME}': " MQTT_PASSWORD + fi + + [[ -n "${MQTT_PASSWORD:-}" ]] || die "MQTT_PASSWORD is required." + + # TODO: unsafe! + cat > "${ENV_FILE}" <<EOF +MQTT_SIMPLE_ALLOW_INSECURE_LOGIN=1 + +SERIAL_DEVICE="${SERIAL_DEVICE}" +READ_INTERVAL=${READ_INTERVAL} +MQTT_HOST="${MQTT_HOST}" +MQTT_PORT="${MQTT_PORT}" +MQTT_USERNAME="${MQTT_USERNAME}" +MQTT_PASSWORD="${MQTT_PASSWORD}" +EOF + + chown root:"${RUN_GROUP}" "${ENV_FILE}" + chmod 0640 "${ENV_FILE}" +} + +function write_systemd_service() { + cat > "${SERVICE_FILE}" <<EOF +[Unit] +Description=FAPG DAQ sensor node +Wants=network-online.target +After=network-online.target + +[Service] +Type=simple +User=${RUN_USER} +Group=${RUN_GROUP} +SupplementaryGroups=dialout + +WorkingDirectory=${REPO_DIR} +EnvironmentFile=${ENV_FILE} + +ExecStart=${NODE_BIN} + +Restart=always +RestartSec=5 + +# Basic hardening. Do not enable PrivateDevices=true because this service needs /dev/ttyUSB0. +NoNewPrivileges=true +ProtectHome=true +ProtectSystem=strict +ReadWritePaths=${STATE_DIR} ${RUN_DIR} + +[Install] +WantedBy=multi-user.target +EOF + + chmod 0644 "$SERVICE_FILE" +} + +function enable_service() { + systemctl daemon-reload + systemctl enable --now "${SERVICE_NAME}" +} + +show_status() { + cat <<"EOF" +Deployment complete. +Current service state: +EOF + systemctl --no-pager --full status "${SERVICE_NAME}" +} + +function main() { + require_root + install_os_dependencies + # install_perl_dependencies + create_runtime_user + check_repo_layout + write_env_file + write_systemd_service + enable_service + show_status +} + +main "$@" diff --git a/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm new file mode 100644 index 0000000..83466d0 --- /dev/null +++ b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm @@ -0,0 +1,362 @@ +package FAPG::DAQ::EZO; +# -*- mode: perl-ts; -*- + +use 5.40.1; +use strict; +use warnings; + +use Device::SerialPort; +use Time::HiRes qw(time sleep); + +my %SUPPORTED_PROBES = ( + ph => { unit => 'pH' }, + do => { unit => 'mg/L' }, + orp => { unit => 'mV' }, + ec => { unit => 'uS/cm' }, +); + +sub identify_probe { + my $device = _find_usb_device(); + my $port = _open_serial_port($device); + + # Stop continuous mode if enabled. + # This makes later "R" commands deterministic. + _drain_serial($port, 0.25); + _write_ezo_command($port, 'C,0'); + sleep 0.4; + _drain_serial($port, 0.25); + + my @lines = _ezo_command($port, 'I', 2.0); + + my ($raw_type, $firmware, $raw_reply); + + for my $line (@lines) { + if ($line =~ /^\?I,([^,\r\n]+),([^,\r\n]+)/i) { + $raw_type = $1; + $firmware = $2; + $raw_reply = $line; + last; + } + } + + die "Could not identify EZO probe.\n" + unless defined $raw_type; + + my $type = _normalize_probe_type($raw_type); + + die "Unsupported EZO probe type '$raw_type'.\n" + unless exists $SUPPORTED_PROBES{$type}; + + return FAPG::DAQ::EZO::Probe->new( + device => $device, + port => $port, + type => $type, + raw_type => $raw_type, + firmware => $firmware, + unit => $SUPPORTED_PROBES{$type}->{unit}, + raw_reply => $raw_reply, + ); +} + +sub _find_usb_device { + return $ENV{EZO_SERIAL_DEVICE} + if defined $ENV{EZO_SERIAL_DEVICE} && length $ENV{EZO_SERIAL_DEVICE}; + + my @devices = grep { /UART|FTDI|Atlas/i } glob '/dev/serial/by-id/*'; + + die "Found no EZO USB serial device under /dev/serial/by-id/.\n" + unless @devices; + + die "Found multiple USB serial devices. Set EZO_SERIAL_DEVICE explicitly.\n" + if @devices > 1; + + return $devices[0]; +} + +sub _open_serial_port { + my ($device) = @_; + + my $port = Device::SerialPort->new($device) + or die "Cannot open serial port $device: $!"; + + $port->baudrate(9600); + $port->databits(8); + $port->parity('none'); + $port->stopbits(1); + $port->handshake('none'); + + $port->read_const_time(100); + $port->read_char_time(0); + + $port->write_settings + or die "Cannot apply serial settings to $device.\n"; + + return $port; +} + +sub _ezo_command { + my ($port, $command, $timeout) = @_; + + _drain_serial($port, 0.05); + _write_ezo_command($port, $command); + + return _read_ezo_lines($port, $timeout); +} + +sub _write_ezo_command { + my ($port, $command) = @_; + + my $payload = "$command\r"; + my $written = $port->write($payload); + + die "Could not write EZO command '$command'.\n" + unless defined $written && $written == length($payload); + + return 1; +} + +sub _read_ezo_lines { + my ($port, $timeout) = @_; + + my $deadline = time + $timeout; + my $buffer = ''; + my @lines; + + while (time < $deadline) { + my ($count, $chunk) = $port->read(128); + + if ($count && defined $chunk) { + $buffer .= $chunk; + + while ($buffer =~ s/^([^\r]*)\r//) { + my $line = $1; + $line =~ s/^\s+//; + $line =~ s/\s+$//; + + push @lines, $line if length $line; + } + } + + sleep 0.02; + } + + return @lines; +} + +sub _drain_serial { + my ($port, $seconds) = @_; + + my $deadline = time + $seconds; + + while (time < $deadline) { + my ($count, undef) = $port->read(255); + sleep($count ? 0.01 : 0.05); + } +} + +sub _normalize_probe_type { + my ($raw_type) = @_; + + my $type = lc $raw_type; + $type =~ s/[^a-z0-9]//g; + + return $type; +} + +sub _parse_first_number { + my ($line) = @_; + + my ($first_field) = split /,/, $line; + + return undef unless defined $first_field; + + $first_field =~ s/^\s+//; + $first_field =~ s/\s+$//; + + return undef + unless $first_field =~ /^([+-]?(?:\d+(?:\.\d*)?|\.\d+))$/; + + return 0 + $1; +} + +1; + +package FAPG::DAQ::EZO::Probe; +# -*- mode: perl-ts; -*- + +use 5.40.1; +use strict; +use warnings; + +sub new { + my ($class, %args) = @_; + return bless \%args, $class; +} + +sub device { + my ($self) = @_; + return $self->{device}; +} + +sub port { + my ($self) = @_; + return $self->{port}; +} + +sub type { + my ($self) = @_; + return $self->{type}; +} + +sub raw_type { + my ($self) = @_; + return $self->{raw_type}; +} + +sub firmware { + my ($self) = @_; + return $self->{firmware}; +} + +sub unit { + my ($self) = @_; + return $self->{unit}; +} + +sub get_reading { + my ($self) = @_; + + my @lines = FAPG::DAQ::EZO::_ezo_command($self->port, 'R', 3.0); + + for my $line (@lines) { + die "EZO probe returned error while reading: $line\n" + if $line =~ /^\*ER/i; + + next if $line =~ /^\*/; + next if $line =~ /^\?/; + + my $value = FAPG::DAQ::EZO::_parse_first_number($line); + + next unless defined $value; + + return FAPG::DAQ::EZO::Reading->new( + probe => $self->type, + value => $value, + unit => $self->unit, + device => $self->device, + raw_reply => $line, + ); + } + + die "Could not parse EZO reading from response: " . join(' | ', @lines) . "\n"; +} + +1; + +package FAPG::DAQ::EZO::Reading; +# -*- mode: perl-ts; -*- + +use 5.40.1; +use strict; +use warnings; + +use JSON::PP qw(encode_json); +use Net::MQTT::Simple; +use POSIX qw(strftime); +use Sys::Hostname qw(hostname); + +sub new { + my ($class, %args) = @_; + + $args{timestamp} //= _utc_timestamp(); + $args{node} //= $ENV{DAQ_NODE} || hostname(); + + return bless \%args, $class; +} + +sub probe { + my ($self) = @_; + return $self->{probe}; +} + +sub value { + my ($self) = @_; + return $self->{value}; +} + +sub unit { + my ($self) = @_; + return $self->{unit}; +} + +sub node { + my ($self) = @_; + return $self->{node}; +} + +sub timestamp { + my ($self) = @_; + return $self->{timestamp}; +} + +sub topic { + my ($self) = @_; + + return join '/', + 'fapg', + 'daq', + $self->probe, + $self->node, + 'reading'; +} + +sub as_hash { + my ($self) = @_; + + return { + timestamp => $self->timestamp, + probe => $self->probe, + value => $self->value, + unit => $self->unit, + node => $self->node, + }; +} + +sub as_json { + my ($self) = @_; + return encode_json($self->as_hash); +} + +sub publish_mqtt { + my ($self) = @_; + + my $host = $ENV{MQTT_HOST} // 'fapg-daq-five-01'; + my $port = $ENV{MQTT_PORT} // 1883; + + my $server = "$host:$port"; + + my $mqtt = Net::MQTT::Simple->new($server); + + my $username = exists $ENV{MQTT_USERNAME} + ? $ENV{MQTT_USERNAME} + : 'fapg_zero'; + + if (defined $username && length $username) { + my $password = $ENV{MQTT_PASSWORD}; + + die "MQTT_USERNAME is set but MQTT_PASSWORD is missing.\n" + unless defined $password && length $password; + + $mqtt->login($username, $password); + } + + $mqtt->publish($self->topic => $self->as_json); + + return 1; +} + +sub _utc_timestamp { + return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime); +} + +1; diff --git a/roles/daq-node/t/01-probe.t b/roles/daq-node/t/01-probe.t new file mode 100644 index 0000000..8a84da3 --- /dev/null +++ b/roles/daq-node/t/01-probe.t @@ -0,0 +1,100 @@ +#!/usr/bin/env perl +# -*- mode: perl-ts; -*- + +use strict; +use warnings; + +use Test::More tests => 3; +use Time::HiRes qw(usleep); + +# Basic Atlas Scientific EZO probe smoke test. +# +# Run on a daq-node: +# prove -v t/probe.t +# +# Optional overrides: +# FAPG_DAQ_SERIAL_PORT=/dev/ttyUSB1 FAPG_DAQ_SERIAL_BAUD=9600 prove -v t/probe.t + +my $port = $ENV{FAPG_DAQ_SERIAL_PORT} // $ARGV[0] // '/dev/ttyUSB0'; +my $baud = $ENV{FAPG_DAQ_SERIAL_BAUD} // $ARGV[1] // 9600; + +my $serial_module_ok = eval { + require Device::SerialPort; + Device::SerialPort->import; + 1; +}; + +BAIL_OUT("Device::SerialPort is required: $@") unless $serial_module_ok; +BAIL_OUT("Serial port does not exist: $port") unless -e $port; + +my $serial = Device::SerialPort->new($port) + or BAIL_OUT("Cannot open serial port: $port"); + +$serial->baudrate($baud) or BAIL_OUT("Cannot set baudrate to $baud"); +$serial->databits(8) or BAIL_OUT('Cannot set databits to 8'); +$serial->parity('none') or BAIL_OUT('Cannot set parity to none'); +$serial->stopbits(1) or BAIL_OUT('Cannot set stopbits to 1'); + +$serial->read_char_time(0); +$serial->read_const_time(100); + +note("Testing EZO probe on $port at $baud baud"); + +my $info = ezo_command($serial, 'I', 500_000); +note("I => " . printable($info)); + +ok($info ne '', 'probe is reachable'); +like($info, qr/(?:\?I,|EZO|PH|EC|DO|ORP)/i, 'probe is identifiable'); + +my $reading = ezo_command($serial, 'R', 1_500_000); +note("R => " . printable($reading)); + +like( + $reading, + qr/OK/, + 'probe returns a single reading' +); + +sub ezo_command { + my ($serial, $command, $wait_us) = @_; + + drain_serial($serial); + + my $written = $serial->write("$command\r"); + return '' unless defined $written && $written > 0; + + usleep($wait_us); + + my $reply = ''; + while (1) { + my ($count, $buffer) = $serial->read(255); + last unless $count; + $reply .= $buffer; + } + + return clean_reply($reply); +} + +sub drain_serial { + my ($serial) = @_; + + while (1) { + my ($count, undef) = $serial->read(255); + last unless $count; + } +} + +sub clean_reply { + my ($reply) = @_; + + $reply =~ s/\r/\n/g; + $reply =~ s/\n+/\n/g; + $reply =~ s/^\n|\n$//g; + + return $reply; +} + +sub printable { + my ($value) = @_; + return $value eq '' ? '<no response>' : $value; +} |