/* * Copyright 2013 Splunk, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"): you may * not use this file except in compliance with the License. You may obtain * a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ namespace Splunk { using System; using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics; using System.IO; using Newtonsoft.Json; /// /// The class reads a results/event JSON /// stream one event at a time. /// public class ResultsReaderJson : ResultsReader { /// /// Helper object that will only be constructed if the reader is /// handling JSON format used by export. /// private ExportHelper exportHelper; /// /// Whether the "preview" flag is read. /// private bool previewFlagRead; /// /// Initializes a new instance of the /// class. /// /// The JSON stream to parse. public ResultsReaderJson(Stream stream) : this(stream, false) { } /// /// Initializes a new instance of the /// class. /// /// The JSON stream to parse. /// /// Whether the reader is the underlying reader of a multi reader. /// internal ResultsReaderJson(Stream stream, bool isInMultiReader) : base(stream, isInMultiReader) { StreamReader = new StreamReader(stream); if (this.IsExportStream || isInMultiReader) { this.exportHelper = new ExportHelper(this); } this.FinishInitialization(); } /// /// Gets the stream reader on the JSON stream to parse. /// internal StreamReader StreamReader { get; private set; } /// /// Gets or sets the JSON reader. /// private JsonTextReader JsonReader { get; set; } /// /// Gets a value indicating whether the results are /// a preview from an unfinished search. /// public override bool IsPreview { get { if (!this.previewFlagRead) { throw new InvalidOperationException( "isPreview() is not supported " + "with a stream from a Splunk 4.x server by this class. " + "Use the XML format and an XML result reader instead."); } return base.IsPreview; } protected set { base.IsPreview = value; } } /// /// Not supported. /// public override ICollection Fields { // Unlike XML and CSV, JSON result streams do not provide field name // list before events. get { throw new InvalidOperationException( "Fields is not supported by this subclass."); } } /// /// Gets a value indicating whether the reader has been /// disposed. /// private bool IsDisposed { get { return this.StreamReader == null; } } /// /// Advances to the next set, skipping remaining event(s) /// if there are any in the current set. /// /// Returns false if the end is reached. internal override bool AdvanceStreamToNextSet() { return this.AdvanceIntoNextSetBeforeEvent(); } /// /// Advances to the next set, skipping remaining event(s) /// if there are any in the current set, and reads metadata before the /// first event in the next result set. /// /// Return false if the end is reached. internal bool AdvanceIntoNextSetBeforeEvent() { // If the end of stream has been reached, don't continue. if (this.IsDisposed) { return false; } // In Splunk 5.0 from the export endpoint, // each result is in its own top level object. // In Splunk 5.0 not from the export endpoint, the results are // an array at that object's key "results". // In Splunk 4.3, the // array was the top level returned. So if we find an object // at top level, we step into it until we find the right key, // then leave it in that state to iterate over. // // JSON single-reader depends on 'isExportStream' flag to function. // It does not support a stream from a file saved from // a stream from an export endpoint. // JSON multi-reader assumes export format thus does not support // a stream from none export endpoints. if (this.exportHelper != null) { /* * We're on Splunk 5 with a single-reader not from * an export endpoint * Below is an example of an input stream. * {"preview":true,"offset":0,"lastrow":true,"result":{"host":"Andy-PC","count":"62"}} * {"preview":true,"offset":0,"result":{"host":"Andy-PC","count":"1682"}} */ // Read into first result object of the cachedElement set. while (true) { bool endPassed = this.exportHelper.LastRow; this.exportHelper.SkipRestOfRow(); if (!this.exportHelper.ReadIntoRow()) { return false; } if (endPassed) { break; } } return true; } // Introduced in Splunk 5.0, the format of the JSON object // changed. Prior to 5.0, the array of events were a top level // JSON element. In 5.0 (not from Export), // the results are an array under the // key "results". // Note: Reading causes the side effect of setting the JSON node // information. this.JsonReader = new JsonTextReader(StreamReader); this.JsonReader.Read(); if (this.JsonReader.TokenType.Equals(JsonToken.StartObject)) { /* * We're on Splunk 5 with a single-reader not from * an export endpoint * Below is an example of an input stream. * {"preview":false,"init_offset":0,"messages":[{"type":"DEBUG","text":"base lispy: [ AND index::_internal ]"},{"type":"DEBUG","text":"search context: user=\"admin\", app=\"search\", bs-pathname=\"/Users/fross/splunks/splunk-5.0/etc\""}],"results":[{"sum(kb)":"14372242.758775","series":"twitter"},{"sum(kb)":"267802.333926","series":"splunkd"},{"sum(kb)":"5979.036338","series":"splunkd_access"}]} */ while (true) { if (!this.JsonReader.Read()) { this.Dispose(); return false; } if (this .JsonReader .TokenType .Equals(JsonToken.PropertyName)) { if (this.JsonReader.Value.Equals("preview")) { this.ReadPreviewFlag(); } if (this.JsonReader.Value.Equals("results")) { this.JsonReader.Read(); return true; } } } } else { /* Pre Splunk 5.0 * Below is an example of an input stream * [ * { * "sum(kb)":"14372242.758775", * "series":"twitter" * }, * { * "sum(kb)":"267802.333926", * "series":"splunkd" * }, * { * "sum(kb)":"5979.036338", * "series":"splunkd_access" * } * ] */ return true; } } /// /// Releases unmanaged resources. /// public override void Dispose() { if (this.IsDisposed) { return; } // The JSON reader is created after // constructor so the property could be // null. if (this.JsonReader != null) { ((IDisposable)this.JsonReader).Dispose(); } if (this.exportHelper != null) { this.exportHelper.Dispose(); } this.StreamReader.Close(); // Marking this reader as disposed. this.StreamReader = null; } /// /// Retrieves the enumerator for data returned from Splunk. /// /// A enumerator. internal override IEnumerable GetEventsFromCurrentSet() { while (true) { if (this.IsDisposed) { yield break; } if (this.exportHelper != null) { // If the last row has been passed and // AdvanceStreamToNextSet // has not been called, end the current set. if (this.exportHelper.LastRow && !this.exportHelper.InRow) { yield break; } this.exportHelper.ReadIntoRow(); } var returnData = this.ReadEvent(); if (this.exportHelper != null) { this.exportHelper.SkipRestOfRow(); } if (returnData == null) { // End the result reader. This is needed for Splunk 4.x this.Dispose(); yield break; } yield return returnData; } } /// /// Reads an event from the JSON reader. /// /// /// The event. A value of null indicates the end of stream, /// which is used by none --export cases. /// private Event ReadEvent() { string name = null; Event returnData = null; // Events are almost flat, so no need for a true general parser // solution. while (this.JsonReader.Read()) { if (returnData == null) { returnData = new Event(); } if (this.JsonReader.TokenType.Equals(JsonToken.StartObject)) { // skip } else if (this.JsonReader.TokenType.Equals(JsonToken.StartArray)) { var data = new List(); while (this.JsonReader.Read()) { if (this .JsonReader .TokenType .Equals(JsonToken.EndArray)) { break; } if (this .JsonReader .TokenType .Equals(JsonToken.PropertyName)) { data.Add((string)this.JsonReader.Value); } } Debug.Assert(name != null, "Event field name is not set."); returnData.Add(name, new Event.FieldValue(data.ToArray())); } else if (this .JsonReader .TokenType .Equals(JsonToken.PropertyName)) { name = (string)this.JsonReader.Value; } else if (this.JsonReader.TokenType.Equals(JsonToken.String)) { Debug.Assert(name != null, "Event field name is not set."); returnData.Add(name, new Event.FieldValue((string)this.JsonReader.Value)); } else if (this.JsonReader.TokenType.Equals(JsonToken.EndObject)) { break; } else if (this.JsonReader.TokenType.Equals(JsonToken.EndArray)) { return null; // this is the end of the event set. } } return returnData; } /// /// Reads the preview flag value from the stream. /// private void ReadPreviewFlag() { this.JsonReader.Read(); this.IsPreview = (bool)this.JsonReader.Value; this.previewFlagRead = true; } /// /// Contains code only used for streams from the export endpoint. /// private class ExportHelper : IDisposable { /// /// The JSON reader. /// private readonly ResultsReaderJson resultsReader; /// /// The row being read. /// private StringReader currentRow; /// /// Initializes a new instance of the class. /// /// The result reader that is using this helper. public ExportHelper(ResultsReaderJson resultsReader) { this.resultsReader = resultsReader; // Initial value must be true so that // the first row is treated as the start of a new set. this.LastRow = true; } /// /// Gets or sets the JSON reader, which is also used by /// the result reader itself. /// private JsonTextReader JsonReader { get { return this.resultsReader.JsonReader; } set { this.resultsReader.JsonReader = value; } } /// /// Gets a value indicating whether the row /// is the last in the current set. /// internal bool LastRow { get; private set; } /// /// Gets a value indicating whether the reader is in the middle /// or a row. /// public bool InRow { get; private set; } /// /// Reads metadata in the current row before event data. /// /// Returns false if the end of the stream is /// encountered. public bool ReadIntoRow() { if (this.InRow) { return true; } this.InRow = true; // Each row is a JSON object. Multiple such rows together is not // valid JSON format. JsonTestReader will fail on those. The JSON output format // is designed to use line breaks to seperate the rows. Below // we take one row and give it to a seperate JSON reader. var line = this.resultsReader.StreamReader.ReadLine(); if (line == null) { this.resultsReader.Dispose(); return false; } var stringReader = new StringReader(line); this.currentRow = stringReader; this.JsonReader = new JsonTextReader(this.currentRow); this.JsonReader.Read(); if (this.JsonReader.TokenType == JsonToken.StartArray) { throw new InvalidOperationException( "A stream from an export endpoint of " + "a Splunk 4.x server in the JSON output format " + "is not supported by this class. " + "Use the XML search output format, " + "and an XML result reader instead."); } // lastrow name and value pair does not appear if the row // is not the last in the set. this.LastRow = false; while (this.JsonReader.Read()) { if (this .JsonReader .TokenType .Equals(JsonToken.PropertyName)) { var name = (string)this.JsonReader.Value; if (name == "preview") { this.resultsReader.ReadPreviewFlag(); } else if (name == "lastrow") { this.JsonReader.Read(); this.LastRow = (bool)this.JsonReader.Value; } else if (name == "result") { return true; } else { this.JsonReader.Skip(); } } } return false; } /// /// Skips the rest of the current row. /// public void SkipRestOfRow() { if (!this.InRow) { return; } this.InRow = false; ((IDisposable)this.JsonReader).Dispose(); this.currentRow.Dispose(); } /// /// Releases resources, including unmanaged ones. /// public void Dispose() { ((IDisposable)this.JsonReader).Dispose(); this.currentRow.Dispose(); } } } }