Unverified Commit 8bbc121f authored by Kay Kasemir's avatar Kay Kasemir Committed by GitHub
Browse files

Merge pull request #2265 from ControlSystemStudio/alarmserver_kafkaerrors

Alarm server Kafka error handling
parents 57a04648 85d7562e
Pipeline #113198 passed with stage
in 14 minutes and 10 seconds
......@@ -127,9 +127,12 @@ public class AlarmSystem
/** Heartbeat PV period in milliseconds */
public static final long heartbeat_ms;
/** Nag period in seconds */
/** Nag period in milliseconds */
public static final long nag_period_ms;
/** Connection validation period in seconds */
@Preference public static long connection_check_secs;
/** Disable notify feature */
@Preference public static boolean disable_notify_visible;
......
......@@ -92,6 +92,14 @@ heartbeat_secs=10
# Set to 0 to disable
nag_period=00:15:00
# Connection validation period in seconds
#
# Server will check the Kafka connection at this period.
# After re-establishing the connection, it will
# re-send the state of every alarm tree item.
# Set to 0 to disable.
connection_check_secs=5
# To turn on disable notifications feature, set the value to true
disable_notify_visible=false
......
/*******************************************************************************
* Copyright (c) 2018-2019 Oak Ridge National Laboratory.
* Copyright (c) 2018-2022 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
......@@ -79,6 +79,8 @@ class AddComponentAction extends MenuItem
layout.add(message, 1, 2);
// Update hint as user types or selects type
types.selectedToggleProperty().addListener((prop, old, value) -> checkName(name.getText()));
name.textProperty().addListener( (prop, old, value) -> checkName(value));
setTitle("Add Component to " + parent.getPathName());
......@@ -125,7 +127,7 @@ class AddComponentAction extends MenuItem
message.setText("Adding " + names.size() + " PVs");
}
else
message.setText("");
message.setText("Add name of new Node");
}
public boolean isPV()
......@@ -152,7 +154,7 @@ class AddComponentAction extends MenuItem
final AddComponentDialog dialog = new AddComponentDialog(parent);
DialogHelper.positionDialog(dialog, node, -100, -50);
final String new_name = dialog.showAndWait().orElse(null);
if (new_name == null || new_name.isEmpty())
if (new_name == null || new_name.isBlank())
return;
// Add in background thread
......@@ -188,8 +190,17 @@ class AddComponentAction extends MenuItem
}
}
else
if (! haveExistingItem(node, parent, new_name))
model.addComponent(parent.getPathName(), new_name);
{
// Dialog allows entering several space- or line-separated names
// to support a list of PVs.
// For components, squash that into one new, trimmed name
final String comp_name = new_name.replace('\r', ' ')
.replace('\n', ' ')
.replaceAll(" +", " ")
.trim();
if (! haveExistingItem(node, parent, comp_name))
model.addComponent(parent.getPathName(), comp_name);
}
});
});
}
......
......@@ -20,7 +20,6 @@ import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.prefs.Preferences;
import org.phoebus.applications.alarm.AlarmSystem;
import org.phoebus.applications.alarm.client.ClientState;
import org.phoebus.applications.alarm.model.AlarmTreeItem;
......@@ -66,6 +65,7 @@ public class AlarmServerMain implements ServerModelListener
"\tmode - Show mode.\n" +
"\tmode normal - Select normal mode.\n" +
"\tmode maintenance - Select maintenance mode.\n" +
"\tresend - Re-send all PV states to clients (for tests after network issues).\n" +
"\trestart - Re-load alarm configuration and restart.\n" +
"\tshutdown - Shut alarm server down and exit.\n";
......@@ -141,6 +141,8 @@ public class AlarmServerMain implements ServerModelListener
restart.offer(false);
else if (args[0].equals("restart"))
restart.offer(true);
else if (args[0].equals("resend"))
model.resend(model.getRoot());
else if (args[0].equals("mode"))
System.out.println(AlarmLogic.getMaintenanceMode() ? "Maintenance mode" : "Normal mode");
else if (args[0].startsWith("h"))
......
/*******************************************************************************
* Copyright (c) 2018-2021 Oak Ridge National Laboratory.
* Copyright (c) 2018-2022 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
......@@ -10,12 +10,12 @@ package org.phoebus.applications.alarm.server;
import static org.phoebus.applications.alarm.AlarmSystem.logger;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.time.LocalDateTime;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -26,8 +26,8 @@ import org.phoebus.applications.alarm.AlarmSystem;
import org.phoebus.applications.alarm.client.AlarmClientNode;
import org.phoebus.applications.alarm.client.ClientState;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.model.AlarmTreeItem;
import org.phoebus.applications.alarm.model.AlarmState;
import org.phoebus.applications.alarm.model.AlarmTreeItem;
import org.phoebus.applications.alarm.model.AlarmTreePath;
import org.phoebus.applications.alarm.model.BasicState;
import org.phoebus.applications.alarm.model.SeverityLevel;
......@@ -70,6 +70,13 @@ class ServerModel
private long last_state_update = 0;
private long last_annunciation = 0;
/** Time of last connectivity check */
private long last_connection_check = System.currentTimeMillis();
/** Did the last connectivity check fail? */
private boolean connection_lost = false;
/** @param kafka_servers Servers
* @param config_name Name of alarm tree root
* @param initial_states
......@@ -117,7 +124,12 @@ class ServerModel
return root;
}
/** Background thread loop that checks for alarm tree updates */
/** Background thread
*
* <p>Checks for alarm tree updates,
* emits idle or nag messages,
* validates connection
*/
private void run()
{
try
......@@ -128,6 +140,7 @@ class ServerModel
final long now = System.currentTimeMillis();
checkIdle(now);
checkNag(now);
checkConnectivity(now);
}
}
catch (Throwable ex)
......@@ -142,10 +155,64 @@ class ServerModel
}
}
/** Periodically check for Kafka connectivity
* @param now Current millisec
*/
private void checkConnectivity(final long now)
{
if (AlarmSystem.connection_check_secs < 0 ||
(now - last_connection_check) < AlarmSystem.connection_check_secs*1000)
return;
boolean connected = false;
try
{
// There is no consumer.isConnected() type of API?
// https://stackoverflow.com/questions/38103198/how-to-check-kafka-consumer-state
// suggest calling listTopics with timeout
logger.log(Level.FINE, "Testing Kafka connectitity");
consumer.listTopics(Duration.ofSeconds(1));
connected = true;
}
catch (Throwable ex)
{
logger.log(Level.FINE, "No Kafka connectitity", ex);
}
// While disconnected, the Kafka API still allows sending messages
// but silently drops them, so clients will get out of sync,
// and since Kafka is down, it won't track the most recent alarm state
// for future clients...
if (connected == false && connection_lost == false)
logger.log(Level.WARNING, "Lost Kafka connectitity");
else if (connected && connection_lost)
{
logger.log(Level.WARNING, "Regained Kafka connectitity");
// Update Kafka and thus clients with current state
// as soon as connectivity is restored
resend(getRoot());
sendAnnunciatorMessage(root.getPathName(), SeverityLevel.OK, "* Alarm server re-connected");
}
connection_lost = ! connected;
last_connection_check = now;
}
/** Perform one check for updates */
private void checkUpdates()
{
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
final ConsumerRecords<String, String> records;
try
{
records = consumer.poll(Duration.ofMillis(100));
}
catch (Throwable ex)
{
// This typically doesn't happen, 'poll' will simply not return any new
// records while disconnected from Kafka...
logger.log(Level.WARNING, "Error reading updates from Kafka", ex);
return;
}
for (ConsumerRecord<String, String> record : records)
{
final int sep = record.key().indexOf(':');
......@@ -211,7 +278,7 @@ class ServerModel
// before the PV connects
pv.getParent().maximizeSeverity();
pv.start();
//check if using past disabled date
LocalDateTime enabled_date = pv.getEnabledDate();
if (enabled_date != null && enabled_date.isBefore(LocalDateTime.now())) {
......@@ -409,7 +476,7 @@ class ServerModel
}
}
/** Send alarm update to 'config' topic
/** Send alarm update to 'config' topic
* @param path Path of item that has a new state
* @param new_state That new state
*/
......@@ -448,6 +515,30 @@ class ServerModel
}
}
/** Re-send current state (after e.g. network trouble)
* @param node Node from which on to send state update, recursively
*/
public void resend(final AlarmTreeItem<?> node)
{
final BasicState state;
if (node instanceof AlarmServerPV)
{
final AlarmServerPV pv = (AlarmServerPV) node;
final AlarmState current = pv.getCurrentState();
state = new ClientState(pv.getState(), current.getSeverity(), current.getMessage());
sendStateUpdate(pv.getPathName(), state);
}
else
state = node.getState();
logger.log(Level.INFO, "Resend state:" + node.getPathName() + ": " + state);
sendStateUpdate(node.getPathName(), state);
for (AlarmTreeItem<?> child : node.getChildren())
resend(child);
}
/** Check if 'idle' message should be sent since there were no state updates
* @param now Current millisec
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment