summaryrefslogtreecommitdiff
path: root/roles/daq-node/lib
diff options
context:
space:
mode:
Diffstat (limited to 'roles/daq-node/lib')
-rw-r--r--roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm362
1 files changed, 362 insertions, 0 deletions
diff --git a/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm
new file mode 100644
index 0000000..83466d0
--- /dev/null
+++ b/roles/daq-node/lib/perl5/FAPG/DAQ/EZO.pm
@@ -0,0 +1,362 @@
+package FAPG::DAQ::EZO;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+use Device::SerialPort;
+use Time::HiRes qw(time sleep);
+
+my %SUPPORTED_PROBES = (
+ ph => { unit => 'pH' },
+ do => { unit => 'mg/L' },
+ orp => { unit => 'mV' },
+ ec => { unit => 'uS/cm' },
+);
+
+sub identify_probe {
+ my $device = _find_usb_device();
+ my $port = _open_serial_port($device);
+
+ # Stop continuous mode if enabled.
+ # This makes later "R" commands deterministic.
+ _drain_serial($port, 0.25);
+ _write_ezo_command($port, 'C,0');
+ sleep 0.4;
+ _drain_serial($port, 0.25);
+
+ my @lines = _ezo_command($port, 'I', 2.0);
+
+ my ($raw_type, $firmware, $raw_reply);
+
+ for my $line (@lines) {
+ if ($line =~ /^\?I,([^,\r\n]+),([^,\r\n]+)/i) {
+ $raw_type = $1;
+ $firmware = $2;
+ $raw_reply = $line;
+ last;
+ }
+ }
+
+ die "Could not identify EZO probe.\n"
+ unless defined $raw_type;
+
+ my $type = _normalize_probe_type($raw_type);
+
+ die "Unsupported EZO probe type '$raw_type'.\n"
+ unless exists $SUPPORTED_PROBES{$type};
+
+ return FAPG::DAQ::EZO::Probe->new(
+ device => $device,
+ port => $port,
+ type => $type,
+ raw_type => $raw_type,
+ firmware => $firmware,
+ unit => $SUPPORTED_PROBES{$type}->{unit},
+ raw_reply => $raw_reply,
+ );
+}
+
+sub _find_usb_device {
+ return $ENV{EZO_SERIAL_DEVICE}
+ if defined $ENV{EZO_SERIAL_DEVICE} && length $ENV{EZO_SERIAL_DEVICE};
+
+ my @devices = grep { /UART|FTDI|Atlas/i } glob '/dev/serial/by-id/*';
+
+ die "Found no EZO USB serial device under /dev/serial/by-id/.\n"
+ unless @devices;
+
+ die "Found multiple USB serial devices. Set EZO_SERIAL_DEVICE explicitly.\n"
+ if @devices > 1;
+
+ return $devices[0];
+}
+
+sub _open_serial_port {
+ my ($device) = @_;
+
+ my $port = Device::SerialPort->new($device)
+ or die "Cannot open serial port $device: $!";
+
+ $port->baudrate(9600);
+ $port->databits(8);
+ $port->parity('none');
+ $port->stopbits(1);
+ $port->handshake('none');
+
+ $port->read_const_time(100);
+ $port->read_char_time(0);
+
+ $port->write_settings
+ or die "Cannot apply serial settings to $device.\n";
+
+ return $port;
+}
+
+sub _ezo_command {
+ my ($port, $command, $timeout) = @_;
+
+ _drain_serial($port, 0.05);
+ _write_ezo_command($port, $command);
+
+ return _read_ezo_lines($port, $timeout);
+}
+
+sub _write_ezo_command {
+ my ($port, $command) = @_;
+
+ my $payload = "$command\r";
+ my $written = $port->write($payload);
+
+ die "Could not write EZO command '$command'.\n"
+ unless defined $written && $written == length($payload);
+
+ return 1;
+}
+
+sub _read_ezo_lines {
+ my ($port, $timeout) = @_;
+
+ my $deadline = time + $timeout;
+ my $buffer = '';
+ my @lines;
+
+ while (time < $deadline) {
+ my ($count, $chunk) = $port->read(128);
+
+ if ($count && defined $chunk) {
+ $buffer .= $chunk;
+
+ while ($buffer =~ s/^([^\r]*)\r//) {
+ my $line = $1;
+ $line =~ s/^\s+//;
+ $line =~ s/\s+$//;
+
+ push @lines, $line if length $line;
+ }
+ }
+
+ sleep 0.02;
+ }
+
+ return @lines;
+}
+
+sub _drain_serial {
+ my ($port, $seconds) = @_;
+
+ my $deadline = time + $seconds;
+
+ while (time < $deadline) {
+ my ($count, undef) = $port->read(255);
+ sleep($count ? 0.01 : 0.05);
+ }
+}
+
+sub _normalize_probe_type {
+ my ($raw_type) = @_;
+
+ my $type = lc $raw_type;
+ $type =~ s/[^a-z0-9]//g;
+
+ return $type;
+}
+
+sub _parse_first_number {
+ my ($line) = @_;
+
+ my ($first_field) = split /,/, $line;
+
+ return undef unless defined $first_field;
+
+ $first_field =~ s/^\s+//;
+ $first_field =~ s/\s+$//;
+
+ return undef
+ unless $first_field =~ /^([+-]?(?:\d+(?:\.\d*)?|\.\d+))$/;
+
+ return 0 + $1;
+}
+
+1;
+
+package FAPG::DAQ::EZO::Probe;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+sub new {
+ my ($class, %args) = @_;
+ return bless \%args, $class;
+}
+
+sub device {
+ my ($self) = @_;
+ return $self->{device};
+}
+
+sub port {
+ my ($self) = @_;
+ return $self->{port};
+}
+
+sub type {
+ my ($self) = @_;
+ return $self->{type};
+}
+
+sub raw_type {
+ my ($self) = @_;
+ return $self->{raw_type};
+}
+
+sub firmware {
+ my ($self) = @_;
+ return $self->{firmware};
+}
+
+sub unit {
+ my ($self) = @_;
+ return $self->{unit};
+}
+
+sub get_reading {
+ my ($self) = @_;
+
+ my @lines = FAPG::DAQ::EZO::_ezo_command($self->port, 'R', 3.0);
+
+ for my $line (@lines) {
+ die "EZO probe returned error while reading: $line\n"
+ if $line =~ /^\*ER/i;
+
+ next if $line =~ /^\*/;
+ next if $line =~ /^\?/;
+
+ my $value = FAPG::DAQ::EZO::_parse_first_number($line);
+
+ next unless defined $value;
+
+ return FAPG::DAQ::EZO::Reading->new(
+ probe => $self->type,
+ value => $value,
+ unit => $self->unit,
+ device => $self->device,
+ raw_reply => $line,
+ );
+ }
+
+ die "Could not parse EZO reading from response: " . join(' | ', @lines) . "\n";
+}
+
+1;
+
+package FAPG::DAQ::EZO::Reading;
+# -*- mode: perl-ts; -*-
+
+use 5.40.1;
+use strict;
+use warnings;
+
+use JSON::PP qw(encode_json);
+use Net::MQTT::Simple;
+use POSIX qw(strftime);
+use Sys::Hostname qw(hostname);
+
+sub new {
+ my ($class, %args) = @_;
+
+ $args{timestamp} //= _utc_timestamp();
+ $args{node} //= $ENV{DAQ_NODE} || hostname();
+
+ return bless \%args, $class;
+}
+
+sub probe {
+ my ($self) = @_;
+ return $self->{probe};
+}
+
+sub value {
+ my ($self) = @_;
+ return $self->{value};
+}
+
+sub unit {
+ my ($self) = @_;
+ return $self->{unit};
+}
+
+sub node {
+ my ($self) = @_;
+ return $self->{node};
+}
+
+sub timestamp {
+ my ($self) = @_;
+ return $self->{timestamp};
+}
+
+sub topic {
+ my ($self) = @_;
+
+ return join '/',
+ 'fapg',
+ 'daq',
+ $self->probe,
+ $self->node,
+ 'reading';
+}
+
+sub as_hash {
+ my ($self) = @_;
+
+ return {
+ timestamp => $self->timestamp,
+ probe => $self->probe,
+ value => $self->value,
+ unit => $self->unit,
+ node => $self->node,
+ };
+}
+
+sub as_json {
+ my ($self) = @_;
+ return encode_json($self->as_hash);
+}
+
+sub publish_mqtt {
+ my ($self) = @_;
+
+ my $host = $ENV{MQTT_HOST} // 'fapg-daq-five-01';
+ my $port = $ENV{MQTT_PORT} // 1883;
+
+ my $server = "$host:$port";
+
+ my $mqtt = Net::MQTT::Simple->new($server);
+
+ my $username = exists $ENV{MQTT_USERNAME}
+ ? $ENV{MQTT_USERNAME}
+ : 'fapg_zero';
+
+ if (defined $username && length $username) {
+ my $password = $ENV{MQTT_PASSWORD};
+
+ die "MQTT_USERNAME is set but MQTT_PASSWORD is missing.\n"
+ unless defined $password && length $password;
+
+ $mqtt->login($username, $password);
+ }
+
+ $mqtt->publish($self->topic => $self->as_json);
+
+ return 1;
+}
+
+sub _utc_timestamp {
+ return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime);
+}
+
+1;
Copyright 2019--2026 Marius PETER