1 /* ============================================================
2 * JRobin : Pure java implementation of RRDTool's functionality
3 * ============================================================
4 *
5 * Project Info: http://www.jrobin.org
6 * Project Lead: Sasa Markovic (saxon@jrobin.org);
7 *
8 * (C) Copyright 2003-2005, by Sasa Markovic.
9 *
10 * Developers: Sasa Markovic (saxon@jrobin.org)
11 *
12 *
13 * This library is free software; you can redistribute it and/or modify it under the terms
14 * of the GNU Lesser General Public License as published by the Free Software Foundation;
15 * either version 2.1 of the License, or (at your option) any later version.
16 *
17 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
18 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19 * See the GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License along with this
22 * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
23 * Boston, MA 02111-1307, USA.
24 */
25
26 package org.jrobin.core;
27
28 import java.io.IOException;
29
30 /**
31 * Class to represent single datasource within RRD. Each datasource object holds the
32 * following information: datasource definition (once set, never changed) and
33 * datasource state variables (changed whenever RRD gets updated).<p>
34 * <p/>
 * Normally, you don't need to manipulate Datasource objects directly; the
 * JRobin framework does it for you.
37 *
38 * @author <a href="mailto:saxon@jrobin.org">Sasa Markovic</a>
39 */
40
41 public class Datasource implements RrdUpdater, DsTypes {
42 private static final double MAX_32_BIT = Math.pow(2, 32);
43 private static final double MAX_64_BIT = Math.pow(2, 64);
44
45 private RrdDb parentDb;
46 // definition
47 private RrdString dsName, dsType;
48 private RrdLong heartbeat;
49 private RrdDouble minValue, maxValue;
50
51 // state variables
52 private RrdDouble lastValue;
53 private RrdLong nanSeconds;
54 private RrdDouble accumValue;
55
56 Datasource(RrdDb parentDb, DsDef dsDef) throws IOException {
57 boolean shouldInitialize = dsDef != null;
58 this.parentDb = parentDb;
59 dsName = new RrdString(this);
60 dsType = new RrdString(this);
61 heartbeat = new RrdLong(this);
62 minValue = new RrdDouble(this);
63 maxValue = new RrdDouble(this);
64 lastValue = new RrdDouble(this);
65 accumValue = new RrdDouble(this);
66 nanSeconds = new RrdLong(this);
67 if (shouldInitialize) {
68 dsName.set(dsDef.getDsName());
69 dsType.set(dsDef.getDsType());
70 heartbeat.set(dsDef.getHeartbeat());
71 minValue.set(dsDef.getMinValue());
72 maxValue.set(dsDef.getMaxValue());
73 lastValue.set(Double.NaN);
74 accumValue.set(0.0);
75 Header header = parentDb.getHeader();
76 nanSeconds.set(header.getLastUpdateTime() % header.getStep());
77 }
78 }
79
80 Datasource(RrdDb parentDb, DataImporter reader, int dsIndex) throws IOException, RrdException {
81 this(parentDb, null);
82 dsName.set(reader.getDsName(dsIndex));
83 dsType.set(reader.getDsType(dsIndex));
84 heartbeat.set(reader.getHeartbeat(dsIndex));
85 minValue.set(reader.getMinValue(dsIndex));
86 maxValue.set(reader.getMaxValue(dsIndex));
87 lastValue.set(reader.getLastValue(dsIndex));
88 accumValue.set(reader.getAccumValue(dsIndex));
89 nanSeconds.set(reader.getNanSeconds(dsIndex));
90 }
91
92 String dump() throws IOException {
93 return "== DATASOURCE ==\n" +
94 "DS:" + dsName.get() + ":" + dsType.get() + ":" +
95 heartbeat.get() + ":" + minValue.get() + ":" +
96 maxValue.get() + "\nlastValue:" + lastValue.get() +
97 " nanSeconds:" + nanSeconds.get() +
98 " accumValue:" + accumValue.get() + "\n";
99 }
100
101 /**
102 * Returns datasource name.
103 *
104 * @return Datasource name
105 * @throws IOException Thrown in case of I/O error
106 */
107 public String getDsName() throws IOException {
108 return dsName.get();
109 }
110
111 /**
112 * Returns datasource type (GAUGE, COUNTER, DERIVE, ABSOLUTE).
113 *
114 * @return Datasource type.
115 * @throws IOException Thrown in case of I/O error
116 */
117 public String getDsType() throws IOException {
118 return dsType.get();
119 }
120
121 /**
122 * Returns datasource heartbeat
123 *
124 * @return Datasource heartbeat
125 * @throws IOException Thrown in case of I/O error
126 */
127
128 public long getHeartbeat() throws IOException {
129 return heartbeat.get();
130 }
131
132 /**
133 * Returns mimimal allowed value for this datasource.
134 *
135 * @return Minimal value allowed.
136 * @throws IOException Thrown in case of I/O error
137 */
138 public double getMinValue() throws IOException {
139 return minValue.get();
140 }
141
142 /**
143 * Returns maximal allowed value for this datasource.
144 *
145 * @return Maximal value allowed.
146 * @throws IOException Thrown in case of I/O error
147 */
148 public double getMaxValue() throws IOException {
149 return maxValue.get();
150 }
151
152 /**
153 * Returns last known value of the datasource.
154 *
155 * @return Last datasource value.
156 * @throws IOException Thrown in case of I/O error
157 */
158 public double getLastValue() throws IOException {
159 return lastValue.get();
160 }
161
162 /**
163 * Returns value this datasource accumulated so far.
164 *
165 * @return Accumulated datasource value.
166 * @throws IOException Thrown in case of I/O error
167 */
168 public double getAccumValue() throws IOException {
169 return accumValue.get();
170 }
171
172 /**
173 * Returns the number of accumulated NaN seconds.
174 *
175 * @return Accumulated NaN seconds.
176 * @throws IOException Thrown in case of I/O error
177 */
178 public long getNanSeconds() throws IOException {
179 return nanSeconds.get();
180 }
181
182 void process(long newTime, double newValue) throws IOException, RrdException {
183 Header header = parentDb.getHeader();
184 long step = header.getStep();
185 long oldTime = header.getLastUpdateTime();
186 long startTime = Util.normalize(oldTime, step);
187 long endTime = startTime + step;
188 double oldValue = lastValue.get();
189 double updateValue = calculateUpdateValue(oldTime, oldValue, newTime, newValue);
190 if (newTime < endTime) {
191 accumulate(oldTime, newTime, updateValue);
192 }
193 else {
194 // should store something
195 long boundaryTime = Util.normalize(newTime, step);
196 accumulate(oldTime, boundaryTime, updateValue);
197 double value = calculateTotal(startTime, boundaryTime);
198 // how many updates?
199 long numSteps = (boundaryTime - endTime) / step + 1L;
200 // ACTION!
201 parentDb.archive(this, value, numSteps);
202 // cleanup
203 nanSeconds.set(0);
204 accumValue.set(0.0);
205 accumulate(boundaryTime, newTime, updateValue);
206 }
207 }
208
209 private double calculateUpdateValue(long oldTime, double oldValue,
210 long newTime, double newValue) throws IOException {
211 double updateValue = Double.NaN;
212 if (newTime - oldTime <= heartbeat.get()) {
213 String type = dsType.get();
214 if (type.equals(DT_GAUGE)) {
215 updateValue = newValue;
216 }
217 else if (type.equals(DT_ABSOLUTE)) {
218 if (!Double.isNaN(newValue)) {
219 updateValue = newValue / (newTime - oldTime);
220 }
221 }
222 else if (type.equals(DT_DERIVE)) {
223 if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
224 updateValue = (newValue - oldValue) / (newTime - oldTime);
225 }
226 }
227 else if (type.equals(DT_COUNTER)) {
228 if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
229 double diff = newValue - oldValue;
230 if (diff < 0) {
231 diff += MAX_32_BIT;
232 }
233 if (diff < 0) {
234 diff += MAX_64_BIT - MAX_32_BIT;
235 }
236 if (diff >= 0) {
237 updateValue = diff / (newTime - oldTime);
238 }
239 }
240 }
241 if (!Double.isNaN(updateValue)) {
242 double minVal = minValue.get();
243 double maxVal = maxValue.get();
244 if (!Double.isNaN(minVal) && updateValue < minVal) {
245 updateValue = Double.NaN;
246 }
247 if (!Double.isNaN(maxVal) && updateValue > maxVal) {
248 updateValue = Double.NaN;
249 }
250 }
251 }
252 lastValue.set(newValue);
253 return updateValue;
254 }
255
256 private void accumulate(long oldTime, long newTime, double updateValue) throws IOException {
257 if (Double.isNaN(updateValue)) {
258 nanSeconds.set(nanSeconds.get() + (newTime - oldTime));
259 }
260 else {
261 accumValue.set(accumValue.get() + updateValue * (newTime - oldTime));
262 }
263 }
264
265 private double calculateTotal(long startTime, long boundaryTime) throws IOException {
266 double totalValue = Double.NaN;
267 long validSeconds = boundaryTime - startTime - nanSeconds.get();
268 if (nanSeconds.get() <= heartbeat.get() && validSeconds > 0) {
269 totalValue = accumValue.get() / validSeconds;
270 }
271 // IMPORTANT:
272 // if datasource name ends with "!", we'll send zeros instead of NaNs
273 // this might be handy from time to time
274 if (Double.isNaN(totalValue) && dsName.get().endsWith(DsDef.FORCE_ZEROS_FOR_NANS_SUFFIX)) {
275 totalValue = 0D;
276 }
277 return totalValue;
278 }
279
280 void appendXml(XmlWriter writer) throws IOException {
281 writer.startTag("ds");
282 writer.writeTag("name", dsName.get());
283 writer.writeTag("type", dsType.get());
284 writer.writeTag("minimal_heartbeat", heartbeat.get());
285 writer.writeTag("min", minValue.get());
286 writer.writeTag("max", maxValue.get());
287 writer.writeComment("PDP Status");
288 writer.writeTag("last_ds", lastValue.get(), "UNKN");
289 writer.writeTag("value", accumValue.get());
290 writer.writeTag("unknown_sec", nanSeconds.get());
291 writer.closeTag(); // ds
292 }
293
294 /**
295 * Copies object's internal state to another Datasource object.
296 *
297 * @param other New Datasource object to copy state to
298 * @throws IOException Thrown in case of I/O error
299 * @throws RrdException Thrown if supplied argument is not a Datasource object
300 */
301 public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
302 if (!(other instanceof Datasource)) {
303 throw new RrdException(
304 "Cannot copy Datasource object to " + other.getClass().getName());
305 }
306 Datasource datasource = (Datasource) other;
307 if (!datasource.dsName.get().equals(dsName.get())) {
308 throw new RrdException("Incomaptible datasource names");
309 }
310 if (!datasource.dsType.get().equals(dsType.get())) {
311 throw new RrdException("Incomaptible datasource types");
312 }
313 datasource.lastValue.set(lastValue.get());
314 datasource.nanSeconds.set(nanSeconds.get());
315 datasource.accumValue.set(accumValue.get());
316 }
317
318 /**
319 * Returns index of this Datasource object in the RRD.
320 *
321 * @return Datasource index in the RRD.
322 * @throws IOException Thrown in case of I/O error
323 */
324 public int getDsIndex() throws IOException {
325 try {
326 return parentDb.getDsIndex(dsName.get());
327 }
328 catch (RrdException e) {
329 return -1;
330 }
331 }
332
333 /**
334 * Sets datasource heartbeat to a new value.
335 *
336 * @param heartbeat New heartbeat value
337 * @throws IOException Thrown in case of I/O error
338 * @throws RrdException Thrown if invalid (non-positive) heartbeat value is specified.
339 */
340 public void setHeartbeat(long heartbeat) throws RrdException, IOException {
341 if (heartbeat < 1L) {
342 throw new RrdException("Invalid heartbeat specified: " + heartbeat);
343 }
344 this.heartbeat.set(heartbeat);
345 }
346
347 /**
348 * Sets datasource name to a new value
349 *
350 * @param newDsName New datasource name
351 * @throws RrdException Thrown if invalid data source name is specified (name too long, or
352 * name already defined in the RRD
353 * @throws IOException Thrown in case of I/O error
354 */
355 public void setDsName(String newDsName) throws RrdException, IOException {
356 if (newDsName.length() > RrdString.STRING_LENGTH) {
357 throw new RrdException("Invalid datasource name specified: " + newDsName);
358 }
359 if (parentDb.containsDs(newDsName)) {
360 throw new RrdException("Datasource already defined in this RRD: " + newDsName);
361 }
362 this.dsName.set(newDsName);
363 }
364
365 public void setDsType(String newDsType) throws RrdException, IOException {
366 if (!DsDef.isValidDsType(newDsType)) {
367 throw new RrdException("Invalid datasource type: " + newDsType);
368 }
369 // set datasource type
370 this.dsType.set(newDsType);
371 // reset datasource status
372 lastValue.set(Double.NaN);
373 accumValue.set(0.0);
374 // reset archive status
375 int dsIndex = parentDb.getDsIndex(dsName.get());
376 Archive[] archives = parentDb.getArchives();
377 for (Archive archive : archives) {
378 archive.getArcState(dsIndex).setAccumValue(Double.NaN);
379 }
380 }
381
382 /**
383 * Sets minimum allowed value for this datasource. If <code>filterArchivedValues</code>
384 * argment is set to true, all archived values less then <code>minValue</code> will
385 * be fixed to NaN.
386 *
387 * @param minValue New minimal value. Specify <code>Double.NaN</code> if no minimal
388 * value should be set
389 * @param filterArchivedValues true, if archived datasource values should be fixed;
390 * false, otherwise.
391 * @throws IOException Thrown in case of I/O error
392 * @throws RrdException Thrown if invalid minValue was supplied (not less then maxValue)
393 */
394 public void setMinValue(double minValue, boolean filterArchivedValues)
395 throws IOException, RrdException {
396 double maxValue = this.maxValue.get();
397 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
398 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
399 }
400 this.minValue.set(minValue);
401 if (!Double.isNaN(minValue) && filterArchivedValues) {
402 int dsIndex = getDsIndex();
403 Archive[] archives = parentDb.getArchives();
404 for (Archive archive : archives) {
405 archive.getRobin(dsIndex).filterValues(minValue, Double.NaN);
406 }
407 }
408 }
409
410 /**
411 * Sets maximum allowed value for this datasource. If <code>filterArchivedValues</code>
412 * argment is set to true, all archived values greater then <code>maxValue</code> will
413 * be fixed to NaN.
414 *
415 * @param maxValue New maximal value. Specify <code>Double.NaN</code> if no max
416 * value should be set.
417 * @param filterArchivedValues true, if archived datasource values should be fixed;
418 * false, otherwise.
419 * @throws IOException Thrown in case of I/O error
420 * @throws RrdException Thrown if invalid maxValue was supplied (not greater then minValue)
421 */
422 public void setMaxValue(double maxValue, boolean filterArchivedValues)
423 throws IOException, RrdException {
424 double minValue = this.minValue.get();
425 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
426 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
427 }
428 this.maxValue.set(maxValue);
429 if (!Double.isNaN(maxValue) && filterArchivedValues) {
430 int dsIndex = getDsIndex();
431 Archive[] archives = parentDb.getArchives();
432 for (Archive archive : archives) {
433 archive.getRobin(dsIndex).filterValues(Double.NaN, maxValue);
434 }
435 }
436 }
437
438 /**
439 * Sets min/max values allowed for this datasource. If <code>filterArchivedValues</code>
440 * argment is set to true, all archived values less then <code>minValue</code> or
441 * greater then <code>maxValue</code> will be fixed to NaN.
442 *
443 * @param minValue New minimal value. Specify <code>Double.NaN</code> if no min
444 * value should be set.
445 * @param maxValue New maximal value. Specify <code>Double.NaN</code> if no max
446 * value should be set.
447 * @param filterArchivedValues true, if archived datasource values should be fixed;
448 * false, otherwise.
449 * @throws IOException Thrown in case of I/O error
450 * @throws RrdException Thrown if invalid min/max values were supplied
451 */
452 public void setMinMaxValue(double minValue, double maxValue, boolean filterArchivedValues)
453 throws IOException, RrdException {
454 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
455 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
456 }
457 this.minValue.set(minValue);
458 this.maxValue.set(maxValue);
459 if (!(Double.isNaN(minValue) && Double.isNaN(maxValue)) && filterArchivedValues) {
460 int dsIndex = getDsIndex();
461 Archive[] archives = parentDb.getArchives();
462 for (Archive archive : archives) {
463 archive.getRobin(dsIndex).filterValues(minValue, maxValue);
464 }
465 }
466 }
467
468 /**
469 * Returns the underlying storage (backend) object which actually performs all
470 * I/O operations.
471 *
472 * @return I/O backend object
473 */
474 public RrdBackend getRrdBackend() {
475 return parentDb.getRrdBackend();
476 }
477
478 /**
479 * Required to implement RrdUpdater interface. You should never call this method directly.
480 *
481 * @return Allocator object
482 */
483 public RrdAllocator getRrdAllocator() {
484 return parentDb.getRrdAllocator();
485 }
486 }
487
488