1
25
26 package org.jrobin.core;
27
28 import java.io.IOException;
29
30
41 public class Archive implements RrdUpdater, ConsolFuns {
42 private RrdDb parentDb;
43
44 private RrdString consolFun;
45 private RrdDouble xff;
46 private RrdInt steps, rows;
47
48 private Robin[] robins;
49 private ArcState[] states;
50
51 Archive(RrdDb parentDb, ArcDef arcDef) throws IOException {
52 boolean shouldInitialize = arcDef != null;
53 this.parentDb = parentDb;
54 consolFun = new RrdString(this, true);
55 xff = new RrdDouble(this);
56 steps = new RrdInt(this, true);
57 rows = new RrdInt(this, true);
58 if (shouldInitialize) {
59 consolFun.set(arcDef.getConsolFun());
60 xff.set(arcDef.getXff());
61 steps.set(arcDef.getSteps());
62 rows.set(arcDef.getRows());
63 }
64 int n = parentDb.getHeader().getDsCount();
65 states = new ArcState[n];
66 robins = new Robin[n];
67 for (int i = 0; i < n; i++) {
68 states[i] = new ArcState(this, shouldInitialize);
69 int numRows = rows.get();
70 robins[i] = new Robin(this, numRows, shouldInitialize);
71 }
72 }
73
74
75 Archive(RrdDb parentDb, DataImporter reader, int arcIndex) throws IOException, RrdException {
76 this(parentDb, new ArcDef(
77 reader.getConsolFun(arcIndex), reader.getXff(arcIndex),
78 reader.getSteps(arcIndex), reader.getRows(arcIndex)));
79 int n = parentDb.getHeader().getDsCount();
80 for (int i = 0; i < n; i++) {
81
82 states[i].setAccumValue(reader.getStateAccumValue(arcIndex, i));
83 states[i].setNanSteps(reader.getStateNanSteps(arcIndex, i));
84
85 double[] values = reader.getValues(arcIndex, i);
86 robins[i].update(values);
87 }
88 }
89
90
97 public long getArcStep() throws IOException {
98 long step = parentDb.getHeader().getStep();
99 return step * steps.get();
100 }
101
102 String dump() throws IOException {
103 StringBuffer buffer = new StringBuffer("== ARCHIVE ==\n");
104 buffer.append("RRA:").append(consolFun.get()).append(":").append(xff.get()).append(":").append(steps.get()).
105 append(":").append(rows.get()).append("\n");
106 buffer.append("interval [").append(getStartTime()).append(", ").append(getEndTime()).append("]" + "\n");
107 for (int i = 0; i < robins.length; i++) {
108 buffer.append(states[i].dump());
109 buffer.append(robins[i].dump());
110 }
111 return buffer.toString();
112 }
113
114 RrdDb getParentDb() {
115 return parentDb;
116 }
117
118 void archive(int dsIndex, double value, long numUpdates) throws IOException {
119 Robin robin = robins[dsIndex];
120 ArcState state = states[dsIndex];
121 long step = parentDb.getHeader().getStep();
122 long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
123 long updateTime = Util.normalize(lastUpdateTime, step) + step;
124 long arcStep = getArcStep();
125
126 while (numUpdates > 0) {
127 accumulate(state, value);
128 numUpdates--;
129 if (updateTime % arcStep == 0) {
130 finalizeStep(state, robin);
131 break;
132 }
133 else {
134 updateTime += step;
135 }
136 }
137
138 int bulkUpdateCount = (int) Math.min(numUpdates / steps.get(), (long) rows.get());
139 robin.bulkStore(value, bulkUpdateCount);
140
141 long remainingUpdates = numUpdates % steps.get();
142 for (long i = 0; i < remainingUpdates; i++) {
143 accumulate(state, value);
144 }
145 }
146
147 private void accumulate(ArcState state, double value) throws IOException {
148 if (Double.isNaN(value)) {
149 state.setNanSteps(state.getNanSteps() + 1);
150 }
151 else {
152 if (consolFun.get().equals(CF_MIN)) {
153 state.setAccumValue(Util.min(state.getAccumValue(), value));
154 }
155 else if (consolFun.get().equals(CF_MAX)) {
156 state.setAccumValue(Util.max(state.getAccumValue(), value));
157 }
158 else if (consolFun.get().equals(CF_LAST)) {
159 state.setAccumValue(value);
160 }
161 else if (consolFun.get().equals(CF_AVERAGE)) {
162 state.setAccumValue(Util.sum(state.getAccumValue(), value));
163 }
164 }
165 }
166
167 private void finalizeStep(ArcState state, Robin robin) throws IOException {
168
169 long arcSteps = steps.get();
170 double arcXff = xff.get();
171 long nanSteps = state.getNanSteps();
172
173 double accumValue = state.getAccumValue();
174 if (nanSteps <= arcXff * arcSteps && !Double.isNaN(accumValue)) {
175 if (consolFun.get().equals(CF_AVERAGE)) {
176 accumValue /= (arcSteps - nanSteps);
177 }
178 robin.store(accumValue);
179 }
180 else {
181 robin.store(Double.NaN);
182 }
183 state.setAccumValue(Double.NaN);
184 state.setNanSteps(0);
185 }
186
187
193 public String getConsolFun() throws IOException {
194 return consolFun.get();
195 }
196
197
203 public double getXff() throws IOException {
204 return xff.get();
205 }
206
207
213 public int getSteps() throws IOException {
214 return steps.get();
215 }
216
217
223 public int getRows() throws IOException {
224 return rows.get();
225 }
226
227
233 public long getStartTime() throws IOException {
234 long endTime = getEndTime();
235 long arcStep = getArcStep();
236 long numRows = rows.get();
237 return endTime - (numRows - 1) * arcStep;
238 }
239
240
246 public long getEndTime() throws IOException {
247 long arcStep = getArcStep();
248 long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
249 return Util.normalize(lastUpdateTime, arcStep);
250 }
251
252
260 public ArcState getArcState(int dsIndex) {
261 return states[dsIndex];
262 }
263
264
271 public Robin getRobin(int dsIndex) {
272 return robins[dsIndex];
273 }
274
275 FetchData fetchData(FetchRequest request) throws IOException, RrdException {
276 long arcStep = getArcStep();
277 long fetchStart = Util.normalize(request.getFetchStart(), arcStep);
278 long fetchEnd = Util.normalize(request.getFetchEnd(), arcStep);
279 if (fetchEnd < request.getFetchEnd()) {
280 fetchEnd += arcStep;
281 }
282 long startTime = getStartTime();
283 long endTime = getEndTime();
284 String[] dsToFetch = request.getFilter();
285 if (dsToFetch == null) {
286 dsToFetch = parentDb.getDsNames();
287 }
288 int dsCount = dsToFetch.length;
289 int ptsCount = (int) ((fetchEnd - fetchStart) / arcStep + 1);
290 long[] timestamps = new long[ptsCount];
291 double[][] values = new double[dsCount][ptsCount];
292 long matchStartTime = Math.max(fetchStart, startTime);
293 long matchEndTime = Math.min(fetchEnd, endTime);
294 double[][] robinValues = null;
295 if (matchStartTime <= matchEndTime) {
296
297 int matchCount = (int) ((matchEndTime - matchStartTime) / arcStep + 1);
298 int matchStartIndex = (int) ((matchStartTime - startTime) / arcStep);
299 robinValues = new double[dsCount][];
300 for (int i = 0; i < dsCount; i++) {
301 int dsIndex = parentDb.getDsIndex(dsToFetch[i]);
302 robinValues[i] = robins[dsIndex].getValues(matchStartIndex, matchCount);
303 }
304 }
305 for (int ptIndex = 0; ptIndex < ptsCount; ptIndex++) {
306 long time = fetchStart + ptIndex * arcStep;
307 timestamps[ptIndex] = time;
308 for (int i = 0; i < dsCount; i++) {
309 double value = Double.NaN;
310 if (time >= matchStartTime && time <= matchEndTime) {
311
312 int robinValueIndex = (int) ((time - matchStartTime) / arcStep);
313 assert robinValues != null;
314 value = robinValues[i][robinValueIndex];
315 }
316 values[i][ptIndex] = value;
317 }
318 }
319 FetchData fetchData = new FetchData(this, request);
320 fetchData.setTimestamps(timestamps);
321 fetchData.setValues(values);
322 return fetchData;
323 }
324
325 void appendXml(XmlWriter writer) throws IOException {
326 writer.startTag("rra");
327 writer.writeTag("cf", consolFun.get());
328 writer.writeComment(getArcStep() + " seconds");
329 writer.writeTag("pdp_per_row", steps.get());
330 writer.writeTag("xff", xff.get());
331 writer.startTag("cdp_prep");
332 for (ArcState state : states) {
333 state.appendXml(writer);
334 }
335 writer.closeTag();
336 writer.startTag("database");
337 long startTime = getStartTime();
338 for (int i = 0; i < rows.get(); i++) {
339 long time = startTime + i * getArcStep();
340 writer.writeComment(Util.getDate(time) + " / " + time);
341 writer.startTag("row");
342 for (Robin robin : robins) {
343 writer.writeTag("v", robin.getValue(i));
344 }
345 writer.closeTag();
346 }
347 writer.closeTag();
348 writer.closeTag();
349 }
350
351
358 public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
359 if (!(other instanceof Archive)) {
360 throw new RrdException(
361 "Cannot copy Archive object to " + other.getClass().getName());
362 }
363 Archive arc = (Archive) other;
364 if (!arc.consolFun.get().equals(consolFun.get())) {
365 throw new RrdException("Incompatible consolidation functions");
366 }
367 if (arc.steps.get() != steps.get()) {
368 throw new RrdException("Incompatible number of steps");
369 }
370 int count = parentDb.getHeader().getDsCount();
371 for (int i = 0; i < count; i++) {
372 int j = Util.getMatchingDatasourceIndex(parentDb, i, arc.parentDb);
373 if (j >= 0) {
374 states[i].copyStateTo(arc.states[j]);
375 robins[i].copyStateTo(arc.robins[j]);
376 }
377 }
378 }
379
380
387 public void setXff(double xff) throws RrdException, IOException {
388 if (xff < 0D || xff >= 1D) {
389 throw new RrdException("Invalid xff supplied (" + xff + "), must be >= 0 and < 1");
390 }
391 this.xff.set(xff);
392 }
393
394
400 public RrdBackend getRrdBackend() {
401 return parentDb.getRrdBackend();
402 }
403
404
409 public RrdAllocator getRrdAllocator() {
410 return parentDb.getRrdAllocator();
411 }
412 }
413