1 /* ============================================================
2  * JRobin : Pure java implementation of RRDTool's functionality
3  * ============================================================
4  *  
5  * Project Info:  http://www.jrobin.org
6  * Project Lead:  Sasa Markovic (saxon@jrobin.org);
7  *
8  * (C) Copyright 2003-2005, by Sasa Markovic.
9  *
10  * Developers:    Sasa Markovic (saxon@jrobin.org)
11  *
12  *
13  * This library is free software; you can redistribute it and/or modify it under the terms
14  * of the GNU Lesser General Public License as published by the Free Software Foundation;
15  * either version 2.1 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
18  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19  * See the GNU Lesser General Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser General Public License along with this
22  * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */

25
26 package org.jrobin.core;
27
28 import java.io.IOException;
29
30 /**
31  * Class to represent single RRD archive in a RRD with its internal state.
32  * Normally, you don't need methods to manipulate archive objects directly
33  * because JRobin framework does it automatically for you.<p>
34  * <p/>
35  * Each archive object consists of three parts: archive definition, archive state objects
36  * (one state object for each datasource) and round robin archives (one round robin for
37  * each datasource). API (read-only) is provided to access each of theese parts.<p>
38  *
39  * @author <a href="mailto:saxon@jrobin.org">Sasa Markovic</a>
40  */

