/*
* Copyright 2013 Splunk, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"): you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
namespace Splunk
{
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using Newtonsoft.Json;
///
/// The class reads a results/event JSON
/// stream one event at a time.
///
public class ResultsReaderJson : ResultsReader
{
///
/// Helper object that will only be constructed if the reader is
/// handling JSON format used by export.
///
private ExportHelper exportHelper;
///
/// Whether the "preview" flag is read.
///
private bool previewFlagRead;
///
/// Initializes a new instance of the
/// class.
///
/// The JSON stream to parse.
public ResultsReaderJson(Stream stream) :
this(stream, false)
{
}
///
/// Initializes a new instance of the
/// class.
///
/// The JSON stream to parse.
///
/// Whether the reader is the underlying reader of a multi reader.
///
internal ResultsReaderJson(Stream stream, bool isInMultiReader) :
base(stream, isInMultiReader)
{
StreamReader = new StreamReader(stream);
if (this.IsExportStream || isInMultiReader)
{
this.exportHelper = new ExportHelper(this);
}
this.FinishInitialization();
}
///
/// Gets the stream reader on the JSON stream to parse.
///
internal StreamReader StreamReader { get; private set; }
///
/// Gets or sets the JSON reader.
///
private JsonTextReader JsonReader
{
get;
set;
}
///
/// Gets a value indicating whether the results are
/// a preview from an unfinished search.
///
public override bool IsPreview
{
get
{
if (!this.previewFlagRead)
{
throw new InvalidOperationException(
"isPreview() is not supported " +
"with a stream from a Splunk 4.x server by this class. " +
"Use the XML format and an XML result reader instead.");
}
return base.IsPreview;
}
protected set
{
base.IsPreview = value;
}
}
///
/// Not supported.
///
public override ICollection Fields
{
// Unlike XML and CSV, JSON result streams do not provide field name
// list before events.
get
{
throw new InvalidOperationException(
"Fields is not supported by this subclass.");
}
}
///
/// Gets a value indicating whether the reader has been
/// disposed.
///
private bool IsDisposed
{
get { return this.StreamReader == null; }
}
///
/// Advances to the next set, skipping remaining event(s)
/// if there are any in the current set.
///
/// Returns false if the end is reached.
internal override bool AdvanceStreamToNextSet()
{
return this.AdvanceIntoNextSetBeforeEvent();
}
///
/// Advances to the next set, skipping remaining event(s)
/// if there are any in the current set, and reads metadata before the
/// first event in the next result set.
///
/// Return false if the end is reached.
internal bool AdvanceIntoNextSetBeforeEvent()
{
// If the end of stream has been reached, don't continue.
if (this.IsDisposed)
{
return false;
}
// In Splunk 5.0 from the export endpoint,
// each result is in its own top level object.
// In Splunk 5.0 not from the export endpoint, the results are
// an array at that object's key "results".
// In Splunk 4.3, the
// array was the top level returned. So if we find an object
// at top level, we step into it until we find the right key,
// then leave it in that state to iterate over.
//
// JSON single-reader depends on 'isExportStream' flag to function.
// It does not support a stream from a file saved from
// a stream from an export endpoint.
// JSON multi-reader assumes export format thus does not support
// a stream from none export endpoints.
if (this.exportHelper != null)
{
/*
* We're on Splunk 5 with a single-reader not from
* an export endpoint
* Below is an example of an input stream.
* {"preview":true,"offset":0,"lastrow":true,"result":{"host":"Andy-PC","count":"62"}}
* {"preview":true,"offset":0,"result":{"host":"Andy-PC","count":"1682"}}
*/
// Read into first result object of the cachedElement set.
while (true)
{
bool endPassed = this.exportHelper.LastRow;
this.exportHelper.SkipRestOfRow();
if (!this.exportHelper.ReadIntoRow())
{
return false;
}
if (endPassed)
{
break;
}
}
return true;
}
// Introduced in Splunk 5.0, the format of the JSON object
// changed. Prior to 5.0, the array of events were a top level
// JSON element. In 5.0 (not from Export),
// the results are an array under the
// key "results".
// Note: Reading causes the side effect of setting the JSON node
// information.
this.JsonReader = new JsonTextReader(StreamReader);
this.JsonReader.Read();
if (this.JsonReader.TokenType.Equals(JsonToken.StartObject))
{
/*
* We're on Splunk 5 with a single-reader not from
* an export endpoint
* Below is an example of an input stream.
* {"preview":false,"init_offset":0,"messages":[{"type":"DEBUG","text":"base lispy: [ AND index::_internal ]"},{"type":"DEBUG","text":"search context: user=\"admin\", app=\"search\", bs-pathname=\"/Users/fross/splunks/splunk-5.0/etc\""}],"results":[{"sum(kb)":"14372242.758775","series":"twitter"},{"sum(kb)":"267802.333926","series":"splunkd"},{"sum(kb)":"5979.036338","series":"splunkd_access"}]}
*/
while (true)
{
if (!this.JsonReader.Read())
{
this.Dispose();
return false;
}
if (this
.JsonReader
.TokenType
.Equals(JsonToken.PropertyName))
{
if (this.JsonReader.Value.Equals("preview"))
{
this.ReadPreviewFlag();
}
if (this.JsonReader.Value.Equals("results"))
{
this.JsonReader.Read();
return true;
}
}
}
}
else
{
/* Pre Splunk 5.0
* Below is an example of an input stream
* [
* {
* "sum(kb)":"14372242.758775",
* "series":"twitter"
* },
* {
* "sum(kb)":"267802.333926",
* "series":"splunkd"
* },
* {
* "sum(kb)":"5979.036338",
* "series":"splunkd_access"
* }
* ]
*/
return true;
}
}
///
/// Releases unmanaged resources.
///
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
// The JSON reader is created after
// constructor so the property could be
// null.
if (this.JsonReader != null)
{
((IDisposable)this.JsonReader).Dispose();
}
if (this.exportHelper != null)
{
this.exportHelper.Dispose();
}
this.StreamReader.Close();
// Marking this reader as disposed.
this.StreamReader = null;
}
///
/// Retrieves the enumerator for data returned from Splunk.
///
/// A enumerator.
internal override IEnumerable GetEventsFromCurrentSet()
{
while (true)
{
if (this.IsDisposed)
{
yield break;
}
if (this.exportHelper != null)
{
// If the last row has been passed and
// AdvanceStreamToNextSet
// has not been called, end the current set.
if (this.exportHelper.LastRow && !this.exportHelper.InRow)
{
yield break;
}
this.exportHelper.ReadIntoRow();
}
var returnData = this.ReadEvent();
if (this.exportHelper != null)
{
this.exportHelper.SkipRestOfRow();
}
if (returnData == null)
{
// End the result reader. This is needed for Splunk 4.x
this.Dispose();
yield break;
}
yield return returnData;
}
}
///
/// Reads an event from the JSON reader.
///
///
/// The event. A value of null indicates the end of stream,
/// which is used by none --export cases.
///
private Event ReadEvent()
{
string name = null;
Event returnData = null;
// Events are almost flat, so no need for a true general parser
// solution.
while (this.JsonReader.Read())
{
if (returnData == null)
{
returnData = new Event();
}
if (this.JsonReader.TokenType.Equals(JsonToken.StartObject))
{
// skip
}
else if (this.JsonReader.TokenType.Equals(JsonToken.StartArray))
{
var data = new List();
while (this.JsonReader.Read())
{
if (this
.JsonReader
.TokenType
.Equals(JsonToken.EndArray))
{
break;
}
if (this
.JsonReader
.TokenType
.Equals(JsonToken.PropertyName))
{
data.Add((string)this.JsonReader.Value);
}
}
Debug.Assert(name != null, "Event field name is not set.");
returnData.Add(name, new Event.FieldValue(data.ToArray()));
}
else if (this
.JsonReader
.TokenType
.Equals(JsonToken.PropertyName))
{
name = (string)this.JsonReader.Value;
}
else if (this.JsonReader.TokenType.Equals(JsonToken.String))
{
Debug.Assert(name != null, "Event field name is not set.");
returnData.Add(name, new Event.FieldValue((string)this.JsonReader.Value));
}
else if (this.JsonReader.TokenType.Equals(JsonToken.EndObject))
{
break;
}
else if (this.JsonReader.TokenType.Equals(JsonToken.EndArray))
{
return null; // this is the end of the event set.
}
}
return returnData;
}
///
/// Reads the preview flag value from the stream.
///
private void ReadPreviewFlag()
{
this.JsonReader.Read();
this.IsPreview = (bool)this.JsonReader.Value;
this.previewFlagRead = true;
}
///
/// Contains code only used for streams from the export endpoint.
///
private class ExportHelper : IDisposable
{
///
/// The JSON reader.
///
private readonly ResultsReaderJson resultsReader;
///
/// The row being read.
///
private StringReader currentRow;
///
/// Initializes a new instance of the class.
///
/// The result reader that is using this helper.
public ExportHelper(ResultsReaderJson resultsReader)
{
this.resultsReader = resultsReader;
// Initial value must be true so that
// the first row is treated as the start of a new set.
this.LastRow = true;
}
///
/// Gets or sets the JSON reader, which is also used by
/// the result reader itself.
///
private JsonTextReader JsonReader
{
get { return this.resultsReader.JsonReader; }
set { this.resultsReader.JsonReader = value; }
}
///
/// Gets a value indicating whether the row
/// is the last in the current set.
///
internal bool LastRow { get; private set; }
///
/// Gets a value indicating whether the reader is in the middle
/// or a row.
///
public bool InRow { get; private set; }
///
/// Reads metadata in the current row before event data.
///
/// Returns false if the end of the stream is
/// encountered.
public bool ReadIntoRow()
{
if (this.InRow)
{
return true;
}
this.InRow = true;
// Each row is a JSON object. Multiple such rows together is not
// valid JSON format. JsonTestReader will fail on those. The JSON output format
// is designed to use line breaks to seperate the rows. Below
// we take one row and give it to a seperate JSON reader.
var line = this.resultsReader.StreamReader.ReadLine();
if (line == null)
{
this.resultsReader.Dispose();
return false;
}
var stringReader = new StringReader(line);
this.currentRow = stringReader;
this.JsonReader = new JsonTextReader(this.currentRow);
this.JsonReader.Read();
if (this.JsonReader.TokenType == JsonToken.StartArray)
{
throw new InvalidOperationException(
"A stream from an export endpoint of " +
"a Splunk 4.x server in the JSON output format " +
"is not supported by this class. " +
"Use the XML search output format, " +
"and an XML result reader instead.");
}
// lastrow name and value pair does not appear if the row
// is not the last in the set.
this.LastRow = false;
while (this.JsonReader.Read())
{
if (this
.JsonReader
.TokenType
.Equals(JsonToken.PropertyName))
{
var name = (string)this.JsonReader.Value;
if (name == "preview")
{
this.resultsReader.ReadPreviewFlag();
}
else if (name == "lastrow")
{
this.JsonReader.Read();
this.LastRow = (bool)this.JsonReader.Value;
}
else if (name == "result")
{
return true;
}
else
{
this.JsonReader.Skip();
}
}
}
return false;
}
///
/// Skips the rest of the current row.
///
public void SkipRestOfRow()
{
if (!this.InRow)
{
return;
}
this.InRow = false;
((IDisposable)this.JsonReader).Dispose();
this.currentRow.Dispose();
}
///
/// Releases resources, including unmanaged ones.
///
public void Dispose()
{
((IDisposable)this.JsonReader).Dispose();
this.currentRow.Dispose();
}
}
}
}