READING IN MULTIPLE FILES WITH GOOGLE CLOUD DATAFLOW (READING MULTIPLE FILES FROM GCS INTO A PCOLLECTION)
When you want to read files from multiple folders in Google Cloud Storage in a Dataflow pipeline (as I did this week), you might run into a little trouble (as described in this Stack Overflow question). In this tip I will describe how I solved the problem and provide some code you can use when you have a comma-separated list of files you want to process.
Steps
What you can do in this case is read in each path in the usual way using TextIO, which gives you one PCollection per path, then construct a PCollectionList consisting of these PCollections, and finally flatten the whole thing into one PCollection.
Code
When your input is a comma-separated list of all the paths you want to read in (which can easily be passed as a command-line argument), you should be able to use the following Java code:
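import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// p is your existing Pipeline object.
String inputs = "gs://xxx/*,gs://yyy/zzz/*"; // (or options.getInput())
List<PCollection<String>> pcollectionlist = new ArrayList<>();
// Read each comma-separated path into its own PCollection.
for (String path : inputs.split(",")) {
    pcollectionlist.add(p.apply(TextIO.read().from(path)));
}
// Wrap the PCollections in a PCollectionList and flatten it into one PCollection.
PCollectionList<String> tempRes = PCollectionList.of(pcollectionlist);
PCollection<String> res = tempRes.apply(Flatten.pCollections());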
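One thing to watch out for: splitting on a bare comma keeps any whitespace around the paths, so an argument like "gs://xxx/*, gs://yyy/zzz/*" would yield a second pattern with a leading space. Below is a minimal sketch of a more forgiving parser, using only the Java standard library. The class name InputPaths and its parse method are hypothetical helpers made up for this illustration and are not part of the Beam or Dataflow SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Beam SDK.
final class InputPaths {

    // Splits a comma-separated list of file patterns, trimming whitespace
    // around each entry and skipping entries that are empty.
    static List<String> parse(String inputs) {
        List<String> patterns = new ArrayList<>();
        if (inputs == null) {
            return patterns; // treat a missing argument as "no inputs"
        }
        for (String part : inputs.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(trimmed);
            }
        }
        return patterns;
    }

    public static void main(String[] args) {
        // Prints the cleaned-up patterns, one per line.
        for (String pattern : parse("gs://xxx/*, gs://yyy/zzz/*,")) {
            System.out.println(pattern);
        }
    }
}
```

You could then loop over InputPaths.parse(inputs) instead of inputs.split(",") when building the list of PCollections.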
Result
With 7 input files, for example, the graph of your pipeline would show 7 separate TextIO read steps feeding into one Flatten step.