41 public class Archive implements RrdUpdater, ConsolFuns {
42     private RrdDb parentDb;
43     // definition
44     private RrdString consolFun;
45     private RrdDouble xff;
46     private RrdInt steps, rows;
47     // state
48     private Robin[] robins;
49     private ArcState[] states;
50
51     Archive(RrdDb parentDb, ArcDef arcDef) throws IOException {
52         boolean shouldInitialize = arcDef != null;
53         this.parentDb = parentDb;
54         consolFun = new RrdString(thistrue);  // constant, may be cached
55         xff = new RrdDouble(this);
56         steps = new RrdInt(thistrue);            // constant, may be cached
57         rows = new RrdInt(thistrue);            // constant, may be cached
58         if (shouldInitialize) {
59             consolFun.set(arcDef.getConsolFun());
60             xff.set(arcDef.getXff());
61             steps.set(arcDef.getSteps());
62             rows.set(arcDef.getRows());
63         }
64         int n = parentDb.getHeader().getDsCount();
65         states = new ArcState[n];
66         robins = new Robin[n];
67         for (int i = 0; i < n; i++) {
68             states[i] = new ArcState(this, shouldInitialize);
69             int numRows = rows.get();
70             robins[i] = new Robin(this, numRows, shouldInitialize);
71         }
72     }
73
74     // read from XML
75     Archive(RrdDb parentDb, DataImporter reader, int arcIndex) throws IOException, RrdException {
76         this(parentDb, new ArcDef(
77                 reader.getConsolFun(arcIndex), reader.getXff(arcIndex),
78                 reader.getSteps(arcIndex), reader.getRows(arcIndex)));
79         int n = parentDb.getHeader().getDsCount();
80         for (int i = 0; i < n; i++) {
81             // restore state
82             states[i].setAccumValue(reader.getStateAccumValue(arcIndex, i));
83             states[i].setNanSteps(reader.getStateNanSteps(arcIndex, i));
84             // restore robins
85             double[] values = reader.getValues(arcIndex, i);
86             robins[i].update(values);
87         }
88     }
89
90     /**
91      * Returns archive time step in seconds. Archive step is equal to RRD step
92      * multiplied with the number of archive steps.
93      *
94      * @return Archive time step in seconds
95      * @throws IOException Thrown in case of I/O error.
96      */

97     public long getArcStep() throws IOException {
98         long step = parentDb.getHeader().getStep();
99         return step * steps.get();
100     }
101
102     String dump() throws IOException {
103         StringBuffer buffer = new StringBuffer("== ARCHIVE ==\n");
104         buffer.append("RRA:").append(consolFun.get()).append(":").append(xff.get()).append(":").append(steps.get()).
105                 append(":").append(rows.get()).append("\n");
106         buffer.append("interval [").append(getStartTime()).append(", ").append(getEndTime()).append("]" + "\n");
107         for (int i = 0; i < robins.length; i++) {
108             buffer.append(states[i].dump());
109             buffer.append(robins[i].dump());
110         }
111         return buffer.toString();
112     }
113
114     RrdDb getParentDb() {
115         return parentDb;
116     }
117
118     void archive(int dsIndex, double value, long numUpdates) throws IOException {
119         Robin robin = robins[dsIndex];
120         ArcState state = states[dsIndex];
121         long step = parentDb.getHeader().getStep();
122         long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
123         long updateTime = Util.normalize(lastUpdateTime, step) + step;
124         long arcStep = getArcStep();
125         // finish current step
126         while (numUpdates > 0) {
127             accumulate(state, value);
128             numUpdates--;
129             if (updateTime % arcStep == 0) {
130                 finalizeStep(state, robin);
131                 break;
132             }
133             else {
134                 updateTime += step;
135             }
136         }
137         // update robin in bulk
138         int bulkUpdateCount = (int) Math.min(numUpdates / steps.get(), (long) rows.get());
139         robin.bulkStore(value, bulkUpdateCount);
140         // update remaining steps
141         long remainingUpdates = numUpdates % steps.get();
142         for (long i = 0; i < remainingUpdates; i++) {
143             accumulate(state, value);
144         }
145     }
146
147     private void accumulate(ArcState state, double value) throws IOException {
148         if (Double.isNaN(value)) {
149             state.setNanSteps(state.getNanSteps() + 1);
150         }
151         else {
152             if (consolFun.get().equals(CF_MIN)) {
153                 state.setAccumValue(Util.min(state.getAccumValue(), value));
154             }
155             else if (consolFun.get().equals(CF_MAX)) {
156                 state.setAccumValue(Util.max(state.getAccumValue(), value));
157             }
158             else if (consolFun.get().equals(CF_LAST)) {
159                 state.setAccumValue(value);
160             }
161             else if (consolFun.get().equals(CF_AVERAGE)) {
162                 state.setAccumValue(Util.sum(state.getAccumValue(), value));
163             }
164         }
165     }
166
167     private void finalizeStep(ArcState state, Robin robin) throws IOException {
168         // should store
169         long arcSteps = steps.get();
170         double arcXff = xff.get();
171         long nanSteps = state.getNanSteps();
172         //double nanPct = (double) nanSteps / (double) arcSteps;
173         double accumValue = state.getAccumValue();
174         if (nanSteps <= arcXff * arcSteps && !Double.isNaN(accumValue)) {
175             if (consolFun.get().equals(CF_AVERAGE)) {
176                 accumValue /= (arcSteps - nanSteps);
177             }
178             robin.store(accumValue);
179         }
180         else {
181             robin.store(Double.NaN);
182         }
183         state.setAccumValue(Double.NaN);
184         state.setNanSteps(0);
185     }
186
187     /**
188      * Returns archive consolidation function ("AVERAGE""MIN""MAX" or "LAST").
189      *
190      * @return Archive consolidation function.
191      * @throws IOException Thrown in case of I/O error.
192      */

193     public String getConsolFun() throws IOException {
194         return consolFun.get();
195     }
196
197     /**
198      * Returns archive X-files factor.
199      *
200      * @return Archive X-files factor (between 0 and 1).
201      * @throws IOException Thrown in case of I/O error.
202      */

203     public double getXff() throws IOException {
204         return xff.get();
205     }
206
207     /**
208      * Returns the number of archive steps.
209      *
210      * @return Number of archive steps.
211      * @throws IOException Thrown in case of I/O error.
212      */

213     public int getSteps() throws IOException {
214         return steps.get();
215     }
216
217     /**
218      * Returns the number of archive rows.
219      *
220      * @return Number of archive rows.
221      * @throws IOException Thrown in case of I/O error.
222      */

223     public int getRows() throws IOException {
224         return rows.get();
225     }
226
227     /**
228      * Returns current starting timestamp. This value is not constant.
229      *
230      * @return Timestamp corresponding to the first archive row
231      * @throws IOException Thrown in case of I/O error.
232      */

233     public long getStartTime() throws IOException {
234         long endTime = getEndTime();
235         long arcStep = getArcStep();
236         long numRows = rows.get();
237         return endTime - (numRows - 1) * arcStep;
238     }
239
240     /**
241      * Returns current ending timestamp. This value is not constant.
242      *
243      * @return Timestamp corresponding to the last archive row
244      * @throws IOException Thrown in case of I/O error.
245      */

246     public long getEndTime() throws IOException {
247         long arcStep = getArcStep();
248         long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
249         return Util.normalize(lastUpdateTime, arcStep);
250     }
251
252     /**
253      * Returns the underlying archive state object. Each datasource has its
254      * corresponding ArcState object (archive states are managed independently
255      * for each RRD datasource).
256      *
257      * @param dsIndex Datasource index
258      * @return Underlying archive state object
259      */

260     public ArcState getArcState(int dsIndex) {
261         return states[dsIndex];
262     }
263
264     /**
265      * Returns the underlying round robin archive. Robins are used to store actual
266      * archive values on a per-datasource basis.
267      *
268      * @param dsIndex Index of the datasource in the RRD.
269      * @return Underlying round robin archive for the given datasource.
270      */

271     public Robin getRobin(int dsIndex) {
272         return robins[dsIndex];
273     }
274
275     FetchData fetchData(FetchRequest request) throws IOException, RrdException {
276         long arcStep = getArcStep();
277         long fetchStart = Util.normalize(request.getFetchStart(), arcStep);
278         long fetchEnd = Util.normalize(request.getFetchEnd(), arcStep);
279         if (fetchEnd < request.getFetchEnd()) {
280             fetchEnd += arcStep;
281         }
282         long startTime = getStartTime();
283         long endTime = getEndTime();
284         String[] dsToFetch = request.getFilter();
285         if (dsToFetch == null) {
286             dsToFetch = parentDb.getDsNames();
287         }
288         int dsCount = dsToFetch.length;
289         int ptsCount = (int) ((fetchEnd - fetchStart) / arcStep + 1);
290         long[] timestamps = new long[ptsCount];
291         double[][] values = new double[dsCount][ptsCount];
292         long matchStartTime = Math.max(fetchStart, startTime);
293         long matchEndTime = Math.min(fetchEnd, endTime);
294         double[][] robinValues = null;
295         if (matchStartTime <= matchEndTime) {
296             // preload robin values
297             int matchCount = (int) ((matchEndTime - matchStartTime) / arcStep + 1);
298             int matchStartIndex = (int) ((matchStartTime - startTime) / arcStep);
299             robinValues = new double[dsCount][];
300             for (int i = 0; i < dsCount; i++) {
301                 int dsIndex = parentDb.getDsIndex(dsToFetch[i]);
302                 robinValues[i] = robins[dsIndex].getValues(matchStartIndex, matchCount);
303             }
304         }
305         for (int ptIndex = 0; ptIndex < ptsCount; ptIndex++) {
306             long time = fetchStart + ptIndex * arcStep;
307             timestamps[ptIndex] = time;
308             for (int i = 0; i < dsCount; i++) {
309                 double value = Double.NaN;
310                 if (time >= matchStartTime && time <= matchEndTime) {
311                     // inbound time
312                     int robinValueIndex = (int) ((time - matchStartTime) / arcStep);
313                     assert robinValues != null;
314                     value = robinValues[i][robinValueIndex];
315                 }
316                 values[i][ptIndex] = value;
317             }
318         }
319         FetchData fetchData = new FetchData(this, request);
320         fetchData.setTimestamps(timestamps);
321         fetchData.setValues(values);
322         return fetchData;
323     }
324
325     void appendXml(XmlWriter writer) throws IOException {
326         writer.startTag("rra");
327         writer.writeTag("cf", consolFun.get());
328         writer.writeComment(getArcStep() + " seconds");
329         writer.writeTag("pdp_per_row", steps.get());
330         writer.writeTag("xff", xff.get());
331         writer.startTag("cdp_prep");
332         for (ArcState state : states) {
333             state.appendXml(writer);
334         }
335         writer.closeTag(); // cdp_prep
336         writer.startTag("database");
337         long startTime = getStartTime();
338         for (int i = 0; i < rows.get(); i++) {
339             long time = startTime + i * getArcStep();
340             writer.writeComment(Util.getDate(time) + " / " + time);
341             writer.startTag("row");
342             for (Robin robin : robins) {
343                 writer.writeTag("v", robin.getValue(i));
344             }
345             writer.closeTag(); // row
346         }
347         writer.closeTag(); // database
348         writer.closeTag(); // rra
349     }
350
351     /**
352      * Copies object's internal state to another Archive object.
353      *
354      * @param other New Archive object to copy state to
355      * @throws IOException  Thrown in case of I/O error
356      * @throws RrdException Thrown if supplied argument is not an Archive object
357      */

358     public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
359         if (!(other instanceof Archive)) {
360             throw new RrdException(
361                     "Cannot copy Archive object to " + other.getClass().getName());
362         }
363         Archive arc = (Archive) other;
364         if (!arc.consolFun.get().equals(consolFun.get())) {
365             throw new RrdException("Incompatible consolidation functions");
366         }
367         if (arc.steps.get() != steps.get()) {
368             throw new RrdException("Incompatible number of steps");
369         }
370         int count = parentDb.getHeader().getDsCount();
371         for (int i = 0; i < count; i++) {
372             int j = Util.getMatchingDatasourceIndex(parentDb, i, arc.parentDb);
373             if (j >= 0) {
374                 states[i].copyStateTo(arc.states[j]);
375                 robins[i].copyStateTo(arc.robins[j]);
376             }
377         }
378     }
379
380     /**
381      * Sets X-files factor to a new value.
382      *
383      * @param xff New X-files factor value. Must be >= 0 and < 1.
384      * @throws RrdException Thrown if invalid value is supplied
385      * @throws IOException  Thrown in case of I/O error
386      */

387     public void setXff(double xff) throws RrdException, IOException {
388         if (xff < 0D || xff >= 1D) {
389             throw new RrdException("Invalid xff supplied (" + xff + "), must be >= 0 and < 1");
390         }
391         this.xff.set(xff);
392     }
393
394     /**
395      * Returns the underlying storage (backend) object which actually performs all
396      * I/O operations.
397      *
398      * @return I/O backend object
399      */

400     public RrdBackend getRrdBackend() {
401         return parentDb.getRrdBackend();
402     }
403
404     /**
405      * Required to implement RrdUpdater interface. You should never call this method directly.
406      *
407      * @return Allocator object
408      */

409     public RrdAllocator getRrdAllocator() {
410         return parentDb.getRrdAllocator();
411     }
412 }
413