Tomcat 本身并不直接提供 JMS 远程监控功能,但您可以通过以下方式实现 JMS 监控:
catalina.sh/catalina.bat 中添加:
# Linux (catalina.sh)
CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jmxremote"
CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jmxremote.port=9999"
CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jmxremote.ssl=false"
CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
CATALINA_OPTS="$CATALINA_OPTS -Djava.rmi.server.hostname=your_server_ip"
# Windows (catalina.bat)
set CATALINA_OPTS=%CATALINA_OPTS% -Dcom.sun.management.jmxremote
set CATALINA_OPTS=%CATALINA_OPTS% -Dcom.sun.management.jmxremote.port=9999
set CATALINA_OPTS=%CATALINA_OPTS% -Dcom.sun.management.jmxremote.ssl=false
set CATALINA_OPTS=%CATALINA_OPTS% -Dcom.sun.management.jmxremote.authenticate=false
// 通过 JMX 监控 ActiveMQ 示例
public class JMSJMXMonitor {
public static void main(String[] args) throws Exception {
JMXServiceURL url = new JMXServiceURL(
"service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"
);
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection connection = connector.getMBeanServerConnection();
// 查询 JMS 相关的 MBean
Set<ObjectName> mbeans = connection.queryNames(
new ObjectName("org.apache.activemq:*"), null
);
for (ObjectName name : mbeans) {
System.out.println("Found MBean: " + name);
}
}
}
web.xml 配置:
<servlet>
<servlet-name>JMSMonitorServlet</servlet-name>
<servlet-class>com.example.JMSMonitorServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>JMSMonitorServlet</servlet-name>
<url-pattern>/jms-monitor/*</url-pattern>
</servlet-mapping>
JMSMonitorServlet.java:
@WebServlet("/jms-monitor/*")
public class JMSMonitorServlet extends HttpServlet {
// ActiveMQ 连接工厂
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
@Override
public void init() throws ServletException {
try {
// 初始化 JMS 连接
String brokerURL = "tcp://localhost:61616";
connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
throw new ServletException("JMS 初始化失败", e);
}
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setContentType("application/json");
resp.setCharacterEncoding("UTF-8");
PrintWriter out = resp.getWriter();
JSONObject result = new JSONObject();
try {
String path = req.getPathInfo();
if ("/queues".equals(path)) {
result = monitorQueues();
} else if ("/topics".equals(path)) {
result = monitorTopics();
} else if ("/stats".equals(path)) {
result = getJMSStatistics();
} else {
result.put("status", "error");
result.put("message", "无效的监控路径");
}
} catch (Exception e) {
result.put("status", "error");
result.put("message", e.getMessage());
}
out.print(result.toString());
out.flush();
}
private JSONObject monitorQueues() throws JMSException {
JSONObject queues = new JSONObject();
// 使用 JMX 或直接查询 Broker
DestinationSource destinationSource =
((ActiveMQConnection) connection).getDestinationSource();
Set<ActiveMQQueue> queueSet = destinationSource.getQueues();
for (ActiveMQQueue queue : queueSet) {
JSONObject queueInfo = new JSONObject();
queueInfo.put("name", queue.getQueueName());
queueInfo.put("size", getQueueSize(queue));
queueInfo.put("consumerCount", getConsumerCount(queue));
queues.put(queue.getQueueName(), queueInfo);
}
return queues;
}
private int getQueueSize(ActiveMQQueue queue) throws JMSException {
// 创建临时消费者获取消息数量
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> messages = browser.getEnumeration();
int count = 0;
while (messages.hasMoreElements()) {
messages.nextElement();
count++;
}
browser.close();
return count;
}
private JSONObject getJMSStatistics() throws Exception {
JSONObject stats = new JSONObject();
// 使用 JMX 获取统计信息
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName brokerName = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost"
);
// 获取总消息数
Long totalMessageCount = (Long) mbeanServer.getAttribute(
brokerName, "TotalMessageCount"
);
stats.put("totalMessages", totalMessageCount);
// 获取总消费者数
Integer totalConsumerCount = (Integer) mbeanServer.getAttribute(
brokerName, "TotalConsumerCount"
);
stats.put("totalConsumers", totalConsumerCount);
return stats;
}
@Override
public void destroy() {
try {
if (session != null) session.close();
if (connection != null) connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
JMSHealthIndicator.java:
@Component
public class JMSHealthIndicator implements HealthIndicator {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public Health health() {
try {
// 测试 JMS 连接
ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 检查默认队列
Queue queue = session.createQueue("TEST.MONITOR");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Health check");
producer.send(message);
producer.close();
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
consumer.close();
session.close();
connection.close();
if (received != null) {
return Health.up()
.withDetail("jms", "ActiveMQ is running")
.withDetail("broker", "tcp://localhost:61616")
.build();
} else {
return Health.down()
.withDetail("jms", "No response from broker")
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("jms", "Connection failed: " + e.getMessage())
.build();
}
}
}
monitor.html:
<!DOCTYPE html>
<html>
<head>
<title>JMS 监控面板</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
.dashboard {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 20px;
padding: 20px;
}
.card {
border: 1px solid #ddd;
border-radius: 8px;
padding: 15px;
background: white;
}
</style>
</head>
<body>
<div class="dashboard">
<div class="card">
<h3>队列监控</h3>
<div id="queues"></div>
</div>
<div class="card">
<h3>主题监控</h3>
<div id="topics"></div>
</div>
<div class="card">
<h3>消息统计</h3>
<canvas id="messageChart" width="200" height="200"></canvas>
</div>
</div>
<script>
// 定时获取监控数据
function fetchMonitoringData() {
fetch('/your-app/jms-monitor/stats')
.then(response => response.json())
.then(data => updateDashboard(data));
}
function updateDashboard(data) {
// 更新队列信息
let queuesDiv = document.getElementById('queues');
queuesDiv.innerHTML = '';
for (let queue in data.queues) {
let queueInfo = data.queues[queue];
queuesDiv.innerHTML += `
<div>
<strong>${queue}:</strong>
消息数: ${queueInfo.size} |
消费者: ${queueInfo.consumerCount}
</div>
`;
}
// 更新图表
updateChart(data.messageStats);
}
// 每5秒更新一次
setInterval(fetchMonitoringData, 5000);
fetchMonitoringData();
</script>
</body>
</html>
JMS 监控安全过滤器:
@WebFilter("/jms-monitor/*")
public class MonitorSecurityFilter implements Filter {
private static final String AUTH_TOKEN = "your-secret-token";
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
String token = httpRequest.getHeader("X-Monitor-Token");
if (!AUTH_TOKEN.equals(token)) {
httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
httpResponse.getWriter().write("Unauthorized access");
return;
}
chain.doFilter(request, response);
}
}
根据您使用的具体 JMS 实现(ActiveMQ、RabbitMQ、HornetQ 等),监控方式和 MBean 名称可能会有所不同。